Close Menu
    Facebook X (Twitter) Instagram
    Articles Stock
    • Home
    • Technology
    • AI
    • Pages
      • About us
      • Contact us
      • Disclaimer For Articles Stock
      • Privacy Policy
      • Terms and Conditions
    Facebook X (Twitter) Instagram
    Articles Stock
    AI

    How you can Design a Streaming Resolution Agent with Partial Reasoning, On-line Replanning, and Reactive Mid-Execution Adaptation in Dynamic Environments

    Naveed AhmadBy Naveed Ahmad12/03/2026Updated:12/03/2026No Comments3 Mins Read
    blog banner234


    @dataclass
    class AgentConfig:
       horizon: int = 6
       replan_on_target_move: bool = True
       replan_on_obstacle_change: bool = True
       max_steps: int = 120
       think_latency: float = 0.02
       act_latency: float = 0.01
       risk_gate: float = 0.85
       alt_search_depth: int = 2
    
    
    @dataclass
    class StreamingDecisionAgent:
       cfg: AgentConfig
       world: DynamicGridWorld
       start_time: float = subject(init=False, default_factory=time.time)
       step_id: int = subject(init=False, default=0)
       current_plan: Record[Coord] = subject(init=False, default_factory=checklist)
       current_actions: Record[str] = subject(init=False, default_factory=checklist)
       last_snapshot: Dict[str, Any] = subject(init=False, default_factory=dict)
       stats: Dict[str, Any] = subject(init=False, default_factory=lambda: defaultdict(int))
    
    
       def _now(self) -> float:
           return time.time() - self.start_time
    
    
       def _emit(self, form: str, msg: str, information: Non-compulsory[Dict[str, Any]] = None) -> StreamEvent:
           return StreamEvent(t=self._now(), form=form, step=self.step_id, msg=msg, information=information or {})
    
    
       def _need_replan(self, obs: Dict[str, Any]) -> bool:
           ch = obs["changes"]
           if obs["done"]:
               return False
           if not self.current_plan or len(self.current_plan) <= 1:
               return True
           if self.cfg.replan_on_target_move and ch.get("target_moved"):
               return True
           if self.cfg.replan_on_obstacle_change and (ch.get("obstacles_added") or ch.get("obstacles_cleared")):
               return True
           if len(self.current_plan) > 1 and self.current_plan[1] in self.world.obstacles:
               return True
           return False
    
    
       def _plan(self) -> PlanResult:
           time.sleep(self.cfg.think_latency)
           self.stats["replans"] += 1
           return astar(self.world, self.world.agent, self.world.goal)
    
    
       def _choose_action(self, planned_action: str) -> Tuple[str, str]:
           ax, ay = self.world.agent
           action_to_delta = {"R": (1,0), "L": (-1,0), "D": (0,1), "U": (0,-1), "S": (0,0)}
           dx, dy = action_to_delta[planned_action]
           nxt = (ax+dx, ay+dy)
           if not self.world.in_bounds(nxt) or not self.world.satisfactory(nxt):
               self.stats["overrides"] += 1
               return "S", "planned_move_invalid -> wait."
           r = action_risk(self.world, nxt)
           if r > self.cfg.risk_gate:
               candidates = ["U","D","L","R","S"]
               finest = (planned_action, float("inf"), "keep_plan")
               for a in candidates:
                   dx, dy = action_to_delta[a]
                   p = (ax+dx, ay+dy)
                   if not self.world.in_bounds(p) or not self.world.satisfactory(p):
                       proceed
                   rating = action_risk(self.world, p) + 0.05 * self.world.manhattan(p, self.world.goal)
                   if rating < finest[1]:
                       finest = (a, rating, "risk_avoidance_override")
               if finest[0] != planned_action:
                   self.stats["overrides"] += 1
                   return finest[0], finest[2]
           return planned_action, "follow_plan"
    
    
       def run(self) -> Generator[StreamEvent, None, None]:
           yield self._emit("observe", "Initialize: studying preliminary state.", {"agent": self.world.agent, "goal": self.world.goal})
           yield self._emit("world", "Preliminary world snapshot.", {"grid": self.world.render()})
           for self.step_id in vary(1, self.cfg.max_steps + 1):
               if self.step_id == 1 or self._need_replan(self.last_snapshot):
                   pr = self._plan()
                   self.current_plan = pr.path
                   self.current_actions = path_to_actions(pr.path)
                   if pr.cause != "found_path":
                       yield self._emit("plan", "Planner couldn't discover a path inside funds; switching to reactive exploration.", {"cause": pr.cause, "expanded": pr.expanded})
                       self.current_actions = []
                   else:
                       horizon_path = pr.path[: max(2, min(len(pr.path), self.cfg.horizon + 1))]
                       yield self._emit("plan", f"Plan up to date (on-line A*). Decide to subsequent {len(horizon_path)-1} strikes, then re-evaluate.", {"cause": pr.cause, "path_len": len(pr.path), "expanded": pr.expanded, "commit_horizon": self.cfg.horizon, "horizon_path": horizon_path, "grid_with_path": self.world.render(path=horizon_path)})
               if self.current_actions:
                   planned_action = self.current_actions[0]
               else:
                   ax, ay = self.world.agent
                   tx, ty = self.world.goal
                   choices = []
                   if tx > ax: choices.append("R")
                   if tx < ax: choices.append("L")
                   if ty > ay: choices.append("D")
                   if ty < ay: choices.append("U")
                   choices += ["S","U","D","L","R"]
                   planned_action = choices[0]
               motion, why = self._choose_action(planned_action)
               yield self._emit("resolve", f"Intermediate choice: motion={motion} ({why}).", {"planned_action": planned_action, "chosen_action": motion, "agent": self.world.agent, "goal": self.world.goal})
               time.sleep(self.cfg.act_latency)
               obs = self.world.step(motion)
               self.last_snapshot = obs
               if self.current_actions:
                   if motion == planned_action:
                       self.current_actions = self.current_actions[1:]
                       if len(self.current_plan) > 1:
                           self.current_plan = self.current_plan[1:]
               ch = obs["changes"]
               shock = []
               if ch.get("target_moved"): shock.append("target_moved")
               if ch.get("obstacles_added"): shock.append(f"obstacles_added={len(ch['obstacles_added'])}")
               if ch.get("obstacles_cleared"): shock.append(f"obstacles_cleared={len(ch['obstacles_cleared'])}")
               surprise_msg = ("Surprises: " + ", ".be a part of(shock)) if shock else "No main surprises."
               self.stats["steps"] += 1
               if obs["moved"]: self.stats["moves"] += 1
               if ch.get("target_moved"): self.stats["target_moves"] += 1
               if ch.get("obstacles_added") or ch.get("obstacles_cleared"): self.stats["world_shifts"] += 1
               yield self._emit("observe", f"Noticed consequence. {surprise_msg}", {"moved": obs["moved"], "agent": obs["agent"], "goal": obs["target"], "completed": obs["done"], "adjustments": ch, "grid": self.world.render(path=self.current_plan[: min(len(self.current_plan), 10)])})
               if obs["done"]:
                   yield self._emit("completed", "Objective reached. Stopping execution.", {"final_agent": obs["agent"], "final_target": obs["target"], "stats": dict(self.stats)})
                   return
           yield self._emit("completed", "Max steps reached with out reaching the objective.", {"final_agent": self.world.agent, "final_target": self.world.goal, "stats": dict(self.stats)})



    Source link

    Naveed Ahmad

    Related Posts

    Google Play is including new paid and PC video games, recreation trials, neighborhood posts, and extra

    12/03/2026

    AI ‘actor’ Tilly Norwood put out the worst tune I’ve ever heard

    12/03/2026

    Inside OpenAI’s Race to Catch As much as Claude Code

    12/03/2026
    Leave A Reply Cancel Reply

    Categories
    • AI
    Recent Comments
      Facebook X (Twitter) Instagram Pinterest
      © 2026 ThemeSphere. Designed by ThemeSphere.

      Type above and press Enter to search. Press Esc to cancel.