#!/usr/bin/env python3
"""Terminal shell that drives a llama-server model through a JSON tool-calling loop.

The model is asked (and, when the grammar is loaded, constrained by GBNF) to emit
``{"thought": ..., "tool_call": {"name": ..., "arguments": {...}}}`` objects. Tool
calls are either handled locally (persona switching, ``ask_user``) or forwarded as
JSON-RPC 2.0 ``tools/call`` requests to a persistent ``geoscaper.py`` subprocess
over stdin/stdout.
"""
import asyncio
import json
import os
import sys
import re
import atexit
from collections import deque
from typing import Dict, Any, Optional, List, AsyncGenerator

import httpx

# --- Target Configurations from Shell Environment ---
BACKEND_URL = os.environ.get("AMDA_BACKEND_URL", "http://127.0.0.1:8080")
SYSTEM_PROMPT_PATH = os.environ.get("AMDA_SYSTEM_PROMPT", "agents/default_agent.md")
GRAMMAR_FILE_PATH = os.environ.get("AMDA_GRAMMAR_FILE", "tools/tool_rules.gbnf")
MAX_TOKENS = 1024
CONTEXT_LIMIT = 16384
SAFETY_THRESHOLD = int(CONTEXT_LIMIT * 0.80)
# Maximum number of consecutive "forgot to call a tool" retries before the
# autonomous loop gives control back to the user.
DEFAULT_MAX_TURNS = 5
TERM_WIDTH = 85
# Per-line buffer limit for the MCP subprocess pipes. asyncio's default (64 KiB)
# can be exceeded by a single large JSON-RPC response line, which makes
# readline() raise instead of returning the response.
MCP_STREAM_LIMIT = 16 * 1024 * 1024

# Used when SYSTEM_PROMPT_PATH does not exist (startup and deactivate_persona).
DEFAULT_SYSTEM_PROMPT = "You are a helpful assistant. Output must strictly match GBNF JSON grammar."

# --- ANSI Styling Palette ---
CYAN = "\033[96m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
RED = "\033[91m"
RESET = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
CLEAR = "\033[2J\033[H"
DIVIDER = f"{DIM}{'─' * TERM_WIDTH}{RESET}"

# --- Specialist Persona Mapping ---
PERSONA_MAP = {
    "geoscaper/team/planner": "agents/modules/geoscaper/team/planner.md",
    "geoscaper/team/builder": "agents/modules/geoscaper/team/builder.md",
    "geoscaper/team/reviewer": "agents/modules/geoscaper/team/reviewer.md",
}

# --- System-level Virtual Instructions for Master Orchestrator ---
MASTER_ORCHESTRATOR_INSTRUCTION = (
    "\n\n[SYSTEM INSTRUCTION] You are the Master Orchestrator. "
    "Your job is to route user requests to the specialized agent personas. "
    "To do this, use the 'activate_persona' virtual tool when needed.\n"
    "Available personas:\n"
    "- 'geoscaper/team/planner': For strategic project planning, staging config, and task queuing.\n"
    "- 'geoscaper/team/builder': For compiling HTML pages and generating styles/code.\n"
    "- 'geoscaper/team/reviewer': For running quality assurance and structure audits.\n\n"
    "To call a persona, output a valid JSON conforming to the grammar:\n"
    "{\n"
    " \"thought\": \"Your internal explanation for choosing this persona.\",\n"
    " \"tool_call\": {\n"
    " \"name\": \"activate_persona\",\n"
    " \"arguments\": {\"persona\": \"geoscaper/team/planner\"}\n"
    " }\n"
    "}\n"
    "Always state in your 'thought' when you want to activate or hand off to a persona.\n"
    "If you need to speak directly to the user, you MUST use the 'ask_user' tool."
)

# Strict GBNF schema for {"thought": string, "tool_call": {"name": string, "arguments": object}}.
# It is rewritten to GRAMMAR_FILE_PATH on every start. GBNF needs one rule per line.
GBNF_GRAMMAR = """root ::= object
object ::= "{" ws "\\"thought\\":" ws string "," ws "\\"tool_call\\":" ws tool-call ws "}"
tool-call ::= "{" ws "\\"name\\":" ws string "," ws "\\"arguments\\":" ws tool-args ws "}"
tool-args ::= "{" ws ( pair ("," ws pair)* )? ws "}"
pair ::= string ws ":" ws value
value ::= string | number | "true" | "false" | "null" | tool-args | array
array ::= "[" ws ( value ("," ws value)* )? ws "]"
string ::= "\\"" ([^"\\\\\\x00-\\x1F] | "\\\\" (["\\\\/bfnrt] | "u" [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F]))* "\\""
number ::= "-"? ([0-9]+ | [0-9]* "." [0-9]+) ([eE] [+-]? [0-9]+)?
ws ::= [ \\t\\n\\r]*
"""


class JSONStreamFilter:
    """Stream-decode one string field (default ``"thought"``) out of partial JSON.

    ``feed`` receives raw model tokens as they arrive. It returns only the
    characters that belong to the target key's string value, with JSON escape
    sequences decoded, so the terminal shows readable prose instead of raw JSON.
    Everything after the value's closing quote is ignored.
    """

    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")
    # Escapes rendered as control characters. Any other escaped character
    # (\" \\ \/ \b \f) is shown as the character itself.
    _ESCAPES = {"n": "\n", "t": "\t", "r": "\r"}

    def __init__(self, target_key: str = "thought"):
        self.buffer = ""  # raw text accumulated while searching for the key
        self.in_value = False  # True while inside the target string value
        self.escaped = False  # previous character was a backslash
        self.done = False  # value fully consumed; ignore the rest of the stream
        self.unicode_digits: Optional[str] = None  # hex digits collected after a \u escape
        self.pending_high: Optional[int] = None  # UTF-16 high surrogate awaiting its pair
        self.target_pattern = f'"{target_key}"'

    def feed(self, delta: str) -> str:
        """Consume one streamed chunk and return the newly decodable value text."""
        if not delta or self.done:
            return ""
        if self.in_value:
            return self._process_chars(delta)

        self.buffer += delta
        key_idx = self.buffer.find(self.target_pattern)
        if key_idx == -1:
            return ""
        colon_idx = self.buffer.find(':', key_idx)
        if colon_idx == -1:
            return ""
        quote_idx = self.buffer.find('"', colon_idx)
        if quote_idx == -1:
            return ""

        # Opening quote of the value found: decode whatever follows it.
        self.in_value = True
        remaining = self.buffer[quote_idx + 1:]
        self.buffer = ""
        return self._process_chars(remaining)

    def _process_chars(self, text: str) -> str:
        """Decode characters of the string value until its closing quote."""
        out: List[str] = []
        for char in text:
            if self.unicode_digits is not None:
                if char in self._HEX_DIGITS:
                    self.unicode_digits += char
                    if len(self.unicode_digits) == 4:
                        self._emit_code_unit(out, int(self.unicode_digits, 16))
                        self.unicode_digits = None
                    continue
                # Malformed \u escape: show it verbatim, then handle this char normally.
                self._emit(out, "u" + self.unicode_digits)
                self.unicode_digits = None

            if self.escaped:
                self.escaped = False
                if char == "u":
                    self.unicode_digits = ""
                else:
                    self._emit(out, self._ESCAPES.get(char, char))
            elif char == "\\":
                self.escaped = True
            elif char == '"':
                if self.pending_high is not None:
                    out.append("\ufffd")
                    self.pending_high = None
                self.in_value = False
                self.done = True
                break
            else:
                self._emit(out, char)
        return "".join(out)

    def _emit(self, out: List[str], text: str) -> None:
        """Append text, first flushing an unpaired high surrogate as U+FFFD."""
        if self.pending_high is not None:
            out.append("\ufffd")
            self.pending_high = None
        out.append(text)

    def _emit_code_unit(self, out: List[str], code: int) -> None:
        """Append one decoded 16-bit escape code unit, joining UTF-16 surrogate pairs.

        Lone surrogates become U+FFFD so that writing to stdout cannot raise
        UnicodeEncodeError.
        """
        if 0xD800 <= code <= 0xDBFF:
            if self.pending_high is not None:
                out.append("\ufffd")
            self.pending_high = code
        elif 0xDC00 <= code <= 0xDFFF:
            if self.pending_high is None:
                out.append("\ufffd")
            else:
                out.append(chr(0x10000 + ((self.pending_high - 0xD800) << 10) + (code - 0xDC00)))
                self.pending_high = None
        else:
            self._emit(out, chr(code))


_THOUGHT_RE = re.compile(r'"thought"\s*:\s*"((?:[^"\\]|\\.)*)"', re.DOTALL)
# Matches up to (not including) the opening brace of tool_call.arguments.
_TOOL_HEAD_RE = re.compile(
    r'"tool_call"\s*:\s*\{\s*"name"\s*:\s*"([^"]+)"\s*,\s*"arguments"\s*:\s*(?=\{)',
    re.DOTALL,
)


def _repair_json_text(json_str: str) -> str:
    """Balance a possibly truncated JSON fragment that starts at ``{``.

    Raw newlines and tabs inside strings are escaped. A closing ``}``/``]`` that
    does not match the innermost opener is dropped. At the end, a dangling
    backslash is removed, an unterminated string is closed, and missing closers
    are appended in nesting order.
    """
    repaired_chars: List[str] = []
    in_string = False
    escape = False
    closer_stack: List[str] = []

    for char in json_str:
        if escape:
            repaired_chars.append(char)
            escape = False
            continue
        if char == '\\':
            repaired_chars.append(char)
            escape = True
            continue
        if char == '"':
            in_string = not in_string
            repaired_chars.append(char)
            continue
        if in_string:
            if char == '\n':
                repaired_chars.append('\\n')
            elif char == '\t':
                repaired_chars.append('\\t')
            else:
                repaired_chars.append(char)
            continue
        if char == '{':
            closer_stack.append('}')
        elif char == '[':
            closer_stack.append(']')
        elif char in '}]':
            if not closer_stack or closer_stack[-1] != char:
                continue  # unmatched closer: drop it
            closer_stack.pop()
        repaired_chars.append(char)

    if escape:
        # Truncated right after a backslash; it would escape the closing quote below.
        repaired_chars.pop()
    if in_string:
        repaired_chars.append('"')
    while closer_stack:
        repaired_chars.append(closer_stack.pop())
    return "".join(repaired_chars)


def _salvage_thought_and_tool_call(healed_str: str, decoder: json.JSONDecoder) -> Optional[Dict[str, Any]]:
    """Regex-recover ``thought`` (and, if decodable, ``tool_call``) from text that is not valid JSON."""
    thought_match = _THOUGHT_RE.search(healed_str)
    if not thought_match:
        return None

    raw_thought = thought_match.group(1)
    try:
        thought_content = json.loads(f'"{raw_thought}"', strict=False)
    except json.JSONDecodeError:
        thought_content = raw_thought.replace('\\n', '\n')

    tool_call = None
    tool_match = _TOOL_HEAD_RE.search(healed_str)
    if tool_match:
        try:
            # Decode exactly one object at the arguments' brace; the tool_call's own
            # closing brace and anything after it are ignored.
            arguments, _ = decoder.raw_decode(healed_str, tool_match.end())
        except json.JSONDecodeError:
            arguments = None
        if isinstance(arguments, dict):
            tool_call = {"name": tool_match.group(1), "arguments": arguments}
    return {"thought": thought_content, "tool_call": tool_call}


def extract_json_from_text(text: str) -> Optional[Dict[str, Any]]:
    """Extract the first JSON object from LLM output, repairing truncation if needed.

    Steps:
      1. Strip whitespace and a surrounding Markdown code fence.
      2. From the first ``{``, balance strings and brackets (see ``_repair_json_text``).
      3. Decode the first JSON value of the repaired text; trailing text is ignored.
      4. If that fails, regex-salvage ``thought`` and, when possible, ``tool_call``.

    Returns:
        The decoded object, a salvaged ``{"thought": ..., "tool_call": ...}`` dict,
        or ``None`` when nothing usable was found.
    """
    cleaned = text.strip()
    if not cleaned:
        return None
    if cleaned.startswith("```json"):
        cleaned = cleaned[7:]
    elif cleaned.startswith("```"):
        cleaned = cleaned[3:]
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]
    cleaned = cleaned.strip()

    start = cleaned.find('{')
    if start == -1:
        return None

    healed_str = _repair_json_text(cleaned[start:])
    decoder = json.JSONDecoder(strict=False)
    try:
        parsed, _ = decoder.raw_decode(healed_str)
        return parsed
    except json.JSONDecodeError:
        return _salvage_thought_and_tool_call(healed_str, decoder)


async def count_tokens(client: httpx.AsyncClient, text: str) -> int:
    """Return the token count of text via llama-server's ``/tokenize`` endpoint.

    Falls back to an estimate of about four characters per token (minimum 1)
    when the request fails or the status is not 200.
    """
    try:
        response = await client.post(f"{BACKEND_URL}/tokenize", json={"content": text})
        if response.status_code == 200:
            tokens = response.json().get("tokens", [])
            return len(tokens)
    except Exception:
        # Best effort: any transport or decoding failure uses the estimate below.
        pass
    return max(1, len(text) // 4)


def log_telemetry(tag: str, content: str):
    """Append a tagged entry to core/logs/telemetry.log (best effort, never raises)."""
    try:
        log_dir = "core/logs"
        os.makedirs(log_dir, exist_ok=True)
        with open(os.path.join(log_dir, "telemetry.log"), "a", encoding="utf-8") as lf:
            lf.write(f"[{tag}] {content}\n" + "="*50 + "\n")
    except Exception:
        pass


def _read_prompt_file(path: str) -> str:
    """Return the UTF-8 contents of a prompt or persona file, with surrounding whitespace stripped."""
    with open(path, "r", encoding="utf-8") as fh:
        return fh.read().strip()


class CPUIntentRouter:
    """Map free-form user input to a specialist persona key.

    Uses a sentence-transformers embedding model on CPU, loaded lazily on first
    use, and scores the query by cosine similarity against a few prototype
    phrases per persona. If the model cannot be loaded, it falls back to
    keyword matching.
    """

    def __init__(self, model_name: str = "nomic-ai/nomic-embed-text-v1.5"):
        self.model = None  # None = not loaded yet, False = load failed, else the model
        self.model_name = model_name
        self.prototypes = {
            "geoscaper/team/planner": [
                "initialize workspace project",
                "set up the design and staging configurations",
                "queue landing page task",
                "create home page and set background styles",
                "setup showcase website and launch planner"
            ],
            "geoscaper/team/builder": [
                "compile the home page and generate html",
                "write responsive html code structure",
                "compile showcase project",
                "builder compile index.html",
                "build showcase webpage"
            ],
            "geoscaper/team/reviewer": [
                "audit compiled home page",
                "run structural check and tag balance check",
                "verify dead links and page validation",
                "quality and structural audit check"
            ]
        }
        self.prototype_embeddings = {}

    def lazy_init(self):
        """Load the embedding model and prototype embeddings once; do not retry after a failure.

        Runs on CPU so GPU (Vulkan) resources stay available for the chat model.
        """
        if self.model is not None:
            return
        try:
            import numpy as np
            from sentence_transformers import SentenceTransformer

            print(f"\n{CYAN}[Init] Loading CPU embedding model: {self.model_name}...{RESET}")
            self.model = SentenceTransformer(self.model_name, trust_remote_code=True, device="cpu")

            for persona, sentences in self.prototypes.items():
                # Formulate search document input prefix for nomic-embed-text
                inputs = [f"search_document: {s}" for s in sentences]
                embeddings = self.model.encode(inputs, convert_to_numpy=True)
                # L2-normalise rows so a dot product with a normalised query is cosine similarity.
                norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
                self.prototype_embeddings[persona] = embeddings / np.maximum(norms, 1e-12)

            print(f"{GREEN}[Ready] CPU Embedding model initialized.{RESET}")
        except Exception as e:
            # Handle Fedora/RedHat systems missing ctypes gracefully
            if "No module named '_ctypes'" in str(e):
                print(f"\n{YELLOW}[Diagnostic] CPU embedding router skipped: Missing '_ctypes' standard module.{RESET}")
                print(" On Fedora/RedHat, you can fix this by running:")
                print(f" {BOLD}sudo dnf install libffi-devel{RESET}")
                print(" and then rebuilding/reinstalling your Python binary or environment (e.g., pyenv).")
            else:
                print(f"\n{RED}[Warn] Could not load CPU embedding router: {str(e)}{RESET}")
            self.model = False

    def route_intent(self, user_query: str, threshold: float = 0.70) -> Optional[str]:
        """Return the persona key that best matches user_query, or None if no match is confident enough."""
        self.lazy_init()

        if not self.model or not self.prototype_embeddings:
            # Fallback to simple keyword matching if sentence-transformers is missing
            user_lower = user_query.lower()
            if any(x in user_lower for x in ["plan", "design", "workspace"]):
                return "geoscaper/team/planner"
            if any(x in user_lower for x in ["build", "compile", "html"]):
                return "geoscaper/team/builder"
            if any(x in user_lower for x in ["audit", "review", "check"]):
                return "geoscaper/team/reviewer"
            return None

        import numpy as np
        try:
            query_input = f"search_query: {user_query}"
            query_emb = self.model.encode([query_input], convert_to_numpy=True)[0]
            q_norm = np.linalg.norm(query_emb)
            if q_norm > 0:
                query_emb = query_emb / q_norm

            best_persona = None
            best_score = -1.0
            for persona, proto_embs in self.prototype_embeddings.items():
                max_score = np.max(np.dot(proto_embs, query_emb))
                if max_score > best_score:
                    best_score = max_score
                    best_persona = persona

            if best_score >= threshold:
                return best_persona
        except Exception:
            pass
        return None


class ActiveAgentState:
    """Conversation history, llama-server HTTP client and MCP subprocess for one session."""

    def __init__(self):
        self.client = httpx.AsyncClient(timeout=120.0)
        self.grammar = None
        self.message_history = deque()  # message_history[0] is always the system message
        self.mcp_server_process = None
        self._stderr_task = None  # strong reference so the drain task is not garbage-collected
        self.jsonrpc_id = 1
        self.current_persona = "orchestrator"  # <-- Track the active persona

        # Self-healing GBNF initialization (forced overwrite to ensure strict mode)
        try:
            os.makedirs(os.path.dirname(GRAMMAR_FILE_PATH) or ".", exist_ok=True)
            with open(GRAMMAR_FILE_PATH, "w", encoding="utf-8") as f:
                f.write(GBNF_GRAMMAR)
            print(f"{GREEN}[Init] Dynamically recovered missing GBNF file at: {GRAMMAR_FILE_PATH}{RESET}")
        except OSError as e:
            print(f"{RED}[Init] Failed to bootstrap GBNF file: {str(e)}{RESET}")

        if os.path.exists(GRAMMAR_FILE_PATH):
            with open(GRAMMAR_FILE_PATH, "r", encoding="utf-8") as f:
                self.grammar = f.read()

        if os.path.exists(SYSTEM_PROMPT_PATH):
            self.system_prompt = _read_prompt_file(SYSTEM_PROMPT_PATH)
        else:
            self.system_prompt = DEFAULT_SYSTEM_PROMPT

        # Inject Master Orchestrator persona knowledge
        self.system_prompt += MASTER_ORCHESTRATOR_INSTRUCTION
        self.message_history.append({"role": "system", "content": self.system_prompt})
        atexit.register(self.shutdown_mcp_server)

    def apply_system_prompt(self, prompt: str, persona: str) -> None:
        """Replace the system message with prompt and mark persona as active."""
        self.system_prompt = prompt
        self.message_history[0] = {"role": "system", "content": prompt}
        self.current_persona = persona

    async def _drain_mcp_stderr(self, process: asyncio.subprocess.Process):
        """Echo and log the MCP subprocess's stderr until EOF so the pipe never fills up."""
        try:
            while True:
                line = await process.stderr.readline()
                if not line:
                    break
                decoded = line.decode('utf-8', errors='replace').strip()
                if decoded:
                    print(f"{DIM}{RED}[MCP STDERR] {decoded}{RESET}")
                    log_telemetry("MCP_STDERR", decoded)
        except Exception as e:
            log_telemetry("MCP_STDERR_CRASH", str(e))

    async def launch_mcp_server(self) -> bool:
        """Locate ``geoscaper.py`` and spawn it as a long-running stdio subprocess.

        Candidate locations under the working directory and this script's
        directory are tried in order. Returns True if the process was spawned.
        """
        script_dir = os.path.dirname(os.path.abspath(__file__))
        workspace_root = os.getcwd()

        # Build comprehensive resolution path checklist
        candidate_paths = [
            os.path.join(workspace_root, "tools", "geoscaper.py"),
            os.path.join(workspace_root, "agents", "tools", "geoscaper.py"),
            os.path.join(workspace_root, "agents", "modules", "geoscaper", "geoscaper.py"),
            os.path.join(workspace_root, "geoscaper.py"),
            os.path.join(workspace_root, "core", "geoscaper.py"),
            os.path.join(script_dir, "geoscaper.py"),
            os.path.join(script_dir, "tools", "geoscaper.py"),
            os.path.join(script_dir, "..", "tools", "geoscaper.py"),
            os.path.join(script_dir, "..", "agents", "tools", "geoscaper.py"),
            os.path.join(script_dir, "..", "agents", "modules", "geoscaper", "geoscaper.py"),
        ]
        # Deduplicate normalised paths while keeping search order.
        deduped_candidates = list(dict.fromkeys(os.path.normpath(p) for p in candidate_paths))
        server_path = next((p for p in deduped_candidates if os.path.exists(p)), None)

        if not server_path:
            print(f"{RED}[ERR] MCP server binary 'geoscaper.py' not found in workspace.{RESET}")
            print(f"{DIM}Searched the following locations:{RESET}")
            for p in deduped_candidates:
                print(f"{DIM} - {p}{RESET}")
            return False

        try:
            # Set up environment variables so subprocess can resolve local modules like lib.*
            env = os.environ.copy()
            additional_paths = os.pathsep.join([workspace_root, os.path.dirname(server_path)])
            if "PYTHONPATH" in env:
                env["PYTHONPATH"] = additional_paths + os.pathsep + env["PYTHONPATH"]
            else:
                env["PYTHONPATH"] = additional_paths

            self.mcp_server_process = await asyncio.create_subprocess_exec(
                sys.executable, server_path,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env,
                limit=MCP_STREAM_LIMIT,
            )
            # Drain stderr in the background to prevent deadlock; keep the task referenced.
            self._stderr_task = asyncio.create_task(self._drain_mcp_stderr(self.mcp_server_process))
            return True
        except Exception as e:
            print(f"{RED}[ERR] Failed to boot persistent MCP tool server: {str(e)}{RESET}")
            return False

    async def call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """Send a JSON-RPC 2.0 ``tools/call`` request to the MCP server and return its text result.

        The server is relaunched if it is not running. Up to 50 stdout lines are
        read, and non-JSON lines or replies to other ids are skipped. Failures are
        returned as a JSON ``{"status": "error", "reason": ...}`` string, not raised.
        """
        process = self.mcp_server_process
        if process is None or process.returncode is not None:
            if not await self.launch_mcp_server():
                return json.dumps({"status": "error", "reason": "MCP tool server is not running and could not be relaunched."})
            process = self.mcp_server_process

        self.jsonrpc_id += 1
        request_id = self.jsonrpc_id
        request = {
            "jsonrpc": "2.0",
            "method": "tools/call",
            "params": {
                "name": tool_name,
                "arguments": arguments
            },
            "id": request_id
        }

        try:
            process.stdin.write((json.dumps(request) + "\n").encode('utf-8'))
            await process.stdin.drain()

            response = None
            for _ in range(50):  # Bounded retry to skip arbitrary stdout debug prints
                stdout_line = await process.stdout.readline()
                if not stdout_line:
                    return json.dumps({"status": "error", "reason": "MCP tool server disconnected abruptly."})
                try:
                    parsed = json.loads(stdout_line.decode('utf-8', errors='replace').strip())
                except json.JSONDecodeError:
                    continue
                if isinstance(parsed, dict) and "jsonrpc" in parsed and parsed.get("id") == request_id:
                    response = parsed
                    break

            if response is None:
                return json.dumps({"status": "error", "reason": "Failed to parse valid JSON-RPC response from MCP server."})

            error = response.get("error")
            if error is not None:
                reason = error.get("message") if isinstance(error, dict) else str(error)
                return json.dumps({"status": "error", "reason": reason})

            result = response.get("result")
            content_items = result.get("content", []) if isinstance(result, dict) else []
            for item in content_items or []:
                if isinstance(item, dict) and item.get("type") == "text":
                    text = item.get("text")
                    # Always return a string: it becomes message content and is tokenized.
                    return text if isinstance(text, str) else json.dumps(text)

            return json.dumps({"status": "error", "reason": "Invalid MCP response envelope."})
        except Exception as e:
            return json.dumps({"status": "error", "reason": f"MCP communication error: {str(e)}"})

    def shutdown_mcp_server(self):
        """Send terminate to the MCP subprocess if it is still running (also the atexit hook)."""
        if self.mcp_server_process and self.mcp_server_process.returncode is None:
            try:
                self.mcp_server_process.terminate()
            except Exception:
                pass

    async def budget(self):
        """Evict the oldest exchanges until the history fits within SAFETY_THRESHOLD tokens.

        Per-message token counts are cached in ``_token_count``. The system
        message is always kept, and eviction stops once three messages remain.
        """
        while True:
            total_tokens = 0
            for msg in self.message_history:
                if "_token_count" not in msg:
                    msg["_token_count"] = await count_tokens(self.client, msg["content"])
                total_tokens += msg["_token_count"]

            if total_tokens <= SAFETY_THRESHOLD or len(self.message_history) <= 3:
                break

            # Drop the oldest non-system message, plus the assistant reply right after it.
            sys_msg = self.message_history.popleft()
            self.message_history.popleft()
            if self.message_history and self.message_history[0]["role"] == "assistant":
                self.message_history.popleft()
            self.message_history.appendleft(sys_msg)

    async def infer(self) -> AsyncGenerator[Dict[str, Any], None]:
        """Stream one chat completion, yielding progress events.

        Yields dicts whose "type" is:
          "token": {"delta": str}, once per streamed content chunk;
          "final": {"data": dict, "raw": str}, once the full reply parses as JSON;
          "error": {"raw": str, optionally "raw_buffer": str}, terminal.
        """
        clean_history = [{"role": m["role"], "content": m["content"]} for m in self.message_history]
        payload = {
            "messages": clean_history,
            "max_tokens": MAX_TOKENS,
            "stream": True,
            "temperature": 0.2,
        }
        if self.grammar:
            payload["grammar"] = self.grammar

        full_buffer = ""
        try:
            async with self.client.stream("POST", f"{BACKEND_URL}/v1/chat/completions", json=payload) as response:
                if response.status_code != 200:
                    body = (await response.aread()).decode("utf-8", errors="replace").strip()
                    detail = f"HTTP {response.status_code}"
                    if body:
                        detail += f": {body[:500]}"
                    yield {"type": "error", "raw": detail}
                    return

                is_streaming = "text/event-stream" in response.headers.get("content-type", "").lower()
                if is_streaming:
                    async for line in response.aiter_lines():
                        line = line.strip()
                        if not line or not line.startswith("data:"):
                            continue
                        data_str = line[5:].strip()
                        if data_str == "[DONE]":
                            continue
                        try:
                            chunk = json.loads(data_str)
                            delta_obj = chunk["choices"][0]["delta"]
                            delta = delta_obj.get("content") or delta_obj.get("reasoning_content") or ""
                            full_buffer += delta
                            yield {"type": "token", "delta": delta}
                        except Exception:
                            continue
                else:
                    await response.aread()
                    try:
                        message = response.json()["choices"][0]["message"]
                        content = message.get("content") or message.get("reasoning_content") or ""
                        full_buffer = content
                        yield {"type": "token", "delta": content}
                    except Exception as e:
                        yield {"type": "error", "raw": str(e)}
                        return
        except Exception as e:
            yield {"type": "error", "raw": str(e)}
            return

        parsed_json = extract_json_from_text(full_buffer)
        if parsed_json is not None:
            yield {"type": "final", "data": parsed_json, "raw": full_buffer}
        else:
            yield {"type": "error", "raw": "JSON Parse Failure", "raw_buffer": full_buffer}

    async def close(self):
        """Close the HTTP client and stop the MCP server, waiting up to 5 s before killing it."""
        await self.client.aclose()
        self.shutdown_mcp_server()
        process = self.mcp_server_process
        if process is not None and process.returncode is None:
            try:
                await asyncio.wait_for(process.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass


async def async_input(prompt_string: str) -> str:
    """Read a line from stdin in the default thread pool so the event loop keeps running."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, input, prompt_string)


def _infer_persona_handoff(thought_text: str, current_persona: str) -> Optional[Dict[str, Any]]:
    """Build an activate_persona call when a lower-cased thought clearly asks for a handoff.

    A handoff is recognised when the thought mentions a switch-like root word
    (activating, switch, handoff, persona, run) together with a persona name
    other than the current one. Returns None when no handoff is recognised.
    """
    is_switch = any(x in thought_text for x in ["activat", "switch", "hand", "persona", "run"])
    if "planner" in thought_text and is_switch and current_persona != "geoscaper/team/planner":
        persona = "geoscaper/team/planner"
    elif "builder" in thought_text and is_switch and current_persona != "geoscaper/team/builder":
        persona = "geoscaper/team/builder"
    elif any(x in thought_text for x in ["reviewer", "audit"]) and is_switch and current_persona != "geoscaper/team/reviewer":
        persona = "geoscaper/team/reviewer"
    else:
        return None
    return {"name": "activate_persona", "arguments": {"persona": persona}}


def _activate_persona_tool(engine: ActiveAgentState, persona_key: Any) -> str:
    """Virtual tool: load a PERSONA_MAP prompt as the system message; return a JSON status."""
    persona_path = PERSONA_MAP.get(persona_key) if isinstance(persona_key, str) else None
    if persona_path and os.path.exists(persona_path):
        try:
            engine.apply_system_prompt(_read_prompt_file(persona_path), persona_key)
            return json.dumps({"status": "success", "message": f"Activated persona: {persona_key}"})
        except (OSError, UnicodeDecodeError) as e:
            return json.dumps({"status": "error", "reason": f"Failed to load persona file: {str(e)}"})
    return json.dumps({"status": "error", "reason": f"Persona '{persona_key}' not found."})


def _deactivate_persona_tool(engine: ActiveAgentState) -> str:
    """Virtual tool: restore the default prompt plus orchestrator instructions; return a JSON status."""
    try:
        if os.path.exists(SYSTEM_PROMPT_PATH):
            new_system = _read_prompt_file(SYSTEM_PROMPT_PATH)
        else:
            new_system = DEFAULT_SYSTEM_PROMPT
        new_system += MASTER_ORCHESTRATOR_INSTRUCTION
        engine.apply_system_prompt(new_system, "orchestrator")
        return json.dumps({"status": "success", "message": "Deactivated persona. Returned to Master Orchestrator."})
    except (OSError, UnicodeDecodeError) as e:
        return json.dumps({"status": "error", "reason": f"Failed to restore master persona: {str(e)}"})


async def _dispatch_tool(engine: ActiveAgentState, tool_name: Any, tool_args: Dict[str, Any]) -> str:
    """Run a virtual persona tool locally, or forward any other tool to the MCP server."""
    if tool_name == "activate_persona":
        return _activate_persona_tool(engine, tool_args.get("persona"))
    if tool_name == "deactivate_persona":
        return _deactivate_persona_tool(engine)
    return await engine.call_mcp_tool(tool_name, tool_args)


async def _route_user_intent(engine: ActiveAgentState, router: CPUIntentRouter, user_input: str) -> None:
    """Offer to switch persona when the router matches user_input to a different persona."""
    routed_persona = router.route_intent(user_input, threshold=0.72)
    if not routed_persona or engine.current_persona == routed_persona:
        return

    persona_name = routed_persona.split("/")[-1].capitalize()
    print(f"\n{CYAN}{BOLD}[Router]{RESET} Input matches intent for {CYAN}{BOLD}{persona_name}{RESET} (Semantic match found).")
    confirm = await async_input(f" Activate the {persona_name} persona to proceed? (Y/n): ")
    answer = confirm.strip().lower()
    if answer and not answer.startswith('y'):
        return

    print(f"{CYAN}[Router] Dynamically switching context to {persona_name}...{RESET}")
    persona_path = PERSONA_MAP.get(routed_persona)
    if not persona_path or not os.path.exists(persona_path):
        print(f"{RED}[ERR] Persona template not found at: {persona_path}{RESET}\n")
        log_telemetry("ROUTER_ERROR", f"Persona template not found at: {persona_path}")
        return

    try:
        new_system = _read_prompt_file(persona_path)
    except (OSError, UnicodeDecodeError) as e:
        print(f"{RED}[ERR] Failed to load persona: {str(e)}{RESET}\n")
        log_telemetry("ROUTER_ERROR", f"Failed to load persona: {str(e)}")
        return

    engine.apply_system_prompt(new_system, routed_persona)
    print(f"{GREEN}[Router] {persona_name} Activated successfully.{RESET}\n")
    log_telemetry("ROUTER", f"Activated persona: {persona_name} from intent match.")


async def _stream_assistant_turn(engine: ActiveAgentState) -> Optional[Dict[str, Any]]:
    """Run one inference, printing the "thought" field live; return the parsed payload or None on error."""
    print(f"\n{GREEN}{BOLD}Assistant >{RESET} ", end="")
    sys.stdout.flush()

    final_payload = None
    stream_filter = JSONStreamFilter()
    async for update in engine.infer():
        kind = update["type"]
        if kind == "token":
            clean_delta = stream_filter.feed(update["delta"])
            if clean_delta:
                sys.stdout.write(clean_delta)
                sys.stdout.flush()
        elif kind == "final":
            final_payload = update["data"]
            raw_buffer = update.get("raw", "")
            log_telemetry(engine.current_persona, f"RAW INFERENCE:\n{raw_buffer}\n\nPARSED JSON:\n{json.dumps(final_payload, indent=2)}")
        elif kind == "error":
            print(f"\n{RED}[ERR] {update['raw']}{RESET}")
            raw_buffer = update.get("raw_buffer", "")
            log_telemetry(f"{engine.current_persona}_ERROR", f"RAW INFERENCE:\n{raw_buffer}\nHTTP ERROR: {update['raw']}")
            break

    print(f"\n{DIVIDER}")
    return final_payload


async def _run_autonomous_turns(engine: ActiveAgentState) -> None:
    """Let the model chain tool calls until it asks the user, fails, or keeps omitting tool calls."""
    failsafe_retries = 0
    while True:
        await engine.budget()
        final_payload = await _stream_assistant_turn(engine)
        if not final_payload:
            break  # Stop execution loop on inference error

        tool_call = final_payload.get("tool_call")

        # Cognitive Fail-Safe / Heuristic Backup: if no structured tool_call was produced
        # but the thought clearly announces a persona handoff, perform that handoff.
        if not isinstance(tool_call, dict) or not tool_call.get("name"):
            thought_text = str(final_payload.get("thought") or "").lower()
            tool_call = _infer_persona_handoff(thought_text, engine.current_persona)
            if tool_call is None:
                if failsafe_retries >= DEFAULT_MAX_TURNS:
                    print(f"{YELLOW}[Fail-Safe] No tool call after {DEFAULT_MAX_TURNS} forced retries. Returning control to the user.{RESET}")
                    log_telemetry(engine.current_persona, f"FAIL-SAFE EXHAUSTED after {DEFAULT_MAX_TURNS} retries. RAW INFERENCE: {json.dumps(final_payload)}")
                    engine.message_history.append({"role": "assistant", "content": json.dumps(final_payload)})
                    break
                failsafe_retries += 1
                print(f"{YELLOW}[Fail-Safe] Agent forgot to execute a tool. Forcing retry...{RESET}")
                log_telemetry(engine.current_persona, f"FAIL-SAFE TRIGGERED: Agent forgot to execute a tool. RAW INFERENCE: {json.dumps(final_payload)}")
                engine.message_history.append({"role": "assistant", "content": json.dumps(final_payload)})
                engine.message_history.append({"role": "user", "content": "[SYSTEM DICTATE: You failed to output a tool_call. You MUST invoke a tool on every turn. If you need to speak to the user or ask a question, use the 'ask_user' tool. Do not output tool_call: null.]"})
                continue
        failsafe_retries = 0

        tool_name = tool_call["name"]
        tool_args = tool_call.get("arguments", {})
        if not isinstance(tool_args, dict):
            tool_args = {}

        print(f"\n{YELLOW}{BOLD}▶ Executing MCP Call:{RESET} {tool_name}")
        log_telemetry(engine.current_persona, f"TOOL CALL: {tool_name}\nARGS: {json.dumps(tool_args, indent=2)}")

        if tool_name == "ask_user":
            message = tool_args.get("message", "")
            print(f"\n{CYAN}{BOLD}Agent Message:{RESET} {message}")
            engine.message_history.append({"role": "assistant", "content": json.dumps(final_payload)})
            break  # Exit autonomous loop and wait for next user command

        tool_result = await _dispatch_tool(engine, tool_name, tool_args)
        print(f"{GREEN}{BOLD}◀ Output:{RESET}\n {tool_result}\n{DIVIDER}")
        log_telemetry(engine.current_persona, f"TOOL RESULT: {tool_name}\nOUTPUT:\n{tool_result}")

        engine.message_history.append({"role": "assistant", "content": json.dumps(final_payload)})
        # Wrap the raw tool output in a SYSTEM DICTATE to force autonomous continuation
        tool_content = (
            f"[SYSTEM: Tool Execution Result]\n"
            f"{tool_result}\n\n"
            f"[SYSTEM DICTATE: Proceed to the next step of your instructions. You MUST output a valid JSON tool_call on this turn. If your instructions require an interview or question, use the 'ask_user' tool immediately. Otherwise, invoke the next autonomous tool.]"
        )
        engine.message_history.append({"role": "user", "content": tool_content})


async def _interactive_session(engine: ActiveAgentState) -> None:
    """Print the banner, then read user commands until exit/quit/q."""
    sys.stdout.write(CLEAR)
    print(f"{CYAN}{BOLD}=== Model Context Protocol TUI Shell ==={RESET}")
    print(f"{DIM}Transport: Persistent STDIO | RPC: JSON-RPC 2.0{RESET}")
    print(DIVIDER)

    router = CPUIntentRouter()
    while True:
        user_input = await async_input(f"{YELLOW}{BOLD}User >{RESET} ")
        command = user_input.strip()
        if command.lower() in ("exit", "quit", "q"):
            break
        if not command:
            continue

        await _route_user_intent(engine, router, user_input)
        engine.message_history.append({"role": "user", "content": user_input})
        await _run_autonomous_turns(engine)


async def app_loop():
    """Entry point: start the MCP server, run the interactive session, and always clean up."""
    engine = ActiveAgentState()
    print(f"{CYAN}[Init] Spawning persistent Model Context Protocol (MCP) server...{RESET}")
    if not await engine.launch_mcp_server():
        print(f"{RED}[ERR] Tool engine aborted due to MCP failure.{RESET}")
        await engine.close()
        return

    try:
        await _interactive_session(engine)
    except EOFError:
        print()  # stdin closed (Ctrl-D / end of pipe): exit cleanly
    except (KeyboardInterrupt, asyncio.CancelledError):
        # On Python 3.11+, Ctrl-C reaches this coroutine as a cancellation of the main task.
        # os._exit also abandons the executor thread still blocked in input().
        print(f"\n{RED}[!] Interrupted.{RESET}")
        engine.shutdown_mcp_server()
        sys.stdout.flush()
        os._exit(0)
    finally:
        await engine.close()


if __name__ == "__main__":
    if not os.path.exists("agents"):
        print(f"{RED}[ERR] Must run from workspace root containing '/agents'.{RESET}")
        sys.exit(1)
    asyncio.run(app_loop())