"""Flask application for public pages, work-order intake and a small admin area."""

import json
import logging
import os
import sqlite3
from datetime import timedelta

import requests
from dotenv import load_dotenv
from flask import (
    Flask,
    flash,
    g,
    jsonify,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from google.oauth2 import service_account
from googleapiclient.discovery import build

# Pull settings from a local .env file before any os.getenv() lookups below.
load_dotenv()

app = Flask(__name__)
# Falls back to a random key when FLASK_SECRET_KEY is unset; that key changes
# on every process start, so existing session cookies stop validating.
app.secret_key = os.getenv('FLASK_SECRET_KEY', os.urandom(24))
app.permanent_session_lifetime = timedelta(minutes=30)
app.config['SESSION_COOKIE_PATH'] = '/'

# Admin login credentials.
USERNAME = os.getenv('FLASK_LOGIN_USER')
PASSWORD = os.getenv('FLASK_LOGIN_PASSWORD')

# SQLite database file holding open and completed work orders.
DATABASE = 'work_orders.db'

# Pushover push-notification credentials.
PUSHOVER_API_TOKEN = os.getenv('PUSHOVER_API_TOKEN')
PUSHOVER_USER_KEY = os.getenv('PUSHOVER_USER_KEY')

# Google Analytics 4 reporting settings.
GA_PROPERTY_ID = os.getenv('GA_PROPERTY_ID')
GA_CREDENTIALS = 'ga_key.json'

# reCAPTCHA keys (site key is passed to the template, secret key is used server-side).
RECAPTCHA_SITE_KEY = os.getenv('RECAPTCHA_SITE_KEY')
RECAPTCHA_SECRET_KEY = os.getenv('RECAPTCHA_SECRET_KEY')

logging.basicConfig(
    filename='ga4_debug.log',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


def get_db():
    """Return the SQLite connection cached on ``g``, opening it on first use."""
    conn = g.get('_database')
    if conn is None:
        conn = sqlite3.connect(DATABASE)
        g._database = conn
    return conn


@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``g``, if one was opened.

    Registered as an app-context teardown handler; ``exception`` is supplied
    by Flask and is not used here.
    """
    conn = g.get('_database')
    if conn is None:
        return
    conn.close()


def init_db():
    """Create the ``orders`` and ``completed_orders`` tables if they are missing."""
    create_orders = '''CREATE TABLE IF NOT EXISTS orders (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        job TEXT NOT NULL,
        address TEXT NOT NULL,
        city TEXT NOT NULL,
        state TEXT NOT NULL,
        zipcode INTEGER NOT NULL,
        phone TEXT NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );'''
    create_completed_orders = '''CREATE TABLE IF NOT EXISTS completed_orders (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL,
        job TEXT NOT NULL,
        address TEXT NOT NULL,
        city TEXT NOT NULL,
        state TEXT NOT NULL,
        zipcode INTEGER NOT NULL,
        phone TEXT NOT NULL,
        completed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );'''
    # get_db() stores the connection on ``g``, so an app context must be active.
    with app.app_context():
        conn = get_db()
        for statement in (create_orders, create_completed_orders):
            conn.execute(statement)
        conn.commit()


# ── Public pages ──────────────────────────────────────────────
@app.route('/')
def index():
    """Render the landing page."""
    return render_template('index.html')


@app.route('/about')
def about():
    """Render the about page."""
    return render_template('about.html')


@app.route('/services')
def services():
    """Render the services page."""
    return render_template('services.html')


@app.route('/reviews')
def reviews():
    """Render the reviews page."""
    return render_template('reviews.html')


@app.route('/contact')
def contact():
    """Render the contact page."""
    return render_template('contact.html')


def verify_recaptcha(token):
    """Check a reCAPTCHA token against Google's siteverify endpoint.

    Args:
        token: The ``g-recaptcha-response`` value posted by the form.

    Returns:
        bool: ``True`` when verification succeeds with a score of at least
        0.5. Also ``True`` when no secret key is configured or when the
        verification call itself fails (deliberate fail-open).
    """
    if not RECAPTCHA_SECRET_KEY:
        return True  # skip check if not configured yet
    try:
        resp = requests.post(
            'https://www.google.com/recaptcha/api/siteverify',
            data={'secret': RECAPTCHA_SECRET_KEY, 'response': token},
            timeout=5,
        )
        result = resp.json()
        # bool() so callers always get a real boolean, never None.
        return bool(result.get('success') and result.get('score', 0) >= 0.5)
    except Exception:
        # Deliberately broad: any failure here must not block real customers.
        logging.warning('reCAPTCHA verification failed; allowing request', exc_info=True)
        return True  # fail open if Google is unreachable


@app.route('/work-order')
def work_order():
    """Render the work-order form, passing the reCAPTCHA site key to the template."""
    return render_template('work_order.html', recaptcha_site_key=RECAPTCHA_SITE_KEY)


# ── Work order submission ─────────────────────────────────────
@app.route('/submit', methods=['POST'])
def submit():
    """Store a submitted work order and redirect to the success or failure page.

    ``session['form_submitted']`` is set before every redirect because the
    success and failure views only render when that flag is present.
    """
    # Honeypot: real users never fill this field; bots usually do
    if request.form.get('website'):
        session['form_submitted'] = True
        return redirect(url_for('success'))  # silent reject — bot thinks it worked

    # reCAPTCHA v3 score check
    token = request.form.get('g-recaptcha-response', '')
    if not verify_recaptcha(token):
        session['form_submitted'] = True
        return redirect(url_for('failure'))

    name = request.form.get('name')
    job = request.form.get('job')
    address = request.form.get('address')
    city = request.form.get('city')
    state = request.form.get('state')
    zipcode = request.form.get('zipcode')
    phone = request.form.get('phone')

    try:
        conn = get_db()
        conn.execute(
            '''INSERT INTO orders (name, job, address, city, state, zipcode, phone)
               VALUES (?, ?, ?, ?, ?, ?, ?)''',
            (name, job, address, city, state, zipcode, phone),
        )
        conn.commit()
    except sqlite3.Error:
        # Missing fields arrive as None and violate the NOT NULL constraints.
        logging.exception('Failed to save work order')
        session['form_submitted'] = True
        return redirect(url_for('failure'))

    # The order is already saved at this point. A notification problem must
    # not show the failure page, or the customer would submit a duplicate.
    try:
        send_push_notification(name, job)
    except Exception:
        logging.exception('Work order saved but push notification failed')

    session['form_submitted'] = True
    return redirect(url_for('success'))


@app.route('/success')
def success():
    """Render the success page once per submission; otherwise go back to the form."""
    if not session.pop('form_submitted', None):
        return redirect(url_for('work_order'))
    return render_template('success.html')


@app.route('/failure')
def failure():
    """Render the failure page once per submission; otherwise go back to the form."""
    if not session.pop('form_submitted', None):
        return redirect(url_for('work_order'))
    return render_template('failure.html')


# ── Auth ──────────────────────────────────────────────────────
def _credentials_match(username, password):
    """Return True when the supplied credentials equal the configured ones.

    Uses a constant-time comparison and rejects every attempt when the
    configured username or password is unset or empty.
    """
    # Function-scope import so this block does not depend on the file's import list.
    import hmac

    if not USERNAME or not PASSWORD:
        return False
    # Compare as bytes: compare_digest() rejects non-ASCII str arguments.
    username_ok = hmac.compare_digest(username.encode('utf-8'), USERNAME.encode('utf-8'))
    password_ok = hmac.compare_digest(password.encode('utf-8'), PASSWORD.encode('utf-8'))
    return username_ok and password_ok


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, start an admin session if credentials match."""
    if request.method == 'POST':
        # .get() so a missing field is treated as a failed login, not a 400.
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        if _credentials_match(username, password):
            session.permanent = True
            session['logged_in'] = True
            return redirect(url_for('admin'))
        flash('Invalid username or password.', 'danger')
    return render_template('login.html')


@app.route('/logout', methods=['GET', 'POST'])
def logout():
    """Clear the login flag from the session and return to the landing page."""
    session.pop('logged_in', None)
    flash('You have been logged out.')
    return redirect(url_for('index'))


# ── Admin ─────────────────────────────────────────────────────
@app.route('/admin')
def admin():
    """List all open work orders; requires a logged-in session."""
    if not session.get('logged_in'):
        return redirect(url_for('login'))
    conn = get_db()
    orders = conn.execute('SELECT * FROM orders').fetchall()
    return render_template('admin.html', work_orders=orders)


# The <int:order_id> URL variable is what supplies the view's order_id argument.
@app.route('/delete_order/<int:order_id>', methods=['POST'])
def delete_order(order_id):
    """Delete the work order with the given id; requires a logged-in session."""
    if not session.get('logged_in'):
        return redirect(url_for('login'))
    try:
        db = get_db()
        db.execute('DELETE FROM orders WHERE id = ?', (order_id,))
        db.commit()
        flash('Work order deleted.', 'success')
    except Exception as e:
        logging.exception('Error deleting work order %s', order_id)
        flash(f'Error deleting work order: {str(e)}', 'danger')
    return redirect(url_for('admin'))


# The <int:order_id> URL variable is what supplies the view's order_id argument.
@app.route('/mark_complete/<int:order_id>', methods=['POST'])
def mark_complete(order_id):
    """Move a work order from ``orders`` to ``completed_orders``.

    The insert and the delete are committed together. Requires a logged-in
    session.
    """
    if not session.get('logged_in'):
        return redirect(url_for('login'))
    try:
        db = get_db()
        # Name the columns so the copy does not depend on table column order.
        order = db.execute(
            '''SELECT name, job, address, city, state, zipcode, phone
               FROM orders WHERE id = ?''',
            (order_id,),
        ).fetchone()
        if order:
            db.execute(
                '''INSERT INTO completed_orders (name, job, address, city, state, zipcode, phone)
                   VALUES (?, ?, ?, ?, ?, ?, ?)''',
                tuple(order),
            )
            db.execute('DELETE FROM orders WHERE id = ?', (order_id,))
            db.commit()
            flash('Work order marked as complete.', 'success')
        else:
            flash('Work order not found.', 'danger')
    except Exception as e:
        logging.exception('Error completing work order %s', order_id)
        flash(f'Error: {str(e)}', 'danger')
    return redirect(url_for('admin'))


@app.route('/completed_jobs')
def completed_jobs():
    """List all completed work orders; requires a logged-in session."""
    if not session.get('logged_in'):
        return redirect(url_for('login'))
    db = get_db()
    jobs = db.execute('SELECT * FROM completed_orders').fetchall()
    return render_template('completed_jobs.html', completed_jobs=jobs)


# ── Analytics ─────────────────────────────────────────────────
def _empty_ga4_summary():
    """Return the placeholder summary used when no analytics data is available."""
    return {"total_users": "0", "total_pageviews": "0", "top_page": "N/A"}


def get_ga4_data():
    """Fetch a 7-day Google Analytics 4 summary.

    Returns:
        dict: ``total_users``, ``total_pageviews`` and ``top_page`` as strings.
        The placeholder values ("0", "0", "N/A") are returned when the report
        has no rows or when any error occurs; errors are logged, not raised.
    """
    try:
        credentials = service_account.Credentials.from_service_account_file(
            GA_CREDENTIALS,
            scopes=["https://www.googleapis.com/auth/analytics.readonly"],
        )
        analytics = build("analyticsdata", "v1beta", credentials=credentials)
        response = analytics.properties().runReport(
            property=f"properties/{int(GA_PROPERTY_ID)}",
            body={
                "dateRanges": [{"startDate": "7daysAgo", "endDate": "yesterday"}],
                "metrics": [{"name": "totalUsers"}, {"name": "screenPageViews"}],
                "dimensions": [{"name": "pagePath"}],
                # Sort by page views so the single returned row is the top page.
                "orderBys": [{"metric": {"metricName": "screenPageViews"}, "desc": True}],
                # Ask for report-wide totals as well as the per-page row.
                "metricAggregations": ["TOTAL"],
                "limit": 1,
            },
        ).execute()

        logging.debug("Full GA4 API Response: %s", json.dumps(response, indent=2))

        rows = response.get("rows", [])
        if not rows:
            return _empty_ga4_summary()

        top_page = rows[0]["dimensionValues"][0]["value"]

        # Prefer the report totals; fall back to the top row's metrics if the
        # response carries no totals.
        totals = response.get("totals") or []
        metric_values = totals[0].get("metricValues") if totals else None
        if not metric_values:
            metric_values = rows[0].get("metricValues", [])

        total_users = metric_values[0]["value"] if len(metric_values) > 0 else "0"
        total_pageviews = metric_values[1]["value"] if len(metric_values) > 1 else "0"

        return {
            "total_users": total_users,
            "total_pageviews": total_pageviews,
            "top_page": top_page,
        }
    except Exception as e:
        # Broad on purpose: the admin dashboard should degrade, not error out.
        logging.error("Google Analytics API Error: %s", str(e))
        return _empty_ga4_summary()


@app.route('/analytics')
def analytics():
    """Return the GA4 summary as JSON; requires a logged-in session."""
    if not session.get('logged_in'):
        return redirect(url_for('login'))
    return jsonify(get_ga4_data())


# ── Notifications ─────────────────────────────────────────────
def send_push_notification(name, job):
    """Send a best-effort Pushover notification about a new work order.

    Does nothing when the Pushover credentials are not configured. Network
    errors and non-200 responses are logged and never raised.

    Args:
        name: Customer name from the work order.
        job: Job description from the work order.
    """
    if not PUSHOVER_API_TOKEN or not PUSHOVER_USER_KEY:
        return
    message = f"New Work Order:\nName: {name}\nJob: {job}"
    data = {
        "token": PUSHOVER_API_TOKEN,
        "user": PUSHOVER_USER_KEY,
        "message": message,
    }
    try:
        # Timeout so a stalled Pushover call cannot hang the form submission.
        response = requests.post(
            "https://api.pushover.net/1/messages.json",
            data=data,
            timeout=10,
        )
    except requests.RequestException:
        logging.warning("Push notification could not be sent", exc_info=True)
        return
    if response.status_code != 200:
        logging.warning(
            "Push notification failed: %s - %s", response.status_code, response.text
        )


# Ensure the tables exist whenever the module is loaded (including under a WSGI server).
init_db()

if __name__ == '__main__':
    # NOTE(review): debug=True enables the interactive debugger; use this
    # entry point for local development only.
    app.run(debug=True)