# app.py from __future__ import annotations import os, secrets, string, json as _json, csv from datetime import datetime, timedelta from io import StringIO from typing import Optional from flask import ( Flask, render_template_string, request, redirect, url_for, flash, jsonify, make_response, abort ) from flask_sqlalchemy import SQLAlchemy from flask_login import ( LoginManager, UserMixin, login_user, current_user, login_required, logout_user ) from werkzeug.security import generate_password_hash, check_password_hash # CSRF + Rate Limiting from flask_wtf import CSRFProtect from flask_limiter import Limiter from flask_limiter.util import get_remote_address # ============================================================================= # Config # ============================================================================= def getenv(name: str, default: Optional[str] = None) -> Optional[str]: return os.environ.get(name, default) DATABASE_URL = getenv("DATABASE_URL", "sqlite:///rbac.db") SECRET_KEY = getenv("APP_SECRET_KEY", "dev_change_me") APP_BRAND = getenv("APP_BRAND", "Streetside Canyon") # First-run bootstrap admin (used only if there are no users in DB) BOOTSTRAP_ADMIN_USER = getenv("DEFAULT_ADMIN_USERNAME", "admin") BOOTSTRAP_ADMIN_PASS = getenv("DEFAULT_ADMIN_PASSWORD", "change_me_now") # ============================================================================= # App + DB + Login # ============================================================================= app = Flask(__name__) app.config.update( SECRET_KEY=SECRET_KEY, SQLALCHEMY_DATABASE_URI=DATABASE_URL, SQLALCHEMY_TRACK_MODIFICATIONS=False, SESSION_COOKIE_HTTPONLY=True, SESSION_COOKIE_SAMESITE="Lax", SESSION_COOKIE_SECURE=True, # assumes HTTPS (you run behind Nginx) REMEMBER_COOKIE_SECURE=True, REMEMBER_COOKIE_HTTPONLY=True, PERMANENT_SESSION_LIFETIME=60 * 60 * 24 * 14, # 14 days in seconds ) # --- Freeze-safe config --- app.config.update( FREEZER_FLAT_URLS=True, FREEZER_REMOVE_EXTRA_FILES=True, 
FREEZER_IGNORE_MIMETYPE_WARNINGS=True ) db = SQLAlchemy(app) login_manager = LoginManager(app) login_manager.login_view = "login" login_manager.session_protection = "strong" # CSRF + Rate limit csrf = CSRFProtect(app) limiter = Limiter(key_func=get_remote_address, app=app, default_limits=["200/day", "50/hour"]) # ============================================================================= # Security headers # ============================================================================= @app.after_request def add_security_headers(resp): csp = ( "default-src 'self'; " "style-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com; " "script-src 'self' 'unsafe-inline' https://cdn.tailwindcss.com; " "img-src 'self' data:; font-src 'self' data:; " "base-uri 'none'; frame-ancestors 'none'; form-action 'self'" ) resp.headers.setdefault("Content-Security-Policy", csp) resp.headers.setdefault("X-Frame-Options", "DENY") resp.headers.setdefault("X-Content-Type-Options", "nosniff") resp.headers.setdefault("Referrer-Policy", "strict-origin-when-cross-origin") return resp # ============================================================================= # Models # ============================================================================= class User(UserMixin, db.Model): __tablename__ = "users" id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(80), unique=True, nullable=False, index=True) password_hash = db.Column(db.String(255), nullable=False) # 'admin' or 'member' role = db.Column(db.String(16), nullable=False, default="member", index=True) is_active = db.Column(db.Boolean, nullable=False, default=True) # Force password change on first login (or after admin reset) must_change_password = db.Column(db.Boolean, nullable=False, default=True) created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, index=True) def get_id(self): return str(self.id) def set_password(self, raw: str) -> None: self.password_hash = generate_password_hash(raw) 
def check_password(self, raw: str) -> bool: return check_password_hash(self.password_hash, raw) def is_admin(self) -> bool: return self.role == "admin" class AvailabilityWeekly(db.Model): __tablename__ = "availability_weekly" id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey("users.id"), unique=True, nullable=False, index=True) # JSON shape: {"mon":{"avail":true,"start":"08:00","end":"17:00"}, ...} data_json = db.Column(db.Text, nullable=False, default="{}") updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, index=True) user = db.relationship("User", backref=db.backref("availability_weekly", uselist=False)) class TimeOffRequest(db.Model): __tablename__ = "timeoff_requests" __table_args__ = ( db.Index("ix_timeoff_user_status", "user_id", "status"), db.Index("ix_timeoff_created_at", "created_at"), db.Index("ix_timeoff_date", "date"), ) id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False, index=True) date = db.Column(db.String(10), nullable=False) # "YYYY-MM-DD" note = db.Column(db.String(240)) status = db.Column(db.String(16), nullable=False, default="pending", index=True) # pending|approved|denied|cancelled created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, index=True) decided_at = db.Column(db.DateTime) user = db.relationship("User", backref=db.backref("timeoff_requests", lazy="dynamic")) class Wishlist(db.Model): __tablename__ = "wishlists" id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey("users.id"), unique=True, nullable=False, index=True) wishlist = db.Column(db.Text, nullable=False) updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, index=True) user = db.relationship("User", backref=db.backref("wishlist", uselist=False)) class SecretSantaEntry(db.Model): __tablename__ = 
"secret_santa_entries" id = db.Column(db.Integer, primary_key=True) user_id = db.Column(db.Integer, db.ForeignKey("users.id"), unique=True, nullable=False, index=True) # Explicit questions full_name = db.Column(db.String(120), nullable=False) # “Name (First and Last)” age = db.Column(db.Integer) # Age birthday = db.Column(db.String(10)) # YYYY-MM-DD (stored as text for SQLite simplicity) hobbies = db.Column(db.Text) # List of hobbies gift_card = db.Column(db.String(120)) # Favorite gift card fav_movie = db.Column(db.String(120)) # Favorite type of movie jewelry = db.Column(db.Boolean, nullable=False, default=False) # Yes/No created_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, index=True) updated_at = db.Column(db.DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow, index=True) user = db.relationship("User", backref=db.backref("secret_santa", uselist=False)) class InfoContact(db.Model): __tablename__ = "info_contacts" id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(120), nullable=False, index=True) role = db.Column(db.String(120)) phone = db.Column(db.String(32)) priority = db.Column(db.Integer, default=5, index=True) # 1 = top team = db.Column(db.String(64), default="Streetside") # e.g., "Streetside" is_active = db.Column(db.Boolean, default=True, index=True) created_at = db.Column(db.DateTime, default=datetime.utcnow) class DeptExtension(db.Model): __tablename__ = "dept_extensions" id = db.Column(db.Integer, primary_key=True) ext = db.Column(db.String(16), nullable=False, index=True) dept = db.Column(db.String(120), nullable=False, index=True) is_active = db.Column(db.Boolean, default=True, index=True) created_at = db.Column(db.DateTime, default=datetime.utcnow) class SupportItem(db.Model): """ Generic support card: Guest Services, eCommerce Team, IT, etc. 
""" __tablename__ = "support_items" id = db.Column(db.Integer, primary_key=True) category = db.Column(db.String(64), nullable=False) # "Guest Services", "IT", "Ecommerce" email = db.Column(db.String(160)) phone = db.Column(db.String(64)) note = db.Column(db.Text) # free-text note / disclaimer issues_json = db.Column(db.Text, default="[]") # JSON list of bullet items audience = db.Column(db.String(16), default="all") # "all" | "admin" is_active = db.Column(db.Boolean, default=True) updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) def issues(self): try: return _json.loads(self.issues_json or "[]") except Exception: return [] class LocalSecret(db.Model): """ Admin-only quick refs (register login, local device passwords, etc.) """ __tablename__ = "local_secrets" id = db.Column(db.Integer, primary_key=True) label = db.Column(db.String(160), nullable=False) # e.g. "Register Login" value = db.Column(db.String(400), nullable=False) # e.g. "#291 / 0000" notes = db.Column(db.Text) is_active = db.Column(db.Boolean, default=True) updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) @login_manager.user_loader def load_user(user_id: str) -> Optional[User]: try: return db.session.get(User, int(user_id)) # type: ignore[arg-type] except Exception: return None # ============================================================================= # Bootstrap (first run) — Flask 3 safe # ============================================================================= def ensure_bootstrap_admin(): db.create_all() if not User.query.first(): admin = User( username=(BOOTSTRAP_ADMIN_USER or "admin").strip(), role="admin", is_active=True, must_change_password=True, ) admin.set_password((BOOTSTRAP_ADMIN_PASS or "change_me_now").strip()) db.session.add(admin) db.session.commit() app.logger.warning( "Bootstrapped admin user '%s' (must change password on first login).", admin.username, ) with app.app_context(): 
ensure_bootstrap_admin() db.create_all() # make sure new tables exist def seed_store_info(): """ One-time seed using the info Ben provided. Safe to call repeatedly: it inserts only when each table is empty. """ any_inserted = False # ----------------------------- # Contacts (priority: 1 = top) # ----------------------------- if InfoContact.query.count() == 0: contacts = [ {"name": "Ben", "role": None, "phone": "(806) 395-6770", "priority": 1}, {"name": "Terri A", "role": None, "phone": "(702) 776-1791", "priority": 2}, {"name": "Shireen", "role": None, "phone": "(806) 231-4024", "priority": 3}, {"name": "Charles", "role": None, "phone": "(806) 476-8101", "priority": 4}, {"name": "Matt", "role": None, "phone": "(806) 443-2069", "priority": 5}, {"name": "Navayah", "role": None, "phone": "(806) 729-2385", "priority": 6}, {"name": "Jaydaci", "role": None, "phone": "(806) 433-1715", "priority": 7}, {"name": "Diego", "role": None, "phone": "(620) 621-4402", "priority": 8}, {"name": "Kaley", "role": None, "phone": "(806) 292-9060", "priority": 9}, {"name": "Maika", "role": None, "phone": "(806) 557-8154", "priority": 10}, {"name": "Jacob", "role": None, "phone": "(806) 476-9724", "priority": 11}, {"name": "Tammy", "role": None, "phone": "(806) 523-5935", "priority": 12}, {"name": "Kathleen", "role": None, "phone": "(806) 420-9218", "priority": 13}, ] db.session.bulk_save_objects([InfoContact(**c) for c in contacts]) any_inserted = True # -------------------------------- # Department extensions directory # -------------------------------- if DeptExtension.query.count() == 0: exts = [ ("532000", "Service Counter"), ("532005", "Produce"), ("532006", "Market"), ("532106", "Market Office"), ("532007", "Bakery 1"), ("532107", "Bakery 2"), ("532012", "Food Service"), ("532013", "Pharmacy 1"), ("532113", "Pharmacy 2"), ("532213", "Pharmacy 3"), ("532313", "Pharmacy 4"), ("532413", "Pharmacy Office"), ("532014", "Dairy"), ("532018", "Fuel (UE)"), ("532373", "DSD"), ("532200", 
"Store Office"), ("532500", "Cash Office"), ("532600", "Price Coordinator"), ("532899", "Guest (Lobby)"), ("532950", "Streetside"), ] db.session.bulk_save_objects([DeptExtension(ext=e, dept=d) for e, d in exts]) any_inserted = True # ----------------------------- # Support / escalation cards # ----------------------------- if SupportItem.query.count() == 0: # Guest Services (primary escalation path) guest_services_issues = [ "Billing Issues", "Delivery Creations", "Missing Loyalty Number", "Waiting for the POS Result", "Flybuy Activation", "Order Status Updates", "Refund Status", "App/Web Issues", "Promo Questions", "Delivery Questions", "Adding Rewards", "Constant Export Failing", "Winshop Not Working", "Storefront issues (e.g., wrong prices/descriptions, discontinued items)" ] db.session.add(SupportItem( category="Guest Services", email="guestservices@unitedtexas.com", phone="855-762-7880", note="Guest Services should be your first step. The address streetsidehelp@unitedtexas.com is no longer active.", issues_json=_json.dumps(guest_services_issues), audience="all", is_active=True, )) # Ecommerce Team ecommerce_resp = [ "Equipment needs: Scales, Walkies, Thermo Guns, etc.", "Marketing materials needs", "Slot closures — contact SD and RVP first" ] db.session.add(SupportItem( category="Ecommerce Team", email="team_ecommerce@unitedtexas.com", phone=None, note=None, issues_json=_json.dumps(ecommerce_resp), audience="all", is_active=True, )) # IT Support it_issues = [ "Issues with POS", "Issues with handhelds or label printer", "Needing handhelds", "Troubles logging into the computer or email" ] db.session.add(SupportItem( category="IT Support", email="support@unitedtexas.com", phone="806-791-8181 Option 1", note=None, issues_json=_json.dumps(it_issues), audience="all", is_active=True, )) any_inserted = True # ----------------------------- # Admin-only quick notes # ----------------------------- if LocalSecret.query.count() == 0: secrets = [ { "label": "Register 
Login", "value": "#291 / 0000", "notes": "Sign off register at night — DO NOT SHUT OFF.", }, { "label": "Computer Password", "value": "532532", "notes": "Closing: turn off heater/fan if on.", }, { "label": "Phone Password", "value": "1111", "notes": None, }, { "label": "Bottle Deposit Returns (Volleman Glass empty)", "value": "13573", "notes": None, }, ] db.session.bulk_save_objects([LocalSecret(**s) for s in secrets]) any_inserted = True if any_inserted: db.session.commit() app.logger.warning("Seeded Store Info (contacts/extensions/support/secrets).") else: app.logger.info("Store Info seed skipped: tables already contain data.") with app.app_context(): ensure_bootstrap_admin() db.create_all() seed_store_info() # ============================================================================= # Role guard + utilities # ============================================================================= from functools import wraps def admin_required(f): @wraps(f) @login_required def wrapper(*args, **kwargs): if not current_user.is_admin(): flash("Admins only.", "error") return redirect(url_for("dashboard")) return f(*args, **kwargs) return wrapper def gen_temp_password(length: int = 14) -> str: alphabet = string.ascii_letters + string.digits return "".join(secrets.choice(alphabet) for _ in range(length)) DAY_NAMES = [("mon","Mon"),("tue","Tue"),("wed","Wed"),("thu","Thu"),("fri","Fri"),("sat","Sat"),("sun","Sun")] def _default_week(): return {k: {"avail": False, "start": "08:00", "end": "17:00"} for k,_ in DAY_NAMES} def _parse_week_from_form(form): out = {} for key,_label in DAY_NAMES: avail = (form.get(f"{key}_avail") == "on") start = (form.get(f"{key}_start") or "08:00").strip()[:5] end = (form.get(f"{key}_end") or "17:00").strip()[:5] out[key] = {"avail": avail, "start": start, "end": end} return out def _validate_iso_date(d: str) -> bool: try: datetime.strptime(d, "%Y-%m-%d") return True except ValueError: return False # 
============================================================================= # Base layout + render helper # ============================================================================= TPL_BASE = """ {{ title or app_brand }}
SC
{{ app_brand }}
{% if current_user.is_authenticated %} {% endif %}
{% with msgs = get_flashed_messages(with_categories=true) %} {% if msgs %}
{% for cat,msg in msgs %}
{{ msg }}
{% endfor %}
{% endif %} {% endwith %} {{ content|safe }}
{% if current_user.is_authenticated %} {% endif %}
""" TPL_LOGIN_BODY = """

Sign in

First run? Use the bootstrap admin, then change it.

""" # Dashboard with punchy, clickable tiles TPL_DASHBOARD_BODY = """

Welcome, {{ user.username }}

Role: {{ user.role }}

Department / Store Info

Phones, extensions, support contacts, and notes.

Your Availability

Set your weekly pattern.

Request Off

Ask for specific days off.

Secret Santa

Fill out your gift preferences.

{% if user.role == 'admin' %}

Team Availability

View weekly patterns by user.

Time-off Requests

Approve or deny requests.

Secret Santa (Admin)

Browse entries and export data.

User Management

Create accounts and set roles.

{% endif %}
""" TPL_INFO_PAGE = """

Department / Store Info

{% if current_user.role == 'admin' %} Admin Console {% endif %}
{% if admin_secrets %} {% endif %}
""" TPL_ADMIN_INFO = """

Info Admin Console

Back to Info

Contacts

{% for c in contacts %} {% endfor %}
NameRolePhonePriorityActiveAction
{{ c.name }} {{ c.role or '' }} {{ c.phone or '' }} {{ c.priority }} {{ 'yes' if c.is_active else 'no' }}

Department Extensions

{% for d in exts %} {% endfor %}
ExtDepartmentActiveAction
{{ d.ext }} {{ d.dept }} {{ 'yes' if d.is_active else 'no' }}

Support & Escalation

{% for s in supports %}

{{ s.category }}

{{ s.audience }}
{% if s.email %}

{{ s.email }}

{% endif %} {% if s.phone %}

{{ s.phone }}

{% endif %} {% if s.note %}

{{ s.note }}

{% endif %} {% if s.issues() %}
    {% for it in s.issues() %}
  • {{ it }}
  • {% endfor %}
{% endif %}
{% endfor %}

Admin Quick Notes (Secrets)

{% for s in secrets %}

{{ s.label }}

{{ s.value }}

{% if s.notes %}

{{ s.notes }}

{% endif %}
{% endfor %}
""" TPL_CHANGE_PASSWORD_BODY = """

Change Password

{% if user.must_change_password %}

You must set a new password before continuing.

{% endif %}
""" # Member: Request Off page (simple, professional) TPL_REQUEST_OFF_BODY = """

Request Time Off

Use this page to request specific days off. Your leader will approve or deny the request in the admin view.

Your Requests

{% if my_reqs %} {% for r in my_reqs %} {% endfor %}
Date Status Note Requested Action
{{ r.date }} {{ r.status }} {{ r.note or '' }} {{ r.created_at.strftime('%Y-%m-%d %H:%M') if r.created_at else '' }} {% if r.status == 'pending' %}
{% else %} No action {% endif %}
{% else %}

You don't have any requests yet.

{% endif %}
""" # Admin: Users TPL_ADMIN_USERS_BODY = """

Users

Temp password will be generated and shown as a flash message.

All Users

{% for u in users %} {% endfor %}
ID Username Role Active Must Change PW Actions
{{ u.id }} {{ u.username }}
{{ 'yes' if u.must_change_password else 'no' }}
""" # Member: availability only TPL_AVAIL_BODY = """

Your Weekly Availability

{% for key,label in day_names %} {% endfor %}
Day Available Start End
{{ label }}
Tip: Use the links in the header to request a specific day off or fill out Secret Santa.
""" # Admin: Team Availability grid (includes ✎ edit link) TPL_ADMIN_AVAIL_BODY = """

Team Availability (Weekly)

Refresh
{% for _k,lab in day_names %}{% endfor %} {% for row in rows %} {% for k,_lab in day_names %} {% set d = row.week.get(k) %} {% endfor %} {% endfor %}
User{{ lab }}
{{ row.username }} ✎ Edit
{% if d and d.avail %} {{ d.start }}–{{ d.end }} {% else %} {% endif %}
""" # Admin: Time-off requests moderation TPL_ADMIN_REQS_BODY = """

Time-off Requests

Export CSV
{% for r in rows %} {% endfor %}
UserDateStatusNoteRequestedAction
{{ r.user.username }} {{ r.date }} {{ r.status }} {{ r.note or '' }} {{ r.created_at.strftime('%Y-%m-%d %H:%M') }} {% if r.status == 'pending' %}
{% else %}{% endif %}
""" # Admin: Wishlists table TPL_ADMIN_WISHLISTS_BODY = """

Secret Santa / Wishlists

Export Secret Santa CSV
{% for w in rows %} {% endfor %}
UserLast UpdatedWishlist
{{ w.user.username }} {{ w.updated_at.strftime('%Y-%m-%d %H:%M') if w.updated_at else '' }} {{ w.wishlist }}
""" TPL_SECRET_SANTA_BODY = """

Secret Santa

""" # Admin: Secret Santa cards TPL_ADMIN_SS_BODY = """

Secret Santa

Export CSV
{% for e in rows %}

{{ e.full_name or '—' }}

{{ e.user.username }}

{{ 'Jewelry: Yes' if e.jewelry else 'Jewelry: No' }}

Age

{{ e.age or '—' }}

Birthday

{{ e.birthday or '—' }}

Favorite Gift Card

{{ e.gift_card or '—' }}

Favorite Type of Movie

{{ e.fav_movie or '—' }}

Hobbies

{{ e.hobbies or '—' }}

Updated {{ e.updated_at.strftime('%Y-%m-%d %H:%M') if e.updated_at else '—' }} ID #{{ e.id }}
{% endfor %}
{% if not rows %}

No Secret Santa entries yet.

{% endif %}
""" # Admin: Availability editor (NEW) TPL_ADMIN_AVAIL_EDIT_BODY = """

Edit User Availability

Back to Team Grid
{% if user %}

Weekly Pattern for: {{ user.username }}

{% for key,label in day_names %} {% endfor %}
Day Available Start End
{{ label }}
{% endif %}
""" def render_page(body_tpl: str, **ctx): """Render a body template into the base layout.""" body_html = render_template_string(body_tpl, **ctx) title = ctx.get("title") or APP_BRAND return render_template_string(TPL_BASE, title=title, content=body_html, app_brand=APP_BRAND) # ============================================================================= # Routes: Auth + Dashboard # ============================================================================= @app.route("/") def index(): if current_user.is_authenticated: return redirect(url_for("dashboard")) return redirect(url_for("login")) @limiter.limit("5/minute") @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": username = (request.form.get("username") or "").strip() password = request.form.get("password") or "" user = User.query.filter_by(username=username).first() if not user or not user.is_active or not user.check_password(password): flash("Invalid credentials.", "error") return redirect(url_for("login")) login_user(user, remember=True) if user.must_change_password: flash("Please set a new password to continue.", "ok") return redirect(url_for("change_password")) return redirect(url_for("dashboard")) return render_page(TPL_LOGIN_BODY, title="Login") @app.post("/logout") @login_required def logout(): logout_user() flash("Signed out.", "ok") return redirect(url_for("login")) @app.route("/dashboard") @login_required def dashboard(): return render_page(TPL_DASHBOARD_BODY, title="Dashboard", user=current_user) @app.route("/change-password", methods=["GET", "POST"]) @login_required def change_password(): if request.method == "POST": curr = request.form.get("current_password") or "" new1 = request.form.get("new_password") or "" new2 = request.form.get("confirm_password") or "" if not current_user.check_password(curr): flash("Current password is incorrect.", "error") return redirect(url_for("change_password")) if len(new1) < 10: flash("Choose a stronger password (min 10 
chars).", "error") return redirect(url_for("change_password")) if new1 != new2: flash("Passwords do not match.", "error") return redirect(url_for("change_password")) current_user.set_password(new1) current_user.must_change_password = False db.session.commit() flash("Password updated.", "ok") return redirect(url_for("dashboard")) return render_page(TPL_CHANGE_PASSWORD_BODY, title="Change Password", user=current_user) # ============================================================================= # Secret Santa # ============================================================================= @app.route("/secret-santa", methods=["GET", "POST"]) @login_required def secret_santa(): entry = SecretSantaEntry.query.filter_by(user_id=current_user.id).first() if request.method == "POST": full_name = (request.form.get("full_name") or "").strip() if not full_name: flash("Name is required.", "error") return redirect(url_for("secret_santa")) age_raw = request.form.get("age") birthday = (request.form.get("birthday") or "").strip() hobbies = (request.form.get("hobbies") or "").strip() gift_card = (request.form.get("gift_card") or "").strip() fav_movie = (request.form.get("fav_movie") or "").strip() jewelry = request.form.get("jewelry") == "yes" try: age = int(age_raw) if (age_raw or "").strip() else None except ValueError: age = None if entry: entry.full_name = full_name entry.age = age entry.birthday = birthday or None entry.hobbies = hobbies entry.gift_card = gift_card entry.fav_movie = fav_movie entry.jewelry = jewelry else: entry = SecretSantaEntry( user_id=current_user.id, full_name=full_name, age=age, birthday=birthday or None, hobbies=hobbies, gift_card=gift_card, fav_movie=fav_movie, jewelry=jewelry, ) db.session.add(entry) db.session.commit() flash("Secret Santa saved.", "ok") return redirect(url_for("secret_santa")) form = { "full_name": entry.full_name if entry else current_user.username, "age": entry.age if entry else "", "birthday": entry.birthday if entry else "", 
"hobbies": entry.hobbies if entry else "", "gift_card": entry.gift_card if entry else "", "fav_movie": entry.fav_movie if entry else "", "jewelry": bool(entry.jewelry) if (entry and entry.jewelry is not None) else False, } return render_page(TPL_SECRET_SANTA_BODY, title="Secret Santa", form=form) @app.get("/admin/secret-santa") @admin_required def admin_secret_santa(): rows = (SecretSantaEntry.query .join(User, SecretSantaEntry.user_id == User.id) .order_by(User.username.asc()) .all()) return render_page(TPL_ADMIN_SS_BODY, title="Secret Santa", rows=rows) # ============================================================================= # Request Off # ============================================================================= @app.get("/request-off") @login_required def request_off(): my_reqs = (TimeOffRequest.query .filter_by(user_id=current_user.id) .order_by(TimeOffRequest.created_at.desc()) .all()) return render_page(TPL_REQUEST_OFF_BODY, title="Request Off", my_reqs=my_reqs) @app.post("/requests/new") @login_required def requests_new(): date = (request.form.get("date") or "").strip() note = (request.form.get("note") or "").strip()[:240] if not date or not _validate_iso_date(date): flash("Please choose a valid date (YYYY-MM-DD).", "error") return redirect(url_for("request_off")) r = TimeOffRequest(user_id=current_user.id, date=date, note=note, status="pending") db.session.add(r); db.session.commit() flash("Request submitted.", "ok") return redirect(url_for("request_off")) @app.post("/requests//cancel") @login_required def requests_cancel(req_id: int): r = db.session.get(TimeOffRequest, req_id) if not r or r.user_id != current_user.id: flash("Not found.", "error"); return redirect(url_for("request_off")) if r.status != "pending": flash("Only pending requests can be cancelled.", "error"); return redirect(url_for("request_off")) r.status = "cancelled"; r.decided_at = datetime.utcnow(); db.session.commit() flash("Request cancelled.", "ok"); return 
redirect(url_for("request_off")) # ============================================================================= # Availability (member self-service) # ============================================================================= @app.route("/availability", methods=["GET", "POST"]) @login_required def availability(): aw = AvailabilityWeekly.query.filter_by(user_id=current_user.id).first() if request.method == "POST": week = _parse_week_from_form(request.form) if not aw: aw = AvailabilityWeekly(user_id=current_user.id, data_json=_json.dumps(week)) db.session.add(aw) else: aw.data_json = _json.dumps(week) db.session.commit() flash("Availability saved.", "ok") return redirect(url_for("availability")) week = _default_week() if aw and aw.data_json: try: data = _json.loads(aw.data_json) or {} for k,_ in DAY_NAMES: if k in data: week[k] = { "avail": bool(data[k].get("avail")), "start": (data[k].get("start") or "08:00")[:5], "end": (data[k].get("end") or "17:00")[:5], } except Exception as e: app.logger.warning("Failed to parse availability JSON for user %s: %s", current_user.id, e) return render_page(TPL_AVAIL_BODY, title="Your Availability", day_names=DAY_NAMES, week=week) # ============================================================================= # Admin: Availability grid + EDITOR (NEW) # ============================================================================= @app.get("/admin/availability") @admin_required def admin_availability(): rows = [] users = User.query.filter_by(is_active=True).order_by(User.username.asc()).all() for u in users: aw = AvailabilityWeekly.query.filter_by(user_id=u.id).first() wk = _default_week() if aw and aw.data_json: try: data = _json.loads(aw.data_json) or {} for k,_ in DAY_NAMES: if k in data: wk[k] = { "avail": bool(data[k].get("avail")), "start": (data[k].get("start") or "08:00")[:5], "end": (data[k].get("end") or "17:00")[:5], } except Exception as e: app.logger.warning("Failed to parse availability JSON for user %s: %s", u.id, 
e) rows.append({"user_id": u.id, "username": u.username, "week": wk}) return render_page(TPL_ADMIN_AVAIL_BODY, title="Team Availability", rows=rows, day_names=DAY_NAMES) @app.route("/admin/availability/edit", methods=["GET", "POST"]) @admin_required def admin_availability_edit(): # Save for a specific user if request.method == "POST": try: target_id = int(request.form.get("user_id") or "0") except ValueError: flash("Invalid user.", "error") return redirect(url_for("admin_availability_edit")) target = db.session.get(User, target_id) if not target: flash("User not found.", "error") return redirect(url_for("admin_availability_edit")) week = _parse_week_from_form(request.form) aw = AvailabilityWeekly.query.filter_by(user_id=target.id).first() if not aw: aw = AvailabilityWeekly(user_id=target.id, data_json=_json.dumps(week)) db.session.add(aw) else: aw.data_json = _json.dumps(week) db.session.commit() flash(f"Availability saved for {target.username}.", "ok") return redirect(url_for("admin_availability_edit", user_id=target.id)) # GET: pick a user then load their week users = User.query.filter_by(is_active=True).order_by(User.username.asc()).all() user_id = request.args.get("user_id", type=int) user = db.session.get(User, user_id) if user_id else None week = _default_week() if user: aw = AvailabilityWeekly.query.filter_by(user_id=user.id).first() if aw and aw.data_json: try: data = _json.loads(aw.data_json) or {} for k,_ in DAY_NAMES: if k in data: week[k] = { "avail": bool(data[k].get("avail")), "start": (data[k].get("start") or "08:00")[:5], "end": (data[k].get("end") or "17:00")[:5], } except Exception as e: app.logger.warning("Admin edit: bad JSON for user %s: %s", user.id, e) return render_page(TPL_ADMIN_AVAIL_EDIT_BODY, title="Edit Availability", users=users, user=user, day_names=DAY_NAMES, week=week) # ============================================================================= # Admin: Requests + Wishlists # 
# =============================================================================
@app.get("/admin/requests")
@admin_required
def admin_requests():
    """Render all time-off requests, ordered by status then newest first."""
    rows = (
        TimeOffRequest.query
        .join(User, TimeOffRequest.user_id == User.id)
        .order_by(TimeOffRequest.status.asc(), TimeOffRequest.created_at.desc())
        .all()
    )
    return render_page(TPL_ADMIN_REQS_BODY, title="Time-off Requests", rows=rows)


@app.post("/admin/requests/<int:req_id>/<action>")
@admin_required
def admin_requests_action(req_id: int, action: str):
    """Approve or deny a pending time-off request.

    ``action`` must be ``"approve"`` or ``"deny"``; anything else, an unknown
    ``req_id``, or a request that is no longer pending only flashes an error.
    Always redirects back to the requests list.
    """
    r = db.session.get(TimeOffRequest, req_id)
    if not r:
        flash("Request not found.", "error")
    elif r.status != "pending":
        flash("Request is already decided.", "error")
    elif action not in ("approve", "deny"):
        flash("Invalid action.", "error")
    else:
        r.status = "approved" if action == "approve" else "denied"
        r.decided_at = datetime.utcnow()
        db.session.commit()
        # Use the stored status: f"{action}d" rendered "denyd" for denials.
        flash(f"Request {r.status}.", "ok")
    return redirect(url_for("admin_requests"))


@app.get("/admin/wishlists")
@admin_required
def admin_wishlists():
    """Render every wishlist, ordered by the owner's username."""
    rows = (
        Wishlist.query
        .join(User, Wishlist.user_id == User.id)
        .order_by(User.username.asc())
        .all()
    )
    return render_page(TPL_ADMIN_WISHLISTS_BODY, title="Wishlists", rows=rows)


# =============================================================================
# Exports
# =============================================================================
# Leading characters that spreadsheet applications may evaluate as a formula.
_CSV_FORMULA_PREFIXES = ("=", "+", "-", "@", "\t", "\r")


def _csv_cell(value):
    """Return ``value`` made safe for spreadsheets (CSV formula injection).

    Only strings are touched: one starting with a formula trigger character
    gets a leading apostrophe so it is shown as text instead of evaluated.
    """
    if isinstance(value, str) and value.startswith(_CSV_FORMULA_PREFIXES):
        return "'" + value
    return value


def _csv_response(filename: str, header: list, rows: list):
    """Build a UTF-8 CSV attachment response from a header row and data rows."""
    sio = StringIO()
    w = csv.writer(sio)
    w.writerow(header)
    w.writerows([_csv_cell(cell) for cell in row] for row in rows)
    resp = make_response(sio.getvalue())
    resp.headers["Content-Type"] = "text/csv; charset=utf-8"
    resp.headers["Content-Disposition"] = f"attachment; filename={filename}"
    return resp


@app.get("/admin/secret-santa/export.csv")
@admin_required
def admin_secret_santa_export():
    """Download all Secret Santa entries as CSV, ordered by username."""
    entries = SecretSantaEntry.query.join(User).order_by(User.username.asc()).all()
    header = ["username", "full_name", "age", "birthday", "hobbies",
              "gift_card", "fav_movie", "jewelry", "updated_at"]
    rows = [
        [
            e.user.username,
            e.full_name,
            e.age or "",
            e.birthday or "",
            e.hobbies or "",
            e.gift_card or "",
            e.fav_movie or "",
            "Yes" if e.jewelry else "No",
            e.updated_at.isoformat() if e.updated_at else "",
        ]
        for e in entries
    ]
    return _csv_response("secret_santa.csv", header, rows)


@app.get("/admin/requests/export.csv")
@admin_required
def admin_requests_export():
    """Download all time-off requests as CSV, newest first."""
    reqs = TimeOffRequest.query.join(User).order_by(TimeOffRequest.created_at.desc()).all()
    header = ["username", "date", "status", "note", "created_at", "decided_at"]
    rows = [
        [
            r.user.username,
            r.date,
            r.status,
            r.note or "",
            r.created_at.isoformat(),
            r.decided_at.isoformat() if r.decided_at else "",
        ]
        for r in reqs
    ]
    return _csv_response("time_off_requests.csv", header, rows)


# =============================================================================
# Admin: Users CRUD
# =============================================================================
def _active_admin_count() -> int:
    """Return how many users currently have role 'admin' and are active."""
    return User.query.filter_by(role="admin", is_active=True).count()


@app.get("/admin/users")
@admin_required
def admin_users():
    """Render the user list, newest accounts first."""
    users = User.query.order_by(User.created_at.desc()).all()
    return render_page(TPL_ADMIN_USERS_BODY, title="Users", users=users)


@app.post("/admin/users/create")
@admin_required
def admin_users_create():
    """Create a user with a generated temporary password.

    Reads ``username`` and ``role`` from the form; any role other than
    ``admin`` becomes ``member``. The temporary password is shown once via
    flash and the account is flagged ``must_change_password``.
    """
    username = (request.form.get("username") or "").strip()
    role = (request.form.get("role") or "member").strip().lower()
    role = "admin" if role == "admin" else "member"

    if not username:
        flash("Username is required.", "error")
        return redirect(url_for("admin_users"))
    if User.query.filter_by(username=username).first():
        flash("Username already exists.", "error")
        return redirect(url_for("admin_users"))

    temp_pw = gen_temp_password()
    u = User(username=username, role=role, is_active=True, must_change_password=True)
    u.set_password(temp_pw)
    db.session.add(u)
    db.session.commit()
    flash(f"User '{username}' created with temporary password: {temp_pw}", "ok")
    return redirect(url_for("admin_users"))


@app.post("/admin/users/<int:user_id>/reset")
@admin_required
def admin_users_reset(user_id: int):
    """Replace a user's password with a new temporary one and force a change."""
    u = db.session.get(User, user_id)
    if not u:
        flash("User not found.", "error")
        return redirect(url_for("admin_users"))

    temp_pw = gen_temp_password()
    u.set_password(temp_pw)
    u.must_change_password = True
    db.session.commit()
    flash(f"Password reset for '{u.username}'. New temp: {temp_pw}", "ok")
    return redirect(url_for("admin_users"))


@app.post("/admin/users/<int:user_id>/role")
@admin_required
def admin_users_role(user_id: int):
    """Set a user's role to ``admin`` or ``member`` (form field ``role``).

    Refuses to demote the last active admin, mirroring the lock-out guard in
    ``admin_users_toggle``.
    """
    u = db.session.get(User, user_id)
    if not u:
        flash("User not found.", "error")
        return redirect(url_for("admin_users"))

    requested = (request.form.get("role") or "member").strip().lower()
    new_role = "admin" if requested == "admin" else "member"

    # Demoting the only active admin would leave nobody able to manage users.
    demoting_admin = u.role == "admin" and new_role != "admin" and u.is_active
    if demoting_admin and _active_admin_count() <= 1:
        flash("Cannot demote the last active admin.", "error")
        return redirect(url_for("admin_users"))

    u.role = new_role
    db.session.commit()
    flash(f"Role updated for '{u.username}' → {u.role}", "ok")
    return redirect(url_for("admin_users"))


@app.post("/admin/users/<int:user_id>/toggle")
@admin_required
def admin_users_toggle(user_id: int):
    """Flip a user's ``is_active`` flag, unless that locks out the last admin."""
    u = db.session.get(User, user_id)
    if not u:
        flash("User not found.", "error")
        return redirect(url_for("admin_users"))

    # Prevent locking yourself out of the last admin
    if u.id == current_user.id and u.role == "admin" and _active_admin_count() <= 1:
        flash("You are the last active admin; cannot deactivate.", "error")
        return redirect(url_for("admin_users"))

    u.is_active = not u.is_active
    db.session.commit()
    flash(f"User '{u.username}' active={u.is_active}", "ok")
    return redirect(url_for("admin_users"))


# =============================================================================
# JSON API + Health
# =============================================================================
@app.get("/api/me")
@login_required
def api_me():
    """Return the logged-in user's basic account fields as JSON."""
    u: User = current_user  # type: ignore
    return jsonify(
        id=u.id,
        username=u.username,
        role=u.role,
        must_change_password=u.must_change_password,
        is_active=u.is_active,
    )


# ---------------------------
# Info: user view
# ---------------------------
@app.get("/info")
@login_required
def info_page():
    """Render the store info page with active contacts, extensions and support.

    Support items with audience ``admin`` and the active local secrets are
    only loaded when ``current_user.is_admin()`` is true.
    """
    viewer_is_admin = current_user.is_admin()
    audiences = ["all", "admin"] if viewer_is_admin else ["all"]

    contacts = (
        InfoContact.query
        .filter_by(is_active=True)
        .order_by(InfoContact.priority.asc(), InfoContact.name.asc())
        .all()
    )
    exts = (
        DeptExtension.query
        .filter_by(is_active=True)
        .order_by(DeptExtension.ext.asc())
        .all()
    )
    supports = (
        SupportItem.query
        .filter_by(is_active=True)
        .filter(SupportItem.audience.in_(audiences))
        .order_by(SupportItem.category.asc())
        .all()
    )
    admin_secrets = []
    if viewer_is_admin:
        admin_secrets = (
            LocalSecret.query
            .filter_by(is_active=True)
            .order_by(LocalSecret.updated_at.desc())
            .all()
        )
    return render_page(TPL_INFO_PAGE, title="Store Info",
                       contacts=contacts, exts=exts, supports=supports,
                       admin_secrets=admin_secrets)


# ---------------------------
# Info: admin console
# ---------------------------
@app.get("/admin/info")
@admin_required
def admin_info_console():
    """Render the info admin console with all rows, active or not."""
    contacts = InfoContact.query.order_by(InfoContact.priority.asc(), InfoContact.name.asc()).all()
    exts = DeptExtension.query.order_by(DeptExtension.ext.asc()).all()
    supports = SupportItem.query.order_by(SupportItem.category.asc()).all()
    # Named secret_rows so the imported `secrets` module is not shadowed.
    secret_rows = LocalSecret.query.order_by(LocalSecret.updated_at.desc()).all()
    return render_page(TPL_ADMIN_INFO, title="Info Admin",
                       contacts=contacts, exts=exts, supports=supports,
                       secrets=secret_rows)


def _info_toggle(model, pk: int, label: str):
    """Flip ``is_active`` on the ``model`` row ``pk`` and return to the console."""
    row = db.session.get(model, pk)
    if not row:
        flash("Not found.", "error")
    else:
        row.is_active = not row.is_active
        db.session.commit()
        flash(f"{label} updated.", "ok")
    return redirect(url_for("admin_info_console"))


def _info_delete(model, pk: int, label: str):
    """Delete the ``model`` row ``pk`` and return to the console."""
    row = db.session.get(model, pk)
    if not row:
        flash("Not found.", "error")
    else:
        db.session.delete(row)
        db.session.commit()
        flash(f"{label} deleted.", "ok")
    return redirect(url_for("admin_info_console"))


# ---- Contacts CRUD
@app.post("/admin/info/contacts/create")
@admin_required
def admin_info_contacts_create():
    """Add a contact; ``priority`` defaults to 5 and is clamped to 1..9."""
    name = (request.form.get("name") or "").strip()
    role = (request.form.get("role") or "").strip()
    phone = (request.form.get("phone") or "").strip()
    priority = request.form.get("priority", type=int) or 5
    if not name:
        flash("Name is required.", "error")
        return redirect(url_for("admin_info_console"))
    db.session.add(InfoContact(name=name, role=role, phone=phone,
                               priority=max(1, min(priority, 9))))
    db.session.commit()
    flash("Contact added.", "ok")
    return redirect(url_for("admin_info_console"))


@app.post("/admin/info/contacts/<int:cid>/toggle")
@admin_required
def admin_info_contacts_toggle(cid: int):
    """Toggle a contact's active flag."""
    return _info_toggle(InfoContact, cid, "Contact")


@app.post("/admin/info/contacts/<int:cid>/delete")
@admin_required
def admin_info_contacts_delete(cid: int):
    """Delete a contact."""
    return _info_delete(InfoContact, cid, "Contact")


# ---- Dept Extensions CRUD
@app.post("/admin/info/exts/create")
@admin_required
def admin_info_exts_create():
    """Add a department extension; both ``ext`` and ``dept`` are required."""
    ext = (request.form.get("ext") or "").strip()
    dept = (request.form.get("dept") or "").strip()
    if not ext or not dept:
        flash("Extension and department are required.", "error")
        return redirect(url_for("admin_info_console"))
    db.session.add(DeptExtension(ext=ext, dept=dept))
    db.session.commit()
    flash("Extension added.", "ok")
    return redirect(url_for("admin_info_console"))


@app.post("/admin/info/exts/<int:did>/toggle")
@admin_required
def admin_info_exts_toggle(did: int):
    """Toggle a department extension's active flag."""
    return _info_toggle(DeptExtension, did, "Extension")


@app.post("/admin/info/exts/<int:did>/delete")
@admin_required
def admin_info_exts_delete(did: int):
    """Delete a department extension."""
    return _info_delete(DeptExtension, did, "Extension")


# ---- Support Items CRUD
@app.post("/admin/info/support/create")
@admin_required
def admin_info_support_create():
    """Add a support item.

    ``issues`` is a textarea with one issue per line; blank lines are dropped
    and the list is stored as JSON. Checking ``admin_only`` sets the audience
    to ``admin``, otherwise ``all``.
    """
    cat = (request.form.get("category") or "").strip()
    email = (request.form.get("email") or "").strip()
    phone = (request.form.get("phone") or "").strip()
    note = (request.form.get("note") or "").strip()
    admin_only = request.form.get("admin_only") == "on"
    issues_text = (request.form.get("issues") or "").strip()
    issues = [ln.strip() for ln in issues_text.splitlines() if ln.strip()]
    if not cat:
        flash("Category is required.", "error")
        return redirect(url_for("admin_info_console"))
    db.session.add(SupportItem(category=cat, email=email, phone=phone, note=note,
                               issues_json=_json.dumps(issues),
                               audience=("admin" if admin_only else "all")))
    db.session.commit()
    flash("Support item added.", "ok")
    return redirect(url_for("admin_info_console"))


@app.post("/admin/info/support/<int:sid>/toggle")
@admin_required
def admin_info_support_toggle(sid: int):
    """Toggle a support item's active flag."""
    return _info_toggle(SupportItem, sid, "Support item")


@app.post("/admin/info/support/<int:sid>/delete")
@admin_required
def admin_info_support_delete(sid: int):
    """Delete a support item."""
    return _info_delete(SupportItem, sid, "Support item")


# ---- Secrets CRUD (admin-only)
@app.post("/admin/info/secret/create")
@admin_required
def admin_info_secret_create():
    """Add a local secret; both ``label`` and ``value`` are required."""
    label = (request.form.get("label") or "").strip()
    value = (request.form.get("value") or "").strip()
    notes = (request.form.get("notes") or "").strip()
    if not label or not value:
        flash("Label and value are required.", "error")
        return redirect(url_for("admin_info_console"))
    db.session.add(LocalSecret(label=label, value=value, notes=notes))
    db.session.commit()
    flash("Secret added.", "ok")
    return redirect(url_for("admin_info_console"))


@app.post("/admin/info/secret/<int:sid>/toggle")
@admin_required
def admin_info_secret_toggle(sid: int):
    """Toggle a local secret's active flag."""
    return _info_toggle(LocalSecret, sid, "Secret")


@app.post("/admin/info/secret/<int:sid>/delete")
@admin_required
def admin_info_secret_delete(sid: int):
    """Delete a local secret."""
    return _info_delete(LocalSecret, sid, "Secret")


@app.get("/healthz")
def healthz():
    """Liveness probe: always answers ``{"ok": true}`` with HTTP 200."""
    return {"ok": True}, 200


# =============================================================================
# Error handlers
# =============================================================================
# NOTE(review): the original markup inside these bodies was lost; the text is
# kept verbatim and wrapped in minimal tags -- restore the site styling here.
@app.errorhandler(403)
def error_403(e):
    """Render the 'Forbidden' page with HTTP 403."""
    return render_page(
        "<h2>Forbidden</h2>"
        "<p>You don't have access to this resource.</p>"
    ), 403


@app.errorhandler(404)
def error_404(e):
    """Render the 'Not Found' page with HTTP 404."""
    return render_page(
        "<h2>Not Found</h2>"
        "<p>We couldn't find what you were looking for.</p>"
    ), 404


@app.errorhandler(500)
def error_500(e):
    """Render the 'Server Error' page with HTTP 500."""
    # A failed flush/commit leaves the session unusable until rolled back.
    db.session.rollback()
    return render_page(
        "<h2>Server Error</h2>"
        "<p>Something went wrong. Please try again.</p>"
    ), 500


# =============================================================================
# Main
# =============================================================================
if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution, so it is opt-in
    # (FLASK_DEBUG=1) instead of always on.
    app.run(debug=getenv("FLASK_DEBUG", "0") == "1", port=5000)