571 lines
16 KiB
Python
571 lines
16 KiB
Python
"""
|
|
兑换码服务
|
|
|
|
处理兑换码相关的业务逻辑。
|
|
|
|
设计说明:
|
|
- 兑换操作使用行级锁确保原子性
|
|
- 支持批量生成和导入导出
|
|
- 记录完整的使用日志
|
|
"""
|
|
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from typing import Any
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.exceptions import (
|
|
AppException,
|
|
ResourceNotFoundError,
|
|
ValidationError,
|
|
)
|
|
from app.models.redeem_code import (
|
|
RedeemCode,
|
|
RedeemCodeBatch,
|
|
RedeemCodeUsageLog,
|
|
RedeemCodeStatus,
|
|
generate_redeem_code,
|
|
)
|
|
from app.models.balance import TransactionType
|
|
from app.repositories.redeem_code import (
|
|
RedeemCodeRepository,
|
|
RedeemCodeBatchRepository,
|
|
RedeemCodeUsageLogRepository,
|
|
)
|
|
from app.services.balance import BalanceService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class RedeemCodeNotFoundError(AppException):
|
|
"""兑换码不存在"""
|
|
|
|
def __init__(self, code: str):
|
|
super().__init__(
|
|
"兑换码不存在",
|
|
"REDEEM_CODE_NOT_FOUND",
|
|
{"code": code},
|
|
)
|
|
|
|
|
|
class RedeemCodeInvalidError(AppException):
|
|
"""兑换码无效"""
|
|
|
|
def __init__(self, code: str, reason: str):
|
|
super().__init__(
|
|
f"兑换码无效: {reason}",
|
|
"REDEEM_CODE_INVALID",
|
|
{"code": code, "reason": reason},
|
|
)
|
|
|
|
|
|
class RedeemCodeExpiredError(AppException):
|
|
"""兑换码已过期"""
|
|
|
|
def __init__(self, code: str):
|
|
super().__init__(
|
|
"兑换码已过期",
|
|
"REDEEM_CODE_EXPIRED",
|
|
{"code": code},
|
|
)
|
|
|
|
|
|
class RedeemCodeUsedError(AppException):
|
|
"""兑换码已使用"""
|
|
|
|
def __init__(self, code: str):
|
|
super().__init__(
|
|
"兑换码已使用",
|
|
"REDEEM_CODE_USED",
|
|
{"code": code},
|
|
)
|
|
|
|
|
|
class RedeemCodeDisabledError(AppException):
|
|
"""兑换码已禁用"""
|
|
|
|
def __init__(self, code: str):
|
|
super().__init__(
|
|
"兑换码已禁用",
|
|
"REDEEM_CODE_DISABLED",
|
|
{"code": code},
|
|
)
|
|
|
|
|
|
class RedeemCodeService:
|
|
"""兑换码服务"""
|
|
|
|
def __init__(self, session: AsyncSession):
|
|
"""
|
|
初始化兑换码服务
|
|
|
|
Args:
|
|
session: 数据库会话
|
|
"""
|
|
self.session = session
|
|
self.code_repo = RedeemCodeRepository(session)
|
|
self.batch_repo = RedeemCodeBatchRepository(session)
|
|
self.log_repo = RedeemCodeUsageLogRepository(session)
|
|
self.balance_service = BalanceService(session)
|
|
|
|
# ============================================================
|
|
# 用户兑换
|
|
# ============================================================
|
|
|
|
async def redeem(
|
|
self,
|
|
user_id: str,
|
|
code: str,
|
|
*,
|
|
ip_address: str | None = None,
|
|
user_agent: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
用户兑换余额
|
|
|
|
使用行级锁确保原子性,防止并发兑换。
|
|
|
|
Args:
|
|
user_id: 用户 ID
|
|
code: 兑换码
|
|
ip_address: 客户端 IP
|
|
user_agent: User Agent
|
|
|
|
Returns:
|
|
兑换结果
|
|
|
|
Raises:
|
|
RedeemCodeNotFoundError: 兑换码不存在
|
|
RedeemCodeInvalidError: 兑换码无效
|
|
"""
|
|
# 标准化兑换码
|
|
normalized_code = code.strip().upper().replace(" ", "")
|
|
|
|
# 获取兑换码并加锁
|
|
redeem_code = await self.code_repo.get_by_code_for_update(normalized_code)
|
|
|
|
if not redeem_code:
|
|
raise RedeemCodeNotFoundError(normalized_code)
|
|
|
|
# 验证兑换码状态
|
|
self._validate_redeem_code(redeem_code)
|
|
|
|
# 获取用户当前余额
|
|
balance = await self.balance_service.get_balance(user_id)
|
|
balance_before = balance.balance
|
|
|
|
# 执行充值
|
|
transaction = await self.balance_service.recharge(
|
|
user_id,
|
|
redeem_code.face_value,
|
|
reference_type="redeem_code",
|
|
reference_id=redeem_code.id,
|
|
description=f"兑换码充值: {redeem_code.code}",
|
|
)
|
|
|
|
# 标记兑换码已使用
|
|
await self.code_repo.mark_as_used(redeem_code, user_id)
|
|
|
|
# 更新批次统计
|
|
if redeem_code.batch_id:
|
|
await self.batch_repo.increment_used_count(redeem_code.batch_id)
|
|
|
|
# 记录使用日志
|
|
await self.log_repo.create(
|
|
redeem_code_id=redeem_code.id,
|
|
user_id=user_id,
|
|
transaction_id=transaction.id,
|
|
code_snapshot=redeem_code.code,
|
|
face_value=redeem_code.face_value,
|
|
ip_address=ip_address,
|
|
user_agent=user_agent,
|
|
)
|
|
|
|
await self.code_repo.commit()
|
|
|
|
logger.info(
|
|
f"用户 {user_id} 兑换成功: {redeem_code.code}, "
|
|
f"面值 {redeem_code.face_value} 单位"
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message": "兑换成功",
|
|
"face_value": f"{redeem_code.face_value / 1000:.2f}",
|
|
"balance_before": f"{balance_before / 1000:.2f}",
|
|
"balance_after": f"{transaction.balance_after / 1000:.2f}",
|
|
}
|
|
|
|
def _validate_redeem_code(self, code: RedeemCode) -> None:
|
|
"""验证兑换码有效性"""
|
|
if code.status == RedeemCodeStatus.DISABLED:
|
|
raise RedeemCodeDisabledError(code.code)
|
|
|
|
if code.status == RedeemCodeStatus.USED or code.used_count >= code.max_uses:
|
|
raise RedeemCodeUsedError(code.code)
|
|
|
|
if code.expires_at and code.expires_at < datetime.now(timezone.utc):
|
|
raise RedeemCodeExpiredError(code.code)
|
|
|
|
# ============================================================
|
|
# 管理员:批量生成
|
|
# ============================================================
|
|
|
|
async def create_batch(
|
|
self,
|
|
name: str,
|
|
face_value_units: int,
|
|
count: int,
|
|
*,
|
|
created_by: str,
|
|
description: str | None = None,
|
|
max_uses: int = 1,
|
|
expires_at: datetime | None = None,
|
|
) -> RedeemCodeBatch:
|
|
"""
|
|
创建兑换码批次
|
|
|
|
批量生成指定数量的兑换码。
|
|
|
|
Args:
|
|
name: 批次名称
|
|
face_value_units: 面值(单位额度)
|
|
count: 生成数量
|
|
created_by: 创建者 ID
|
|
description: 批次描述
|
|
max_uses: 每个兑换码最大使用次数
|
|
expires_at: 过期时间
|
|
|
|
Returns:
|
|
创建的批次
|
|
"""
|
|
if face_value_units <= 0:
|
|
raise ValidationError("面值必须大于 0")
|
|
if count <= 0 or count > 10000:
|
|
raise ValidationError("数量必须在 1-10000 之间")
|
|
|
|
# 创建批次
|
|
batch = await self.batch_repo.create(
|
|
name=name,
|
|
description=description,
|
|
face_value=face_value_units,
|
|
total_count=count,
|
|
created_by=created_by,
|
|
)
|
|
|
|
# 批量生成兑换码
|
|
codes_data = []
|
|
generated_codes = set()
|
|
|
|
while len(codes_data) < count:
|
|
new_code = generate_redeem_code()
|
|
if new_code not in generated_codes:
|
|
generated_codes.add(new_code)
|
|
codes_data.append({
|
|
"code": new_code,
|
|
"batch_id": batch.id,
|
|
"face_value": face_value_units,
|
|
"max_uses": max_uses,
|
|
"expires_at": expires_at,
|
|
"created_by": created_by,
|
|
})
|
|
|
|
await self.code_repo.bulk_create(codes_data)
|
|
await self.batch_repo.commit()
|
|
|
|
logger.info(
|
|
f"管理员 {created_by} 创建批次 '{name}': "
|
|
f"{count} 个兑换码, 面值 {face_value_units} 单位"
|
|
)
|
|
|
|
return batch
|
|
|
|
# ============================================================
|
|
# 管理员:导入
|
|
# ============================================================
|
|
|
|
async def import_codes(
|
|
self,
|
|
codes: list[dict[str, Any]],
|
|
*,
|
|
created_by: str,
|
|
batch_name: str | None = None,
|
|
) -> dict[str, Any]:
|
|
"""
|
|
导入兑换码
|
|
|
|
Args:
|
|
codes: 兑换码数据列表
|
|
created_by: 创建者 ID
|
|
batch_name: 批次名称(可选)
|
|
|
|
Returns:
|
|
导入结果
|
|
"""
|
|
batch_id = None
|
|
|
|
# 创建批次(如果指定)
|
|
if batch_name:
|
|
# 计算总面值用于批次记录
|
|
total_face_value = sum(c.get("face_value_units", 0) for c in codes)
|
|
batch = await self.batch_repo.create(
|
|
name=batch_name,
|
|
description="导入批次",
|
|
face_value=total_face_value // len(codes) if codes else 0,
|
|
total_count=len(codes),
|
|
created_by=created_by,
|
|
)
|
|
batch_id = batch.id
|
|
|
|
success_count = 0
|
|
failed_codes = []
|
|
|
|
for code_data in codes:
|
|
try:
|
|
# 检查兑换码是否已存在
|
|
existing = await self.code_repo.get_by_code(code_data["code"])
|
|
if existing:
|
|
failed_codes.append(code_data["code"])
|
|
continue
|
|
|
|
# 创建兑换码
|
|
await self.code_repo.create(
|
|
code=code_data["code"].strip().upper(),
|
|
batch_id=batch_id,
|
|
face_value=code_data["face_value_units"],
|
|
max_uses=code_data.get("max_uses", 1),
|
|
expires_at=code_data.get("expires_at"),
|
|
remark=code_data.get("remark"),
|
|
created_by=created_by,
|
|
)
|
|
success_count += 1
|
|
|
|
except Exception as e:
|
|
logger.warning(f"导入兑换码失败: {code_data.get('code')}, {e}")
|
|
failed_codes.append(code_data.get("code", "unknown"))
|
|
|
|
await self.code_repo.commit()
|
|
|
|
logger.info(
|
|
f"管理员 {created_by} 导入兑换码: "
|
|
f"成功 {success_count}, 失败 {len(failed_codes)}"
|
|
)
|
|
|
|
return {
|
|
"success_count": success_count,
|
|
"failed_count": len(failed_codes),
|
|
"failed_codes": failed_codes,
|
|
"batch_id": batch_id,
|
|
}
|
|
|
|
# ============================================================
|
|
# 管理员:导出
|
|
# ============================================================
|
|
|
|
async def export_codes(
|
|
self,
|
|
*,
|
|
batch_id: str | None = None,
|
|
status: RedeemCodeStatus | None = None,
|
|
limit: int = 10000,
|
|
) -> list[dict[str, Any]]:
|
|
"""
|
|
导出兑换码
|
|
|
|
Args:
|
|
batch_id: 批次 ID 过滤
|
|
status: 状态过滤
|
|
limit: 最大导出数量
|
|
|
|
Returns:
|
|
兑换码数据列表
|
|
"""
|
|
codes = await self.code_repo.get_all_with_filters(
|
|
batch_id=batch_id,
|
|
status=status,
|
|
limit=limit,
|
|
)
|
|
|
|
result = []
|
|
for code in codes:
|
|
result.append({
|
|
"code": code.code,
|
|
"face_value": f"{code.face_value / 1000:.2f}",
|
|
"status": code.status.value,
|
|
"max_uses": code.max_uses,
|
|
"used_count": code.used_count,
|
|
"expires_at": code.expires_at.isoformat() if code.expires_at else None,
|
|
"created_at": code.created_at.isoformat(),
|
|
"used_at": code.used_at.isoformat() if code.used_at else None,
|
|
"used_by": code.used_by,
|
|
})
|
|
|
|
return result
|
|
|
|
# ============================================================
|
|
# 管理员:查询
|
|
# ============================================================
|
|
|
|
async def get_codes(
|
|
self,
|
|
*,
|
|
offset: int = 0,
|
|
limit: int = 20,
|
|
status: RedeemCodeStatus | None = None,
|
|
batch_id: str | None = None,
|
|
code_like: str | None = None,
|
|
created_after: datetime | None = None,
|
|
created_before: datetime | None = None,
|
|
) -> tuple[list[RedeemCode], int]:
|
|
"""
|
|
获取兑换码列表
|
|
|
|
Returns:
|
|
(兑换码列表, 总数)
|
|
"""
|
|
codes = await self.code_repo.get_all_with_filters(
|
|
offset=offset,
|
|
limit=limit,
|
|
status=status,
|
|
batch_id=batch_id,
|
|
code_like=code_like,
|
|
created_after=created_after,
|
|
created_before=created_before,
|
|
)
|
|
total = await self.code_repo.count_with_filters(
|
|
status=status,
|
|
batch_id=batch_id,
|
|
code_like=code_like,
|
|
created_after=created_after,
|
|
created_before=created_before,
|
|
)
|
|
return codes, total
|
|
|
|
async def get_code_detail(self, code_id: str) -> RedeemCode:
|
|
"""
|
|
获取兑换码详情
|
|
|
|
Args:
|
|
code_id: 兑换码 ID
|
|
|
|
Returns:
|
|
兑换码记录
|
|
"""
|
|
code = await self.code_repo.get_by_id(code_id)
|
|
if not code:
|
|
raise ResourceNotFoundError("兑换码不存在", "redeem_code", code_id)
|
|
return code
|
|
|
|
async def disable_code(self, code_id: str) -> RedeemCode:
|
|
"""
|
|
禁用兑换码
|
|
|
|
Args:
|
|
code_id: 兑换码 ID
|
|
|
|
Returns:
|
|
更新后的兑换码
|
|
"""
|
|
code = await self.get_code_detail(code_id)
|
|
code = await self.code_repo.disable_code(code)
|
|
await self.code_repo.commit()
|
|
|
|
logger.info(f"兑换码已禁用: {code.code}")
|
|
return code
|
|
|
|
async def enable_code(self, code_id: str) -> RedeemCode:
|
|
"""
|
|
启用兑换码
|
|
|
|
Args:
|
|
code_id: 兑换码 ID
|
|
|
|
Returns:
|
|
更新后的兑换码
|
|
"""
|
|
code = await self.get_code_detail(code_id)
|
|
code = await self.code_repo.enable_code(code)
|
|
await self.code_repo.commit()
|
|
|
|
logger.info(f"兑换码已启用: {code.code}")
|
|
return code
|
|
|
|
# ============================================================
|
|
# 管理员:批次管理
|
|
# ============================================================
|
|
|
|
async def get_batches(
|
|
self,
|
|
*,
|
|
offset: int = 0,
|
|
limit: int = 20,
|
|
) -> tuple[list[RedeemCodeBatch], int]:
|
|
"""
|
|
获取批次列表
|
|
|
|
Returns:
|
|
(批次列表, 总数)
|
|
"""
|
|
batches = await self.batch_repo.get_all_batches(
|
|
offset=offset,
|
|
limit=limit,
|
|
)
|
|
total = await self.batch_repo.count()
|
|
return batches, total
|
|
|
|
async def get_batch_detail(self, batch_id: str) -> RedeemCodeBatch:
|
|
"""
|
|
获取批次详情
|
|
|
|
Args:
|
|
batch_id: 批次 ID
|
|
|
|
Returns:
|
|
批次记录
|
|
"""
|
|
batch = await self.batch_repo.get_by_id(batch_id)
|
|
if not batch:
|
|
raise ResourceNotFoundError("批次不存在", "batch", batch_id)
|
|
return batch
|
|
|
|
# ============================================================
|
|
# 管理员:使用日志
|
|
# ============================================================
|
|
|
|
async def get_usage_logs(
|
|
self,
|
|
*,
|
|
offset: int = 0,
|
|
limit: int = 20,
|
|
redeem_code_id: str | None = None,
|
|
user_id: str | None = None,
|
|
code_like: str | None = None,
|
|
created_after: datetime | None = None,
|
|
created_before: datetime | None = None,
|
|
) -> tuple[list[RedeemCodeUsageLog], int]:
|
|
"""
|
|
获取使用日志
|
|
|
|
Returns:
|
|
(日志列表, 总数)
|
|
"""
|
|
logs = await self.log_repo.get_all_with_filters(
|
|
offset=offset,
|
|
limit=limit,
|
|
redeem_code_id=redeem_code_id,
|
|
user_id=user_id,
|
|
code_like=code_like,
|
|
created_after=created_after,
|
|
created_before=created_before,
|
|
)
|
|
total = await self.log_repo.count_with_filters(
|
|
redeem_code_id=redeem_code_id,
|
|
user_id=user_id,
|
|
code_like=code_like,
|
|
created_after=created_after,
|
|
created_before=created_before,
|
|
)
|
|
return logs, total
|
|
|