    A Coding Implementation to Simulate Practical Byzantine Fault Tolerance with Asyncio, Malicious Nodes, and Latency Evaluation

    By Naveed Ahmad | 25/02/2026 | Updated: 25/02/2026 | 8 Mins Read


    In this tutorial, we implement an end-to-end Practical Byzantine Fault Tolerance (PBFT) simulator using asyncio. We model a realistic distributed network with asynchronous message passing, configurable delays, and Byzantine nodes that deliberately deviate from the protocol. By explicitly implementing the pre-prepare, prepare, and commit phases, we explore how PBFT achieves consensus under adversarial conditions while respecting the theoretical 3f+1 bound. We also instrument the system to measure consensus latency and success rates as the number of malicious nodes increases, allowing us to empirically observe the limits of Byzantine fault tolerance.
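    Before diving into the simulator, it helps to make the 3f+1 bound concrete. The short standalone sketch below (our own helper functions, not part of the tutorial's code) computes the tolerated fault count and quorum size for a few cluster sizes:

```python
# Hypothetical helper functions (not from the simulator) illustrating the
# PBFT bound: n replicas tolerate at most f = (n - 1) // 3 Byzantine nodes,
# with prepare/commit quorums of 2f + 1 votes.

def max_faulty(n: int) -> int:
    """Maximum Byzantine nodes tolerated by n replicas."""
    return (n - 1) // 3

def quorum(n: int) -> int:
    """Prepare/commit quorum size: 2f + 1."""
    return 2 * max_faulty(n) + 1

for n in (4, 7, 10):
    f, q = max_faulty(n), quorum(n)
    # Any two quorums share at least 2q - n nodes, which exceeds f when
    # n = 3f + 1, so every pair of quorums contains an honest node.
    print(f"n={n}: f={f}, quorum={q}, min quorum overlap={2 * q - n}")
```

    For n = 10, this gives f = 3 and a quorum of 7, the parameters the simulator below uses by default.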

    import asyncio
    import random
    import time
    import hashlib
    from dataclasses import dataclass, field
    from typing import Dict, Set, Tuple, Optional, List
    import matplotlib.pyplot as plt
    
    
    PREPREPARE = "PREPREPARE"
    PREPARE    = "PREPARE"
    COMMIT     = "COMMIT"
    
    
    @dataclass(frozen=True)
    class Msg:
       typ: str
       view: int
       seq: int
       digest: str
       sender: int
    
    
    @dataclass
    class NetConfig:
       min_delay_ms: int = 5
       max_delay_ms: int = 40
       drop_prob: float = 0.0
       reorder_prob: float = 0.0

    We establish the simulator's foundation by importing the required libraries and defining the core PBFT message types. We formalize network messages and parameters using dataclasses to ensure structured, consistent communication. We also define constants representing the three PBFT phases used throughout the system.
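    As a quick sanity check, the self-contained snippet below (which re-declares the same `Msg` shape) shows why `frozen=True` and SHA-256 digests matter here: the digest is deterministic, so every honest node votes on the same value, and frozen dataclasses are hashable, so repeated deliveries deduplicate naturally in sets:

```python
import hashlib
from dataclasses import dataclass

@dataclass(frozen=True)
class Msg:
    typ: str
    view: int
    seq: int
    digest: str
    sender: int

# Deterministic digest: every honest node derives the same value from a payload.
d1 = hashlib.sha256(b"tx: pay Alice->Bob 5").hexdigest()
d2 = hashlib.sha256(b"tx: pay Alice->Bob 5").hexdigest()
assert d1 == d2

# frozen=True makes Msg hashable, so duplicate messages collapse in a set.
seen = {Msg("PREPARE", 0, 1, d1, 2), Msg("PREPARE", 0, 1, d1, 2)}
assert len(seen) == 1
print("digest is deterministic; duplicate messages deduplicate")
```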

    class Network:
       def __init__(self, cfg: NetConfig):
           self.cfg = cfg
           self.nodes: Dict[int, "Node"] = {}
    
    
       def register(self, node: "Node"):
           self.nodes[node.nid] = node
    
    
       async def send(self, dst: int, msg: Msg):
           if random.random() < self.cfg.drop_prob:
               return
    
    
           d = random.uniform(self.cfg.min_delay_ms, self.cfg.max_delay_ms) / 1000.0
           await asyncio.sleep(d)
    
    
           if random.random() < self.cfg.reorder_prob:
               await asyncio.sleep(random.uniform(0.0, 0.02))
    
    
           await self.nodes[dst].inbox.put(msg)
    
    
       async def broadcast(self, src: int, msg: Msg):
           tasks = []
           for nid in self.nodes.keys():
               tasks.append(asyncio.create_task(self.send(nid, msg)))
           await asyncio.gather(*tasks)
    We implement an asynchronous network layer that simulates real-world message delivery with delays, reordering, and potential drops. We register nodes dynamically and use asyncio tasks to broadcast messages across the simulated network. We model non-deterministic communication behavior that directly impacts consensus latency and robustness.
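    The delivery pattern reduces to a few lines. The self-contained sketch below (our own simplified version, not the network class itself) shows the core idea: each send is an independent coroutine that sleeps for a random delay before enqueueing, and `asyncio.gather` awaits all deliveries of a broadcast:

```python
import asyncio
import random

async def send(inbox: asyncio.Queue, msg: str, min_ms: int = 5, max_ms: int = 40):
    # Random per-message delay models network jitter; arrival order is nondeterministic.
    await asyncio.sleep(random.uniform(min_ms, max_ms) / 1000.0)
    await inbox.put(msg)

async def demo():
    inbox: asyncio.Queue = asyncio.Queue()
    # A broadcast is just one send per destination, awaited together.
    await asyncio.gather(*(send(inbox, f"msg-{i}") for i in range(3)))
    return [inbox.get_nowait() for _ in range(3)]

msgs = asyncio.run(demo())
print(sorted(msgs))  # all messages arrive, but in nondeterministic order
```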

    @dataclass
    class NodeConfig:
       n: int
       f: int
       primary_id: int = 0
       view: int = 0
       timeout_s: float = 2.0
    
    
    class Node:
       def __init__(self, nid: int, net: Network, cfg: NodeConfig, byzantine: bool = False):
           self.nid = nid
           self.net = net
           self.cfg = cfg
           self.byzantine = byzantine
    
    
           self.inbox: asyncio.Queue[Msg] = asyncio.Queue()
    
    
           self.preprepare_seen: Dict[int, str] = {}
           self.prepare_votes: Dict[Tuple[int, str], Set[int]] = {}
           self.commit_votes: Dict[Tuple[int, str], Set[int]] = {}
    
    
           self.committed: Dict[int, str] = {}
           self.running = True
    
    
       @property
       def f(self) -> int:
           return self.cfg.f
    
    
       def _q_prepare(self) -> int:
           return 2 * self.f + 1
    
    
       def _q_commit(self) -> int:
           return 2 * self.f + 1
    
    
       @staticmethod
       def digest_of(payload: str) -> str:
           return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    We define the configuration and internal state of each PBFT node participating in the protocol. We initialize data structures for tracking pre-prepare, prepare, and commit votes while supporting both honest and Byzantine behavior. We also implement quorum threshold logic and deterministic digest generation for request validation.
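    The key design decision is that votes are keyed by the pair `(seq, digest)`, not by `seq` alone. The standalone sketch below (hypothetical votes, our own variable names) shows why this matters: votes for conflicting digests accumulate in separate buckets and can never be pooled into one quorum:

```python
from typing import Dict, Set, Tuple

f = 1
quorum = 2 * f + 1  # 2f + 1 matching votes required
prepare_votes: Dict[Tuple[int, str], Set[int]] = {}

# Nodes 0, 1, 3 vote for the honest digest; node 2 equivocates with "fake".
for sender, digest in [(0, "abc"), (1, "abc"), (2, "fake"), (3, "abc")]:
    prepare_votes.setdefault((1, digest), set()).add(sender)

assert len(prepare_votes[(1, "abc")]) >= quorum   # honest digest reaches quorum
assert len(prepare_votes[(1, "fake")]) < quorum   # conflicting digest cannot
print("votes for different digests never combine")
```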

       async def propose(self, payload: str, seq: int):
           if self.nid != self.cfg.primary_id:
               raise ValueError("Only the primary can propose in this simplified simulator.")
    
    
           if not self.byzantine:
               dig = self.digest_of(payload)
               msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
               await self.net.broadcast(self.nid, msg)
               return
    
    
           for dst in self.net.nodes.keys():
               variant = f"{payload}::to={dst}::salt={random.randint(0,10**9)}"
               dig = self.digest_of(variant)
               msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
               await self.net.send(dst, msg)
    
    
       async def handle_preprepare(self, msg: Msg):
           seq = msg.seq
           dig = msg.digest
    
    
           if self.byzantine:
               if random.random() < 0.5:
                   return
               fake_dig = dig if random.random() < 0.5 else self.digest_of(dig + "::fake")
               out = Msg(PREPARE, msg.view, seq, fake_dig, self.nid)
               await self.net.broadcast(self.nid, out)
               return
    
    
           if seq not in self.preprepare_seen:
               self.preprepare_seen[seq] = dig
               out = Msg(PREPARE, msg.view, seq, dig, self.nid)
               await self.net.broadcast(self.nid, out)
    
    
       async def handle_prepare(self, msg: Msg):
           seq, dig = msg.seq, msg.digest
           key = (seq, dig)
           voters = self.prepare_votes.setdefault(key, set())
           voters.add(msg.sender)
    
    
           if self.byzantine:
               return
    
    
           if self.preprepare_seen.get(seq) != dig:
               return
    
    
           if len(voters) >= self._q_prepare():
               out = Msg(COMMIT, msg.view, seq, dig, self.nid)
               await self.net.broadcast(self.nid, out)
    
    
       async def handle_commit(self, msg: Msg):
           seq, dig = msg.seq, msg.digest
           key = (seq, dig)
           voters = self.commit_votes.setdefault(key, set())
           voters.add(msg.sender)
    
    
           if self.byzantine:
               return
    
    
           if self.preprepare_seen.get(seq) != dig:
               return
    
    
           if seq in self.committed:
               return
    
    
           if len(voters) >= self._q_commit():
               self.committed[seq] = dig

    We implement the core PBFT protocol logic, including proposal handling and the pre-prepare and prepare phases. We explicitly model Byzantine equivocation by allowing malicious nodes to send conflicting digests to different peers. We advance the protocol to the commit phase once the required prepare quorum is reached.
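    A back-of-the-envelope argument (ours, not spelled out in the tutorial) shows why equivocation cannot commit two different digests when n = 3f + 1: any two quorums of 2f + 1 nodes overlap in at least f + 1 nodes, so at least one overlapping node is honest, and honest nodes vote for at most one digest per sequence number:

```python
def quorum_overlap_has_honest(f: int) -> bool:
    # With n = 3f + 1 and quorums of size q = 2f + 1, any two quorums share
    # at least 2q - n = f + 1 nodes, so at least one of them is honest.
    n = 3 * f + 1
    q = 2 * f + 1
    min_overlap = 2 * q - n
    return min_overlap - f >= 1

for f in range(1, 6):
    assert quorum_overlap_has_honest(f)
print("two commit quorums always share an honest node for f = 1..5")
```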

       async def run(self):
           while self.running:
               msg = await self.inbox.get()
               if msg.typ == PREPREPARE:
                   await self.handle_preprepare(msg)
               elif msg.typ == PREPARE:
                   await self.handle_prepare(msg)
               elif msg.typ == COMMIT:
                   await self.handle_commit(msg)
    
    
       def stop(self):
           self.running = False
    
    
    def pbft_params(n: int) -> int:
       return (n - 1) // 3
    
    
    async def run_single_consensus(
       n: int,
       malicious: int,
       net_cfg: NetConfig,
       payload: str = "tx: pay Alice->Bob 5",
       seq: int = 1,
       timeout_s: float = 2.0,
       seed: Optional[int] = None
    ) -> Dict[str, object]:
       if seed is not None:
           random.seed(seed)
    
    
       f_max = pbft_params(n)
       f = f_max
    
    
       net = Network(net_cfg)
       cfg = NodeConfig(n=n, f=f, primary_id=0, view=0, timeout_s=timeout_s)
    
    
       mal_set = set(random.sample(range(n), k=min(malicious, n)))
       nodes: List[Node] = []
       for i in range(n):
           node = Node(i, net, cfg, byzantine=(i in mal_set))
           net.register(node)
           nodes.append(node)
    
    
       tasks = [asyncio.create_task(node.run()) for node in nodes]
    
    
       t0 = time.perf_counter()
       await nodes[cfg.primary_id].propose(payload, seq)
    
    
       honest = [node for node in nodes if not node.byzantine]
       target = max(1, len(honest))
    
    
       committed_honest = 0
       latency = None
    
    
       async def poll_commits():
           nonlocal committed_honest, latency
           while True:
               committed_honest = sum(1 for node in honest if seq in node.committed)
               if committed_honest >= target:
                   latency = time.perf_counter() - t0
                   return
               await asyncio.sleep(0.005)
    
    
       try:
           await asyncio.wait_for(poll_commits(), timeout=timeout_s)
           success = True
       except asyncio.TimeoutError:
           success = False
           latency = None
    
    
       for node in nodes:
           node.stop()
       for task in tasks:
           task.cancel()
       await asyncio.gather(*tasks, return_exceptions=True)
    
    
       digest_set = set(node.committed.get(seq) for node in honest if seq in node.committed)
       agreed = (len(digest_set) == 1) if success else False
    
    
       return {
           "n": n,
           "f": f,
           "malicious": malicious,
           "mal_set": mal_set,
           "success": success,
           "latency_s": latency,
           "honest_committed": committed_honest,
           "honest_total": len(honest),
           "agreed_digest": agreed,
       }

    We complete the PBFT state machine by processing commit messages and finalizing decisions once commit quorums are satisfied. We run the node event loop to continuously process incoming messages asynchronously. We also include lifecycle controls to safely stop nodes after each experiment run.
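    The run/stop lifecycle follows a common asyncio pattern: a consumer loop blocked on `Queue.get` that is shut down by cancelling its task. A stripped-down, self-contained sketch (our own, mirroring the structure above) looks like this:

```python
import asyncio

async def demo():
    inbox: asyncio.Queue = asyncio.Queue()
    processed = []

    async def run():
        # Consumer loop: blocks on the queue until cancelled.
        while True:
            processed.append(await inbox.get())

    task = asyncio.create_task(run())
    for m in ("PREPREPARE", "PREPARE", "COMMIT"):
        await inbox.put(m)
    await asyncio.sleep(0.01)  # let the loop drain the queue
    task.cancel()              # cancellation ends the blocked get() cleanly
    await asyncio.gather(task, return_exceptions=True)
    return processed

result = asyncio.run(demo())
print(result)
```

    Cancelling the task (rather than only flipping a flag) matters because a node blocked on an empty queue would otherwise never re-check its `running` flag.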

    async def latency_sweep(
       n: int = 10,
       max_malicious: Optional[int] = None,
       trials_per_point: int = 5,
       timeout_s: float = 2.0,
       net_cfg: Optional[NetConfig] = None,
       seed: int = 7
    ):
       if net_cfg is None:
           net_cfg = NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05)
    
    
       if max_malicious is None:
           max_malicious = n
    
    
       results = []
       random.seed(seed)
    
    
       for m in range(0, max_malicious + 1):
           latencies = []
           successes = 0
           agreements = 0
    
    
           for t in range(trials_per_point):
               out = await run_single_consensus(
                   n=n,
                   malicious=m,
                   net_cfg=net_cfg,
                   timeout_s=timeout_s,
                   seed=seed + 1000*m + t
               )
               results.append(out)
               if out["success"]:
                   successes += 1
                   latencies.append(out["latency_s"])
                   if out["agreed_digest"]:
                       agreements += 1
    
    
           avg_lat = sum(latencies)/len(latencies) if latencies else None
           print(
               f"malicious={m:2d} | success={successes}/{trials_per_point} "
               f"| avg_latency={avg_lat if avg_lat is not None else 'NA'} "
               f"| digest_agreement={agreements}/{successes if successes else 1}"
           )
    
    
       return results
    
    
    def plot_latency(results: List[Dict[str, object]], trials_per_point: int):
       by_m = {}
       for r in results:
           m = r["malicious"]
           by_m.setdefault(m, []).append(r)
    
    
       xs, ys = [], []
       success_rate = []
       for m in sorted(by_m.keys()):
           group = by_m[m]
           lats = [g["latency_s"] for g in group if g["latency_s"] is not None]
           succ = sum(1 for g in group if g["success"])
           xs.append(m)
           ys.append(sum(lats)/len(lats) if lats else float("nan"))
           success_rate.append(succ / len(group))
    
    
       plt.figure()
       plt.plot(xs, ys, marker="o")
       plt.xlabel("Number of malicious (Byzantine) nodes")
       plt.ylabel("Consensus latency (seconds), avg over successes")
       plt.title("PBFT Simulator: Latency vs Malicious Nodes")
       plt.grid(True)
       plt.show()
    
    
       plt.figure()
       plt.plot(xs, success_rate, marker="o")
       plt.xlabel("Number of malicious (Byzantine) nodes")
       plt.ylabel("Success rate")
       plt.title("PBFT Simulator: Success Rate vs Malicious Nodes")
       plt.ylim(-0.05, 1.05)
       plt.grid(True)
       plt.show()
    
    
    async def main():
       n = 10
       trials = 6
       f = pbft_params(n)
       print(f"n={n} => PBFT theoretical max f = floor((n-1)/3) = {f}")
       print("Theory: safety/liveness typically assumed when malicious <= f and timing assumptions hold.\n")
    
    
       results = await latency_sweep(
           n=n,
           max_malicious=min(n, f + 6),
           trials_per_point=trials,
           timeout_s=2.0,
           net_cfg=NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05),
           seed=11
       )
       plot_latency(results, trials)
    
    
    # Top-level await works in notebooks/Colab; in a plain script, use asyncio.run(main())
    await main()

    We orchestrate large-scale experiments by sweeping across different numbers of malicious nodes and collecting latency statistics. We aggregate results to analyze consensus success rates and visualize system behavior using plots. We run the full experiment pipeline and observe how PBFT degrades as the number of Byzantine faults approaches and exceeds theoretical limits.
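    The aggregation step reduces to a simple fold over the per-trial dictionaries. A minimal sketch of the success-rate and mean-latency computation, using hypothetical trial records (illustrative values only) shaped like what `run_single_consensus` returns:

```python
# Hypothetical trial records (illustrative values only), with the same keys
# as the dictionaries run_single_consensus returns.
results = [
    {"malicious": 3, "success": True,  "latency_s": 0.12},
    {"malicious": 3, "success": True,  "latency_s": 0.18},
    {"malicious": 3, "success": False, "latency_s": None},
]

successes = [r for r in results if r["success"]]
success_rate = len(successes) / len(results)
# Latency is averaged over successful trials only; failed runs have no latency.
avg_latency = sum(r["latency_s"] for r in successes) / len(successes)
print(f"success_rate={success_rate:.2f}, avg_latency={avg_latency:.3f}s")
```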

    In conclusion, we gained hands-on insight into how PBFT behaves beyond textbook guarantees and how adversarial pressure affects both latency and liveness in practice. We observed how quorum thresholds enforce safety, why consensus breaks down once Byzantine nodes exceed the tolerated bound, and how asynchronous networks amplify these effects. This implementation provides a practical foundation for experimenting with more advanced distributed-systems concepts, such as view changes, leader rotation, or authenticated messaging. It helps us build intuition for the design trade-offs that underpin modern blockchain and distributed trust systems.


    Check out the Full Codes here. Also, feel free to follow us on Twitter, and don't forget to join our 120k+ ML SubReddit and subscribe to our Newsletter. You can also join us on Telegram.



