# app.py
from __future__ import annotations
import os, secrets, string, json as _json, csv
from datetime import datetime, timedelta
from io import StringIO
from typing import Optional
from flask import (
Flask, render_template_string, request, redirect, url_for, flash, jsonify,
make_response, abort
)
from flask_sqlalchemy import SQLAlchemy
from flask_login import (
LoginManager, UserMixin, login_user, current_user, login_required, logout_user
)
from werkzeug.security import generate_password_hash, check_password_hash
# CSRF + Rate Limiting
from flask_wtf import CSRFProtect
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
# =============================================================================
# Config
# =============================================================================
def getenv(name: str, default: Optional[str] = None) -> Optional[str]:
return os.environ.get(name, default)
DATABASE_URL = getenv("DATABASE_URL", "sqlite:///rbac.db")
SECRET_KEY = getenv("APP_SECRET_KEY", "dev_change_me")
APP_BRAND = getenv("APP_BRAND", "Streetside Canyon")
# First-run bootstrap admin (used only if there are no users in DB)
BOOTSTRAP_ADMIN_USER = getenv("DEFAULT_ADMIN_USERNAME", "admin")
BOOTSTRAP_ADMIN_PASS = getenv("DEFAULT_ADMIN_PASSWORD", "change_me_now")
# =============================================================================
# App + DB + Login
# =============================================================================
app = Flask(__name__)
app.config.update(
SECRET_KEY=SECRET_KEY,
SQLALCHEMY_DATABASE_URI=DATABASE_URL,
SQLALCHEMY_TRACK_MODIFICATIONS=False,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE="Lax",
SESSION_COOKIE_SECURE=True, # assumes HTTPS (you run behind Nginx)
REMEMBER_COOKIE_SECURE=True,
REMEMBER_COOKIE_HTTPONLY=True,
PERMANENT_SESSION_LIFETIME=60 * 60 * 24 * 14, # 14 days in seconds
)
# --- Freeze-safe config ---
app.config.update(
FREEZER_FLAT_URLS=True,
FREEZER_REMOVE_EXTRA_FILES=True,
FREEZER_IGNORE_MIMETYPE_WARNINGS=True
)
db = SQLAlchemy(app)
login_manager = LoginManager(app)
login_manager.login_view = "login"
login_manager.session_protection = "strong"
# CSRF + Rate limit
csrf = CSRFProtect(app)
limiter = Limiter(key_func=get_remote_address, app=app, default_limits=["200/day", "50/hour"])
# =============================================================================
# Security headers
# =============================================================================
@app.after_request
def add_security_headers(resp):
csp = (
"default-src 'self'; "
"style-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com; "
"script-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com; "
"img-src 'self' data:; font-src 'self' data:; "
"base-uri 'none'; frame-ancestors 'none'; form-action 'self'"
)
resp.headers.setdefault("Content-Security-Policy", csp)
resp.headers.setdefault("X-Frame-Options", "DENY")
resp.headers.setdefault("X-Content-Type-Options", "nosniff")
resp.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin")
return resp
# =============================================================================
# Models
# =============================================================================
class User(UserMixin, db.Model):
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(255), nullable=False)
# 'admin' or 'member'
role = db.Column(db.String(16), nullable=False, default="member", index=True)
is_active = db.Column(db.Boolean, nullable=False, default=True)
# Force password change on first login (or after admin reset)
must_change_password = db.Column(db.Boolean, nullable=False, default=True)
created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, index=True)
def get_id(self):
return str(self.id)
def set_password(self, raw: str) -> None:
self.password_hash = generate_password_hash(raw)
def check_password(self, raw: str) -> bool:
return check_password_hash(self.password_hash, raw)
def is_admin(self) -> bool:
return self.role == "admin"
class AvailabilityWeekly(db.Model):
__tablename__ = "availability_weekly"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), unique=True, nullable=False, index=True)
# JSON shape: {"mon":{"avail":true,"start":"08:00","end":"17:00"}, ...}
data_json = db.Column(db.Text, nullable=False, default="{}")
updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, index=True)
user = db.relationship("User", backref=db.backref("availability_weekly", uselist=False))
class TimeOffRequest(db.Model):
__tablename__ = "timeoff_requests"
__table_args__ = (
db.Index("ix_timeoff_user_status", "user_id", "status"),
db.Index("ix_timeoff_created_at", "created_at"),
db.Index("ix_timeoff_date", "date"),
)
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True)
date = db.Column(db.String(10), nullable=False) # "YYYY-MM-DD"
note = db.Column(db.String(240))
status = db.Column(db.String(16), nullable=False, default="pending", index=True) # pending|approved|denied|cancelled
created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, index=True)
decided_at = db.Column(db.DateTime)
user = db.relationship("User", backref=db.backref("timeoff_requests", lazy="dynamic"))
class Wishlist(db.Model):
__tablename__ = "wishlists"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), unique=True, nullable=False, index=True)
wishlist = db.Column(db.Text, nullable=False)
updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, index=True)
user = db.relationship("User", backref=db.backref("wishlist", uselist=False))
class SecretSantaEntry(db.Model):
__tablename__ = "secret_santa_entries"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), unique=True, nullable=False, index=True)
# Explicit questions
full_name = db.Column(db.String(120), nullable=False) # “Name (First and Last)”
age = db.Column(db.Integer) # Age
birthday = db.Column(db.String(10)) # YYYY-MM-DD (stored as text for SQLite simplicity)
hobbies = db.Column(db.Text) # List of hobbies
gift_card = db.Column(db.String(120)) # Favorite gift card
fav_movie = db.Column(db.String(120)) # Favorite type of movie
jewelry = db.Column(db.Boolean, nullable=False, default=False) # Yes/No
created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, index=True)
updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, index=True)
user = db.relationship("User", backref=db.backref("secret_santa", uselist=False))
class InfoContact(db.Model):
__tablename__ = "info_contacts"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(120), nullable=False, index=True)
role = db.Column(db.String(120))
phone = db.Column(db.String(32))
priority = db.Column(db.Integer, default=5, index=True) # 1 = top
team = db.Column(db.String(64), default="Streetside") # e.g., "Streetside"
is_active = db.Column(db.Boolean, default=True, index=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
class DeptExtension(db.Model):
__tablename__ = "dept_extensions"
id = db.Column(db.Integer, primary_key=True)
ext = db.Column(db.String(16), nullable=False, index=True)
dept = db.Column(db.String(120), nullable=False, index=True)
is_active = db.Column(db.Boolean, default=True, index=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
class SupportItem(db.Model):
"""
Generic support card: Guest Services, eCommerce Team, IT, etc.
"""
__tablename__ = "support_items"
id = db.Column(db.Integer, primary_key=True)
category = db.Column(db.String(64), nullable=False) # "Guest Services", "IT", "Ecommerce"
email = db.Column(db.String(160))
phone = db.Column(db.String(64))
note = db.Column(db.Text) # free-text note / disclaimer
issues_json = db.Column(db.Text, default="[]") # JSON list of bullet items
audience = db.Column(db.String(16), default="all") # "all" | "admin"
is_active = db.Column(db.Boolean, default=True)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def issues(self):
try:
return _json.loads(self.issues_json or "[]")
except Exception:
return []
class LocalSecret(db.Model):
"""
Admin-only quick refs (register login, local device passwords, etc.)
"""
__tablename__ = "local_secrets"
id = db.Column(db.Integer, primary_key=True)
label = db.Column(db.String(160), nullable=False) # e.g. "Register Login"
value = db.Column(db.String(400), nullable=False) # e.g. "#291 / 0000"
notes = db.Column(db.Text)
is_active = db.Column(db.Boolean, default=True)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
@login_manager.user_loader
def load_user(user_id: str) -> Optional[User]:
try:
return db.session.get(User, int(user_id)) # type: ignore[arg-type]
except Exception:
return None
# =============================================================================
# Bootstrap (first run) — Flask 3 safe
# =============================================================================
def ensure_bootstrap_admin():
db.create_all()
if not User.query.first():
admin = User(
username=(BOOTSTRAP_ADMIN_USER or "admin").strip(),
role="admin",
is_active=True,
must_change_password=True,
)
admin.set_password((BOOTSTRAP_ADMIN_PASS or "change_me_now").strip())
db.session.add(admin)
db.session.commit()
app.logger.warning(
"Bootstrapped admin user '%s' (must change password on first login).",
admin.username,
)
with app.app_context():
ensure_bootstrap_admin()
db.create_all() # make sure new tables exist
def seed_store_info():
"""
One-time seed using the info Ben provided.
Safe to call repeatedly: it inserts only when each table is empty.
"""
any_inserted = False
# -----------------------------
# Contacts (priority: 1 = top)
# -----------------------------
if InfoContact.query.count() == 0:
contacts = [
{"name": "Ben", "role": None, "phone": "(806) 395-6770", "priority": 1},
{"name": "Terri A", "role": None, "phone": "(702) 776-1791", "priority": 2},
{"name": "Shireen", "role": None, "phone": "(806) 231-4024", "priority": 3},
{"name": "Charles", "role": None, "phone": "(806) 476-8101", "priority": 4},
{"name": "Matt", "role": None, "phone": "(806) 443-2069", "priority": 5},
{"name": "Navayah", "role": None, "phone": "(806) 729-2385", "priority": 6},
{"name": "Jaydaci", "role": None, "phone": "(806) 433-1715", "priority": 7},
{"name": "Diego", "role": None, "phone": "(620) 621-4402", "priority": 8},
{"name": "Kaley", "role": None, "phone": "(806) 292-9060", "priority": 9},
{"name": "Maika", "role": None, "phone": "(806) 557-8154", "priority": 10},
{"name": "Jacob", "role": None, "phone": "(806) 476-9724", "priority": 11},
{"name": "Tammy", "role": None, "phone": "(806) 523-5935", "priority": 12},
{"name": "Kathleen", "role": None, "phone": "(806) 420-9218", "priority": 13},
]
db.session.bulk_save_objects([InfoContact(**c) for c in contacts])
any_inserted = True
# --------------------------------
# Department extensions directory
# --------------------------------
if DeptExtension.query.count() == 0:
exts = [
("532000", "Service Counter"),
("532005", "Produce"),
("532006", "Market"),
("532106", "Market Office"),
("532007", "Bakery 1"),
("532107", "Bakery 2"),
("532012", "Food Service"),
("532013", "Pharmacy 1"),
("532113", "Pharmacy 2"),
("532213", "Pharmacy 3"),
("532313", "Pharmacy 4"),
("532413", "Pharmacy Office"),
("532014", "Dairy"),
("532018", "Fuel (UE)"),
("532373", "DSD"),
("532200", "Store Office"),
("532500", "Cash Office"),
("532600", "Price Coordinator"),
("532899", "Guest (Lobby)"),
("532950", "Streetside"),
]
db.session.bulk_save_objects([DeptExtension(ext=e, dept=d) for e, d in exts])
any_inserted = True
# -----------------------------
# Support / escalation cards
# -----------------------------
if SupportItem.query.count() == 0:
# Guest Services (primary escalation path)
guest_services_issues = [
"Billing Issues",
"Delivery Creations",
"Missing Loyalty Number",
"Waiting for the POS Result",
"Flybuy Activation",
"Order Status Updates",
"Refund Status",
"App/Web Issues",
"Promo Questions",
"Delivery Questions",
"Adding Rewards",
"Constant Export Failing",
"Winshop Not Working",
"Storefront issues (e.g., wrong prices/descriptions, discontinued items)"
]
db.session.add(SupportItem(
category="Guest Services",
email="guestservices@unitedtexas.com",
phone="855-762-7880",
note="Guest Services should be your first step. The address streetsidehelp@unitedtexas.com is no longer active.",
issues_json=_json.dumps(guest_services_issues),
audience="all",
is_active=True,
))
# Ecommerce Team
ecommerce_resp = [
"Equipment needs: Scales, Walkies, Thermo Guns, etc.",
"Marketing materials needs",
"Slot closures — contact SD and RVP first"
]
db.session.add(SupportItem(
category="Ecommerce Team",
email="team_ecommerce@unitedtexas.com",
phone=None,
note=None,
issues_json=_json.dumps(ecommerce_resp),
audience="all",
is_active=True,
))
# IT Support
it_issues = [
"Issues with POS",
"Issues with handhelds or label printer",
"Needing handhelds",
"Troubles logging into the computer or email"
]
db.session.add(SupportItem(
category="IT Support",
email="support@unitedtexas.com",
phone="806-791-8181 Option 1",
note=None,
issues_json=_json.dumps(it_issues),
audience="all",
is_active=True,
))
any_inserted = True
# -----------------------------
# Admin-only quick notes
# -----------------------------
if LocalSecret.query.count() == 0:
secrets = [
{
"label": "Register Login",
"value": "#291 / 0000",
"notes": "Sign off register at night — DO NOT SHUT OFF.",
},
{
"label": "Computer Password",
"value": "532532",
"notes": "Closing: turn off heater/fan if on.",
},
{
"label": "Phone Password",
"value": "1111",
"notes": None,
},
{
"label": "Bottle Deposit Returns (Volleman Glass empty)",
"value": "13573",
"notes": None,
},
]
db.session.bulk_save_objects([LocalSecret(**s) for s in secrets])
any_inserted = True
if any_inserted:
db.session.commit()
app.logger.warning("Seeded Store Info (contacts/extensions/support/secrets).")
else:
app.logger.info("Store Info seed skipped: tables already contain data.")
with app.app_context():
ensure_bootstrap_admin()
db.create_all()
seed_store_info()
# =============================================================================
# Role guard + utilities
# =============================================================================
from functools import wraps
def admin_required(f):
@wraps(f)
@login_required
def wrapper(*args, **kwargs):
if not current_user.is_admin():
flash("Admins only.", "error")
return redirect(url_for("dashboard"))
return f(*args, **kwargs)
return wrapper
def gen_temp_password(length: int = 14) -> str:
alphabet = string.ascii_letters + string.digits
return "".join(secrets.choice(alphabet) for _ in range(length))
DAY_NAMES = [("mon","Mon"),("tue","Tue"),("wed","Wed"),("thu","Thu"),("fri","Fri"),("sat","Sat"),("sun","Sun")]
def _default_week():
return {k: {"avail": False, "start": "08:00", "end": "17:00"} for k,_ in DAY_NAMES}
def _parse_week_from_form(form):
out = {}
for key,_label in DAY_NAMES:
avail = (form.get(f"{key}_avail") == "on")
start = (form.get(f"{key}_start") or "08:00").strip()[:5]
end = (form.get(f"{key}_end") or "17:00").strip()[:5]
out[key] = {"avail": avail, "start": start, "end": end}
return out
def _validate_iso_date(d: str) -> bool:
try:
datetime.strptime(d, "%Y-%m-%d")
return True
except ValueError:
return False
# =============================================================================
# Base layout + render helper
# =============================================================================
TPL_BASE = """
{{ title or app_brand }}
{% with msgs = get_flashed_messages(with_categories=true) %}
{% if msgs %}
{% for cat,msg in msgs %}
{{ msg }}
{% endfor %}
{% endif %}
{% endwith %}
{{ content|safe }}
{% if current_user.is_authenticated %}
{% endif %}
"""
TPL_LOGIN_BODY = """
"""
# Dashboard with punchy, clickable tiles
TPL_DASHBOARD_BODY = """
Welcome, {{ user.username }}
Role: {{ user.role }}
"""
TPL_INFO_PAGE = """
Department / Store Info
{% if current_user.role == 'admin' %}
Admin Console
{% endif %}
Department Extensions
| Extension | Department |
{% for d in exts %}
| {{ d.ext }} | {{ d.dept }} |
{% endfor %}
Support & Escalation
{% for s in supports %}
{{ s.category }}
{% if s.email %}Email: {{ s.email }}
{% endif %}
{% if s.phone %}Phone: {{ s.phone }}
{% endif %}
{% if s.note %}{{ s.note }}
{% endif %}
{% if s.issues() %}
{% for it in s.issues() %}- {{ it }}
{% endfor %}
{% endif %}
{% endfor %}
{% if admin_secrets %}
Admin Quick Notes
Visible to admins only
{% for s in admin_secrets %}
{{ s.label }}
{{ s.value }}
{% if s.notes %}{{ s.notes }}
{% endif %}
{% endfor %}
{% endif %}
"""
TPL_ADMIN_INFO = """
Contacts
| Name | Role | Phone | Priority | Active | Action |
{% for c in contacts %}
| {{ c.name }} |
{{ c.role or '' }} |
{{ c.phone or '' }} |
{{ c.priority }} |
{{ 'yes' if c.is_active else 'no' }} |
|
{% endfor %}
Department Extensions
| Ext | Department | Active | Action |
{% for d in exts %}
| {{ d.ext }} |
{{ d.dept }} |
{{ 'yes' if d.is_active else 'no' }} |
|
{% endfor %}
Support & Escalation
{% for s in supports %}
{{ s.category }}
{{ s.audience }}
{% if s.email %}{{ s.email }}
{% endif %}
{% if s.phone %}{{ s.phone }}
{% endif %}
{% if s.note %}{{ s.note }}
{% endif %}
{% if s.issues() %}
{% for it in s.issues() %}- {{ it }}
{% endfor %}
{% endif %}
{% endfor %}
Admin Quick Notes (Secrets)
{% for s in secrets %}
{{ s.label }}
{{ s.value }}
{% if s.notes %}{{ s.notes }}
{% endif %}
{% endfor %}
"""
TPL_CHANGE_PASSWORD_BODY = """
Change Password
"""
# Member: Request Off page (simple, professional)
TPL_REQUEST_OFF_BODY = """
Your Requests
{% if my_reqs %}
| Date |
Status |
Note |
Requested |
Action |
{% for r in my_reqs %}
| {{ r.date }} |
{{ r.status }}
|
{{ r.note or '' }} |
{{ r.created_at.strftime('%Y-%m-%d %H:%M') if r.created_at else '' }}
|
{% if r.status == 'pending' %}
{% else %}
No action
{% endif %}
|
{% endfor %}
{% else %}
You don't have any requests yet.
{% endif %}
"""
# Admin: Users
TPL_ADMIN_USERS_BODY = """
"""
# Member: availability only
TPL_AVAIL_BODY = """
Your Weekly Availability
"""
# Admin: Team Availability grid (includes ✎ edit link)
TPL_ADMIN_AVAIL_BODY = """
| User |
{% for _k,lab in day_names %}{{ lab }} | {% endfor %}
{% for row in rows %}
|
|
{% for k,_lab in day_names %}
{% set d = row.week.get(k) %}
{% if d and d.avail %}
{{ d.start }}–{{ d.end }}
{% else %}
—
{% endif %}
|
{% endfor %}
{% endfor %}
"""
# Admin: Time-off requests moderation
TPL_ADMIN_REQS_BODY = """
| User | Date | Status | Note | Requested | Action |
{% for r in rows %}
| {{ r.user.username }} |
{{ r.date }} |
{{ r.status }} |
{{ r.note or '' }} |
{{ r.created_at.strftime('%Y-%m-%d %H:%M') }} |
{% if r.status == 'pending' %}
{% else %}—{% endif %}
|
{% endfor %}
"""
# Admin: Wishlists table
TPL_ADMIN_WISHLISTS_BODY = """
| User | Last Updated | Wishlist |
{% for w in rows %}
| {{ w.user.username }} |
{{ w.updated_at.strftime('%Y-%m-%d %H:%M') if w.updated_at else '' }} |
{{ w.wishlist }} |
{% endfor %}
"""
TPL_SECRET_SANTA_BODY = """
"""
# Admin: Secret Santa cards
TPL_ADMIN_SS_BODY = """
{% for e in rows %}
{{ e.full_name or '—' }}
{{ e.user.username }}
{{ 'Jewelry: Yes' if e.jewelry else 'Jewelry: No' }}
Birthday
{{ e.birthday or '—' }}
Favorite Gift Card
{{ e.gift_card or '—' }}
Favorite Type of Movie
{{ e.fav_movie or '—' }}
Hobbies
{{ e.hobbies or '—' }}
{% endfor %}
{% if not rows %}
No Secret Santa entries yet.
{% endif %}
"""
# Admin: Availability editor (NEW)
TPL_ADMIN_AVAIL_EDIT_BODY = """
{% if user %}
Weekly Pattern for: {{ user.username }}
{% endif %}
"""
def render_page(body_tpl: str, **ctx):
"""Render a body template into the base layout."""
body_html = render_template_string(body_tpl, **ctx)
title = ctx.get("title") or APP_BRAND
return render_template_string(TPL_BASE, title=title, content=body_html, app_brand=APP_BRAND)
# =============================================================================
# Routes: Auth + Dashboard
# =============================================================================
@app.route("/")
def index():
if current_user.is_authenticated:
return redirect(url_for("dashboard"))
return redirect(url_for("login"))
@limiter.limit("5/minute")
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = (request.form.get("username") or "").strip()
password = request.form.get("password") or ""
user = User.query.filter_by(username=username).first()
if not user or not user.is_active or not user.check_password(password):
flash("Invalid credentials.", "error")
return redirect(url_for("login"))
login_user(user, remember=True)
if user.must_change_password:
flash("Please set a new password to continue.", "ok")
return redirect(url_for("change_password"))
return redirect(url_for("dashboard"))
return render_page(TPL_LOGIN_BODY, title="Login")
@app.post("/logout")
@login_required
def logout():
logout_user()
flash("Signed out.", "ok")
return redirect(url_for("login"))
@app.route("/dashboard")
@login_required
def dashboard():
return render_page(TPL_DASHBOARD_BODY, title="Dashboard", user=current_user)
@app.route("/change-password", methods=["GET", "POST"])
@login_required
def change_password():
if request.method == "POST":
curr = request.form.get("current_password") or ""
new1 = request.form.get("new_password") or ""
new2 = request.form.get("confirm_password") or ""
if not current_user.check_password(curr):
flash("Current password is incorrect.", "error")
return redirect(url_for("change_password"))
if len(new1) < 10:
flash("Choose a stronger password (min 10 chars).", "error")
return redirect(url_for("change_password"))
if new1 != new2:
flash("Passwords do not match.", "error")
return redirect(url_for("change_password"))
current_user.set_password(new1)
current_user.must_change_password = False
db.session.commit()
flash("Password updated.", "ok")
return redirect(url_for("dashboard"))
return render_page(TPL_CHANGE_PASSWORD_BODY, title="Change Password", user=current_user)
# =============================================================================
# Secret Santa
# =============================================================================
@app.route("/secret-santa", methods=["GET", "POST"])
@login_required
def secret_santa():
entry = SecretSantaEntry.query.filter_by(user_id=current_user.id).first()
if request.method == "POST":
full_name = (request.form.get("full_name") or "").strip()
if not full_name:
flash("Name is required.", "error")
return redirect(url_for("secret_santa"))
age_raw = request.form.get("age")
birthday = (request.form.get("birthday") or "").strip()
hobbies = (request.form.get("hobbies") or "").strip()
gift_card = (request.form.get("gift_card") or "").strip()
fav_movie = (request.form.get("fav_movie") or "").strip()
jewelry = request.form.get("jewelry") == "yes"
try:
age = int(age_raw) if (age_raw or "").strip() else None
except ValueError:
age = None
if entry:
entry.full_name = full_name
entry.age = age
entry.birthday = birthday or None
entry.hobbies = hobbies
entry.gift_card = gift_card
entry.fav_movie = fav_movie
entry.jewelry = jewelry
else:
entry = SecretSantaEntry(
user_id=current_user.id,
full_name=full_name, age=age, birthday=birthday or None,
hobbies=hobbies, gift_card=gift_card, fav_movie=fav_movie,
jewelry=jewelry,
)
db.session.add(entry)
db.session.commit()
flash("Secret Santa saved.", "ok")
return redirect(url_for("secret_santa"))
form = {
"full_name": entry.full_name if entry else current_user.username,
"age": entry.age if entry else "",
"birthday": entry.birthday if entry else "",
"hobbies": entry.hobbies if entry else "",
"gift_card": entry.gift_card if entry else "",
"fav_movie": entry.fav_movie if entry else "",
"jewelry": bool(entry.jewelry) if (entry and entry.jewelry is not None) else False,
}
return render_page(TPL_SECRET_SANTA_BODY, title="Secret Santa", form=form)
@app.get("/admin/secret-santa")
@admin_required
def admin_secret_santa():
rows = (SecretSantaEntry.query
.join(User, SecretSantaEntry.user_id == User.id)
.order_by(User.username.asc())
.all())
return render_page(TPL_ADMIN_SS_BODY, title="Secret Santa", rows=rows)
# =============================================================================
# Request Off
# =============================================================================
@app.get("/request-off")
@login_required
def request_off():
my_reqs = (TimeOffRequest.query
.filter_by(user_id=current_user.id)
.order_by(TimeOffRequest.created_at.desc())
.all())
return render_page(TPL_REQUEST_OFF_BODY, title="Request Off", my_reqs=my_reqs)
@app.post("/requests/new")
@login_required
def requests_new():
date = (request.form.get("date") or "").strip()
note = (request.form.get("note") or "").strip()[:240]
if not date or not _validate_iso_date(date):
flash("Please choose a valid date (YYYY-MM-DD).", "error")
return redirect(url_for("request_off"))
r = TimeOffRequest(user_id=current_user.id, date=date, note=note, status="pending")
db.session.add(r); db.session.commit()
flash("Request submitted.", "ok")
return redirect(url_for("request_off"))
@app.post("/requests//cancel")
@login_required
def requests_cancel(req_id: int):
r = db.session.get(TimeOffRequest, req_id)
if not r or r.user_id != current_user.id:
flash("Not found.", "error"); return redirect(url_for("request_off"))
if r.status != "pending":
flash("Only pending requests can be cancelled.", "error"); return redirect(url_for("request_off"))
r.status = "cancelled"; r.decided_at = datetime.utcnow(); db.session.commit()
flash("Request cancelled.", "ok"); return redirect(url_for("request_off"))
# =============================================================================
# Availability (member self-service)
# =============================================================================
@app.route("/availability", methods=["GET", "POST"])
@login_required
def availability():
aw = AvailabilityWeekly.query.filter_by(user_id=current_user.id).first()
if request.method == "POST":
week = _parse_week_from_form(request.form)
if not aw:
aw = AvailabilityWeekly(user_id=current_user.id, data_json=_json.dumps(week))
db.session.add(aw)
else:
aw.data_json = _json.dumps(week)
db.session.commit()
flash("Availability saved.", "ok")
return redirect(url_for("availability"))
week = _default_week()
if aw and aw.data_json:
try:
data = _json.loads(aw.data_json) or {}
for k,_ in DAY_NAMES:
if k in data:
week[k] = {
"avail": bool(data[k].get("avail")),
"start": (data[k].get("start") or "08:00")[:5],
"end": (data[k].get("end") or "17:00")[:5],
}
except Exception as e:
app.logger.warning("Failed to parse availability JSON for user %s: %s", current_user.id, e)
return render_page(TPL_AVAIL_BODY, title="Your Availability",
day_names=DAY_NAMES, week=week)
# =============================================================================
# Admin: Availability grid + EDITOR (NEW)
# =============================================================================
@app.get("/admin/availability")
@admin_required
def admin_availability():
rows = []
users = User.query.filter_by(is_active=True).order_by(User.username.asc()).all()
for u in users:
aw = AvailabilityWeekly.query.filter_by(user_id=u.id).first()
wk = _default_week()
if aw and aw.data_json:
try:
data = _json.loads(aw.data_json) or {}
for k,_ in DAY_NAMES:
if k in data:
wk[k] = {
"avail": bool(data[k].get("avail")),
"start": (data[k].get("start") or "08:00")[:5],
"end": (data[k].get("end") or "17:00")[:5],
}
except Exception as e:
app.logger.warning("Failed to parse availability JSON for user %s: %s", u.id, e)
rows.append({"user_id": u.id, "username": u.username, "week": wk})
return render_page(TPL_ADMIN_AVAIL_BODY, title="Team Availability", rows=rows, day_names=DAY_NAMES)
@app.route("/admin/availability/edit", methods=["GET", "POST"])
@admin_required
def admin_availability_edit():
# Save for a specific user
if request.method == "POST":
try:
target_id = int(request.form.get("user_id") or "0")
except ValueError:
flash("Invalid user.", "error")
return redirect(url_for("admin_availability_edit"))
target = db.session.get(User, target_id)
if not target:
flash("User not found.", "error")
return redirect(url_for("admin_availability_edit"))
week = _parse_week_from_form(request.form)
aw = AvailabilityWeekly.query.filter_by(user_id=target.id).first()
if not aw:
aw = AvailabilityWeekly(user_id=target.id, data_json=_json.dumps(week))
db.session.add(aw)
else:
aw.data_json = _json.dumps(week)
db.session.commit()
flash(f"Availability saved for {target.username}.", "ok")
return redirect(url_for("admin_availability_edit", user_id=target.id))
# GET: pick a user then load their week
users = User.query.filter_by(is_active=True).order_by(User.username.asc()).all()
user_id = request.args.get("user_id", type=int)
user = db.session.get(User, user_id) if user_id else None
week = _default_week()
if user:
aw = AvailabilityWeekly.query.filter_by(user_id=user.id).first()
if aw and aw.data_json:
try:
data = _json.loads(aw.data_json) or {}
for k,_ in DAY_NAMES:
if k in data:
week[k] = {
"avail": bool(data[k].get("avail")),
"start": (data[k].get("start") or "08:00")[:5],
"end": (data[k].get("end") or "17:00")[:5],
}
except Exception as e:
app.logger.warning("Admin edit: bad JSON for user %s: %s", user.id, e)
return render_page(TPL_ADMIN_AVAIL_EDIT_BODY,
title="Edit Availability",
users=users, user=user, day_names=DAY_NAMES, week=week)
# =============================================================================
# Admin: Requests + Wishlists
# =============================================================================
@app.get("/admin/requests")
@admin_required
def admin_requests():
rows = (TimeOffRequest.query
.join(User, TimeOffRequest.user_id == User.id)
.order_by(TimeOffRequest.status.asc(), TimeOffRequest.created_at.desc())
.all())
return render_page(TPL_ADMIN_REQS_BODY, title="Time-off Requests", rows=rows)
@app.post("/admin/requests//")
@admin_required
def admin_requests_action(req_id: int, action: str):
r = db.session.get(TimeOffRequest, req_id)
if not r:
flash("Request not found.", "error"); return redirect(url_for("admin_requests"))
if r.status != "pending":
flash("Request is already decided.", "error"); return redirect(url_for("admin_requests"))
if action not in ("approve","deny"):
flash("Invalid action.", "error"); return redirect(url_for("admin_requests"))
r.status = "approved" if action == "approve" else "denied"
r.decided_at = datetime.utcnow()
db.session.commit()
flash(f"Request {action}d.", "ok"); return redirect(url_for("admin_requests"))
@app.get("/admin/wishlists")
@admin_required
def admin_wishlists():
rows = (Wishlist.query
.join(User, Wishlist.user_id == User.id)
.order_by(User.username.asc())
.all())
return render_page(TPL_ADMIN_WISHLISTS_BODY, title="Wishlists", rows=rows)
# =============================================================================
# Exports
# =============================================================================
@app.get("/admin/secret-santa/export.csv")
@admin_required
def admin_secret_santa_export():
rows = (SecretSantaEntry.query.join(User).order_by(User.username.asc()).all())
sio = StringIO()
w = csv.writer(sio)
w.writerow(["username","full_name","age","birthday","hobbies","gift_card","fav_movie","jewelry","updated_at"])
for e in rows:
w.writerow([
e.user.username, e.full_name, e.age or "", e.birthday or "", e.hobbies or "",
e.gift_card or "", e.fav_movie or "", "Yes" if e.jewelry else "No",
e.updated_at.isoformat() if e.updated_at else ""
])
resp = make_response(sio.getvalue())
resp.headers["Content-Type"] = "text/csv; charset=utf-8"
resp.headers["Content-Disposition"] = "attachment; filename=secret_santa.csv"
return resp
@app.get("/admin/requests/export.csv")
@admin_required
def admin_requests_export():
rows = TimeOffRequest.query.join(User).order_by(TimeOffRequest.created_at.desc()).all()
sio = StringIO()
w = csv.writer(sio)
w.writerow(["username","date","status","note","created_at","decided_at"])
for r in rows:
w.writerow([
r.user.username, r.date, r.status, r.note or "",
r.created_at.isoformat(),
r.decided_at.isoformat() if r.decided_at else ""
])
resp = make_response(sio.getvalue())
resp.headers["Content-Type"] = "text/csv; charset=utf-8"
resp.headers["Content-Disposition"] = "attachment; filename=time_off_requests.csv"
return resp
# =============================================================================
# Admin: Users CRUD
# =============================================================================
@app.get("/admin/users")
@admin_required
def admin_users():
users = User.query.order_by(User.created_at.desc()).all()
return render_page(TPL_ADMIN_USERS_BODY, title="Users", users=users)
@app.post("/admin/users/create")
@admin_required
def admin_users_create():
username = (request.form.get("username") or "").strip()
role = (request.form.get("role") or "member").strip().lower()
role = "admin" if role == "admin" else "member"
if not username:
flash("Username is required.", "error")
return redirect(url_for("admin_users"))
if User.query.filter_by(username=username).first():
flash("Username already exists.", "error")
return redirect(url_for("admin_users"))
temp_pw = gen_temp_password()
u = User(username=username, role=role, is_active=True, must_change_password=True)
u.set_password(temp_pw)
db.session.add(u)
db.session.commit()
flash(f"User '{username}' created with temporary password: {temp_pw}", "ok")
return redirect(url_for("admin_users"))
@app.post("/admin/users//reset")
@admin_required
def admin_users_reset(user_id: int):
u = db.session.get(User, user_id)
if not u:
flash("User not found.", "error")
return redirect(url_for("admin_users"))
temp_pw = gen_temp_password()
u.set_password(temp_pw)
u.must_change_password = True
db.session.commit()
flash(f"Password reset for '{u.username}'. New temp: {temp_pw}", "ok")
return redirect(url_for("admin_users"))
@app.post("/admin/users//role")
@admin_required
def admin_users_role(user_id: int):
u = db.session.get(User, user_id)
if not u:
flash("User not found.", "error")
return redirect(url_for("admin_users"))
new_role = (request.form.get("role") or "member").lower()
u.role = "admin" if new_role == "admin" else "member"
db.session.commit()
flash(f"Role updated for '{u.username}' → {u.role}", "ok")
return redirect(url_for("admin_users"))
@app.post("/admin/users//toggle")
@admin_required
def admin_users_toggle(user_id: int):
u = db.session.get(User, user_id)
if not u:
flash("User not found.", "error")
return redirect(url_for("admin_users"))
# Prevent locking yourself out of the last admin
if u.id == current_user.id and u.role == "admin":
admins = User.query.filter_by(role="admin", is_active=True).count()
if admins <= 1:
flash("You are the last active admin; cannot deactivate.", "error")
return redirect(url_for("admin_users"))
u.is_active = not u.is_active
db.session.commit()
flash(f"User '{u.username}' active={u.is_active}", "ok")
return redirect(url_for("admin_users"))
# =============================================================================
# JSON API + Health
# =============================================================================
@app.get("/api/me")
@login_required
def api_me():
u: User = current_user # type: ignore
return jsonify(
id=u.id, username=u.username, role=u.role,
must_change_password=u.must_change_password, is_active=u.is_active
)
# ---------------------------
# Info: user view
# ---------------------------
@app.get("/info")
@login_required
def info_page():
contacts = (InfoContact.query
.filter_by(is_active=True)
.order_by(InfoContact.priority.asc(), InfoContact.name.asc())
.all())
exts = (DeptExtension.query
.filter_by(is_active=True)
.order_by(DeptExtension.ext.asc())
.all())
supports = (SupportItem.query
.filter_by(is_active=True)
.filter((SupportItem.audience == "all") | (SupportItem.audience == ("admin" if current_user.is_admin() else "zzz")))
.order_by(SupportItem.category.asc())
.all())
admin_secrets = []
if current_user.is_admin():
admin_secrets = (LocalSecret.query
.filter_by(is_active=True)
.order_by(LocalSecret.updated_at.desc())
.all())
return render_page(TPL_INFO_PAGE, title="Store Info",
contacts=contacts, exts=exts, supports=supports, admin_secrets=admin_secrets)
# ---------------------------
# Info: admin console
# ---------------------------
@app.get("/admin/info")
@admin_required
def admin_info_console():
contacts = InfoContact.query.order_by(InfoContact.priority.asc(), InfoContact.name.asc()).all()
exts = DeptExtension.query.order_by(DeptExtension.ext.asc()).all()
supports = SupportItem.query.order_by(SupportItem.category.asc()).all()
secrets = LocalSecret.query.order_by(LocalSecret.updated_at.desc()).all()
return render_page(TPL_ADMIN_INFO, title="Info Admin",
contacts=contacts, exts=exts, supports=supports, secrets=secrets)
# ---- Contacts CRUD
@app.post("/admin/info/contacts/create")
@admin_required
def admin_info_contacts_create():
name = (request.form.get("name") or "").strip()
role = (request.form.get("role") or "").strip()
phone = (request.form.get("phone") or "").strip()
priority = request.form.get("priority", type=int) or 5
if not name:
flash("Name is required.", "error"); return redirect(url_for("admin_info_console"))
db.session.add(InfoContact(name=name, role=role, phone=phone, priority=max(1, min(priority, 9))))
db.session.commit()
flash("Contact added.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/contacts//toggle")
@admin_required
def admin_info_contacts_toggle(cid:int):
c = db.session.get(InfoContact, cid)
if not c: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
c.is_active = not c.is_active; db.session.commit()
flash("Contact updated.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/contacts//delete")
@admin_required
def admin_info_contacts_delete(cid:int):
c = db.session.get(InfoContact, cid)
if not c: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
db.session.delete(c); db.session.commit()
flash("Contact deleted.", "ok"); return redirect(url_for("admin_info_console"))
# ---- Dept Extensions CRUD
@app.post("/admin/info/exts/create")
@admin_required
def admin_info_exts_create():
ext = (request.form.get("ext") or "").strip()
dept = (request.form.get("dept") or "").strip()
if not ext or not dept:
flash("Extension and department are required.", "error"); return redirect(url_for("admin_info_console"))
db.session.add(DeptExtension(ext=ext, dept=dept))
db.session.commit()
flash("Extension added.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/exts//toggle")
@admin_required
def admin_info_exts_toggle(did:int):
d = db.session.get(DeptExtension, did)
if not d: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
d.is_active = not d.is_active; db.session.commit()
flash("Extension updated.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/exts//delete")
@admin_required
def admin_info_exts_delete(did:int):
d = db.session.get(DeptExtension, did)
if not d: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
db.session.delete(d); db.session.commit()
flash("Extension deleted.", "ok"); return redirect(url_for("admin_info_console"))
# ---- Support Items CRUD
@app.post("/admin/info/support/create")
@admin_required
def admin_info_support_create():
cat = (request.form.get("category") or "").strip()
email = (request.form.get("email") or "").strip()
phone = (request.form.get("phone") or "").strip()
note = (request.form.get("note") or "").strip()
admin_only = request.form.get("admin_only") == "on"
issues_text = (request.form.get("issues") or "").strip()
issues = [ln.strip() for ln in issues_text.splitlines() if ln.strip()]
if not cat:
flash("Category is required.", "error"); return redirect(url_for("admin_info_console"))
db.session.add(SupportItem(category=cat, email=email, phone=phone, note=note,
issues_json=_json.dumps(issues), audience=("admin" if admin_only else "all")))
db.session.commit()
flash("Support item added.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/support//toggle")
@admin_required
def admin_info_support_toggle(sid:int):
s = db.session.get(SupportItem, sid)
if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
s.is_active = not s.is_active; db.session.commit()
flash("Support item updated.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/support//delete")
@admin_required
def admin_info_support_delete(sid:int):
s = db.session.get(SupportItem, sid)
if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
db.session.delete(s); db.session.commit()
flash("Support item deleted.", "ok"); return redirect(url_for("admin_info_console"))
# ---- Secrets CRUD (admin-only)
@app.post("/admin/info/secret/create")
@admin_required
def admin_info_secret_create():
label = (request.form.get("label") or "").strip()
value = (request.form.get("value") or "").strip()
notes = (request.form.get("notes") or "").strip()
if not label or not value:
flash("Label and value are required.", "error"); return redirect(url_for("admin_info_console"))
db.session.add(LocalSecret(label=label, value=value, notes=notes))
db.session.commit()
flash("Secret added.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/secret//toggle")
@admin_required
def admin_info_secret_toggle(sid:int):
s = db.session.get(LocalSecret, sid)
if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
s.is_active = not s.is_active; db.session.commit()
flash("Secret updated.", "ok"); return redirect(url_for("admin_info_console"))
@app.post("/admin/info/secret//delete")
@admin_required
def admin_info_secret_delete(sid:int):
s = db.session.get(LocalSecret, sid)
if not s: flash("Not found.", "error"); return redirect(url_for("admin_info_console"))
db.session.delete(s); db.session.commit()
flash("Secret deleted.", "ok"); return redirect(url_for("admin_info_console"))
@app.get("/healthz")
def healthz():
return {"ok": True}, 200
# =============================================================================
# Error handlers
# =============================================================================
@app.errorhandler(403)
def error_403(e):
return render_page("Forbidden
You don't have access to this resource.
"), 403
@app.errorhandler(404)
def error_404(e):
return render_page("Not Found
We couldn't find what you were looking for.
"), 404
@app.errorhandler(500)
def error_500(e):
return render_page("Server Error
Something went wrong. Please try again.
"), 500
# =============================================================================
# Main
# =============================================================================
if __name__ == "__main__":
app.run(debug=True, port=5000)