# File: SatoNano/app/services/redeem_code.py
# Last modified: 2026-01-06 23:49:23 +08:00
# Size: 571 lines, 16 KiB (Python)

"""
兑换码服务
处理兑换码相关的业务逻辑。
设计说明:
- 兑换操作使用行级锁确保原子性
- 支持批量生成和导入导出
- 记录完整的使用日志
"""
import logging
from datetime import datetime, timezone
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.exceptions import (
AppException,
ResourceNotFoundError,
ValidationError,
)
from app.models.redeem_code import (
RedeemCode,
RedeemCodeBatch,
RedeemCodeUsageLog,
RedeemCodeStatus,
generate_redeem_code,
)
from app.models.balance import TransactionType
from app.repositories.redeem_code import (
RedeemCodeRepository,
RedeemCodeBatchRepository,
RedeemCodeUsageLogRepository,
)
from app.services.balance import BalanceService
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class RedeemCodeNotFoundError(AppException):
    """Raised when no redeem code matches the supplied code string."""

    def __init__(self, code: str):
        """Build the error with the offending code attached as detail data.

        Args:
            code: The code string that could not be found.
        """
        details = {"code": code}
        super().__init__("兑换码不存在", "REDEEM_CODE_NOT_FOUND", details)
class RedeemCodeInvalidError(AppException):
    """Raised when a redeem code is invalid for a stated reason."""

    def __init__(self, code: str, reason: str):
        """Build the error with the code and the reason attached as detail data.

        Args:
            code: The code string that was rejected.
            reason: Explanation of why the code is invalid; it is also
                appended to the error message.
        """
        message = f"兑换码无效: {reason}"
        details = {"code": code, "reason": reason}
        super().__init__(message, "REDEEM_CODE_INVALID", details)
class RedeemCodeExpiredError(AppException):
    """Raised when a redeem code is past its expiry time."""

    def __init__(self, code: str):
        """Build the error with the expired code attached as detail data.

        Args:
            code: The code string that has expired.
        """
        details = {"code": code}
        super().__init__("兑换码已过期", "REDEEM_CODE_EXPIRED", details)
class RedeemCodeUsedError(AppException):
    """Raised when a redeem code has already been used up."""

    def __init__(self, code: str):
        """Build the error with the used code attached as detail data.

        Args:
            code: The code string that was already used.
        """
        details = {"code": code}
        super().__init__("兑换码已使用", "REDEEM_CODE_USED", details)
class RedeemCodeDisabledError(AppException):
    """Raised when a redeem code has been disabled."""

    def __init__(self, code: str):
        """Build the error with the disabled code attached as detail data.

        Args:
            code: The code string that is disabled.
        """
        details = {"code": code}
        super().__init__("兑换码已禁用", "REDEEM_CODE_DISABLED", details)
class RedeemCodeService:
    """Redeem code service.

    Covers user redemption, batch generation, import/export and the
    administrator query/enable/disable operations. All persistence goes
    through the repositories created in ``__init__``.
    """

    # Divisor used when rendering stored amounts as "x.xx" strings.
    # NOTE(review): presumably 1000 stored units == 1 display unit — confirm.
    _UNITS_PER_DISPLAY_UNIT = 1000

    def __init__(self, session: AsyncSession):
        """Initialise the service.

        Args:
            session: Database session shared by all repositories and the
                balance service.
        """
        self.session = session
        self.code_repo = RedeemCodeRepository(session)
        self.batch_repo = RedeemCodeBatchRepository(session)
        self.log_repo = RedeemCodeUsageLogRepository(session)
        self.balance_service = BalanceService(session)

    # ============================================================
    # Internal helpers
    # ============================================================
    @staticmethod
    def _normalize_code(code: str) -> str:
        """Return the canonical form of a code: trimmed, upper-cased, no spaces.

        Redemption and import must both use this so a stored code can
        always be found again by the lookup in ``redeem``.
        """
        return code.strip().upper().replace(" ", "")

    @classmethod
    def _format_units(cls, units: int) -> str:
        """Render a stored unit amount as a string with two decimals."""
        return f"{units / cls._UNITS_PER_DISPLAY_UNIT:.2f}"

    # ============================================================
    # User redemption
    # ============================================================
    async def redeem(
        self,
        user_id: str,
        code: str,
        *,
        ip_address: str | None = None,
        user_agent: str | None = None,
    ) -> dict[str, Any]:
        """Redeem a code and credit its face value to the user's balance.

        The code row is fetched through ``get_by_code_for_update`` so that
        concurrent redemptions of the same code are serialised.

        Args:
            user_id: ID of the redeeming user.
            code: Code as typed by the user; it is normalised before lookup.
            ip_address: Client IP, stored in the usage log.
            user_agent: Client User-Agent, stored in the usage log.

        Returns:
            A dict with ``success``, ``message``, ``face_value``,
            ``balance_before`` and ``balance_after`` (amounts as strings).

        Raises:
            RedeemCodeNotFoundError: No code matches the normalised value.
            RedeemCodeDisabledError: The code is disabled.
            RedeemCodeUsedError: The code is used up.
            RedeemCodeExpiredError: The code has expired.
        """
        normalized_code = self._normalize_code(code)

        # Fetch the code and lock its row.
        redeem_code = await self.code_repo.get_by_code_for_update(normalized_code)
        if not redeem_code:
            raise RedeemCodeNotFoundError(normalized_code)

        self._validate_redeem_code(redeem_code)

        # Read the balance before the recharge so it can be reported back.
        balance = await self.balance_service.get_balance(user_id)
        balance_before = balance.balance

        transaction = await self.balance_service.recharge(
            user_id,
            redeem_code.face_value,
            reference_type="redeem_code",
            reference_id=redeem_code.id,
            description=f"兑换码充值: {redeem_code.code}",
        )

        await self.code_repo.mark_as_used(redeem_code, user_id)

        # Keep the batch statistics in step.
        if redeem_code.batch_id:
            await self.batch_repo.increment_used_count(redeem_code.batch_id)

        await self.log_repo.create(
            redeem_code_id=redeem_code.id,
            user_id=user_id,
            transaction_id=transaction.id,
            code_snapshot=redeem_code.code,
            face_value=redeem_code.face_value,
            ip_address=ip_address,
            user_agent=user_agent,
        )

        # Capture everything needed for the log line and the response before
        # committing: if the session expires instances on commit, reading ORM
        # attributes afterwards would need implicit IO, which an AsyncSession
        # cannot do.
        code_value = redeem_code.code
        face_value = redeem_code.face_value
        balance_after = transaction.balance_after

        await self.code_repo.commit()

        logger.info(
            "用户 %s 兑换成功: %s, 面值 %s 单位",
            user_id,
            code_value,
            face_value,
        )

        return {
            "success": True,
            "message": "兑换成功",
            "face_value": self._format_units(face_value),
            "balance_before": self._format_units(balance_before),
            "balance_after": self._format_units(balance_after),
        }

    def _validate_redeem_code(self, code: RedeemCode) -> None:
        """Check that a code can still be redeemed.

        Raises:
            RedeemCodeDisabledError: Status is ``DISABLED``.
            RedeemCodeUsedError: Status is ``USED`` or the use count has
                reached ``max_uses``.
            RedeemCodeExpiredError: ``expires_at`` is in the past.
        """
        if code.status == RedeemCodeStatus.DISABLED:
            raise RedeemCodeDisabledError(code.code)

        if code.status == RedeemCodeStatus.USED or code.used_count >= code.max_uses:
            raise RedeemCodeUsedError(code.code)

        expires_at = code.expires_at
        if expires_at:
            # Comparing a naive datetime with an aware one raises TypeError.
            # NOTE(review): naive values are assumed to be UTC — confirm
            # against the column definition.
            if expires_at.tzinfo is None:
                expires_at = expires_at.replace(tzinfo=timezone.utc)
            if expires_at < datetime.now(timezone.utc):
                raise RedeemCodeExpiredError(code.code)

    # ============================================================
    # Admin: batch generation
    # ============================================================
    async def create_batch(
        self,
        name: str,
        face_value_units: int,
        count: int,
        *,
        created_by: str,
        description: str | None = None,
        max_uses: int = 1,
        expires_at: datetime | None = None,
    ) -> RedeemCodeBatch:
        """Create a batch and generate ``count`` distinct codes for it.

        Args:
            name: Batch name.
            face_value_units: Face value of each code, in stored units.
            count: Number of codes to generate (1-10000).
            created_by: ID of the creator.
            description: Batch description.
            max_uses: Maximum number of uses per code (at least 1).
            expires_at: Expiry time applied to every generated code.

        Returns:
            The created batch.

        Raises:
            ValidationError: ``face_value_units``, ``count`` or ``max_uses``
                is out of range.
        """
        if face_value_units <= 0:
            raise ValidationError("面值必须大于 0")
        if count <= 0 or count > 10000:
            raise ValidationError("数量必须在 1-10000 之间")
        # A code with max_uses < 1 would be rejected as used on first redeem.
        if max_uses < 1:
            raise ValidationError("每个兑换码最大使用次数必须大于 0")

        batch = await self.batch_repo.create(
            name=name,
            description=description,
            face_value=face_value_units,
            total_count=count,
            created_by=created_by,
        )

        # Generate until there are `count` distinct codes; the set discards
        # any collisions from the generator.
        unique_codes: set[str] = set()
        while len(unique_codes) < count:
            unique_codes.add(generate_redeem_code())

        codes_data = [
            {
                "code": new_code,
                "batch_id": batch.id,
                "face_value": face_value_units,
                "max_uses": max_uses,
                "expires_at": expires_at,
                "created_by": created_by,
            }
            for new_code in unique_codes
        ]

        await self.code_repo.bulk_create(codes_data)
        await self.batch_repo.commit()

        logger.info(
            "管理员 %s 创建批次 '%s': %s 个兑换码, 面值 %s 单位",
            created_by,
            name,
            count,
            face_value_units,
        )
        return batch

    # ============================================================
    # Admin: import
    # ============================================================
    async def import_codes(
        self,
        codes: list[dict[str, Any]],
        *,
        created_by: str,
        batch_name: str | None = None,
    ) -> dict[str, Any]:
        """Import externally supplied codes, skipping invalid or duplicate ones.

        Each item is read for ``code`` and ``face_value_units`` (required)
        and ``max_uses``, ``expires_at``, ``remark`` (optional). Codes are
        normalised the same way as in ``redeem`` before the duplicate check
        and before storage.

        Args:
            codes: List of code data dicts.
            created_by: ID of the creator.
            batch_name: If given, a batch is created and the imported codes
                are attached to it.

        Returns:
            A dict with ``success_count``, ``failed_count``,
            ``failed_codes`` (the raw input codes that were not imported)
            and ``batch_id``.
        """
        batch_id = None

        if batch_name:
            # The batch stores one face value, so record the average.
            # NOTE(review): total_count is the number submitted, not the
            # number that imports successfully.
            total_face_value = sum(c.get("face_value_units", 0) for c in codes)
            batch = await self.batch_repo.create(
                name=batch_name,
                description="导入批次",
                face_value=total_face_value // len(codes) if codes else 0,
                total_count=len(codes),
                created_by=created_by,
            )
            batch_id = batch.id

        success_count = 0
        failed_codes: list[Any] = []
        # Normalised codes accepted so far, to catch duplicates that appear
        # inside the same payload.
        seen_codes: set[str] = set()

        for code_data in codes:
            # Resolve the raw code up front so the error path never has to
            # touch a malformed item again.
            raw_code = code_data.get("code") if isinstance(code_data, dict) else None
            try:
                normalized = self._normalize_code(code_data["code"])
                face_value = code_data["face_value_units"]
                max_uses = code_data.get("max_uses", 1)

                # Same constraints as create_batch.
                if not normalized or face_value <= 0 or max_uses < 1:
                    raise ValueError("兑换码、面值或最大使用次数无效")

                # Check against the normalised form, which is what is stored.
                if normalized in seen_codes or await self.code_repo.get_by_code(normalized):
                    failed_codes.append(raw_code)
                    continue

                await self.code_repo.create(
                    code=normalized,
                    batch_id=batch_id,
                    face_value=face_value,
                    max_uses=max_uses,
                    expires_at=code_data.get("expires_at"),
                    remark=code_data.get("remark"),
                    created_by=created_by,
                )
                seen_codes.add(normalized)
                success_count += 1
            except Exception as exc:
                # Best effort by design: one bad row must not abort the import.
                logger.warning("导入兑换码失败: %s, %s", raw_code, exc)
                failed_codes.append(raw_code if raw_code is not None else "unknown")

        await self.code_repo.commit()

        logger.info(
            "管理员 %s 导入兑换码: 成功 %s, 失败 %s",
            created_by,
            success_count,
            len(failed_codes),
        )

        return {
            "success_count": success_count,
            "failed_count": len(failed_codes),
            "failed_codes": failed_codes,
            "batch_id": batch_id,
        }

    # ============================================================
    # Admin: export
    # ============================================================
    async def export_codes(
        self,
        *,
        batch_id: str | None = None,
        status: RedeemCodeStatus | None = None,
        limit: int = 10000,
    ) -> list[dict[str, Any]]:
        """Export codes as plain dicts.

        Args:
            batch_id: Filter by batch ID.
            status: Filter by status.
            limit: Maximum number of codes to export.

        Returns:
            One dict per code, with timestamps as ISO-8601 strings and the
            face value as a two-decimal string.
        """
        codes = await self.code_repo.get_all_with_filters(
            batch_id=batch_id,
            status=status,
            limit=limit,
        )
        return [
            {
                "code": code.code,
                "face_value": self._format_units(code.face_value),
                "status": code.status.value,
                "max_uses": code.max_uses,
                "used_count": code.used_count,
                "expires_at": code.expires_at.isoformat() if code.expires_at else None,
                "created_at": code.created_at.isoformat(),
                "used_at": code.used_at.isoformat() if code.used_at else None,
                "used_by": code.used_by,
            }
            for code in codes
        ]

    # ============================================================
    # Admin: queries
    # ============================================================
    async def get_codes(
        self,
        *,
        offset: int = 0,
        limit: int = 20,
        status: RedeemCodeStatus | None = None,
        batch_id: str | None = None,
        code_like: str | None = None,
        created_after: datetime | None = None,
        created_before: datetime | None = None,
    ) -> tuple[list[RedeemCode], int]:
        """Return one page of codes and the total count for the same filters.

        Returns:
            ``(codes, total)``.
        """
        # Shared by the page query and the count query so they cannot drift.
        filters = {
            "status": status,
            "batch_id": batch_id,
            "code_like": code_like,
            "created_after": created_after,
            "created_before": created_before,
        }
        codes = await self.code_repo.get_all_with_filters(
            offset=offset,
            limit=limit,
            **filters,
        )
        total = await self.code_repo.count_with_filters(**filters)
        return codes, total

    async def get_code_detail(self, code_id: str) -> RedeemCode:
        """Fetch one code by ID.

        Args:
            code_id: Redeem code ID.

        Returns:
            The code record.

        Raises:
            ResourceNotFoundError: No code has this ID.
        """
        code = await self.code_repo.get_by_id(code_id)
        if not code:
            raise ResourceNotFoundError("兑换码不存在", "redeem_code", code_id)
        return code

    async def disable_code(self, code_id: str) -> RedeemCode:
        """Disable a code and commit the change.

        Args:
            code_id: Redeem code ID.

        Returns:
            The updated code.

        Raises:
            ResourceNotFoundError: No code has this ID.
        """
        code = await self.get_code_detail(code_id)
        code = await self.code_repo.disable_code(code)
        await self.code_repo.commit()
        logger.info("兑换码已禁用: %s", code.code)
        return code

    async def enable_code(self, code_id: str) -> RedeemCode:
        """Enable a code and commit the change.

        Args:
            code_id: Redeem code ID.

        Returns:
            The updated code.

        Raises:
            ResourceNotFoundError: No code has this ID.
        """
        code = await self.get_code_detail(code_id)
        code = await self.code_repo.enable_code(code)
        await self.code_repo.commit()
        logger.info("兑换码已启用: %s", code.code)
        return code

    # ============================================================
    # Admin: batch management
    # ============================================================
    async def get_batches(
        self,
        *,
        offset: int = 0,
        limit: int = 20,
    ) -> tuple[list[RedeemCodeBatch], int]:
        """Return one page of batches and the total batch count.

        Returns:
            ``(batches, total)``.
        """
        batches = await self.batch_repo.get_all_batches(
            offset=offset,
            limit=limit,
        )
        total = await self.batch_repo.count()
        return batches, total

    async def get_batch_detail(self, batch_id: str) -> RedeemCodeBatch:
        """Fetch one batch by ID.

        Args:
            batch_id: Batch ID.

        Returns:
            The batch record.

        Raises:
            ResourceNotFoundError: No batch has this ID.
        """
        batch = await self.batch_repo.get_by_id(batch_id)
        if not batch:
            raise ResourceNotFoundError("批次不存在", "batch", batch_id)
        return batch

    # ============================================================
    # Admin: usage logs
    # ============================================================
    async def get_usage_logs(
        self,
        *,
        offset: int = 0,
        limit: int = 20,
        redeem_code_id: str | None = None,
        user_id: str | None = None,
        code_like: str | None = None,
        created_after: datetime | None = None,
        created_before: datetime | None = None,
    ) -> tuple[list[RedeemCodeUsageLog], int]:
        """Return one page of usage logs and the total count for the same filters.

        Returns:
            ``(logs, total)``.
        """
        # Shared by the page query and the count query so they cannot drift.
        filters = {
            "redeem_code_id": redeem_code_id,
            "user_id": user_id,
            "code_like": code_like,
            "created_after": created_after,
            "created_before": created_before,
        }
        logs = await self.log_repo.get_all_with_filters(
            offset=offset,
            limit=limit,
            **filters,
        )
        total = await self.log_repo.count_with_filters(**filters)
        return logs, total