Back to projects

~/projects

Multi-Agent Supply Chain Disruption Dashboard

multi-agentlangchainstreamlitredispydanticmlflow

Repo: mohd-vasim/ai-engineering → mas-supply-chain-disruption

When a storm closes the I-80 corridor, a port shuts down, or a shipment goes missing, three things need to happen in the right order: someone has to notice, someone has to reroute, and the customer has to be told. In a real ops team, those are different people with different tools. In this project, they are three cooperating LLM agents coordinating through a shared memory.

The Problem

In a naive multi-agent setup, agents talk by passing messages. The monitoring agent tells the logistics agent, who tells the customer agent, who tells… no one. Three hops in, the original context is lossy, contradictory beliefs creep in, and the system has no single source of truth.

The same problem shows up in distributed systems: fragmented state, semantic drift, no ground truth. This project solves it the same way distributed systems do — with a shared, typed, authoritative store outside any individual agent.

The Pattern: Shared Epistemic Memory (SEM)

A Shared Epistemic Memory is a single, persistent key-value store that all agents read from and write to. It acts as the canonical world state for the workflow.

The full design is specified in REQUIREMENTS.md in the repo. The non-negotiables:

  • Typed entries — every value is a Pydantic model (ShipmentStatus, EventLog). No free-form blobs.
  • Optimistic locking — every entry carries a version integer. Writes that don't match the expected version are rejected, so concurrent agents can't silently clobber each other.
  • TTL & staleness — every entry has a timestamp and ttl_seconds. Agents reading a stale entry treat it as missing and re-verify.
  • Audit trail — every entry records source_agent_id. You can always answer "who said this and when?"

Backing store is Upstash Redis in production; the app degrades to demo mode if no Redis URL is configured.

Agent Coordination Through Shared Memory three specialised agents, one authoritative store — no direct messaging between agents MonitoringAgent log_event LogisticsAgent update_shipment_status CustomerNotificationAgent log_event Shared Epistemic Memory Redis · Pydantic · versioned shipment:SHP-001 · v3 shipment:SHP-002 · v1 evt-7a3f · disruption evt-b1c2 · notified TTL · source_agent_id · timestamp optimistic locking on every update WRITE event READ event → WRITE shipment arrows go only between an agent and the memory — never directly between agents

The Three Agents

The system models a supply chain with three specialised agents, each with a single responsibility and a narrow tool surface:

AgentRoleTools
MonitoringAgentDetects disruptions (storms, port closures, road blocks) and logs them as events.log_event
LogisticsAgentReads recent disruption events from memory and updates affected shipment statuses.read_memory, update_shipment_status
CustomerNotificationAgentReads shipment statuses and logs a customer_notified event for any affected order.read_memory, log_event

The system prompt for each agent enforces role boundaries at the prompt level — the monitoring agent is told explicitly not to update shipment statuses, and so on. This is the cleanest way to keep tool-calling agents from drifting into each other's responsibilities.

The Workflow

A run looks like this in the Streamlit UI:

  1. MonitoringAgent runs first — given a scenario like "A severe storm has closed the I-80 corridor in Nebraska", it calls log_event to write a disruption_detected event.
  2. LogisticsAgent runs — it reads recent events from memory, decides which shipments are affected, and updates them to delayed or rerouted.
  3. CustomerNotificationAgent runs — it reads the latest shipment statuses and logs a customer_notified event for any affected order.

How the memory evolves at each step:

One Disruption, Three Agent Steps how memory state grows as each agent runs in sequence Step 1 · MonitoringAgent log_event( evt-7a3f, disruption_detected, SHP-001, "storm I-80") WRITE Memory after Step 1 evt-7a3f · disruption_detected SHP-001 — not yet seen Step 2 · LogisticsAgent read_memory(evt-7a3f) update_shipment( SHP-001, delayed, "storm") WRITE Memory after Step 2 evt-7a3f · disruption_detected SHP-001 · delayed · v2 version bumped 1 → 2 Step 3 · CustomerAgent read_memory(SHP-001) log_event( evt-b1c2, notified, SHP-001) WRITE Memory after Step 3 evt-7a3f · disruption SHP-001 · delayed · v2 evt-b1c2 · customer_notified green = terminal state · yellow = mid-workflow

You can also inspect the full memory state at any point and replay steps to see how the system reacts to different inputs.

Key Design Decisions

Why a memory store instead of direct agent-to-agent messaging? Agents don't trust each other's internal state. The memory store is the only thing that has to be consistent. This is the same insight as a database in a microservice architecture — agents are services, memory is the database.

Why Pydantic schemas on every entry? A memory store is a contract. If the monitoring agent writes "delayed" as a string and the logistics agent writes "DELAYED", downstream code has to defensively normalise. Pydantic + Literal["in_transit", "delayed", ...] makes the type system enforce it.

Why version numbers instead of last-write-wins? Two agents can read the same shipment, both decide to update it, and write at the same time. Without versioning, the second write silently overwrites the first and the system lies about what happened. The version check in SharedEpistemicMemory.update() turns that race into a ValueError the caller can retry.

Why TTL on every entry? Supply chain state goes stale fast. A "shipment is on time" fact from 12 hours ago is not useful information — the truck may have broken down an hour after it was written. TTLs force agents to treat old data as missing and re-verify against the current world.

Tech Stack

  • LangChain + langchain-nvidia-ai-endpoints — agent construction and access to GPT-OSS-120B on the NVIDIA API.
  • Upstash Redis — backing store for the Shared Epistemic Memory (serverless, edge-friendly).
  • Pydantic v2 — typed memory entries, schema validation, serialisation.
  • Streamlit — interactive dashboard with step-by-step agent control, live memory inspector, Plotly charts.
  • MLflow — full tracing of every LLM call, tool call, and agent step to Databricks MLflow.
  • Plotly — shipment status distribution and event timeline visualisations.

How to Run

bash
git clone https://github.com/mohd-vasim/ai-engineering.git
cd ai-engineering/mas-supply-chain-disruption
uv sync

# .env
NVIDIA_API_KEY=...
UPSTASH_REDIS_REST_URL=...
UPSTASH_REDIS_REST_TOKEN=...
MLFLOW_TRACKING_URI=databricks
DATABRICKS_TOKEN=...
DATABRICKS_HOST=...

uv run streamlit run app.py

Without Redis configured, the app still runs in demo mode — the in-memory store works, but the persistence and atomic-write guarantees go away. Without an NVIDIA API key, the agents won't initialise.

What's in the Repo

  • app.py — the full Streamlit dashboard, the SharedEpistemicMemory class, the three agent definitions, the typed tools, and the Plotly visualisations.
  • REQUIREMENTS.md — the full Functional Design Specification for the SEM pattern: architecture, schemas, concurrency model, error handling, observability, and open questions.

Status

Functional end-to-end. The system has been used to simulate full disruption-to-notification workflows with multiple concurrent shipment updates. Next iterations I'd want to add:

  • Pub/sub on writes — let agents subscribe to memory changes instead of polling.
  • Schema registry — currently schemas are hardcoded; a registry would make new entry types drop-in.
  • Evaluation harness — scripted scenarios with expected memory state to catch regressions.