Source code for app.views_auth

"""Authentication views and routes for the Mail List Shield application.

This module defines the authentication-related routes including login,
registration, logout, password reset, email confirmation, two-factor
authentication, and Google OAuth integration.
"""

# Python modules
from datetime import datetime, timezone
import requests
import json
import random
import string
from itsdangerous import URLSafeTimedSerializer

# Flask modules
from flask import (
    Blueprint,
    render_template,
    request,
    url_for,
    redirect,
    flash,
    abort,
    session,
    current_app,
)
from flask_login import (
    login_user,
    logout_user,
    current_user,
)
from werkzeug.exceptions import HTTPException, NotFound
from jinja2 import TemplateNotFound


# App modules
from app import lm, db, bc, csrf
from app.models import Users
from app.forms import (
    LoginForm,
    RegisterForm,
    EmailConfirmationForm,
    ResetPassword,
    SetNewPassword,
    TwoFactorAuthenticationForm,
)
from app.views import limiter
from app.config import appTimezone
from app.emails import send_email_with_code, send_email_to_reset_password
from app.utilities.user_registration_actions import new_user_actions_for_email_confirmed
from app.utilities.helpers import generate_n_digit_code
from app.utilities.error_handlers import error_page
from app.utilities.recaptcha import verify_recaptcha

[docs] auth_bp = Blueprint("auth_bp", __name__)
@lm.user_loader
[docs] def load_user(user_id): """Load a user from the database by ID. This function is used by Flask-Login to reload the user object from the user ID stored in the session. Args: user_id: The ID of the user to load. Returns: Users: The user object from the database corresponding to the user ID. """ return Users.query.get(int(user_id))
@lm.unauthorized_handler
[docs] def unauthorized_callback(): """Handle unauthorized access to protected routes. Redirects unauthenticated users to the login page with the original destination preserved in the next parameter. Returns: Response: Redirect to the login page. """ return redirect("/login?next=" + request.path)
@auth_bp.route("/logout")
[docs] def logout(): """Log out the current user and redirect to the login page. Returns: Response: Redirect to the login page. """ logout_user() return redirect(url_for("auth_bp.login"))
@auth_bp.route("/register", methods=["GET", "POST"]) @limiter.limit("10 per day", methods=["POST"])
[docs] def register(): """The view function for the registration page. Handles new user registration with reCAPTCHA verification. Returns: Response: The registration form or redirect to email confirmation. """ # Don't allow logged in users here if current_user.is_authenticated: flash("You are already registered.", "info") return redirect("/app") # Declare the form form = RegisterForm(request.form) success = False if request.method == "GET": return render_template( "public/auth/register.html", form=form, user=current_user ) # Check if both http method is POST, form is valid, and csrf token is valid if form.validate_on_submit(): # Verify reCAPTCHA recaptcha_response = request.form.get("g-recaptcha-response") recaptcha_verified = verify_recaptcha(recaptcha_response, request.remote_addr) if not recaptcha_verified: flash( 'Please check the box next to the phrase "I\'m not a robot".', "danger" ) return render_template( "public/auth/register.html", form=form, success=False, user=current_user ) # assign form data to variables email = request.form.get("email", "", type=str) password = request.form.get("password", "", type=str) firstName = request.form.get("firstName", "", type=str) lastName = request.form.get("lastName", "", type=str) newsletter = "0" if request.form.get("newsletter", "0") == "0" else "1" # Log current date member_since = datetime.now(timezone.utc).replace(tzinfo=None) last_login = datetime.now(timezone.utc).replace(tzinfo=None) # filter User out of database through user email user_by_email = Users.query.filter_by(email=email).first() if user_by_email: flash("Error: User exists!", "danger") else: pw_hash = bc.generate_password_hash(password).decode("utf8") tier_id = 1 email_confirmation_code = generate_n_digit_code(6) user = Users( email, pw_hash, tier_id, firstName, lastName, newsletter, member_since, last_login, email_confirmation_code, ) user.save() success = True else: flash("Please make sure you fill all the required fields.", "danger") if success: login_user(user, remember=True) send_email_with_code(current_user) return redirect(url_for("auth_bp.email_confirmation_by_code")) else: return render_template( "public/auth/register.html", form=form, success=success, user=current_user )
@auth_bp.route("/login", methods=["GET", "POST"]) @limiter.limit("10 per day", methods=["POST"])
[docs] def login(): """The view function for the login page. Handles user authentication with optional Google OAuth and reCAPTCHA. Returns: Response: The login form or redirect to dashboard/two-factor auth. """ # Don't allow logged in users here if current_user.is_authenticated: flash("You are already logged in.", "info") return redirect("/app") if request.method == "GET": # If user came here with the login with Google button if request.args.get("sso") == "google": # Single Sign On requests with Google google_config = get_google_sso_config() endpoint = google_config["authorization_endpoint"] # Preserve next url if it exists request_uri = current_app.google_client.prepare_request_uri( endpoint, redirect_uri="https://" + request.host + "/login/callback/google", scope=["openid", "email", "profile"], state=request.args.get("next"), ) return redirect(request_uri) # Declare the form form = LoginForm(request.form) # Check if both http method is POST, form is valid, and csrf token is valid if form.validate_on_submit(): # Verify reCAPTCHA recaptcha_response = request.form.get("g-recaptcha-response") recaptcha_verified = verify_recaptcha(recaptcha_response, request.remote_addr) if not recaptcha_verified: flash( 'Please check the box next to the phrase "I\'m not a robot".', "danger" ) return render_template( "public/auth/login.html", form=form, success=False, user=current_user ) # Assign form data to variables email = request.form.get("email", "", type=str) password = request.form.get("password", "", type=str) # Filter User out of database through username user = Users.query.filter_by(email=email).first() if user and bc.check_password_hash(user.password, password): # If the user have two factor auth enabled, redirect for that if user.totp_enabled == 1: session["email"] = user.email redirect_url = "/two-factor" if "next" in request.args: redirect_url += "?next=" + request.args["next"] return redirect(redirect_url) # Otherwise log them in else: login_user(user) current_user.last_login = datetime.now(timezone.utc).astimezone( appTimezone ) current_user.save() if "next" in request.args: return redirect(request.args["next"]) else: return redirect(url_for("private_bp.private_index")) else: flash("Incorrect username or password. Please try again.", "danger") # Preserve next url the user was going before login redirect, if it exists next = request.args["next"] if "next" in request.args else "/app" return render_template( "public/auth/login.html", form=form, user=current_user, next=next )
@auth_bp.route("/two-factor", methods=["GET", "POST"]) @limiter.limit("10 per day", methods=["POST"])
[docs] def two_factor(): """The view function for the two-factor authentication page. Verifies TOTP codes for users with two-factor authentication enabled. Returns: Response: The two-factor form or redirect to dashboard. """ # Don't allow logged in users here if current_user.is_authenticated: flash("You are already logged in.", "info") return redirect("/app") # If the user isn't redirected with a successful login if "email" not in session: return redirect(url_for("auth_bp.login")) user = Users.query.filter_by(email=session["email"]).first() # This shouldn't happen but if it does: if user is None: flash("Something went wrong during two factor authentication.", "danger") return redirect(url_for("auth_bp.login")) # Declare the form form = TwoFactorAuthenticationForm(request.form) # If it is a GET request, just show the page if request.method == "GET": return render_template( "public/auth/two-factor-auth.html", form=form, user=current_user ) # Check if both http method is POST, form is valid, and csrf token is valid if form.validate_on_submit(): entered_code = "" try: for i in range(0, 6): entered_code += request.form["code" + str(i)] except: flash("Please enter a valid code before submitting.", "danger") return render_template( "public/auth/two-factor-auth.html", form=form, user=current_user ) codeMatched = user.totp_match(entered_code) if codeMatched: login_user(user) current_user.last_login = datetime.now(timezone.utc).replace(tzinfo=None) current_user.save() if "next" in request.args: return redirect(request.args["next"]) else: return redirect(url_for("private_bp.private_index")) else: flash("This code was not correct.", "danger") return render_template( "public/auth/two-factor-auth.html", form=form, user=current_user ) else: flash("Please enter a valid code before submitting.", "danger") return render_template( "public/auth/two-factor-auth.html", form=form, user=current_user )
@auth_bp.route("/email-confirmation", methods=["GET", "POST"])
[docs] def email_confirmation_by_code(): """The view function for the email verification page. Handles verification code entry and validation. Returns: Response: The email confirmation form or redirect to dashboard. """ codeMatched = True if not current_user.is_authenticated: flash("Please login before verifying your email address.", "info") return redirect( url_for("auth_bp.login") + "?next=" + url_for("auth_bp.email_confirmation_by_code") ) elif current_user.email_confirmed == 0: # Verification code code = current_user.email_confirmation_code # Declare the form form = EmailConfirmationForm(request.form) # If it is a GET request, just show the page if request.method == "GET": if "resend" in request.args: send_email_with_code(current_user) return render_template( "public/auth/email-confirm.html", form=form, user=current_user ) # Check if both http method is POST, form is valid, and csrf token is valid if form.validate_on_submit(): for i in range(0, 6): codeMatched = codeMatched and request.form["code" + str(i)] == code[i] if codeMatched: # Run the new user actions new_user_actions_for_email_confirmed(current_user) # Change user attribute in the db current_user.email_confirmed = 1 current_user.save() # Flash conformation flash( "Thank you for confirming your email address!", category="success" ) return redirect(url_for("private_bp.private_index")) else: flash("This code was not correct.", "danger") return render_template( "public/auth/email-confirm.html", form=form, user=current_user ) else: flash("Please enter a valid code before submitting.", "danger") return render_template( "public/auth/email-confirm.html", form=form, user=current_user ) else: flash("Your email address is already confirmed.", "info") return redirect(url_for("private_bp.private_index"))
@auth_bp.route("/forgot-password", methods=["GET", "POST"]) @limiter.limit("1 per day", methods=["POST"])
[docs] def password_reset(): """The view function for the password reset page. Initiates the password reset process by sending a reset email. Returns: Response: The password reset form or redirect to confirmation page. """ # Declare the form form = ResetPassword(request.form) if request.method == "GET": return render_template( "public/auth/password-reset.html", form=form, user=current_user ) # Check if both http method is POST, form is valid, and csrf token is valid if form.validate_on_submit(): email = request.form.get("email", "", type=str) user = Users.query.filter_by(email=email).first() if user: send_email_to_reset_password(email) # Whatever happens, redirect to the information page without telling what happened return redirect(url_for("auth_bp.password_reset_requested"))
@auth_bp.route("/password-reset-requested")
[docs] def password_reset_requested(): """The view function for the password reset requested page. Shows a confirmation message after a password reset request. Returns: Response: The password reset requested confirmation page. """ return render_template( "public/auth/password-reset-requested.html", user=current_user )
@auth_bp.route("/set-new-password", methods=["GET", "POST"], defaults={"token": ""}) @auth_bp.route("/set-new-password/<token>", methods=["GET", "POST"])
[docs] def set_new_password(token): """The view function for the set new password page. Validates the reset token and allows the user to set a new password. Args: token (str): The token from the forgot password email, used to verify the user. Returns: Response: The new password form or redirect to login. """ try: ts = URLSafeTimedSerializer(current_app.config["SECRET_KEY"]) email = ts.loads(token, salt="recover-key", max_age=86400) except: abort(404) # Declare the form form = SetNewPassword() # Check if both http method is POST, form is valid, and csrf token is valid if form.validate_on_submit(): user = Users.query.filter_by(email=email).first_or_404() user.password = bc.generate_password_hash(form.password.data).decode("utf8") db.session.add(user) db.session.commit() flash( "Your password has been reset successfully. Please login with your new password.", "info", ) return redirect(url_for("auth_bp.login")) return render_template( "public/auth/new-password.html", form=form, user=current_user )
[docs] def get_google_sso_config(): """Get the Google SSO configuration. Fetches the OpenID Connect discovery document from Google. Returns: dict: The Google SSO configuration including endpoints. """ return requests.get( "https://accounts.google.com/.well-known/openid-configuration" ).json()
@auth_bp.route("/login/callback/google", methods=["GET", "POST"]) @csrf.exempt
[docs] def login_callback_google(): """The view function for the Google login callback. This is the page that Google redirects to after the Google authentication attempt. Handles both existing users logging in and new user registration via Google OAuth. Returns: Response: Redirect to dashboard or login page based on authentication result. """ # Don't allow logged in users here if current_user.is_authenticated and current_user.is_connected_google(): flash("You are already logged in.", "info") return redirect("/app") # Handling the case where the callback has an error arg if request.args.get("error") is not None: print( "Error on callback, redirecting to login.\nError from Google: ", request.args.get("error"), ) flash("Google authentication was not completed. Please try again.", "danger") return redirect("/login") # Get authorization code Google sent back to you code = request.args.get("code") google_config = get_google_sso_config() token_endpoint = google_config["token_endpoint"] # Prepare and send a request to get tokens! token_url, headers, body = current_app.google_client.prepare_token_request( token_endpoint, authorization_response="https://" + request.host + request.full_path, redirect_url="https://" + request.host + request.path, code=code, ) token_response = requests.post( token_url, headers=headers, data=body, auth=( current_app.config["GOOGLE_CLIENT_ID"], current_app.config["GOOGLE_CLIENT_SECRET"], ), ) # Parse the tokens current_app.google_client.parse_request_body_response( json.dumps(token_response.json()) ) userinfo_endpoint = google_config["userinfo_endpoint"] uri, headers, body = current_app.google_client.add_token(userinfo_endpoint) userinfo_response = requests.get(uri, headers=headers, data=body) # You want to make sure their email is verified. # The user authenticated with Google, authorized your # app, and now you've verified their email through Google! user_info_json = userinfo_response.json() if user_info_json["email_verified"]: google_user_email = user_info_json.get("email", "") google_user_picture = user_info_json.get("picture", "") google_user_given_name = user_info_json.get("given_name", "") google_user_family_name = user_info_json.get("family_name", "") # Is this email already registered? # Covers both the people signed up with email and the people logged in with social before found_user = Users.query.filter_by(email=google_user_email).first() if found_user: # Update their missing info - These would be missing for email sign-up if found_user.google_avatar_url == None: found_user.google_avatar_url = google_user_picture if not found_user.email_confirmed == 1: found_user.email_confirmed = 1 flash("Your email address is verified!", category="success") if found_user.firstName == None: found_user.firstName = google_user_given_name if found_user.lastName == None: found_user.lastName = google_user_family_name db.session.commit() # Log current date member_since = datetime.now(timezone.utc).replace(tzinfo=None) # Log them in login_user(found_user, remember=True) current_user.last_login = datetime.now(timezone.utc).replace(tzinfo=None) current_user.save() flash("Logged in with Google successfully!", category="success") # If we didn't have a record for this user else: # Assign them a random password letter_set = string.ascii_lowercase random_password = "".join(random.choice(letter_set) for i in range(16)) # Log current date member_since = datetime.now(timezone.utc).replace(tzinfo=None) last_login = datetime.now(timezone.utc).replace(tzinfo=None) # Register them as new user new_user = Users( email=google_user_email, password=bc.generate_password_hash(random_password).decode("utf8"), tier_id=1, firstName=google_user_given_name, lastName=google_user_family_name, newsletter=0, member_since=member_since, last_login=last_login, email_confirmation_code=generate_n_digit_code(6), ) db.session.add(new_user) db.session.commit() user = Users.query.filter_by(email=google_user_email).first() # Run the new user actions new_user_actions_for_email_confirmed(user) login_user(user, remember=True) # We know Google confirmed their email already if not user.email_confirmed == 1: user.email_confirmed = 1 user.google_avatar_url = google_user_picture user.save() flash( "Your account is created successfully.", category="success", ) # We preserved the next page to go to after login in the oauth state earlier, reading it back here next = request.args.get("state", "/app") return redirect(next) else: flash( "Your Google email not available or not verified by Google. Please set up your Google email first or try using a different email address.", "danger", ) return redirect(url_for("auth_bp.login"))