QoL update
This commit is contained in:
222
app.py
222
app.py
@@ -2,27 +2,21 @@ from flask import Flask, request, redirect, url_for, render_template, session, f
|
||||
from flask_sqlalchemy import SQLAlchemy
|
||||
from sqlalchemy import func
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
import os
|
||||
from werkzeug.utils import secure_filename
|
||||
from flask_login import LoginManager, login_required, logout_user, UserMixin, login_user
|
||||
from flask_login import LoginManager, login_required, logout_user, UserMixin, login_user, current_user
|
||||
from flask_mail import Mail, Message
|
||||
import json
|
||||
from slugify import slugify
|
||||
from slugify import slugify
|
||||
from datetime import datetime
|
||||
import hmac, hashlib, json, time
|
||||
from pathlib import Path
|
||||
import json, hmac, hashlib, requests, time
|
||||
|
||||
import hmac, hashlib, json, requests, time
|
||||
import markdown as md_lib
|
||||
|
||||
load_dotenv(Path(__file__).with_name(".env"))
|
||||
|
||||
|
||||
|
||||
|
||||
app = Flask(__name__, static_folder="static", static_url_path="/static")
|
||||
|
||||
HUB_PUSH_SECRET = os.getenv("HUB_PUSH_SECRET") # match the api_secret you set in the Hub
|
||||
|
||||
HUB_PUSH_SECRET = os.getenv("HUB_PUSH_SECRET")
|
||||
|
||||
CONTACT = {
|
||||
"name": os.environ.get("BH_CONTACT_NAME", "Benjamin Mosley"),
|
||||
@@ -31,7 +25,7 @@ CONTACT = {
|
||||
"phone": os.environ.get("BH_CONTACT_PHONE", "(806) 655 2300"),
|
||||
"city": os.environ.get("BH_CONTACT_CITY", "Canyon / Amarillo / Borger / Remote"),
|
||||
"cal": os.environ.get("BH_CONTACT_CAL", "https://calendly.com/bennyshouse24/30min"),
|
||||
"link": os.environ.get("BH_CONTACT_LINK", "https://www.linkedin.com/in/benjamin-mosley-849643329/"),
|
||||
"link": os.environ.get("BH_CONTACT_LINK", "https://www.linkedin.com/in/benjamin-mosley-849643329/"),
|
||||
"site": os.environ.get("BH_CONTACT_SITE", "https://bennyshouse.net"),
|
||||
"hours": os.environ.get("BH_CONTACT_HOURS", "Mon–Fri, 9a–5p CT"),
|
||||
}
|
||||
@@ -60,10 +54,8 @@ def load_user(user_id):
|
||||
return User()
|
||||
return None
|
||||
|
||||
|
||||
@app.context_processor
|
||||
def inject_now():
|
||||
# lets you use {{ now().year }} in any template
|
||||
return {"now": datetime.now}
|
||||
|
||||
class BlogPost(db.Model):
|
||||
@@ -71,23 +63,32 @@ class BlogPost(db.Model):
|
||||
title = db.Column(db.String(255), nullable=False)
|
||||
slug = db.Column(db.String(255), nullable=False)
|
||||
content = db.Column(db.Text, nullable=False)
|
||||
images = db.Column(db.Text, nullable=True)
|
||||
images = db.Column(db.Text, nullable=True)
|
||||
category = db.Column(db.String(100), nullable=True)
|
||||
tags = db.Column(db.Text, nullable=True)
|
||||
pinned = db.Column(db.Boolean, default=False)
|
||||
draft = db.Column(db.Boolean, default=False)
|
||||
|
||||
class ProjectPost(db.Model):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
title = db.Column(db.String(255), nullable=False)
|
||||
slug = db.Column(db.String(255), nullable=False)
|
||||
content = db.Column(db.Text, nullable=False)
|
||||
images = db.Column(db.Text, nullable=True)
|
||||
images = db.Column(db.Text, nullable=True)
|
||||
category = db.Column(db.String(100), nullable=True)
|
||||
tags = db.Column(db.Text, nullable=True)
|
||||
pinned = db.Column(db.Boolean, default=False)
|
||||
draft = db.Column(db.Boolean, default=False)
|
||||
|
||||
with app.app_context():
|
||||
db.create_all()
|
||||
with db.engine.connect() as conn:
|
||||
for table in ('blog_post', 'project_post'):
|
||||
try:
|
||||
conn.execute(db.text(f'ALTER TABLE {table} ADD COLUMN draft BOOLEAN NOT NULL DEFAULT 0'))
|
||||
conn.commit()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@app.template_filter('from_json')
|
||||
def from_json(value):
|
||||
@@ -96,71 +97,31 @@ def from_json(value):
|
||||
except (TypeError, json.JSONDecodeError):
|
||||
return []
|
||||
|
||||
@app.template_filter('markdown')
|
||||
def render_markdown(value):
|
||||
return md_lib.markdown(value or '', extensions=['fenced_code', 'tables', 'nl2br'])
|
||||
|
||||
def push_to_domain(push_url: str, api_secret: str, payload: dict):
|
||||
# Serialize deterministically
|
||||
body = json.dumps(payload, separators=(',', ':'), ensure_ascii=False).encode('utf-8')
|
||||
|
||||
# Compute signature over the exact bytes
|
||||
sig = hmac.new(api_secret.encode('utf-8'), body, hashlib.sha256).hexdigest()
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
"X-Hub-Signature-256": f"sha256={sig}",
|
||||
# Optional if you later check timestamps:
|
||||
# "X-Timestamp": str(int(time.time())),
|
||||
}
|
||||
|
||||
# Send the *exact same* bytes you signed
|
||||
r = requests.post(push_url, headers=headers, data=body, timeout=15)
|
||||
return r
|
||||
|
||||
return requests.post(push_url, headers=headers, data=body, timeout=15)
|
||||
|
||||
def _verify_hub_signature(raw: bytes) -> None:
|
||||
if not HUB_PUSH_SECRET:
|
||||
abort(403, description="Missing HUB_PUSH_SECRET")
|
||||
|
||||
# Accept both header names
|
||||
sig_hdr = (request.headers.get("X-Signature")
|
||||
or request.headers.get("X-Hub-Signature-256")
|
||||
or "").strip()
|
||||
|
||||
# Accept either "sha256=<hex>" or just "<hex>"
|
||||
provided = sig_hdr
|
||||
if provided.lower().startswith("sha256="):
|
||||
provided = provided.split("=", 1)[1].strip()
|
||||
|
||||
if not provided or any(c.isspace() for c in provided):
|
||||
abort(403, description="Bad signature format")
|
||||
|
||||
expected = hmac.new(HUB_PUSH_SECRET.encode(), raw, hashlib.sha256).hexdigest()
|
||||
if not hmac.compare_digest(provided, expected):
|
||||
abort(403, description="Signature mismatch")
|
||||
|
||||
ts = request.headers.get("X-Timestamp")
|
||||
if ts and ts.isdigit() and abs(time.time() - int(ts)) > 300:
|
||||
abort(403, description="Stale request")
|
||||
|
||||
def _to_slug(text: str) -> str:
|
||||
s = slugify(text or "")
|
||||
return s or f"post-{int(time.time())}"
|
||||
def _to_slug(text: str) -> str:
|
||||
s = slugify(text or "")
|
||||
return s or f"post-{int(time.time())}"
|
||||
def _verify_hub_signature(raw: bytes) -> None:
|
||||
if not HUB_PUSH_SECRET:
|
||||
abort(403, description="Missing HUB_PUSH_SECRET")
|
||||
|
||||
# Accept both header names
|
||||
sig_hdr = (request.headers.get("X-Signature")
|
||||
or request.headers.get("X-Hub-Signature-256")
|
||||
or "").strip()
|
||||
|
||||
# Accept either "sha256=<hex>" or just "<hex>"
|
||||
provided = sig_hdr
|
||||
if provided.lower().startswith("sha256="):
|
||||
provided = provided.split("=", 1)[1].strip()
|
||||
|
||||
# must be 64 hex chars, no spaces
|
||||
if not provided or any(c.isspace() for c in provided) or len(provided) != 64:
|
||||
abort(403, description="Bad signature format")
|
||||
|
||||
@@ -172,7 +133,6 @@ def _verify_hub_signature(raw: bytes) -> None:
|
||||
if ts and ts.isdigit() and abs(time.time() - int(ts)) > 300:
|
||||
abort(403, description="Stale request")
|
||||
|
||||
|
||||
def _to_slug(text: str) -> str:
|
||||
s = slugify(text or "")
|
||||
return s or f"post-{int(time.time())}"
|
||||
@@ -188,74 +148,44 @@ def api_publish():
|
||||
|
||||
kind = (data.get("kind") or "blog").lower()
|
||||
title = data.get("title") or "Untitled"
|
||||
summary = data.get("summary")
|
||||
content_html = data.get("content_html") or ""
|
||||
content_text = data.get("content_text") or ""
|
||||
tags = ",".join(data.get("tags") or [])
|
||||
images = data.get("images") or []
|
||||
pinned = bool(data.get("pinned"))
|
||||
external_id = data.get("external_id") # optional stable id (slug-like)
|
||||
external_id = data.get("external_id")
|
||||
slug = _to_slug(external_id or title)
|
||||
|
||||
# Normalize content to your existing schema
|
||||
body = content_html or content_text.replace("\n", "<br>")
|
||||
images_json = json.dumps(images) if images else None
|
||||
|
||||
if kind == "project":
|
||||
obj = ProjectPost(
|
||||
title=title,
|
||||
slug=slug,
|
||||
content=body,
|
||||
category=None,
|
||||
tags=tags,
|
||||
pinned=pinned,
|
||||
images=images_json
|
||||
title=title, slug=slug, content=body,
|
||||
category=None, tags=tags, pinned=pinned, images=images_json
|
||||
)
|
||||
else:
|
||||
obj = BlogPost(
|
||||
title=title,
|
||||
slug=slug,
|
||||
content=body,
|
||||
category=None,
|
||||
tags=tags,
|
||||
pinned=pinned,
|
||||
images=images_json
|
||||
title=title, slug=slug, content=body,
|
||||
category=None, tags=tags, pinned=pinned, images=images_json
|
||||
)
|
||||
|
||||
db.session.add(obj)
|
||||
db.session.commit()
|
||||
|
||||
# Return the canonical URL so the hub can log it if you want
|
||||
url = url_for("view_project" if kind == "project" else "view_blog", slug=slug, _external=True)
|
||||
return jsonify({"ok": True, "url": url, "id": obj.id, "slug": slug})
|
||||
|
||||
|
||||
computed = hmac.new(
|
||||
(HUB_PUSH_SECRET or "").encode(), raw, hashlib.sha256
|
||||
).hexdigest()
|
||||
|
||||
return jsonify({
|
||||
"saw_header": sig_hdr,
|
||||
"provided": provided,
|
||||
"len_provided": len(provided),
|
||||
"computed": computed,
|
||||
"matches": hmac.compare_digest(provided, computed),
|
||||
"body_len": len(raw),
|
||||
})
|
||||
|
||||
|
||||
@app.route("/login", methods=["GET", "POST"])
|
||||
def login():
|
||||
if request.method == "POST":
|
||||
username = request.form["username"]
|
||||
password = request.form["password"]
|
||||
|
||||
if username == USERNAME and password == PASSWORD:
|
||||
user = User()
|
||||
login_user(user)
|
||||
session.permanent = True
|
||||
return redirect(url_for("admin_panel"))
|
||||
|
||||
flash("Invalid email or password!", "danger")
|
||||
return render_template("login.html")
|
||||
|
||||
@@ -265,7 +195,6 @@ def logout():
|
||||
logout_user()
|
||||
return redirect(url_for("index"))
|
||||
|
||||
|
||||
@app.route('/admin')
|
||||
@login_required
|
||||
def admin_panel():
|
||||
@@ -278,10 +207,11 @@ def admin_panel():
|
||||
def new_blog():
|
||||
if request.method == 'POST':
|
||||
title = request.form['title']
|
||||
content = request.form['content'].replace("\n", "<br>")
|
||||
content = request.form['content']
|
||||
category = request.form['category']
|
||||
tags = request.form['tags']
|
||||
pinned = 'pinned' in request.form
|
||||
is_draft = request.form.get('action') == 'draft'
|
||||
images = request.files.getlist('images')
|
||||
|
||||
slug = slugify(title)
|
||||
@@ -290,17 +220,13 @@ def new_blog():
|
||||
image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
|
||||
|
||||
new_post = BlogPost(
|
||||
title=title,
|
||||
slug=slug,
|
||||
content=content,
|
||||
category=category,
|
||||
tags=tags,
|
||||
pinned=pinned,
|
||||
images=json.dumps(image_filenames)
|
||||
title=title, slug=slug, content=content,
|
||||
category=category, tags=tags, pinned=pinned,
|
||||
images=json.dumps(image_filenames), draft=is_draft
|
||||
)
|
||||
db.session.add(new_post)
|
||||
db.session.commit()
|
||||
return redirect(url_for('blog'))
|
||||
return redirect(url_for('view_blog', slug=slug))
|
||||
|
||||
return render_template('new_blog.html')
|
||||
|
||||
@@ -309,10 +235,11 @@ def new_blog():
|
||||
def new_project():
|
||||
if request.method == 'POST':
|
||||
title = request.form['title']
|
||||
content = request.form['content'].replace("\n", "<br>")
|
||||
content = request.form['content']
|
||||
category = request.form['category']
|
||||
tags = request.form['tags']
|
||||
pinned = 'pinned' in request.form
|
||||
is_draft = request.form.get('action') == 'draft'
|
||||
images = request.files.getlist('images')
|
||||
|
||||
slug = slugify(title)
|
||||
@@ -321,17 +248,13 @@ def new_project():
|
||||
image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
|
||||
|
||||
new_post = ProjectPost(
|
||||
title=title,
|
||||
slug=slug,
|
||||
content=content,
|
||||
category=category,
|
||||
tags=tags,
|
||||
pinned=pinned,
|
||||
images=json.dumps(image_filenames)
|
||||
title=title, slug=slug, content=content,
|
||||
category=category, tags=tags, pinned=pinned,
|
||||
images=json.dumps(image_filenames), draft=is_draft
|
||||
)
|
||||
db.session.add(new_post)
|
||||
db.session.commit()
|
||||
return redirect(url_for('projects'))
|
||||
return redirect(url_for('view_project', slug=slug))
|
||||
|
||||
return render_template('new_project.html')
|
||||
|
||||
@@ -339,18 +262,16 @@ def new_project():
|
||||
@login_required
|
||||
def edit_blog(slug):
|
||||
blogpost = BlogPost.query.filter_by(slug=slug).first_or_404()
|
||||
|
||||
if request.method == 'POST':
|
||||
blogpost.title = request.form['title']
|
||||
blogpost.slug = slugify(blogpost.title)
|
||||
blogpost.content = request.form['content'].replace("\n", "<br>")
|
||||
blogpost.content = request.form['content']
|
||||
blogpost.category = request.form['category']
|
||||
blogpost.tags = request.form['tags']
|
||||
blogpost.pinned = 'pinned' in request.form
|
||||
|
||||
images = request.files.getlist('images')
|
||||
image_filenames = json.loads(blogpost.images) if blogpost.images else []
|
||||
|
||||
for image in images:
|
||||
if image.filename:
|
||||
filename = secure_filename(image.filename)
|
||||
@@ -367,18 +288,16 @@ def edit_blog(slug):
|
||||
@login_required
|
||||
def edit_project(slug):
|
||||
projectpost = ProjectPost.query.filter_by(slug=slug).first_or_404()
|
||||
|
||||
if request.method == 'POST':
|
||||
projectpost.title = request.form['title']
|
||||
projectpost.slug = slugify(projectpost.title)
|
||||
projectpost.content = request.form['content'].replace("\n", "<br>")
|
||||
projectpost.content = request.form['content']
|
||||
projectpost.category = request.form['category']
|
||||
projectpost.tags = request.form['tags']
|
||||
projectpost.pinned = 'pinned' in request.form
|
||||
|
||||
images = request.files.getlist('images')
|
||||
image_filenames = json.loads(projectpost.images) if projectpost.images else []
|
||||
|
||||
for image in images:
|
||||
if image.filename:
|
||||
filename = secure_filename(image.filename)
|
||||
@@ -419,36 +338,56 @@ def delete_project(slug):
|
||||
|
||||
@app.route('/blogtag/<tag>')
|
||||
def view_tag(tag):
|
||||
tag_lower = tag.lower() # Normalize case
|
||||
tag_lower = tag.lower()
|
||||
blogpost = BlogPost.query.filter(func.lower(BlogPost.tags).contains(tag_lower)).order_by(BlogPost.id.desc()).all()
|
||||
return render_template('blog_tag_results.html', blogpost=blogpost, tag=tag)
|
||||
|
||||
@app.route('/projecttag/<tag>')
|
||||
def view_project_tag(tag):
|
||||
tag_lower = tag.lower() # Normalize case
|
||||
tag_lower = tag.lower()
|
||||
projectpost = ProjectPost.query.filter(func.lower(ProjectPost.tags).contains(tag_lower)).order_by(ProjectPost.id.desc()).all()
|
||||
return render_template('project_tag_results.html', projectpost=projectpost, tag=tag)
|
||||
|
||||
@app.route('/blog')
|
||||
def blog():
|
||||
blogpost = BlogPost.query.order_by(BlogPost.id.desc()).all()
|
||||
blogpost = BlogPost.query.filter_by(draft=False).order_by(BlogPost.id.desc()).all()
|
||||
return render_template('blog.html', blogpost=blogpost)
|
||||
|
||||
@app.route('/projects')
|
||||
def projects():
|
||||
projectpost = ProjectPost.query.order_by(ProjectPost.id.desc()).all()
|
||||
projectpost = ProjectPost.query.filter_by(draft=False).order_by(ProjectPost.id.desc()).all()
|
||||
return render_template('project.html', projectpost=projectpost)
|
||||
|
||||
@app.route('/blog/<slug>')
|
||||
def view_blog(slug):
|
||||
blogpost = BlogPost.query.filter_by(slug=slug).first_or_404()
|
||||
if blogpost.draft and not current_user.is_authenticated:
|
||||
abort(404)
|
||||
return render_template('view_blog.html', blogpost=blogpost)
|
||||
|
||||
@app.route('/project/<slug>')
|
||||
def view_project(slug):
|
||||
projectpost = ProjectPost.query.filter_by(slug=slug).first_or_404()
|
||||
if projectpost.draft and not current_user.is_authenticated:
|
||||
abort(404)
|
||||
return render_template('view_project.html', projectpost=projectpost)
|
||||
|
||||
@app.route('/publish-blog/<slug>', methods=['POST'])
|
||||
@login_required
|
||||
def publish_blog(slug):
|
||||
blogpost = BlogPost.query.filter_by(slug=slug).first_or_404()
|
||||
blogpost.draft = False
|
||||
db.session.commit()
|
||||
return redirect(url_for('view_blog', slug=slug))
|
||||
|
||||
@app.route('/publish-project/<slug>', methods=['POST'])
|
||||
@login_required
|
||||
def publish_project(slug):
|
||||
projectpost = ProjectPost.query.filter_by(slug=slug).first_or_404()
|
||||
projectpost.draft = False
|
||||
db.session.commit()
|
||||
return redirect(url_for('view_project', slug=slug))
|
||||
|
||||
@app.route('/contact')
|
||||
def contact():
|
||||
return render_template('contact.html', CONTACT=CONTACT)
|
||||
@@ -465,32 +404,5 @@ def about():
|
||||
def index():
|
||||
return render_template('index.html')
|
||||
|
||||
# ---- patched helpers (appended) ----
|
||||
def _verify_hub_signature(raw: bytes) -> None:
|
||||
if not HUB_PUSH_SECRET:
|
||||
abort(403, description="Missing HUB_PUSH_SECRET")
|
||||
|
||||
# Accept both header names and both formats
|
||||
sig_hdr = (request.headers.get("X-Signature")
|
||||
or request.headers.get("X-Hub-Signature-256")
|
||||
or "").strip()
|
||||
|
||||
provided = sig_hdr
|
||||
if provided.lower().startswith("sha256="):
|
||||
provided = provided.split("=", 1)[1].strip()
|
||||
|
||||
# must be a 64-char hex, no whitespace
|
||||
if not provided or any(c.isspace() for c in provided) or len(provided) != 64:
|
||||
abort(403, description="Bad signature format")
|
||||
|
||||
expected = hmac.new(HUB_PUSH_SECRET.encode(), raw, hashlib.sha256).hexdigest()
|
||||
if not hmac.compare_digest(provided, expected):
|
||||
abort(403, description="Signature mismatch")
|
||||
|
||||
ts = request.headers.get("X-Timestamp")
|
||||
if ts and ts.isdigit() and abs(time.time() - int(ts)) > 300:
|
||||
abort(403, description="Stale request")
|
||||
|
||||
def _to_slug(text: str) -> str:
|
||||
s = slugify(text or "")
|
||||
return s or f"post-{int(time.time())}"
|
||||
if __name__ == '__main__':
|
||||
app.run(debug=True)
|
||||
|
||||
Reference in New Issue
Block a user