In this tutorial, we implement an end-to-end Practical Byzantine Fault Tolerance (PBFT) simulator using asyncio. We model a realistic distributed network with asynchronous message passing, configurable delays, and Byzantine nodes that deliberately deviate from the protocol. By explicitly implementing the pre-prepare, prepare, and commit phases, we explore how PBFT achieves consensus under adversarial conditions while respecting the theoretical 3f+1 bound. We also instrument the system to measure consensus latency and success rates as the number of malicious nodes increases, allowing us to empirically observe the limits of Byzantine fault tolerance.
import asyncio
import random
import time
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple, Optional, List
import matplotlib.pyplot as plt

PREPREPARE = "PREPREPARE"
PREPARE = "PREPARE"
COMMIT = "COMMIT"

@dataclass(frozen=True)
class Msg:
    typ: str
    view: int
    seq: int
    digest: str
    sender: int

@dataclass
class NetConfig:
    min_delay_ms: int = 5
    max_delay_ms: int = 40
    drop_prob: float = 0.0
    reorder_prob: float = 0.0
We establish the simulator's foundation by importing the required libraries and defining the core PBFT message types. We formalize network messages and parameters using dataclasses to ensure structured, consistent communication. We also define constants representing the three PBFT phases used throughout the system.
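As a quick aside, freezing the `Msg` dataclass makes instances hashable, which is convenient if you ever want to deduplicate messages in sets. A minimal sketch (the class here mirrors `Msg` but is redeclared so the snippet stays self-contained):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Msg:
    typ: str
    view: int
    seq: int
    digest: str
    sender: int

# Frozen dataclasses are hashable: identical field values collapse in a set.
a = Msg("PREPARE", 0, 1, "abc", 2)
b = Msg("PREPARE", 0, 1, "abc", 2)
seen = {a, b}
print(len(seen))  # 1
```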
class Network:
    def __init__(self, cfg: NetConfig):
        self.cfg = cfg
        self.nodes: Dict[int, "Node"] = {}

    def register(self, node: "Node"):
        self.nodes[node.nid] = node

    async def send(self, dst: int, msg: Msg):
        if random.random() < self.cfg.drop_prob:
            return
        d = random.uniform(self.cfg.min_delay_ms, self.cfg.max_delay_ms) / 1000.0
        await asyncio.sleep(d)
        if random.random() < self.cfg.reorder_prob:
            await asyncio.sleep(random.uniform(0.0, 0.02))
        await self.nodes[dst].inbox.put(msg)

    async def broadcast(self, src: int, msg: Msg):
        tasks = []
        for nid in self.nodes.keys():
            tasks.append(asyncio.create_task(self.send(nid, msg)))
        await asyncio.gather(*tasks)
We implement an asynchronous network layer that simulates real-world message delivery with delays, reordering, and occasional drops. We register nodes dynamically and use asyncio tasks to broadcast messages across the simulated network. We model the non-deterministic communication behavior that directly impacts consensus latency and robustness.
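To see the delivery mechanics in isolation, here is a stripped-down sketch of the delay-then-enqueue pattern the network layer uses (the names `toy_send` and `demo` are ours, not part of the simulator):

```python
import asyncio
import random

async def toy_send(inbox: asyncio.Queue, msg: str, min_ms: int = 5, max_ms: int = 40):
    # Sleep a random delay, then deliver: the core of the network's send path.
    await asyncio.sleep(random.uniform(min_ms, max_ms) / 1000.0)
    await inbox.put(msg)

async def demo():
    inbox: asyncio.Queue = asyncio.Queue()
    # A broadcast is just one concurrent send task per recipient.
    await asyncio.gather(*(toy_send(inbox, f"msg-{i}") for i in range(3)))
    got = [inbox.get_nowait() for _ in range(3)]
    # Arrival order varies with the random delays; sort for a stable result.
    return sorted(got)

print(asyncio.run(demo()))  # ['msg-0', 'msg-1', 'msg-2']
```

Because each send sleeps independently, messages can arrive in any order, which is exactly the non-determinism the simulator relies on.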
@dataclass
class NodeConfig:
    n: int
    f: int
    primary_id: int = 0
    view: int = 0
    timeout_s: float = 2.0

class Node:
    def __init__(self, nid: int, net: Network, cfg: NodeConfig, byzantine: bool = False):
        self.nid = nid
        self.net = net
        self.cfg = cfg
        self.byzantine = byzantine
        self.inbox: asyncio.Queue[Msg] = asyncio.Queue()
        self.preprepare_seen: Dict[int, str] = {}
        self.prepare_votes: Dict[Tuple[int, str], Set[int]] = {}
        self.commit_votes: Dict[Tuple[int, str], Set[int]] = {}
        self.committed: Dict[int, str] = {}
        self.running = True

    @property
    def f(self) -> int:
        return self.cfg.f

    def _q_prepare(self) -> int:
        return 2 * self.f + 1

    def _q_commit(self) -> int:
        return 2 * self.f + 1

    @staticmethod
    def digest_of(payload: str) -> str:
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()
We define the configuration and internal state of each PBFT node participating in the protocol. We initialize data structures for tracking pre-prepare, prepare, and commit votes while supporting both honest and Byzantine behavior. We also implement the quorum threshold logic and deterministic digest generation for request validation.
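To make the thresholds concrete, here is a quick check of the 3f+1 arithmetic and the digest determinism (helper names like `max_faults` are ours): with n = 10 nodes, f = floor((n-1)/3) = 3, so the prepare/commit quorum 2f+1 = 7.

```python
import hashlib

def max_faults(n: int) -> int:
    # PBFT tolerates f Byzantine nodes when n >= 3f + 1.
    return (n - 1) // 3

def quorum(f: int) -> int:
    # The prepare/commit quorum size used by each node.
    return 2 * f + 1

n = 10
f = max_faults(n)
print(f, quorum(f))  # 3 7

# Deterministic digests: the same payload always hashes to the same value,
# so honest nodes can compare proposals without exchanging the full payload.
d1 = hashlib.sha256(b"tx: pay Alice->Bob 5").hexdigest()
d2 = hashlib.sha256(b"tx: pay Alice->Bob 5").hexdigest()
print(d1 == d2)  # True
```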
    async def propose(self, payload: str, seq: int):
        if self.nid != self.cfg.primary_id:
            raise ValueError("Only the primary can propose in this simplified simulator.")
        if not self.byzantine:
            dig = self.digest_of(payload)
            msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, msg)
            return
        for dst in self.net.nodes.keys():
            variant = f"{payload}::to={dst}::salt={random.randint(0, 10**9)}"
            dig = self.digest_of(variant)
            msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
            await self.net.send(dst, msg)

    async def handle_preprepare(self, msg: Msg):
        seq = msg.seq
        dig = msg.digest
        if self.byzantine:
            if random.random() < 0.5:
                return
            fake_dig = dig if random.random() < 0.5 else self.digest_of(dig + "::fake")
            out = Msg(PREPARE, msg.view, seq, fake_dig, self.nid)
            await self.net.broadcast(self.nid, out)
            return
        if seq not in self.preprepare_seen:
            self.preprepare_seen[seq] = dig
            out = Msg(PREPARE, msg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, out)

    async def handle_prepare(self, msg: Msg):
        seq, dig = msg.seq, msg.digest
        key = (seq, dig)
        voters = self.prepare_votes.setdefault(key, set())
        voters.add(msg.sender)
        if self.byzantine:
            return
        if self.preprepare_seen.get(seq) != dig:
            return
        if len(voters) >= self._q_prepare():
            out = Msg(COMMIT, msg.view, seq, dig, self.nid)
            await self.net.broadcast(self.nid, out)

    async def handle_commit(self, msg: Msg):
        seq, dig = msg.seq, msg.digest
        key = (seq, dig)
        voters = self.commit_votes.setdefault(key, set())
        voters.add(msg.sender)
        if self.byzantine:
            return
        if self.preprepare_seen.get(seq) != dig:
            return
        if seq in self.committed:
            return
        if len(voters) >= self._q_commit():
            self.committed[seq] = dig
We implement the core PBFT protocol logic, including proposal handling and the pre-prepare and prepare phases. We explicitly model Byzantine equivocation by allowing malicious nodes to send conflicting digests to different peers. We advance the protocol to the commit phase once the required prepare quorum is reached.
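Why does a 2f+1 quorum protect against equivocation? Any two quorums of size 2f+1 among n = 3f+1 nodes overlap in at least f+1 nodes, so at least one honest node sits in both, and that node will only ever vote for one digest per sequence number. A small arithmetic check (the helper name is ours):

```python
def min_overlap(n: int, q: int) -> int:
    # Two subsets of size q drawn from n nodes share at least 2q - n members.
    return max(0, 2 * q - n)

for f in range(1, 6):
    n = 3 * f + 1
    q = 2 * f + 1
    # The overlap of any two quorums is at least f + 1, i.e. strictly more
    # than the f Byzantine nodes, so it always contains an honest node.
    assert min_overlap(n, q) == f + 1

print("quorum intersection holds for f = 1..5")
```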
    async def run(self):
        while self.running:
            msg = await self.inbox.get()
            if msg.typ == PREPREPARE:
                await self.handle_preprepare(msg)
            elif msg.typ == PREPARE:
                await self.handle_prepare(msg)
            elif msg.typ == COMMIT:
                await self.handle_commit(msg)

    def stop(self):
        self.running = False

def pbft_params(n: int) -> int:
    return (n - 1) // 3
async def run_single_consensus(
    n: int,
    malicious: int,
    net_cfg: NetConfig,
    payload: str = "tx: pay Alice->Bob 5",
    seq: int = 1,
    timeout_s: float = 2.0,
    seed: Optional[int] = None
) -> Dict[str, object]:
    if seed is not None:
        random.seed(seed)
    f_max = pbft_params(n)
    f = f_max
    net = Network(net_cfg)
    cfg = NodeConfig(n=n, f=f, primary_id=0, view=0, timeout_s=timeout_s)
    mal_set = set(random.sample(range(n), k=min(malicious, n)))
    nodes: List[Node] = []
    for i in range(n):
        node = Node(i, net, cfg, byzantine=(i in mal_set))
        net.register(node)
        nodes.append(node)
    tasks = [asyncio.create_task(node.run()) for node in nodes]
    t0 = time.perf_counter()
    await nodes[cfg.primary_id].propose(payload, seq)
    honest = [node for node in nodes if not node.byzantine]
    target = max(1, len(honest))
    committed_honest = 0
    latency = None

    async def poll_commits():
        nonlocal committed_honest, latency
        while True:
            committed_honest = sum(1 for node in honest if seq in node.committed)
            if committed_honest >= target:
                latency = time.perf_counter() - t0
                return
            await asyncio.sleep(0.005)

    try:
        await asyncio.wait_for(poll_commits(), timeout=timeout_s)
        success = True
    except asyncio.TimeoutError:
        success = False
        latency = None
    for node in nodes:
        node.stop()
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    digest_set = set(node.committed.get(seq) for node in honest if seq in node.committed)
    agreed = (len(digest_set) == 1) if success else False
    return {
        "n": n,
        "f": f,
        "malicious": malicious,
        "mal_set": mal_set,
        "success": success,
        "latency_s": latency,
        "honest_committed": committed_honest,
        "honest_total": len(honest),
        "agreed_digest": agreed,
    }
We complete the PBFT state machine by processing commit messages and finalizing decisions once commit quorums are satisfied. We run the node event loop to continuously process incoming messages asynchronously. We also include lifecycle controls to safely stop nodes after each experiment run.
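The success/timeout logic relies on a standard asyncio pattern: wrap a polling coroutine in `asyncio.wait_for` and treat `TimeoutError` as failure. A self-contained sketch of just that pattern (the `wait_until` helper and the toy state dict are illustrative, not part of the simulator):

```python
import asyncio

async def wait_until(predicate, timeout_s: float, poll_s: float = 0.005) -> bool:
    # True if predicate() became true before the timeout, False otherwise.
    async def poll():
        while not predicate():
            await asyncio.sleep(poll_s)
    try:
        await asyncio.wait_for(poll(), timeout=timeout_s)
        return True
    except asyncio.TimeoutError:
        return False

async def demo():
    state = {"committed": 0}

    async def committer():
        # Simulate the commit quorum arriving after a short delay.
        await asyncio.sleep(0.02)
        state["committed"] = 7

    task = asyncio.create_task(committer())
    ok = await wait_until(lambda: state["committed"] >= 7, timeout_s=1.0)
    await task
    return ok

print(asyncio.run(demo()))  # True
```

If the committer never reaches the threshold, `wait_for` cancels the polling coroutine and the helper returns False, mirroring how the experiment records a failed consensus round.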
async def latency_sweep(
    n: int = 10,
    max_malicious: Optional[int] = None,
    trials_per_point: int = 5,
    timeout_s: float = 2.0,
    net_cfg: Optional[NetConfig] = None,
    seed: int = 7
):
    if net_cfg is None:
        net_cfg = NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05)
    if max_malicious is None:
        max_malicious = n
    results = []
    random.seed(seed)
    for m in range(0, max_malicious + 1):
        latencies = []
        successes = 0
        agreements = 0
        for t in range(trials_per_point):
            out = await run_single_consensus(
                n=n,
                malicious=m,
                net_cfg=net_cfg,
                timeout_s=timeout_s,
                seed=seed + 1000 * m + t
            )
            results.append(out)
            if out["success"]:
                successes += 1
                latencies.append(out["latency_s"])
                if out["agreed_digest"]:
                    agreements += 1
        avg_lat = sum(latencies) / len(latencies) if latencies else None
        print(
            f"malicious={m:2d} | success={successes}/{trials_per_point} "
            f"| avg_latency={avg_lat if avg_lat is not None else 'NA'} "
            f"| digest_agreement={agreements}/{successes if successes else 1}"
        )
    return results
def plot_latency(results: List[Dict[str, object]], trials_per_point: int):
    by_m = {}
    for r in results:
        m = r["malicious"]
        by_m.setdefault(m, []).append(r)
    xs, ys = [], []
    success_rate = []
    for m in sorted(by_m.keys()):
        group = by_m[m]
        lats = [g["latency_s"] for g in group if g["latency_s"] is not None]
        succ = sum(1 for g in group if g["success"])
        xs.append(m)
        ys.append(sum(lats) / len(lats) if lats else float("nan"))
        success_rate.append(succ / len(group))
    plt.figure()
    plt.plot(xs, ys, marker="o")
    plt.xlabel("Number of malicious (Byzantine) nodes")
    plt.ylabel("Consensus latency (seconds), avg over successes")
    plt.title("PBFT Simulator: Latency vs Malicious Nodes")
    plt.grid(True)
    plt.show()
    plt.figure()
    plt.plot(xs, success_rate, marker="o")
    plt.xlabel("Number of malicious (Byzantine) nodes")
    plt.ylabel("Success rate")
    plt.title("PBFT Simulator: Success Rate vs Malicious Nodes")
    plt.ylim(-0.05, 1.05)
    plt.grid(True)
    plt.show()
async def main():
    n = 10
    trials = 6
    f = pbft_params(n)
    print(f"n={n} => PBFT theoretical max f = floor((n-1)/3) = {f}")
    print("Theory: safety/liveness typically assumed when malicious <= f and timing assumptions hold.\n")
    results = await latency_sweep(
        n=n,
        max_malicious=min(n, f + 6),
        trials_per_point=trials,
        timeout_s=2.0,
        net_cfg=NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05),
        seed=11
    )
    plot_latency(results, trials)

await main()
We orchestrate the experiments by sweeping across different numbers of malicious nodes and collecting latency statistics. We aggregate the results to analyze consensus success rates and visualize system behavior using plots. We run the full experiment pipeline and observe how PBFT degrades as the number of Byzantine faults approaches and exceeds the theoretical limit.
In conclusion, we gained hands-on insight into how PBFT behaves beyond textbook guarantees and how adversarial pressure affects both latency and liveness in practice. We observed how quorum thresholds enforce safety, why consensus breaks down once Byzantine nodes exceed the tolerated bound, and how asynchronous networks amplify these effects. This implementation provides a practical foundation for experimenting with more advanced distributed-systems concepts, such as view changes, leader rotation, or authenticated messaging. It helps us build intuition for the design trade-offs that underpin modern blockchain and distributed trust systems.
