    How to Build an Atomic-Agents RAG Pipeline with Typed Schemas, Dynamic Context Injection, and Agent Chaining

    By Naveed Ahmad | 12/02/2026 | Updated: 12/02/2026 | 7 Mins Read


    In this tutorial, we build a sophisticated, end-to-end learning pipeline around Atomic Agents by wiring together typed agent interfaces, structured prompting, and a compact retrieval layer that grounds outputs in real project documentation. We also demonstrate how to plan retrieval, retrieve relevant context, inject it dynamically into an answering agent, and run an interactive loop that turns the setup into a reusable research assistant for any new Atomic Agents question. Check out the FULL CODES here.

    import os, sys, textwrap, time, json, re
    from typing import List, Optional, Dict, Tuple
    from dataclasses import dataclass
    import subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q",
                          "atomic-agents", "instructor", "openai", "pydantic",
                          "requests", "beautifulsoup4", "scikit-learn"])
    from getpass import getpass
    if not os.environ.get("OPENAI_API_KEY"):
       os.environ["OPENAI_API_KEY"] = getpass("Enter OPENAI_API_KEY (input hidden): ").strip()
    MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")
    from pydantic import Field
    from openai import OpenAI
    import instructor
    from atomic_agents import AtomicAgent, AgentConfig, BaseIOSchema
    from atomic_agents.context import SystemPromptGenerator, ChatHistory, BaseDynamicContextProvider
    import requests
    from bs4 import BeautifulSoup

    We install all required packages, import the core Atomic Agents primitives, and set up Colab-compatible dependencies in one place. We securely capture the OpenAI API key from the keyboard and store it in the environment so downstream code never hardcodes secrets. We also lock in a default model name while keeping it configurable via an environment variable.
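
    To pin a different model for a session, a single environment override before this cell runs is enough (a minimal sketch; "gpt-4o" is just an illustrative model name):

    os.environ["OPENAI_MODEL"] = "gpt-4o"   # illustrative; any chat model your key can access
    MODEL = os.environ.get("OPENAI_MODEL", "gpt-4o-mini")   # same lookup as above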

    def fetch_url_text(url: str, timeout: int = 20) -> str:
       r = requests.get(url, timeout=timeout, headers={"User-Agent": "Mozilla/5.0"})
       r.raise_for_status()
       soup = BeautifulSoup(r.text, "html.parser")
       for tag in soup(["script", "style", "nav", "header", "footer", "noscript"]):
           tag.decompose()
       text = soup.get_text("\n")
       text = re.sub(r"[ \t]+", " ", text)
       text = re.sub(r"\n{3,}", "\n\n", text).strip()
       return text
    
    
    def chunk_text(text: str, max_chars: int = 1400, overlap: int = 200) -> List[str]:
       if not text:
           return []
       chunks = []
       i = 0
       while i < len(text):
           chunk = text[i:i+max_chars].strip()
           if chunk:
               chunks.append(chunk)
           i += max_chars - overlap
       return chunks
    
    
    def clamp(s: str, n: int = 800) -> str:
       s = (s or "").strip()
       return s if len(s) <= n else s[:n].rstrip() + "…"

    We fetch web pages from the Atomic Agents repo and docs, then clean them into plain text so retrieval becomes reliable. We chunk long documents into overlapping segments, preserving context while keeping each chunk small enough for ranking and citation. We also add a small helper to clamp long snippets so our injected context stays readable.
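
    As a quick sanity check of the chunker's overlap math, here is a minimal sketch using only the helpers above (the 3,000-character string is synthetic):

    sample = "x" * 3000
    pieces = chunk_text(sample, max_chars=1400, overlap=200)
    # Windows start every 1200 characters, so we expect lengths [1400, 1400, 600]
    print(len(pieces), [len(p) for p in pieces])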

    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity
    
    
    @dataclass
    class Snippet:
       doc_id: str
       url: str
       chunk_id: int
       text: str
       score: float
    
    
    class MiniCorpusRetriever:
       def __init__(self, docs: Dict[str, Tuple[str, str]]):
           self.items: List[Tuple[str, str, int, str]] = []
           for doc_id, (url, raw) in docs.items():
               for idx, ch in enumerate(chunk_text(raw)):
                   self.items.append((doc_id, url, idx, ch))
           if not self.items:
               raise RuntimeError("No documents were fetched; cannot build TF-IDF index.")
           self.vectorizer = TfidfVectorizer(stop_words="english", max_features=50000)
           self.matrix = self.vectorizer.fit_transform([it[3] for it in self.items])
    
    
       def search(self, query: str, k: int = 6) -> List[Snippet]:
           qv = self.vectorizer.transform([query])
           sims = cosine_similarity(qv, self.matrix).ravel()
           top = sims.argsort()[::-1][:k]
           out = []
           for j in top:
               doc_id, url, chunk_id, txt = self.items[j]
               out.append(Snippet(doc_id=doc_id, url=url, chunk_id=chunk_id, text=txt, score=float(sims[j])))
           return out
    
    
    class RetrievedContextProvider(BaseDynamicContextProvider):
       def __init__(self, title: str, snippets: List[Snippet]):
           super().__init__(title=title)
           self.snippets = snippets
    
    
       def get_info(self) -> str:
           blocks = []
           for s in self.snippets:
               blocks.append(
                   f"[{s.doc_id}#{s.chunk_id}] (score={s.score:.3f}) {s.url}\n{clamp(s.text, 900)}"
               )
           return "\n\n".join(blocks)

    We build a mini retrieval system using TF-IDF and cosine similarity over the chunked documentation corpus. We wrap each retrieved chunk in a structured Snippet object to track doc IDs, chunk IDs, and similarity scores. We then inject top-ranked chunks into the agent's runtime via a dynamic context provider, keeping the answering agent grounded. Check out the FULL CODES here.
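
    A small smoke test makes the retriever's behavior concrete (a minimal sketch; the two toy documents and their URLs are invented for illustration):

    toy_docs = {
       "guide": ("https://example.com/guide", "Atomic Agents uses typed input and output schemas. " * 40),
       "faq": ("https://example.com/faq", "Context providers inject dynamic text into the system prompt. " * 40),
    }
    toy = MiniCorpusRetriever(toy_docs)
    for s in toy.search("typed schemas", k=2):
       print(f"[{s.doc_id}#{s.chunk_id}] score={s.score:.3f} {s.url}")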

    class PlanInput(BaseIOSchema):
       """Input schema for the planner agent: describes the user's task and how many retrieval queries to draft."""
       task: str = Field(...)
       num_queries: int = Field(4)
    
    
    class PlanOutput(BaseIOSchema):
       """Output schema from the planner agent: retrieval queries, coverage checklist, and safety checks."""
       queries: List[str]
       must_cover: List[str]
       safety_checks: List[str]
    
    
    class AnswerInput(BaseIOSchema):
       """Input schema for the answering agent: user question plus style constraints."""
       question: str
       style: str = "concise but advanced"
    
    
    class AnswerOutput(BaseIOSchema):
       """Output schema for the answering agent: grounded answer, next steps, and which citations were used."""
       answer: str
       next_steps: List[str]
       used_citations: List[str]
    
    
    client = instructor.from_openai(OpenAI(api_key=os.environ["OPENAI_API_KEY"]))
    
    
    planner_prompt = SystemPromptGenerator(
       background=[
           "You are a rigorous research planner for a small RAG system.",
           "You propose retrieval queries that are diverse (lexical + semantic) and designed to find authoritative info.",
           "You do NOT answer the task; you only plan retrieval."
       ],
       steps=[
           "Read the task.",
           "Propose diverse retrieval queries (not too long).",
           "List must-cover aspects and safety checks."
       ],
       output_instructions=[
           "Return strictly the PlanOutput schema.",
           "Queries must be directly usable as search strings.",
           "Must-cover should be 4–8 bullets."
       ]
    )
    
    
    planner = AtomicAgent[PlanInput, PlanOutput](
       config=AgentConfig(
           client=client,
           model=MODEL,
           system_prompt_generator=planner_prompt,
           history=ChatHistory(),
       )
    )
    
    
    answerer_prompt = SystemPromptGenerator(
       background=[
           "You are an expert technical tutor for Atomic Agents (atomic-agents).",
           "You are given retrieved context snippets with IDs like [doc#chunk].",
           "You must ground claims in the provided snippets and cite them inline."
       ],
       steps=[
           "Read the question and the provided context.",
           "Synthesize an accurate answer using only supported facts.",
           "Cite claims inline using the provided snippet IDs."
       ],
       output_instructions=[
           "Use inline citations like [readme#12] or [docs_home#3].",
           "If the context doesn't support something, say so briefly and suggest what to retrieve next.",
           "Return strictly the AnswerOutput schema."
       ]
    )
    
    
    answerer = AtomicAgent[AnswerInput, AnswerOutput](
       config=AgentConfig(
           client=client,
           model=MODEL,
           system_prompt_generator=answerer_prompt,
           history=ChatHistory(),
       )
    )

    We define strictly typed schemas for planner and answerer inputs and outputs, and include docstrings to satisfy Atomic Agents' schema requirements. We create an Instructor-wrapped OpenAI client and configure two Atomic Agents with explicit system prompts and chat history. We enforce structured outputs so the planner produces queries and the answerer produces a cited response with clear next steps.
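
    Because each agent is typed end to end, we can exercise the planner in isolation before wiring the full pipeline (a minimal sketch; the task string is arbitrary):

    plan = planner.run(PlanInput(task="Explain Atomic Agents context providers", num_queries=3))
    print(plan.queries)      # search strings ready for the retriever
    print(plan.must_cover)   # coverage checklist the final answer should satisfy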

    SOURCES = {
       "readme": "https://github.com/BrainBlend-AI/atomic-agents",
       "docs_home": "https://brainblend-ai.github.io/atomic-agents/",
       "examples_index": "https://brainblend-ai.github.io/atomic-agents/examples/index.html",
    }
    
    
    raw_docs: Dict[str, Tuple[str, str]] = {}
    for doc_id, url in SOURCES.items():
       try:
           raw_docs[doc_id] = (url, fetch_url_text(url))
       except Exception:
           raw_docs[doc_id] = (url, "")
    
    
    non_empty = [d for d in raw_docs.values() if d[1].strip()]
    if not non_empty:
       raise RuntimeError("All source fetches failed or were empty. Check network access in Colab and retry.")
    
    
    retriever = MiniCorpusRetriever(raw_docs)
    
    
    def run_atomic_rag(question: str, k: int = 7, verbose: bool = True) -> AnswerOutput:
       t0 = time.time()
       plan = planner.run(PlanInput(task=question, num_queries=4))
       all_snips: List[Snippet] = []
       for q in plan.queries:
           all_snips.extend(retriever.search(q, k=max(2, k // 2)))
       best: Dict[Tuple[str, int], Snippet] = {}
       for s in all_snips:
           key = (s.doc_id, s.chunk_id)
           if (key not in best) or (s.score > best[key].score):
               best[key] = s
       snips = sorted(best.values(), key=lambda x: x.score, reverse=True)[:k]
       ctx = RetrievedContextProvider(title="Retrieved Atomic Agents Context", snippets=snips)
       answerer.register_context_provider("retrieved_context", ctx)
       out = answerer.run(AnswerInput(question=question, style="concise, advanced, practical"))
       if verbose:
           print(out.answer)
       return out
    
    
    demo_q = "Teach me Atomic Agents at an advanced level: explain the core building blocks and show how to chain agents with typed schemas and dynamic context."
    run_atomic_rag(demo_q, k=7, verbose=True)
    
    
    while True:
       user_q = input("\nYour question> ").strip()
       if not user_q or user_q.lower() in {"exit", "quit"}:
           break
       run_atomic_rag(user_q, k=7, verbose=True)

    We fetch a small set of authoritative Atomic Agents sources and build a local retrieval index from them. We implement a full pipeline function that plans queries, retrieves relevant context, injects it, and produces a grounded final answer. We finish by running a demo query and launching an interactive loop so we can keep asking questions and getting cited answers.
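
    Growing the corpus is just a matter of extending SOURCES and rebuilding the index; run_atomic_rag picks up the rebuilt global retriever automatically (a sketch; the quickstart URL is hypothetical):

    SOURCES["quickstart"] = "https://brainblend-ai.github.io/atomic-agents/quickstart.html"   # hypothetical page
    raw_docs["quickstart"] = (SOURCES["quickstart"], fetch_url_text(SOURCES["quickstart"]))
    retriever = MiniCorpusRetriever(raw_docs)
    out = run_atomic_rag("How do I register a context provider?", k=5, verbose=False)
    print(out.used_citations)   # snippet IDs such as ['readme#4', 'quickstart#2']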

    In conclusion, we completed the Atomic-Agents workflow in Colab, cleanly separating planning, retrieval, and answering while ensuring strong typing. We kept the system grounded by injecting only the highest-signal documentation chunks as dynamic context, and we enforced a citation discipline that makes outputs auditable. From here, we can scale this pattern by adding more sources, swapping in stronger retrievers or rerankers, introducing tool-use agents, and turning the pipeline into a production-grade research assistant that remains both fast and trustworthy.
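
    For example, swapping in an embedding-based retriever only requires preserving the search() contract, so everything downstream stays unchanged (a hedged sketch, assuming access to the OpenAI embeddings endpoint; "text-embedding-3-small" is one reasonable choice):

    import numpy as np
    
    class EmbeddingRetriever(MiniCorpusRetriever):
       def __init__(self, docs: Dict[str, Tuple[str, str]], embed_model: str = "text-embedding-3-small"):
           super().__init__(docs)   # reuse the chunking and bookkeeping from the TF-IDF class
           self.embed_model = embed_model
           self.oai = OpenAI()
           resp = self.oai.embeddings.create(model=embed_model, input=[it[3] for it in self.items])
           self.emb = np.array([d.embedding for d in resp.data])
    
       def search(self, query: str, k: int = 6) -> List[Snippet]:
           q = np.array(self.oai.embeddings.create(model=self.embed_model, input=[query]).data[0].embedding)
           sims = (self.emb @ q) / (np.linalg.norm(self.emb, axis=1) * np.linalg.norm(q) + 1e-9)
           top = sims.argsort()[::-1][:k]
           return [Snippet(*self.items[j], float(sims[j])) for j in top]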


    Check out the FULL CODES here. Also, feel free to follow us on Twitter and don't forget to join our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! Are you on Telegram? Now you can join us on Telegram as well.



