!pip -q install -U "protobuf<5" "flwr[simulation]" transformers peft accelerate datasets sentencepiece
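# bitsandbytes is only useful with a CUDA GPU, so install it conditionally below.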
import torch
if torch.cuda.is_available():
    !pip -q install -U bitsandbytes
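# Keep the Ray backend quiet and silence tokenizer fork warnings before importing Flower and Transformers.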
import os
os.environ["RAY_DISABLE_USAGE_STATS"] = "1"
os.environ["TOKENIZERS_PARALLELISM"] = "false"
import math
import random
import numpy as np
from typing import Dict, List, Tuple, Optional
from torch.utils.data import DataLoader
from datasets import Dataset
import flwr as fl
from flwr.common import Context
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, DataCollatorForLanguageModeling
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
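# Seed every RNG so data shuffling, LoRA init, and training runs are reproducible.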
SEED = 7
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("System:", DEVICE)
GPU_MODEL_ID = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
CPU_MODEL_ID = "distilgpt2"
MODEL_ID = GPU_MODEL_ID if DEVICE == "cuda" else CPU_MODEL_ID
MAX_LEN = 256 if DEVICE == "cuda" else 192
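# Federated setup: 3 simulated clients, 3 rounds, 1 local epoch each, tiny batches with gradient accumulation.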
NUM_CLIENTS = 3
ROUNDS = 3
LOCAL_EPOCHS = 1
BATCH_SIZE = 2
GRAD_ACCUM = 4
LR = 2e-4
WARMUP_STEPS = 5
WEIGHT_DECAY = 0.0
LOG_EVERY = 10
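# Toy non-IID corpora: each simulated client holds documents from a different domain (IT ops, credit risk, fleet logistics).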
CLIENT_TEXTS: Dict[int, List[str]] = {
    0: [
        "Policy memo: Employees must rotate on-call weekly and document incidents in the internal tracker.",
        "Runbook: If latency spikes, check the database connection pool and recent deploys, then roll back if needed.",
        "Security note: Never paste customer identifiers into public issue trackers. Use redacted tokens.",
        "Engineering guideline: Prefer idempotent retries for event processing; avoid duplicate side-effects.",
        "Postmortem template: impact, timeline, root cause, contributing factors, action items, owners, deadlines.",
    ],
    1: [
        "Credit risk review: monitor delinquency curves by cohort and compare against seasonal baselines.",
        "Fraud signals: repeated small authorizations, device changes, and sudden merchant-category shifts require review.",
        "Portfolio strategy: tighten limits on volatile segments while maintaining service levels for stable accounts.",
        "Operational note: reconcile chargebacks weekly and track win-rate by reason code.",
        "Internal SOP: escalation path is analyst -> manager -> compliance for high-risk cases.",
    ],
    2: [
        "Fleet ops: preventive maintenance reduces downtime; prioritize vehicles with repeated fault codes.",
        "Dispatch note: optimize routes by time windows and driver hours to reduce empty miles.",
        "Safety policy: enforce rest breaks and log inspections before long-haul trips.",
        "Inventory update: track spare parts usage; reorder thresholds should reflect lead time and seasonality.",
        "Customer SLA: late deliveries require proactive notifications and documented root cause.",
    ],
}
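# Double each client's corpus with simple Q&A-style rephrasings of the same documents.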
for cid in list(CLIENT_TEXTS.keys()):
    base = CLIENT_TEXTS[cid]
    CLIENT_TEXTS[cid] = base + [f"Q: Summarize this for leadership. A: {t}" for t in base]
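# Shared tokenizer; models without a dedicated pad token (GPT-2, Llama-style) fall back to EOS for padding.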
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
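# On GPU, quantize the base model to 4-bit NF4 (QLoRA-style); compute in bf16 on Ampere+ GPUs, otherwise fp16.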
bnb_config: Optional[BitsAndBytesConfig] = None
if DEVICE == "cuda":
compute_dtype = torch.bfloat16 if torch.cuda.get_device_capability(0)[0] >= 8 else torch.float16
bnb_config = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_use_double_quant=True, bnb_4bit_compute_dtype=compute_dtype)
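# LoRA injection points depend on the architecture: GPT-2 uses fused c_attn/c_proj layers, Llama-style models expose separate q/k/v/o projections.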
if "gpt2" in MODEL_ID.decrease():
TARGET_MODULES = ["c_attn", "c_proj"]
else:
TARGET_MODULES = ["q_proj", "k_proj", "v_proj", "o_proj"]
LORA_R = 16
LORA_ALPHA = 32
LORA_DROPOUT = 0.05
lora_config = LoraConfig(r=LORA_R, lora_alpha=LORA_ALPHA, lora_dropout=LORA_DROPOUT, bias="none", task_type="CAUSAL_LM", target_modules=TARGET_MODULES)
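# With device_map="auto" the model may be sharded across devices, so read the device from the first parameter when moving batches.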
def model_primary_device(model) -> torch.device:
    return next(model.parameters()).device
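# Build a fresh base model (4-bit on GPU, fp32 on CPU) and attach LoRA adapters; only the adapter weights are trained and exchanged.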
def build_model_with_lora():
    if DEVICE == "cuda":
        model = AutoModelForCausalLM.from_pretrained(MODEL_ID, device_map="auto", quantization_config=bnb_config, torch_dtype="auto")
        model = prepare_model_for_kbit_training(model)
    else:
        model = AutoModelForCausalLM.from_pretrained(MODEL_ID, torch_dtype=torch.float32)
        model.to("cpu")
    model = get_peft_model(model, lora_config)
    model.train()
    return model
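# Tokenize raw strings to fixed-length sequences so batches have static shapes.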
def make_dataset(texts: List[str]) -> Dataset:
    ds = Dataset.from_dict({"text": texts})
    def tok(batch):
        return tokenizer(batch["text"], truncation=True, max_length=MAX_LEN, padding="max_length")
    ds = ds.map(tok, batched=True, remove_columns=["text"])
    return ds
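# Causal-LM collator: mlm=False copies input_ids into labels with padding positions masked to -100.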
collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)
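# Federated payload: only the LoRA adapter tensors, identified by "lora_" in their state_dict names and kept in a stable sorted order so client and server arrays line up.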
def lora_state_keys(model) -> List[str]:
    sd = model.state_dict()
    keys = sorted([k for k in sd.keys() if "lora_" in k])
    if not keys:
        raise RuntimeError("No LoRA keys found. Your model might not have the specified target_modules. " f"Current TARGET_MODULES={TARGET_MODULES}, MODEL_ID={MODEL_ID}")
    return keys
def get_lora_ndarrays(model) -> List[np.ndarray]:
    sd = model.state_dict()
    keys = lora_state_keys(model)
    return [sd[k].detach().float().cpu().numpy() for k in keys]
def set_lora_ndarrays(model, arrays: List[np.ndarray]) -> None:
    keys = lora_state_keys(model)
    if len(keys) != len(arrays):
        raise ValueError(f"Mismatch: got {len(arrays)} arrays but expected {len(keys)}.")
    sd = model.state_dict()
    for k, arr in zip(keys, arrays):
        t = torch.from_numpy(arr).to(sd[k].device).to(sd[k].dtype)
        sd[k].copy_(t)
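# Manual LR schedule: linear warmup, then cosine decay over the remaining optimizer steps.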
def cosine_warmup_lr(step: int, total_steps: int, base_lr: float, warmup_steps: int) -> float:
    if step < warmup_steps:
        return base_lr * (step + 1) / max(1, warmup_steps)
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    return base_lr * 0.5 * (1.0 + math.cos(math.pi * progress))
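# Cheap evaluation: mean loss over at most max_batches batches, with gradients disabled.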
@torch.no_grad()
def eval_loss(model, ds: Dataset, max_batches: int = 20) -> float:
    model.eval()
    dl = DataLoader(ds, batch_size=BATCH_SIZE, shuffle=False, collate_fn=collator)
    losses = []
    dev = model_primary_device(model)
    for i, batch in enumerate(dl):
        if i >= max_batches:
            break
        batch = {k: v.to(dev) for k, v in batch.items()}
        out = model(**batch)  # collator already supplies labels
        losses.append(float(out.loss.detach().cpu()))
    model.train()
    return float(np.mean(losses)) if losses else float("nan")
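# Local training for one federated round: accumulate gradients over grad_accum micro-batches per optimizer step; return the mean training loss and the number of examples processed.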
def train_one_client_round(model, ds: Dataset, epochs: int, lr: float, grad_accum: int, warmup_steps: int) -> Tuple[float, int]:
    dl = DataLoader(ds, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collator)
    total_steps = max(1, (len(dl) * epochs) // max(1, grad_accum))
    step = 0
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=WEIGHT_DECAY)
    optimizer.zero_grad(set_to_none=True)
    running = []
    examples = 0
    dev = model_primary_device(model)
    for _ in range(epochs):
        for bi, batch in enumerate(dl):
            batch = {k: v.to(dev) for k, v in batch.items()}
            out = model(**batch)  # collator already supplies labels
            loss = out.loss / grad_accum
            loss.backward()
            running.append(float(loss.detach().cpu()) * grad_accum)
            examples += batch["input_ids"].shape[0]
            if (bi + 1) % grad_accum == 0:
                lr_t = cosine_warmup_lr(step, total_steps, lr, warmup_steps)
                for pg in optimizer.param_groups:
                    pg["lr"] = lr_t
                optimizer.step()
                optimizer.zero_grad(set_to_none=True)
                step += 1
                if step % LOG_EVERY == 0:
                    print(f"  step={step}/{total_steps} loss={np.mean(running[-LOG_EVERY:]):.4f} lr={lr_t:.2e}")
    return float(np.mean(running)) if running else float("nan"), examples
