Initial Commit
This commit is contained in:
320
app.py
Normal file
320
app.py
Normal file
@@ -0,0 +1,320 @@
|
||||
import os
|
||||
from flask import Flask, render_template, request, redirect, url_for, session, flash, g, jsonify
|
||||
from dotenv import load_dotenv
|
||||
import sqlite3
|
||||
from datetime import timedelta
|
||||
import requests
|
||||
from google.oauth2 import service_account
|
||||
from googleapiclient.discovery import build
|
||||
import json
|
||||
import logging
|
||||
load_dotenv()
|
||||
|
||||
app = Flask(__name__)
|
||||
app.secret_key = os.getenv('FLASK_SECRET_KEY', os.urandom(24))
|
||||
app.permanent_session_lifetime = timedelta(minutes=30)
|
||||
app.config['SESSION_COOKIE_PATH'] = '/'
|
||||
|
||||
USERNAME = os.getenv('FLASK_LOGIN_USER')
|
||||
PASSWORD = os.getenv('FLASK_LOGIN_PASSWORD')
|
||||
DATABASE = 'work_orders.db'
|
||||
|
||||
PUSHOVER_API_TOKEN = os.getenv('PUSHOVER_API_TOKEN')
|
||||
PUSHOVER_USER_KEY = os.getenv('PUSHOVER_USER_KEY')
|
||||
|
||||
GA_PROPERTY_ID = os.getenv('GA_PROPERTY_ID')
|
||||
GA_CREDENTIALS = 'ga_key.json'
|
||||
|
||||
RECAPTCHA_SITE_KEY = os.getenv('RECAPTCHA_SITE_KEY')
|
||||
RECAPTCHA_SECRET_KEY = os.getenv('RECAPTCHA_SECRET_KEY')
|
||||
|
||||
logging.basicConfig(
|
||||
filename='ga4_debug.log',
|
||||
level=logging.DEBUG,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
|
||||
|
||||
def get_db():
|
||||
db = getattr(g, '_database', None)
|
||||
if db is None:
|
||||
db = g._database = sqlite3.connect(DATABASE)
|
||||
return db
|
||||
|
||||
|
||||
@app.teardown_appcontext
|
||||
def close_connection(exception):
|
||||
db = getattr(g, '_database', None)
|
||||
if db is not None:
|
||||
db.close()
|
||||
|
||||
|
||||
def init_db():
|
||||
with app.app_context():
|
||||
db = get_db()
|
||||
db.execute('''CREATE TABLE IF NOT EXISTS orders (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
job TEXT NOT NULL,
|
||||
address TEXT NOT NULL,
|
||||
city TEXT NOT NULL,
|
||||
state TEXT NOT NULL,
|
||||
zipcode INTEGER NOT NULL,
|
||||
phone TEXT NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);''')
|
||||
db.execute('''CREATE TABLE IF NOT EXISTS completed_orders (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
name TEXT NOT NULL,
|
||||
job TEXT NOT NULL,
|
||||
address TEXT NOT NULL,
|
||||
city TEXT NOT NULL,
|
||||
state TEXT NOT NULL,
|
||||
zipcode INTEGER NOT NULL,
|
||||
phone TEXT NOT NULL,
|
||||
completed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);''')
|
||||
db.commit()
|
||||
|
||||
|
||||
# ── Public pages ──────────────────────────────────────────────
|
||||
|
||||
@app.route('/')
|
||||
def index():
|
||||
return render_template('index.html')
|
||||
|
||||
|
||||
@app.route('/about')
|
||||
def about():
|
||||
return render_template('about.html')
|
||||
|
||||
|
||||
@app.route('/services')
|
||||
def services():
|
||||
return render_template('services.html')
|
||||
|
||||
|
||||
@app.route('/reviews')
|
||||
def reviews():
|
||||
return render_template('reviews.html')
|
||||
|
||||
|
||||
@app.route('/contact')
|
||||
def contact():
|
||||
return render_template('contact.html')
|
||||
|
||||
|
||||
def verify_recaptcha(token):
|
||||
if not RECAPTCHA_SECRET_KEY:
|
||||
return True # skip check if not configured yet
|
||||
try:
|
||||
resp = requests.post(
|
||||
'https://www.google.com/recaptcha/api/siteverify',
|
||||
data={'secret': RECAPTCHA_SECRET_KEY, 'response': token},
|
||||
timeout=5,
|
||||
)
|
||||
result = resp.json()
|
||||
return result.get('success') and result.get('score', 0) >= 0.5
|
||||
except Exception:
|
||||
return True # fail open if Google is unreachable
|
||||
|
||||
|
||||
@app.route('/work-order')
|
||||
def work_order():
|
||||
return render_template('work_order.html', recaptcha_site_key=RECAPTCHA_SITE_KEY)
|
||||
|
||||
|
||||
# ── Work order submission ─────────────────────────────────────
|
||||
|
||||
@app.route('/submit', methods=['POST'])
|
||||
def submit():
|
||||
# Honeypot: real users never fill this field; bots usually do
|
||||
if request.form.get('website'):
|
||||
session['form_submitted'] = True
|
||||
return redirect(url_for('success')) # silent reject — bot thinks it worked
|
||||
|
||||
# reCAPTCHA v3 score check
|
||||
token = request.form.get('g-recaptcha-response', '')
|
||||
if not verify_recaptcha(token):
|
||||
session['form_submitted'] = True
|
||||
return redirect(url_for('failure'))
|
||||
|
||||
try:
|
||||
name = request.form.get('name')
|
||||
job = request.form.get('job')
|
||||
address = request.form.get('address')
|
||||
city = request.form.get('city')
|
||||
state = request.form.get('state')
|
||||
zipcode = request.form.get('zipcode')
|
||||
phone = request.form.get('phone')
|
||||
|
||||
conn = get_db()
|
||||
conn.execute('''INSERT INTO orders (name, job, address, city, state, zipcode, phone)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)''',
|
||||
(name, job, address, city, state, zipcode, phone))
|
||||
conn.commit()
|
||||
|
||||
send_push_notification(name, job)
|
||||
session['form_submitted'] = True
|
||||
return redirect(url_for('success'))
|
||||
|
||||
except Exception as e:
|
||||
session['form_submitted'] = True
|
||||
return redirect(url_for('failure'))
|
||||
|
||||
|
||||
@app.route('/success')
|
||||
def success():
|
||||
if not session.pop('form_submitted', None):
|
||||
return redirect(url_for('work_order'))
|
||||
return render_template('success.html')
|
||||
|
||||
|
||||
@app.route('/failure')
|
||||
def failure():
|
||||
if not session.pop('form_submitted', None):
|
||||
return redirect(url_for('work_order'))
|
||||
return render_template('failure.html')
|
||||
|
||||
|
||||
# ── Auth ──────────────────────────────────────────────────────
|
||||
|
||||
@app.route('/login', methods=['GET', 'POST'])
|
||||
def login():
|
||||
if request.method == 'POST':
|
||||
username = request.form['username']
|
||||
password = request.form['password']
|
||||
if username == USERNAME and password == PASSWORD:
|
||||
session.permanent = True
|
||||
session['logged_in'] = True
|
||||
return redirect(url_for('admin'))
|
||||
else:
|
||||
flash('Invalid username or password.', 'danger')
|
||||
return render_template('login.html')
|
||||
|
||||
|
||||
@app.route('/logout', methods=['GET', 'POST'])
|
||||
def logout():
|
||||
session.pop('logged_in', None)
|
||||
flash('You have been logged out.')
|
||||
return redirect(url_for('index'))
|
||||
|
||||
|
||||
# ── Admin ─────────────────────────────────────────────────────
|
||||
|
||||
@app.route('/admin')
|
||||
def admin():
|
||||
if not session.get('logged_in'):
|
||||
return redirect(url_for('login'))
|
||||
conn = get_db()
|
||||
orders = conn.execute('SELECT * FROM orders').fetchall()
|
||||
return render_template('admin.html', work_orders=orders)
|
||||
|
||||
|
||||
@app.route('/delete_order/<int:order_id>', methods=['POST'])
|
||||
def delete_order(order_id):
|
||||
if not session.get('logged_in'):
|
||||
return redirect(url_for('login'))
|
||||
try:
|
||||
db = get_db()
|
||||
db.execute('DELETE FROM orders WHERE id = ?', (order_id,))
|
||||
db.commit()
|
||||
flash('Work order deleted.', 'success')
|
||||
except Exception as e:
|
||||
flash(f'Error deleting work order: {str(e)}', 'danger')
|
||||
return redirect(url_for('admin'))
|
||||
|
||||
|
||||
@app.route('/mark_complete/<int:order_id>', methods=['POST'])
|
||||
def mark_complete(order_id):
|
||||
if not session.get('logged_in'):
|
||||
return redirect(url_for('login'))
|
||||
try:
|
||||
db = get_db()
|
||||
order = db.execute('SELECT * FROM orders WHERE id = ?', (order_id,)).fetchone()
|
||||
if order:
|
||||
db.execute('''INSERT INTO completed_orders (name, job, address, city, state, zipcode, phone)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)''',
|
||||
(order[1], order[2], order[3], order[4], order[5], order[6], order[7]))
|
||||
db.execute('DELETE FROM orders WHERE id = ?', (order_id,))
|
||||
db.commit()
|
||||
flash('Work order marked as complete.', 'success')
|
||||
else:
|
||||
flash('Work order not found.', 'danger')
|
||||
except Exception as e:
|
||||
flash(f'Error: {str(e)}', 'danger')
|
||||
return redirect(url_for('admin'))
|
||||
|
||||
|
||||
@app.route('/completed_jobs')
|
||||
def completed_jobs():
|
||||
if not session.get('logged_in'):
|
||||
return redirect(url_for('login'))
|
||||
db = get_db()
|
||||
jobs = db.execute('SELECT * FROM completed_orders').fetchall()
|
||||
return render_template('completed_jobs.html', completed_jobs=jobs)
|
||||
|
||||
|
||||
# ── Analytics ─────────────────────────────────────────────────
|
||||
|
||||
def get_ga4_data():
|
||||
try:
|
||||
credentials = service_account.Credentials.from_service_account_file(
|
||||
GA_CREDENTIALS, scopes=["https://www.googleapis.com/auth/analytics.readonly"]
|
||||
)
|
||||
analytics = build("analyticsdata", "v1beta", credentials=credentials)
|
||||
|
||||
response = analytics.properties().runReport(
|
||||
property=f"properties/{int(GA_PROPERTY_ID)}",
|
||||
body={
|
||||
"dateRanges": [{"startDate": "7daysAgo", "endDate": "yesterday"}],
|
||||
"metrics": [{"name": "totalUsers"}, {"name": "screenPageViews"}],
|
||||
"dimensions": [{"name": "pagePath"}],
|
||||
"limit": 1,
|
||||
},
|
||||
).execute()
|
||||
|
||||
logging.debug("Full GA4 API Response: %s", json.dumps(response, indent=2))
|
||||
|
||||
rows = response.get("rows", [])
|
||||
if not rows:
|
||||
return {"total_users": "0", "total_pageviews": "0", "top_page": "N/A"}
|
||||
|
||||
top_page = rows[0]["dimensionValues"][0]["value"]
|
||||
metric_values = rows[0]["metricValues"]
|
||||
total_users = metric_values[0]["value"] if len(metric_values) > 0 else "0"
|
||||
total_pageviews = metric_values[1]["value"] if len(metric_values) > 1 else "0"
|
||||
|
||||
return {"total_users": total_users, "total_pageviews": total_pageviews, "top_page": top_page}
|
||||
|
||||
except Exception as e:
|
||||
logging.error("Google Analytics API Error: %s", str(e))
|
||||
return {"total_users": "0", "total_pageviews": "0", "top_page": "N/A"}
|
||||
|
||||
|
||||
@app.route('/analytics')
|
||||
def analytics():
|
||||
if not session.get('logged_in'):
|
||||
return redirect(url_for('login'))
|
||||
return jsonify(get_ga4_data())
|
||||
|
||||
|
||||
# ── Notifications ─────────────────────────────────────────────
|
||||
|
||||
def send_push_notification(name, job):
|
||||
if not PUSHOVER_API_TOKEN or not PUSHOVER_USER_KEY:
|
||||
return
|
||||
message = f"New Work Order:\nName: {name}\nJob: {job}"
|
||||
data = {
|
||||
"token": PUSHOVER_API_TOKEN,
|
||||
"user": PUSHOVER_USER_KEY,
|
||||
"message": message,
|
||||
}
|
||||
response = requests.post("https://api.pushover.net/1/messages.json", data=data)
|
||||
if response.status_code != 200:
|
||||
print(f"Push notification failed: {response.status_code} - {response.text}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
init_db()
|
||||
app.run(debug=True)
|
||||
Reference in New Issue
Block a user