Files
netdeploy.net/app.py

398 lines
14 KiB
Python

# app.py
from __future__ import annotations
import os, smtplib, sqlite3, json, secrets, time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime, timedelta
from typing import Dict, Any, Tuple
from dotenv import load_dotenv
from flask import (
Flask, render_template, request, redirect, url_for, flash, session
)
from werkzeug.security import check_password_hash
# -----------------------------------------------------------------------------
# Load .env FIRST
# -----------------------------------------------------------------------------
load_dotenv() # must come before getenv() usage
# -----------------------------------------------------------------------------
# Config
# -----------------------------------------------------------------------------
def getenv(name: str, default: str | None=None) -> str | None:
return os.environ.get(name, default)
APP_SECRET_KEY = getenv("APP_SECRET_KEY", "dev_change_me")
# Single admin (from .env)
ADMIN_USERNAME = getenv("ADMIN_USERNAME", "")
ADMIN_PASSWORD_HASH = getenv("ADMIN_PASSWORD_HASH", "")
ADMIN_EMAIL = getenv("ADMIN_EMAIL", "admin@example.com")
SMTP_HOST = getenv("SMTP_HOST", "")
SMTP_PORT = int(getenv("SMTP_PORT", "587") or "587")
SMTP_USER = getenv("SMTP_USER", "")
SMTP_PASS = getenv("SMTP_PASS", "")
SMTP_FROM = getenv("SMTP_FROM", SMTP_USER or "no-reply@example.com")
BASE_URL = getenv("BASE_URL", "http://localhost:5000")
DB_PATH = getenv("DB_PATH", "quotes.db")
# Session hardening (use HTTPS in prod)
SESSION_COOKIE_SECURE = getenv("SESSION_COOKIE_SECURE", "false").lower() == "true"
SESSION_COOKIE_HTTPONLY = getenv("SESSION_COOKIE_HTTPONLY", "true").lower() == "true"
SESSION_COOKIE_SAMESITE = getenv("SESSION_COOKIE_SAMESITE", "Lax")
app = Flask(__name__, static_folder="static", static_url_path="/static")
app.secret_key = APP_SECRET_KEY
app.config.update(
SESSION_COOKIE_SECURE=SESSION_COOKIE_SECURE,
SESSION_COOKIE_HTTPONLY=SESSION_COOKIE_HTTPONLY,
SESSION_COOKIE_SAMESITE=SESSION_COOKIE_SAMESITE,
PERMANENT_SESSION_LIFETIME=timedelta(days=30),
)
# -----------------------------------------------------------------------------
# DB
# -----------------------------------------------------------------------------
def init_db() -> None:
con = sqlite3.connect(DB_PATH)
cur = con.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS quote_requests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL,
name TEXT NOT NULL,
email TEXT NOT NULL,
phone TEXT,
company TEXT,
project_type TEXT,
complexity TEXT,
urgency TEXT,
features TEXT,
budget_range TEXT,
description TEXT,
attachments TEXT,
est_hours REAL,
est_cost REAL,
hourly_rate REAL,
json_payload TEXT
)
""")
con.commit()
con.close()
def migrate_db() -> None:
"""Add QoL columns if missing."""
con = sqlite3.connect(DB_PATH)
cur = con.cursor()
cur.execute("PRAGMA table_info(quote_requests)")
cols = {row[1] for row in cur.fetchall()}
if "status" not in cols:
cur.execute("ALTER TABLE quote_requests ADD COLUMN status TEXT DEFAULT 'open'")
if "completed_at" not in cols:
cur.execute("ALTER TABLE quote_requests ADD COLUMN completed_at TEXT")
if "deleted_at" not in cols:
cur.execute("ALTER TABLE quote_requests ADD COLUMN deleted_at TEXT")
con.commit()
con.close()
init_db()
migrate_db()
# -----------------------------------------------------------------------------
# Auth helpers (sessions + CSRF + simple throttle)
# -----------------------------------------------------------------------------
_failed: dict[str, tuple[int, float]] = {} # key -> (count, last_ts)
def throttle_key() -> str:
return request.headers.get("X-Forwarded-For", request.remote_addr or "unknown")
def throttled(key: str, limit=5, window=900) -> bool:
now = time.time()
count, last = _failed.get(key, (0, 0))
if now - last > window:
count = 0
return count >= limit
def bump_fail(key: str) -> None:
count, _ = _failed.get(key, (0, 0))
_failed[key] = (count + 1, time.time())
def csrf_token() -> str:
tok = session.get("_csrf")
if not tok:
tok = secrets.token_urlsafe(32)
session["_csrf"] = tok
return tok
def check_csrf() -> bool:
return request.form.get("_csrf") == session.get("_csrf")
def admin_required(view):
from functools import wraps
@wraps(view)
def _wrap(*args, **kwargs):
if session.get("is_admin"):
return view(*args, **kwargs)
return redirect(url_for("admin_login_form", next=request.path))
return _wrap
# -----------------------------------------------------------------------------
# Estimator logic (plain-language form)
# -----------------------------------------------------------------------------
def estimate_hours_and_cost(payload: Dict[str, Any]) -> Tuple[float, float, float]:
hourly_rate = float(getenv("HOURLY_RATE", "95"))
need = payload.get("need", "not-sure")
base = {
"simple-site": 10,
"pro-site": 18,
"online-form": 8,
"sell-online": 24,
"fix-or-improve": 6,
"it-help": 6,
"custom-app": 28,
"not-sure": 8,
}.get(need, 8)
size = payload.get("scope_size", "small")
size_mult = {"small": 1.0, "medium": 1.4, "large": 2.0}.get(size, 1.0)
timeline = payload.get("timeline", "flexible")
time_mult = {"flexible": 1.0, "soon": 1.1, "rush": 1.25, "critical": 1.45}.get(timeline, 1.0)
hours = base * size_mult * time_mult
extras = payload.get("extras", [])
if isinstance(extras, str):
extras = [extras]
extra_add = {"content": 3, "branding": 4, "training": 2, "care": 2}
for e in extras:
hours += extra_add.get(e, 0)
hours = max(3, round(hours, 1))
cost = round(hours * hourly_rate, 2)
return hours, cost, hourly_rate
# -----------------------------------------------------------------------------
# Email
# -----------------------------------------------------------------------------
def send_email(subject: str, html_body: str, to_address: str) -> bool:
if not SMTP_HOST or not to_address:
app.logger.info("SMTP not configured; would send to %s (%s)", to_address, subject)
return False
try:
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = SMTP_FROM
msg["To"] = to_address
msg.attach(MIMEText(html_body, "html"))
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
server.starttls()
if SMTP_USER and SMTP_PASS:
server.login(SMTP_USER, SMTP_PASS)
server.sendmail(SMTP_FROM, [to_address], msg.as_string())
return True
except Exception as e:
app.logger.error("Email send failed: %s", e)
return False
# -----------------------------------------------------------------------------
# Routes: public
# -----------------------------------------------------------------------------
@app.get("/")
def index():
return render_template("index.html")
@app.post("/submit")
def submit():
payload = {
"name": request.form.get("name","").strip(),
"email": request.form.get("email","").strip(),
"phone": request.form.get("phone","").strip(),
"company": request.form.get("company","").strip(),
"need": request.form.get("need","not-sure"),
"scope_size": request.form.get("scope_size","small"),
"timeline": request.form.get("timeline","flexible"),
"extras": request.form.getlist("extras"),
"budget_feel": request.form.get("budget_feel","unsure"),
"description": request.form.get("description","").strip(),
}
if not payload["name"] or not payload["email"]:
flash("Name and Email are required.", "error")
return redirect(url_for("index"))
est_hours, est_cost, hourly_rate = estimate_hours_and_cost(payload)
con = sqlite3.connect(DB_PATH)
cur = con.cursor()
cur.execute("""
INSERT INTO quote_requests
(created_at, name, email, phone, company,
project_type, complexity, urgency, features, budget_range,
description, attachments, est_hours, est_cost, hourly_rate, json_payload)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
datetime.utcnow().isoformat(),
payload["name"], payload["email"], payload["phone"], payload["company"],
payload["need"], payload["scope_size"], payload["timeline"],
",".join(payload["extras"]), payload["budget_feel"],
payload["description"], "",
est_hours, est_cost, hourly_rate, json.dumps(payload)
))
con.commit()
con.close()
admin_html = render_template("new_request_email.html",
payload=payload,
est_hours=est_hours,
est_cost=est_cost,
hourly_rate=hourly_rate,
base_url=BASE_URL)
send_email("New Quote Request Received", admin_html, ADMIN_EMAIL)
return redirect(url_for("thanks"))
@app.get("/thanks")
def thanks():
return render_template("thanks.html")
# -----------------------------------------------------------------------------
# Routes: auth
# -----------------------------------------------------------------------------
@app.get("/admin/login")
def admin_login_form():
return render_template("login.html", csrf=csrf_token(), next=request.args.get("next","/admin"))
@app.post("/admin/login")
def admin_login():
if not check_csrf():
return "Bad CSRF", 400
key = throttle_key()
if throttled(key):
return "Too many attempts. Try again later.", 429
username = request.form.get("username","").strip()
password = request.form.get("password","")
ok = (username == ADMIN_USERNAME and ADMIN_PASSWORD_HASH and check_password_hash(ADMIN_PASSWORD_HASH, password))
if not ok:
bump_fail(key)
flash("Invalid username or password.", "error")
return redirect(url_for("admin_login_form"))
session["is_admin"] = True
session.permanent = True if request.form.get("remember") == "on" else False
flash("Signed in.", "ok")
dest = request.form.get("next") or url_for("admin")
return redirect(dest)
@app.post("/admin/logout")
@admin_required
def admin_logout():
session.clear()
flash("Signed out.", "ok")
return redirect(url_for("admin_login_form"))
# -----------------------------------------------------------------------------
# Routes: admin views & actions
# -----------------------------------------------------------------------------
@app.get("/admin")
@admin_required
def admin():
# Filters: active (default), open, completed, deleted, all
show = request.args.get("show", "active")
where = []
if show == "open":
where.append("(status IS NULL OR status='open') AND deleted_at IS NULL")
elif show == "completed":
where.append("status='completed' AND deleted_at IS NULL")
elif show == "deleted":
where.append("deleted_at IS NOT NULL")
elif show == "all":
pass
else: # active
where.append("deleted_at IS NULL")
query = "SELECT * FROM quote_requests"
if where:
query += " WHERE " + " AND ".join(where)
query += " ORDER BY id DESC LIMIT 500"
con = sqlite3.connect(DB_PATH)
con.row_factory = sqlite3.Row
cur = con.cursor()
cur.execute(query)
rows = cur.fetchall()
con.close()
return render_template("admin.html", rows=rows, csrf=csrf_token(), show=show)
@app.post("/admin/request/<int:rid>/complete")
@admin_required
def mark_complete(rid: int):
if not check_csrf():
return "Bad CSRF", 400
con = sqlite3.connect(DB_PATH)
cur = con.cursor()
cur.execute("""
UPDATE quote_requests
SET status='completed', completed_at=?
WHERE id=? AND deleted_at IS NULL
""", (datetime.utcnow().isoformat(), rid))
con.commit()
con.close()
flash(f"Request #{rid} marked as completed.", "ok")
return redirect(url_for("admin", show=request.args.get("show","active")))
@app.post("/admin/request/<int:rid>/delete")
@admin_required
def delete_request(rid: int):
if not check_csrf():
return "Bad CSRF", 400
hard = request.args.get("hard") == "1"
con = sqlite3.connect(DB_PATH)
cur = con.cursor()
if hard:
cur.execute("DELETE FROM quote_requests WHERE id=?", (rid,))
flash(f"Request #{rid} permanently deleted.", "ok")
else:
cur.execute("""
UPDATE quote_requests SET deleted_at=?
WHERE id=? AND deleted_at IS NULL
""", (datetime.utcnow().isoformat(), rid))
flash(f"Request #{rid} moved to Deleted.", "ok")
con.commit()
con.close()
return redirect(url_for("admin", show=request.args.get("show","active")))
# Protected preview (for your client email template)
@app.get("/preview-client-email")
@admin_required
def preview_client_email():
sample = {
"client_name": "Ben",
"project_title": "Website + Basic Admin",
"proposal_summary": "Design + dev of a 5-page brochure site with contact form and basic admin panel.",
"est_hours": 24, "hourly_rate": 95, "est_cost": 2280,
"valid_until": "November 15, 2025",
"next_steps_url": "https://example.com/pay",
"contact_email": ADMIN_EMAIL,
"company_name": "Benny's House — NetDeploy",
}
return render_template("quote_email.html", **sample)
# -----------------------------------------------------------------------------
# Dev server
# -----------------------------------------------------------------------------
if __name__ == "__main__":
app.run(debug=True)