156 lines
3.5 KiB
Python
156 lines
3.5 KiB
Python
"""
|
||
安全相关功能
|
||
|
||
包括密码哈希、JWT 令牌生成与验证。
|
||
使用 Argon2 作为密码哈希算法(目前最安全的选择)。
|
||
"""
|
||
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Any
|
||
|
||
import jwt
|
||
from argon2 import PasswordHasher
|
||
from argon2.exceptions import InvalidHashError, VerifyMismatchError
|
||
|
||
from app.core.config import settings
|
||
|
||
# Argon2 密码哈希器,使用推荐的安全参数
|
||
_password_hasher = PasswordHasher(
|
||
time_cost=3, # 迭代次数
|
||
memory_cost=65536, # 内存使用 (64MB)
|
||
parallelism=4, # 并行度
|
||
)
|
||
|
||
|
||
def hash_password(password: str) -> str:
|
||
"""
|
||
对密码进行哈希处理
|
||
|
||
Args:
|
||
password: 明文密码
|
||
|
||
Returns:
|
||
Argon2 哈希字符串
|
||
"""
|
||
return _password_hasher.hash(password)
|
||
|
||
|
||
def verify_password(password: str, hashed_password: str) -> bool:
|
||
"""
|
||
验证密码是否匹配
|
||
|
||
Args:
|
||
password: 明文密码
|
||
hashed_password: 已哈希的密码
|
||
|
||
Returns:
|
||
密码是否正确
|
||
"""
|
||
try:
|
||
_password_hasher.verify(hashed_password, password)
|
||
return True
|
||
except (VerifyMismatchError, InvalidHashError):
|
||
return False
|
||
|
||
|
||
def password_needs_rehash(hashed_password: str) -> bool:
|
||
"""
|
||
检查密码是否需要重新哈希(参数升级时使用)
|
||
|
||
Args:
|
||
hashed_password: 已哈希的密码
|
||
|
||
Returns:
|
||
是否需要重新哈希
|
||
"""
|
||
return _password_hasher.check_needs_rehash(hashed_password)
|
||
|
||
|
||
def create_access_token(
|
||
subject: str | int,
|
||
expires_delta: timedelta | None = None,
|
||
extra_claims: dict[str, Any] | None = None,
|
||
) -> str:
|
||
"""
|
||
创建访问令牌
|
||
|
||
Args:
|
||
subject: 令牌主体(通常是用户ID)
|
||
expires_delta: 过期时间增量
|
||
extra_claims: 额外的声明数据
|
||
|
||
Returns:
|
||
JWT 访问令牌
|
||
"""
|
||
now = datetime.now(timezone.utc)
|
||
|
||
if expires_delta:
|
||
expire = now + expires_delta
|
||
else:
|
||
expire = now + timedelta(minutes=settings.access_token_expire_minutes)
|
||
|
||
payload = {
|
||
"sub": str(subject),
|
||
"iat": now,
|
||
"exp": expire,
|
||
"type": "access",
|
||
}
|
||
|
||
if extra_claims:
|
||
payload.update(extra_claims)
|
||
|
||
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
|
||
|
||
|
||
def create_refresh_token(
|
||
subject: str | int,
|
||
expires_delta: timedelta | None = None,
|
||
) -> str:
|
||
"""
|
||
创建刷新令牌
|
||
|
||
Args:
|
||
subject: 令牌主体(通常是用户ID)
|
||
expires_delta: 过期时间增量
|
||
|
||
Returns:
|
||
JWT 刷新令牌
|
||
"""
|
||
now = datetime.now(timezone.utc)
|
||
|
||
if expires_delta:
|
||
expire = now + expires_delta
|
||
else:
|
||
expire = now + timedelta(days=settings.refresh_token_expire_days)
|
||
|
||
payload = {
|
||
"sub": str(subject),
|
||
"iat": now,
|
||
"exp": expire,
|
||
"type": "refresh",
|
||
}
|
||
|
||
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
|
||
|
||
|
||
def decode_token(token: str) -> dict[str, Any]:
|
||
"""
|
||
解码并验证 JWT 令牌
|
||
|
||
Args:
|
||
token: JWT 令牌字符串
|
||
|
||
Returns:
|
||
解码后的负载数据
|
||
|
||
Raises:
|
||
jwt.InvalidTokenError: 令牌无效
|
||
jwt.ExpiredSignatureError: 令牌已过期
|
||
"""
|
||
return jwt.decode(
|
||
token,
|
||
settings.secret_key,
|
||
algorithms=[settings.algorithm],
|
||
)
|
||
|