322 lines
10 KiB
Python
322 lines
10 KiB
Python
import hmac
import json
import logging
import os
import sqlite3
from datetime import timedelta

import requests
from dotenv import load_dotenv
from flask import Flask, render_template, request, redirect, url_for, session, flash, g, jsonify
from google.oauth2 import service_account
from googleapiclient.discovery import build
|
|
# Load variables from a local .env file first so that every environment
# lookup below can see them.
load_dotenv()

app = Flask(__name__)
app.config.update(
    # Falls back to a random per-process key when FLASK_SECRET_KEY is unset.
    SECRET_KEY=os.environ.get('FLASK_SECRET_KEY', os.urandom(24)),
    PERMANENT_SESSION_LIFETIME=timedelta(minutes=30),
    SESSION_COOKIE_PATH='/',
)

# Admin login credentials and the SQLite file holding the work orders.
USERNAME = os.environ.get('FLASK_LOGIN_USER')
PASSWORD = os.environ.get('FLASK_LOGIN_PASSWORD')
DATABASE = 'work_orders.db'

# Pushover credentials used for new-work-order notifications.
PUSHOVER_API_TOKEN = os.environ.get('PUSHOVER_API_TOKEN')
PUSHOVER_USER_KEY = os.environ.get('PUSHOVER_USER_KEY')

# Google Analytics property and the service-account key file used to query it.
GA_PROPERTY_ID = os.environ.get('GA_PROPERTY_ID')
GA_CREDENTIALS = 'ga_key.json'

# reCAPTCHA key pair: the site key goes to the template, the secret key is
# used for server-side verification.
RECAPTCHA_SITE_KEY = os.environ.get('RECAPTCHA_SITE_KEY')
RECAPTCHA_SECRET_KEY = os.environ.get('RECAPTCHA_SECRET_KEY')

# Root logger writes everything from DEBUG upwards to ga4_debug.log.
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='ga4_debug.log',
)
|
|
|
|
|
|
def get_db():
    """Return the SQLite connection for the current application context.

    The connection is opened on first use and cached on ``flask.g`` as
    ``_database`` so that later calls in the same context reuse it.
    """
    if getattr(g, '_database', None) is None:
        g._database = sqlite3.connect(DATABASE)
    return g._database
|
|
|
|
|
|
@app.teardown_appcontext
def close_connection(exception):
    """Close the cached SQLite connection, if one was opened.

    Registered as an app-context teardown handler. ``exception`` is passed
    in by Flask and is not used here.
    """
    connection = getattr(g, '_database', None)
    if connection is None:
        return
    connection.close()
|
|
|
|
|
|
def init_db():
    """Create the ``orders`` and ``completed_orders`` tables if missing."""
    # The two tables carry the same customer/job columns and differ only in
    # the name of their timestamp column.
    # NOTE(review): zipcode is declared INTEGER; SQLite's integer affinity
    # presumably drops leading zeros from ZIP codes such as 02134 -- confirm
    # before relying on the stored value.
    schema_statements = (
        '''CREATE TABLE IF NOT EXISTS orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            job TEXT NOT NULL,
            address TEXT NOT NULL,
            city TEXT NOT NULL,
            state TEXT NOT NULL,
            zipcode INTEGER NOT NULL,
            phone TEXT NOT NULL,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );''',
        '''CREATE TABLE IF NOT EXISTS completed_orders (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            job TEXT NOT NULL,
            address TEXT NOT NULL,
            city TEXT NOT NULL,
            state TEXT NOT NULL,
            zipcode INTEGER NOT NULL,
            phone TEXT NOT NULL,
            completed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );''',
    )
    # get_db() needs an application context because it caches on flask.g.
    with app.app_context():
        connection = get_db()
        for statement in schema_statements:
            connection.execute(statement)
        connection.commit()
|
|
|
|
|
|
# ── Public pages ──────────────────────────────────────────────
|
|
|
|
@app.route('/')
def index():
    """Render ``index.html`` for the site root."""
    template_name = 'index.html'
    return render_template(template_name)
|
|
|
|
|
|
@app.route('/about')
def about():
    """Render ``about.html`` for ``/about``."""
    template_name = 'about.html'
    return render_template(template_name)
|
|
|
|
|
|
@app.route('/services')
def services():
    """Render ``services.html`` for ``/services``."""
    template_name = 'services.html'
    return render_template(template_name)
|
|
|
|
|
|
@app.route('/reviews')
def reviews():
    """Render ``reviews.html`` for ``/reviews``."""
    template_name = 'reviews.html'
    return render_template(template_name)
|
|
|
|
|
|
@app.route('/contact')
def contact():
    """Render ``contact.html`` for ``/contact``."""
    template_name = 'contact.html'
    return render_template(template_name)
|
|
|
|
|
|
def verify_recaptcha(token, min_score=0.5):
    """Check a reCAPTCHA token against Google's ``siteverify`` endpoint.

    Args:
        token: The ``g-recaptcha-response`` value submitted with the form.
        min_score: Lowest score (inclusive) that is still accepted.

    Returns:
        bool: ``True`` when the token is accepted, when no secret key is
        configured, or when the verification request itself fails (the
        check deliberately fails open). ``False`` when the token is empty
        or Google rejects it / scores it below ``min_score``.
    """
    if not RECAPTCHA_SECRET_KEY:
        return True  # skip check if not configured yet

    if not token:
        # An empty token can never verify. Rejecting it locally keeps it
        # from slipping through the fail-open path below when Google is
        # unreachable, and saves a pointless network round trip.
        return False

    try:
        resp = requests.post(
            'https://www.google.com/recaptcha/api/siteverify',
            data={'secret': RECAPTCHA_SECRET_KEY, 'response': token},
            timeout=5,
        )
        result = resp.json()
        return bool(result.get('success')) and result.get('score', 0) >= min_score
    except Exception:
        # Fail open if Google is unreachable or answers with something
        # unexpected, but leave a trace instead of swallowing it silently.
        logging.warning('reCAPTCHA verification failed; allowing submission', exc_info=True)
        return True
|
|
|
|
|
|
@app.route('/work-order')
def work_order():
    """Render the work-order form, passing the reCAPTCHA site key to it."""
    template_context = {'recaptcha_site_key': RECAPTCHA_SITE_KEY}
    return render_template('work_order.html', **template_context)
|
|
|
|
|
|
# ── Work order submission ─────────────────────────────────────
|
|
|
|
@app.route('/submit', methods=['POST'])
def submit():
    """Handle a work-order form submission.

    The request passes through a honeypot check and a reCAPTCHA check, then
    the seven order fields are validated and inserted into ``orders``. A push
    notification is attempted after the order has been committed.

    Returns:
        A redirect to ``success`` when the order was stored (or the honeypot
        was triggered), otherwise a redirect to ``failure``.
    """
    # Every outcome redirects to /success or /failure, and both of those
    # pages only render when this flag is present in the session.
    session['form_submitted'] = True

    # Honeypot: real users never fill this field; bots usually do
    if request.form.get('website'):
        return redirect(url_for('success'))  # silent reject -- bot thinks it worked

    # reCAPTCHA v3 score check
    token = request.form.get('g-recaptcha-response', '')
    if not verify_recaptcha(token):
        return redirect(url_for('failure'))

    field_names = ('name', 'job', 'address', 'city', 'state', 'zipcode', 'phone')
    values = tuple((request.form.get(field) or '').strip() for field in field_names)
    if not all(values):
        # Every column is NOT NULL; reject missing or blank fields up front
        # instead of storing empty strings.
        return redirect(url_for('failure'))

    try:
        conn = get_db()
        conn.execute(
            '''INSERT INTO orders (name, job, address, city, state, zipcode, phone)
               VALUES (?, ?, ?, ?, ?, ?, ?)''',
            values,
        )
        conn.commit()
    except Exception:
        logging.exception('Failed to store work order')
        return redirect(url_for('failure'))

    # The order is already saved at this point. A notification problem must
    # not send the customer to the failure page, or they would resubmit and
    # create a duplicate order.
    try:
        send_push_notification(values[0], values[1])
    except Exception:
        logging.exception('Work order stored but push notification failed')

    return redirect(url_for('success'))
|
|
|
|
|
|
@app.route('/success')
def success():
    """Render the success page once per submission.

    The ``form_submitted`` session flag is consumed here; without it the
    visitor is redirected to the work-order form.
    """
    came_from_submit = session.pop('form_submitted', None)
    if came_from_submit:
        return render_template('success.html')
    return redirect(url_for('work_order'))
|
|
|
|
|
|
@app.route('/failure')
def failure():
    """Render the failure page once per submission.

    The ``form_submitted`` session flag is consumed here; without it the
    visitor is redirected to the work-order form.
    """
    came_from_submit = session.pop('form_submitted', None)
    if came_from_submit:
        return render_template('failure.html')
    return redirect(url_for('work_order'))
|
|
|
|
|
|
# ── Auth ──────────────────────────────────────────────────────
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and authenticate the admin user.

    On POST the submitted credentials are compared with ``USERNAME`` and
    ``PASSWORD``. A match marks the session as logged in (and permanent, so
    the configured session lifetime applies) and redirects to ``admin``;
    otherwise an error is flashed and the form is rendered again.
    """
    if request.method == 'POST':
        # .get() keeps a malformed POST on the normal "invalid" path instead
        # of turning a missing field into a 400 error.
        username = request.form.get('username', '')
        password = request.form.get('password', '')

        # Never authenticate when credentials are unset or empty; otherwise
        # an empty form would match empty configuration values.
        configured = bool(USERNAME) and bool(PASSWORD)

        # Compare as bytes (compare_digest rejects non-ASCII str) and always
        # evaluate both comparisons so timing does not reveal which one failed.
        user_ok = hmac.compare_digest(
            username.encode('utf-8'), (USERNAME or '').encode('utf-8'))
        password_ok = hmac.compare_digest(
            password.encode('utf-8'), (PASSWORD or '').encode('utf-8'))

        if configured and user_ok and password_ok:
            session.permanent = True
            session['logged_in'] = True
            return redirect(url_for('admin'))

        flash('Invalid username or password.', 'danger')
    return render_template('login.html')
|
|
|
|
|
|
@app.route('/logout', methods=['GET', 'POST'])
def logout():
    """Drop the ``logged_in`` session flag and redirect to the index page."""
    # pop() with a default makes logging out harmless when not logged in.
    session.pop('logged_in', None)
    flash('You have been logged out.')
    home_url = url_for('index')
    return redirect(home_url)
|
|
|
|
|
|
# ── Admin ─────────────────────────────────────────────────────
|
|
|
|
@app.route('/admin')
def admin():
    """List all open work orders; redirect to ``login`` when not logged in."""
    if session.get('logged_in'):
        cursor = get_db().execute('SELECT * FROM orders')
        return render_template('admin.html', work_orders=cursor.fetchall())
    return redirect(url_for('login'))
|
|
|
|
|
|
@app.route('/delete_order/<int:order_id>', methods=['POST'])
def delete_order(order_id):
    """Delete one open work order and return to the admin page.

    Args:
        order_id: Primary key of the row in ``orders`` to delete.

    Returns:
        A redirect to ``login`` when not logged in, otherwise a redirect to
        ``admin`` with a flashed message describing the outcome.
    """
    if not session.get('logged_in'):
        return redirect(url_for('login'))

    try:
        db = get_db()
        # rowcount tells us whether the id actually matched a row, so a stale
        # or repeated request is not reported as a successful deletion.
        deleted = db.execute('DELETE FROM orders WHERE id = ?', (order_id,)).rowcount
        db.commit()
    except Exception as e:
        logging.exception('Failed to delete work order %s', order_id)
        flash(f'Error deleting work order: {str(e)}', 'danger')
    else:
        if deleted:
            flash('Work order deleted.', 'success')
        else:
            flash('Work order not found.', 'danger')
    return redirect(url_for('admin'))
|
|
|
|
|
|
@app.route('/mark_complete/<int:order_id>', methods=['POST'])
def mark_complete(order_id):
    """Move one work order from ``orders`` to ``completed_orders``.

    Args:
        order_id: Primary key of the row in ``orders`` to complete.

    Returns:
        A redirect to ``login`` when not logged in, otherwise a redirect to
        ``admin`` with a flashed message describing the outcome.
    """
    if not session.get('logged_in'):
        return redirect(url_for('login'))

    try:
        db = get_db()
        # "with db" commits when the block succeeds and rolls back when it
        # raises, so the copy and the delete are applied together or not at all.
        with db:
            # Copy by column name rather than via SELECT * and positional
            # indices, so the move does not depend on column order.
            moved = db.execute(
                '''INSERT INTO completed_orders (name, job, address, city, state, zipcode, phone)
                   SELECT name, job, address, city, state, zipcode, phone
                   FROM orders WHERE id = ?''',
                (order_id,),
            ).rowcount
            if moved:
                db.execute('DELETE FROM orders WHERE id = ?', (order_id,))
    except Exception as e:
        logging.exception('Failed to mark work order %s as complete', order_id)
        flash(f'Error: {str(e)}', 'danger')
    else:
        if moved:
            flash('Work order marked as complete.', 'success')
        else:
            flash('Work order not found.', 'danger')
    return redirect(url_for('admin'))
|
|
|
|
|
|
@app.route('/completed_jobs')
def completed_jobs():
    """List all completed work orders; redirect to ``login`` when not logged in."""
    if session.get('logged_in'):
        cursor = get_db().execute('SELECT * FROM completed_orders')
        return render_template('completed_jobs.html', completed_jobs=cursor.fetchall())
    return redirect(url_for('login'))
|
|
|
|
|
|
# ── Analytics ─────────────────────────────────────────────────
|
|
|
|
def get_ga4_data():
    """Fetch a small summary of the last seven days from Google Analytics 4.

    Returns:
        dict: ``total_users``, ``total_pageviews`` and ``top_page``, all as
        strings. When the report has no rows or anything goes wrong, the
        values are ``"0"``, ``"0"`` and ``"N/A"``.
    """
    no_data = {"total_users": "0", "total_pageviews": "0", "top_page": "N/A"}
    try:
        credentials = service_account.Credentials.from_service_account_file(
            GA_CREDENTIALS, scopes=["https://www.googleapis.com/auth/analytics.readonly"]
        )
        client = build("analyticsdata", "v1beta", credentials=credentials)

        response = client.properties().runReport(
            property=f"properties/{int(GA_PROPERTY_ID)}",
            body={
                "dateRanges": [{"startDate": "7daysAgo", "endDate": "yesterday"}],
                "metrics": [{"name": "totalUsers"}, {"name": "screenPageViews"}],
                "dimensions": [{"name": "pagePath"}],
                # Without an explicit ordering the single returned row is not
                # guaranteed to be the most viewed page.
                "orderBys": [{"metric": {"metricName": "screenPageViews"}, "desc": True}],
                # Ask for site-wide totals; the row itself only carries the
                # metrics of that one page.
                "metricAggregations": ["TOTAL"],
                "limit": 1,
            },
        ).execute()

        logging.debug("Full GA4 API Response: %s", json.dumps(response, indent=2))

        rows = response.get("rows", [])
        if not rows:
            return dict(no_data)

        top_page = rows[0]["dimensionValues"][0]["value"]

        # Prefer the aggregated totals; fall back to the top row's own
        # metrics if the response carries no totals.
        totals = response.get("totals") or rows
        metric_values = totals[0].get("metricValues", [])
        total_users = metric_values[0]["value"] if len(metric_values) > 0 else "0"
        total_pageviews = metric_values[1]["value"] if len(metric_values) > 1 else "0"

        return {"total_users": total_users, "total_pageviews": total_pageviews, "top_page": top_page}

    except Exception as e:
        # Best effort: the caller always gets a well-formed dict. The
        # traceback goes to the log so failures can be diagnosed.
        logging.error("Google Analytics API Error: %s", str(e), exc_info=True)
        return dict(no_data)
|
|
|
|
|
|
@app.route('/analytics')
def analytics():
    """Return the GA4 summary as JSON; redirect to ``login`` when not logged in."""
    if session.get('logged_in'):
        summary = get_ga4_data()
        return jsonify(summary)
    return redirect(url_for('login'))
|
|
|
|
|
|
# ── Notifications ─────────────────────────────────────────────
|
|
|
|
def send_push_notification(name, job):
    """Send a Pushover notification announcing a new work order.

    Does nothing when the Pushover token or user key is not configured.
    Delivery is best effort: network errors and non-200 responses are logged
    and never raised to the caller.

    Args:
        name: Customer name to include in the message.
        job: Job description to include in the message.
    """
    if not PUSHOVER_API_TOKEN or not PUSHOVER_USER_KEY:
        return

    message = f"New Work Order:\nName: {name}\nJob: {job}"
    data = {
        "token": PUSHOVER_API_TOKEN,
        "user": PUSHOVER_USER_KEY,
        "message": message,
    }
    try:
        # A timeout keeps a stalled connection from blocking the request
        # that triggered the notification.
        response = requests.post(
            "https://api.pushover.net/1/messages.json", data=data, timeout=5
        )
    except requests.RequestException:
        logging.warning("Push notification could not be sent", exc_info=True)
        return

    if response.status_code != 200:
        logging.warning(
            "Push notification failed: %s - %s", response.status_code, response.text
        )
|
|
|
|
|
|
# Create the tables at import time so they exist no matter how the app is
# started (directly or through a WSGI server importing this module).
init_db()

if __name__ == '__main__':
    # The Werkzeug debugger lets anyone who can reach it run arbitrary code,
    # so debug mode is opt-in through FLASK_DEBUG instead of always on.
    debug_enabled = os.getenv('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled)
|