""" 用户服务 处理用户相关的业务逻辑。 """ from datetime import datetime, timezone from sqlalchemy.ext.asyncio import AsyncSession from app.core.exceptions import ( UserAlreadyExistsError, UserNotFoundError, ) from app.core.security import hash_password from app.models.user import User from app.repositories.user import UserRepository from app.schemas.user import UserCreate, UserUpdate class UserService: """用户服务""" def __init__(self, session: AsyncSession): """ 初始化用户服务 Args: session: 数据库会话 """ self.session = session self.user_repo = UserRepository(session) async def create_user(self, user_data: UserCreate) -> User: """ 创建新用户 Args: user_data: 用户注册数据 Returns: 新创建的用户 Raises: UserAlreadyExistsError: 用户名或邮箱已存在 """ # 检查用户名是否已存在 if await self.user_repo.exists_by_username(user_data.username): raise UserAlreadyExistsError("用户名") # 检查邮箱是否已存在 if user_data.email and await self.user_repo.exists_by_email(user_data.email): raise UserAlreadyExistsError("邮箱") # 创建用户 user = await self.user_repo.create( username=user_data.username.lower(), email=user_data.email.lower() if user_data.email else None, nickname=user_data.nickname, hashed_password=hash_password(user_data.password), ) await self.user_repo.commit() return user async def get_user_by_id(self, user_id: str) -> User: """ 通过 ID 获取用户 Args: user_id: 用户 ID Returns: 用户对象 Raises: UserNotFoundError: 用户不存在 """ user = await self.user_repo.get_by_id(user_id) if not user: raise UserNotFoundError(user_id) return user async def get_user_by_username(self, username: str) -> User | None: """ 通过用户名获取用户 Args: username: 用户名 Returns: 用户对象或 None """ return await self.user_repo.get_by_username(username) async def update_user( self, user_id: str, update_data: UserUpdate, ) -> User: """ 更新用户信息 Args: user_id: 用户 ID update_data: 更新数据 Returns: 更新后的用户 Raises: UserNotFoundError: 用户不存在 UserAlreadyExistsError: 邮箱已被使用 """ user = await self.get_user_by_id(user_id) # 检查邮箱是否被其他用户使用 if update_data.email: existing_user = await self.user_repo.get_by_email(update_data.email) if existing_user and existing_user.id != user_id: raise UserAlreadyExistsError("邮箱") # 准备更新数据 update_dict = update_data.model_dump(exclude_unset=True) if update_dict.get("email"): update_dict["email"] = update_dict["email"].lower() # 更新用户 user = await self.user_repo.update(user, **update_dict) await self.user_repo.commit() return user async def update_last_login(self, user: User) -> None: """ 更新用户最后登录时间 Args: user: 用户对象 """ await self.user_repo.update( user, last_login_at=datetime.now(timezone.utc), ) await self.user_repo.commit() async def deactivate_user(self, user_id: str) -> User: """ 禁用用户账户 Args: user_id: 用户 ID Returns: 更新后的用户 """ user = await self.get_user_by_id(user_id) user = await self.user_repo.update(user, is_active=False) await self.user_repo.commit() return user async def activate_user(self, user_id: str) -> User: """ 激活用户账户 Args: user_id: 用户 ID Returns: 更新后的用户 """ user = await self.get_user_by_id(user_id) user = await self.user_repo.update(user, is_active=True) await self.user_repo.commit() return user