-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmodels.py
More file actions
117 lines (98 loc) · 4.08 KB
/
models.py
File metadata and controls
117 lines (98 loc) · 4.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from fastapi import Request, HTTPException, status
from sqlmodel import SQLModel, Field
from typing import Optional
from datetime import datetime, timedelta, timezone
from passlib.context import CryptContext
from jose import jwt, JWTError
import os
from dotenv import load_dotenv
load_dotenv()
# from the terminal: openssl rand -hex 32
# this generates an SSL private key for our SECRET_KEY
# the secret key and algorithm are combined and encoded with user information in the create_access_token function
# The create_access_token function is in the get_current_user function
SECRET_KEY = os.getenv('SECRET')
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# ----------------
# Authentication Table User Model
#-----------------
class User(SQLModel, table=True):
__tablename__ = "user_identity"
id: Optional[int] = Field(primary_key=True, index=True)
username: str = Field(unique=True, index=True)
email: Optional[str] = Field(unique=True)
full_name: Optional[str] = Field(default=None)
is_active: bool = Field(default=True)
hashed_password: str
# ----------------
# User Schema Model
# -----------------
class CreateUser(SQLModel):
username: str
email: str
full_name: str
password: str
# --------------------------------------
# Response Model for the /token endpoint
# --------------------------------------
class Token(SQLModel):
access_token: str
token_type: str
# -----------------------
# OAuth form requirement (email and password) transferred to html form requirement (username and password)
# -----------------------
class LoginForm:
def __init__(self, request: Request):
self.request: Request = request
self.username: Optional[str] = None
self.password: Optional[str] = None
async def create_oauth_form(self):
form = await self.request.form()
self.username = form.get("email")
self.password = form.get("password")
# --------------------------
# Password Hashing Functions
# --------------------------
def get_password_hash(password):
return bcrypt_context.hash(password)
def verify_password(plain_password, hashed_password):
return bcrypt_context.verify(plain_password, hashed_password)
# ------------------------------------------
# Authentication and Authorization Functions
# ------------------------------------------
# These functions are both called by the /token endpoint
# 1. authenticate_user
# 2. create_access_token creates an access token - if the user is successfully authenticated
def authenticate_user(username: str, password: str, session):
user = session.query(User).filter(User.username == username).first()
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
def create_access_token(username: str, user_id: int, expires_delta: Optional[timedelta] = None):
payload = {'sub': username, "id": user_id}
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
payload.update({'exp': expire})
return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
# --------------------------------------------------------------------
# get_current_user decodes the JWT for resource endpoint authorization
# --------------------------------------------------------------------
def get_current_user(request:Request):
try:
token = request.cookies.get("access_token") # get JWT from browser
if token is None:
return None
payload = jwt.decode(token, SECRET_KEY, ALGORITHM)
username: str = payload.get('sub')
user_id: int = payload.get('id')
if username is None or user_id is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate user.")
return {'username': username, 'id': user_id} # where does this go?
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="JWT failed validation.")