英文:
Secure route in FastAPI
问题
from fastapi import FastAPI, HTTPException, Depends
from fastapi.security import OAuth2PasswordBearer
import jwt
from passlib.context import CryptContext
from datetime import timedelta
app = FastAPI()
SECRET_KEY = "very-secret1234567890"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class TokenData:
def __init__(self, username: str):
self.username = username
def create_access_token(data: dict):
to_encode = data.copy()
expire = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
# Authenticate user here
# ...
# If authentication is successful, create and return access token
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": form_data.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
英文:
<br>
I have a Flask-based backend REST API and I want to migrate to FastAPI. However, I am not sure how to implement secure routes and create access tokens in FastAPI.<br>
In Flask, we have methods from the flask_jwt_extended library such as:
<li>@jwt_required() decorator for secure routes
<li>create_access_token() function for creating JWT tokens.<br><br>
Does FastAPI have a similar feature or capability, or how can I implement this in FastAPI?<br>
Thank you in advance.<br>
Here is an example implementation of secure route and create access token in Flask:<br><br>
import hashlib
import traceback
from datetime import timedelta
from http import HTTPStatus
from flask import Flask, jsonify, request
from flask_jwt_extended import JWTManager, jwt_required, get_jwt_identity, create_access_token
app = Flask(__name__)
jwt = JWTManager(app)
app.config["JWT_SECRET_KEY"] = "very-secret1234567890"
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(minutes=15)
app.config["JWT_REFRESH_TOKEN_EXPIRES"] = timedelta(days=30)
host = "localhost"
port = 5000
test_password = "test_password"
db = [
{
"username": "test_user",
"email": "test_email.gmail.com",
"password": hashlib.sha256(test_password.encode()).hexdigest()
}
]
@app.route('/login', methods=['POST'])
def login():
try:
json_data = request.get_json()
email = json_data.get("email")
password = json_data.get("password")
if not email or not password:
response = jsonify(error="'email' and 'password' are required")
return response, HTTPStatus.BAD_REQUEST
# Check if email exists in DB
user_result = [user for user in db if user["email"].lower() == email.lower()]
# Check if the password is correct
encoded_password = hashlib.sha256(password.encode()).hexdigest()
if not user_result or user_result[0]["password"] != encoded_password:
response = jsonify(error="Wrong credentials")
return response, HTTPStatus.BAD_REQUEST
user = user_result[0]
# Generate JWT token and return it
access_token = create_access_token(identity=user["username"])
response = jsonify(username=user["username"], token=access_token)
return response, HTTPStatus.OK
except Exception as e:
print(f"Error: {e}")
print(traceback.format_exc())
response = jsonify(result={"error": "Server error"})
return response, HTTPStatus.INTERNAL_SERVER_ERROR
@app.route('/secured_page', methods=['GET'])
@jwt_required()
def __create_participant():
try:
response = jsonify(message="You are logged in as {}".format(get_jwt_identity()))
return response, HTTPStatus.OK
except Exception as e:
print(f"Error: {e}")
print(traceback.format_exc())
response = jsonify(result={"error": "Server error"})
return response, HTTPStatus.INTERNAL_SERVER_ERROR
if __name__ == '__main__':
app.run(host=host, port=port, debug=True)
答案1
得分: 1
你可以使用像python-jose这样的库来处理JWT功能,但是你必须自己实现jwt_required
和create_access_token
,例如
create_access_token
from jose import JWTError, jwt
from datetime import datetime, timedelta
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
jwt_required(不是一个装饰器,而是创建一个函数,并将其设置为你的路由的依赖项)
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def jwt_required(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user(username=username)
if user is None:
raise credentials_exception
@app.get('/secured_page', dependencies=[jwt_required])
def __create_participant():
...
英文:
You can use libs like python-jose for JWT functionalities, but you have to implement jwt_required
and create_access_token
by yourself, for example
create_access_token
from jose import JWTError, jwt
from datetime import datetime, timedelta
def create_access_token(data: dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
jwt_required (instead of a decorator, you create a function and set it as a dependency of your route)
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def jwt_required(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user(username=username)
if user is None:
raise credentials_exception
@app.get('/secured_page', dependencies=[jwt_required])
def __create_participant():
...
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论