FastAPI教程 使用(哈希)密碼和 JWT Bearer 令牌的 OAuth2

2021-11-03 10:18 更新

既然我們已經(jīng)有了所有的安全流程,就讓我們來(lái)使用 JWT 令牌和安全哈希密碼讓應(yīng)用程序真正地安全吧。

你可以在應(yīng)用程序中真正地使用這些代碼,在數(shù)據(jù)庫(kù)中保存密碼哈希值,等等。

我們將從上一章結(jié)束的位置開始,然后對(duì)示例進(jìn)行擴(kuò)充。

關(guān)于 JWT

JWT 表示 「JSON Web Tokens」。

它是一個(gè)將 JSON 對(duì)象編碼為密集且沒有空格的長(zhǎng)字符串的標(biāo)準(zhǔn)。字符串看起來(lái)像這樣:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

它沒有被加密,因此任何人都可以從字符串內(nèi)容中還原數(shù)據(jù)。

但它經(jīng)過(guò)了簽名。因此,當(dāng)你收到一個(gè)由你發(fā)出的令牌時(shí),可以校驗(yàn)令牌是否真的由你發(fā)出。

通過(guò)這種方式,你可以創(chuàng)建一個(gè)有效期為 1 周的令牌。然后當(dāng)用戶第二天使用令牌重新訪問(wèn)時(shí),你知道該用戶仍然處于登入狀態(tài)。

一周后令牌將會(huì)過(guò)期,用戶將不會(huì)通過(guò)認(rèn)證,必須再次登錄才能獲得一個(gè)新令牌。而且如果用戶(或第三方)試圖修改令牌以篡改過(guò)期時(shí)間,你將因?yàn)楹灻黄ヅ涠軌虬l(fā)覺。

如果你想上手體驗(yàn) JWT 令牌并了解其工作方式,可訪問(wèn) https://jwt.io。

安裝 python-jose

我們需要安裝 python-jose 以在 Python 中生成和校驗(yàn) JWT 令牌:

pip install python-jose[cryptography]

████████████████████████████████████████ 100%


Python-jose 需要一個(gè)額外的加密后端。

這里我們使用的是推薦的后端:pyca/cryptography。

Tip

本教程曾經(jīng)使用過(guò) PyJWT。

但是后來(lái)更新為使用 Python-jose,因?yàn)樗峁┝?PyJWT 的所有功能,以及之后與其他工具進(jìn)行集成時(shí)你可能需要的一些其他功能。

哈希密碼

「哈?!沟囊馑际牵簩⒛承﹥?nèi)容(在本例中為密碼)轉(zhuǎn)換為看起來(lái)像亂碼的字節(jié)序列(只是一個(gè)字符串)。

每次你傳入完全相同的內(nèi)容(完全相同的密碼)時(shí),你都會(huì)得到完全相同的亂碼。

但是你不能從亂碼轉(zhuǎn)換回密碼。

為什么使用哈希密碼

如果你的數(shù)據(jù)庫(kù)被盜,小偷將無(wú)法獲得用戶的明文密碼,只能拿到哈希值。

因此,小偷將無(wú)法嘗試在另一個(gè)系統(tǒng)中使用這些相同的密碼(由于許多用戶在任何地方都使用相同的密碼,因此這很危險(xiǎn))。

安裝 passlib

PassLib 是一個(gè)用于處理哈希密碼的很棒的 Python 包。

它支持許多安全哈希算法以及配合算法使用的實(shí)用程序。

推薦的算法是 「Bcrypt」。

因此,安裝附帶 Bcrypt 的 PassLib:

pip install passlib[bcrypt]


████████████████████████████████████████ 100%

Tip

使用 passlib,你甚至可以將其配置為能夠讀取 Django,F(xiàn)lask 的安全擴(kuò)展或許多其他工具創(chuàng)建的密碼。

因此,你將能夠,舉個(gè)例子,將數(shù)據(jù)庫(kù)中來(lái)自 Django 應(yīng)用的數(shù)據(jù)共享給一個(gè) FastAPI 應(yīng)用?;蛘呤褂猛粩?shù)據(jù)庫(kù)但逐漸將應(yīng)用從 Django 遷移到 FastAPI。

而你的用戶將能夠同時(shí)從 Django 應(yīng)用或 FastAPI 應(yīng)用登錄。

哈希并校驗(yàn)密碼

從 passlib 導(dǎo)入我們需要的工具。

創(chuàng)建一個(gè) PassLib 「上下文」。這將用于哈希和校驗(yàn)密碼。

Tip

PassLib 上下文還具有使用不同哈希算法的功能,包括僅允許用于校驗(yàn)的已棄用的舊算法等。

例如,你可以使用它來(lái)讀取和校驗(yàn)由另一個(gè)系統(tǒng)(例如Django)生成的密碼,但是使用其他算法例如 Bcrypt 生成新的密碼哈希值。

并同時(shí)兼容所有的這些功能。

創(chuàng)建一個(gè)工具函數(shù)以哈希來(lái)自用戶的密碼。

然后創(chuàng)建另一個(gè)工具函數(shù),用于校驗(yàn)接收的密碼是否與存儲(chǔ)的哈希值匹配。

再創(chuàng)建另一個(gè)工具函數(shù)用于認(rèn)證并返回用戶。

from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]

Note

如果你查看新的(偽)數(shù)據(jù)庫(kù) fake_users_db,你將看到哈希后的密碼現(xiàn)在的樣子:"$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW"。

處理 JWT 令牌

導(dǎo)入已安裝的模塊。

創(chuàng)建一個(gè)隨機(jī)密鑰,該密鑰將用于對(duì) JWT 令牌進(jìn)行簽名。

要生成一個(gè)安全的隨機(jī)密鑰,可使用以下命令:

openssl rand -hex 32


09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7

然后將輸出復(fù)制到變量 「SECRET_KEY」 中(不要使用示例中的這個(gè))。

創(chuàng)建用于設(shè)定 JWT 令牌簽名算法的變量 「ALGORITHM」,并將其設(shè)置為 "HS256"。

創(chuàng)建一個(gè)設(shè)置令牌過(guò)期時(shí)間的變量。

定義一個(gè)將在令牌端點(diǎn)中用于響應(yīng)的 Pydantic 模型。

創(chuàng)建一個(gè)生成新的訪問(wèn)令牌的工具函數(shù)。

from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]

更新依賴項(xiàng)

更新 get_current_user 以接收與之前相同的令牌,但這次使用的是 JWT 令牌。

解碼接收到的令牌,對(duì)其進(jìn)行校驗(yàn),然后返回當(dāng)前用戶。

如果令牌無(wú)效,立即返回一個(gè) HTTP 錯(cuò)誤。

from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]

更新 /token 路徑操作

使用令牌的過(guò)期時(shí)間創(chuàng)建一個(gè) timedelta 對(duì)象。

創(chuàng)建一個(gè)真實(shí)的 JWT 訪問(wèn)令牌并返回它。

from datetime import datetime, timedelta
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Optional[str] = None


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]

關(guān)于 JWT 「主題」 sub 的技術(shù)細(xì)節(jié)

JWT 的規(guī)范中提到有一個(gè) sub 鍵,值為該令牌的主題。

使用它并不是必須的,但這是你放置用戶標(biāo)識(shí)的地方,所以我們?cè)谑纠惺褂昧怂?/p>

除了識(shí)別用戶并允許他們直接在你的 API 上執(zhí)行操作之外,JWT 還可以用于其他事情。

例如,你可以識(shí)別一個(gè) 「汽車」 或 「博客文章」。

然后你可以添加關(guān)于該實(shí)體的權(quán)限,比如「駕駛」(汽車)或「編輯」(博客)。

然后,你可以將 JWT 令牌交給用戶(或機(jī)器人),他們可以使用它來(lái)執(zhí)行這些操作(駕駛汽車,或編輯博客文章),甚至不需要有一個(gè)賬戶,只需使用你的 API 為其生成的 JWT 令牌。

使用這樣的思路,JWT 可以用于更復(fù)雜的場(chǎng)景。

在這些情況下,幾個(gè)實(shí)體可能有相同的 ID,比如說(shuō) foo(一個(gè)用戶 foo,一輛車 foo,一篇博客文章 foo)。

因此,為了避免 ID 沖突,當(dāng)為用戶創(chuàng)建 JWT 令牌時(shí),你可以在 sub 鍵的值前加上前綴,例如 username:。所以,在這個(gè)例子中,sub 的值可以是:username:johndoe。

要記住的重點(diǎn)是,sub 鍵在整個(gè)應(yīng)用程序中應(yīng)該有一個(gè)唯一的標(biāo)識(shí)符,而且應(yīng)該是一個(gè)字符串。

檢查效果

運(yùn)行服務(wù)器并訪問(wèn)文檔: http://127.0.0.1:8000/docs。

你會(huì)看到如下用戶界面:

像以前一樣對(duì)應(yīng)用程序進(jìn)行認(rèn)證。

使用如下憑證:

用戶名: johndoe 密碼: secret

Check

請(qǐng)注意,代碼中沒有任何地方記錄了明文密碼 「secret」,我們只保存了其哈希值。

訪問(wèn) /users/me/ 端點(diǎn),你將獲得如下響應(yīng):

{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false
}

如果你打開開發(fā)者工具,將看到數(shù)據(jù)是如何發(fā)送的并且其中僅包含了令牌,只有在第一個(gè)請(qǐng)求中發(fā)送了密碼以校驗(yàn)用戶身份并獲取該訪問(wèn)令牌,但之后都不會(huì)再發(fā)送密碼:

Note

注意請(qǐng)求中的 Authorization 首部,其值以 Bearer 開頭。

使用 scopes 的進(jìn)階用法

OAuth2 具有「作用域」的概念。

你可以使用它們向 JWT 令牌添加一組特定的權(quán)限。

然后,你可以將此令牌直接提供給用戶或第三方,使其在一些限制下與你的 API 進(jìn)行交互。

你可以在之后的進(jìn)階用戶指南中了解如何使用它們以及如何將它們集成到 FastAPI 中。

總結(jié)

通過(guò)目前你所看到的,你可以使用像 OAuth2 和 JWT 這樣的標(biāo)準(zhǔn)來(lái)構(gòu)建一個(gè)安全的 FastAPI 應(yīng)用程序。

在幾乎所有的框架中,處理安全性問(wèn)題都很容易成為一個(gè)相當(dāng)復(fù)雜的話題。

許多高度簡(jiǎn)化了安全流程的軟件包不得不在數(shù)據(jù)模型、數(shù)據(jù)庫(kù)和可用功能上做出很多妥協(xié)。而這些過(guò)于簡(jiǎn)化流程的軟件包中,有些其實(shí)隱含了安全漏洞。

FastAPI 不對(duì)任何數(shù)據(jù)庫(kù)、數(shù)據(jù)模型或工具做任何妥協(xié)。

它給了你所有的靈活性來(lái)選擇最適合你項(xiàng)目的前者。

你可以直接使用許多維護(hù)良好且使用廣泛的包,如 passlib 和 python-jose,因?yàn)?nbsp;FastAPI 不需要任何復(fù)雜的機(jī)制來(lái)集成外部包。

但它為你提供了一些工具,在不影響靈活性、健壯性和安全性的前提下,盡可能地簡(jiǎn)化這個(gè)過(guò)程。

而且你可以用相對(duì)簡(jiǎn)單的方式使用和實(shí)現(xiàn)安全、標(biāo)準(zhǔn)的協(xié)議,比如 OAuth2。

你可以在進(jìn)階用戶指南中了解更多關(guān)于如何使用 OAuth2 「作用域」的信息,以實(shí)現(xiàn)更精細(xì)的權(quán)限系統(tǒng),并同樣遵循這些標(biāo)準(zhǔn)。帶有作用域的 OAuth2 是很多大的認(rèn)證提供商使用的機(jī)制,比如 Facebook、Google、GitHub、微軟、Twitter 等,授權(quán)第三方應(yīng)用代表用戶與他們的 API 進(jìn)行交互。


以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)