Token authentication issue with FastAPI and JWT tokens - "Could not validate credentials"

Question

I am building an API using Python 3.10.8 and FastAPI 0.95.1, and I'm experiencing an issue with user authentication, specifically related to JWT tokens. I have followed the guide provided in FastAPI's security documentation.

The problem arises when I make a request to an endpoint that requires user authentication. Instead of receiving a valid JWT token in the get_current_user() function, the token is being passed as the string "undefined". As a result, I encounter a 401 Error with the message "Could not validate credentials." In the Chrome developer tools, the Authorization header also shows "Bearer undefined".
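A quick way to confirm this symptom on the server side is to inspect the raw header before any JWT decoding happens. The sketch below is only a debugging aid under stated assumptions: `diagnose_authorization` and `PLACEHOLDER_TOKENS` are hypothetical names, not part of FastAPI, and the placeholder list simply reflects values a JavaScript client tends to send when a variable was never set.

```python
from typing import Optional, Tuple

# Values a JavaScript client may send after stringifying a missing variable
# (assumption for illustration; extend as needed).
PLACEHOLDER_TOKENS = {"undefined", "null", ""}


def split_authorization(header: str) -> Tuple[str, str]:
    """Split an Authorization header value into (scheme, credentials)."""
    scheme, _, credentials = header.strip().partition(" ")
    return scheme, credentials.strip()


def diagnose_authorization(header: Optional[str]) -> str:
    """Return a short, human-readable diagnosis of an Authorization header."""
    if not header:
        return "missing header"
    scheme, credentials = split_authorization(header)
    if scheme.lower() != "bearer":
        return f"unexpected scheme: {scheme}"
    if credentials in PLACEHOLDER_TOKENS:
        return "client sent a placeholder instead of a token"
    if credentials.count(".") != 2:
        return "credentials are not a three-part JWT"
    return "looks like a JWT"
```

If the diagnosis is a placeholder, the server-side decoding is probably fine and the client never obtained the token from the login response.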

```python
# --- dependencies.py ---
from database import recipes_db
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from utils.models import UserInternal, UserExternal, TokenData
from settings import SECRET_KEY, ALGORITHM
from typing import Annotated

SELECT_USER = """
    SELECT
        id,
        email,
        first_name as firstName,
        hashed_password as hashedPassword
    FROM
        users
    WHERE
        email = :email
"""

password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# This context can be used to hash passwords and verify them later on.
# The deprecated argument is set to "auto", which means that the library
# will automatically deprecate old algorithms and switch to new ones as needed.

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# This is a security scheme used for authenticating users with OAuth2.
# The tokenUrl parameter is set to "token", which is the endpoint where the
# user can obtain an access token by providing their credentials. The oauth2_scheme
# object can be used as a dependency in FastAPI endpoints to enforce authentication.


def verify_hash(plain_text: str, hashed_text: str) -> bool:
    """
    Verifies that a plain text matches a hashed text.

    Args:
        plain_text (str): The plain text to verify.
        hashed_text (str): The hashed text to compare against.

    Returns:
        bool: True if the plain text matches the hashed text, False otherwise.
    """
    return password_context.verify(plain_text, hashed_text)


def create_hash(text: str) -> str:
    """
    Hashes a string (e.g., password or token)

    Args:
        text (str): The plain text to hash.

    Returns:
        str: The hashed text.
    """
    return password_context.hash(text)


async def get_user(email: str, internal: bool = True) -> UserExternal | UserInternal:
    """
    Fetches a user from the database by their email.

    Args:
        email (str): The email of the user.
        internal (bool, optional): If True, returns an internal user object.
            If False, returns an external user object. Defaults to True.

    Returns:
        UserInternal or UserExternal: The user object.
    """
    data = await recipes_db.fetch_one(
        query=SELECT_USER,
        values={'email': email}
    )
    if data:
        if internal:
            user = UserInternal(**data)
        else:
            user = UserExternal(**data)
        return user


async def authenticate_user(email: str, password: str):
    """
    Authenticates a user.

    Args:
        email (str): The email of the user.
        password (str): The user's password.

    Returns:
        UserInternal: The authenticated user object if successful, otherwise False.
    """
    # Retrieve an UserInternal object
    user = await get_user(email)
    if not user:
        return False
    if not verify_hash(password, user.hashedPassword):
        return False
    return user


def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """
    Generates a JWT access token.

    Args:
        data (dict): The data to include in the token.
        expires_delta (timedelta | None): The expiration time for the token, or None for default.

    Returns:
        bytes: The encoded access token.
    """
    # Make a copy of the data so we don't modify the original dictionary
    to_encode = data.copy()
    # If an expiration time is provided, set the 'exp' claim to that time
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # Encode the token using the JWT library
    encoded_jwt = jwt.encode(to_encode, str(SECRET_KEY), algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    """
    Gets the current user from an authentication token.

    Args:
        token (Annotated[str, Depends(oauth2_scheme)]): The authentication token.

    Raises:
        HTTPException: If the credentials cannot be validated.

    Returns:
        UserInternal: The current user.
    """
    # Define the exception to raise if credentials cannot be validated
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Decode the JWT token
        payload = jwt.decode(token, str(SECRET_KEY), algorithms=[ALGORITHM])
        email: str = payload.get("sub")
        # Raise exception if the email is missing from the payload
        if email is None:
            raise credentials_exception
        # Create TokenData object with email
        token_data = TokenData(email=email)
    except JWTError as e:
        # Raise exception if there is a JWT error
        raise credentials_exception
    # Get user from database using email from token data
    user = await get_user(email=token_data.email)
    # Raise exception if user not found
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: Annotated[UserInternal, Depends(get_current_user)]):
    # if current_user.disabled:
    #     raise HTTPException(status_code=400, detail="Inactive user")
    return current_user
```
```python
# --- main.py ---
@app.post("/token", response_model=Token)
async def login_for_access_token(
    formData: Annotated[OAuth2PasswordRequestForm, Depends()]
):
    """
    Logs in and receives an access token.

    ### Arguments
    - `formData` (`Annotated[OAuth2PasswordRequestForm, Depends()]`): The OAuth2 password request form.

    ### Returns
    - `Token`: The access token.
    """
    # Authenticate the user credentials, i.e. check username and password in database
    user = await dependencies.authenticate_user(
        formData.username, formData.password
    )
    # If user credentials are invalid, raise an HTTPException with a 401 status code
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # Define access token expiry time
    access_token_expires = timedelta(
        minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
    )
    # Create access token using user email and expiry time
    access_token = dependencies.create_access_token(
        data={"sub": user.email}, expires_delta=access_token_expires
    )
    return {"accessToken": access_token, "tokenType": "bearer"}
```

When I request an endpoint that requires a signed-in user, for example:

```python
@router.get("/")
async def get_user(
    currentUser: Annotated[UserInternal, Depends(get_current_active_user)]
):
    return currentUser
```

I expected the get_current_user() function to receive a valid JWT token from the Authorization header, decode it, and extract the email address from the payload. The JWT is created correctly in the login_for_access_token() endpoint, but it does not seem to be passed in the request headers when I call the endpoint from the automatically generated OpenAPI docs.
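For reference, the expected flow (take the token, verify its signature, read the `sub` claim) can be sketched with only the standard library. This is a minimal illustration assuming HS256, not how python-jose implements `jwt.encode` or `jwt.decode`; `encode_hs256` and `subject_from_token` are hypothetical helper names, and unlike a full JWT library this sketch rejects tokens that have no `exp` claim.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _b64url_encode(raw: bytes) -> str:
    """Base64url-encode without padding, as JWT segments are written."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding first."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    """Compute the HMAC-SHA256 signature over 'header.payload'."""
    return hmac.new(
        secret.encode("utf-8"), signing_input.encode("utf-8"), hashlib.sha256
    ).digest()


def encode_hs256(claims: dict, secret: str) -> str:
    """Build a compact 'header.payload.signature' token."""
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        _b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    ]
    signing_input = ".".join(segments)
    return signing_input + "." + _b64url_encode(_sign(signing_input, secret))


def subject_from_token(token: str, secret: str) -> Optional[str]:
    """Return the `sub` claim of a valid, unexpired token, else None."""
    parts = token.split(".")
    if len(parts) != 3:
        return None  # e.g. the literal string "undefined"
    try:
        expected = _sign(parts[0] + "." + parts[1], secret)
        if not hmac.compare_digest(expected, _b64url_decode(parts[2])):
            return None  # signature mismatch (wrong secret or tampering)
        claims = json.loads(_b64url_decode(parts[1]))
    except ValueError:
        return None  # malformed base64 or JSON
    if not isinstance(claims, dict):
        return None
    exp = claims.get("exp")
    if not isinstance(exp, (int, float)) or exp < time.time():
        return None  # missing or expired `exp` (required in this sketch)
    return claims.get("sub")
```

With this sketch, a token produced by `encode_hs256` round-trips to its `sub` value, while the string "undefined" is rejected before any signature check, which corresponds to the 401 path in get_current_user().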

What can I do to fix this?

Answer 1

Score: 2

You wrote:

```python
return {"accessToken": access_token, "tokenType": "bearer"}
```

But according to the documentation, it should be:

```python
return {"access_token": access_token, "token_type": "bearer"}
```

The keys should be in snake case, not camel case.
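A small sketch of why the key names matter, assuming (as the "Bearer undefined" header suggests) that the docs page reads the `access_token` field of the `/token` response in JavaScript, where a missing property evaluates to `undefined`. The helpers `token_response` and `authorization_header` are hypothetical and only mimic that behavior in Python:

```python
def token_response(access_token: str) -> dict:
    """Build the /token response body with the OAuth2 field names."""
    return {"access_token": access_token, "token_type": "bearer"}


def authorization_header(body: dict) -> str:
    """Mimic a client that reads `access_token` from the token response.

    A JavaScript client gets `undefined` for a missing property; the
    string "undefined" stands in for that here.
    """
    token = body.get("access_token", "undefined")
    return f"Bearer {token}"


# Camel-case keys: the client cannot find `access_token`.
broken = authorization_header({"accessToken": "abc", "tokenType": "bearer"})

# Snake-case keys: the client finds the token.
fixed = authorization_header(token_response("abc"))
```

Here `broken` reproduces the header seen in the Chrome developer tools, while `fixed` carries the actual token. If the project needs camel case elsewhere, keeping snake case for this one response is the simplest option.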

huangapple
  • Published on July 3, 2023 at 18:15:32
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76603811.html