""" 余额服务 处理余额相关的业务逻辑。 设计说明: - 所有金额操作使用整数单位(units),避免浮点精度问题 - 扣款操作使用行级锁(悲观锁)确保原子性 - 充值操作使用乐观锁,配合重试机制 - 每笔操作都记录交易流水 预扣款流程(内部方法,用于耗时付费操作): 1. pre_authorize() - 预扣款,冻结金额,快速释放锁,返回交易ID 2. 执行耗时的付费操作(使用交易ID追踪) 3. confirm() 或 cancel() - 根据操作结果确认或取消 推荐使用上下文管理器 deduction_context() 自动处理确认/取消。 """ import logging from contextlib import asynccontextmanager from dataclasses import dataclass from datetime import datetime, timezone from typing import Any, AsyncIterator, Callable, Awaitable, TypeVar from sqlalchemy.ext.asyncio import AsyncSession from app.core.exceptions import ( AppException, ResourceNotFoundError, ValidationError, ) from app.models.balance import ( UserBalance, BalanceTransaction, TransactionType, TransactionStatus, ) from app.repositories.balance import BalanceRepository, TransactionRepository logger = logging.getLogger(__name__) T = TypeVar("T") class InsufficientBalanceError(AppException): """余额不足""" def __init__(self, required: int, available: int): super().__init__( f"余额不足,需要 {required / 1000:.2f},当前可用 {available / 1000:.2f}", "INSUFFICIENT_BALANCE", {"required_units": required, "available_units": available}, ) class DuplicateTransactionError(AppException): """重复交易""" def __init__(self, idempotency_key: str): super().__init__( "该交易已处理", "DUPLICATE_TRANSACTION", {"idempotency_key": idempotency_key}, ) class ConcurrencyError(AppException): """并发冲突""" def __init__(self): super().__init__( "操作冲突,请重试", "CONCURRENCY_ERROR", ) class TransactionNotFoundError(AppException): """交易不存在""" def __init__(self, transaction_id: str): super().__init__( "交易记录不存在", "TRANSACTION_NOT_FOUND", {"transaction_id": transaction_id}, ) class TransactionStateError(AppException): """交易状态错误""" def __init__(self, transaction_id: str, current_status: str, expected_status: str = "pending"): super().__init__( f"交易状态无效:当前 {current_status},预期 {expected_status}", "TRANSACTION_STATE_ERROR", { "transaction_id": transaction_id, "current_status": current_status, "expected_status": expected_status, }, ) @dataclass class PreAuthResult: """ 预授权结果 包含交易ID和相关信息,用于后续确认或取消操作。 """ transaction_id: str """交易ID,用于后续 confirm/cancel 操作""" user_id: str """用户ID""" amount_units: int """预扣款金额(单位额度)""" frozen_at: datetime """冻结时间""" @property def amount_display(self) -> str: """显示金额(2位小数)""" return f"{self.amount_units / 1000:.2f}" @dataclass class DeductionResult: """ 扣款结果 包含扣款操作的完整信息。 """ transaction_id: str """交易ID""" status: TransactionStatus """交易状态""" amount_units: int """实际扣款金额(单位额度)""" balance_before: int """扣款前余额""" balance_after: int """扣款后余额""" @property def success(self) -> bool: """是否扣款成功""" return self.status == TransactionStatus.COMPLETED @property def amount_display(self) -> str: """显示金额""" return f"{abs(self.amount_units) / 1000:.2f}" @property def balance_before_display(self) -> str: """显示扣款前余额""" return f"{self.balance_before / 1000:.2f}" @property def balance_after_display(self) -> str: """显示扣款后余额""" return f"{self.balance_after / 1000:.2f}" class BalanceService: """余额服务""" # 乐观锁最大重试次数 MAX_RETRIES = 3 def __init__(self, session: AsyncSession): """ 初始化余额服务 Args: session: 数据库会话 """ self.session = session self.balance_repo = BalanceRepository(session) self.transaction_repo = TransactionRepository(session) # ============================================================ # 余额查询 # ============================================================ async def get_balance(self, user_id: str) -> UserBalance: """ 获取用户余额 如果用户没有余额账户,自动创建一个。 Args: user_id: 用户 ID Returns: 余额账户 """ balance = await self.balance_repo.get_or_create(user_id) await self.balance_repo.commit() return balance async def get_balance_detail(self, user_id: str) -> dict[str, Any]: """ 获取用户余额详情 Args: user_id: 用户 ID Returns: 余额详情字典 """ balance = await self.get_balance(user_id) return { "user_id": balance.user_id, "balance_units": balance.balance, "frozen_units": balance.frozen_balance, "available_units": balance.available_balance, "total_recharged_units": balance.total_recharged, "total_consumed_units": balance.total_consumed, } async def get_transactions( self, user_id: str, *, offset: int = 0, limit: int = 20, transaction_type: TransactionType | None = None, ) -> tuple[list[BalanceTransaction], int]: """ 获取用户交易记录 Args: user_id: 用户 ID offset: 偏移量 limit: 限制数量 transaction_type: 交易类型过滤 Returns: (交易记录列表, 总数) """ transactions = await self.transaction_repo.get_by_user_id( user_id, offset=offset, limit=limit, transaction_type=transaction_type, ) total = await self.transaction_repo.count_by_user_id( user_id, transaction_type=transaction_type, ) return transactions, total # ============================================================ # 扣款操作(使用行级锁 - 悲观锁) # ============================================================ async def deduct( self, user_id: str, amount_units: int, *, reference_type: str | None = None, reference_id: str | None = None, description: str | None = None, idempotency_key: str | None = None, ) -> BalanceTransaction: """ 扣款 使用行级锁确保原子性,防止并发扣款导致余额变负。 Args: user_id: 用户 ID amount_units: 扣款金额(单位额度,正数) reference_type: 关联业务类型 reference_id: 关联业务 ID description: 交易描述 idempotency_key: 幂等键 Returns: 交易记录 Raises: InsufficientBalanceError: 余额不足 DuplicateTransactionError: 重复交易 """ if amount_units <= 0: raise ValidationError("扣款金额必须大于 0") # 检查幂等性 if idempotency_key: existing = await self.transaction_repo.get_by_idempotency_key( idempotency_key ) if existing: raise DuplicateTransactionError(idempotency_key) # 获取余额账户并加锁 balance = await self.balance_repo.get_or_create_for_update(user_id) # 检查可用余额 if balance.available_balance < amount_units: raise InsufficientBalanceError(amount_units, balance.available_balance) # 记录扣款前余额 balance_before = balance.balance # 执行扣款 balance.balance -= amount_units balance.total_consumed += amount_units balance.version += 1 # 创建交易记录 transaction = await self.transaction_repo.create( user_id=user_id, balance_account_id=balance.id, transaction_type=TransactionType.DEDUCTION, status=TransactionStatus.COMPLETED, amount=-amount_units, # 负数表示支出 balance_before=balance_before, balance_after=balance.balance, reference_type=reference_type, reference_id=reference_id, description=description, idempotency_key=idempotency_key, ) await self.balance_repo.commit() logger.info( f"用户 {user_id} 扣款成功: {amount_units} 单位, " f"余额 {balance_before} -> {balance.balance}" ) return transaction # ============================================================ # 预扣款流程(内部方法,用于耗时付费操作) # ============================================================ async def pre_authorize( self, user_id: str, amount_units: int, *, reference_type: str | None = None, reference_id: str | None = None, description: str | None = None, ) -> PreAuthResult: """ 预授权扣款(内部方法) 冻结指定金额,快速释放数据库锁,返回交易ID供后续操作使用。 此方法设计用于耗时的付费操作场景。 使用流程: 1. 调用 pre_authorize() 获取 PreAuthResult 2. 执行可能失败的耗时操作 3. 根据操作结果调用 confirm() 或 cancel() 推荐使用 deduction_context() 上下文管理器自动处理。 Args: user_id: 用户 ID amount_units: 扣款金额(单位额度,正数) reference_type: 关联业务类型(如 api_call, service) reference_id: 关联业务 ID description: 交易描述 Returns: PreAuthResult: 预授权结果,包含交易ID Raises: InsufficientBalanceError: 余额不足 ValidationError: 参数无效 """ if amount_units <= 0: raise ValidationError("预扣款金额必须大于 0") # 获取余额账户并加锁(短暂持有) balance = await self.balance_repo.get_or_create_for_update(user_id) # 检查可用余额 if balance.available_balance < amount_units: raise InsufficientBalanceError(amount_units, balance.available_balance) now = datetime.now(timezone.utc) # 执行冻结 balance.frozen_balance += amount_units balance.version += 1 # 创建待处理交易记录 transaction = await self.transaction_repo.create( user_id=user_id, balance_account_id=balance.id, transaction_type=TransactionType.DEDUCTION, status=TransactionStatus.PENDING, amount=-amount_units, balance_before=balance.balance, balance_after=balance.balance, # 尚未实际扣款 reference_type=reference_type, reference_id=reference_id, description=description, remark=f"预授权冻结: {amount_units} 单位", ) # 快速提交释放锁 await self.balance_repo.commit() logger.info( f"用户 {user_id} 预授权成功: {amount_units} 单位, " f"交易ID: {transaction.id}" ) return PreAuthResult( transaction_id=transaction.id, user_id=user_id, amount_units=amount_units, frozen_at=now, ) async def confirm( self, transaction_id: str, *, actual_amount_units: int | None = None, ) -> DeductionResult: """ 确认预授权扣款(内部方法) 将预冻结的金额实际扣除。支持部分扣款。 Args: transaction_id: 预授权交易 ID actual_amount_units: 实际扣款金额(可选,用于部分扣款,默认全额) Returns: DeductionResult: 扣款结果 Raises: TransactionNotFoundError: 交易不存在 TransactionStateError: 交易状态不是 PENDING ValidationError: 参数无效 """ transaction = await self.transaction_repo.get_by_id(transaction_id) if not transaction: raise TransactionNotFoundError(transaction_id) if transaction.status != TransactionStatus.PENDING: raise TransactionStateError( transaction_id, transaction.status.value, ) # 获取余额账户并加锁 balance = await self.balance_repo.get_by_user_id_for_update(transaction.user_id) if not balance: raise ResourceNotFoundError("余额账户不存在") frozen_amount = abs(transaction.amount) # 确定实际扣款金额 if actual_amount_units is not None: if actual_amount_units <= 0: raise ValidationError("实际扣款金额必须大于 0") if actual_amount_units > frozen_amount: raise ValidationError( f"实际扣款金额 ({actual_amount_units}) 不能超过预授权金额 ({frozen_amount})" ) deduct_amount = actual_amount_units else: deduct_amount = frozen_amount # 检查冻结金额 if balance.frozen_balance < frozen_amount: raise ValidationError("冻结金额不足,可能已被其他操作修改") balance_before = balance.balance # 执行扣款:解冻全部,扣除实际金额 balance.frozen_balance -= frozen_amount balance.balance -= deduct_amount balance.total_consumed += deduct_amount balance.version += 1 # 更新交易记录 transaction.status = TransactionStatus.COMPLETED transaction.amount = -deduct_amount # 更新为实际扣款金额 transaction.balance_after = balance.balance await self.balance_repo.commit() logger.info( f"用户 {transaction.user_id} 确认扣款: {deduct_amount} 单位, " f"余额 {balance_before} -> {balance.balance}" ) return DeductionResult( transaction_id=transaction.id, status=TransactionStatus.COMPLETED, amount_units=deduct_amount, balance_before=balance_before, balance_after=balance.balance, ) async def cancel( self, transaction_id: str, *, reason: str | None = None, ) -> DeductionResult: """ 取消预授权扣款(内部方法) 解冻预授权的金额,退回用户可用余额。 Args: transaction_id: 预授权交易 ID reason: 取消原因(可选,记录在日志中) Returns: DeductionResult: 取消结果 Raises: TransactionNotFoundError: 交易不存在 TransactionStateError: 交易状态不是 PENDING """ transaction = await self.transaction_repo.get_by_id(transaction_id) if not transaction: raise TransactionNotFoundError(transaction_id) if transaction.status != TransactionStatus.PENDING: raise TransactionStateError( transaction_id, transaction.status.value, ) # 获取余额账户并加锁 balance = await self.balance_repo.get_by_user_id_for_update(transaction.user_id) if not balance: raise ResourceNotFoundError("余额账户不存在") frozen_amount = abs(transaction.amount) # 解冻 balance.frozen_balance -= frozen_amount balance.version += 1 # 更新交易记录 transaction.status = TransactionStatus.CANCELLED if reason: transaction.remark = f"{transaction.remark or ''}; 取消原因: {reason}" await self.balance_repo.commit() logger.info( f"用户 {transaction.user_id} 取消预授权: {frozen_amount} 单位" + (f", 原因: {reason}" if reason else "") ) return DeductionResult( transaction_id=transaction.id, status=TransactionStatus.CANCELLED, amount_units=0, # 实际未扣款 balance_before=balance.balance, balance_after=balance.balance, ) @asynccontextmanager async def deduction_context( self, user_id: str, amount_units: int, *, reference_type: str | None = None, reference_id: str | None = None, description: str | None = None, auto_cancel_on_error: bool = True, ) -> AsyncIterator[PreAuthResult]: """ 扣款上下文管理器(推荐使用) 提供简便的预扣款流程,自动处理确认和取消。 异常时自动取消预授权,退回冻结金额。 用法示例: ```python async with balance_service.deduction_context( user_id, 1000, # 扣款金额(单位额度) reference_type="api_call", description="API调用费用", ) as pre_auth: # pre_auth.transaction_id 可用于追踪 # 执行可能失败的耗时操作 result = await call_external_api() if not result.success: raise Exception("API 调用失败") # 成功退出时自动确认扣款 # 异常退出时自动取消预授权(如果 auto_cancel_on_error=True) ``` Args: user_id: 用户 ID amount_units: 扣款金额(单位额度) reference_type: 关联业务类型 reference_id: 关联业务 ID description: 交易描述 auto_cancel_on_error: 异常时是否自动取消(默认 True) Yields: PreAuthResult: 预授权结果,包含交易ID Raises: InsufficientBalanceError: 余额不足 """ # 第一阶段:预授权 pre_auth = await self.pre_authorize( user_id, amount_units, reference_type=reference_type, reference_id=reference_id, description=description, ) try: yield pre_auth # 正常退出:确认扣款 await self.confirm(pre_auth.transaction_id) except Exception as e: # 异常退出:取消预授权 if auto_cancel_on_error: try: await self.cancel( pre_auth.transaction_id, reason=f"操作失败: {str(e)[:200]}", ) except Exception as cancel_error: logger.error( f"取消预授权失败: {pre_auth.transaction_id}, " f"错误: {cancel_error}" ) raise async def execute_with_deduction( self, user_id: str, amount_units: int, operation: Callable[[PreAuthResult], Awaitable[T]], *, reference_type: str | None = None, reference_id: str | None = None, description: str | None = None, ) -> tuple[DeductionResult, T]: """ 执行带扣款的操作(函数式接口) 预扣款后执行指定操作,根据操作结果自动确认或取消。 用法示例: ```python async def call_api(pre_auth: PreAuthResult): return await external_api.call( transaction_id=pre_auth.transaction_id, amount=pre_auth.amount_display, ) deduction_result, api_result = await balance_service.execute_with_deduction( user_id, 1000, call_api, reference_type="api_call", ) ``` Args: user_id: 用户 ID amount_units: 扣款金额(单位额度) operation: 要执行的异步操作,接收 PreAuthResult 参数 reference_type: 关联业务类型 reference_id: 关联业务 ID description: 交易描述 Returns: (DeductionResult, operation返回值): 扣款结果和操作结果 Raises: InsufficientBalanceError: 余额不足 Exception: 操作抛出的异常(预授权会自动取消) """ pre_auth = await self.pre_authorize( user_id, amount_units, reference_type=reference_type, reference_id=reference_id, description=description, ) try: # 执行操作 result = await operation(pre_auth) # 成功:确认扣款 deduction_result = await self.confirm(pre_auth.transaction_id) return deduction_result, result except Exception as e: # 失败:取消预授权 try: await self.cancel( pre_auth.transaction_id, reason=f"操作失败: {str(e)[:200]}", ) except Exception as cancel_error: logger.error( f"取消预授权失败: {pre_auth.transaction_id}, " f"错误: {cancel_error}" ) raise # ============================================================ # 兼容方法(保留旧接口) # ============================================================ async def deduct_with_freeze( self, user_id: str, amount_units: int, *, reference_type: str | None = None, reference_id: str | None = None, description: str | None = None, ) -> str: """ 冻结并预扣款(兼容方法,推荐使用 pre_authorize) Returns: 交易ID """ result = await self.pre_authorize( user_id, amount_units, reference_type=reference_type, reference_id=reference_id, description=description, ) return result.transaction_id async def confirm_frozen_deduction(self, transaction_id: str) -> BalanceTransaction: """ 确认冻结扣款(兼容方法,推荐使用 confirm) """ await self.confirm(transaction_id) transaction = await self.transaction_repo.get_by_id(transaction_id) return transaction # type: ignore async def cancel_frozen_deduction(self, transaction_id: str) -> BalanceTransaction: """ 取消冻结扣款(兼容方法,推荐使用 cancel) """ await self.cancel(transaction_id) transaction = await self.transaction_repo.get_by_id(transaction_id) return transaction # type: ignore # ============================================================ # 充值操作(使用乐观锁 + 重试) # ============================================================ async def recharge( self, user_id: str, amount_units: int, *, reference_type: str | None = None, reference_id: str | None = None, description: str | None = None, idempotency_key: str | None = None, ) -> BalanceTransaction: """ 充值 使用乐观锁,配合重试机制处理并发冲突。 Args: user_id: 用户 ID amount_units: 充值金额(单位额度,正数) reference_type: 关联业务类型 reference_id: 关联业务 ID description: 交易描述 idempotency_key: 幂等键 Returns: 交易记录 Raises: DuplicateTransactionError: 重复交易 ConcurrencyError: 并发冲突(重试失败) """ if amount_units <= 0: raise ValidationError("充值金额必须大于 0") # 检查幂等性 if idempotency_key: existing = await self.transaction_repo.get_by_idempotency_key( idempotency_key ) if existing: raise DuplicateTransactionError(idempotency_key) # 乐观锁重试 for attempt in range(self.MAX_RETRIES): balance = await self.balance_repo.get_or_create(user_id) balance_before = balance.balance # 尝试更新余额 success = await self.balance_repo.update_balance_optimistic( balance, amount_units, is_recharge=True, ) if success: # 创建交易记录 transaction = await self.transaction_repo.create( user_id=user_id, balance_account_id=balance.id, transaction_type=TransactionType.RECHARGE, status=TransactionStatus.COMPLETED, amount=amount_units, # 正数表示收入 balance_before=balance_before, balance_after=balance.balance, reference_type=reference_type, reference_id=reference_id, description=description, idempotency_key=idempotency_key, ) await self.balance_repo.commit() logger.info( f"用户 {user_id} 充值成功: {amount_units} 单位, " f"余额 {balance_before} -> {balance.balance}" ) return transaction # 冲突,重试 logger.warning( f"用户 {user_id} 充值冲突,重试 {attempt + 1}/{self.MAX_RETRIES}" ) await self.balance_repo.rollback() # 重试失败 raise ConcurrencyError() # ============================================================ # 管理员操作 # ============================================================ async def admin_adjust( self, user_id: str, amount_units: int, *, operator_id: str, reason: str, ) -> BalanceTransaction: """ 管理员调整余额 Args: user_id: 目标用户 ID amount_units: 调整金额(正数增加,负数减少) operator_id: 操作人 ID reason: 调整原因 Returns: 交易记录 Raises: InsufficientBalanceError: 减少金额时余额不足 """ if amount_units == 0: raise ValidationError("调整金额不能为 0") # 获取余额账户并加锁 balance = await self.balance_repo.get_or_create_for_update(user_id) # 减少时检查余额 if amount_units < 0 and balance.available_balance < abs(amount_units): raise InsufficientBalanceError( abs(amount_units), balance.available_balance ) balance_before = balance.balance # 执行调整 balance.balance += amount_units if amount_units > 0: balance.total_recharged += amount_units balance.version += 1 # 创建交易记录 transaction = await self.transaction_repo.create( user_id=user_id, balance_account_id=balance.id, transaction_type=TransactionType.ADJUSTMENT, status=TransactionStatus.COMPLETED, amount=amount_units, balance_before=balance_before, balance_after=balance.balance, description=reason, operator_id=operator_id, remark=f"管理员调整: {reason}", ) await self.balance_repo.commit() logger.info( f"管理员 {operator_id} 调整用户 {user_id} 余额: {amount_units} 单位, " f"原因: {reason}" ) return transaction