Token authentication issue with FastAPI and JWT tokens - "Could not validate credentials"
Question
I am building an API using Python 3.10.8 and FastAPI 0.95.1, and I'm experiencing an issue with user authentication, specifically related to JWT tokens. I have followed the guide provided in FastAPI's security documentation.
The problem arises when I make a request to an endpoint that requires user authentication. Instead of receiving a valid JWT token in the get_current_user() function, the token is being passed as the string "undefined". As a result, I encounter a 401 error with the message "Could not validate credentials". In the Chrome developer tools, the Authorization header also shows "Bearer undefined".
# --- dependencies.py ---
from database import recipes_db
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from utils.models import UserInternal, UserExternal, TokenData
from settings import SECRET_KEY, ALGORITHM
from typing import Annotated

SELECT_USER = """
    SELECT
        id,
        email,
        first_name as firstName,
        hashed_password as hashedPassword
    FROM
        users
    WHERE
        email = :email
"""

password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# This context can be used to hash passwords and verify them later on.
# The deprecated argument is set to "auto", which means that the library
# will automatically deprecate old algorithms and switch to new ones as needed.

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# This is a security scheme used for authenticating users with OAuth2.
# The tokenUrl parameter is set to "token", which is the endpoint where the
# user can obtain an access token by providing their credentials. The oauth2_scheme
# object can be used as a dependency in FastAPI endpoints to enforce authentication.

def verify_hash(plain_text: str, hashed_text: str) -> bool:
    """
    Verifies that a plain text matches a hashed text.

    Args:
        plain_text (str): The plain text to verify.
        hashed_text (str): The hashed text to compare against.

    Returns:
        bool: True if the plain text matches the hashed text, False otherwise.
    """
    return password_context.verify(plain_text, hashed_text)


def create_hash(text: str) -> str:
    """
    Hashes a string (e.g., password or token)

    Args:
        text (str): The plain text to hash.

    Returns:
        str: The hashed text.
    """
    return password_context.hash(text)


async def get_user(email: str, internal: bool = True) -> UserExternal | UserInternal:
    """
    Fetches a user from the database by their email.

    Args:
        email (str): The email of the user.
        internal (bool, optional): If True, returns an internal user object.
            If False, returns an external user object. Defaults to True.

    Returns:
        UserInternal or UserExternal: The user object.
    """
    data = await recipes_db.fetch_one(
        query=SELECT_USER,
        values={'email': email}
    )
    if data:
        if internal:
            user = UserInternal(**data)
        else:
            user = UserExternal(**data)
        return user


async def authenticate_user(email: str, password: str):
    """
    Authenticates a user.

    Args:
        email (str): The email of the user.
        password (str): The user's password.

    Returns:
        UserInternal: The authenticated user object if successful, otherwise False.
    """
    # Retrieve a UserInternal object
    user = await get_user(email)
    if not user:
        return False
    if not verify_hash(password, user.hashedPassword):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """
    Generates a JWT access token.

    Args:
        data (dict): The data to include in the token.
        expires_delta (timedelta | None): The expiration time for the token, or None for default.

    Returns:
        str: The encoded access token.
    """
    # Make a copy of the data so we don't modify the original dictionary
    to_encode = data.copy()
    # If an expiration time is provided, use it; otherwise default to 15 minutes
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # Encode the token using the JWT library
    encoded_jwt = jwt.encode(to_encode, str(SECRET_KEY), algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """
    Gets the current user from an authentication token.

    Args:
        token (Annotated[str, Depends(oauth2_scheme)]): The authentication token.

    Raises:
        HTTPException: If the credentials cannot be validated.

    Returns:
        UserInternal: The current user.
    """
    # Define the exception to raise if credentials cannot be validated
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Decode the JWT token
        payload = jwt.decode(token, str(SECRET_KEY), algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        # Raise exception if the email is missing from the payload
        if email is None:
            raise credentials_exception
        # Create TokenData object with email
        token_data = TokenData(email=email)
    except JWTError as e:
        # Raise exception if there is a JWT error
        raise credentials_exception
    # Get user from database using email from token data
    user = await get_user(email=token_data.email)
    # Raise exception if user not found
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: Annotated[UserInternal, Depends(get_current_user)]):
    # if current_user.disabled:
    #     raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# --- main.py ---
@app.post("/token", response_model=Token)
async def login_for_access_token(
    formData: Annotated[OAuth2PasswordRequestForm, Depends()]
):
    """
    Logs in and receives an access token.

    ### Arguments
    - `formData` (`Annotated[OAuth2PasswordRequestForm, Depends()]`): The OAuth2 password request form.

    ### Returns
    - `Token`: The access token.
    """
    # Authenticate the user credentials, i.e. check username and password in database
    user = await dependencies.authenticate_user(
        formData.username, formData.password
    )
    # If user credentials are invalid, raise an HTTPException with a 401 status code
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Define access token expiry time
    access_token_expires = timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    # Create access token using user email and expiry time
    access_token = dependencies.create_access_token(
        data={"sub": user.email}, expires_delta=access_token_expires
    )
    return {"accessToken": access_token, "tokenType": "bearer"}

When I request an endpoint that depends on a signed-in user, e.g.:
@router.get("/")
async def get_user(
    currentUser: Annotated[UserInternal, Depends(get_current_active_user)]
):
    return currentUser

I expected the get_current_user() function to receive a valid JWT token from the Authorization header, decode it, and extract the email address from the payload. The JWT is created correctly in the login_for_access_token() endpoint, but it does not seem to be passed in the request headers when I call the endpoint from the automatically generated OpenAPI docs.
What can I do to fix this?
Answer 1
Score: 2
You wrote:
return {"accessToken": access_token, "tokenType": "bearer"}
But according to the documentation, you should return:
return {"access_token": access_token, "token_type": "bearer"}
The keys should be snake case, not camel case.
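To see why the header ends up as "Bearer undefined", here is a rough, standard-library-only sketch of what the docs page presumably does with the /token response. It is an illustration, not the actual client code: the real client is JavaScript, authorization_header is a hypothetical helper, and the token value is a placeholder. The string "undefined" stands in for what JavaScript produces when a missing key is read.

```python
import json


def authorization_header(token_response_body: str) -> str:
    """Build an Authorization header the way an OAuth2 client would.

    Hypothetical helper: it reads the `access_token` key named by the
    OAuth2 spec and emulates JavaScript's `undefined` for a missing key.
    """
    payload = json.loads(token_response_body)
    # A JavaScript client reading a missing key gets `undefined`,
    # which becomes the literal text "undefined" when concatenated.
    token = payload.get("access_token", "undefined")
    return f"Bearer {token}"


# Placeholder token value, not a real JWT.
camel_case_body = json.dumps({"accessToken": "abc.def.ghi", "tokenType": "bearer"})
snake_case_body = json.dumps({"access_token": "abc.def.ghi", "token_type": "bearer"})

print(authorization_header(camel_case_body))  # Bearer undefined
print(authorization_header(snake_case_body))  # Bearer abc.def.ghi
```

The Token response model is not shown in the question. If it declares accessToken and tokenType as field names, it would presumably need the same rename so that the response still validates against response_model=Token.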