|
13 | 13 | from models.admin_user import AdminUser |
14 | 14 | from models.application import Application, ApplicationStatus |
15 | 15 | from models.response import Response |
| 16 | +from models.form import Form as Form1 |
16 | 17 | from pydantic import BaseModel |
17 | 18 | from services.google_sheets import export_applicants_to_sheets |
18 | 19 |
|
@@ -842,3 +843,128 @@ async def export_to_sheets( |
842 | 843 | return ExportResponse(**result) |
843 | 844 | except Exception as e: |
844 | 845 | raise HTTPException(status_code=500, detail=f"Failed to export to Google Sheets: {str(e)}") |
| 846 | + |
| 847 | + |
| 848 | +class ExceptionListResponse(BaseModel): |
| 849 | + form_key: str |
| 850 | + exception_emails: list[str] |
| 851 | + |
| 852 | + |
| 853 | +class AddExceptionRequest(BaseModel): |
| 854 | + email: str |
| 855 | + |
| 856 | + |
| 857 | +class RemoveExceptionRequest(BaseModel): |
| 858 | + email: str |
| 859 | + |
| 860 | + |
| 861 | +@router.get("/exceptions", response_model=ExceptionListResponse) |
| 862 | +async def get_exception_list( |
| 863 | + session_id: str, |
| 864 | + auth_payload: Dict[str, Any] = Security(auth.verify), |
| 865 | + db: Session = Depends(get_db), |
| 866 | +): |
| 867 | + """Get the list of emails with form submission exceptions.""" |
| 868 | + auth0_id = auth_payload.get("sub") |
| 869 | + if not auth0_id: |
| 870 | + raise HTTPException(status_code=401, detail="Auth0 ID not found in token") |
| 871 | + |
| 872 | + user = db.query(User).filter(User.auth0_id == auth0_id).first() |
| 873 | + if not user: |
| 874 | + raise HTTPException(status_code=401, detail="User not found") |
| 875 | + |
| 876 | + admin_user = db.query(AdminUser).filter(AdminUser.user_id == user.id).first() |
| 877 | + if not admin_user: |
| 878 | + raise HTTPException(status_code=403, detail="Not an admin") |
| 879 | + |
| 880 | + if not _validate_session(db, user.id, session_id): |
| 881 | + raise HTTPException(status_code=403, detail="Session invalidated") |
| 882 | + |
| 883 | + form = db.query(Form1).filter(Form1.form_key == CURRENT_FORM_KEY).first() |
| 884 | + if not form: |
| 885 | + raise HTTPException(status_code=404, detail="Form not found") |
| 886 | + |
| 887 | + return ExceptionListResponse( |
| 888 | + form_key=form.form_key, |
| 889 | + exception_emails=form.exception_emails or [] |
| 890 | + ) |
| 891 | + |
| 892 | + |
| 893 | +@router.post("/exceptions/add", response_model=ExceptionListResponse) |
| 894 | +async def add_exception_email( |
| 895 | + request: AddExceptionRequest, |
| 896 | + session_id: str, |
| 897 | + auth_payload: Dict[str, Any] = Security(auth.verify), |
| 898 | + db: Session = Depends(get_db), |
| 899 | +): |
| 900 | + """Add an email to the exception list.""" |
| 901 | + auth0_id = auth_payload.get("sub") |
| 902 | + if not auth0_id: |
| 903 | + raise HTTPException(status_code=401, detail="Auth0 ID not found in token") |
| 904 | + |
| 905 | + user = db.query(User).filter(User.auth0_id == auth0_id).first() |
| 906 | + if not user: |
| 907 | + raise HTTPException(status_code=401, detail="User not found") |
| 908 | + |
| 909 | + admin_user = db.query(AdminUser).filter(AdminUser.user_id == user.id).first() |
| 910 | + if not admin_user: |
| 911 | + raise HTTPException(status_code=403, detail="Not an admin") |
| 912 | + |
| 913 | + if not _validate_session(db, user.id, session_id): |
| 914 | + raise HTTPException(status_code=403, detail="Session invalidated") |
| 915 | + |
| 916 | + form = db.query(Form1).filter(Form1.form_key == CURRENT_FORM_KEY).first() |
| 917 | + if not form: |
| 918 | + raise HTTPException(status_code=404, detail="Form not found") |
| 919 | + |
| 920 | + email = request.email.strip().lower() |
| 921 | + if not email: |
| 922 | + raise HTTPException(status_code=400, detail="Email cannot be empty") |
| 923 | + |
| 924 | + current_emails = form.exception_emails or [] |
| 925 | + if email not in [e.lower() for e in current_emails]: |
| 926 | + form.exception_emails = current_emails + [email] |
| 927 | + db.commit() |
| 928 | + |
| 929 | + return ExceptionListResponse( |
| 930 | + form_key=form.form_key, |
| 931 | + exception_emails=form.exception_emails or [] |
| 932 | + ) |
| 933 | + |
| 934 | + |
| 935 | +@router.post("/exceptions/remove", response_model=ExceptionListResponse) |
| 936 | +async def remove_exception_email( |
| 937 | + request: RemoveExceptionRequest, |
| 938 | + session_id: str, |
| 939 | + auth_payload: Dict[str, Any] = Security(auth.verify), |
| 940 | + db: Session = Depends(get_db), |
| 941 | +): |
| 942 | + """Remove an email from the exception list.""" |
| 943 | + auth0_id = auth_payload.get("sub") |
| 944 | + if not auth0_id: |
| 945 | + raise HTTPException(status_code=401, detail="Auth0 ID not found in token") |
| 946 | + |
| 947 | + user = db.query(User).filter(User.auth0_id == auth0_id).first() |
| 948 | + if not user: |
| 949 | + raise HTTPException(status_code=401, detail="User not found") |
| 950 | + |
| 951 | + admin_user = db.query(AdminUser).filter(AdminUser.user_id == user.id).first() |
| 952 | + if not admin_user: |
| 953 | + raise HTTPException(status_code=403, detail="Not an admin") |
| 954 | + |
| 955 | + if not _validate_session(db, user.id, session_id): |
| 956 | + raise HTTPException(status_code=403, detail="Session invalidated") |
| 957 | + |
| 958 | + form = db.query(Form1).filter(Form1.form_key == CURRENT_FORM_KEY).first() |
| 959 | + if not form: |
| 960 | + raise HTTPException(status_code=404, detail="Form not found") |
| 961 | + |
| 962 | + email = request.email.strip().lower() |
| 963 | + current_emails = form.exception_emails or [] |
| 964 | + form.exception_emails = [e for e in current_emails if e.lower() != email] |
| 965 | + db.commit() |
| 966 | + |
| 967 | + return ExceptionListResponse( |
| 968 | + form_key=form.form_key, |
| 969 | + exception_emails=form.exception_emails or [] |
| 970 | + ) |
0 commit comments