Close Menu
    Facebook X (Twitter) Instagram
    Articles Stock
    • Home
    • Technology
    • AI
    • Pages
      • About us
      • Contact us
      • Disclaimer For Articles Stock
      • Privacy Policy
      • Terms and Conditions
    Facebook X (Twitter) Instagram
    Articles Stock
    AI

    Find out how to Design a Manufacturing-Grade Multi-Agent Communication System Utilizing LangGraph Structured Message Bus, ACP Logging, and Persistent Shared State Structure

    Naveed AhmadBy Naveed Ahmad02/03/2026Updated:02/03/2026No Comments8 Mins Read
    blog banner23 2


    On this tutorial, we construct a sophisticated multi-agent communication system utilizing a structured message bus structure powered by LangGraph and Pydantic. We outline a strict ACP-style message schema that enables brokers to speak through a shared state fairly than calling one another instantly, enabling modularity, traceability, and production-grade orchestration. We implement three specialised brokers, a Planner, Executor, and Validator, that coordinate by means of structured messages, persistent state, and routing logic. We additionally combine SQLite-based persistence to supply sturdy reminiscence throughout executions and visualize the agent communication move to know how messages propagate by means of the system.

    !pip -q set up -U "pydantic==2.12.3"
    !pip -q set up -U langgraph langchain-core networkx matplotlib
    !pip -q set up -U langgraph-checkpoint-sqlite
    
    
    import os
    import json
    import uuid
    import sqlite3
    from datetime import datetime, timezone
    from typing import Any, Dict, Record, Literal, Elective, Tuple
    
    
    from pydantic import BaseModel, Discipline
    
    
    import networkx as nx
    import matplotlib.pyplot as plt
    
    
    from langgraph.graph import StateGraph, END
    from langgraph.checkpoint.sqlite import SqliteSaver
    
    
    
    
    Position = Literal["planner", "executor", "validator", "user", "system"]
    MsgType = Literal["task", "plan", "result", "validation", "error", "control"]
    
    
    class ACPMessage(BaseModel):
       msg_id: str = Discipline(default_factory=lambda: str(uuid.uuid4()))
       ts: str = Discipline(default_factory=lambda: datetime.now(timezone.utc).isoformat().substitute("+00:00", "Z"))
       sender: Position
       receiver: Position
       msg_type: MsgType
       content material: str
       meta: Dict[str, Any] = Discipline(default_factory=dict)
       hint: Dict[str, Any] = Discipline(default_factory=dict)
    
    
    def acp_log_path() -> str:
       os.makedirs("acp_logs", exist_ok=True)
       return os.path.be a part of("acp_logs", "acp_messages.jsonl")
    
    
    def append_acp_log(m: ACPMessage) -> None:
       with open(acp_log_path(), "a", encoding="utf-8") as f:
           f.write(m.model_dump_json() + "n")

    We set up and import all of the required libraries wanted to construct a structured multi-agent communication system. We outline the ACP-style message schema utilizing Pydantic, which permits us to implement a strict and structured format for agent communication. We additionally implement structured logging to persist each message exchanged between brokers, enabling traceability and observability of the system.

    class BusState(BaseModel):
       purpose: str = ""
       finished: bool = False
       errors: Record[str] = Discipline(default_factory=checklist)
       mailbox: Record[ACPMessage] = Discipline(default_factory=checklist)
       edges: Record[Tuple[str, str, str]] = Discipline(default_factory=checklist)
       active_role: Position = "person"
       step: int = 0
    
    
    
    
    def bus_update(
       state: BusState,
       sender: Position,
       receiver: Position,
       msg_type: MsgType,
       content material: str,
       meta: Elective[Dict[str, Any]] = None,
       hint: Elective[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
       m = ACPMessage(
           sender=sender,
           receiver=receiver,
           msg_type=msg_type,
           content material=content material,
           meta=meta or {},
           hint=hint or {},
       )
       append_acp_log(m)
       return {
           "purpose": state.purpose,
           "finished": state.finished,
           "errors": state.errors,
           "mailbox": state.mailbox + [m],
           "edges": state.edges + [(sender, receiver, msg_type)],
           "active_role": receiver,
           "step": state.step + 1,
       }

    We outline the shared state construction that acts because the centralized message bus for all brokers. We implement the BusState class to retailer the purpose, mailbox, routing data, and execution progress. We additionally create the bus_update operate, which permits us to generate structured messages, replace the shared state, and constantly persist message logs.

    def planner_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
       state = BusState.model_validate(state_dict)
       purpose = state.purpose.strip()
       if not purpose:
           return bus_update(state, "planner", "validator", "error", "No purpose offered.", meta={"purpose": "empty_goal"})
       plan = [
           "Interpret the goal and extract requirements.",
           "Decide an execution strategy with clear outputs.",
           "Ask Executor to produce the result.",
           "Ask Validator to check correctness + completeness.",
       ]
       plan_text = "n".be a part of([f"{i+1}. {p}" for i, p in enumerate(plan)])
       return bus_update(
           state,
           "planner",
           "executor",
           "plan",
           plan_text,
           meta={"purpose": purpose, "plan_steps": len(plan)},
           hint={"coverage": "deterministic_planner_v1"},
       )
    
    
    
    
    def executor_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
       state = BusState.model_validate(state_dict)
       purpose = state.purpose.strip()
       latest_plan = None
       for m in reversed(state.mailbox):
           if m.receiver == "executor" and m.msg_type == "plan":
               latest_plan = m.content material
               break
       end result = {
           "purpose": purpose,
           "assumptions": [
               "We can produce a concise, actionable output.",
               "We can validate via rule-based checks.",
           ],
           "output": f"Executed process for purpose: {purpose}",
           "deliverables": [
               "A clear summary",
               "A step-by-step action list",
               "Any constraints and edge cases",
           ],
           "plan_seen": bool(latest_plan),
       }
       result_text = json.dumps(end result, indent=2)
       return bus_update(
           state,
           "executor",
           "validator",
           "end result",
           result_text,
           meta={"artifact_type": "json", "bytes": len(result_text.encode("utf-8"))},
           hint={"coverage": "deterministic_executor_v1"},
       )

    We implement the Planner and Executor brokers, which deal with process planning and execution. We design the Planner agent to interpret the purpose and generate a structured execution plan, which is then handed by means of the message bus. We implement the Executor agent to learn the plan, execute it, and produce a structured end result artifact that downstream brokers can validate.

    def validator_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
       state = BusState.model_validate(state_dict)
       purpose = state.purpose.strip()
       latest_result = None
       for m in reversed(state.mailbox):
           if m.receiver == "validator" and m.msg_type in ("end result", "error"):
               latest_result = m
               break
       if latest_result is None:
           upd = bus_update(state, "validator", "planner", "error", "No end result to validate.", meta={"purpose": "missing_result"})
           upd["done"] = True
           upd["errors"] = state.errors + ["missing_result"]
           return upd
       if latest_result.msg_type == "error":
           upd = bus_update(
               state,
               "validator",
               "planner",
               "validation",
               f"Validation failed as a result of upstream error occurred: {latest_result.content material}",
               meta={"standing": "fail"},
           )
           upd["done"] = True
           upd["errors"] = state.errors + [latest_result.content]
           return upd
       attempt:
           parsed = json.masses(latest_result.content material)
       besides Exception as e:
           upd = bus_update(
               state,
               "validator",
               "planner",
               "validation",
               f"End result is just not legitimate JSON: {e}",
               meta={"standing": "fail"},
           )
           upd["done"] = True
           upd["errors"] = state.errors + [f"invalid_json: {e}"]
           return upd
       points = []
       if parsed.get("purpose") != purpose:
           points.append("End result.purpose doesn't match enter purpose.")
       if "deliverables" not in parsed or not isinstance(parsed["deliverables"], checklist) or len(parsed["deliverables"]) == 0:
           points.append("Lacking or empty deliverables checklist.")
       if points:
           upd = bus_update(
               state,
               "validator",
               "planner",
               "validation",
               "Validation failed:n- " + "n- ".be a part of(points),
               meta={"standing": "fail", "points": points},
           )
           upd["done"] = True
           upd["errors"] = state.errors + points
           return upd
       upd = bus_update(
           state,
           "validator",
           "person",
           "validation",
           "Validation handed ✅ End result seems constant and full.",
           meta={"standing": "go"},
       )
       upd["done"] = True
       upd["errors"] = state.errors
       return upd
    
    
    
    
    def route_next(state_dict: Dict[str, Any]) -> str:
       if state_dict.get("finished", False):
           return END
       position = state_dict.get("active_role", "person")
       if position == "planner":
           return "planner"
       if position == "executor":
           return "executor"
       if position == "validator":
           return "validator"
       return END

    We implement the Validator agent and the routing logic that controls agent execution move. We design the Validator to examine the execution outcomes, confirm correctness, and generate validation outcomes by means of structured checks. We additionally implement the routing operate that dynamically determines which agent ought to execute subsequent, enabling coordinated multi-agent orchestration.

    graph = StateGraph(dict)
    
    
    graph.add_node("planner", planner_agent)
    graph.add_node("executor", executor_agent)
    graph.add_node("validator", validator_agent)
    
    
    graph.set_entry_point("planner")
    
    
    graph.add_conditional_edges("planner", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})
    graph.add_conditional_edges("executor", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})
    graph.add_conditional_edges("validator", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})
    
    
    os.makedirs("checkpoints", exist_ok=True)
    db_path = "checkpoints/langgraph_bus.sqlite"
    conn = sqlite3.join(db_path, check_same_thread=False)
    checkpointer = SqliteSaver(conn)
    
    
    app = graph.compile(checkpointer=checkpointer)
    
    
    
    
    def run_thread(purpose: str, thread_id: str) -> BusState:
       init = BusState(purpose=purpose, active_role="planner", finished=False).model_dump()
       final_state_dict = app.invoke(init, config={"configurable": {"thread_id": thread_id}})
       return BusState.model_validate(final_state_dict)
    
    
    
    
    thread_id = "demo-thread-001"
    purpose = "Design an ACP-style message bus the place planner/executor/validator coordinate by means of shared state."
    
    
    final_state = run_thread(purpose, thread_id)
    print("Finished:", final_state.finished)
    print("Steps:", final_state.step)
    print("Errors:", final_state.errors)
    
    
    print("nLast 5 messages:")
    for m in final_state.mailbox[-5:]:
       print(f"- [{m.msg_type}] {m.sender} -> {m.receiver}: {m.content material[:80]}")
    
    
    snapshot = checkpointer.get_tuple({"configurable": {"thread_id": thread_id}})
    cp = snapshot.checkpoint or {}
    cv = cp.get("channel_values", {}) or {}
    sv = cp.get("state", {}) or {}
    vals = cv if isinstance(cv, dict) and len(cv) else sv if isinstance(sv, dict) else {}
    
    
    print("nCheckpoint keys:", checklist(cp.keys()))
    if isinstance(cv, dict):
       print("channel_values keys:", checklist(cv.keys())[:30])
    if isinstance(sv, dict):
       print("state keys:", checklist(sv.keys())[:30])
    
    
    print("nPersisted step (best-effort):", vals.get("step", "NOT_FOUND"))
    print("Continued active_role (best-effort):", vals.get("active_role", "NOT_FOUND"))
    
    
    print("nACP logs:", acp_log_path())
    print("Checkpoint DB:", db_path)
    
    
    
    
    G = nx.DiGraph()
    G.add_edge("planner", "executor")
    G.add_edge("executor", "validator")
    G.add_edge("validator", "person")
    
    
    plt.determine(figsize=(6, 4))
    pos = nx.spring_layout(G, seed=7)
    nx.draw(G, pos, with_labels=True, node_size=1800, font_size=10, arrows=True)
    plt.title("Orchestration Graph: Planner → Executor → Validator")
    plt.present()
    
    
    
    
    comm = nx.MultiDiGraph()
    for (s, r, t) in final_state.edges:
       comm.add_edge(s, r, label=t)
    
    
    plt.determine(figsize=(8, 5))
    pos2 = nx.spring_layout(comm, seed=11)
    nx.draw(comm, pos2, with_labels=True, node_size=1800, font_size=10, arrows=True)
    plt.title("Communication Graph from Structured Message Bus (Runtime Edges)")
    plt.present()
    
    
    
    
    def tail_jsonl(path: str, n: int = 8) -> Record[Dict[str, Any]]:
       if not os.path.exists(path):
           return []
       with open(path, "r", encoding="utf-8") as f:
           traces = f.readlines()[-n:]
       return [json.loads(x) for x in lines]
    
    
    
    
    print("nLast ACP log entries:")
    for row in tail_jsonl(acp_log_path(), 6):
       print(f"{row['msg_type']:>10} | {row['sender']} -> {row['receiver']} | {row['ts']}")

    We assemble the LangGraph state graph, allow SQLite-based persistence, and execute the multi-agent workflow. We use a thread identifier to make sure the agent state could be saved and recovered reliably throughout executions. We additionally visualize the orchestration and communication graphs and examine endured logs, which permits us to know how brokers work together by means of the structured message bus.

    On this tutorial, we efficiently designed and carried out a structured multi-agent communication framework utilizing LangGraph’s shared-state structure and ACP-style message-bus ideas. We enabled brokers to function independently whereas speaking by means of structured, persistent messages, which improves reliability, observability, and scalability. We logged each interplay, endured agent state throughout executions, and visualized communication patterns to realize deep perception into agent coordination. This structure permits us to construct strong, modular, and production-ready multi-agent methods that may be prolonged with further brokers, LLM reasoning, reminiscence methods, and sophisticated routing methods.


    Try the Full Codes here. Additionally, be happy to observe us on Twitter and don’t overlook to affix our 120k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.




    Source link

    Naveed Ahmad

    Related Posts

    Let’s discover one of the best alternate options to Discord

    02/03/2026

    Polymarket noticed $529M traded on bets tied to bombing of Iran

    02/03/2026

    Buyers spill what they don’t seem to be searching for anymore in AI SaaS firms

    01/03/2026
    Leave A Reply Cancel Reply

    Categories
    • AI
    Recent Comments
      Facebook X (Twitter) Instagram Pinterest
      © 2026 ThemeSphere. Designed by ThemeSphere.

      Type above and press Enter to search. Press Esc to cancel.