# capture_generic.py
|
|
import json
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
from playwright.sync_api import sync_playwright
|
|
|
|
# SQLite database file that stores all captured snapshots and their tables.
DB_PATH = "bsg.sqlite3"

# Playwright persistent-profile directory; keeps cookies/session between runs
# so the manual login is only needed the first time.
SESSION_DIR = "pw_session_vscode"

# Root directory under which per-round snapshot files are mirrored to disk.
CAPTURE_DIR = "captures"
|
|
|
|
|
|
def init_db(db_path=None):
    """Open (or create) the capture database and ensure its schema exists.

    Args:
        db_path: Optional SQLite path (e.g. ":memory:" for tests).
                 Defaults to the module-level DB_PATH.

    Returns:
        An open sqlite3.Connection with the `snapshots` and `tables`
        schema created (idempotent via CREATE TABLE IF NOT EXISTS).
    """
    con = sqlite3.connect(db_path if db_path is not None else DB_PATH)

    # SQLite does NOT enforce FOREIGN KEY constraints unless this pragma is
    # enabled per-connection; without it tables.snapshot_id could silently
    # reference a missing snapshot.
    con.execute("PRAGMA foreign_keys = ON")

    cur = con.cursor()

    cur.execute("""
        CREATE TABLE IF NOT EXISTS snapshots (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            round TEXT NOT NULL,
            page_url TEXT NOT NULL,
            page_title TEXT,
            captured_at TEXT NOT NULL,
            page_html TEXT
        )
    """)

    cur.execute("""
        CREATE TABLE IF NOT EXISTS tables (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            snapshot_id INTEGER NOT NULL,
            table_index INTEGER NOT NULL,
            caption TEXT,
            outer_html TEXT NOT NULL,
            json_rows TEXT NOT NULL,
            FOREIGN KEY(snapshot_id) REFERENCES snapshots(id)
        )
    """)

    con.commit()
    return con
|
|
|
|
|
|
def extract_tables(page):
    """Scrape every <table> element on the current page.

    Evaluates a single JS expression in the page and returns a list of
    dicts shaped like {index, caption, outer_html, rows}, where rows is
    a 2D list of whitespace-normalized cell texts (th and td alike).
    """
    js_source = r"""
    () => {
        const tables = Array.from(document.querySelectorAll("table"));
        return tables.map((t, i) => {
            const cap = t.querySelector("caption");
            const caption = cap ? cap.innerText.trim() : null;

            const rows = Array.from(t.querySelectorAll("tr")).map(tr => {
                return Array.from(tr.querySelectorAll("th,td")).map(cell => {
                    return (cell.innerText || "")
                        .replace(/\s+/g, " ")
                        .trim();
                });
            });

            return { index: i, caption, outer_html: t.outerHTML, rows };
        });
    }
    """
    return page.evaluate(js_source)
|
|
|
|
|
|
def save_snapshot(con, round_id, url, title, page_html, tables):
    """Persist one page capture and its extracted tables to SQLite.

    Inserts a single `snapshots` row, then one `tables` row per
    extracted table (cell rows serialized as JSON), committing once at
    the end so the snapshot lands atomically.

    Returns:
        (snapshot_id, captured_at): the new snapshots.id and its
        second-resolution ISO-8601 timestamp.
    """
    captured_at = datetime.now().isoformat(timespec="seconds")
    cursor = con.cursor()

    cursor.execute(
        "INSERT INTO snapshots(round, page_url, page_title, captured_at, page_html) VALUES(?,?,?,?,?)",
        (round_id, url, title, captured_at, page_html),
    )
    snapshot_id = cursor.lastrowid

    # One row per table; the 2D cell list is stored as a JSON string.
    cursor.executemany(
        "INSERT INTO tables(snapshot_id, table_index, caption, outer_html, json_rows) VALUES(?,?,?,?,?)",
        (
            (snapshot_id, int(t["index"]), t.get("caption"), t["outer_html"], json.dumps(t["rows"]))
            for t in tables
        ),
    )

    con.commit()
    return snapshot_id, captured_at
|
|
|
|
|
|
def write_snapshot_files(round_id, snapshot_id, url, title, captured_at, page_html, tables, base_dir=None):
    """Mirror a captured snapshot to disk as browsable files.

    Layout: <base_dir>/<round_id>/snapshot_NNNN/ containing meta.json,
    page.html, and tables/table_NN.{json,html} for each extracted table.

    Args:
        round_id: Round label used as the per-round subdirectory name.
        snapshot_id: Numeric id used in the zero-padded directory name.
        url, title, captured_at: Capture metadata recorded in meta.json.
        page_html: Full page HTML written to page.html.
        tables: List of {index, caption, outer_html, rows} dicts.
        base_dir: Root capture directory; defaults to CAPTURE_DIR.
                  Parameterized so callers/tests can redirect output.
    """
    root = CAPTURE_DIR if base_dir is None else base_dir
    snap_dir = Path(root) / round_id / f"snapshot_{snapshot_id:04d}"
    tables_dir = snap_dir / "tables"
    tables_dir.mkdir(parents=True, exist_ok=True)

    # Lightweight index of the snapshot; table bodies live in tables/.
    meta = {
        "snapshot_id": snapshot_id,
        "round": round_id,
        "url": url,
        "title": title,
        "captured_at": captured_at,
        "table_count": len(tables),
        "tables": [{"index": t["index"], "caption": t.get("caption")} for t in tables]
    }

    (snap_dir / "meta.json").write_text(json.dumps(meta, indent=2), encoding="utf-8")
    (snap_dir / "page.html").write_text(page_html, encoding="utf-8")

    # Each table gets a JSON file (parsed rows) and an HTML file (raw markup).
    for t in tables:
        idx = int(t["index"])
        (tables_dir / f"table_{idx:02d}.json").write_text(
            json.dumps({"index": idx, "caption": t.get("caption"), "rows": t["rows"]}, indent=2),
            encoding="utf-8"
        )
        (tables_dir / f"table_{idx:02d}.html").write_text(t["outer_html"], encoding="utf-8")
|
|
|
|
|
|
def pick_capture_page(context):
    """Choose the open tab most likely to hold the report to capture.

    Preference order:
      1. the most recently opened page whose URL is not the lobby,
      2. otherwise the most recently opened real page,
      3. otherwise (only blank/URL-less tabs exist) the first page.
    """
    candidates = [pg for pg in context.pages if pg.url and pg.url != "about:blank"]
    if not candidates:
        return context.pages[0]

    # Scan newest-first for a non-lobby tab.
    for pg in reversed(candidates):
        if "/users/lobby" not in pg.url:
            return pg

    return candidates[-1]
|
|
|
|
|
|
def safe_page_read(page):
    """Snapshot a page's url/title/html, tolerating in-flight navigation.

    Returns:
        (url, title, html, None) on success, or (None, None, None, exc)
        if any of the reads raised — e.g. the page was navigating or
        the tab was closed between selection and capture.
    """
    try:
        # Evaluated left to right: url, then title(), then content().
        return page.url, page.title(), page.content(), None
    except Exception as exc:  # broad on purpose: caller just retries
        return None, None, None, exc
|
|
|
|
|
|
def main():
    """Interactive capture loop.

    Opens a persistent (logged-in) browser, lets the user navigate to a
    report, and on each Enter press snapshots the most relevant tab's
    tables into SQLite plus mirrored files on disk. 'q' quits.
    """
    con = init_db()
    round_id = input("Round label (e.g., Y11 / Round1): ").strip() or "Round1"

    Path(SESSION_DIR).mkdir(parents=True, exist_ok=True)
    Path(CAPTURE_DIR).mkdir(parents=True, exist_ok=True)

    with sync_playwright() as p:
        # Persistent context keeps cookies/session between runs, so the
        # manual login below is only needed the first time.
        context = p.chromium.launch_persistent_context(
            user_data_dir=SESSION_DIR,
            headless=False
        )
        page = context.new_page()

        print("\nBrowser opened.")
        print("✅ Log in manually (first time only).")
        print("✅ Navigate to a BSG report page (may open a new tab).")
        print("✅ Press Enter here to CAPTURE the most relevant tab.\n")

        while True:
            cmd = input("Enter = capture | 'q' = quit: ").strip().lower()
            if cmd == "q":
                break

            # ✅ Pick the most relevant current tab/page (reports often open new tabs)
            page = pick_capture_page(context)

            # Reads can fail mid-navigation; on error, prompt and retry.
            url, title, html, err = safe_page_read(page)
            if err:
                print(f"Could not read current page (maybe navigating). Try again.\nError: {err}")
                continue

            tables = extract_tables(page)
            if not tables:
                print(f"No <table> tags found on this page: {url}")
                continue

            # Persist to SQLite first (authoritative), then mirror to files.
            snapshot_id, captured_at = save_snapshot(con, round_id, url, title, html, tables)
            write_snapshot_files(round_id, snapshot_id, url, title, captured_at, html, tables)

            print(f"\n✅ Captured snapshot {snapshot_id:04d}")
            print(f"   Captured from: {url}")
            print(f"   Title: {title}")
            print(f"   Tables found: {len(tables)}")
            print(f"   📁 Files: {CAPTURE_DIR}/{round_id}/snapshot_{snapshot_id:04d}/")

            # Show a short table summary; cap the listing at 10 entries.
            for t in tables[:10]:
                cap = t.get("caption") or "(no caption)"
                print(f"   - table[{t['index']}]: {cap}")
            if len(tables) > 10:
                print(f"   ...and {len(tables) - 10} more tables")

        context.close()

    con.close()
    print("Done.")
|
|
|
|
|
|
# Run the interactive capture loop only when executed as a script.
if __name__ == "__main__":
    main()