class QwenChat:
    """Minimal multi-turn chat wrapper around a Qwen model/processor pair.

    Maintains an OpenAI-style message history, renders it through the
    processor's chat template (optionally advertising tool schemas), and
    generates either in one shot (`generate`) or token-streamed (`stream`),
    splitting <think> reasoning text from the final answer.
    """

    def __init__(self, model, processor, system=None, tools=None):
        self.model, self.processor = model, processor
        self.tokenizer = processor.tokenizer
        self.history: list[dict] = []
        if system:
            self.history.append({"role": "system", "content": system})
        self.tools = tools

    def user(self, content):
        """Append a user turn; returns self for chaining."""
        self.history.append({"role": "user", "content": content})
        return self

    def assistant(self, content, reasoning=""):
        """Append an assistant turn, optionally carrying reasoning content."""
        m = {"role": "assistant", "content": content}
        if reasoning:
            m["reasoning_content"] = reasoning
        self.history.append(m)
        return self

    def tool_result(self, name, result):
        """Append a tool-result turn; non-str results are JSON-encoded."""
        self.history.append({
            "role": "tool", "name": name,
            "content": result if isinstance(result, str) else json.dumps(result),
        })
        return self

    def _inputs(self, enable_thinking, preserve_thinking):
        # Render history (plus tool schemas) into model-ready tensors on the
        # model's device.
        return self.processor.apply_chat_template(
            self.history, tools=self.tools, tokenize=True,
            add_generation_prompt=True, return_dict=True, return_tensors="pt",
            enable_thinking=enable_thinking, preserve_thinking=preserve_thinking,
        ).to(self.model.device)

    def generate(self, *, enable_thinking=True, preserve_thinking=False,
                 max_new_tokens=2048, preset="thinking_general",
                 stopping_criteria=None, append_to_history=True):
        """Generate one reply; returns (thinking_text, answer_text).

        Sampling hyperparameters come from the module-level SAMPLING preset
        table. When append_to_history is True the answer (and its reasoning)
        is recorded as an assistant turn.
        """
        inp = self._inputs(enable_thinking, preserve_thinking)
        cfg = SAMPLING[preset]
        gk = dict(**inp, max_new_tokens=max_new_tokens, do_sample=True,
                  temperature=cfg["temperature"], top_p=cfg["top_p"], top_k=cfg["top_k"],
                  repetition_penalty=1.0,
                  pad_token_id=self.tokenizer.pad_token_id or self.tokenizer.eos_token_id)
        if stopping_criteria is not None:
            gk["stopping_criteria"] = stopping_criteria
        with torch.inference_mode():
            out = self.model.generate(**gk)
        # Decode only the newly generated suffix, not the prompt.
        raw = self.tokenizer.decode(out[0, inp["input_ids"].shape[-1]:],
                                    skip_special_tokens=True)
        think, ans = split_thinking(raw)
        if append_to_history:
            self.assistant(ans, reasoning=think)
        return think, ans

    def stream(self, *, enable_thinking=True, preserve_thinking=False,
               max_new_tokens=2048, preset="thinking_general",
               on_thinking=None, on_answer=None):
        """Stream a reply, invoking on_thinking/on_answer with text deltas.

        Runs generation on a background thread and consumes the streamer on
        this one. Buffers thinking text until THINK_CLOSE appears (which may
        span chunk boundaries), then switches to answer mode. Returns
        (thinking_text, answer_text) and appends the turn to history.
        """
        inp = self._inputs(enable_thinking, preserve_thinking)
        cfg = SAMPLING[preset]
        streamer = TextIteratorStreamer(self.tokenizer, skip_prompt=True,
                                        skip_special_tokens=True)
        gk = dict(**inp, streamer=streamer, max_new_tokens=max_new_tokens, do_sample=True,
                  temperature=cfg["temperature"], top_p=cfg["top_p"], top_k=cfg["top_k"],
                  pad_token_id=self.tokenizer.pad_token_id or self.tokenizer.eos_token_id)
        t = threading.Thread(target=self.model.generate, kwargs=gk)
        t.start()
        buf, in_think = "", enable_thinking
        think_text, answer_text = "", ""
        for piece in streamer:
            buf += piece
            if in_think:
                if THINK_CLOSE in buf:
                    close_at = buf.index(THINK_CLOSE)
                    resid = buf[:close_at]
                    # Emit only the thinking text not already delivered.
                    if on_thinking:
                        on_thinking(resid[len(think_text):])
                    think_text = resid
                    buf = buf[close_at + len(THINK_CLOSE):]
                    in_think = False
                    if buf and on_answer:
                        on_answer(buf)
                    answer_text = buf
                    buf = ""
                else:
                    if on_thinking:
                        on_thinking(piece)
                    think_text += piece
            else:
                if on_answer:
                    on_answer(piece)
                answer_text += piece
        t.join()
        self.assistant(answer_text.strip(), reasoning=think_text.strip())
        return think_text.strip(), answer_text.strip()

    def save(self, path):
        """Persist history and tool schemas to a JSON file."""
        with open(path, "w", encoding="utf-8") as f:
            json.dump({"history": self.history, "tools": self.tools}, f, indent=2)

    @classmethod
    def load(cls, model, processor, path):
        """Rebuild a chat from a file written by save()."""
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
        c = cls(model, processor, tools=data.get("tools"))
        c.history = data["history"]
        return c
class ThinkingBudget(StoppingCriteria):
    """Stopping criterion that caps the length of the <think> segment.

    Arms itself once the THINK_OPEN token sequence appears in the generated
    ids; from then on, requests a stop when `budget` tokens have been
    produced inside the segment without THINK_CLOSE appearing. Once the
    segment closes it never stops generation.
    """

    def __init__(self, tokenizer, budget: int):
        self.budget = budget
        self.open_ids = tokenizer.encode(THINK_OPEN, add_special_tokens=False)
        self.close_ids = tokenizer.encode(THINK_CLOSE, add_special_tokens=False)
        # Index just past the open marker, set once it has been seen.
        self.start = None

    def _find(self, seq, needle):
        # Naive subsequence search; returns the first match index or None.
        n = len(needle)
        for i in range(len(seq) - n + 1):
            if seq[i:i + n] == needle:
                return i
        return None

    def __call__(self, input_ids, scores, **kwargs):
        # Only batch element 0 is inspected (single-sequence generation).
        seq = input_ids[0].tolist()
        if self.start is None:
            idx = self._find(seq, self.open_ids)
            if idx is not None:
                self.start = idx + len(self.open_ids)
            return False
        # Thinking already closed on its own: never stop on budget grounds.
        if self._find(seq[self.start:], self.close_ids) is not None:
            return False
        return (len(seq) - self.start) >= self.budget
# Extracts the JSON payload of a model-emitted tool call. The original
# pattern had lost its backslashes and (apparently) the wrapping
# <tool_call>...</tool_call> tags; reconstructed per the Qwen tool-call
# convention — NOTE(review): confirm against the model's actual output.
TOOL_CALL_RE = re.compile(r"<tool_call>\s*({.*?})\s*</tool_call>", re.S)
def run_calculate(expr: str) -> str:
    """Evaluate an arithmetic expression and return a JSON result string.

    Returns {"result": value} on success, {"error": msg} on bad input or
    evaluation failure. The character whitelist rejects names and attribute
    access before the (builtins-stripped) eval below ever runs.
    """
    if any(c not in "0123456789+-*/().% " for c in expr):
        return json.dumps({"error": "illegal chars"})
    try:
        return json.dumps({"result": eval(expr, {"__builtins__": {}}, {})})
    except Exception as e:
        return json.dumps({"error": str(e)})
_DOCS = {
"qwen3.6": "Qwen3.6-35B-A3B is a 35B MoE with 3B lively params and 262k native context.",
"deltanet": "Gated DeltaNet is a linear-attention variant utilized in Qwen3.6's hybrid layers.",
"moe": "Qwen3.6 makes use of 256 specialists with 8 routed + 1 shared per token.",
}
def run_search_docs(q):
    """Return doc snippets whose key occurs in the (lowercased) query.

    Always returns a JSON string {"results": [...]}, with ["no hits"] as
    the fallback so the model gets a well-formed tool response either way.
    """
    hits = [text for key, text in _DOCS.items() if key in q.lower()]
    return json.dumps({"results": hits or ["no hits"]})
def run_get_time():
    """Return the current UTC time as a JSON string: {"iso": "...Z"}."""
    import datetime as dt
    # Timezone-aware now() (utcnow() is naive and deprecated in 3.12);
    # swap the +00:00 offset suffix for the conventional Z.
    now = dt.datetime.now(dt.timezone.utc)
    return json.dumps({"iso": now.isoformat().replace("+00:00", "Z")})
# Dispatch table: tool name -> callable taking the parsed arguments dict.
# Argument keys mirror the "required" properties in TOOLS_SCHEMA.
TOOL_FNS = {
    "calculate": lambda a: run_calculate(a["expression"]),
    "search_docs": lambda a: run_search_docs(a["query"]),
    "get_time": lambda a: run_get_time(),
}
# OpenAI-style function schemas advertised to the model via the chat
# template. Names and required parameters match the TOOL_FNS dispatch table.
TOOLS_SCHEMA = [
    {"type": "function", "function": {"name": "calculate", "description": "Evaluate arithmetic.",
        "parameters": {"type": "object", "properties": {"expression": {"type": "string"}}, "required": ["expression"]}}},
    {"type": "function", "function": {"name": "search_docs", "description": "Search internal docs.",
        "parameters": {"type": "object", "properties": {"query": {"type": "string"}}, "required": ["query"]}}},
    {"type": "function", "function": {"name": "get_time", "description": "Get current UTC time.",
        "parameters": {"type": "object", "properties": {}}}},
]
