Files
SatoNano/app/services/balance.py
2026-01-06 23:49:23 +08:00

935 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
余额服务
处理余额相关的业务逻辑。
设计说明:
- 所有金额操作使用整数单位units避免浮点精度问题
- 扣款操作使用行级锁(悲观锁)确保原子性
- 充值操作使用乐观锁,配合重试机制
- 每笔操作都记录交易流水
预扣款流程(内部方法,用于耗时付费操作):
1. pre_authorize() - 预扣款冻结金额快速释放锁返回交易ID
2. 执行耗时的付费操作使用交易ID追踪
3. confirm() 或 cancel() - 根据操作结果确认或取消
推荐使用上下文管理器 deduction_context() 自动处理确认/取消。
"""
import logging
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, AsyncIterator, Callable, Awaitable, TypeVar
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import (
AppException,
ResourceNotFoundError,
ValidationError,
)
from app.models.balance import (
UserBalance,
BalanceTransaction,
TransactionType,
TransactionStatus,
)
from app.repositories.balance import BalanceRepository, TransactionRepository
logger = logging.getLogger(__name__)
T = TypeVar("T")
class InsufficientBalanceError(AppException):
"""余额不足"""
def __init__(self, required: int, available: int):
super().__init__(
f"余额不足,需要 {required / 1000:.2f},当前可用 {available / 1000:.2f}",
"INSUFFICIENT_BALANCE",
{"required_units": required, "available_units": available},
)
class DuplicateTransactionError(AppException):
"""重复交易"""
def __init__(self, idempotency_key: str):
super().__init__(
"该交易已处理",
"DUPLICATE_TRANSACTION",
{"idempotency_key": idempotency_key},
)
class ConcurrencyError(AppException):
"""并发冲突"""
def __init__(self):
super().__init__(
"操作冲突,请重试",
"CONCURRENCY_ERROR",
)
class TransactionNotFoundError(AppException):
"""交易不存在"""
def __init__(self, transaction_id: str):
super().__init__(
"交易记录不存在",
"TRANSACTION_NOT_FOUND",
{"transaction_id": transaction_id},
)
class TransactionStateError(AppException):
"""交易状态错误"""
def __init__(self, transaction_id: str, current_status: str, expected_status: str = "pending"):
super().__init__(
f"交易状态无效:当前 {current_status},预期 {expected_status}",
"TRANSACTION_STATE_ERROR",
{
"transaction_id": transaction_id,
"current_status": current_status,
"expected_status": expected_status,
},
)
@dataclass
class PreAuthResult:
"""
预授权结果
包含交易ID和相关信息用于后续确认或取消操作。
"""
transaction_id: str
"""交易ID用于后续 confirm/cancel 操作"""
user_id: str
"""用户ID"""
amount_units: int
"""预扣款金额(单位额度)"""
frozen_at: datetime
"""冻结时间"""
@property
def amount_display(self) -> str:
"""显示金额2位小数"""
return f"{self.amount_units / 1000:.2f}"
@dataclass
class DeductionResult:
"""
扣款结果
包含扣款操作的完整信息。
"""
transaction_id: str
"""交易ID"""
status: TransactionStatus
"""交易状态"""
amount_units: int
"""实际扣款金额(单位额度)"""
balance_before: int
"""扣款前余额"""
balance_after: int
"""扣款后余额"""
@property
def success(self) -> bool:
"""是否扣款成功"""
return self.status == TransactionStatus.COMPLETED
@property
def amount_display(self) -> str:
"""显示金额"""
return f"{abs(self.amount_units) / 1000:.2f}"
@property
def balance_before_display(self) -> str:
"""显示扣款前余额"""
return f"{self.balance_before / 1000:.2f}"
@property
def balance_after_display(self) -> str:
"""显示扣款后余额"""
return f"{self.balance_after / 1000:.2f}"
class BalanceService:
"""余额服务"""
# 乐观锁最大重试次数
MAX_RETRIES = 3
def __init__(self, session: AsyncSession):
"""
初始化余额服务
Args:
session: 数据库会话
"""
self.session = session
self.balance_repo = BalanceRepository(session)
self.transaction_repo = TransactionRepository(session)
# ============================================================
# 余额查询
# ============================================================
async def get_balance(self, user_id: str) -> UserBalance:
"""
获取用户余额
如果用户没有余额账户,自动创建一个。
Args:
user_id: 用户 ID
Returns:
余额账户
"""
balance = await self.balance_repo.get_or_create(user_id)
await self.balance_repo.commit()
return balance
async def get_balance_detail(self, user_id: str) -> dict[str, Any]:
"""
获取用户余额详情
Args:
user_id: 用户 ID
Returns:
余额详情字典
"""
balance = await self.get_balance(user_id)
return {
"user_id": balance.user_id,
"balance_units": balance.balance,
"frozen_units": balance.frozen_balance,
"available_units": balance.available_balance,
"total_recharged_units": balance.total_recharged,
"total_consumed_units": balance.total_consumed,
}
async def get_transactions(
self,
user_id: str,
*,
offset: int = 0,
limit: int = 20,
transaction_type: TransactionType | None = None,
) -> tuple[list[BalanceTransaction], int]:
"""
获取用户交易记录
Args:
user_id: 用户 ID
offset: 偏移量
limit: 限制数量
transaction_type: 交易类型过滤
Returns:
(交易记录列表, 总数)
"""
transactions = await self.transaction_repo.get_by_user_id(
user_id,
offset=offset,
limit=limit,
transaction_type=transaction_type,
)
total = await self.transaction_repo.count_by_user_id(
user_id,
transaction_type=transaction_type,
)
return transactions, total
# ============================================================
# 扣款操作(使用行级锁 - 悲观锁)
# ============================================================
async def deduct(
self,
user_id: str,
amount_units: int,
*,
reference_type: str | None = None,
reference_id: str | None = None,
description: str | None = None,
idempotency_key: str | None = None,
) -> BalanceTransaction:
"""
扣款
使用行级锁确保原子性,防止并发扣款导致余额变负。
Args:
user_id: 用户 ID
amount_units: 扣款金额(单位额度,正数)
reference_type: 关联业务类型
reference_id: 关联业务 ID
description: 交易描述
idempotency_key: 幂等键
Returns:
交易记录
Raises:
InsufficientBalanceError: 余额不足
DuplicateTransactionError: 重复交易
"""
if amount_units <= 0:
raise ValidationError("扣款金额必须大于 0")
# 检查幂等性
if idempotency_key:
existing = await self.transaction_repo.get_by_idempotency_key(
idempotency_key
)
if existing:
raise DuplicateTransactionError(idempotency_key)
# 获取余额账户并加锁
balance = await self.balance_repo.get_or_create_for_update(user_id)
# 检查可用余额
if balance.available_balance < amount_units:
raise InsufficientBalanceError(amount_units, balance.available_balance)
# 记录扣款前余额
balance_before = balance.balance
# 执行扣款
balance.balance -= amount_units
balance.total_consumed += amount_units
balance.version += 1
# 创建交易记录
transaction = await self.transaction_repo.create(
user_id=user_id,
balance_account_id=balance.id,
transaction_type=TransactionType.DEDUCTION,
status=TransactionStatus.COMPLETED,
amount=-amount_units, # 负数表示支出
balance_before=balance_before,
balance_after=balance.balance,
reference_type=reference_type,
reference_id=reference_id,
description=description,
idempotency_key=idempotency_key,
)
await self.balance_repo.commit()
logger.info(
f"用户 {user_id} 扣款成功: {amount_units} 单位, "
f"余额 {balance_before} -> {balance.balance}"
)
return transaction
# ============================================================
# 预扣款流程(内部方法,用于耗时付费操作)
# ============================================================
async def pre_authorize(
self,
user_id: str,
amount_units: int,
*,
reference_type: str | None = None,
reference_id: str | None = None,
description: str | None = None,
) -> PreAuthResult:
"""
预授权扣款(内部方法)
冻结指定金额快速释放数据库锁返回交易ID供后续操作使用。
此方法设计用于耗时的付费操作场景。
使用流程:
1. 调用 pre_authorize() 获取 PreAuthResult
2. 执行可能失败的耗时操作
3. 根据操作结果调用 confirm() 或 cancel()
推荐使用 deduction_context() 上下文管理器自动处理。
Args:
user_id: 用户 ID
amount_units: 扣款金额(单位额度,正数)
reference_type: 关联业务类型(如 api_call, service
reference_id: 关联业务 ID
description: 交易描述
Returns:
PreAuthResult: 预授权结果包含交易ID
Raises:
InsufficientBalanceError: 余额不足
ValidationError: 参数无效
"""
if amount_units <= 0:
raise ValidationError("预扣款金额必须大于 0")
# 获取余额账户并加锁(短暂持有)
balance = await self.balance_repo.get_or_create_for_update(user_id)
# 检查可用余额
if balance.available_balance < amount_units:
raise InsufficientBalanceError(amount_units, balance.available_balance)
now = datetime.now(timezone.utc)
# 执行冻结
balance.frozen_balance += amount_units
balance.version += 1
# 创建待处理交易记录
transaction = await self.transaction_repo.create(
user_id=user_id,
balance_account_id=balance.id,
transaction_type=TransactionType.DEDUCTION,
status=TransactionStatus.PENDING,
amount=-amount_units,
balance_before=balance.balance,
balance_after=balance.balance, # 尚未实际扣款
reference_type=reference_type,
reference_id=reference_id,
description=description,
remark=f"预授权冻结: {amount_units} 单位",
)
# 快速提交释放锁
await self.balance_repo.commit()
logger.info(
f"用户 {user_id} 预授权成功: {amount_units} 单位, "
f"交易ID: {transaction.id}"
)
return PreAuthResult(
transaction_id=transaction.id,
user_id=user_id,
amount_units=amount_units,
frozen_at=now,
)
async def confirm(
self,
transaction_id: str,
*,
actual_amount_units: int | None = None,
) -> DeductionResult:
"""
确认预授权扣款(内部方法)
将预冻结的金额实际扣除。支持部分扣款。
Args:
transaction_id: 预授权交易 ID
actual_amount_units: 实际扣款金额(可选,用于部分扣款,默认全额)
Returns:
DeductionResult: 扣款结果
Raises:
TransactionNotFoundError: 交易不存在
TransactionStateError: 交易状态不是 PENDING
ValidationError: 参数无效
"""
transaction = await self.transaction_repo.get_by_id(transaction_id)
if not transaction:
raise TransactionNotFoundError(transaction_id)
if transaction.status != TransactionStatus.PENDING:
raise TransactionStateError(
transaction_id,
transaction.status.value,
)
# 获取余额账户并加锁
balance = await self.balance_repo.get_by_user_id_for_update(transaction.user_id)
if not balance:
raise ResourceNotFoundError("余额账户不存在")
frozen_amount = abs(transaction.amount)
# 确定实际扣款金额
if actual_amount_units is not None:
if actual_amount_units <= 0:
raise ValidationError("实际扣款金额必须大于 0")
if actual_amount_units > frozen_amount:
raise ValidationError(
f"实际扣款金额 ({actual_amount_units}) 不能超过预授权金额 ({frozen_amount})"
)
deduct_amount = actual_amount_units
else:
deduct_amount = frozen_amount
# 检查冻结金额
if balance.frozen_balance < frozen_amount:
raise ValidationError("冻结金额不足,可能已被其他操作修改")
balance_before = balance.balance
# 执行扣款:解冻全部,扣除实际金额
balance.frozen_balance -= frozen_amount
balance.balance -= deduct_amount
balance.total_consumed += deduct_amount
balance.version += 1
# 更新交易记录
transaction.status = TransactionStatus.COMPLETED
transaction.amount = -deduct_amount # 更新为实际扣款金额
transaction.balance_after = balance.balance
await self.balance_repo.commit()
logger.info(
f"用户 {transaction.user_id} 确认扣款: {deduct_amount} 单位, "
f"余额 {balance_before} -> {balance.balance}"
)
return DeductionResult(
transaction_id=transaction.id,
status=TransactionStatus.COMPLETED,
amount_units=deduct_amount,
balance_before=balance_before,
balance_after=balance.balance,
)
async def cancel(
self,
transaction_id: str,
*,
reason: str | None = None,
) -> DeductionResult:
"""
取消预授权扣款(内部方法)
解冻预授权的金额,退回用户可用余额。
Args:
transaction_id: 预授权交易 ID
reason: 取消原因(可选,记录在日志中)
Returns:
DeductionResult: 取消结果
Raises:
TransactionNotFoundError: 交易不存在
TransactionStateError: 交易状态不是 PENDING
"""
transaction = await self.transaction_repo.get_by_id(transaction_id)
if not transaction:
raise TransactionNotFoundError(transaction_id)
if transaction.status != TransactionStatus.PENDING:
raise TransactionStateError(
transaction_id,
transaction.status.value,
)
# 获取余额账户并加锁
balance = await self.balance_repo.get_by_user_id_for_update(transaction.user_id)
if not balance:
raise ResourceNotFoundError("余额账户不存在")
frozen_amount = abs(transaction.amount)
# 解冻
balance.frozen_balance -= frozen_amount
balance.version += 1
# 更新交易记录
transaction.status = TransactionStatus.CANCELLED
if reason:
transaction.remark = f"{transaction.remark or ''}; 取消原因: {reason}"
await self.balance_repo.commit()
logger.info(
f"用户 {transaction.user_id} 取消预授权: {frozen_amount} 单位"
+ (f", 原因: {reason}" if reason else "")
)
return DeductionResult(
transaction_id=transaction.id,
status=TransactionStatus.CANCELLED,
amount_units=0, # 实际未扣款
balance_before=balance.balance,
balance_after=balance.balance,
)
@asynccontextmanager
async def deduction_context(
self,
user_id: str,
amount_units: int,
*,
reference_type: str | None = None,
reference_id: str | None = None,
description: str | None = None,
auto_cancel_on_error: bool = True,
) -> AsyncIterator[PreAuthResult]:
"""
扣款上下文管理器(推荐使用)
提供简便的预扣款流程,自动处理确认和取消。
异常时自动取消预授权,退回冻结金额。
用法示例:
```python
async with balance_service.deduction_context(
user_id,
1000, # 扣款金额(单位额度)
reference_type="api_call",
description="API调用费用",
) as pre_auth:
# pre_auth.transaction_id 可用于追踪
# 执行可能失败的耗时操作
result = await call_external_api()
if not result.success:
raise Exception("API 调用失败")
# 成功退出时自动确认扣款
# 异常退出时自动取消预授权(如果 auto_cancel_on_error=True
```
Args:
user_id: 用户 ID
amount_units: 扣款金额(单位额度)
reference_type: 关联业务类型
reference_id: 关联业务 ID
description: 交易描述
auto_cancel_on_error: 异常时是否自动取消(默认 True
Yields:
PreAuthResult: 预授权结果包含交易ID
Raises:
InsufficientBalanceError: 余额不足
"""
# 第一阶段:预授权
pre_auth = await self.pre_authorize(
user_id,
amount_units,
reference_type=reference_type,
reference_id=reference_id,
description=description,
)
try:
yield pre_auth
# 正常退出:确认扣款
await self.confirm(pre_auth.transaction_id)
except Exception as e:
# 异常退出:取消预授权
if auto_cancel_on_error:
try:
await self.cancel(
pre_auth.transaction_id,
reason=f"操作失败: {str(e)[:200]}",
)
except Exception as cancel_error:
logger.error(
f"取消预授权失败: {pre_auth.transaction_id}, "
f"错误: {cancel_error}"
)
raise
async def execute_with_deduction(
self,
user_id: str,
amount_units: int,
operation: Callable[[PreAuthResult], Awaitable[T]],
*,
reference_type: str | None = None,
reference_id: str | None = None,
description: str | None = None,
) -> tuple[DeductionResult, T]:
"""
执行带扣款的操作(函数式接口)
预扣款后执行指定操作,根据操作结果自动确认或取消。
用法示例:
```python
async def call_api(pre_auth: PreAuthResult):
return await external_api.call(
transaction_id=pre_auth.transaction_id,
amount=pre_auth.amount_display,
)
deduction_result, api_result = await balance_service.execute_with_deduction(
user_id,
1000,
call_api,
reference_type="api_call",
)
```
Args:
user_id: 用户 ID
amount_units: 扣款金额(单位额度)
operation: 要执行的异步操作,接收 PreAuthResult 参数
reference_type: 关联业务类型
reference_id: 关联业务 ID
description: 交易描述
Returns:
(DeductionResult, operation返回值): 扣款结果和操作结果
Raises:
InsufficientBalanceError: 余额不足
Exception: 操作抛出的异常(预授权会自动取消)
"""
pre_auth = await self.pre_authorize(
user_id,
amount_units,
reference_type=reference_type,
reference_id=reference_id,
description=description,
)
try:
# 执行操作
result = await operation(pre_auth)
# 成功:确认扣款
deduction_result = await self.confirm(pre_auth.transaction_id)
return deduction_result, result
except Exception as e:
# 失败:取消预授权
try:
await self.cancel(
pre_auth.transaction_id,
reason=f"操作失败: {str(e)[:200]}",
)
except Exception as cancel_error:
logger.error(
f"取消预授权失败: {pre_auth.transaction_id}, "
f"错误: {cancel_error}"
)
raise
# ============================================================
# 兼容方法(保留旧接口)
# ============================================================
async def deduct_with_freeze(
self,
user_id: str,
amount_units: int,
*,
reference_type: str | None = None,
reference_id: str | None = None,
description: str | None = None,
) -> str:
"""
冻结并预扣款(兼容方法,推荐使用 pre_authorize
Returns:
交易ID
"""
result = await self.pre_authorize(
user_id,
amount_units,
reference_type=reference_type,
reference_id=reference_id,
description=description,
)
return result.transaction_id
async def confirm_frozen_deduction(self, transaction_id: str) -> BalanceTransaction:
"""
确认冻结扣款(兼容方法,推荐使用 confirm
"""
await self.confirm(transaction_id)
transaction = await self.transaction_repo.get_by_id(transaction_id)
return transaction # type: ignore
async def cancel_frozen_deduction(self, transaction_id: str) -> BalanceTransaction:
"""
取消冻结扣款(兼容方法,推荐使用 cancel
"""
await self.cancel(transaction_id)
transaction = await self.transaction_repo.get_by_id(transaction_id)
return transaction # type: ignore
# ============================================================
# 充值操作(使用乐观锁 + 重试)
# ============================================================
async def recharge(
self,
user_id: str,
amount_units: int,
*,
reference_type: str | None = None,
reference_id: str | None = None,
description: str | None = None,
idempotency_key: str | None = None,
) -> BalanceTransaction:
"""
充值
使用乐观锁,配合重试机制处理并发冲突。
Args:
user_id: 用户 ID
amount_units: 充值金额(单位额度,正数)
reference_type: 关联业务类型
reference_id: 关联业务 ID
description: 交易描述
idempotency_key: 幂等键
Returns:
交易记录
Raises:
DuplicateTransactionError: 重复交易
ConcurrencyError: 并发冲突(重试失败)
"""
if amount_units <= 0:
raise ValidationError("充值金额必须大于 0")
# 检查幂等性
if idempotency_key:
existing = await self.transaction_repo.get_by_idempotency_key(
idempotency_key
)
if existing:
raise DuplicateTransactionError(idempotency_key)
# 乐观锁重试
for attempt in range(self.MAX_RETRIES):
balance = await self.balance_repo.get_or_create(user_id)
balance_before = balance.balance
# 尝试更新余额
success = await self.balance_repo.update_balance_optimistic(
balance,
amount_units,
is_recharge=True,
)
if success:
# 创建交易记录
transaction = await self.transaction_repo.create(
user_id=user_id,
balance_account_id=balance.id,
transaction_type=TransactionType.RECHARGE,
status=TransactionStatus.COMPLETED,
amount=amount_units, # 正数表示收入
balance_before=balance_before,
balance_after=balance.balance,
reference_type=reference_type,
reference_id=reference_id,
description=description,
idempotency_key=idempotency_key,
)
await self.balance_repo.commit()
logger.info(
f"用户 {user_id} 充值成功: {amount_units} 单位, "
f"余额 {balance_before} -> {balance.balance}"
)
return transaction
# 冲突,重试
logger.warning(
f"用户 {user_id} 充值冲突,重试 {attempt + 1}/{self.MAX_RETRIES}"
)
await self.balance_repo.rollback()
# 重试失败
raise ConcurrencyError()
# ============================================================
# 管理员操作
# ============================================================
async def admin_adjust(
self,
user_id: str,
amount_units: int,
*,
operator_id: str,
reason: str,
) -> BalanceTransaction:
"""
管理员调整余额
Args:
user_id: 目标用户 ID
amount_units: 调整金额(正数增加,负数减少)
operator_id: 操作人 ID
reason: 调整原因
Returns:
交易记录
Raises:
InsufficientBalanceError: 减少金额时余额不足
"""
if amount_units == 0:
raise ValidationError("调整金额不能为 0")
# 获取余额账户并加锁
balance = await self.balance_repo.get_or_create_for_update(user_id)
# 减少时检查余额
if amount_units < 0 and balance.available_balance < abs(amount_units):
raise InsufficientBalanceError(
abs(amount_units), balance.available_balance
)
balance_before = balance.balance
# 执行调整
balance.balance += amount_units
if amount_units > 0:
balance.total_recharged += amount_units
balance.version += 1
# 创建交易记录
transaction = await self.transaction_repo.create(
user_id=user_id,
balance_account_id=balance.id,
transaction_type=TransactionType.ADJUSTMENT,
status=TransactionStatus.COMPLETED,
amount=amount_units,
balance_before=balance_before,
balance_after=balance.balance,
description=reason,
operator_id=operator_id,
remark=f"管理员调整: {reason}",
)
await self.balance_repo.commit()
logger.info(
f"管理员 {operator_id} 调整用户 {user_id} 余额: {amount_units} 单位, "
f"原因: {reason}"
)
return transaction