398 lines
14 KiB
Python
398 lines
14 KiB
Python
# app.py
|
|
from __future__ import annotations
|
|
|
|
import os, smtplib, sqlite3, json, secrets, time
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any, Tuple
|
|
|
|
from dotenv import load_dotenv
|
|
from flask import (
|
|
Flask, render_template, request, redirect, url_for, flash, session
|
|
)
|
|
from werkzeug.security import check_password_hash
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Load .env FIRST
|
|
# -----------------------------------------------------------------------------
|
|
load_dotenv() # must come before getenv() usage
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Config
|
|
# -----------------------------------------------------------------------------
|
|
def getenv(name: str, default: str | None=None) -> str | None:
|
|
return os.environ.get(name, default)
|
|
|
|
APP_SECRET_KEY = getenv("APP_SECRET_KEY", "dev_change_me")
|
|
|
|
# Single admin (from .env)
|
|
ADMIN_USERNAME = getenv("ADMIN_USERNAME", "")
|
|
ADMIN_PASSWORD_HASH = getenv("ADMIN_PASSWORD_HASH", "")
|
|
|
|
ADMIN_EMAIL = getenv("ADMIN_EMAIL", "admin@example.com")
|
|
SMTP_HOST = getenv("SMTP_HOST", "")
|
|
SMTP_PORT = int(getenv("SMTP_PORT", "587") or "587")
|
|
SMTP_USER = getenv("SMTP_USER", "")
|
|
SMTP_PASS = getenv("SMTP_PASS", "")
|
|
SMTP_FROM = getenv("SMTP_FROM", SMTP_USER or "no-reply@example.com")
|
|
BASE_URL = getenv("BASE_URL", "http://localhost:5000")
|
|
|
|
DB_PATH = getenv("DB_PATH", "quotes.db")
|
|
|
|
# Session hardening (use HTTPS in prod)
|
|
SESSION_COOKIE_SECURE = getenv("SESSION_COOKIE_SECURE", "false").lower() == "true"
|
|
SESSION_COOKIE_HTTPONLY = getenv("SESSION_COOKIE_HTTPONLY", "true").lower() == "true"
|
|
SESSION_COOKIE_SAMESITE = getenv("SESSION_COOKIE_SAMESITE", "Lax")
|
|
|
|
app = Flask(__name__, static_folder="static", static_url_path="/static")
|
|
app.secret_key = APP_SECRET_KEY
|
|
app.config.update(
|
|
SESSION_COOKIE_SECURE=SESSION_COOKIE_SECURE,
|
|
SESSION_COOKIE_HTTPONLY=SESSION_COOKIE_HTTPONLY,
|
|
SESSION_COOKIE_SAMESITE=SESSION_COOKIE_SAMESITE,
|
|
PERMANENT_SESSION_LIFETIME=timedelta(days=30),
|
|
)
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# DB
|
|
# -----------------------------------------------------------------------------
|
|
def init_db() -> None:
|
|
con = sqlite3.connect(DB_PATH)
|
|
cur = con.cursor()
|
|
cur.execute("""
|
|
CREATE TABLE IF NOT EXISTS quote_requests (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
created_at TEXT NOT NULL,
|
|
name TEXT NOT NULL,
|
|
email TEXT NOT NULL,
|
|
phone TEXT,
|
|
company TEXT,
|
|
project_type TEXT,
|
|
complexity TEXT,
|
|
urgency TEXT,
|
|
features TEXT,
|
|
budget_range TEXT,
|
|
description TEXT,
|
|
attachments TEXT,
|
|
est_hours REAL,
|
|
est_cost REAL,
|
|
hourly_rate REAL,
|
|
json_payload TEXT
|
|
)
|
|
""")
|
|
con.commit()
|
|
con.close()
|
|
|
|
def migrate_db() -> None:
|
|
"""Add QoL columns if missing."""
|
|
con = sqlite3.connect(DB_PATH)
|
|
cur = con.cursor()
|
|
cur.execute("PRAGMA table_info(quote_requests)")
|
|
cols = {row[1] for row in cur.fetchall()}
|
|
|
|
if "status" not in cols:
|
|
cur.execute("ALTER TABLE quote_requests ADD COLUMN status TEXT DEFAULT 'open'")
|
|
if "completed_at" not in cols:
|
|
cur.execute("ALTER TABLE quote_requests ADD COLUMN completed_at TEXT")
|
|
if "deleted_at" not in cols:
|
|
cur.execute("ALTER TABLE quote_requests ADD COLUMN deleted_at TEXT")
|
|
|
|
con.commit()
|
|
con.close()
|
|
|
|
init_db()
|
|
migrate_db()
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Auth helpers (sessions + CSRF + simple throttle)
|
|
# -----------------------------------------------------------------------------
|
|
_failed: dict[str, tuple[int, float]] = {} # key -> (count, last_ts)
|
|
|
|
def throttle_key() -> str:
|
|
return request.headers.get("X-Forwarded-For", request.remote_addr or "unknown")
|
|
|
|
def throttled(key: str, limit=5, window=900) -> bool:
|
|
now = time.time()
|
|
count, last = _failed.get(key, (0, 0))
|
|
if now - last > window:
|
|
count = 0
|
|
return count >= limit
|
|
|
|
def bump_fail(key: str) -> None:
|
|
count, _ = _failed.get(key, (0, 0))
|
|
_failed[key] = (count + 1, time.time())
|
|
|
|
def csrf_token() -> str:
|
|
tok = session.get("_csrf")
|
|
if not tok:
|
|
tok = secrets.token_urlsafe(32)
|
|
session["_csrf"] = tok
|
|
return tok
|
|
|
|
def check_csrf() -> bool:
|
|
return request.form.get("_csrf") == session.get("_csrf")
|
|
|
|
def admin_required(view):
|
|
from functools import wraps
|
|
@wraps(view)
|
|
def _wrap(*args, **kwargs):
|
|
if session.get("is_admin"):
|
|
return view(*args, **kwargs)
|
|
return redirect(url_for("admin_login_form", next=request.path))
|
|
return _wrap
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Estimator logic (plain-language form)
|
|
# -----------------------------------------------------------------------------
|
|
def estimate_hours_and_cost(payload: Dict[str, Any]) -> Tuple[float, float, float]:
|
|
hourly_rate = float(getenv("HOURLY_RATE", "95"))
|
|
|
|
need = payload.get("need", "not-sure")
|
|
base = {
|
|
"simple-site": 10,
|
|
"pro-site": 18,
|
|
"online-form": 8,
|
|
"sell-online": 24,
|
|
"fix-or-improve": 6,
|
|
"it-help": 6,
|
|
"custom-app": 28,
|
|
"not-sure": 8,
|
|
}.get(need, 8)
|
|
|
|
size = payload.get("scope_size", "small")
|
|
size_mult = {"small": 1.0, "medium": 1.4, "large": 2.0}.get(size, 1.0)
|
|
|
|
timeline = payload.get("timeline", "flexible")
|
|
time_mult = {"flexible": 1.0, "soon": 1.1, "rush": 1.25, "critical": 1.45}.get(timeline, 1.0)
|
|
|
|
hours = base * size_mult * time_mult
|
|
|
|
extras = payload.get("extras", [])
|
|
if isinstance(extras, str):
|
|
extras = [extras]
|
|
extra_add = {"content": 3, "branding": 4, "training": 2, "care": 2}
|
|
for e in extras:
|
|
hours += extra_add.get(e, 0)
|
|
|
|
hours = max(3, round(hours, 1))
|
|
cost = round(hours * hourly_rate, 2)
|
|
return hours, cost, hourly_rate
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Email
|
|
# -----------------------------------------------------------------------------
|
|
def send_email(subject: str, html_body: str, to_address: str) -> bool:
|
|
if not SMTP_HOST or not to_address:
|
|
app.logger.info("SMTP not configured; would send to %s (%s)", to_address, subject)
|
|
return False
|
|
try:
|
|
msg = MIMEMultipart("alternative")
|
|
msg["Subject"] = subject
|
|
msg["From"] = SMTP_FROM
|
|
msg["To"] = to_address
|
|
msg.attach(MIMEText(html_body, "html"))
|
|
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
|
|
server.starttls()
|
|
if SMTP_USER and SMTP_PASS:
|
|
server.login(SMTP_USER, SMTP_PASS)
|
|
server.sendmail(SMTP_FROM, [to_address], msg.as_string())
|
|
return True
|
|
except Exception as e:
|
|
app.logger.error("Email send failed: %s", e)
|
|
return False
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Routes: public
|
|
# -----------------------------------------------------------------------------
|
|
@app.get("/")
|
|
def index():
|
|
return render_template("index.html")
|
|
|
|
@app.post("/submit")
|
|
def submit():
|
|
payload = {
|
|
"name": request.form.get("name","").strip(),
|
|
"email": request.form.get("email","").strip(),
|
|
"phone": request.form.get("phone","").strip(),
|
|
"company": request.form.get("company","").strip(),
|
|
|
|
"need": request.form.get("need","not-sure"),
|
|
"scope_size": request.form.get("scope_size","small"),
|
|
"timeline": request.form.get("timeline","flexible"),
|
|
"extras": request.form.getlist("extras"),
|
|
"budget_feel": request.form.get("budget_feel","unsure"),
|
|
|
|
"description": request.form.get("description","").strip(),
|
|
}
|
|
|
|
if not payload["name"] or not payload["email"]:
|
|
flash("Name and Email are required.", "error")
|
|
return redirect(url_for("index"))
|
|
|
|
est_hours, est_cost, hourly_rate = estimate_hours_and_cost(payload)
|
|
|
|
con = sqlite3.connect(DB_PATH)
|
|
cur = con.cursor()
|
|
cur.execute("""
|
|
INSERT INTO quote_requests
|
|
(created_at, name, email, phone, company,
|
|
project_type, complexity, urgency, features, budget_range,
|
|
description, attachments, est_hours, est_cost, hourly_rate, json_payload)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (
|
|
datetime.utcnow().isoformat(),
|
|
payload["name"], payload["email"], payload["phone"], payload["company"],
|
|
payload["need"], payload["scope_size"], payload["timeline"],
|
|
",".join(payload["extras"]), payload["budget_feel"],
|
|
payload["description"], "",
|
|
est_hours, est_cost, hourly_rate, json.dumps(payload)
|
|
))
|
|
con.commit()
|
|
con.close()
|
|
|
|
admin_html = render_template("new_request_email.html",
|
|
payload=payload,
|
|
est_hours=est_hours,
|
|
est_cost=est_cost,
|
|
hourly_rate=hourly_rate,
|
|
base_url=BASE_URL)
|
|
send_email("New Quote Request Received", admin_html, ADMIN_EMAIL)
|
|
return redirect(url_for("thanks"))
|
|
|
|
@app.get("/thanks")
|
|
def thanks():
|
|
return render_template("thanks.html")
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Routes: auth
|
|
# -----------------------------------------------------------------------------
|
|
@app.get("/admin/login")
|
|
def admin_login_form():
|
|
return render_template("login.html", csrf=csrf_token(), next=request.args.get("next","/admin"))
|
|
|
|
@app.post("/admin/login")
|
|
def admin_login():
|
|
if not check_csrf():
|
|
return "Bad CSRF", 400
|
|
key = throttle_key()
|
|
if throttled(key):
|
|
return "Too many attempts. Try again later.", 429
|
|
|
|
username = request.form.get("username","").strip()
|
|
password = request.form.get("password","")
|
|
|
|
ok = (username == ADMIN_USERNAME and ADMIN_PASSWORD_HASH and check_password_hash(ADMIN_PASSWORD_HASH, password))
|
|
if not ok:
|
|
bump_fail(key)
|
|
flash("Invalid username or password.", "error")
|
|
return redirect(url_for("admin_login_form"))
|
|
|
|
session["is_admin"] = True
|
|
session.permanent = True if request.form.get("remember") == "on" else False
|
|
flash("Signed in.", "ok")
|
|
dest = request.form.get("next") or url_for("admin")
|
|
return redirect(dest)
|
|
|
|
@app.post("/admin/logout")
|
|
@admin_required
|
|
def admin_logout():
|
|
session.clear()
|
|
flash("Signed out.", "ok")
|
|
return redirect(url_for("admin_login_form"))
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Routes: admin views & actions
|
|
# -----------------------------------------------------------------------------
|
|
@app.get("/admin")
|
|
@admin_required
|
|
def admin():
|
|
# Filters: active (default), open, completed, deleted, all
|
|
show = request.args.get("show", "active")
|
|
where = []
|
|
if show == "open":
|
|
where.append("(status IS NULL OR status='open') AND deleted_at IS NULL")
|
|
elif show == "completed":
|
|
where.append("status='completed' AND deleted_at IS NULL")
|
|
elif show == "deleted":
|
|
where.append("deleted_at IS NOT NULL")
|
|
elif show == "all":
|
|
pass
|
|
else: # active
|
|
where.append("deleted_at IS NULL")
|
|
|
|
query = "SELECT * FROM quote_requests"
|
|
if where:
|
|
query += " WHERE " + " AND ".join(where)
|
|
query += " ORDER BY id DESC LIMIT 500"
|
|
|
|
con = sqlite3.connect(DB_PATH)
|
|
con.row_factory = sqlite3.Row
|
|
cur = con.cursor()
|
|
cur.execute(query)
|
|
rows = cur.fetchall()
|
|
con.close()
|
|
|
|
return render_template("admin.html", rows=rows, csrf=csrf_token(), show=show)
|
|
|
|
@app.post("/admin/request/<int:rid>/complete")
|
|
@admin_required
|
|
def mark_complete(rid: int):
|
|
if not check_csrf():
|
|
return "Bad CSRF", 400
|
|
con = sqlite3.connect(DB_PATH)
|
|
cur = con.cursor()
|
|
cur.execute("""
|
|
UPDATE quote_requests
|
|
SET status='completed', completed_at=?
|
|
WHERE id=? AND deleted_at IS NULL
|
|
""", (datetime.utcnow().isoformat(), rid))
|
|
con.commit()
|
|
con.close()
|
|
flash(f"Request #{rid} marked as completed.", "ok")
|
|
return redirect(url_for("admin", show=request.args.get("show","active")))
|
|
|
|
@app.post("/admin/request/<int:rid>/delete")
|
|
@admin_required
|
|
def delete_request(rid: int):
|
|
if not check_csrf():
|
|
return "Bad CSRF", 400
|
|
hard = request.args.get("hard") == "1"
|
|
con = sqlite3.connect(DB_PATH)
|
|
cur = con.cursor()
|
|
if hard:
|
|
cur.execute("DELETE FROM quote_requests WHERE id=?", (rid,))
|
|
flash(f"Request #{rid} permanently deleted.", "ok")
|
|
else:
|
|
cur.execute("""
|
|
UPDATE quote_requests SET deleted_at=?
|
|
WHERE id=? AND deleted_at IS NULL
|
|
""", (datetime.utcnow().isoformat(), rid))
|
|
flash(f"Request #{rid} moved to Deleted.", "ok")
|
|
con.commit()
|
|
con.close()
|
|
return redirect(url_for("admin", show=request.args.get("show","active")))
|
|
|
|
# Protected preview (for your client email template)
|
|
@app.get("/preview-client-email")
|
|
@admin_required
|
|
def preview_client_email():
|
|
sample = {
|
|
"client_name": "Ben",
|
|
"project_title": "Website + Basic Admin",
|
|
"proposal_summary": "Design + dev of a 5-page brochure site with contact form and basic admin panel.",
|
|
"est_hours": 24, "hourly_rate": 95, "est_cost": 2280,
|
|
"valid_until": "November 15, 2025",
|
|
"next_steps_url": "https://example.com/pay",
|
|
"contact_email": ADMIN_EMAIL,
|
|
"company_name": "Benny's House — NetDeploy",
|
|
}
|
|
return render_template("quote_email.html", **sample)
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Dev server
|
|
# -----------------------------------------------------------------------------
|
|
if __name__ == "__main__":
|
|
app.run(debug=True)
|
|
|