From 46cc904643c03050d7311a03035a029d97d32a69 Mon Sep 17 00:00:00 2001 From: Nizar Izzuddin Yatim Fadlan Date: Fri, 12 Dec 2025 10:01:36 +0700 Subject: [PATCH 1/3] Refactor check-in tests to use management and volunteer user types; add authorization checks for check-in operations --- routes/tests/test_checkin.py | 131 +++++++++++++++++++++++++++-------- routes/ticket.py | 23 ++++++ 2 files changed, 126 insertions(+), 28 deletions(-) diff --git a/routes/tests/test_checkin.py b/routes/tests/test_checkin.py index 5686008..57c37f0 100644 --- a/routes/tests/test_checkin.py +++ b/routes/tests/test_checkin.py @@ -14,7 +14,7 @@ from models.Payment import Payment, PaymentStatus from models.Ticket import Ticket from models.Token import Token -from models.User import User +from models.User import MANAGEMENT_PARTICIPANT, VOLUNTEER_PARTICIPANT, User from schemas.checkin import CheckinDayEnum from settings import ACCESS_TOKEN_EXPIRE_MINUTES, ALGORITHM, SECRET_KEY, TZ @@ -46,8 +46,25 @@ def setUp(self) -> None: self.db.add(self.test_user) self.db.commit() - # Create staff user for check-in - self.staff_user = User( + # Create test token manually for management user + expire = datetime.now(tz=timezone(TZ)) + timedelta( + minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES) + ) + payload = { + "id": str(self.test_user.id), + "username": self.test_user.username, + "exp": expire, + } + token_str = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) + test_token_model = Token( + user_id=self.test_user.id, token=token_str, expired_at=expire + ) + self.db.add(test_token_model) + self.db.commit() + self.test_user_token = token_str + + # Create managemnt user for check-in + self.management_user = User( username="staffuser", email="staff@example.com", phone="+628123456790", @@ -55,26 +72,58 @@ def setUp(self) -> None: last_name="user", password=generate_hash_password("password"), is_active=True, + participant_type=MANAGEMENT_PARTICIPANT, ) - self.db.add(self.staff_user) + self.db.add(self.management_user) self.db.commit() - # Create test token manually for staff user + # Create test token manually for management user expire = datetime.now(tz=timezone(TZ)) + timedelta( minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES) ) payload = { - "id": str(self.staff_user.id), - "username": self.staff_user.username, + "id": str(self.management_user.id), + "username": self.management_user.username, "exp": expire, } token_str = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) test_token_model = Token( - user_id=self.staff_user.id, token=token_str, expired_at=expire + user_id=self.management_user.id, token=token_str, expired_at=expire ) self.db.add(test_token_model) self.db.commit() - self.staff_token = token_str + self.management_token = token_str + + # Create volunteer user for check-in + self.volunteer_user = User( + username="staffuser-volunteer", + email="staff-volunteer@example.com", + phone="+628123456790", + first_name="volunteer", + last_name="user", + password=generate_hash_password("password"), + is_active=True, + participant_type=VOLUNTEER_PARTICIPANT, + ) + self.db.add(self.volunteer_user) + self.db.commit() + + # Create test token manually for management user + expire = datetime.now(tz=timezone(TZ)) + timedelta( + minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES) + ) + payload = { + "id": str(self.volunteer_user.id), + "username": self.volunteer_user.username, + "exp": expire, + } + token_str = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) + test_token_model = Token( + user_id=self.volunteer_user.id, token=token_str, expired_at=expire + ) + self.db.add(test_token_model) + self.db.commit() + self.volunteer_token = token_str # Create test ticket self.test_ticket = Ticket( @@ -227,7 +276,7 @@ def test_checkin_user_day1_success(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 200) data = response.json() @@ -244,7 +293,7 @@ def test_checkin_user_day2_success(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day2.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 200) data = response.json() @@ -271,7 +320,7 @@ def test_checkin_user_payment_not_found(self): "payment_id": str(non_existent_id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 404) data = response.json() @@ -301,7 +350,7 @@ def test_checkin_user_unpaid_payment(self): "payment_id": str(unpaid_payment.id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 402) data = response.json() @@ -315,7 +364,7 @@ def test_checkin_updates_attendance_timestamp(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 200) @@ -323,7 +372,9 @@ def test_checkin_updates_attendance_timestamp(self): self.db.refresh(self.test_user) self.assertTrue(self.test_user.attendance_day_1) self.assertIsNotNone(self.test_user.attendance_day_1_at) - self.assertEqual(self.test_user.attendance_day_1_updated_by, self.staff_user.id) + self.assertEqual( + self.test_user.attendance_day_1_updated_by, self.management_user.id + ) def test_checkin_multiple_days(self): """Test check-in for both days""" @@ -334,7 +385,7 @@ def test_checkin_multiple_days(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response1.status_code, 200) @@ -345,7 +396,7 @@ def test_checkin_multiple_days(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day2.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response2.status_code, 200) @@ -364,7 +415,7 @@ def test_reset_checkin_user_day1_success(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) # Then reset @@ -374,7 +425,7 @@ def test_reset_checkin_user_day1_success(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 200) data = response.json() @@ -391,7 +442,7 @@ def test_reset_checkin_user_day2_success(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day2.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) # Then reset @@ -401,7 +452,7 @@ def test_reset_checkin_user_day2_success(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day2.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 200) data = response.json() @@ -427,7 +478,7 @@ def test_reset_checkin_user_payment_not_found(self): "payment_id": str(non_existent_id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 404) data = response.json() @@ -442,7 +493,7 @@ def test_reset_checkin_without_prior_checkin(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 200) data = response.json() @@ -457,7 +508,7 @@ def test_reset_checkin_preserves_other_day(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.client.patch( "/ticket/checkin", @@ -465,7 +516,7 @@ def test_reset_checkin_preserves_other_day(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day2.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) # Reset day 1 @@ -475,7 +526,7 @@ def test_reset_checkin_preserves_other_day(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 200) data = response.json() @@ -490,10 +541,34 @@ def test_checkin_updates_staff_user_tracking(self): "payment_id": str(self.test_payment.id), "day": CheckinDayEnum.day1.value, }, - headers={"Authorization": f"Bearer {self.staff_token}"}, + headers={"Authorization": f"Bearer {self.management_token}"}, ) self.assertEqual(response.status_code, 200) # Verify staff user is recorded self.db.refresh(self.test_user) - self.assertEqual(self.test_user.attendance_day_1_updated_by, self.staff_user.id) + self.assertEqual( + self.test_user.attendance_day_1_updated_by, self.management_user.id + ) + + def test_checkin_updates_forbidden_user(self): + response = self.client.patch( + "/ticket/checkin", + json={ + "payment_id": str(self.test_payment.id), + "day": CheckinDayEnum.day1.value, + }, + headers={"Authorization": f"Bearer {self.test_user_token}"}, + ) + self.assertEqual(response.status_code, 403) + + def test_reset_checkin_forbidden_user(self): + response = self.client.patch( + "/ticket/checkin/reset", + json={ + "payment_id": str(self.test_payment.id), + "day": CheckinDayEnum.day1.value, + }, + headers={"Authorization": f"Bearer {self.test_user_token}"}, + ) + self.assertEqual(response.status_code, 403) diff --git a/routes/ticket.py b/routes/ticket.py index 3f10c1a..450c2e9 100644 --- a/routes/ticket.py +++ b/routes/ticket.py @@ -6,6 +6,7 @@ from core.log import logger from core.responses import ( + Forbidden, InternalServerError, NotFound, Ok, @@ -16,6 +17,7 @@ from core.security import get_user_from_token, oauth2_scheme from models import get_db_sync from models.Payment import PaymentStatus +from models.User import MANAGEMENT_PARTICIPANT, VOLUNTEER_PARTICIPANT from repository import payment as paymentRepo from repository.checkin import ( get_user_and_payment_by_payment_id, @@ -44,6 +46,7 @@ TicketResponse, UserInfo, ) +from schemas.user_profile import ParticipantType router = APIRouter(prefix="/ticket", tags=["Ticket"]) @@ -212,6 +215,16 @@ async def checkin_user( logger.error("Unauthorized check-in attempt") return common_response(Unauthorized(message="Unauthorized")) + if checkin_staff_user.participant_type not in [ + MANAGEMENT_PARTICIPANT, + VOLUNTEER_PARTICIPANT, + ParticipantType.ORGANIZER, + ]: + logger.error( + f"User {checkin_staff_user.id} with participant type {checkin_staff_user.participant_type} is not authorized to perform check-ins" + ) + return common_response(Forbidden()) + user_and_payment = get_user_and_payment_by_payment_id(db, payload.payment_id) if user_and_payment is None: return common_response( @@ -279,6 +292,16 @@ async def checkin_user_reset( logger.error("Unauthorized check-in reset attempt") return common_response(Unauthorized(message="Unauthorized")) + if checkin_staff_user.participant_type not in [ + MANAGEMENT_PARTICIPANT, + VOLUNTEER_PARTICIPANT, + ParticipantType.ORGANIZER, + ]: + logger.error( + f"User {checkin_staff_user.id} with participant type {checkin_staff_user.participant_type} is not authorized to perform check-ins" + ) + return common_response(Forbidden()) + user = get_user_data_by_payment_id(db, payload.payment_id) if user is None: return common_response( From 9d7a94a2ce68dc3e5001a00682e283f5215a51e1 Mon Sep 17 00:00:00 2001 From: Nizar Izzuddin Yatim Fadlan Date: Fri, 12 Dec 2025 10:01:47 +0700 Subject: [PATCH 2/3] Add attendance fields to UserProfilePrivate model --- schemas/user_profile.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/schemas/user_profile.py b/schemas/user_profile.py index 618f01f..7fd42b0 100644 --- a/schemas/user_profile.py +++ b/schemas/user_profile.py @@ -1,5 +1,5 @@ import re -from datetime import date +from datetime import date, datetime from enum import Enum from typing import Optional, List, Any @@ -236,6 +236,12 @@ class UserProfilePrivate(UserProfilePublic): twitter_username: str | None instagram_username: str | None + # Attendance + attendance_day_1: Optional[bool] = None + attendance_day_1_at: Optional[datetime] = None + attendance_day_2: Optional[bool] = None + attendance_day_2_at: Optional[datetime] = None + @model_validator(mode="before") @classmethod def extract_relationships(cls, data: Any) -> Any: From 18698d5897c8fe6876dc07708160983d945951c6 Mon Sep 17 00:00:00 2001 From: Nizar Izzuddin Yatim Fadlan Date: Fri, 12 Dec 2025 10:04:38 +0700 Subject: [PATCH 3/3] Add tests for check-in and reset check-in functionality for volunteer users --- routes/tests/test_checkin.py | 75 ++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/routes/tests/test_checkin.py b/routes/tests/test_checkin.py index 57c37f0..a181cb8 100644 --- a/routes/tests/test_checkin.py +++ b/routes/tests/test_checkin.py @@ -125,6 +125,37 @@ def setUp(self) -> None: self.db.commit() self.volunteer_token = token_str + # Create organizer user for check-in + self.organizer_user = User( + username="staffuser-organizer", + email="staff-organizer@example.com", + phone="+628123456790", + first_name="organizer", + last_name="user", + password=generate_hash_password("password"), + is_active=True, + participant_type=ParticipantType.ORGANIZER, + ) + self.db.add(self.organizer_user) + self.db.commit() + + # Create test token manually for management user + expire = datetime.now(tz=timezone(TZ)) + timedelta( + minutes=float(ACCESS_TOKEN_EXPIRE_MINUTES) + ) + payload = { + "id": str(self.organizer_user.id), + "username": self.organizer_user.username, + "exp": expire, + } + token_str = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM) + test_token_model = Token( + user_id=self.organizer_user.id, token=token_str, expired_at=expire + ) + self.db.add(test_token_model) + self.db.commit() + self.organizer_token = token_str + # Create test ticket self.test_ticket = Ticket( id=uuid.uuid4(), @@ -551,6 +582,50 @@ def test_checkin_updates_staff_user_tracking(self): self.test_user.attendance_day_1_updated_by, self.management_user.id ) + def test_checkin_updates_volunteer_user(self): + response = self.client.patch( + "/ticket/checkin", + json={ + "payment_id": str(self.test_payment.id), + "day": CheckinDayEnum.day1.value, + }, + headers={"Authorization": f"Bearer {self.volunteer_token}"}, + ) + self.assertEqual(response.status_code, 200) + + def test_reset_checkin_volunteer_user(self): + response = self.client.patch( + "/ticket/checkin/reset", + json={ + "payment_id": str(self.test_payment.id), + "day": CheckinDayEnum.day1.value, + }, + headers={"Authorization": f"Bearer {self.volunteer_token}"}, + ) + self.assertEqual(response.status_code, 200) + + def test_checkin_updates_organizer_user(self): + response = self.client.patch( + "/ticket/checkin", + json={ + "payment_id": str(self.test_payment.id), + "day": CheckinDayEnum.day1.value, + }, + headers={"Authorization": f"Bearer {self.organizer_token}"}, + ) + self.assertEqual(response.status_code, 200) + + def test_reset_checkin_organizer_user(self): + response = self.client.patch( + "/ticket/checkin/reset", + json={ + "payment_id": str(self.test_payment.id), + "day": CheckinDayEnum.day1.value, + }, + headers={"Authorization": f"Bearer {self.organizer_token}"}, + ) + self.assertEqual(response.status_code, 200) + def test_checkin_updates_forbidden_user(self): response = self.client.patch( "/ticket/checkin",