# Files
# Portfolio/app.py
# 2026-04-23 01:29:11 +00:00

# 497 lines
# 17 KiB
# Python
# Raw Blame History

# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Flask, request, redirect, url_for, render_template, session, flash, jsonify, abort
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func
from dotenv import load_dotenv
import os
from werkzeug.utils import secure_filename
from flask_login import LoginManager, login_required, logout_user, UserMixin, login_user
from flask_mail import Mail, Message
import json
from slugify import slugify
from datetime import datetime
import hmac, hashlib, json, time
from pathlib import Path
import json, hmac, hashlib, requests, time
# Load the ".env" file that sits next to this module before any of the
# os.getenv()/os.environ lookups below are evaluated.
_DOTENV_FILE = Path(__file__).with_name(".env")
load_dotenv(_DOTENV_FILE)

# The Flask application object; static files use the "static" folder and
# are exposed under the "/static" URL prefix.
app = Flask(__name__, static_url_path="/static", static_folder="static")

# Shared secret for signed pushes; must match the api_secret set in the Hub.
HUB_PUSH_SECRET = os.environ.get("HUB_PUSH_SECRET")
# Contact details passed to the contact page template. Every entry falls
# back to a built-in default and can be overridden through the matching
# BH_CONTACT_* environment variable.
CONTACT = {
    "name": os.environ.get("BH_CONTACT_NAME", "Benjamin Mosley"),
    "title": os.environ.get("BH_CONTACT_TITLE", "E-Commerce Manager, United Supermarkets"),
    "email": os.environ.get("BH_CONTACT_EMAIL", "ben@bennyshouse.net"),
    "phone": os.environ.get("BH_CONTACT_PHONE", "(806) 655 2300"),
    "city": os.environ.get("BH_CONTACT_CITY", "Canyon / Amarillo / Borger / Remote"),
    "cal": os.environ.get("BH_CONTACT_CAL", "https://calendly.com/bennyshouse24/30min"),
    "link": os.environ.get("BH_CONTACT_LINK", "https://www.linkedin.com/in/benjamin-mosley-849643329/"),
    "site": os.environ.get("BH_CONTACT_SITE", "https://bennyshouse.net"),
    # The default had lost its range separators ("MonFri, 9a5p CT"); plain
    # ASCII hyphens are used so the text cannot be garbled again.
    "hours": os.environ.get("BH_CONTACT_HOURS", "Mon-Fri, 9a-5p CT"),
}
# Database, upload and session settings.
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///posts.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = 'static/uploads'
app.config['SECRET_KEY'] = os.getenv('FLASK_SECRET_KEY')

# Extensions bound to the application.
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'

# Credentials of the single admin account, read from the environment.
USERNAME = os.getenv('FLASK_LOGIN_USER')
PASSWORD = os.getenv('FLASK_LOGIN_PASSWORD')

# exist_ok avoids the check-then-create race (e.g. several workers starting
# at once) that the previous os.path.exists() guard was exposed to.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
class User(UserMixin):
    """The single admin user of the site; there is no user table."""
    # Fixed identifier; load_user() recognises this account as the string "1".
    id = 1
@login_manager.user_loader
def load_user(user_id):
    """Return the admin ``User`` for session id ``"1"``, otherwise ``None``."""
    return User() if user_id == "1" else None
@app.context_processor
def inject_now():
    """Expose ``now`` to templates so ``{{ now().year }}`` works everywhere."""
    # The callable itself is injected (not its result), so it is evaluated
    # at render time.
    return dict(now=datetime.now)
class BlogPost(db.Model):
    """Database model for a blog entry."""
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(255), nullable=False)
    # URL identifier; the blog views look posts up by this value.
    slug = db.Column(db.String(255), nullable=False)
    # Body text; the views store it with newlines replaced by "<br>".
    content = db.Column(db.Text, nullable=False)
    # JSON-encoded list kept as text (written with json.dumps by the views).
    images = db.Column(db.Text, nullable=True)
    category = db.Column(db.String(100), nullable=True)
    # Tag text; the tag view does a case-insensitive substring match on it.
    tags = db.Column(db.Text, nullable=True)
    pinned = db.Column(db.Boolean, default=False)
class ProjectPost(db.Model):
    """Database model for a project entry; same columns as ``BlogPost``."""
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(255), nullable=False)
    # URL identifier; the project views look posts up by this value.
    slug = db.Column(db.String(255), nullable=False)
    # Body text; the views store it with newlines replaced by "<br>".
    content = db.Column(db.Text, nullable=False)
    # JSON-encoded list kept as text (written with json.dumps by the views).
    images = db.Column(db.Text, nullable=True)
    category = db.Column(db.String(100), nullable=True)
    # Tag text; the tag view does a case-insensitive substring match on it.
    tags = db.Column(db.Text, nullable=True)
    pinned = db.Column(db.Boolean, default=False)
# Create the tables for the models above at import time, inside an
# application context.
with app.app_context():
    db.create_all()
@app.template_filter('from_json')
def from_json(value):
    """Template filter: decode a JSON string, yielding ``[]`` when it cannot.

    A ``TypeError`` (e.g. ``None`` input) or malformed JSON both result in
    an empty list so templates can iterate the result unconditionally.
    """
    try:
        decoded = json.loads(value)
    except (TypeError, json.JSONDecodeError):
        return []
    else:
        return decoded
def push_to_domain(push_url: str, api_secret: str, payload: dict):
    """POST *payload* as signed JSON to *push_url* and return the response.

    The payload is serialised once to compact UTF-8 JSON; an HMAC-SHA256 of
    exactly those bytes, keyed with *api_secret*, is sent in the
    ``X-Hub-Signature-256`` header as ``sha256=<hex>``.
    """
    encoded = json.dumps(payload, ensure_ascii=False, separators=(',', ':')).encode('utf-8')
    digest = hmac.new(api_secret.encode('utf-8'), encoded, hashlib.sha256).hexdigest()
    # The bytes sent must be the very bytes that were signed, so the body is
    # passed through ``data=`` rather than re-serialised.
    return requests.post(
        push_url,
        headers={
            "Content-Type": "application/json",
            "X-Hub-Signature-256": f"sha256={digest}",
        },
        data=encoded,
        timeout=15,
    )
# NOTE: these two helpers were repeated several times in a row here; they are
# collapsed to one definition each. Both names are defined once more at the
# bottom of this module, and that later definition is the one in effect at
# request time.
def _verify_hub_signature(raw: bytes) -> None:
    """Abort with 403 unless the request carries a valid signature of *raw*.

    The signature is read from ``X-Signature`` or ``X-Hub-Signature-256`` and
    may be written as ``sha256=<hex>`` or as the bare hex digest. It must be
    the HMAC-SHA256 of *raw* keyed with ``HUB_PUSH_SECRET``. When an
    all-digit ``X-Timestamp`` header is present, it must be within 300
    seconds of the current time.

    Raises:
        werkzeug HTTP 403 (via ``abort``) when the secret is not configured,
        the signature is malformed or wrong, or the timestamp is stale.
    """
    if not HUB_PUSH_SECRET:
        abort(403, description="Missing HUB_PUSH_SECRET")

    # Accept both header names.
    sig_hdr = (request.headers.get("X-Signature")
               or request.headers.get("X-Hub-Signature-256")
               or "").strip()

    # Accept either "sha256=<hex>" or just "<hex>"; hex case is irrelevant.
    provided = sig_hdr
    if provided.lower().startswith("sha256="):
        provided = provided.split("=", 1)[1].strip()
    provided = provided.lower()

    # Must be exactly 64 hex characters. Besides rejecting junk early, this
    # guarantees an ASCII string: compare_digest() raises TypeError on
    # non-ASCII str, which would surface as a 500 instead of a 403.
    if len(provided) != 64 or any(c not in "0123456789abcdef" for c in provided):
        abort(403, description="Bad signature format")

    expected = hmac.new(HUB_PUSH_SECRET.encode(), raw, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(provided, expected):
        abort(403, description="Signature mismatch")

    # isdecimal() only admits characters int() can parse; isdigit() also
    # admits e.g. superscript digits, on which int() raises ValueError.
    ts = request.headers.get("X-Timestamp")
    if ts and ts.isdecimal() and abs(time.time() - int(ts)) > 300:
        abort(403, description="Stale request")


def _to_slug(text: str) -> str:
    """Slugify *text*, falling back to ``post-<unix time>`` when that is empty."""
    s = slugify(text or "")
    return s or f"post-{int(time.time())}"
@app.post("/api/v1/publish")
def api_publish():
    """Create a blog or project post from a signed JSON push.

    The raw request body is verified with ``_verify_hub_signature`` before it
    is parsed. Recognised keys: ``kind`` ("project" or anything else for a
    blog post), ``title``, ``content_html``, ``content_text``, ``tags``
    (list or single string), ``images``, ``pinned`` and ``external_id``
    (preferred source of the slug).

    Returns:
        JSON ``{"ok", "url", "id", "slug"}`` describing the stored post.

    Raises:
        HTTP 403 from signature verification; HTTP 400 when the body is not a
        JSON object or ``tags`` has an unusable type.
    """
    raw = request.get_data(cache=False)
    _verify_hub_signature(raw)

    try:
        data = json.loads(raw.decode("utf-8"))
    except (ValueError, RecursionError):
        # ValueError covers both bad UTF-8 and malformed JSON.
        abort(400, description="Invalid JSON")
    if not isinstance(data, dict):
        # A JSON array/string/number would otherwise crash on .get() below.
        abort(400, description="Invalid JSON")

    kind = str(data.get("kind") or "blog").lower()
    title = str(data.get("title") or "Untitled")
    content_html = data.get("content_html") or ""
    content_text = data.get("content_text") or ""
    images = data.get("images") or []
    pinned = bool(data.get("pinned"))
    external_id = data.get("external_id")  # optional stable id (slug-like)

    # A bare string must not be split into characters, and non-string items
    # must not make str.join() raise.
    raw_tags = data.get("tags") or []
    if isinstance(raw_tags, str):
        raw_tags = [raw_tags]
    elif not isinstance(raw_tags, (list, tuple)):
        abort(400, description="Invalid tags")
    tags = ",".join(str(tag) for tag in raw_tags)

    slug = _to_slug(str(external_id) if external_id else title)

    # Normalize content to the existing schema: HTML wins, otherwise plain
    # text with newlines turned into <br>.
    body = content_html or str(content_text).replace("\n", "<br>")
    images_json = json.dumps(images) if images else None

    is_project = kind == "project"
    model = ProjectPost if is_project else BlogPost
    obj = model(
        title=title,
        slug=slug,
        content=body,
        category=None,
        tags=tags,
        pinned=pinned,
        images=images_json,
    )
    db.session.add(obj)
    db.session.commit()

    # Return the canonical URL so the hub can log it.
    url = url_for("view_project" if is_project else "view_blog", slug=slug, _external=True)
    return jsonify({"ok": True, "url": url, "id": obj.id, "slug": slug})
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and authenticate the single admin account.

    On POST the submitted ``username``/``password`` are checked against the
    configured ``USERNAME``/``PASSWORD``. Success logs the user in, marks the
    session permanent and redirects to the admin panel; failure flashes an
    error and re-renders the form. Missing form fields count as a failed
    attempt.
    """
    if request.method == "POST":
        username = request.form.get("username", "")
        password = request.form.get("password", "")

        # Never authenticate against unset or empty configured credentials.
        configured = bool(USERNAME) and bool(PASSWORD)

        # Constant-time comparison on bytes (compare_digest rejects non-ASCII
        # str). Both checks always run, so timing does not reveal which
        # field was wrong.
        user_ok = hmac.compare_digest(
            username.encode("utf-8"), (USERNAME or "").encode("utf-8")
        )
        pass_ok = hmac.compare_digest(
            password.encode("utf-8"), (PASSWORD or "").encode("utf-8")
        )

        if configured and user_ok and pass_ok:
            login_user(User())
            session.permanent = True
            return redirect(url_for("admin_panel"))
        flash("Invalid email or password!", "danger")
    return render_template("login.html")
@app.route("/logout")
@login_required
def logout():
    """End the current login session and send the visitor to the home page."""
    logout_user()
    home_url = url_for("index")
    return redirect(home_url)
@app.route('/admin')
@login_required
def admin_panel():
    """Render the admin page listing all blog and project posts, newest first."""
    listing = {
        "blogpost": BlogPost.query.order_by(BlogPost.id.desc()).all(),
        "projectpost": ProjectPost.query.order_by(ProjectPost.id.desc()).all(),
    }
    return render_template('admin.html', **listing)
@app.route('/newblog', methods=['GET', 'POST'])
@login_required
def new_blog():
    """Show the new-blog form (GET) or create a blog post from it (POST).

    Uploaded ``images`` are saved into ``UPLOAD_FOLDER`` under their
    sanitised names, and those names are stored on the post as a JSON list.
    """
    if request.method == 'POST':
        title = request.form['title']
        content = request.form['content'].replace("\n", "<br>")
        category = request.form['category']
        tags = request.form['tags']
        pinned = 'pinned' in request.form
        # _to_slug() never returns an empty slug, which would make the post
        # unreachable through /blog/<slug>.
        slug = _to_slug(title)

        # Sanitise and save each upload in one pass so a file is always
        # written under its own name. The earlier zip() of the unfiltered
        # uploads with the filtered names misaligned whenever an empty file
        # part came before a real one.
        image_filenames = []
        for image in request.files.getlist('images'):
            filename = secure_filename(image.filename) if image.filename else ""
            if not filename:
                # No file selected, or the name sanitised down to nothing.
                continue
            image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            image_filenames.append(filename)

        new_post = BlogPost(
            title=title,
            slug=slug,
            content=content,
            category=category,
            tags=tags,
            pinned=pinned,
            images=json.dumps(image_filenames),
        )
        db.session.add(new_post)
        db.session.commit()
        return redirect(url_for('blog'))
    return render_template('new_blog.html')
@app.route('/newproject', methods=['GET', 'POST'])
@login_required
def new_project():
    """Show the new-project form (GET) or create a project post from it (POST).

    Uploaded ``images`` are saved into ``UPLOAD_FOLDER`` under their
    sanitised names, and those names are stored on the post as a JSON list.
    """
    if request.method == 'POST':
        title = request.form['title']
        content = request.form['content'].replace("\n", "<br>")
        category = request.form['category']
        tags = request.form['tags']
        pinned = 'pinned' in request.form
        # _to_slug() never returns an empty slug, which would make the post
        # unreachable through /project/<slug>.
        slug = _to_slug(title)

        # Sanitise and save each upload in one pass so a file is always
        # written under its own name. The earlier zip() of the unfiltered
        # uploads with the filtered names misaligned whenever an empty file
        # part came before a real one.
        image_filenames = []
        for image in request.files.getlist('images'):
            filename = secure_filename(image.filename) if image.filename else ""
            if not filename:
                # No file selected, or the name sanitised down to nothing.
                continue
            image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            image_filenames.append(filename)

        new_post = ProjectPost(
            title=title,
            slug=slug,
            content=content,
            category=category,
            tags=tags,
            pinned=pinned,
            images=json.dumps(image_filenames),
        )
        db.session.add(new_post)
        db.session.commit()
        return redirect(url_for('projects'))
    return render_template('new_project.html')
@app.route('/edit-blog/<slug>', methods=['GET', 'POST'])
@login_required
def edit_blog(slug):
    """Show the edit form for a blog post (GET) or apply the edits (POST).

    Responds 404 when no post has *slug*. On POST the slug is regenerated
    from the new title and newly uploaded images are appended to the post's
    existing image list.
    """
    blogpost = BlogPost.query.filter_by(slug=slug).first_or_404()
    if request.method == 'POST':
        blogpost.title = request.form['title']
        # _to_slug() guarantees a non-empty slug, so the redirect below
        # always targets a reachable URL.
        blogpost.slug = _to_slug(blogpost.title)
        blogpost.content = request.form['content'].replace("\n", "<br>")
        blogpost.category = request.form['category']
        blogpost.tags = request.form['tags']
        blogpost.pinned = 'pinned' in request.form

        image_filenames = json.loads(blogpost.images) if blogpost.images else []
        for image in request.files.getlist('images'):
            filename = secure_filename(image.filename) if image.filename else ""
            if not filename:
                # No file selected, or the name sanitised down to nothing;
                # saving would target the upload directory itself.
                continue
            image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            image_filenames.append(filename)
        blogpost.images = json.dumps(image_filenames)

        db.session.commit()
        return redirect(url_for('view_blog', slug=blogpost.slug))
    return render_template('edit_blog.html', blogpost=blogpost)
@app.route('/edit-project/<slug>', methods=['GET', 'POST'])
@login_required
def edit_project(slug):
    """Show the edit form for a project post (GET) or apply the edits (POST).

    Responds 404 when no post has *slug*. On POST the slug is regenerated
    from the new title and newly uploaded images are appended to the post's
    existing image list.
    """
    projectpost = ProjectPost.query.filter_by(slug=slug).first_or_404()
    if request.method == 'POST':
        projectpost.title = request.form['title']
        # _to_slug() guarantees a non-empty slug, so the redirect below
        # always targets a reachable URL.
        projectpost.slug = _to_slug(projectpost.title)
        projectpost.content = request.form['content'].replace("\n", "<br>")
        projectpost.category = request.form['category']
        projectpost.tags = request.form['tags']
        projectpost.pinned = 'pinned' in request.form

        image_filenames = json.loads(projectpost.images) if projectpost.images else []
        for image in request.files.getlist('images'):
            filename = secure_filename(image.filename) if image.filename else ""
            if not filename:
                # No file selected, or the name sanitised down to nothing;
                # saving would target the upload directory itself.
                continue
            image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            image_filenames.append(filename)
        projectpost.images = json.dumps(image_filenames)

        db.session.commit()
        return redirect(url_for('view_project', slug=projectpost.slug))
    return render_template('edit_project.html', projectpost=projectpost)
@app.route('/delete-blog/<slug>', methods=['POST'])
@login_required
def delete_blog(slug):
    """Delete a blog post and its uploaded image files, then show the admin page.

    Responds 404 when no post has *slug*. Only files located directly inside
    ``UPLOAD_FOLDER`` are removed.
    """
    blogpost = BlogPost.query.filter_by(slug=slug).first_or_404()

    stored = json.loads(blogpost.images) if blogpost.images else []
    if not isinstance(stored, list):
        stored = []

    upload_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
    for filename in stored:
        if not isinstance(filename, str):
            continue
        image_path = os.path.abspath(os.path.join(upload_root, filename))
        # Image entries can originate from the publish API as well as from
        # uploads, so refuse anything that resolves outside the upload folder
        # (absolute paths, "..", URLs) or to the folder itself.
        if os.path.dirname(image_path) != upload_root:
            continue
        try:
            os.remove(image_path)
        except FileNotFoundError:
            pass  # already gone; nothing to clean up

    db.session.delete(blogpost)
    db.session.commit()
    return redirect(url_for('admin_panel'))
@app.route('/delete-project/<slug>', methods=['POST'])
@login_required
def delete_project(slug):
    """Delete a project post and its uploaded image files, then show the admin page.

    Responds 404 when no post has *slug*. Only files located directly inside
    ``UPLOAD_FOLDER`` are removed.
    """
    projectpost = ProjectPost.query.filter_by(slug=slug).first_or_404()

    stored = json.loads(projectpost.images) if projectpost.images else []
    if not isinstance(stored, list):
        stored = []

    upload_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
    for filename in stored:
        if not isinstance(filename, str):
            continue
        image_path = os.path.abspath(os.path.join(upload_root, filename))
        # Image entries can originate from the publish API as well as from
        # uploads, so refuse anything that resolves outside the upload folder
        # (absolute paths, "..", URLs) or to the folder itself.
        if os.path.dirname(image_path) != upload_root:
            continue
        try:
            os.remove(image_path)
        except FileNotFoundError:
            pass  # already gone; nothing to clean up

    db.session.delete(projectpost)
    db.session.commit()
    return redirect(url_for('admin_panel'))
@app.route('/blogtag/<tag>')
def view_tag(tag):
    """List blog posts whose tag text contains *tag* (case-insensitive), newest first."""
    tag_lower = tag.lower()  # Normalize case
    # autoescape=True makes "%" and "_" in the URL segment match literally
    # instead of acting as LIKE wildcards. This is still a substring match,
    # so "py" also matches a post tagged "python".
    matches_tag = func.lower(BlogPost.tags).contains(tag_lower, autoescape=True)
    blogpost = BlogPost.query.filter(matches_tag).order_by(BlogPost.id.desc()).all()
    return render_template('blog_tag_results.html', blogpost=blogpost, tag=tag)
@app.route('/projecttag/<tag>')
def view_project_tag(tag):
    """List project posts whose tag text contains *tag* (case-insensitive), newest first."""
    tag_lower = tag.lower()  # Normalize case
    # autoescape=True makes "%" and "_" in the URL segment match literally
    # instead of acting as LIKE wildcards. This is still a substring match,
    # so "py" also matches a post tagged "python".
    matches_tag = func.lower(ProjectPost.tags).contains(tag_lower, autoescape=True)
    projectpost = ProjectPost.query.filter(matches_tag).order_by(ProjectPost.id.desc()).all()
    return render_template('project_tag_results.html', projectpost=projectpost, tag=tag)
@app.route('/blog')
def blog():
    """Render the blog index with every blog post, newest first."""
    newest_first = BlogPost.id.desc()
    posts = BlogPost.query.order_by(newest_first).all()
    return render_template('blog.html', blogpost=posts)
@app.route('/projects')
def projects():
    """Render the project index with every project post, newest first."""
    newest_first = ProjectPost.id.desc()
    posts = ProjectPost.query.order_by(newest_first).all()
    return render_template('project.html', projectpost=posts)
@app.route('/blog/<slug>')
def view_blog(slug):
    """Render the first blog post matching *slug*, or respond 404."""
    lookup = BlogPost.query.filter_by(slug=slug)
    post = lookup.first_or_404()
    return render_template('view_blog.html', blogpost=post)
@app.route('/project/<slug>')
def view_project(slug):
    """Render the first project post matching *slug*, or respond 404."""
    lookup = ProjectPost.query.filter_by(slug=slug)
    post = lookup.first_or_404()
    return render_template('view_project.html', projectpost=post)
@app.route('/contact')
def contact():
    """Render the contact page, passing the module-level CONTACT details."""
    template_context = {"CONTACT": CONTACT}
    return render_template('contact.html', **template_context)
@app.route('/resume')
def resume():
    """Render the resume page."""
    page = render_template('resume.html')
    return page
@app.route('/about')
def about():
    """Render the about page."""
    page = render_template('about.html')
    return page
@app.route('/')
def index():
    """Render the home page."""
    page = render_template('index.html')
    return page
# ---- patched helpers (appended) ----
def _verify_hub_signature(raw: bytes) -> None:
    """Abort with 403 unless the request carries a valid signature of *raw*.

    The signature is read from ``X-Signature`` or ``X-Hub-Signature-256`` and
    may be written as ``sha256=<hex>`` or as the bare hex digest. It must be
    the HMAC-SHA256 of *raw* keyed with ``HUB_PUSH_SECRET``. When an
    all-digit ``X-Timestamp`` header is present, it must be within 300
    seconds of the current time.

    Raises:
        werkzeug HTTP 403 (via ``abort``) when the secret is not configured,
        the signature is malformed or wrong, or the timestamp is stale.
    """
    if not HUB_PUSH_SECRET:
        abort(403, description="Missing HUB_PUSH_SECRET")

    # Accept both header names and both formats.
    sig_hdr = (request.headers.get("X-Signature")
               or request.headers.get("X-Hub-Signature-256")
               or "").strip()
    provided = sig_hdr
    if provided.lower().startswith("sha256="):
        provided = provided.split("=", 1)[1].strip()
    # hexdigest() is lowercase; accept an uppercase digest from the sender.
    provided = provided.lower()

    # Must be exactly 64 hex characters. The length check alone let non-hex
    # and non-ASCII text through, and compare_digest() raises TypeError on
    # non-ASCII str, which would surface as a 500 instead of a 403.
    if len(provided) != 64 or any(c not in "0123456789abcdef" for c in provided):
        abort(403, description="Bad signature format")

    expected = hmac.new(HUB_PUSH_SECRET.encode(), raw, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(provided, expected):
        abort(403, description="Signature mismatch")

    # isdecimal() only admits characters int() can parse; isdigit() also
    # admits e.g. superscript digits, on which int() raises ValueError.
    ts = request.headers.get("X-Timestamp")
    if ts and ts.isdecimal() and abs(time.time() - int(ts)) > 300:
        abort(403, description="Stale request")
def _to_slug(text: str) -> str:
    """Slugify *text*; when the result is empty, return ``post-<unix time>``."""
    candidate = slugify(text or "")
    if candidate:
        return candidate
    return f"post-{int(time.time())}"