# app.py from __future__ import annotations import os, smtplib, sqlite3, json, secrets, time from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from datetime import datetime, timedelta from typing import Dict, Any, Tuple from dotenv import load_dotenv from flask import ( Flask, render_template, request, redirect, url_for, flash, session ) from werkzeug.security import check_password_hash # ----------------------------------------------------------------------------- # Load .env FIRST # ----------------------------------------------------------------------------- load_dotenv() # must come before getenv() usage # ----------------------------------------------------------------------------- # Config # ----------------------------------------------------------------------------- def getenv(name: str, default: str | None=None) -> str | None: return os.environ.get(name, default) APP_SECRET_KEY = getenv("APP_SECRET_KEY", "dev_change_me") # Single admin (from .env) ADMIN_USERNAME = getenv("ADMIN_USERNAME", "") ADMIN_PASSWORD_HASH = getenv("ADMIN_PASSWORD_HASH", "") ADMIN_EMAIL = getenv("ADMIN_EMAIL", "admin@example.com") SMTP_HOST = getenv("SMTP_HOST", "") SMTP_PORT = int(getenv("SMTP_PORT", "587") or "587") SMTP_USER = getenv("SMTP_USER", "") SMTP_PASS = getenv("SMTP_PASS", "") SMTP_FROM = getenv("SMTP_FROM", SMTP_USER or "no-reply@example.com") BASE_URL = getenv("BASE_URL", "http://localhost:5000") DB_PATH = getenv("DB_PATH", "quotes.db") # Session hardening (use HTTPS in prod) SESSION_COOKIE_SECURE = getenv("SESSION_COOKIE_SECURE", "false").lower() == "true" SESSION_COOKIE_HTTPONLY = getenv("SESSION_COOKIE_HTTPONLY", "true").lower() == "true" SESSION_COOKIE_SAMESITE = getenv("SESSION_COOKIE_SAMESITE", "Lax") app = Flask(__name__, static_folder="static", static_url_path="/static") app.secret_key = APP_SECRET_KEY app.config.update( SESSION_COOKIE_SECURE=SESSION_COOKIE_SECURE, SESSION_COOKIE_HTTPONLY=SESSION_COOKIE_HTTPONLY, SESSION_COOKIE_SAMESITE=SESSION_COOKIE_SAMESITE, PERMANENT_SESSION_LIFETIME=timedelta(days=30), ) # ----------------------------------------------------------------------------- # DB # ----------------------------------------------------------------------------- def init_db() -> None: con = sqlite3.connect(DB_PATH) cur = con.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS quote_requests ( id INTEGER PRIMARY KEY AUTOINCREMENT, created_at TEXT NOT NULL, name TEXT NOT NULL, email TEXT NOT NULL, phone TEXT, company TEXT, project_type TEXT, complexity TEXT, urgency TEXT, features TEXT, budget_range TEXT, description TEXT, attachments TEXT, est_hours REAL, est_cost REAL, hourly_rate REAL, json_payload TEXT ) """) con.commit() con.close() def migrate_db() -> None: """Add QoL columns if missing.""" con = sqlite3.connect(DB_PATH) cur = con.cursor() cur.execute("PRAGMA table_info(quote_requests)") cols = {row[1] for row in cur.fetchall()} if "status" not in cols: cur.execute("ALTER TABLE quote_requests ADD COLUMN status TEXT DEFAULT 'open'") if "completed_at" not in cols: cur.execute("ALTER TABLE quote_requests ADD COLUMN completed_at TEXT") if "deleted_at" not in cols: cur.execute("ALTER TABLE quote_requests ADD COLUMN deleted_at TEXT") con.commit() con.close() init_db() migrate_db() # ----------------------------------------------------------------------------- # Auth helpers (sessions + CSRF + simple throttle) # ----------------------------------------------------------------------------- _failed: dict[str, tuple[int, float]] = {} # key -> (count, last_ts) def throttle_key() -> str: return request.headers.get("X-Forwarded-For", request.remote_addr or "unknown") def throttled(key: str, limit=5, window=900) -> bool: now = time.time() count, last = _failed.get(key, (0, 0)) if now - last > window: count = 0 return count >= limit def bump_fail(key: str) -> None: count, _ = _failed.get(key, (0, 0)) _failed[key] = (count + 1, time.time()) def csrf_token() -> str: tok = session.get("_csrf") if not tok: tok = secrets.token_urlsafe(32) session["_csrf"] = tok return tok def check_csrf() -> bool: return request.form.get("_csrf") == session.get("_csrf") def admin_required(view): from functools import wraps @wraps(view) def _wrap(*args, **kwargs): if session.get("is_admin"): return view(*args, **kwargs) return redirect(url_for("admin_login_form", next=request.path)) return _wrap # ----------------------------------------------------------------------------- # Estimator logic (plain-language form) # ----------------------------------------------------------------------------- def estimate_hours_and_cost(payload: Dict[str, Any]) -> Tuple[float, float, float]: hourly_rate = float(getenv("HOURLY_RATE", "95")) need = payload.get("need", "not-sure") base = { "simple-site": 10, "pro-site": 18, "online-form": 8, "sell-online": 24, "fix-or-improve": 6, "it-help": 6, "custom-app": 28, "not-sure": 8, }.get(need, 8) size = payload.get("scope_size", "small") size_mult = {"small": 1.0, "medium": 1.4, "large": 2.0}.get(size, 1.0) timeline = payload.get("timeline", "flexible") time_mult = {"flexible": 1.0, "soon": 1.1, "rush": 1.25, "critical": 1.45}.get(timeline, 1.0) hours = base * size_mult * time_mult extras = payload.get("extras", []) if isinstance(extras, str): extras = [extras] extra_add = {"content": 3, "branding": 4, "training": 2, "care": 2} for e in extras: hours += extra_add.get(e, 0) hours = max(3, round(hours, 1)) cost = round(hours * hourly_rate, 2) return hours, cost, hourly_rate # ----------------------------------------------------------------------------- # Email # ----------------------------------------------------------------------------- def send_email(subject: str, html_body: str, to_address: str) -> bool: if not SMTP_HOST or not to_address: app.logger.info("SMTP not configured; would send to %s (%s)", to_address, subject) return False try: msg = MIMEMultipart("alternative") msg["Subject"] = subject msg["From"] = SMTP_FROM msg["To"] = to_address msg.attach(MIMEText(html_body, "html")) with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server: server.starttls() if SMTP_USER and SMTP_PASS: server.login(SMTP_USER, SMTP_PASS) server.sendmail(SMTP_FROM, [to_address], msg.as_string()) return True except Exception as e: app.logger.error("Email send failed: %s", e) return False # ----------------------------------------------------------------------------- # Routes: public # ----------------------------------------------------------------------------- @app.get("/") def index(): return render_template("index.html") @app.post("/submit") def submit(): payload = { "name": request.form.get("name","").strip(), "email": request.form.get("email","").strip(), "phone": request.form.get("phone","").strip(), "company": request.form.get("company","").strip(), "need": request.form.get("need","not-sure"), "scope_size": request.form.get("scope_size","small"), "timeline": request.form.get("timeline","flexible"), "extras": request.form.getlist("extras"), "budget_feel": request.form.get("budget_feel","unsure"), "description": request.form.get("description","").strip(), } if not payload["name"] or not payload["email"]: flash("Name and Email are required.", "error") return redirect(url_for("index")) est_hours, est_cost, hourly_rate = estimate_hours_and_cost(payload) con = sqlite3.connect(DB_PATH) cur = con.cursor() cur.execute(""" INSERT INTO quote_requests (created_at, name, email, phone, company, project_type, complexity, urgency, features, budget_range, description, attachments, est_hours, est_cost, hourly_rate, json_payload) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( datetime.utcnow().isoformat(), payload["name"], payload["email"], payload["phone"], payload["company"], payload["need"], payload["scope_size"], payload["timeline"], ",".join(payload["extras"]), payload["budget_feel"], payload["description"], "", est_hours, est_cost, hourly_rate, json.dumps(payload) )) con.commit() con.close() admin_html = render_template("new_request_email.html", payload=payload, est_hours=est_hours, est_cost=est_cost, hourly_rate=hourly_rate, base_url=BASE_URL) send_email("New Quote Request Received", admin_html, ADMIN_EMAIL) return redirect(url_for("thanks")) @app.get("/thanks") def thanks(): return render_template("thanks.html") # ----------------------------------------------------------------------------- # Routes: auth # ----------------------------------------------------------------------------- @app.get("/admin/login") def admin_login_form(): return render_template("login.html", csrf=csrf_token(), next=request.args.get("next","/admin")) @app.post("/admin/login") def admin_login(): if not check_csrf(): return "Bad CSRF", 400 key = throttle_key() if throttled(key): return "Too many attempts. Try again later.", 429 username = request.form.get("username","").strip() password = request.form.get("password","") ok = (username == ADMIN_USERNAME and ADMIN_PASSWORD_HASH and check_password_hash(ADMIN_PASSWORD_HASH, password)) if not ok: bump_fail(key) flash("Invalid username or password.", "error") return redirect(url_for("admin_login_form")) session["is_admin"] = True session.permanent = True if request.form.get("remember") == "on" else False flash("Signed in.", "ok") dest = request.form.get("next") or url_for("admin") return redirect(dest) @app.post("/admin/logout") @admin_required def admin_logout(): session.clear() flash("Signed out.", "ok") return redirect(url_for("admin_login_form")) # ----------------------------------------------------------------------------- # Routes: admin views & actions # ----------------------------------------------------------------------------- @app.get("/admin") @admin_required def admin(): # Filters: active (default), open, completed, deleted, all show = request.args.get("show", "active") where = [] if show == "open": where.append("(status IS NULL OR status='open') AND deleted_at IS NULL") elif show == "completed": where.append("status='completed' AND deleted_at IS NULL") elif show == "deleted": where.append("deleted_at IS NOT NULL") elif show == "all": pass else: # active where.append("deleted_at IS NULL") query = "SELECT * FROM quote_requests" if where: query += " WHERE " + " AND ".join(where) query += " ORDER BY id DESC LIMIT 500" con = sqlite3.connect(DB_PATH) con.row_factory = sqlite3.Row cur = con.cursor() cur.execute(query) rows = cur.fetchall() con.close() return render_template("admin.html", rows=rows, csrf=csrf_token(), show=show) @app.post("/admin/request//complete") @admin_required def mark_complete(rid: int): if not check_csrf(): return "Bad CSRF", 400 con = sqlite3.connect(DB_PATH) cur = con.cursor() cur.execute(""" UPDATE quote_requests SET status='completed', completed_at=? WHERE id=? AND deleted_at IS NULL """, (datetime.utcnow().isoformat(), rid)) con.commit() con.close() flash(f"Request #{rid} marked as completed.", "ok") return redirect(url_for("admin", show=request.args.get("show","active"))) @app.post("/admin/request//delete") @admin_required def delete_request(rid: int): if not check_csrf(): return "Bad CSRF", 400 hard = request.args.get("hard") == "1" con = sqlite3.connect(DB_PATH) cur = con.cursor() if hard: cur.execute("DELETE FROM quote_requests WHERE id=?", (rid,)) flash(f"Request #{rid} permanently deleted.", "ok") else: cur.execute(""" UPDATE quote_requests SET deleted_at=? WHERE id=? AND deleted_at IS NULL """, (datetime.utcnow().isoformat(), rid)) flash(f"Request #{rid} moved to Deleted.", "ok") con.commit() con.close() return redirect(url_for("admin", show=request.args.get("show","active"))) # Protected preview (for your client email template) @app.get("/preview-client-email") @admin_required def preview_client_email(): sample = { "client_name": "Ben", "project_title": "Website + Basic Admin", "proposal_summary": "Design + dev of a 5-page brochure site with contact form and basic admin panel.", "est_hours": 24, "hourly_rate": 95, "est_cost": 2280, "valid_until": "November 15, 2025", "next_steps_url": "https://example.com/pay", "contact_email": ADMIN_EMAIL, "company_name": "Benny's House — NetDeploy", } return render_template("quote_email.html", **sample) # ----------------------------------------------------------------------------- # Dev server # ----------------------------------------------------------------------------- if __name__ == "__main__": app.run(debug=True)