This commit is contained in:
Me
2026-05-26 00:31:45 -07:00
parent 87bfcbd5bd
commit f59e8f3891
14 changed files with 1722 additions and 0 deletions

View File

View File

@@ -0,0 +1,79 @@
import os
import re
import hashlib
from html.parser import HTMLParser
from .security import get_safe_path, PROJECTS_DIR
from .state import load_ledger, update_failure_count
class TagBalanceParser(HTMLParser):
    """Check open/close balance for a fixed set of structural HTML tags.

    After ``feed()``:
      * ``errors`` lists messages for orphaned or mismatched closing tags.
      * ``stack`` holds ``(tag, (line, column))`` for each tracked tag that
        is still open, in opening order.

    Tags outside ``tracked_tags`` are ignored.
    """

    def __init__(self):
        super().__init__()
        self.stack = []
        self.errors = []
        # Exclude 'p' and 'font' to avoid false positives on standard HTML5 tag-omissions
        self.tracked_tags = {'table', 'tr', 'td', 'div', 'nav', 'header', 'footer', 'main', 'section'}

    def handle_starttag(self, tag, attrs):
        """Push a tracked opening tag together with its source position."""
        if tag in self.tracked_tags:
            self.stack.append((tag, self.getpos()))

    def handle_endtag(self, tag):
        """Match a tracked closing tag against the nearest open tag of the same name."""
        if tag not in self.tracked_tags:
            return
        close_line = self.getpos()[0]

        # Search from the innermost open tag outwards for a matching opener.
        match_index = next(
            (i for i in range(len(self.stack) - 1, -1, -1) if self.stack[i][0] == tag),
            None,
        )
        if match_index is None:
            # No opener anywhere: report it, but keep the stack intact so one
            # stray closing tag does not cascade into errors for valid tags.
            self.errors.append(f"Orphaned </{tag}> near line {close_line}")
            return

        # Every tag opened after the matching opener was implicitly cut off by
        # this closing tag; report them innermost-first.
        for open_tag, pos in reversed(self.stack[match_index + 1:]):
            self.errors.append(
                f"Mismatched tag: <{open_tag}> (L{pos[0]}) closed by </{tag}> (L{close_line})"
            )
        del self.stack[match_index:]
def run_audit(project_name, page_id):
    """Audit the compiled page for ``page_id`` and track consecutive failures.

    Steps, in order:
      1. Hash freshness: the SHA-256 of the file in ``dist`` must equal the
         hash recorded for ``page_id`` in the ledger.
      2. Tag balance: tracked structural tags must be balanced.
      3. Link matrix: every local ``href`` must name a file in the task queue.
      4. Circuit breaker: failures are counted via ``update_failure_count``.

    Returns a dict whose ``status`` is one of:
      * ``"success"`` - audit passed; the failure counter is reset.
      * ``"error"`` - either a precondition failed (``reason`` is set), or the
        structural audit failed (``strikes`` and ``errors`` are set).
      * ``"circuit_breaker"`` - the structural audit has failed three or more
        times (``reason`` and ``errors`` are set).
    """
    ledger = load_ledger(project_name)
    tasks = ledger.get("task_queue", [])
    task = next((t for t in tasks if t["page_id"] == page_id), None)
    if not task:
        return {"status": "error", "reason": "Task not found."}

    dest_file = get_safe_path(PROJECTS_DIR, project_name, "dist", task["filename"])

    # 1. Hash Freshness Check
    # Open directly instead of exists()-then-open so a file removed in
    # between cannot surface as an unhandled exception.
    try:
        with open(dest_file, "r", encoding="utf-8") as f:
            disk_content = f.read()
    except FileNotFoundError:
        return {"status": "error", "reason": "Compiled file not found on disk."}

    disk_hash = hashlib.sha256(disk_content.encode('utf-8')).hexdigest()
    ledger_hash = ledger.get("hashes", {}).get(page_id, "")
    if disk_hash != ledger_hash:
        return {"status": "error", "reason": "Security Fault: Disk hash does not match Ledger hash. Stale or tampered file."}

    errors = []

    # 2. Tag Balance Audit
    parser = TagBalanceParser()
    try:
        parser.feed(disk_content)
        errors.extend(parser.errors)
        for unclosed, pos in parser.stack:
            errors.append(f"Unclosed <{unclosed}> near line {pos[0]}")
    except Exception as e:
        # Best-effort: a parser crash is reported as an audit error rather
        # than aborting the whole audit.
        errors.append(f"Parser failure: {str(e)}")

    # 3. Link Matrix Audit
    allowed_files = {t["filename"] for t in tasks}
    # Schemes are matched with their colon so a local file such as
    # "httpd.html" is not mistaken for an external URL.
    external_prefixes = ("http:", "https:", "mailto:")
    links = re.findall(r'href=["\']([^"\']+)["\']', disk_content, re.IGNORECASE)
    for link in links:
        if link.startswith("#") or link.lower().startswith(external_prefixes):
            continue
        # Compare only the file part: "about.html#team" and "about.html?x=1"
        # both point at "about.html".
        target = link.split("#", 1)[0].split("?", 1)[0]
        if not target:
            # Query-only link: refers to the current page.
            continue
        if target not in allowed_files:
            errors.append(f"Dead Link Found: '{link}' is not in the project task queue.")

    # 4. Three-Strike Circuit Breaker
    if errors:
        strikes = update_failure_count(project_name, page_id, increment=True)
        if strikes >= 3:
            return {
                "status": "circuit_breaker",
                "reason": f"Page '{page_id}' failed structural audit 3 consecutive times. Escalating to human.",
                "errors": errors,
            }
        return {"status": "error", "strikes": strikes, "errors": errors}

    # Reset strikes on success
    update_failure_count(project_name, page_id, increment=False)
    return {"status": "success", "message": "Audit passed. File is structurally sound."}

View File

@@ -0,0 +1,61 @@
import os
import hashlib
from .security import get_safe_path, PROJECTS_DIR
from .state import load_staging, load_ledger, save_ledger
import sys
def compile_page(project_name, page_id):
    """Wrap a page's source fragment in a full HTML document and record its hash.

    Looks up ``page_id`` in the ledger's ``task_queue``, reads the matching
    file from the project's ``src`` directory, wraps it in a document styled
    from the staging ``style_tokens``, writes the result under ``dist`` and
    stores the SHA-256 of the document in ``ledger["hashes"][page_id]``.

    Returns ``{"status": "success", "message": ..., "hash": ...}`` on success,
    or ``{"status": "error", "reason": ...}`` when the task or its source
    file is missing.
    """
    from html import escape  # local import keeps the file's import block untouched

    staging = load_staging(project_name)
    ledger = load_ledger(project_name)
    task = next((t for t in ledger.get("task_queue", []) if t["page_id"] == page_id), None)
    if not task:
        return {"status": "error", "reason": f"Task '{page_id}' not found in task_queue."}

    # 1. Source File Reader
    src_file = get_safe_path(PROJECTS_DIR, project_name, "src", task["filename"])
    try:
        with open(src_file, "r", encoding="utf-8") as bf:
            content_html = bf.read()
    except FileNotFoundError:
        err_msg = f"Source file '{task['filename']}' not found in src directory."
        print(f"[Compiler] Error: {err_msg}", file=sys.stderr)
        return {"status": "error", "reason": err_msg}

    # 2. Assemble Document
    styles = staging.get("style_tokens", {})
    # The title is plain text: escape it so '&' or '<' cannot produce
    # malformed markup or close the <title> element early.
    safe_title = escape(str(task.get('title', 'Project Component')), quote=False)
    # NOTE(review): style token values are interpolated into the <style>
    # block verbatim - confirm they always come from a trusted planner.
    full_document = f"""<!DOCTYPE html>
<html>
<head>
<title>{safe_title}</title>
<style>
body {{
background-color: {styles.get('background_color', '#FFFFFF')};
color: {styles.get('text_color', '#000000')};
font-family: {styles.get('font_family', 'sans-serif')};
}}
</style>
</head>
<body>
{content_html}
</body>
</html>"""

    # 3. Write & Hash
    dest_file = get_safe_path(PROJECTS_DIR, project_name, "dist", task["filename"])
    # Creating the file's parent directory also covers nested filenames.
    os.makedirs(os.path.dirname(dest_file), exist_ok=True)
    with open(dest_file, "w", encoding="utf-8") as f:
        f.write(full_document)
    file_hash = hashlib.sha256(full_document.encode('utf-8')).hexdigest()

    # 4. Update Ledger
    hashes = ledger.get("hashes", {})
    hashes[page_id] = file_hash
    ledger["hashes"] = hashes
    save_ledger(project_name, ledger)
    return {"status": "success", "message": f"Compiled and hashed '{page_id}'", "hash": file_hash}

View File

@@ -0,0 +1,18 @@
import os
# All locations are derived from this file's own position on disk.
SCRIPT_DIR = os.path.abspath(os.path.dirname(__file__))  # agents/modules/geoscaper/lib
GEOSCAPER_DIR = os.path.dirname(SCRIPT_DIR)  # agents/modules/geoscaper
# State and project data both live directly under the geoscaper directory.
STATE_DIR, PROJECTS_DIR = (
    os.path.join(GEOSCAPER_DIR, leaf) for leaf in ("state", "projects")
)
def get_safe_path(base_dir, *path_parts):
    """Resolve ``path_parts`` under ``base_dir`` and reject anything outside it.

    Both the base and the joined target are passed through
    ``os.path.realpath``, so ``..`` segments and symlinks are resolved before
    the containment check.

    Returns the resolved absolute target path (which may be the base itself).

    Raises:
        PermissionError: if the resolved target is not the base directory or
            a path beneath it.
    """
    real_base = os.path.realpath(base_dir)
    real_target = os.path.realpath(os.path.join(real_base, *path_parts))
    try:
        # Component-wise comparison: unlike a string-prefix test this also
        # works when the base is the filesystem root, and it cannot be fooled
        # by sibling names that merely share a prefix.
        contained = os.path.commonpath([real_base, real_target]) == real_base
    except ValueError:
        # commonpath raises when the paths cannot share an ancestor
        # (e.g. different drives on Windows).
        contained = False
    if not contained:
        raise PermissionError(f"Security Fault: Path '{real_target}' escaped '{real_base}'")
    return real_target

View File

@@ -0,0 +1,47 @@
import json
import os
from .security import get_safe_path, STATE_DIR
def _load_json(filename):
    """Load a JSON object from ``filename`` inside ``STATE_DIR``.

    Returns the parsed dict. Returns ``{}`` when the file does not exist, and
    also - after printing a warning to stderr - when the path is rejected,
    the file cannot be read, the content is not valid JSON, or the top-level
    value is not an object. Loading stays best-effort, but failures are no
    longer silent.
    """
    import sys  # local import: this module does not import sys at file level

    try:
        path = get_safe_path(STATE_DIR, filename)
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # Expected on first use: no state has been written yet.
        return {}
    except (OSError, ValueError) as exc:
        # OSError covers permission/sandbox faults; ValueError covers JSON
        # and text-decoding errors.
        print(f"[State] Warning: could not load '{filename}': {exc}", file=sys.stderr)
        return {}

    if not isinstance(data, dict):
        # Callers use .get() on the result, so anything but an object is unusable.
        print(
            f"[State] Warning: '{filename}' does not contain a JSON object; ignoring it.",
            file=sys.stderr,
        )
        return {}
    return data
def _save_json(filename, data):
    """Write ``data`` as indented JSON to ``filename`` inside ``STATE_DIR``.

    The payload is serialized before any file is opened and is written to a
    sibling ``.tmp`` file that then replaces the target, so a serialization
    error or an interrupted write cannot truncate the existing state file.

    Raises:
        PermissionError: if ``filename`` resolves outside ``STATE_DIR``.
        TypeError: if ``data`` is not JSON-serializable.
        OSError: if the file cannot be written.
    """
    # Validate and serialize first: both can fail, and neither should leave
    # anything behind on disk.
    path = get_safe_path(STATE_DIR, filename)
    payload = json.dumps(data, indent=2)

    os.makedirs(os.path.realpath(STATE_DIR), exist_ok=True)
    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a partial temp file behind; the original error is
        # what the caller needs to see.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
# --- Staging Cache (Planner) ---
def load_staging(project_name):
    """Return the staging data stored in ``<project_name>_staging.json``."""
    staging_file = "{}_staging.json".format(project_name)
    return _load_json(staging_file)
def save_staging(project_name, data):
    """Store ``data`` in ``<project_name>_staging.json``."""
    staging_file = "{}_staging.json".format(project_name)
    _save_json(staging_file, data)
# --- Production Ledger (Builder/Reviewer) ---
def load_ledger(project_name):
    """Return the ledger data stored in ``<project_name>_ledger.json``."""
    ledger_file = "{}_ledger.json".format(project_name)
    return _load_json(ledger_file)
def save_ledger(project_name, data):
    """Store ``data`` in ``<project_name>_ledger.json``."""
    ledger_file = "{}_ledger.json".format(project_name)
    _save_json(ledger_file, data)
def update_failure_count(project_name, page_id, increment=True):
    """Bump or reset the failure counter for ``page_id`` and persist it.

    With ``increment`` true the stored count (0 when absent) goes up by one;
    otherwise it is set to 0. The ledger is reloaded, updated under
    ``"failure_counts"`` and saved on every call.

    Returns the new count for ``page_id``.
    """
    ledger = load_ledger(project_name)
    counts = ledger.get("failure_counts", {})
    new_count = counts.get(page_id, 0) + 1 if increment else 0
    counts[page_id] = new_count
    ledger["failure_counts"] = counts
    save_ledger(project_name, ledger)
    return new_count