FastAPI中的安全路由

huangapple go评论84阅读模式
英文:

Secure route in FastAPI

问题

  1. from fastapi import FastAPI, HTTPException, Depends
  2. from fastapi.security import OAuth2PasswordBearer
  3. import jwt
  4. from passlib.context import CryptContext
  5. from datetime import timedelta
  6. app = FastAPI()
  7. SECRET_KEY = "very-secret1234567890"
  8. ALGORITHM = "HS256"
  9. ACCESS_TOKEN_EXPIRE_MINUTES = 15
  10. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  11. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  12. class TokenData:
  13. def __init__(self, username: str):
  14. self.username = username
  15. def create_access_token(data: dict):
  16. to_encode = data.copy()
  17. expire = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
  18. to_encode.update({"exp": expire})
  19. encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  20. return encoded_jwt
  21. @app.post("/token")
  22. async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
  23. # Authenticate user here
  24. # ...
  25. # If authentication is successful, create and return access token
  26. access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
  27. access_token = create_access_token(
  28. data={"sub": form_data.username}, expires_delta=access_token_expires
  29. )
  30. return {"access_token": access_token, "token_type": "bearer"}
英文:

I have a Flask-based backend REST API and I want to migrate it to FastAPI. However, I am not sure how to implement secure routes and create access tokens in FastAPI.

In Flask, the flask_jwt_extended library provides:

  • the @jwt_required() decorator for secure routes
  • the create_access_token() function for creating JWT tokens

Does FastAPI have a similar feature or capability? If not, how can I implement this in FastAPI?

Thank you in advance.

Here is an example implementation of a secure route and access-token creation in Flask:

  1. import hashlib
  2. import traceback
  3. from datetime import timedelta
  4. from http import HTTPStatus
  5. from flask import Flask, jsonify, request
  6. from flask_jwt_extended import JWTManager, jwt_required, get_jwt_identity, create_access_token
  7. app = Flask(__name__)
  8. jwt = JWTManager(app)
  9. app.config[&quot;JWT_SECRET_KEY&quot;] = &quot;very-secret1234567890&quot;
  10. app.config[&quot;JWT_ACCESS_TOKEN_EXPIRES&quot;] = timedelta(minutes=15)
  11. app.config[&quot;JWT_REFRESH_TOKEN_EXPIRES&quot;] = timedelta(days=30)
  12. host = &quot;localhost&quot;
  13. port = 5000
  14. test_password = &quot;test_password&quot;
  15. db = [
  16. {
  17. &quot;username&quot;: &quot;test_user&quot;,
  18. &quot;email&quot;: &quot;test_email.gmail.com&quot;,
  19. &quot;password&quot;: hashlib.sha256(test_password.encode()).hexdigest()
  20. }
  21. ]
  22. @app.route(&#39;/login&#39;, methods=[&#39;POST&#39;])
  23. def login():
  24. try:
  25. json_data = request.get_json()
  26. email = json_data.get(&quot;email&quot;)
  27. password = json_data.get(&quot;password&quot;)
  28. if not email or not password:
  29. response = jsonify(error=&quot;&#39;email&#39; and &#39;password&#39; are required&quot;)
  30. return response, HTTPStatus.BAD_REQUEST
  31. # Check if email exists in DB
  32. user_result = [user for user in db if user[&quot;email&quot;].lower() == email.lower()]
  33. # Check if the password is correct
  34. encoded_password = hashlib.sha256(password.encode()).hexdigest()
  35. if not user_result or user_result[0][&quot;password&quot;] != encoded_password:
  36. response = jsonify(error=&quot;Wrong credentials&quot;)
  37. return response, HTTPStatus.BAD_REQUEST
  38. user = user_result[0]
  39. # Generate JWT token and return it
  40. access_token = create_access_token(identity=user[&quot;username&quot;])
  41. response = jsonify(username=user[&quot;username&quot;], token=access_token)
  42. return response, HTTPStatus.OK
  43. except Exception as e:
  44. print(f&quot;Error: {e}&quot;)
  45. print(traceback.format_exc())
  46. response = jsonify(result={&quot;error&quot;: &quot;Server error&quot;})
  47. return response, HTTPStatus.INTERNAL_SERVER_ERROR
  48. @app.route(&#39;/secured_page&#39;, methods=[&#39;GET&#39;])
  49. @jwt_required()
  50. def __create_participant():
  51. try:
  52. response = jsonify(message=&quot;You are logged in as {}&quot;.format(get_jwt_identity()))
  53. return response, HTTPStatus.OK
  54. except Exception as e:
  55. print(f&quot;Error: {e}&quot;)
  56. print(traceback.format_exc())
  57. response = jsonify(result={&quot;error&quot;: &quot;Server error&quot;})
  58. return response, HTTPStatus.INTERNAL_SERVER_ERROR
  59. if __name__ == &#39;__main__&#39;:
  60. app.run(host=host, port=port, debug=True)

答案1

得分: 1

你可以使用像 python-jose 这样的库来处理 JWT 功能,但是你必须自己实现 jwt_required 和 create_access_token,例如:

create_access_token

  1. from jose import JWTError, jwt
  2. from datetime import datetime, timedelta
  3. def create_access_token(data: dict):
  4. to_encode = data.copy()
  5. expire = datetime.utcnow() + timedelta(minutes=15)
  6. to_encode.update({"exp": expire})
  7. encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  8. return encoded_jwt

jwt_required(不是一个装饰器,而是创建一个函数,并将其设置为你的路由的依赖项)

  1. from fastapi import Depends, HTTPException, status
  2. from fastapi.security import OAuth2PasswordBearer
  3. from jose import JWTError, jwt
  4. oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
  5. async def jwt_required(token: str = Depends(oauth2_scheme)):
  6. credentials_exception = HTTPException(
  7. status_code=status.HTTP_401_UNAUTHORIZED,
  8. detail="Could not validate credentials",
  9. headers={"WWW-Authenticate": "Bearer"},
  10. )
  11. try:
  12. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  13. username: str = payload.get("sub")
  14. if username is None:
  15. raise credentials_exception
  16. except JWTError:
  17. raise credentials_exception
  18. user = get_user(username=username)
  19. if user is None:
  20. raise credentials_exception
  21. @app.get('/secured_page', dependencies=[jwt_required])
  22. def __create_participant():
  23. ...
英文:

You can use libs like python-jose for JWT functionalities, but you have to implement jwt_required and create_access_token by yourself, for example

create_access_token

  1. from jose import JWTError, jwt
  2. from datetime import datetime, timedelta
  3. def create_access_token(data: dict):
  4. to_encode = data.copy()
  5. expire = datetime.utcnow() + timedelta(minutes=15)
  6. to_encode.update({&quot;exp&quot;: expire})
  7. encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  8. return encoded_jwt

jwt_required (instead of a decorator, you create a function and set it as a dependency of your route)

  1. from fastapi import Depends, HTTPException, status
  2. from fastapi.security import OAuth2PasswordBearer
  3. from jose import JWTError, jwt
  4. oauth2_scheme = OAuth2PasswordBearer(tokenUrl=&quot;token&quot;)
  5. async def jwt_required(token: str = Depends(oauth2_scheme)):
  6. credentials_exception = HTTPException(
  7. status_code=status.HTTP_401_UNAUTHORIZED,
  8. detail=&quot;Could not validate credentials&quot;,
  9. headers={&quot;WWW-Authenticate&quot;: &quot;Bearer&quot;},
  10. )
  11. try:
  12. payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
  13. username: str = payload.get(&quot;sub&quot;)
  14. if username is None:
  15. raise credentials_exception
  16. except JWTError:
  17. raise credentials_exception
  18. user = get_user(username=username)
  19. if user is None:
  20. raise credentials_exception
  21. @app.get(&#39;/secured_page&#39;, dependencies=[jwt_required])
  22. def __create_participant():
  23. ...

huangapple
  • 本文由 发表于 2023年4月20日 00:35:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/76056906.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定