Yeah baby

This commit is contained in:
Ben Mosley
2026-02-25 12:25:28 -06:00
commit e6acecd5b5
3 changed files with 464 additions and 0 deletions

210
capture_generic.py Normal file
View File

@@ -0,0 +1,210 @@
# capture_generic.py
import json
import sqlite3
from datetime import datetime
from pathlib import Path
from playwright.sync_api import sync_playwright
DB_PATH = "bsg.sqlite3"
SESSION_DIR = "pw_session_vscode"
CAPTURE_DIR = "captures"
def init_db():
con = sqlite3.connect(DB_PATH)
cur = con.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
round TEXT NOT NULL,
page_url TEXT NOT NULL,
page_title TEXT,
captured_at TEXT NOT NULL,
page_html TEXT
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS tables (
id INTEGER PRIMARY KEY AUTOINCREMENT,
snapshot_id INTEGER NOT NULL,
table_index INTEGER NOT NULL,
caption TEXT,
outer_html TEXT NOT NULL,
json_rows TEXT NOT NULL,
FOREIGN KEY(snapshot_id) REFERENCES snapshots(id)
)
""")
con.commit()
return con
def extract_tables(page):
"""
Returns list of dicts:
[{ index, caption, outer_html, rows }]
where rows is a simple 2D list of cell texts.
"""
script = r"""
() => {
const tables = Array.from(document.querySelectorAll("table"));
return tables.map((t, i) => {
const cap = t.querySelector("caption");
const caption = cap ? cap.innerText.trim() : null;
const rows = Array.from(t.querySelectorAll("tr")).map(tr => {
return Array.from(tr.querySelectorAll("th,td")).map(cell => {
return (cell.innerText || "")
.replace(/\s+/g, " ")
.trim();
});
});
return { index: i, caption, outer_html: t.outerHTML, rows };
});
}
"""
return page.evaluate(script)
def save_snapshot(con, round_id, url, title, page_html, tables):
cur = con.cursor()
captured_at = datetime.now().isoformat(timespec="seconds")
cur.execute(
"INSERT INTO snapshots(round, page_url, page_title, captured_at, page_html) VALUES(?,?,?,?,?)",
(round_id, url, title, captured_at, page_html)
)
snapshot_id = cur.lastrowid
for t in tables:
cur.execute(
"INSERT INTO tables(snapshot_id, table_index, caption, outer_html, json_rows) VALUES(?,?,?,?,?)",
(snapshot_id, int(t["index"]), t.get("caption"), t["outer_html"], json.dumps(t["rows"]))
)
con.commit()
return snapshot_id, captured_at
def write_snapshot_files(round_id, snapshot_id, url, title, captured_at, page_html, tables):
snap_dir = Path(CAPTURE_DIR) / round_id / f"snapshot_{snapshot_id:04d}"
tables_dir = snap_dir / "tables"
tables_dir.mkdir(parents=True, exist_ok=True)
meta = {
"snapshot_id": snapshot_id,
"round": round_id,
"url": url,
"title": title,
"captured_at": captured_at,
"table_count": len(tables),
"tables": [{"index": t["index"], "caption": t.get("caption")} for t in tables]
}
(snap_dir / "meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")
(snap_dir / "page.html").write_text(page_html, encoding="utf-8")
for t in tables:
idx = int(t["index"])
(tables_dir / f"table_{idx:02d}.json").write_text(
json.dumps({"index": idx, "caption": t.get("caption"), "rows": t["rows"]}, indent=2),
encoding="utf-8"
)
(tables_dir / f"table_{idx:02d}.html").write_text(t["outer_html"], encoding="utf-8")
def pick_capture_page(context):
"""
Pick the page/tab that most likely contains the report you want.
- Ignore about:blank
- Prefer a non-lobby URL if available
- Otherwise use the most recently opened page
"""
pages = [p for p in context.pages if p.url and p.url != "about:blank"]
if not pages:
return context.pages[0]
non_lobby = [p for p in pages if "/users/lobby" not in p.url]
if non_lobby:
return non_lobby[-1]
return pages[-1]
def safe_page_read(page):
"""
Read url/title/html with a little safety in case the page is navigating.
"""
try:
url = page.url
title = page.title()
html = page.content()
return url, title, html, None
except Exception as e:
return None, None, None, e
def main():
con = init_db()
round_id = input("Round label (e.g., Y11 / Round1): ").strip() or "Round1"
Path(SESSION_DIR).mkdir(parents=True, exist_ok=True)
Path(CAPTURE_DIR).mkdir(parents=True, exist_ok=True)
with sync_playwright() as p:
context = p.chromium.launch_persistent_context(
user_data_dir=SESSION_DIR,
headless=False
)
page = context.new_page()
print("\nBrowser opened.")
print("✅ Log in manually (first time only).")
print("✅ Navigate to a BSG report page (may open a new tab).")
print("✅ Press Enter here to CAPTURE the most relevant tab.\n")
while True:
cmd = input("Enter = capture | 'q' = quit: ").strip().lower()
if cmd == "q":
break
# ✅ Pick the most relevant current tab/page (reports often open new tabs)
page = pick_capture_page(context)
url, title, html, err = safe_page_read(page)
if err:
print(f"Could not read current page (maybe navigating). Try again.\nError: {err}")
continue
tables = extract_tables(page)
if not tables:
print(f"No <table> tags found on this page: {url}")
continue
snapshot_id, captured_at = save_snapshot(con, round_id, url, title, html, tables)
write_snapshot_files(round_id, snapshot_id, url, title, captured_at, html, tables)
print(f"\n✅ Captured snapshot {snapshot_id:04d}")
print(f" Captured from: {url}")
print(f" Title: {title}")
print(f" Tables found: {len(tables)}")
print(f" 📁 Files: {CAPTURE_DIR}/{round_id}/snapshot_{snapshot_id:04d}/")
for t in tables[:10]:
cap = t.get("caption") or "(no caption)"
print(f" - table[{t['index']}]: {cap}")
if len(tables) > 10:
print(f" ...and {len(tables) - 10} more tables")
context.close()
con.close()
print("Done.")
if __name__ == "__main__":
main()

127
compare_snapshots.py Normal file
View File

@@ -0,0 +1,127 @@
# compare_snapshots.py
import json
from pathlib import Path
#########################################################
# Utility
#########################################################
def load_json(path):
return json.loads(Path(path).read_text())
def clean_money(val):
return float(val.replace("$","").replace(",",""))
def clean_percent(val):
return float(val.replace("%",""))
#########################################################
# Extract Scorecard
#########################################################
def extract_scorecard(snapshot_dir):
table0 = Path(snapshot_dir)/"tables"/"table_00.json"
table1 = Path(snapshot_dir)/"tables"/"table_01.json"
t0 = load_json(table0)["rows"]
t1 = load_json(table1)["rows"]
metrics = {}
for row in t0:
name = row[0]
projected = row[2]
if "Earnings Per Share" in name:
metrics["EPS"] = clean_money(projected)
if "Return On Equity" in name:
metrics["ROE"] = clean_percent(projected)
if "Credit Rating" in name:
metrics["Credit"] = projected
if "Image Rating" in name:
metrics["Image"] = int(projected)
for row in t1:
name = row[0]
projected = row[1]
if "Net Profit" in name:
metrics["Profit"] = int(projected.replace(",",""))
if "Ending Cash" in name:
metrics["Cash"] = int(projected.replace(",",""))
return metrics
#########################################################
# Rank Decisions
#########################################################
def score(m):
s = 0
s += m["EPS"] * 100
s += m["ROE"] * 5
s += m["Image"] * 2
s += m["Cash"] / 1000
return s
#########################################################
# Main
#########################################################
base = Path("captures/Y11 / Round1")
snapshots = sorted(base.glob("snapshot_*"))
results = []
for snap in snapshots:
try:
m = extract_scorecard(snap)
s = score(m)
results.append((s,snap.name,m))
except Exception as e:
print("Skipping",snap.name,e)
results.sort(reverse=True)
print("\n=== Snapshot Rankings ===\n")
for s,name,m in results:
print(name)
print(" Score:",round(s,2))
print(" EPS:",m["EPS"])
print(" ROE:",m["ROE"])
print(" Image:",m["Image"])
print(" Cash:",m["Cash"])
print(" Credit:",m["Credit"])
print()

127
inspect_snapshot.py Normal file
View File

@@ -0,0 +1,127 @@
# inspect_snapshot.py
import argparse
import csv
import json
from pathlib import Path
from typing import List, Any
def load_json(p: Path) -> dict:
return json.loads(p.read_text(encoding="utf-8"))
def find_table_files(snapshot_dir: Path) -> List[Path]:
tables_dir = snapshot_dir / "tables"
if not tables_dir.exists():
raise SystemExit(f"Missing tables/ folder in: {snapshot_dir}")
return sorted(tables_dir.glob("table_*.json"))
def list_tables(snapshot_dir: Path) -> None:
meta_path = snapshot_dir / "meta.json"
if not meta_path.exists():
raise SystemExit(f"Missing meta.json in: {snapshot_dir}")
meta = load_json(meta_path)
print(f"\nSnapshot: {snapshot_dir}")
print(f"Title: {meta.get('title')}")
print(f"URL: {meta.get('url')}")
print(f"Captured: {meta.get('captured_at')}")
print(f"Tables: {meta.get('table_count')}\n")
tables_dir = snapshot_dir / "tables"
for t in meta.get("tables", []):
idx = int(t["index"])
cap = t.get("caption") or "(no caption)"
tjson = tables_dir / f"table_{idx:02d}.json"
if not tjson.exists():
print(f"[{idx:02d}] {cap} (missing file)")
continue
data = load_json(tjson)
rows = data.get("rows", [])
r = len(rows)
c = max((len(row) for row in rows), default=0)
# quick preview of row 0 (often header)
preview = ""
if rows:
preview = " | ".join(rows[0][:6])
if len(preview) > 100:
preview = preview[:100] + ""
print(f"[{idx:02d}] {cap} ({r}x{c})")
if preview:
print(f" {preview}")
def print_table(snapshot_dir: Path, idx: int, limit_rows: int = 60, limit_cols: int = 14) -> List[List[str]]:
tjson = snapshot_dir / "tables" / f"table_{idx:02d}.json"
if not tjson.exists():
raise SystemExit(f"Table not found: {tjson}")
data = load_json(tjson)
caption = data.get("caption") or "(no caption)"
rows: List[List[str]] = data.get("rows", [])
print(f"\nTable [{idx:02d}] — {caption}\n")
if not rows:
print("(empty)")
return rows
cols = min(max((len(r) for r in rows), default=0), limit_cols)
# compute column widths
widths = [0] * cols
for r in rows[:limit_rows]:
for j in range(cols):
cell = r[j] if j < len(r) else ""
widths[j] = min(34, max(widths[j], len(cell)))
def fmt_row(r: List[str]) -> str:
out = []
for j in range(cols):
cell = r[j] if j < len(r) else ""
cell = cell.replace("\n", " ")
if len(cell) > 33:
cell = cell[:32] + ""
out.append(cell.ljust(widths[j]))
return " | ".join(out)
for i, r in enumerate(rows[:limit_rows]):
print(fmt_row(r))
if i == 0:
print("-" * min(160, sum(widths) + 3 * (cols - 1)))
if len(rows) > limit_rows:
print(f"\n…({len(rows) - limit_rows} more rows)")
return rows
def export_csv(rows: List[List[Any]], out_path: Path) -> None:
out_path.parent.mkdir(parents=True, exist_ok=True)
with out_path.open("w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
for r in rows:
w.writerow(r)
def main():
ap = argparse.ArgumentParser()
ap.add_argument("snapshot_dir", help="Path to a snapshot folder (has meta.json + tables/)")
ap.add_argument("--table", type=int, help="Print a specific table index (e.g. 5)")
ap.add_argument("--csv", help="Export printed table to CSV at this path")
ap.add_argument("--rows", type=int, default=60, help="Row print limit (default 60)")
ap.add_argument("--cols", type=int, default=14, help="Col print limit (default 14)")
args = ap.parse_args()
snapshot_dir = Path(args.snapshot_dir).expanduser().resolve()
if args.table is None:
list_tables(snapshot_dir)
return
rows = print_table(snapshot_dir, args.table, limit_rows=args.rows, limit_cols=args.cols)
if args.csv:
export_csv(rows, Path(args.csv).expanduser().resolve())
print(f"\n✅ CSV written: {args.csv}")
if __name__ == "__main__":
main()