Files

337 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app.py
from __future__ import annotations
import os, json
from datetime import datetime
from typing import Dict, Any
from urllib.parse import urlencode, urlsplit, urlunsplit, parse_qsl
from flask import Flask, render_template, request, redirect, url_for, jsonify, flash, Response
from sqlalchemy import create_engine, text, Column, Integer, String, DateTime, JSON
from sqlalchemy.orm import declarative_base, sessionmaker, scoped_session
from sqlalchemy.exc import SQLAlchemyError
from functools import wraps
import hmac
from werkzeug.security import check_password_hash
# -----------------------------------------------------------------------------
# App
# -----------------------------------------------------------------------------
app = Flask(__name__, static_folder="static", static_url_path="/static")
BRAND = "BrookHaven Technologies"
TAGLINE = "Fast to prototype. Safe to scale."
# --- Personal contact (override via env) ---
CONTACT = {
"name": os.environ.get("BH_CONTACT_NAME", "Benjamin Mosley"),
"title": os.environ.get("BH_CONTACT_TITLE", "Founder, BrookHaven Technologies"),
"email": os.environ.get("BH_CONTACT_EMAIL", "ben@bennyshouse.net"),
"phone": os.environ.get("BH_CONTACT_PHONE", "(806) 655 2300)"),
"city": os.environ.get("BH_CONTACT_CITY", "Canyon / Amarillo / Borger / Remote"),
"cal": os.environ.get("BH_CONTACT_CAL", "https://calendly.com/bennyshouse24/30min"),
"link": os.environ.get("BH_CONTACT_LINK", "https://www.linkedin.com/in/benjamin-mosley-849643329/"),
"site": os.environ.get("BH_CONTACT_SITE", "https://bennyshouse.net"),
"hours": os.environ.get("BH_CONTACT_HOURS", "MonFri, 9a5p CT"),
}
app.config.update(
SECRET_KEY=os.environ.get("APP_SECRET_KEY", "dev"),
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE="Lax",
SESSION_COOKIE_SECURE=bool(int(os.environ.get("COOKIE_SECURE", "1"))), # set 1 in prod with HTTPS
)
# Admin credentials (env-driven)
ADMIN_USER = os.environ.get("BH_ADMIN_USER", "admin")
ADMIN_PW_HASH = os.environ.get("BH_ADMIN_PASSWORD_HASH", "32768:8:1$pgll8a2zdtxky50G$8ef13bb775569f480da14618433b7b80a93f5cb3ef99b67878ddfb058d39e858f05d81b25c88365737d81400ee287a156c76de7b51aed33ea667030f7a83e10d") # pbkdf2 hash
ADMIN_BEARER = os.environ.get("BH_ADMIN_BEARER", "") # optional static token
# -----------------------------------------------------------------------------
# DB (MariaDB)
# -----------------------------------------------------------------------------
DB_URL = os.environ.get("DB_URL", "mysql+pymysql://tapdown:Swaows.1234@127.0.0.1/tapdown")
engine = create_engine(
DB_URL,
pool_size=10,
max_overflow=20,
pool_recycle=1800,
pool_pre_ping=True,
isolation_level="READ COMMITTED",
future=True,
)
SessionLocal = scoped_session(sessionmaker(bind=engine, expire_on_commit=False, future=True))
Base = declarative_base()
class Inquiry(Base):
__tablename__ = "bh_inquiries"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(160), nullable=False)
email = Column(String(200), nullable=False)
message = Column(String(4000), nullable=False)
nda = Column(String(8), nullable=False, default="no") # yes|no
meta = Column(JSON, nullable=False, default={}) # e.g. user agent
created_at = Column(DateTime, nullable=False, default=datetime.utcnow)
_tables_ready = False
@app.before_request
def ensure_tables():
global _tables_ready
if _tables_ready:
return
try:
with engine.begin() as conn:
Base.metadata.create_all(conn)
_tables_ready = True
except SQLAlchemyError:
app.logger.exception("DB init failed; continuing without DB")
@app.teardown_appcontext
def remove_session(_=None):
SessionLocal.remove()
# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------
def with_utm(url: str, extra: Dict[str, str] | None = None) -> str:
scheme, netloc, path, query, frag = urlsplit(url)
q = dict(parse_qsl(query))
q.update(extra or {})
return urlunsplit((scheme, netloc, path, urlencode(q), frag))
app.config.update(
SECRET_KEY=os.environ.get("APP_SECRET_KEY", "dev"),
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE="Lax",
SESSION_COOKIE_SECURE=bool(int(os.environ.get("COOKIE_SECURE", "0"))), # set 1 in prod with HTTPS
)
def _is_admin_request():
# 1) Bearer token (e.g., for automation or CSV curl)
authz = request.headers.get("Authorization", "")
if ADMIN_BEARER and authz.startswith("Bearer "):
token = authz[7:].strip()
if hmac.compare_digest(token, ADMIN_BEARER):
return True
# 2) HTTP Basic for humans
auth = request.authorization
if auth and ADMIN_PW_HASH and auth.username == ADMIN_USER:
try:
if check_password_hash(ADMIN_PW_HASH, auth.password):
return True
except Exception:
pass
return False
# -----------------------------------------------------------------------------
# Routes (pages)
# -----------------------------------------------------------------------------
@app.get("/")
def home():
return render_template("index.html", brand=BRAND, tagline=TAGLINE)
def require_admin(view):
@wraps(view)
def _wrapped(*args, **kwargs):
if _is_admin_request():
return view(*args, **kwargs)
return Response(
"Authentication required",
401,
{"WWW-Authenticate": 'Basic realm="BrookHaven Admin"'},
)
return _wrapped
@app.get("/about")
def about():
return render_template("about.html", brand=BRAND)
@app.get("/services")
def services():
return render_template("services.html", brand=BRAND)
@app.get("/work")
def work():
# Example case studies (could be a JSON file later)
cases = [
{
"title": "Tapdown Showdown — Cyber Sale Activation",
"desc": "Survey-powered mini-game; branded UI, MariaDB analytics, CPU mode for flaky networks.",
"bullets": ["90%+ survey completion", "UTM funnels to promo pages", "Works offline/kiosk"],
"image": "/static/brookhaven-case.jpg",
},
{
"title": "Kiosk Checkout Prototype",
"desc": "Self-serve event checkout with QR receipts and local-first sync.",
"bullets": ["Local cache", "Queue-busting UX", "Auto export"],
"image": "/static/brookhaven-kiosk.jpg",
},
]
return render_template("work.html", brand=BRAND, cases=cases)
@app.context_processor
def inject_contact():
return {"CONTACT": CONTACT}
@app.get("/contact.vcf")
def contact_vcf():
# Generate a simple vCard 3.0
n = CONTACT["name"]
parts = n.split(" ", 1)
last = parts[-1] if len(parts) > 1 else parts[0]
first = parts[0]
phone = CONTACT["phone"].replace(" ", "")
email = CONTACT["email"]
org = BRAND
title = CONTACT["title"]
url = CONTACT["site"]
city = CONTACT["city"]
vcard = f"""BEGIN:VCARD
VERSION:3.0
N:{last};{first};;;
FN:{n}
ORG:{org}
TITLE:{title}
TEL;TYPE=CELL,VOICE:{phone}
EMAIL;TYPE=INTERNET:{email}
URL:{url}
ADR;TYPE=WORK:;;{city};;;;
END:VCARD
"""
return (vcard, 200, {
"Content-Type": "text/vcard; charset=utf-8",
"Content-Disposition": 'attachment; filename="brookhaven-contact.vcf"',
})
@app.get("/contact")
def contact():
return render_template("contact.html", brand=BRAND)
@app.post("/contact")
def contact_post():
name = (request.form.get("name") or "").strip()
email = (request.form.get("email") or "").strip()
message = (request.form.get("message") or "").strip()
nda = "yes" if request.form.get("nda") in ("on", "yes", "true") else "no"
if not name or not email or not message:
flash("Please fill name, email, and a short description.", "error")
return redirect(url_for("contact"))
# Persist (best-effort)
meta = {"ua": request.headers.get("User-Agent", ""), "ip": request.remote_addr}
db = SessionLocal()
try:
db.add(Inquiry(name=name, email=email, message=message, nda=nda, meta=meta))
db.commit()
except SQLAlchemyError:
db.rollback()
app.logger.exception("Failed to write inquiry")
# keep going anyway to the thank-you page
return redirect(url_for("thanks"))
@app.get("/thanks")
def thanks():
return render_template("thanks.html", brand=BRAND)
# -----------------------------------------------------------------------------
# Admin (read-only)
# -----------------------------------------------------------------------------
@app.get("/admin/inquiries")
@require_admin
def admin_inquiries():
page = max(1, int(request.args.get("page", 1)))
per_page = 25
offset = (page - 1) * per_page
db = SessionLocal()
try:
rows = db.execute(
text("""SELECT id,name,email,message,nda,meta,created_at
FROM bh_inquiries ORDER BY created_at DESC
LIMIT :limit OFFSET :offset"""),
{"limit": per_page, "offset": offset}
).mappings().all()
total = db.execute(text("SELECT COUNT(*) FROM bh_inquiries")).scalar_one()
except Exception:
app.logger.exception("Query failed")
rows, total = [], 0
finally:
db.close()
# Convert meta JSON (string from PyMySQL) to dict
def parse_json(val):
if isinstance(val, dict): return val
if isinstance(val, str) and val:
try: return json.loads(val)
except Exception: return {}
return {}
processed = []
for r in rows:
processed.append({
"id": r["id"], "name": r["name"], "email": r["email"],
"message": r["message"], "nda": r["nda"],
"meta": parse_json(r["meta"]),
"created_at": r["created_at"],
})
pages = (total // per_page) + (1 if total % per_page else 0)
return render_template("admin_inquiries.html",
rows=processed, page=page, pages=pages, total=total, brand=BRAND)
@app.get("/admin/inquiries.csv")
@require_admin
def admin_inquiries_csv():
db = SessionLocal()
try:
rows = db.execute(
text("""SELECT id,name,email,message,nda,meta,created_at
FROM bh_inquiries ORDER BY created_at DESC""")
).mappings().all()
finally:
db.close()
import csv, io
buf = io.StringIO()
w = csv.writer(buf)
w.writerow(["id","name","email","nda","message","ua","ip","created_at"])
for r in rows:
meta = r["meta"]
if isinstance(meta, str):
try:
meta = json.loads(meta)
except Exception:
meta = {}
w.writerow([
r["id"], r["name"], r["email"], r["nda"],
(r["message"] or "").replace("\n"," ").strip(),
(meta or {}).get("ua",""), (meta or {}).get("ip",""),
r["created_at"],
])
out = buf.getvalue()
return out, 200, {
"Content-Type": "text/csv; charset=utf-8",
"Content-Disposition": "attachment; filename=brookhaven_inquiries.csv"
}
# -----------------------------------------------------------------------------
# Utilities
# -----------------------------------------------------------------------------
@app.get("/healthz")
def healthz():
return jsonify({"ok": True, "brand": BRAND}), 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5050, debug=False)