# File listing metadata: 497 lines, 17 KiB, Python
from flask import Flask, request, redirect, url_for, render_template, session, flash, jsonify, abort
|
||
from flask_sqlalchemy import SQLAlchemy
|
||
from sqlalchemy import func
|
||
from dotenv import load_dotenv
|
||
import os
|
||
from werkzeug.utils import secure_filename
|
||
from flask_login import LoginManager, login_required, logout_user, UserMixin, login_user
|
||
from flask_mail import Mail, Message
|
||
import json
|
||
from slugify import slugify
|
||
from datetime import datetime
|
||
import hmac, hashlib, json, time
|
||
from pathlib import Path
|
||
import json, hmac, hashlib, requests, time
|
||
|
||
|
||
# Read settings from the ".env" file located next to this module.
load_dotenv(Path(__file__).parent / ".env")

# The Flask application; static assets are served from ./static at /static.
app = Flask(__name__, static_url_path="/static", static_folder="static")

# Shared secret used to check signed pushes; must match the api_secret set in the Hub.
HUB_PUSH_SECRET = os.environ.get("HUB_PUSH_SECRET")
|
||
|
||
|
||
# (key, environment variable, fallback) for every contact detail; each value
# can be overridden through its BH_CONTACT_* environment variable.
_CONTACT_FIELDS = (
    ("name", "BH_CONTACT_NAME", "Benjamin Mosley"),
    ("title", "BH_CONTACT_TITLE", "E-Commerce Manager, United Supermarkets"),
    ("email", "BH_CONTACT_EMAIL", "ben@bennyshouse.net"),
    ("phone", "BH_CONTACT_PHONE", "(806) 655 2300"),
    ("city", "BH_CONTACT_CITY", "Canyon / Amarillo / Borger / Remote"),
    ("cal", "BH_CONTACT_CAL", "https://calendly.com/bennyshouse24/30min"),
    ("link", "BH_CONTACT_LINK", "https://www.linkedin.com/in/benjamin-mosley-849643329/"),
    ("site", "BH_CONTACT_SITE", "https://bennyshouse.net"),
    ("hours", "BH_CONTACT_HOURS", "Mon–Fri, 9a–5p CT"),
)

# Contact details handed to the contact page template.
CONTACT = {
    key: os.getenv(env_name, fallback)
    for key, env_name, fallback in _CONTACT_FIELDS
}
|
||
|
||
# Core application settings.
# NOTE(review): SECRET_KEY is None when FLASK_SECRET_KEY is not set in the
# environment -- confirm the deployment always provides it.
app.config.update(
    SQLALCHEMY_DATABASE_URI='sqlite:///posts.db',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
    UPLOAD_FOLDER='static/uploads',
    SECRET_KEY=os.environ.get('FLASK_SECRET_KEY'),
)

# Database handle and login manager bound to the app; unauthenticated users
# are sent to the 'login' endpoint.
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = 'login'

# Credentials of the single admin account, read from the environment.
USERNAME = os.environ.get('FLASK_LOGIN_USER')
PASSWORD = os.environ.get('FLASK_LOGIN_PASSWORD')
|
||
|
||
# Make sure the upload directory exists. exist_ok=True replaces the separate
# exists() check, so a directory created concurrently (e.g. by another worker
# starting at the same time) no longer makes makedirs() fail.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
|
||
|
||
class User(UserMixin):
    """The single admin user known to the login manager."""

    # Fixed identifier shared by every instance; load_user() matches it as "1".
    id = 1
|
||
|
||
@login_manager.user_loader
def load_user(user_id):
    """Return the admin ``User`` when ``user_id`` is the string "1", else ``None``."""
    return User() if user_id == "1" else None
|
||
|
||
|
||
@app.context_processor
def inject_now():
    """Expose ``datetime.now`` to templates as ``now`` (e.g. ``{{ now().year }}``)."""
    return dict(now=datetime.now)
|
||
|
||
class BlogPost(db.Model):
    """A blog entry."""

    id = db.Column(db.Integer, primary_key=True)

    # Required fields.
    title = db.Column(db.String(255), nullable=False)
    slug = db.Column(db.String(255), nullable=False)  # looked up by the /blog/<slug> routes
    content = db.Column(db.Text, nullable=False)

    # Optional fields.
    images = db.Column(db.Text, nullable=True)  # JSON-encoded list (written with json.dumps)
    category = db.Column(db.String(100), nullable=True)
    tags = db.Column(db.Text, nullable=True)
    pinned = db.Column(db.Boolean, default=False)
|
||
|
||
class ProjectPost(db.Model):
    """A project entry; same columns as ``BlogPost``."""

    id = db.Column(db.Integer, primary_key=True)

    # Required fields.
    title = db.Column(db.String(255), nullable=False)
    slug = db.Column(db.String(255), nullable=False)  # looked up by the /project/<slug> routes
    content = db.Column(db.Text, nullable=False)

    # Optional fields.
    images = db.Column(db.Text, nullable=True)  # JSON-encoded list (written with json.dumps)
    category = db.Column(db.String(100), nullable=True)
    tags = db.Column(db.Text, nullable=True)
    pinned = db.Column(db.Boolean, default=False)
|
||
|
||
# Create the tables for the models declared above when the module is imported;
# an application context is required for the database call.
with app.app_context():
    db.create_all()
|
||
|
||
@app.template_filter('from_json')
def from_json(value):
    """Template filter: decode a JSON string, yielding ``[]`` when it cannot be decoded.

    ``None`` (or any other non-string value that ``json.loads`` rejects with
    ``TypeError``) and malformed JSON both produce an empty list.
    """
    try:
        decoded = json.loads(value)
    except (TypeError, json.JSONDecodeError):
        decoded = []
    return decoded
|
||
|
||
|
||
def push_to_domain(push_url: str, api_secret: str, payload: dict):
    """POST ``payload`` as signed JSON to ``push_url`` and return the response.

    The payload is serialised compactly (no whitespace, non-ASCII kept as-is),
    and the HMAC-SHA256 of those exact bytes, keyed with ``api_secret``, is
    sent in the ``X-Hub-Signature-256`` header as ``sha256=<hex>``.

    Returns:
        The ``requests`` response object (the request uses a 15 second timeout).
    """
    # Encode once so the bytes that are signed are the bytes that are sent.
    body = json.dumps(payload, ensure_ascii=False, separators=(',', ':')).encode('utf-8')
    digest = hmac.new(api_secret.encode('utf-8'), body, hashlib.sha256).hexdigest()

    # An "X-Timestamp" header (str(int(time.time()))) could be added here if
    # timestamps are checked later.
    headers = {
        "Content-Type": "application/json",
        "X-Hub-Signature-256": f"sha256={digest}",
    }
    return requests.post(push_url, headers=headers, data=body, timeout=15)
|
||
|
||
|
||
# NOTE: _verify_hub_signature and _to_slug used to be defined several times in
# a row here (the first copy without the signature length check). They are
# collapsed to one definition each. Both helpers are also defined again near
# the end of this module, and those later definitions are the ones bound by the
# time requests are served.
def _verify_hub_signature(raw: bytes) -> None:
    """Abort with HTTP 403 unless the request carries a valid signature of ``raw``.

    The signature is read from the ``X-Signature`` header or, failing that,
    ``X-Hub-Signature-256``, and may be sent as ``sha256=<hex>`` or bare
    ``<hex>``. It must be the HMAC-SHA256 of ``raw`` keyed with
    ``HUB_PUSH_SECRET``.

    Args:
        raw: The exact request body bytes that were signed.

    Aborts (403) when the secret is not configured, the signature is not 64
    hex characters, the signature does not match, or an ``X-Timestamp``
    header holds a number more than 300 seconds away from the current time.
    A missing or non-numeric timestamp is ignored.
    """
    if not HUB_PUSH_SECRET:
        abort(403, description="Missing HUB_PUSH_SECRET")

    # Accept both header names.
    sig_hdr = (request.headers.get("X-Signature")
               or request.headers.get("X-Hub-Signature-256")
               or "").strip()

    # Accept either "sha256=<hex>" or just "<hex>".
    provided = sig_hdr
    if provided.lower().startswith("sha256="):
        provided = provided.split("=", 1)[1].strip()

    # hexdigest() is lowercase, so normalise before validating and comparing.
    provided = provided.lower()

    # Must be exactly 64 hex characters. Validating the alphabet also keeps
    # non-ASCII input away from hmac.compare_digest, which raises TypeError
    # for str arguments that are not pure ASCII.
    if len(provided) != 64 or any(c not in "0123456789abcdef" for c in provided):
        abort(403, description="Bad signature format")

    expected = hmac.new(HUB_PUSH_SECRET.encode(), raw, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(provided, expected):
        abort(403, description="Signature mismatch")

    # Optional freshness check. isdecimal() (unlike isdigit()) only accepts
    # characters that int() can parse.
    ts = request.headers.get("X-Timestamp")
    if ts and ts.isdecimal() and abs(time.time() - int(ts)) > 300:
        abort(403, description="Stale request")


def _to_slug(text: str) -> str:
    """Return the slug for ``text``, or ``post-<unix time>`` when the slug is empty."""
    s = slugify(text or "")
    return s or f"post-{int(time.time())}"
|
||
|
||
@app.post("/api/v1/publish")
def api_publish():
    """Create a blog or project post from a signed JSON push.

    The raw body is verified with ``_verify_hub_signature`` before it is
    parsed. The JSON object may contain ``kind`` ("project" creates a
    ``ProjectPost``, anything else a ``BlogPost``), ``title``,
    ``content_html`` or ``content_text``, ``tags``, ``images``, ``pinned``
    and ``external_id`` (used for the slug in preference to the title).

    Returns:
        JSON ``{"ok": True, "url": ..., "id": ..., "slug": ...}`` for the new row.

    Aborts with 400 when the body is not valid UTF-8 JSON or is not a JSON
    object; signature failures abort inside ``_verify_hub_signature``.
    """
    raw = request.get_data(cache=False)
    _verify_hub_signature(raw)

    try:
        data = json.loads(raw.decode("utf-8"))
    except ValueError:
        # UnicodeDecodeError and json.JSONDecodeError are both ValueErrors.
        abort(400, description="Invalid JSON")
    if not isinstance(data, dict):
        # A JSON list/number/string has no fields to read.
        abort(400, description="Invalid JSON")

    kind = str(data.get("kind") or "blog").lower()
    title = str(data.get("title") or "Untitled")
    content_html = str(data.get("content_html") or "")
    content_text = str(data.get("content_text") or "")

    # Tags are stored comma-separated. Accept a single value as well as a
    # list, and stringify entries so join() cannot fail on non-strings.
    raw_tags = data.get("tags") or []
    if not isinstance(raw_tags, (list, tuple)):
        raw_tags = [raw_tags]
    tags = ",".join(str(tag) for tag in raw_tags)

    images = data.get("images") or []
    pinned = bool(data.get("pinned"))
    external_id = data.get("external_id")  # optional stable id (slug-like)
    slug = _to_slug(str(external_id or title))

    # Normalize content to the existing schema: HTML wins, otherwise the
    # plain text with newlines turned into <br>.
    body = content_html or content_text.replace("\n", "<br>")
    images_json = json.dumps(images) if images else None

    is_project = kind == "project"
    model = ProjectPost if is_project else BlogPost
    obj = model(
        title=title,
        slug=slug,
        content=body,
        category=None,
        tags=tags,
        pinned=pinned,
        images=images_json,
    )
    db.session.add(obj)
    db.session.commit()

    # Return the canonical URL so the hub can log it.
    url = url_for("view_project" if is_project else "view_blog", slug=slug, _external=True)
    return jsonify({"ok": True, "url": url, "id": obj.id, "slug": slug})
|
||
|
||
|
||
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and, on POST, sign in the single admin user.

    A POST whose ``username``/``password`` form fields equal the configured
    ``USERNAME``/``PASSWORD`` logs the user in and redirects to the admin
    panel. Any other POST flashes an error and re-renders the form.
    """
    if request.method == "POST":
        # .get() so a request without the fields is treated as a failed login.
        username = request.form.get("username", "")
        password = request.form.get("password", "")

        # With unset credentials nobody can log in (as before: a form string
        # never equals None).
        configured = USERNAME is not None and PASSWORD is not None

        # Compare as bytes in constant time so response timing does not reveal
        # how much of a guess was correct. Both comparisons always run.
        user_ok = hmac.compare_digest(
            username.encode("utf-8"), (USERNAME or "").encode("utf-8")
        )
        password_ok = hmac.compare_digest(
            password.encode("utf-8"), (PASSWORD or "").encode("utf-8")
        )

        if configured and user_ok and password_ok:
            user = User()
            login_user(user)
            session.permanent = True
            return redirect(url_for("admin_panel"))

        flash("Invalid email or password!", "danger")
    return render_template("login.html")
|
||
|
||
@app.route("/logout")
@login_required
def logout():
    """Log the current user out and redirect to the index page."""
    logout_user()
    home = url_for("index")
    return redirect(home)
|
||
|
||
|
||
@app.route('/admin')
@login_required
def admin_panel():
    """Render the admin page with all blog and project posts, highest id first."""
    context = {
        "blogpost": BlogPost.query.order_by(BlogPost.id.desc()).all(),
        "projectpost": ProjectPost.query.order_by(ProjectPost.id.desc()).all(),
    }
    return render_template('admin.html', **context)
|
||
|
||
@app.route('/newblog', methods=['GET', 'POST'])
@login_required
def new_blog():
    """Show the new-blog form and, on POST, store the post and its uploads.

    Reads ``title``, ``content``, ``category``, ``tags`` and the optional
    ``pinned`` checkbox from the form, saves every uploaded ``images`` file
    into the upload folder, then redirects to the blog listing.
    """
    if request.method != 'POST':
        return render_template('new_blog.html')

    title = request.form['title']
    content = request.form['content'].replace("\n", "<br>")
    category = request.form['category']
    tags = request.form['tags']
    pinned = 'pinned' in request.form

    # Save each upload under its own sanitised name. Pairing files and names
    # in one loop keeps them aligned even when some file fields are empty;
    # uploads whose name sanitises to nothing are skipped.
    # NOTE(review): an upload with the same name as an existing file
    # overwrites it.
    image_filenames = []
    for image in request.files.getlist('images'):
        filename = secure_filename(image.filename or "")
        if not filename:
            continue
        image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        image_filenames.append(filename)

    new_post = BlogPost(
        title=title,
        # _to_slug falls back to a timestamp slug when the title yields none,
        # so the post always has a routable slug.
        slug=_to_slug(title),
        content=content,
        category=category,
        tags=tags,
        pinned=pinned,
        images=json.dumps(image_filenames),
    )
    db.session.add(new_post)
    db.session.commit()
    return redirect(url_for('blog'))
|
||
|
||
@app.route('/newproject', methods=['GET', 'POST'])
@login_required
def new_project():
    """Show the new-project form and, on POST, store the post and its uploads.

    Reads ``title``, ``content``, ``category``, ``tags`` and the optional
    ``pinned`` checkbox from the form, saves every uploaded ``images`` file
    into the upload folder, then redirects to the project listing.
    """
    if request.method != 'POST':
        return render_template('new_project.html')

    title = request.form['title']
    content = request.form['content'].replace("\n", "<br>")
    category = request.form['category']
    tags = request.form['tags']
    pinned = 'pinned' in request.form

    # Save each upload under its own sanitised name. Pairing files and names
    # in one loop keeps them aligned even when some file fields are empty;
    # uploads whose name sanitises to nothing are skipped.
    # NOTE(review): an upload with the same name as an existing file
    # overwrites it.
    image_filenames = []
    for image in request.files.getlist('images'):
        filename = secure_filename(image.filename or "")
        if not filename:
            continue
        image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        image_filenames.append(filename)

    new_post = ProjectPost(
        title=title,
        # _to_slug falls back to a timestamp slug when the title yields none,
        # so the post always has a routable slug.
        slug=_to_slug(title),
        content=content,
        category=category,
        tags=tags,
        pinned=pinned,
        images=json.dumps(image_filenames),
    )
    db.session.add(new_post)
    db.session.commit()
    return redirect(url_for('projects'))
|
||
|
||
@app.route('/edit-blog/<slug>', methods=['GET', 'POST'])
@login_required
def edit_blog(slug):
    """Show the edit form for a blog post and, on POST, apply the changes.

    Responds 404 when no post has ``slug``. On POST the text fields are
    replaced from the form, the slug is regenerated from the new title, newly
    uploaded images are appended to the stored list, and the client is
    redirected to the post's (possibly new) URL.
    """
    blogpost = BlogPost.query.filter_by(slug=slug).first_or_404()

    if request.method == 'POST':
        blogpost.title = request.form['title']
        # _to_slug falls back to a timestamp slug when the title yields none.
        blogpost.slug = _to_slug(blogpost.title)
        blogpost.content = request.form['content'].replace("\n", "<br>")
        blogpost.category = request.form['category']
        blogpost.tags = request.form['tags']
        blogpost.pinned = 'pinned' in request.form

        image_filenames = json.loads(blogpost.images) if blogpost.images else []
        for image in request.files.getlist('images'):
            # Skip empty file fields and names that sanitise to nothing;
            # saving those would target the upload directory itself.
            filename = secure_filename(image.filename or "")
            if not filename:
                continue
            image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            image_filenames.append(filename)

        blogpost.images = json.dumps(image_filenames)
        db.session.commit()
        return redirect(url_for('view_blog', slug=blogpost.slug))

    return render_template('edit_blog.html', blogpost=blogpost)
|
||
|
||
@app.route('/edit-project/<slug>', methods=['GET', 'POST'])
@login_required
def edit_project(slug):
    """Show the edit form for a project post and, on POST, apply the changes.

    Responds 404 when no post has ``slug``. On POST the text fields are
    replaced from the form, the slug is regenerated from the new title, newly
    uploaded images are appended to the stored list, and the client is
    redirected to the post's (possibly new) URL.
    """
    projectpost = ProjectPost.query.filter_by(slug=slug).first_or_404()

    if request.method == 'POST':
        projectpost.title = request.form['title']
        # _to_slug falls back to a timestamp slug when the title yields none.
        projectpost.slug = _to_slug(projectpost.title)
        projectpost.content = request.form['content'].replace("\n", "<br>")
        projectpost.category = request.form['category']
        projectpost.tags = request.form['tags']
        projectpost.pinned = 'pinned' in request.form

        image_filenames = json.loads(projectpost.images) if projectpost.images else []
        for image in request.files.getlist('images'):
            # Skip empty file fields and names that sanitise to nothing;
            # saving those would target the upload directory itself.
            filename = secure_filename(image.filename or "")
            if not filename:
                continue
            image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            image_filenames.append(filename)

        projectpost.images = json.dumps(image_filenames)
        db.session.commit()
        return redirect(url_for('view_project', slug=projectpost.slug))

    return render_template('edit_project.html', projectpost=projectpost)
|
||
|
||
@app.route('/delete-blog/<slug>', methods=['POST'])
@login_required
def delete_blog(slug):
    """Delete a blog post and its uploaded image files, then go to the admin panel.

    Responds 404 when no post has ``slug``. File removal is best-effort: a
    file that is already gone does not stop the post from being deleted.
    """
    blogpost = BlogPost.query.filter_by(slug=slug).first_or_404()

    if blogpost.images:
        try:
            stored = json.loads(blogpost.images)
        except (TypeError, json.JSONDecodeError):
            stored = []
        if not isinstance(stored, list):
            stored = []

        for filename in stored:
            # Only remove plain upload names. The upload forms store names
            # that already went through secure_filename(), while the publish
            # API stores whatever the hub sent in "images". Skipping anything
            # secure_filename() would alter keeps non-string entries from
            # crashing the delete and path-bearing entries from removing
            # files outside the upload folder.
            if (not isinstance(filename, str) or not filename
                    or secure_filename(filename) != filename):
                continue
            try:
                os.remove(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            except FileNotFoundError:
                # Already gone (or shared with another post): nothing to do.
                pass

    db.session.delete(blogpost)
    db.session.commit()
    return redirect(url_for('admin_panel'))
|
||
|
||
@app.route('/delete-project/<slug>', methods=['POST'])
@login_required
def delete_project(slug):
    """Delete a project post and its uploaded image files, then go to the admin panel.

    Responds 404 when no post has ``slug``. File removal is best-effort: a
    file that is already gone does not stop the post from being deleted.
    """
    projectpost = ProjectPost.query.filter_by(slug=slug).first_or_404()

    if projectpost.images:
        try:
            stored = json.loads(projectpost.images)
        except (TypeError, json.JSONDecodeError):
            stored = []
        if not isinstance(stored, list):
            stored = []

        for filename in stored:
            # Only remove plain upload names. The upload forms store names
            # that already went through secure_filename(), while the publish
            # API stores whatever the hub sent in "images". Skipping anything
            # secure_filename() would alter keeps non-string entries from
            # crashing the delete and path-bearing entries from removing
            # files outside the upload folder.
            if (not isinstance(filename, str) or not filename
                    or secure_filename(filename) != filename):
                continue
            try:
                os.remove(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            except FileNotFoundError:
                # Already gone (or shared with another post): nothing to do.
                pass

    db.session.delete(projectpost)
    db.session.commit()
    return redirect(url_for('admin_panel'))
|
||
|
||
@app.route('/blogtag/<tag>')
def view_tag(tag):
    """List blog posts whose ``tags`` text contains ``tag``, ignoring case.

    The match is a substring match on the stored tag string, newest
    (highest id) first.
    """
    tag_lower = tag.lower()  # Normalize case
    # autoescape=True makes "%" and "_" in the URL match literally instead of
    # acting as LIKE wildcards (e.g. /blogtag/% would otherwise match all).
    matches_tag = func.lower(BlogPost.tags).contains(tag_lower, autoescape=True)
    blogpost = BlogPost.query.filter(matches_tag).order_by(BlogPost.id.desc()).all()
    return render_template('blog_tag_results.html', blogpost=blogpost, tag=tag)
|
||
|
||
@app.route('/projecttag/<tag>')
def view_project_tag(tag):
    """List project posts whose ``tags`` text contains ``tag``, ignoring case.

    The match is a substring match on the stored tag string, newest
    (highest id) first.
    """
    tag_lower = tag.lower()  # Normalize case
    # autoescape=True makes "%" and "_" in the URL match literally instead of
    # acting as LIKE wildcards (e.g. /projecttag/% would otherwise match all).
    matches_tag = func.lower(ProjectPost.tags).contains(tag_lower, autoescape=True)
    projectpost = ProjectPost.query.filter(matches_tag).order_by(ProjectPost.id.desc()).all()
    return render_template('project_tag_results.html', projectpost=projectpost, tag=tag)
|
||
|
||
@app.route('/blog')
def blog():
    """Render the blog listing with every blog post, highest id first."""
    newest_first = BlogPost.query.order_by(BlogPost.id.desc())
    return render_template('blog.html', blogpost=newest_first.all())
|
||
|
||
@app.route('/projects')
def projects():
    """Render the project listing with every project post, highest id first."""
    newest_first = ProjectPost.query.order_by(ProjectPost.id.desc())
    return render_template('project.html', projectpost=newest_first.all())
|
||
|
||
@app.route('/blog/<slug>')
def view_blog(slug):
    """Render the blog post with the given slug, or respond 404 if none exists."""
    matching = BlogPost.query.filter_by(slug=slug)
    return render_template('view_blog.html', blogpost=matching.first_or_404())
|
||
|
||
@app.route('/project/<slug>')
def view_project(slug):
    """Render the project post with the given slug, or respond 404 if none exists."""
    matching = ProjectPost.query.filter_by(slug=slug)
    return render_template('view_project.html', projectpost=matching.first_or_404())
|
||
|
||
@app.route('/contact')
def contact():
    """Render the contact page, passing the module-level CONTACT details."""
    context = {"CONTACT": CONTACT}
    return render_template('contact.html', **context)
|
||
|
||
@app.route('/resume')
def resume():
    """Render the resume page."""
    page = render_template('resume.html')
    return page
|
||
|
||
@app.route('/about')
def about():
    """Render the about page."""
    page = render_template('about.html')
    return page
|
||
|
||
@app.route('/')
def index():
    """Render the home page."""
    page = render_template('index.html')
    return page
|
||
|
||
# ---- patched helpers (appended) ----
|
||
def _verify_hub_signature(raw: bytes) -> None:
    """Abort with HTTP 403 unless the request carries a valid signature of ``raw``.

    The signature is read from the ``X-Signature`` header or, failing that,
    ``X-Hub-Signature-256``, and may be sent as ``sha256=<hex>`` or bare
    ``<hex>``. It must be the HMAC-SHA256 of ``raw`` keyed with
    ``HUB_PUSH_SECRET``.

    Args:
        raw: The exact request body bytes that were signed.

    Aborts (403) when the secret is not configured, the signature is not 64
    hex characters, the signature does not match, or an ``X-Timestamp``
    header holds a number more than 300 seconds away from the current time.
    A missing or non-numeric timestamp is ignored.
    """
    if not HUB_PUSH_SECRET:
        abort(403, description="Missing HUB_PUSH_SECRET")

    # Accept both header names and both formats.
    sig_hdr = (request.headers.get("X-Signature")
               or request.headers.get("X-Hub-Signature-256")
               or "").strip()

    provided = sig_hdr
    if provided.lower().startswith("sha256="):
        provided = provided.split("=", 1)[1].strip()

    # hexdigest() is lowercase, so normalise before validating and comparing.
    provided = provided.lower()

    # Must be exactly 64 hex characters. Validating the alphabet also keeps
    # non-ASCII input away from hmac.compare_digest, which raises TypeError
    # for str arguments that are not pure ASCII.
    if len(provided) != 64 or any(c not in "0123456789abcdef" for c in provided):
        abort(403, description="Bad signature format")

    expected = hmac.new(HUB_PUSH_SECRET.encode(), raw, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(provided, expected):
        abort(403, description="Signature mismatch")

    # Optional freshness check. isdecimal() (unlike isdigit()) only accepts
    # characters that int() can parse.
    ts = request.headers.get("X-Timestamp")
    if ts and ts.isdecimal() and abs(time.time() - int(ts)) > 300:
        abort(403, description="Stale request")
|
||
|
||
def _to_slug(text: str) -> str:
    """Slugify ``text``; when the result is empty, return ``post-<unix time>`` instead.

    ``None`` or an empty ``text`` is treated as the empty string.
    """
    candidate = slugify(text or "")
    if candidate:
        return candidate
    return f"post-{int(time.time())}"
|