In this tutorial, we explore how to build a fully offline, multi-step reasoning agent that uses the Instructor library to generate structured outputs and reliably orchestrate complex tool calls. In this implementation, we design an agent capable of choosing the right tool, validating inputs, planning multi-stage workflows, and recovering from errors. We bring together Instructor, Transformers, and carefully crafted Pydantic schemas to create an intelligent, adaptive system that mirrors real-world agentic AI behavior. Check out the FULL CODES here.
import subprocess
import sys

def install_dependencies():
    import torch
    packages = [
        "instructor",
        "transformers>=4.35.0",
        "torch",
        "accelerate",
        "pydantic>=2.0.0",
        "numpy",
        "pandas"
    ]
    if torch.cuda.is_available():
        packages.append("bitsandbytes")
        print("✅ GPU detected - installing quantization support")
    else:
        print("⚠️ No GPU detected - will use CPU (slower but works)")
    for package in packages:
        subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", package])

try:
    import instructor
except ImportError:
    print("📦 Installing dependencies...")
    install_dependencies()
    print("✅ Installation complete!")

from typing import Literal, Optional, List, Union, Dict, Any
from pydantic import BaseModel, Field, validator
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import instructor
import json
from datetime import datetime
import re
We set up the environment by installing all required dependencies and importing the core libraries. As we lay the foundation for the system, we make sure that everything, from Instructor to Transformers, is ready for offline execution. This gives us a clean and reliable base for building the agent.
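The install guard above follows a common check-then-install pattern. Here is a minimal, stdlib-only sketch of the same idea (the helper name `ensure_package` is ours, not part of the tutorial's code): `importlib.util.find_spec` tests importability without importing, so pip is only invoked when the package is genuinely missing.

```python
import importlib.util
import subprocess
import sys
from typing import Optional

def ensure_package(pip_name: str, module_name: Optional[str] = None) -> bool:
    """Return True if the module is importable, installing it via pip first if needed."""
    module = module_name or pip_name
    # find_spec checks availability without actually importing the module
    if importlib.util.find_spec(module) is not None:
        return True
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pip_name])
    return importlib.util.find_spec(module) is not None

# Stdlib modules are always present, so no pip call is made here:
print(ensure_package("json"))  # True
```

Passing `module_name` separately covers packages whose import name differs from their pip name (e.g. `pip install pillow`, `import PIL`).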
class SQLQuery(BaseModel):
    """Advanced SQL generation with validation"""
    table: str
    columns: List[str]
    where_conditions: Optional[Dict[str, Any]] = None
    joins: Optional[List[Dict[str, str]]] = None
    aggregations: Optional[Dict[str, str]] = None
    order_by: Optional[List[str]] = None

    @validator('columns')
    def validate_columns(cls, v):
        if not v:
            raise ValueError("Must specify at least one column")
        return v

class DataTransformation(BaseModel):
    """Schema for complex data pipeline operations"""
    operation: Literal["filter", "aggregate", "join", "pivot", "normalize"]
    source_data: str = Field(description="Reference to data source")
    parameters: Dict[str, Any]
    output_format: Literal["json", "csv", "dataframe"]

class APIRequest(BaseModel):
    """Multi-endpoint API orchestration"""
    endpoints: List[Dict[str, str]] = Field(description="List of endpoints to call")
    authentication: Dict[str, str]
    request_order: Literal["sequential", "parallel", "conditional"]
    error_handling: Literal["stop", "continue", "retry"]
    max_retries: int = Field(default=3, ge=0, le=10)

class CodeGeneration(BaseModel):
    """Generate and validate code snippets"""
    language: Literal["python", "javascript", "sql", "bash"]
    purpose: str
    code: str = Field(description="The generated code")
    dependencies: List[str] = Field(default_factory=list)
    test_cases: List[Dict[str, Any]] = Field(default_factory=list)

    @validator('code')
    def validate_code_safety(cls, v, values):
        dangerous = ['eval(', 'exec(', '__import__', 'os.system']
        if values.get('language') == 'python':
            if any(d in v for d in dangerous):
                raise ValueError("Code contains potentially dangerous operations")
        return v

class MultiToolPlan(BaseModel):
    """Plan for multi-step tool execution"""
    goal: str
    steps: List[Dict[str, Any]] = Field(description="Ordered list of tool calls")
    dependencies: Dict[str, List[str]] = Field(description="Step dependencies")
    fallback_strategy: Optional[str] = None
    estimated_duration: float = Field(description="Seconds")

class ToolCall(BaseModel):
    """Enhanced tool selection with context"""
    reasoning: str
    confidence: float = Field(ge=0.0, le=1.0)
    tool_name: Literal["sql_engine", "data_transformer", "api_orchestrator",
                       "code_generator", "planner", "none"]
    tool_input: Optional[Union[SQLQuery, DataTransformation, APIRequest,
                               CodeGeneration, MultiToolPlan]] = None
    requires_human_approval: bool = False

class ExecutionResult(BaseModel):
    """Rich result with metadata"""
    success: bool
    data: Any
    execution_time: float
    warnings: List[str] = Field(default_factory=list)
    metadata: Dict[str, Any] = Field(default_factory=dict)
We define all the advanced Pydantic schemas that structure how our agent understands SQL queries, data pipelines, API calls, code generation, and multi-step plans. As we build these models, we give our agent strong validation, safety, and clarity in interpreting complex instructions. This becomes the backbone of our agent's reasoning process.
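To see the validation in action, here is a minimal, self-contained sketch of the SQLQuery column check, written against pydantic v2's `field_validator` (the tutorial's classes use the older `validator` decorator, which still works in v2 but is deprecated):

```python
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, ValidationError, field_validator

class SQLQuery(BaseModel):
    table: str
    columns: List[str]
    where_conditions: Optional[Dict[str, Any]] = None

    @field_validator("columns")
    @classmethod
    def validate_columns(cls, v):
        # Reject an empty projection list before it ever reaches a tool
        if not v:
            raise ValueError("Must specify at least one column")
        return v

ok = SQLQuery(table="users", columns=["id", "name"])
print(ok.columns)  # ['id', 'name']

try:
    SQLQuery(table="users", columns=[])
except ValidationError:
    print("rejected empty column list")
```

Because Instructor retries generation when the model's output fails schema validation, checks like this one directly shape what the LLM is allowed to emit.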
def sql_engine_tool(params: SQLQuery) -> ExecutionResult:
    import time
    start = time.time()
    mock_tables = {
        "users": [
            {"id": 1, "name": "Alice", "age": 30, "country": "USA"},
            {"id": 2, "name": "Bob", "age": 25, "country": "UK"},
            {"id": 3, "name": "Charlie", "age": 35, "country": "USA"},
        ],
        "orders": [
            {"id": 1, "user_id": 1, "amount": 100, "status": "completed"},
            {"id": 2, "user_id": 1, "amount": 200, "status": "pending"},
            {"id": 3, "user_id": 2, "amount": 150, "status": "completed"},
        ]
    }
    data = mock_tables.get(params.table, [])
    if params.where_conditions:
        data = [row for row in data if all(
            row.get(k) == v for k, v in params.where_conditions.items()
        )]
    data = [{col: row.get(col) for col in params.columns} for row in data]
    warnings = []
    if params.aggregations:
        warnings.append("Aggregation simplified in mock mode")
    return ExecutionResult(
        success=True,
        data=data,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"rows_affected": len(data), "query_type": "SELECT"}
    )

def data_transformer_tool(params: DataTransformation) -> ExecutionResult:
    import time
    start = time.time()
    operations = {
        "filter": lambda d, p: [x for x in d if x.get(p['field']) == p['value']],
        "aggregate": lambda d, p: {"count": len(d), "operation": p.get('function', 'count')},
        "normalize": lambda d, p: [{k: v/p.get('factor', 1) for k, v in x.items()} for x in d]
    }
    mock_data = [{"value": i, "category": "A" if i % 2 else "B"} for i in range(10)]
    op_func = operations.get(params.operation)
    if op_func:
        result_data = op_func(mock_data, params.parameters)
    else:
        result_data = mock_data
    return ExecutionResult(
        success=True,
        data=result_data,
        execution_time=time.time() - start,
        warnings=[],
        metadata={"operation": params.operation, "input_rows": len(mock_data)}
    )

def api_orchestrator_tool(params: APIRequest) -> ExecutionResult:
    import time
    start = time.time()
    results = []
    warnings = []
    for i, endpoint in enumerate(params.endpoints):
        if params.error_handling == "retry" and i == 1:
            warnings.append(f"Endpoint {endpoint.get('url')} failed, retrying...")
        results.append({
            "endpoint": endpoint.get('url'),
            "status": 200,
            "data": f"Mock response from {endpoint.get('url')}"
        })
    return ExecutionResult(
        success=True,
        data=results,
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"endpoints_called": len(params.endpoints), "order": params.request_order}
    )

def code_generator_tool(params: CodeGeneration) -> ExecutionResult:
    import time
    start = time.time()
    warnings = []
    if len(params.code) > 1000:
        warnings.append("Generated code is quite long, consider refactoring")
    if not params.test_cases:
        warnings.append("No test cases provided for generated code")
    return ExecutionResult(
        success=True,
        data={"code": params.code, "language": params.language, "dependencies": params.dependencies},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"lines_of_code": len(params.code.split('\n'))}
    )

def planner_tool(params: MultiToolPlan) -> ExecutionResult:
    import time
    start = time.time()
    warnings = []
    if len(params.steps) > 10:
        warnings.append("Plan has many steps, consider breaking into sub-plans")
    for step_id, deps in params.dependencies.items():
        if step_id in deps:
            warnings.append(f"Circular dependency detected in step {step_id}")
    return ExecutionResult(
        success=True,
        data={"plan": params.steps, "estimated_time": params.estimated_duration},
        execution_time=time.time() - start,
        warnings=warnings,
        metadata={"total_steps": len(params.steps)}
    )

TOOLS = {
    "sql_engine": sql_engine_tool,
    "data_transformer": data_transformer_tool,
    "api_orchestrator": api_orchestrator_tool,
    "code_generator": code_generator_tool,
    "planner": planner_tool
}
We implement the actual tools: SQL execution, data transformation, API orchestration, code validation, and planning. As we write these tool functions, we simulate realistic workflows with controlled outputs and error handling. This allows us to test the agent's decision-making in an environment that mirrors real-world tasks.
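The heart of the mock SQL engine is simply a filter-then-project pass over in-memory rows. A stripped-down, stdlib-only sketch of that logic (the helper name `run_select` is ours, not part of the tutorial's code):

```python
from typing import Any, Dict, List, Optional

def run_select(rows: List[Dict[str, Any]],
               columns: List[str],
               where: Optional[Dict[str, Any]] = None) -> List[Dict[str, Any]]:
    """Filter rows on equality conditions, then project the requested columns."""
    if where:
        # Keep only rows matching every key/value pair (a conjunctive WHERE clause)
        rows = [r for r in rows if all(r.get(k) == v for k, v in where.items())]
    # Project: build new dicts containing only the requested columns
    return [{c: r.get(c) for c in columns} for r in rows]

users = [
    {"id": 1, "name": "Alice", "country": "USA"},
    {"id": 2, "name": "Bob", "country": "UK"},
    {"id": 3, "name": "Charlie", "country": "USA"},
]
print(run_select(users, ["name"], {"country": "USA"}))
# [{'name': 'Alice'}, {'name': 'Charlie'}]
```

Because `sql_engine_tool` only supports equality filters, joins and aggregations fall through to the warning path — a deliberate simplification for testing the agent's routing rather than its SQL fidelity.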
class AdvancedToolAgent:
    """Agent with complex reasoning, error recovery, and multi-step planning"""
    def __init__(self, model_name: str = "HuggingFaceH4/zephyr-7b-beta"):
        import torch
        print(f"🤖 Loading model: {model_name}")
        model_kwargs = {"device_map": "auto"}
        if torch.cuda.is_available():
            print("💫 GPU detected - using 8-bit quantization")
            from transformers import BitsAndBytesConfig
            quantization_config = BitsAndBytesConfig(
                load_in_8bit=True,
                llm_int8_threshold=6.0
            )
            model_kwargs["quantization_config"] = quantization_config
        else:
            print("💻 CPU mode - using smaller model for better performance")
            model_name = "google/flan-t5-base"
            model_kwargs["torch_dtype"] = "auto"
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            **model_kwargs
        )
        self.pipe = pipeline(
            "text-generation", model=self.model, tokenizer=self.tokenizer,
            max_new_tokens=768, temperature=0.7, do_sample=True
        )
        self.client = instructor.from_pipe(self.pipe)
        self.execution_history = []
        print("✅ Agent initialized!")

    def route_to_tool(self, user_query: str, context: Optional[str] = None) -> ToolCall:
        tool_descriptions = """
Advanced Tools:
- sql_engine: Execute complex SQL queries with joins, aggregations, filtering
- data_transformer: Multi-step data pipelines (filter→aggregate→normalize)
- api_orchestrator: Call multiple APIs with dependencies, retries, error handling
- code_generator: Generate safe, validated code with tests in multiple languages
- planner: Create multi-step execution plans with dependency management
- none: Answer directly using reasoning
"""
        prompt = f"""{tool_descriptions}
User query: {user_query}
{f'Context from previous steps: {context}' if context else ''}
Analyze the complexity and choose the appropriate tool. For multi-step tasks, use the planner."""
        return self.client(prompt, response_model=ToolCall)

    def execute_with_recovery(self, tool_call: ToolCall, max_retries: int = 2) -> ExecutionResult:
        for attempt in range(max_retries + 1):
            try:
                if tool_call.tool_name == "none":
                    return ExecutionResult(
                        success=True, data="Direct response", execution_time=0.0,
                        warnings=[], metadata={}
                    )
                tool_func = TOOLS.get(tool_call.tool_name)
                if not tool_func:
                    return ExecutionResult(
                        success=False, data=None, execution_time=0.0,
                        warnings=[f"Tool {tool_call.tool_name} not found"], metadata={}
                    )
                result = tool_func(tool_call.tool_input)
                self.execution_history.append({
                    "tool": tool_call.tool_name,
                    "success": result.success,
                    "timestamp": datetime.now().isoformat()
                })
                return result
            except Exception as e:
                if attempt < max_retries:
                    print(f" ⚠️ Attempt {attempt + 1} failed, retrying...")
                    continue
                return ExecutionResult(
                    success=False, data=None, execution_time=0.0,
                    warnings=[f"Failed after {max_retries + 1} attempts: {str(e)}"],
                    metadata={"error": str(e)}
                )
We construct the agent itself, loading the model, building the routing pipeline, and implementing recovery logic. As we define methods for tool selection and execution, we give the agent the ability to understand queries, choose strategies, and gracefully handle failures.
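The recovery logic boils down to retry-on-exception with a structured failure result instead of a raised error. A stdlib-only sketch of that pattern in isolation (function and key names here are illustrative, not the agent's API):

```python
from typing import Any, Callable, Dict

def execute_with_recovery(fn: Callable[[], Any], max_retries: int = 2) -> Dict[str, Any]:
    """Call fn, retrying on exception; always return a structured result."""
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            # Success path: report the data and how many attempts it took
            return {"success": True, "data": fn(), "attempts": attempt + 1}
        except Exception as e:
            last_error = e  # remember the failure and loop to retry
    # All attempts exhausted: surface the error as data, not as an exception
    return {"success": False, "data": None,
            "warnings": [f"Failed after {max_retries + 1} attempts: {last_error}"]}

calls = {"n": 0}
def flaky():
    """Fails once, then succeeds — simulating a transient error."""
    calls["n"] += 1
    if calls["n"] < 2:
        raise RuntimeError("transient failure")
    return "ok"

print(execute_with_recovery(flaky))  # succeeds on the second attempt
```

Returning a result object on failure, rather than letting the exception propagate, is what lets the agent's run loop keep processing subsequent queries after one tool call goes wrong.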
    def run(self, user_query: str, verbose: bool = True) -> Dict[str, Any]:
        if verbose:
            print(f"\n{'='*70}")
            print(f"🎯 Complex Query: {user_query}")
            print(f"{'='*70}")
        if verbose:
            print("\n🧠 Step 1: Analyzing query complexity & routing...")
        tool_call = self.route_to_tool(user_query)
        if verbose:
            print(f" → Tool: {tool_call.tool_name}")
            print(f" → Confidence: {tool_call.confidence:.2%}")
            print(f" → Reasoning: {tool_call.reasoning}")
            if tool_call.requires_human_approval:
                print(f" ⚠️ Requires human approval!")
        if verbose:
            print("\n⚙️ Step 2: Executing tool with error recovery...")
        result = self.execute_with_recovery(tool_call)
        if verbose:
            print(f" → Success: {result.success}")
            print(f" → Execution time: {result.execution_time:.3f}s")
            if result.warnings:
                print(f" → Warnings: {', '.join(result.warnings)}")
            print(f" → Data preview: {str(result.data)[:200]}...")
        if verbose and result.metadata:
            print(f"\n📊 Metadata:")
            for key, value in result.metadata.items():
                print(f" • {key}: {value}")
        if verbose:
            print(f"\n{'='*70}\n")
        return {
            "query": user_query,
            "tool_used": tool_call.tool_name,
            "result": result,
            "history_length": len(self.execution_history)
        }

def main():
    agent = AdvancedToolAgent()
    hard_queries = [
        "Generate a SQL query to find all users from USA who have completed orders worth more than $150, and join with their order details",
        "Create a data pipeline that filters records where category='A', then aggregates by count, and normalizes the results by a factor of 100",
        "I need to call 3 APIs sequentially: first authenticate at /auth, then fetch user data at /users/{id}, and finally update preferences at /preferences. If any step fails, retry up to 3 times",
        "Write a Python function that validates email addresses using regex, includes error handling, and has at least 2 test cases. Make sure it doesn't use any dangerous operations",
        "Create a multi-step plan to: 1) Extract data from a database, 2) Transform it using pandas, 3) Generate a report, 4) Send via email. Show dependencies between steps"
    ]
    print("\n" + "🔥 HARD MODE: COMPLEX QUERIES ".center(70, "=") + "\n")
    for i, query in enumerate(hard_queries, 1):
        print(f"\n{'#'*70}")
        print(f"# CHALLENGE {i}/{len(hard_queries)}")
        print(f"{'#'*70}")
        try:
            agent.run(query, verbose=True)
        except Exception as e:
            print(f"❌ Critical error: {e}\n")
    print("\n" + f"✅ COMPLETED {len(agent.execution_history)} TOOL EXECUTIONS ".center(70, "=") + "\n")
    print(f"📊 Success rate: {sum(1 for h in agent.execution_history if h['success']) / len(agent.execution_history) * 100:.1f}%")

if __name__ == "__main__":
    main()
We tie everything together with a run() method and a demo main() function that executes several hard-mode queries. As we watch the agent analyze, route, execute, and report results, we see the full power of the architecture in action. This final step lets us experience how the system performs under complex, realistic scenarios.
In conclusion, we have built a powerful agent capable of understanding intricate instructions, routing execution across multiple tools, and gracefully recovering from errors, all within a compact, offline system. As we test it on challenging queries, we watch it plan, reason, and execute with clarity and structure. We now appreciate how modular schemas, validated tool calls, and layered execution logic allow us to create agents that behave reliably in complex environments.
Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence media platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts over 2 million monthly views, illustrating its popularity among audiences.
