提供基本前后端骨架
This commit is contained in:
155
app/core/security.py
Normal file
155
app/core/security.py
Normal file
@@ -0,0 +1,155 @@
|
||||
"""
|
||||
安全相关功能
|
||||
|
||||
包括密码哈希、JWT 令牌生成与验证。
|
||||
使用 Argon2 作为密码哈希算法(目前最安全的选择)。
|
||||
"""
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from typing import Any
|
||||
|
||||
import jwt
|
||||
from argon2 import PasswordHasher
|
||||
from argon2.exceptions import InvalidHashError, VerifyMismatchError
|
||||
|
||||
from app.core.config import settings
|
||||
|
||||
# Argon2 密码哈希器,使用推荐的安全参数
|
||||
_password_hasher = PasswordHasher(
|
||||
time_cost=3, # 迭代次数
|
||||
memory_cost=65536, # 内存使用 (64MB)
|
||||
parallelism=4, # 并行度
|
||||
)
|
||||
|
||||
|
||||
def hash_password(password: str) -> str:
|
||||
"""
|
||||
对密码进行哈希处理
|
||||
|
||||
Args:
|
||||
password: 明文密码
|
||||
|
||||
Returns:
|
||||
Argon2 哈希字符串
|
||||
"""
|
||||
return _password_hasher.hash(password)
|
||||
|
||||
|
||||
def verify_password(password: str, hashed_password: str) -> bool:
|
||||
"""
|
||||
验证密码是否匹配
|
||||
|
||||
Args:
|
||||
password: 明文密码
|
||||
hashed_password: 已哈希的密码
|
||||
|
||||
Returns:
|
||||
密码是否正确
|
||||
"""
|
||||
try:
|
||||
_password_hasher.verify(hashed_password, password)
|
||||
return True
|
||||
except (VerifyMismatchError, InvalidHashError):
|
||||
return False
|
||||
|
||||
|
||||
def password_needs_rehash(hashed_password: str) -> bool:
|
||||
"""
|
||||
检查密码是否需要重新哈希(参数升级时使用)
|
||||
|
||||
Args:
|
||||
hashed_password: 已哈希的密码
|
||||
|
||||
Returns:
|
||||
是否需要重新哈希
|
||||
"""
|
||||
return _password_hasher.check_needs_rehash(hashed_password)
|
||||
|
||||
|
||||
def create_access_token(
|
||||
subject: str | int,
|
||||
expires_delta: timedelta | None = None,
|
||||
extra_claims: dict[str, Any] | None = None,
|
||||
) -> str:
|
||||
"""
|
||||
创建访问令牌
|
||||
|
||||
Args:
|
||||
subject: 令牌主体(通常是用户ID)
|
||||
expires_delta: 过期时间增量
|
||||
extra_claims: 额外的声明数据
|
||||
|
||||
Returns:
|
||||
JWT 访问令牌
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
if expires_delta:
|
||||
expire = now + expires_delta
|
||||
else:
|
||||
expire = now + timedelta(minutes=settings.access_token_expire_minutes)
|
||||
|
||||
payload = {
|
||||
"sub": str(subject),
|
||||
"iat": now,
|
||||
"exp": expire,
|
||||
"type": "access",
|
||||
}
|
||||
|
||||
if extra_claims:
|
||||
payload.update(extra_claims)
|
||||
|
||||
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
|
||||
|
||||
|
||||
def create_refresh_token(
|
||||
subject: str | int,
|
||||
expires_delta: timedelta | None = None,
|
||||
) -> str:
|
||||
"""
|
||||
创建刷新令牌
|
||||
|
||||
Args:
|
||||
subject: 令牌主体(通常是用户ID)
|
||||
expires_delta: 过期时间增量
|
||||
|
||||
Returns:
|
||||
JWT 刷新令牌
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
if expires_delta:
|
||||
expire = now + expires_delta
|
||||
else:
|
||||
expire = now + timedelta(days=settings.refresh_token_expire_days)
|
||||
|
||||
payload = {
|
||||
"sub": str(subject),
|
||||
"iat": now,
|
||||
"exp": expire,
|
||||
"type": "refresh",
|
||||
}
|
||||
|
||||
return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
|
||||
|
||||
|
||||
def decode_token(token: str) -> dict[str, Any]:
|
||||
"""
|
||||
解码并验证 JWT 令牌
|
||||
|
||||
Args:
|
||||
token: JWT 令牌字符串
|
||||
|
||||
Returns:
|
||||
解码后的负载数据
|
||||
|
||||
Raises:
|
||||
jwt.InvalidTokenError: 令牌无效
|
||||
jwt.ExpiredSignatureError: 令牌已过期
|
||||
"""
|
||||
return jwt.decode(
|
||||
token,
|
||||
settings.secret_key,
|
||||
algorithms=[settings.algorithm],
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user