From e6acecd5b5c67f0e22ef7b2da322e280ab1c82a8 Mon Sep 17 00:00:00 2001
From: Ben Mosley
Date: Wed, 25 Feb 2026 12:25:28 -0600
Subject: [PATCH] Yeah baby

---
 capture_generic.py   | 210 +++++++++++++++++++++++++++++++++++++++++++
 compare_snapshots.py | 127 ++++++++++++++++++++++++++
 inspect_snapshot.py  | 127 ++++++++++++++++++++++++++
 3 files changed, 464 insertions(+)
 create mode 100644 capture_generic.py
 create mode 100644 compare_snapshots.py
 create mode 100644 inspect_snapshot.py

diff --git a/capture_generic.py b/capture_generic.py
new file mode 100644
index 0000000..50f8485
--- /dev/null
+++ b/capture_generic.py
@@ -0,0 +1,210 @@
+# capture_generic.py
+import json
+import sqlite3
+from datetime import datetime
+from pathlib import Path
+
+from playwright.sync_api import sync_playwright
+
+DB_PATH = "bsg.sqlite3"
+SESSION_DIR = "pw_session_vscode"
+CAPTURE_DIR = "captures"
+
+
+def init_db():
+    con = sqlite3.connect(DB_PATH)
+    cur = con.cursor()
+
+    cur.execute("""
+        CREATE TABLE IF NOT EXISTS snapshots (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            round TEXT NOT NULL,
+            page_url TEXT NOT NULL,
+            page_title TEXT,
+            captured_at TEXT NOT NULL,
+            page_html TEXT
+        )
+    """)
+
+    cur.execute("""
+        CREATE TABLE IF NOT EXISTS tables (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            snapshot_id INTEGER NOT NULL,
+            table_index INTEGER NOT NULL,
+            caption TEXT,
+            outer_html TEXT NOT NULL,
+            json_rows TEXT NOT NULL,
+            FOREIGN KEY(snapshot_id) REFERENCES snapshots(id)
+        )
+    """)
+
+    con.commit()
+    return con
+
+
+def extract_tables(page):
+    """
+    Returns list of dicts:
+      [{ index, caption, outer_html, rows }]
+    where rows is a simple 2D list of cell texts.
+    """
+    script = r"""
+    () => {
+        const tables = Array.from(document.querySelectorAll("table"));
+        return tables.map((t, i) => {
+            const cap = t.querySelector("caption");
+            const caption = cap ? cap.innerText.trim() : null;
+
+            const rows = Array.from(t.querySelectorAll("tr")).map(tr => {
+                return Array.from(tr.querySelectorAll("th,td")).map(cell => {
+                    return (cell.innerText || "")
+                        .replace(/\s+/g, " ")
+                        .trim();
+                });
+            });
+
+            return { index: i, caption, outer_html: t.outerHTML, rows };
+        });
+    }
+    """
+    return page.evaluate(script)
+
+
+def save_snapshot(con, round_id, url, title, page_html, tables):
+    cur = con.cursor()
+    captured_at = datetime.now().isoformat(timespec="seconds")
+
+    cur.execute(
+        "INSERT INTO snapshots(round, page_url, page_title, captured_at, page_html) VALUES(?,?,?,?,?)",
+        (round_id, url, title, captured_at, page_html)
+    )
+    snapshot_id = cur.lastrowid
+
+    for t in tables:
+        cur.execute(
+            "INSERT INTO tables(snapshot_id, table_index, caption, outer_html, json_rows) VALUES(?,?,?,?,?)",
+            (snapshot_id, int(t["index"]), t.get("caption"), t["outer_html"], json.dumps(t["rows"]))
+        )
+
+    con.commit()
+    return snapshot_id, captured_at
+
+
+def write_snapshot_files(round_id, snapshot_id, url, title, captured_at, page_html, tables):
+    snap_dir = Path(CAPTURE_DIR) / round_id / f"snapshot_{snapshot_id:04d}"
+    tables_dir = snap_dir / "tables"
+    tables_dir.mkdir(parents=True, exist_ok=True)
+
+    meta = {
+        "snapshot_id": snapshot_id,
+        "round": round_id,
+        "url": url,
+        "title": title,
+        "captured_at": captured_at,
+        "table_count": len(tables),
+        "tables": [{"index": t["index"], "caption": t.get("caption")} for t in tables]
+    }
+
+    (snap_dir / "meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")
+    (snap_dir / "page.html").write_text(page_html, encoding="utf-8")
+
+    for t in tables:
+        idx = int(t["index"])
+        (tables_dir / f"table_{idx:02d}.json").write_text(
+            json.dumps({"index": idx, "caption": t.get("caption"), "rows": t["rows"]}, indent=2),
+            encoding="utf-8"
+        )
+        (tables_dir / f"table_{idx:02d}.html").write_text(t["outer_html"], encoding="utf-8")
+
+
+def pick_capture_page(context):
+    """
+    Pick the page/tab that most likely contains the report you want.
+    - Ignore about:blank
+    - Prefer a non-lobby URL if available
+    - Otherwise use the most recently opened page
+    """
+    pages = [p for p in context.pages if p.url and p.url != "about:blank"]
+    if not pages:
+        return context.pages[0]
+
+    non_lobby = [p for p in pages if "/users/lobby" not in p.url]
+    if non_lobby:
+        return non_lobby[-1]
+
+    return pages[-1]
+
+
+def safe_page_read(page):
+    """
+    Read url/title/html with a little safety in case the page is navigating.
+    """
+    try:
+        url = page.url
+        title = page.title()
+        html = page.content()
+        return url, title, html, None
+    except Exception as e:
+        return None, None, None, e
+
+
+def main():
+    con = init_db()
+    round_id = input("Round label (e.g., Y11 / Round1): ").strip() or "Round1"
+
+    Path(SESSION_DIR).mkdir(parents=True, exist_ok=True)
+    Path(CAPTURE_DIR).mkdir(parents=True, exist_ok=True)
+
+    with sync_playwright() as p:
+        context = p.chromium.launch_persistent_context(
+            user_data_dir=SESSION_DIR,
+            headless=False
+        )
+        page = context.new_page()
+
+        print("\nBrowser opened.")
+        print("✅ Log in manually (first time only).")
+        print("✅ Navigate to a BSG report page (may open a new tab).")
+        print("✅ Press Enter here to CAPTURE the most relevant tab.\n")
+
+        while True:
+            cmd = input("Enter = capture | 'q' = quit: ").strip().lower()
+            if cmd == "q":
+                break
+
+            # ✅ Pick the most relevant current tab/page (reports often open new tabs)
+            page = pick_capture_page(context)
+
+            url, title, html, err = safe_page_read(page)
+            if err:
+                print(f"Could not read current page (maybe navigating). Try again.\nError: {err}")
+                continue
+
+            tables = extract_tables(page)
+            if not tables:
+                print(f"No tables found on this page: {url}")
+                continue
+
+            snapshot_id, captured_at = save_snapshot(con, round_id, url, title, html, tables)
+            write_snapshot_files(round_id, snapshot_id, url, title, captured_at, html, tables)
+
+            print(f"\n✅ Captured snapshot {snapshot_id:04d}")
+            print(f" Captured from: {url}")
+            print(f" Title: {title}")
+            print(f" Tables found: {len(tables)}")
+            print(f" 📁 Files: {CAPTURE_DIR}/{round_id}/snapshot_{snapshot_id:04d}/")
+
+            for t in tables[:10]:
+                cap = t.get("caption") or "(no caption)"
+                print(f" - table[{t['index']}]: {cap}")
+            if len(tables) > 10:
+                print(f" ...and {len(tables) - 10} more tables")
+
+        context.close()
+
+    con.close()
+    print("Done.")
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/compare_snapshots.py b/compare_snapshots.py
new file mode 100644
index 0000000..86dafb1
--- /dev/null
+++ b/compare_snapshots.py
@@ -0,0 +1,127 @@
+# compare_snapshots.py
+
+import json
+from pathlib import Path
+
+#########################################################
+# Utility
+#########################################################
+
+def load_json(path):
+    return json.loads(Path(path).read_text())
+
+def clean_money(val):
+    return float(val.replace("$","").replace(",",""))
+
+def clean_percent(val):
+    return float(val.replace("%",""))
+
+#########################################################
+# Extract Scorecard
+#########################################################
+
+def extract_scorecard(snapshot_dir):
+
+    table0 = Path(snapshot_dir)/"tables"/"table_00.json"
+    table1 = Path(snapshot_dir)/"tables"/"table_01.json"
+
+    t0 = load_json(table0)["rows"]
+    t1 = load_json(table1)["rows"]
+
+    metrics = {}
+
+    for row in t0:
+
+        name = row[0]
+
+        projected = row[2]
+
+        if "Earnings Per Share" in name:
+            metrics["EPS"] = clean_money(projected)
+
+        if "Return On Equity" in name:
+            metrics["ROE"] = clean_percent(projected)
+
+        if "Credit Rating" in name:
+            metrics["Credit"] = projected
+
+        if "Image Rating" in name:
+            metrics["Image"] = int(projected)
+
+
+    for row in t1:
+
+        name = row[0]
+
+        projected = row[1]
+
+        if "Net Profit" in name:
+            metrics["Profit"] = int(projected.replace(",",""))
+
+        if "Ending Cash" in name:
+            metrics["Cash"] = int(projected.replace(",",""))
+
+    return metrics
+
+
+#########################################################
+# Rank Decisions
+#########################################################
+
+def score(m):
+
+    s = 0
+
+    s += m["EPS"] * 100
+    s += m["ROE"] * 5
+    s += m["Image"] * 2
+    s += m["Cash"] / 1000
+
+    return s
+
+
+#########################################################
+# Main
+#########################################################
+
+base = Path("captures/Y11 / Round1")
+
+snapshots = sorted(base.glob("snapshot_*"))
+
+results = []
+
+for snap in snapshots:
+
+    try:
+
+        m = extract_scorecard(snap)
+
+        s = score(m)
+
+        results.append((s,snap.name,m))
+
+    except Exception as e:
+
+        print("Skipping",snap.name,e)
+
+
+
+results.sort(reverse=True)
+
+
+print("\n=== Snapshot Rankings ===\n")
+
+
+for s,name,m in results:
+
+    print(name)
+
+    print(" Score:",round(s,2))
+
+    print(" EPS:",m["EPS"])
+    print(" ROE:",m["ROE"])
+    print(" Image:",m["Image"])
+    print(" Cash:",m["Cash"])
+    print(" Credit:",m["Credit"])
+
+    print()
\ No newline at end of file
diff --git a/inspect_snapshot.py b/inspect_snapshot.py
new file mode 100644
index 0000000..b2b8dfe
--- /dev/null
+++ b/inspect_snapshot.py
@@ -0,0 +1,127 @@
+# inspect_snapshot.py
+import argparse
+import csv
+import json
+from pathlib import Path
+from typing import List, Any
+
+def load_json(p: Path) -> dict:
+    return json.loads(p.read_text(encoding="utf-8"))
+
+def find_table_files(snapshot_dir: Path) -> List[Path]:
+    tables_dir = snapshot_dir / "tables"
+    if not tables_dir.exists():
+        raise SystemExit(f"Missing tables/ folder in: {snapshot_dir}")
+    return sorted(tables_dir.glob("table_*.json"))
+
+def list_tables(snapshot_dir: Path) -> None:
+    meta_path = snapshot_dir / "meta.json"
+    if not meta_path.exists():
+        raise SystemExit(f"Missing meta.json in: {snapshot_dir}")
+
+    meta = load_json(meta_path)
+    print(f"\nSnapshot: {snapshot_dir}")
+    print(f"Title: {meta.get('title')}")
+    print(f"URL: {meta.get('url')}")
+    print(f"Captured: {meta.get('captured_at')}")
+    print(f"Tables: {meta.get('table_count')}\n")
+
+    tables_dir = snapshot_dir / "tables"
+    for t in meta.get("tables", []):
+        idx = int(t["index"])
+        cap = t.get("caption") or "(no caption)"
+        tjson = tables_dir / f"table_{idx:02d}.json"
+
+        if not tjson.exists():
+            print(f"[{idx:02d}] {cap} (missing file)")
+            continue
+
+        data = load_json(tjson)
+        rows = data.get("rows", [])
+        r = len(rows)
+        c = max((len(row) for row in rows), default=0)
+
+        # quick preview of row 0 (often header)
+        preview = ""
+        if rows:
+            preview = " | ".join(rows[0][:6])
+            if len(preview) > 100:
+                preview = preview[:100] + "…"
+
+        print(f"[{idx:02d}] {cap} ({r}x{c})")
+        if preview:
+            print(f" {preview}")
+
+def print_table(snapshot_dir: Path, idx: int, limit_rows: int = 60, limit_cols: int = 14) -> List[List[str]]:
+    tjson = snapshot_dir / "tables" / f"table_{idx:02d}.json"
+    if not tjson.exists():
+        raise SystemExit(f"Table not found: {tjson}")
+
+    data = load_json(tjson)
+    caption = data.get("caption") or "(no caption)"
+    rows: List[List[str]] = data.get("rows", [])
+
+    print(f"\nTable [{idx:02d}] — {caption}\n")
+
+    if not rows:
+        print("(empty)")
+        return rows
+
+    cols = min(max((len(r) for r in rows), default=0), limit_cols)
+
+    # compute column widths
+    widths = [0] * cols
+    for r in rows[:limit_rows]:
+        for j in range(cols):
+            cell = r[j] if j < len(r) else ""
+            widths[j] = min(34, max(widths[j], len(cell)))
+
+    def fmt_row(r: List[str]) -> str:
+        out = []
+        for j in range(cols):
+            cell = r[j] if j < len(r) else ""
+            cell = cell.replace("\n", " ")
+            if len(cell) > 33:
+                cell = cell[:32] + "…"
+            out.append(cell.ljust(widths[j]))
+        return " | ".join(out)
+
+    for i, r in enumerate(rows[:limit_rows]):
+        print(fmt_row(r))
+        if i == 0:
+            print("-" * min(160, sum(widths) + 3 * (cols - 1)))
+
+    if len(rows) > limit_rows:
+        print(f"\n…({len(rows) - limit_rows} more rows)")
+
+    return rows
+
+def export_csv(rows: List[List[Any]], out_path: Path) -> None:
+    out_path.parent.mkdir(parents=True, exist_ok=True)
+    with out_path.open("w", newline="", encoding="utf-8") as f:
+        w = csv.writer(f)
+        for r in rows:
+            w.writerow(r)
+
+def main():
+    ap = argparse.ArgumentParser()
+    ap.add_argument("snapshot_dir", help="Path to a snapshot folder (has meta.json + tables/)")
+    ap.add_argument("--table", type=int, help="Print a specific table index (e.g. 5)")
+    ap.add_argument("--csv", help="Export printed table to CSV at this path")
+    ap.add_argument("--rows", type=int, default=60, help="Row print limit (default 60)")
+    ap.add_argument("--cols", type=int, default=14, help="Col print limit (default 14)")
+    args = ap.parse_args()
+
+    snapshot_dir = Path(args.snapshot_dir).expanduser().resolve()
+
+    if args.table is None:
+        list_tables(snapshot_dir)
+        return
+
+    rows = print_table(snapshot_dir, args.table, limit_rows=args.rows, limit_cols=args.cols)
+    if args.csv:
+        export_csv(rows, Path(args.csv).expanduser().resolve())
+        print(f"\n✅ CSV written: {args.csv}")
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file