FastAPI中的安全路由

huangapple go评论65阅读模式
英文:

Secure route in FastAPI

问题

from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from passlib.context import CryptContext

# Application instance that the route decorators in this snippet attach to.
app = FastAPI()

# NOTE(review): the signing secret is hard-coded in source; presumably a demo
# value -- load it from configuration before any real use.
SECRET_KEY = "very-secret1234567890"
# Algorithm name handed to jwt.encode when signing tokens.
ALGORITHM = "HS256"
# Access-token lifetime, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 15

# Bearer-token scheme; "token" matches the path of the login route below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Password-hashing context configured with the bcrypt scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class TokenData:
    """Simple holder for a single ``username`` value."""

    def __init__(self, username: str):
        # Store the supplied username unchanged on the instance.
        self.username = username

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: How long the token stays valid. When omitted, the
            lifetime is ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    # "exp" has to be an absolute point in time; a bare timedelta is a
    # duration and cannot be serialized into the token payload.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    """Build an access token for the submitted username and return it.

    Credential checking is left as a placeholder in this snippet; the
    response is a dict with ``access_token`` and ``token_type`` keys.
    """
    # Placeholder: authenticate the user here before issuing a token.
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"sub": form_data.username}
    token = create_access_token(data=claims, expires_delta=lifetime)
    return {"access_token": token, "token_type": "bearer"}
英文:

I have a Flask-based backend REST API and I want to migrate it to FastAPI. However, I am not sure how to implement secure routes and create access tokens in FastAPI.
In Flask, the flask_jwt_extended library provides:
- the @jwt_required() decorator for securing routes
- the create_access_token() function for creating JWT tokens

Does FastAPI have a similar feature or capability, or how can I implement this in FastAPI?
Thank you in advance.
Here is an example implementation of a secured route and access-token creation in Flask:

import hashlib
import traceback
from datetime import timedelta
from http import HTTPStatus
from flask import Flask, jsonify, request
from flask_jwt_extended import JWTManager, jwt_required, get_jwt_identity, create_access_token

# Flask application and JWT extension setup.
app = Flask(__name__)
jwt = JWTManager(app)
# NOTE(review): hard-coded signing secret; presumably a demo value -- load it
# from configuration before any real use.
app.config["JWT_SECRET_KEY"] = "very-secret1234567890"
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(minutes=15)
app.config["JWT_REFRESH_TOKEN_EXPIRES"] = timedelta(days=30)
host = "localhost"
port = 5000
test_password = "test_password"
# In-memory list standing in for a user table; the password field holds the
# SHA-256 hex digest of the plain-text password.
db = [
    {
        "username": "test_user",
        "email": "test_email.gmail.com",
        "password": hashlib.sha256(test_password.encode()).hexdigest()
    }
]


@app.route('/login', methods=['POST'])
def login():
    """Check the posted email/password against ``db`` and issue a JWT.

    Returns:
        A ``(response, status)`` tuple: 200 with ``username`` and ``token`` on
        success, 400 when a field is missing or the credentials do not match,
        and 500 when any exception is raised while handling the request.
    """
    try:
        json_data = request.get_json()
        email = json_data.get("email")
        password = json_data.get("password")
        if not email or not password:
            response = jsonify(error="'email' and 'password' are required")
            return response, HTTPStatus.BAD_REQUEST
        # Check if email exists in DB (case-insensitive match)
        user_result = [user for user in db if user["email"].lower() == email.lower()]
        # Check if the password is correct by comparing SHA-256 hex digests
        encoded_password = hashlib.sha256(password.encode()).hexdigest()
        if not user_result or user_result[0]["password"] != encoded_password:
            response = jsonify(error="Wrong credentials")
            return response, HTTPStatus.BAD_REQUEST
        user = user_result[0]
        # Generate JWT token and return it
        access_token = create_access_token(identity=user["username"])
        response = jsonify(username=user["username"], token=access_token)
        return response, HTTPStatus.OK
    except Exception as e:
        # Top-level boundary: report the failure and answer with a generic 500.
        print(f"Error: {e}")
        print(traceback.format_exc())
        response = jsonify(result={"error": "Server error"})
        return response, HTTPStatus.INTERNAL_SERVER_ERROR


@app.route('/secured_page', methods=['GET'])
@jwt_required()
def __create_participant():
    """Return a message naming the identity stored in the request's JWT.

    Returns:
        A ``(response, status)`` tuple: 200 with a ``message`` field, or 500
        with a generic error when an exception is raised.
    """
    try:
        response = jsonify(message="You are logged in as {}".format(get_jwt_identity()))
        return response, HTTPStatus.OK
    except Exception as e:
        # Top-level boundary: report the failure and answer with a generic 500.
        print(f"Error: {e}")
        print(traceback.format_exc())
        response = jsonify(result={"error": "Server error"})
        return response, HTTPStatus.INTERNAL_SERVER_ERROR


# Start the development server only when the file is executed directly.
if __name__ == '__main__':
    app.run(host=host, port=port, debug=True)

答案1

得分: 1

你可以使用像 python-jose 这样的库来处理 JWT 功能,但是你必须自己实现 jwt_required 和 create_access_token,例如

create_access_token

from jose import JWTError, jwt
from datetime import datetime, timedelta

def create_access_token(data: dict, expires_delta: timedelta = timedelta(minutes=15)):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: How long the token stays valid; 15 minutes by default.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Local import: the snippet's own import line brings in only
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    to_encode = data.copy()
    # Use an aware UTC timestamp instead of the deprecated, naive utcnow().
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

jwt_required(不是一个装饰器,而是创建一个函数,并将其设置为你的路由的依赖项)

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt

# Bearer-token scheme that extracts the token for jwt_required.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


async def jwt_required(token: str = Depends(oauth2_scheme)):
    """Validate the bearer token and return the user it identifies.

    Args:
        token: Token string supplied by ``oauth2_scheme``.

    Returns:
        The object returned by ``get_user`` for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or ``get_user`` returns ``None``.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        # Keep the decoding failure attached as the cause for debugging.
        raise credentials_exception from exc
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    user = get_user(username=username)
    if user is None:
        raise credentials_exception
    # Returning the user lets a route receive it via Depends(jwt_required);
    # routes that list this only under ``dependencies=`` ignore the value.
    return user

# Entries in ``dependencies`` must be wrapped in Depends(); passing the bare
# function does not register it as a dependency.
@app.get('/secured_page', dependencies=[Depends(jwt_required)])
def __create_participant():
    """Placeholder for a route that runs ``jwt_required`` before its body."""
    ...
英文:

You can use libs like python-jose for JWT functionalities, but you have to implement jwt_required and create_access_token by yourself, for example

create_access_token

from jose import JWTError, jwt
from datetime import datetime, timedelta

def create_access_token(data: dict, expires_delta: timedelta = timedelta(minutes=15)):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: How long the token stays valid; 15 minutes by default.

    Returns:
        The encoded token produced by ``jwt.encode``.
    """
    # Local import: the snippet's own import line brings in only
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    to_encode = data.copy()
    # Use an aware UTC timestamp instead of the deprecated, naive utcnow().
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

jwt_required (instead of a decorator, you create a function and set it as a dependency of your route)

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt

# Bearer-token scheme that extracts the token for jwt_required.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


async def jwt_required(token: str = Depends(oauth2_scheme)):
    """Validate the bearer token and return the user it identifies.

    Args:
        token: Token string supplied by ``oauth2_scheme``.

    Returns:
        The object returned by ``get_user`` for the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, or ``get_user`` returns ``None``.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        # Keep the decoding failure attached as the cause for debugging.
        raise credentials_exception from exc
    username = payload.get("sub")
    if username is None:
        raise credentials_exception
    user = get_user(username=username)
    if user is None:
        raise credentials_exception
    # Returning the user lets a route receive it via Depends(jwt_required);
    # routes that list this only under ``dependencies=`` ignore the value.
    return user

# Entries in ``dependencies`` must be wrapped in Depends(); passing the bare
# function does not register it as a dependency.
@app.get('/secured_page', dependencies=[Depends(jwt_required)])
def __create_participant():
    """Placeholder for a route that runs ``jwt_required`` before its body."""
    ...

huangapple
  • 本文由 发表于 2023年4月20日 00:35:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/76056906.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定