Files
SatoNano/app/core/security.py
2026-01-06 23:49:23 +08:00

156 lines
3.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
安全相关功能
包括密码哈希、JWT 令牌生成与验证。
使用 Argon2 作为密码哈希算法(目前最安全的选择)。
"""
from datetime import datetime, timedelta, timezone
from typing import Any
import jwt
from argon2 import PasswordHasher
from argon2.exceptions import InvalidHashError, VerifyMismatchError
from app.core.config import settings
# Argon2 密码哈希器,使用推荐的安全参数
_password_hasher = PasswordHasher(
time_cost=3, # 迭代次数
memory_cost=65536, # 内存使用 (64MB)
parallelism=4, # 并行度
)
def hash_password(password: str) -> str:
"""
对密码进行哈希处理
Args:
password: 明文密码
Returns:
Argon2 哈希字符串
"""
return _password_hasher.hash(password)
def verify_password(password: str, hashed_password: str) -> bool:
"""
验证密码是否匹配
Args:
password: 明文密码
hashed_password: 已哈希的密码
Returns:
密码是否正确
"""
try:
_password_hasher.verify(hashed_password, password)
return True
except (VerifyMismatchError, InvalidHashError):
return False
def password_needs_rehash(hashed_password: str) -> bool:
"""
检查密码是否需要重新哈希(参数升级时使用)
Args:
hashed_password: 已哈希的密码
Returns:
是否需要重新哈希
"""
return _password_hasher.check_needs_rehash(hashed_password)
def create_access_token(
subject: str | int,
expires_delta: timedelta | None = None,
extra_claims: dict[str, Any] | None = None,
) -> str:
"""
创建访问令牌
Args:
subject: 令牌主体通常是用户ID
expires_delta: 过期时间增量
extra_claims: 额外的声明数据
Returns:
JWT 访问令牌
"""
now = datetime.now(timezone.utc)
if expires_delta:
expire = now + expires_delta
else:
expire = now + timedelta(minutes=settings.access_token_expire_minutes)
payload = {
"sub": str(subject),
"iat": now,
"exp": expire,
"type": "access",
}
if extra_claims:
payload.update(extra_claims)
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
def create_refresh_token(
subject: str | int,
expires_delta: timedelta | None = None,
) -> str:
"""
创建刷新令牌
Args:
subject: 令牌主体通常是用户ID
expires_delta: 过期时间增量
Returns:
JWT 刷新令牌
"""
now = datetime.now(timezone.utc)
if expires_delta:
expire = now + expires_delta
else:
expire = now + timedelta(days=settings.refresh_token_expire_days)
payload = {
"sub": str(subject),
"iat": now,
"exp": expire,
"type": "refresh",
}
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
def decode_token(token: str) -> dict[str, Any]:
"""
解码并验证 JWT 令牌
Args:
token: JWT 令牌字符串
Returns:
解码后的负载数据
Raises:
jwt.InvalidTokenError: 令牌无效
jwt.ExpiredSignatureError: 令牌已过期
"""
return jwt.decode(
token,
settings.secret_key,
algorithms=[settings.algorithm],
)