# File listing metadata: 409 lines, 15 KiB, Python
from flask import Flask, request, redirect, url_for, render_template, session, flash, jsonify, abort
|
||
from flask_sqlalchemy import SQLAlchemy
|
||
from sqlalchemy import func
|
||
from dotenv import load_dotenv
|
||
import os
|
||
from werkzeug.utils import secure_filename
|
||
from flask_login import LoginManager, login_required, logout_user, UserMixin, login_user, current_user
|
||
from flask_mail import Mail, Message
|
||
from slugify import slugify
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
import hmac, hashlib, json, requests, time
|
||
import markdown as md_lib
|
||
|
||
# Load settings from the ".env" file that sits next to this module so the
# environment lookups below can see them.
load_dotenv(Path(__file__).parent / ".env")

# The Flask application; static assets are served from ./static at /static.
app = Flask(__name__, static_url_path="/static", static_folder="static")

# Shared secret used to verify signed requests to /api/v1/publish.
# None when the variable is not set.
HUB_PUSH_SECRET = os.environ.get("HUB_PUSH_SECRET")
|
||
|
||
# Contact details passed to the contact template. Each entry is read from its
# BH_CONTACT_* environment variable and falls back to the default shown here.
CONTACT = {
    key: os.environ.get(env_var, fallback)
    for key, env_var, fallback in (
        ("name", "BH_CONTACT_NAME", "Benjamin Mosley"),
        ("title", "BH_CONTACT_TITLE", "E-Commerce Manager, United Supermarkets"),
        ("email", "BH_CONTACT_EMAIL", "ben@bennyshouse.net"),
        ("phone", "BH_CONTACT_PHONE", "(806) 655 2300"),
        ("city", "BH_CONTACT_CITY", "Canyon / Amarillo / Borger / Remote"),
        ("cal", "BH_CONTACT_CAL", "https://calendly.com/bennyshouse24/30min"),
        ("link", "BH_CONTACT_LINK", "https://www.linkedin.com/in/benjamin-mosley-849643329/"),
        ("site", "BH_CONTACT_SITE", "https://bennyshouse.net"),
        ("hours", "BH_CONTACT_HOURS", "Mon–Fri, 9a–5p CT"),
    )
}
|
||
|
||
# --- Application configuration -------------------------------------------
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///posts.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['UPLOAD_FOLDER'] = 'static/uploads'
# NOTE(review): this is None when FLASK_SECRET_KEY is unset; sessions, flash()
# and login presumably cannot work without it -- confirm deployment sets it.
app.config['SECRET_KEY'] = os.getenv('FLASK_SECRET_KEY')

# --- Extensions -----------------------------------------------------------
db = SQLAlchemy(app)
login_manager = LoginManager(app)
# Endpoint that @login_required redirects anonymous visitors to.
login_manager.login_view = 'login'

# Credentials of the single admin account (None when not configured).
USERNAME = os.getenv('FLASK_LOGIN_USER')
PASSWORD = os.getenv('FLASK_LOGIN_PASSWORD')

# Make sure the upload directory exists. exist_ok=True avoids the
# check-then-create race when several worker processes start at once.
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
|
||
|
||
class User(UserMixin):
    """The single admin account; there is no user table behind it."""

    # Fixed identifier shared by every instance of this class.
    id = 1
|
||
|
||
@login_manager.user_loader
def load_user(user_id):
    """Return the admin ``User`` for session id ``"1"``, otherwise ``None``."""
    return User() if user_id == "1" else None
|
||
|
||
@app.context_processor
def inject_now():
    """Expose ``now`` to every template.

    The value is the ``datetime.now`` callable itself (not a timestamp), so
    templates call it to obtain the current time.
    """
    return dict(now=datetime.now)
|
||
|
||
class BlogPost(db.Model):
    """A blog entry."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(255), nullable=False)
    # URL identifier used by the /blog/<slug> routes.
    slug = db.Column(db.String(255), nullable=False)
    content = db.Column(db.Text, nullable=False)
    # The optional columns below rely on SQLAlchemy's default nullable=True.
    # JSON-encoded list (written with json.dumps by the create/edit views).
    images = db.Column(db.Text)
    category = db.Column(db.String(100))
    tags = db.Column(db.Text)
    pinned = db.Column(db.Boolean, default=False)
    # Drafts are hidden from the public listing and from anonymous visitors.
    draft = db.Column(db.Boolean, default=False)
|
||
|
||
class ProjectPost(db.Model):
    """A project write-up; same column layout as ``BlogPost``."""

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(255), nullable=False)
    # URL identifier used by the /project/<slug> routes.
    slug = db.Column(db.String(255), nullable=False)
    content = db.Column(db.Text, nullable=False)
    # The optional columns below rely on SQLAlchemy's default nullable=True.
    # JSON-encoded list (written with json.dumps by the create/edit views).
    images = db.Column(db.Text)
    category = db.Column(db.String(100))
    tags = db.Column(db.Text)
    pinned = db.Column(db.Boolean, default=False)
    # Drafts are hidden from the public listing and from anonymous visitors.
    draft = db.Column(db.Boolean, default=False)
|
||
|
||
# Create missing tables, then make a best-effort attempt to add the ``draft``
# column to tables that were created before that column existed.
with app.app_context():
    db.create_all()
    with db.engine.connect() as conn:
        for table in ('blog_post', 'project_post'):
            try:
                conn.execute(db.text(f'ALTER TABLE {table} ADD COLUMN draft BOOLEAN NOT NULL DEFAULT 0'))
                conn.commit()
            except Exception as exc:
                # Once the column exists the ALTER fails on every start-up;
                # that case is expected and stays silent. Anything else is
                # still non-fatal, but is logged instead of being swallowed.
                if "duplicate column" not in str(exc).lower():
                    app.logger.warning(
                        "Could not add 'draft' column to %s: %s", table, exc
                    )
|
||
|
||
@app.template_filter('from_json')
def from_json(value):
    """Template filter: decode a JSON string.

    Returns an empty list when ``value`` is not a string/bytes value or is
    not valid JSON.
    """
    try:
        decoded = json.loads(value)
    except (TypeError, json.JSONDecodeError):
        decoded = []
    return decoded
|
||
|
||
@app.template_filter('markdown')
def render_markdown(value):
    """Template filter: convert Markdown text to HTML.

    Falsy input is treated as an empty string. The ``fenced_code``,
    ``tables`` and ``nl2br`` extensions are enabled.
    """
    source = value if value else ''
    enabled_extensions = ['fenced_code', 'tables', 'nl2br']
    return md_lib.markdown(source, extensions=enabled_extensions)
|
||
|
||
def push_to_domain(push_url: str, api_secret: str, payload: dict):
    """POST ``payload`` as signed JSON to ``push_url``.

    The payload is serialised compactly as UTF-8 and signed with HMAC-SHA256
    using ``api_secret``; the hex digest is sent in ``X-Hub-Signature-256``.

    Returns:
        The response object from ``requests.post`` (15 second timeout).
    """
    serialized = json.dumps(payload, separators=(',', ':'), ensure_ascii=False)
    body = serialized.encode('utf-8')
    key = api_secret.encode('utf-8')
    digest = hmac.new(key, body, hashlib.sha256).hexdigest()
    return requests.post(
        push_url,
        headers={
            "Content-Type": "application/json",
            "X-Hub-Signature-256": f"sha256={digest}",
        },
        data=body,
        timeout=15,
    )
|
||
|
||
def _verify_hub_signature(raw: bytes) -> None:
    """Abort with 403 unless the current request carries a valid signature.

    The signature is the hex HMAC-SHA256 of ``raw`` keyed with
    ``HUB_PUSH_SECRET``. It is read from ``X-Signature`` or
    ``X-Hub-Signature-256`` and may carry a ``sha256=`` prefix. When a numeric
    ``X-Timestamp`` header is present, requests more than 300 seconds away
    from the local clock are rejected as stale.

    Args:
        raw: The exact request body bytes that were signed.

    Raises:
        werkzeug HTTPException (403) via ``abort`` on any verification failure.
    """
    if not HUB_PUSH_SECRET:
        abort(403, description="Missing HUB_PUSH_SECRET")

    sig_hdr = (request.headers.get("X-Signature")
               or request.headers.get("X-Hub-Signature-256")
               or "").strip()

    provided = sig_hdr
    if provided.lower().startswith("sha256="):
        provided = provided.split("=", 1)[1].strip()
    # hexdigest() is lower-case; accept upper-case digests from the sender.
    provided = provided.lower()

    # Require exactly 64 hex characters. This also guarantees the value is
    # ASCII: hmac.compare_digest() raises TypeError for non-ASCII str, which
    # would otherwise turn a malformed header into a 500.
    if len(provided) != 64 or any(c not in "0123456789abcdef" for c in provided):
        abort(403, description="Bad signature format")

    expected = hmac.new(HUB_PUSH_SECRET.encode(), raw, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(provided, expected):
        abort(403, description="Signature mismatch")

    # NOTE(review): the timestamp is optional and not covered by the
    # signature, so it gives only weak replay protection.
    ts = request.headers.get("X-Timestamp")
    # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
    if ts and ts.isdecimal() and abs(time.time() - int(ts)) > 300:
        abort(403, description="Stale request")
|
||
|
||
def _to_slug(text: str) -> str:
    """Slugify ``text``; fall back to ``post-<unix time>`` if nothing is left."""
    slug = slugify(text if text else "")
    if slug:
        return slug
    return f"post-{int(time.time())}"
|
||
|
||
@app.post("/api/v1/publish")
def api_publish():
    """Create a blog or project post from a signed JSON request.

    The body must be a JSON object. Recognised keys: ``kind`` ("project" or
    anything else for a blog post), ``title``, ``content_html``,
    ``content_text``, ``tags`` (list, or a single string), ``images`` (list),
    ``pinned`` and ``external_id`` (used for the slug when present).

    Returns:
        JSON ``{"ok": True, "url": ..., "id": ..., "slug": ...}``.

    Aborts with 403 when the signature check fails and 400 when the body is
    not a JSON object or ``tags``/``images`` have the wrong type.
    """
    raw = request.get_data(cache=False)
    _verify_hub_signature(raw)
    try:
        data = json.loads(raw.decode("utf-8"))
    except (ValueError, RecursionError):
        # ValueError covers both invalid UTF-8 and invalid JSON.
        abort(400, description="Invalid JSON")
    if not isinstance(data, dict):
        abort(400, description="JSON body must be an object")

    raw_tags = data.get("tags") or []
    images = data.get("images") or []
    if isinstance(raw_tags, str):
        # A bare string would otherwise be joined character by character.
        raw_tags = [raw_tags]
    if not isinstance(raw_tags, list) or not isinstance(images, list):
        abort(400, description="tags and images must be lists")

    # Coerce scalar fields to str so non-string JSON values cannot crash the
    # string operations below.
    kind = str(data.get("kind") or "blog").lower()
    title = str(data.get("title") or "Untitled")
    content_html = str(data.get("content_html") or "")
    content_text = str(data.get("content_text") or "")
    tags = ",".join(str(tag) for tag in raw_tags)
    pinned = bool(data.get("pinned"))
    external_id = data.get("external_id")
    slug = _to_slug(str(external_id or title))

    # Prefer the HTML body; otherwise turn plain-text newlines into <br>.
    body = content_html or content_text.replace("\n", "<br>")
    images_json = json.dumps(images) if images else None

    is_project = kind == "project"
    model = ProjectPost if is_project else BlogPost
    obj = model(
        title=title, slug=slug, content=body,
        category=None, tags=tags, pinned=pinned, images=images_json
    )

    db.session.add(obj)
    db.session.commit()

    url = url_for("view_project" if is_project else "view_blog", slug=slug, _external=True)
    return jsonify({"ok": True, "url": url, "id": obj.id, "slug": slug})
|
||
|
||
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and authenticate the single admin account.

    On a successful POST the user is logged in with a permanent session and
    redirected to the admin panel. Otherwise an error is flashed and the
    form is rendered again.
    """
    if request.method == "POST":
        username = request.form["username"]
        password = request.form["password"]

        # Compare in constant time so response timing does not reveal how
        # much of a guess matched. Bytes are used because compare_digest()
        # rejects non-ASCII str.
        user_ok = hmac.compare_digest(
            username.encode("utf-8"), (USERNAME or "").encode("utf-8")
        )
        pass_ok = hmac.compare_digest(
            password.encode("utf-8"), (PASSWORD or "").encode("utf-8")
        )
        # Never authenticate against unset or empty configured credentials.
        if USERNAME and PASSWORD and user_ok and pass_ok:
            user = User()
            login_user(user)
            session.permanent = True
            return redirect(url_for("admin_panel"))
        flash("Invalid email or password!", "danger")
    return render_template("login.html")
|
||
|
||
@app.route("/logout")
|
||
@login_required
|
||
def logout():
|
||
logout_user()
|
||
return redirect(url_for("index"))
|
||
|
||
@app.route('/admin')
@login_required
def admin_panel():
    """Render the admin page with all blog and project posts, newest id first."""
    context = {
        'blogpost': BlogPost.query.order_by(BlogPost.id.desc()).all(),
        'projectpost': ProjectPost.query.order_by(ProjectPost.id.desc()).all(),
    }
    return render_template('admin.html', **context)
|
||
|
||
@app.route('/newblog', methods=['GET', 'POST'])
@login_required
def new_blog():
    """Render the new-blog form (GET) or create a blog post (POST).

    Uploaded images are stored in UPLOAD_FOLDER under their sanitised names
    and recorded on the post as a JSON list. Submitting with
    ``action=draft`` saves the post as a draft.
    """
    if request.method == 'POST':
        title = request.form['title']
        content = request.form['content']
        category = request.form['category']
        tags = request.form['tags']
        pinned = 'pinned' in request.form
        is_draft = request.form.get('action') == 'draft'
        images = request.files.getlist('images')

        # _to_slug() falls back to a timestamped slug when the title
        # slugifies to an empty string, so the post always has a usable URL.
        slug = _to_slug(title)

        # Sanitise and save each upload in the same pass so a file is always
        # written under its own name. Uploads without a filename, or whose
        # name sanitises to nothing, are skipped.
        image_filenames = []
        for image in images:
            if not image.filename:
                continue
            filename = secure_filename(image.filename)
            if not filename:
                continue
            image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            image_filenames.append(filename)

        new_post = BlogPost(
            title=title, slug=slug, content=content,
            category=category, tags=tags, pinned=pinned,
            images=json.dumps(image_filenames), draft=is_draft
        )
        db.session.add(new_post)
        db.session.commit()
        return redirect(url_for('view_blog', slug=slug))

    return render_template('new_blog.html')
|
||
|
||
@app.route('/newproject', methods=['GET', 'POST'])
@login_required
def new_project():
    """Render the new-project form (GET) or create a project post (POST).

    Uploaded images are stored in UPLOAD_FOLDER under their sanitised names
    and recorded on the post as a JSON list. Submitting with
    ``action=draft`` saves the post as a draft.
    """
    if request.method == 'POST':
        title = request.form['title']
        content = request.form['content']
        category = request.form['category']
        tags = request.form['tags']
        pinned = 'pinned' in request.form
        is_draft = request.form.get('action') == 'draft'
        images = request.files.getlist('images')

        # _to_slug() falls back to a timestamped slug when the title
        # slugifies to an empty string, so the post always has a usable URL.
        slug = _to_slug(title)

        # Sanitise and save each upload in the same pass so a file is always
        # written under its own name. Uploads without a filename, or whose
        # name sanitises to nothing, are skipped.
        image_filenames = []
        for image in images:
            if not image.filename:
                continue
            filename = secure_filename(image.filename)
            if not filename:
                continue
            image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            image_filenames.append(filename)

        new_post = ProjectPost(
            title=title, slug=slug, content=content,
            category=category, tags=tags, pinned=pinned,
            images=json.dumps(image_filenames), draft=is_draft
        )
        db.session.add(new_post)
        db.session.commit()
        return redirect(url_for('view_project', slug=slug))

    return render_template('new_project.html')
|
||
|
||
@app.route('/edit-blog/<slug>', methods=['GET', 'POST'])
@login_required
def edit_blog(slug):
    """Render the edit form (GET) or update the blog post ``slug`` (POST).

    The slug is regenerated from the new title, and newly uploaded images
    are appended to the post's existing image list. Responds 404 when no
    post has this slug.
    """
    blogpost = BlogPost.query.filter_by(slug=slug).first_or_404()
    if request.method == 'POST':
        blogpost.title = request.form['title']
        # _to_slug() guarantees a non-empty slug even for titles that
        # slugify to nothing.
        blogpost.slug = _to_slug(blogpost.title)
        blogpost.content = request.form['content']
        blogpost.category = request.form['category']
        blogpost.tags = request.form['tags']
        blogpost.pinned = 'pinned' in request.form

        images = request.files.getlist('images')
        image_filenames = json.loads(blogpost.images) if blogpost.images else []
        for image in images:
            if not image.filename:
                continue
            filename = secure_filename(image.filename)
            if not filename:
                # Name sanitised to nothing; saving would target the
                # upload directory itself.
                continue
            image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            image_filenames.append(filename)

        blogpost.images = json.dumps(image_filenames)
        db.session.commit()
        return redirect(url_for('view_blog', slug=blogpost.slug))

    return render_template('edit_blog.html', blogpost=blogpost)
|
||
|
||
@app.route('/edit-project/<slug>', methods=['GET', 'POST'])
@login_required
def edit_project(slug):
    """Render the edit form (GET) or update the project post ``slug`` (POST).

    The slug is regenerated from the new title, and newly uploaded images
    are appended to the post's existing image list. Responds 404 when no
    post has this slug.
    """
    projectpost = ProjectPost.query.filter_by(slug=slug).first_or_404()
    if request.method == 'POST':
        projectpost.title = request.form['title']
        # _to_slug() guarantees a non-empty slug even for titles that
        # slugify to nothing.
        projectpost.slug = _to_slug(projectpost.title)
        projectpost.content = request.form['content']
        projectpost.category = request.form['category']
        projectpost.tags = request.form['tags']
        projectpost.pinned = 'pinned' in request.form

        images = request.files.getlist('images')
        image_filenames = json.loads(projectpost.images) if projectpost.images else []
        for image in images:
            if not image.filename:
                continue
            filename = secure_filename(image.filename)
            if not filename:
                # Name sanitised to nothing; saving would target the
                # upload directory itself.
                continue
            image.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            image_filenames.append(filename)

        projectpost.images = json.dumps(image_filenames)
        db.session.commit()
        return redirect(url_for('view_project', slug=projectpost.slug))

    return render_template('edit_project.html', projectpost=projectpost)
|
||
|
||
@app.route('/delete-blog/<slug>', methods=['POST'])
@login_required
def delete_blog(slug):
    """Delete the blog post ``slug`` and its uploaded image files.

    Responds 404 when no post has this slug; redirects to the admin panel
    afterwards.
    """
    blogpost = BlogPost.query.filter_by(slug=slug).first_or_404()
    if blogpost.images:
        try:
            stored = json.loads(blogpost.images)
        except ValueError:
            stored = []
        if not isinstance(stored, list):
            stored = []
        for filename in stored:
            # Image lists written by /api/v1/publish come straight from the
            # request payload. Only plain file names are removed, so an
            # absolute path or ".." entry can never delete anything outside
            # the upload folder.
            if (not isinstance(filename, str)
                    or filename in ('', '.', '..')
                    or os.path.basename(filename) != filename):
                continue
            try:
                os.remove(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            except FileNotFoundError:
                # Already gone; nothing to clean up.
                pass
    db.session.delete(blogpost)
    db.session.commit()
    return redirect(url_for('admin_panel'))
|
||
|
||
@app.route('/delete-project/<slug>', methods=['POST'])
@login_required
def delete_project(slug):
    """Delete the project post ``slug`` and its uploaded image files.

    Responds 404 when no post has this slug; redirects to the admin panel
    afterwards.
    """
    projectpost = ProjectPost.query.filter_by(slug=slug).first_or_404()
    if projectpost.images:
        try:
            stored = json.loads(projectpost.images)
        except ValueError:
            stored = []
        if not isinstance(stored, list):
            stored = []
        for filename in stored:
            # Image lists written by /api/v1/publish come straight from the
            # request payload. Only plain file names are removed, so an
            # absolute path or ".." entry can never delete anything outside
            # the upload folder.
            if (not isinstance(filename, str)
                    or filename in ('', '.', '..')
                    or os.path.basename(filename) != filename):
                continue
            try:
                os.remove(os.path.join(app.config['UPLOAD_FOLDER'], filename))
            except FileNotFoundError:
                # Already gone; nothing to clean up.
                pass
    db.session.delete(projectpost)
    db.session.commit()
    return redirect(url_for('admin_panel'))
|
||
|
||
@app.route('/blogtag/<tag>')
def view_tag(tag):
    """List blog posts whose tags text contains ``tag`` (case-insensitive).

    Drafts are hidden from anonymous visitors, matching ``view_blog``.
    Matching is a substring test on the stored tags text.
    """
    tag_lower = tag.lower()
    # autoescape=True makes "%" and "_" in the tag match literally instead
    # of acting as LIKE wildcards.
    query = BlogPost.query.filter(
        func.lower(BlogPost.tags).contains(tag_lower, autoescape=True)
    )
    if not current_user.is_authenticated:
        query = query.filter_by(draft=False)
    blogpost = query.order_by(BlogPost.id.desc()).all()
    return render_template('blog_tag_results.html', blogpost=blogpost, tag=tag)
|
||
|
||
@app.route('/projecttag/<tag>')
def view_project_tag(tag):
    """List project posts whose tags text contains ``tag`` (case-insensitive).

    Drafts are hidden from anonymous visitors, matching ``view_project``.
    Matching is a substring test on the stored tags text.
    """
    tag_lower = tag.lower()
    # autoescape=True makes "%" and "_" in the tag match literally instead
    # of acting as LIKE wildcards.
    query = ProjectPost.query.filter(
        func.lower(ProjectPost.tags).contains(tag_lower, autoescape=True)
    )
    if not current_user.is_authenticated:
        query = query.filter_by(draft=False)
    projectpost = query.order_by(ProjectPost.id.desc()).all()
    return render_template('project_tag_results.html', projectpost=projectpost, tag=tag)
|
||
|
||
@app.route('/blog')
def blog():
    """Render the blog listing: all non-draft posts, newest id first."""
    published = BlogPost.query.filter_by(draft=False)
    newest_first = published.order_by(BlogPost.id.desc())
    return render_template('blog.html', blogpost=newest_first.all())
|
||
|
||
@app.route('/projects')
def projects():
    """Render the project listing: all non-draft posts, newest id first."""
    published = ProjectPost.query.filter_by(draft=False)
    newest_first = published.order_by(ProjectPost.id.desc())
    return render_template('project.html', projectpost=newest_first.all())
|
||
|
||
@app.route('/blog/<slug>')
def view_blog(slug):
    """Render one blog post; 404 if it is missing or a draft viewed anonymously."""
    post = BlogPost.query.filter_by(slug=slug).first_or_404()
    hidden_from_visitor = post.draft and not current_user.is_authenticated
    if hidden_from_visitor:
        abort(404)
    return render_template('view_blog.html', blogpost=post)
|
||
|
||
@app.route('/project/<slug>')
def view_project(slug):
    """Render one project post; 404 if it is missing or a draft viewed anonymously."""
    post = ProjectPost.query.filter_by(slug=slug).first_or_404()
    hidden_from_visitor = post.draft and not current_user.is_authenticated
    if hidden_from_visitor:
        abort(404)
    return render_template('view_project.html', projectpost=post)
|
||
|
||
@app.route('/publish-blog/<slug>', methods=['POST'])
@login_required
def publish_blog(slug):
    """Clear the draft flag on blog post ``slug`` and redirect to its page."""
    post = BlogPost.query.filter_by(slug=slug).first_or_404()
    post.draft = False
    db.session.commit()
    target = url_for('view_blog', slug=slug)
    return redirect(target)
|
||
|
||
@app.route('/publish-project/<slug>', methods=['POST'])
@login_required
def publish_project(slug):
    """Clear the draft flag on project post ``slug`` and redirect to its page."""
    post = ProjectPost.query.filter_by(slug=slug).first_or_404()
    post.draft = False
    db.session.commit()
    target = url_for('view_project', slug=slug)
    return redirect(target)
|
||
|
||
@app.route('/contact')
def contact():
    """Render the contact page with the module-level CONTACT details."""
    template_context = {'CONTACT': CONTACT}
    return render_template('contact.html', **template_context)
|
||
|
||
@app.route('/resume')
def resume():
    """Render the resume page."""
    page = render_template('resume.html')
    return page
|
||
|
||
@app.route('/about')
def about():
    """Render the about page."""
    page = render_template('about.html')
    return page
|
||
|
||
@app.route('/')
def index():
    """Render the home page."""
    page = render_template('index.html')
    return page
|
||
|
||
if __name__ == '__main__':
    # Development entry point. Debug mode stays on by default, as before,
    # but can now be turned off without editing code by setting FLASK_DEBUG
    # to 0/false/no/off.
    _debug_flag = os.getenv('FLASK_DEBUG', '1').strip().lower()
    app.run(debug=_debug_flag not in ('0', 'false', 'no', 'off'))
|