Files
deep2048/tools/cleanup.py
2025-07-23 07:04:10 +08:00

281 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
项目清理工具
清理临时文件、旧数据目录和不必要的文件
"""
import os
import shutil
from pathlib import Path
import argparse
class ProjectCleaner:
    """Find and delete temporary artifacts under a project root.

    Directory patterns in ``temp_dirs`` are matched against the direct
    children of ``project_root`` (non-recursive ``glob``); file patterns in
    ``temp_files`` are matched recursively (``rglob``).
    """

    def __init__(self, project_root: Path = None):
        """Set up the cleaner.

        Args:
            project_root: Root directory to clean. When omitted (or falsy),
                the grandparent directory of this file is used.
        """
        self.project_root = project_root or Path(__file__).parent.parent
        # Directory patterns to remove (only entries that are directories
        # are ever selected by scan_cleanup_targets).
        self.temp_dirs = [
            # Data directories using the old naming scheme
            "*_data",
            "*_logs",
            "*_checkpoints",
            "training_data",
            "demo_training_data",
            "test_mcts_data",
            "benchmark_training_data",
            "gameplay_training_data",
            "demo_mcts_training",
            "test_batch_data",
            "test_l0_data",
            # Python caches
            "__pycache__",
            ".pytest_cache",
            # Temporary entries.
            # NOTE(review): these look like *file* suffixes, but because they
            # live in temp_dirs they only ever match directories -- confirm
            # whether they were meant to be in temp_files instead.
            "*.tmp",
            "*.temp",
            "*.bak",
            "*.backup",
        ]
        # File patterns to remove
        self.temp_files = [
            "*.pyc",
            "*.pyo",
            "*.log",
            "*.pkl",
            "*.pickle",
            "*.prof",
            "*.profile",
            "mcts_*.png",
        ]

    @staticmethod
    def _empty_results() -> dict:
        """Return a fresh result record describing 'nothing was cleaned'."""
        return {'cleaned_dirs': 0, 'cleaned_files': 0, 'freed_space': 0, 'errors': []}

    def scan_cleanup_targets(self) -> dict:
        """Collect the directories and files that would be removed.

        Each path is reported at most once, and files that live inside a
        directory already scheduled for removal are not listed separately,
        so ``total_size`` is not double-counted and a later deletion pass
        does not try to remove something that is already gone.

        Returns:
            A dict with keys ``'directories'`` and ``'files'`` (lists of
            ``{'path', 'size', 'pattern'}`` records) and ``'total_size'``
            (sum of the recorded sizes, in bytes).
        """
        targets = {
            'directories': [],
            'files': [],
            'total_size': 0
        }

        # Directories: several patterns can match the same directory
        # (e.g. "*_data" and "training_data"), so remember what was taken.
        seen_dirs = set()
        for pattern in self.temp_dirs:
            for path in self.project_root.glob(pattern):
                if path in seen_dirs or not path.is_dir():
                    continue
                seen_dirs.add(path)
                size = self._get_dir_size(path)
                targets['directories'].append({
                    'path': path,
                    'size': size,
                    'pattern': pattern
                })
                targets['total_size'] += size

        # Files: skip anything already covered by a targeted directory.
        seen_files = set()
        for pattern in self.temp_files:
            for path in self.project_root.rglob(pattern):
                if path in seen_files or not path.is_file():
                    continue
                if any(parent in seen_dirs for parent in path.parents):
                    continue
                try:
                    size = path.stat().st_size
                except OSError:
                    # The file vanished or became unreadable mid-scan;
                    # there is nothing to clean for it.
                    continue
                seen_files.add(path)
                targets['files'].append({
                    'path': path,
                    'size': size,
                    'pattern': pattern
                })
                targets['total_size'] += size
        return targets

    def _get_dir_size(self, path: Path) -> int:
        """Return the summed size in bytes of all regular files under *path*.

        The computation is best effort: if walking the tree raises an
        ``OSError``, the size accumulated so far is returned.
        """
        total_size = 0
        try:
            for item in path.rglob('*'):
                if item.is_file():
                    total_size += item.stat().st_size
        except OSError:
            # Deliberately tolerated: the size is informational only.
            pass
        return total_size

    def _format_size(self, size_bytes: int) -> str:
        """Render a byte count with one decimal and a B/KB/MB/GB/TB unit."""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024
        return f"{size_bytes:.1f} TB"

    def preview_cleanup(self) -> dict:
        """Scan for targets, print a summary, and return the scan result.

        All directories are listed; for files only the ten largest are
        printed, followed by a count of the remainder.

        Returns:
            The dict produced by ``scan_cleanup_targets``.
        """
        targets = self.scan_cleanup_targets()
        print("🔍 扫描清理目标...")
        print("=" * 50)
        if targets['directories']:
            print(f"📁 目录 ({len(targets['directories'])} 个):")
            for item in targets['directories']:
                rel_path = item['path'].relative_to(self.project_root)
                size_str = self._format_size(item['size'])
                print(f" {rel_path} ({size_str})")
        if targets['files']:
            print(f"\n📄 文件 ({len(targets['files'])} 个):")
            # Show only the ten largest files, biggest first.
            sorted_files = sorted(targets['files'], key=lambda x: x['size'], reverse=True)
            for item in sorted_files[:10]:
                rel_path = item['path'].relative_to(self.project_root)
                size_str = self._format_size(item['size'])
                print(f" {rel_path} ({size_str})")
            if len(targets['files']) > 10:
                print(f" ... 还有 {len(targets['files']) - 10} 个文件")
        total_size_str = self._format_size(targets['total_size'])
        print(f"\n💾 总大小: {total_size_str}")
        return targets

    def clean_targets(self, targets: dict, dry_run: bool = False) -> dict:
        """Delete (or, in dry-run mode, only report) the given targets.

        Args:
            targets: A dict shaped like the result of
                ``scan_cleanup_targets``.
            dry_run: When true, nothing is deleted; each target is only
                printed and counted.

        Returns:
            A dict with ``'cleaned_dirs'``, ``'cleaned_files'``,
            ``'freed_space'`` (bytes) and ``'errors'`` (list of messages).
            A failure on one target is recorded and does not stop the run.
        """
        results = {
            'cleaned_dirs': 0,
            'cleaned_files': 0,
            'freed_space': 0,
            'errors': []
        }
        action = "预览" if dry_run else "清理"
        print(f"\n🧹 {action}清理操作...")
        print("=" * 50)

        # Directories first.
        for item in targets['directories']:
            try:
                if not dry_run:
                    shutil.rmtree(item['path'])
                rel_path = item['path'].relative_to(self.project_root)
                size_str = self._format_size(item['size'])
                print(f"{'[预览]' if dry_run else ''} 删除目录: {rel_path} ({size_str})")
                results['cleaned_dirs'] += 1
                results['freed_space'] += item['size']
            except (OSError, ValueError) as e:
                # OSError: deletion failed; ValueError: path is not under
                # project_root (relative_to).
                error_msg = f"删除目录失败 {item['path']}: {e}"
                results['errors'].append(error_msg)
                print(f"{error_msg}")

        # Then individual files.
        for item in targets['files']:
            try:
                if not dry_run:
                    item['path'].unlink()
                rel_path = item['path'].relative_to(self.project_root)
                size_str = self._format_size(item['size'])
                print(f"{'[预览]' if dry_run else ''} 删除文件: {rel_path} ({size_str})")
                results['cleaned_files'] += 1
                results['freed_space'] += item['size']
            except (OSError, ValueError) as e:
                error_msg = f"删除文件失败 {item['path']}: {e}"
                results['errors'].append(error_msg)
                print(f"{error_msg}")
        return results

    def clean_project(self, dry_run: bool = False, interactive: bool = True) -> dict:
        """Run the full workflow: preview, optional confirmation, clean, report.

        Args:
            dry_run: When true, nothing is deleted and no confirmation is
                requested.
            interactive: When true (and not a dry run), ask for a y/N
                confirmation on stdin before deleting.

        Returns:
            The result dict from ``clean_targets``, or an all-zero result
            when there is nothing to clean or the user declines.
        """
        print("🧹 Deep2048 项目清理工具")
        print("=" * 50)

        targets = self.preview_cleanup()
        # Decide on the presence of targets, not on their byte size: empty
        # directories and zero-byte files still need to be removed.
        if not targets['directories'] and not targets['files']:
            print("\n✨ 项目已经很干净了!")
            return self._empty_results()

        if interactive and not dry_run:
            total_size_str = self._format_size(targets['total_size'])
            response = input(f"\n❓ 确定要清理这些文件吗?(将释放 {total_size_str}) [y/N]: ")
            if response.lower() not in ['y', 'yes']:
                print("❌ 清理操作已取消")
                return self._empty_results()

        results = self.clean_targets(targets, dry_run)

        print("\n📊 清理结果:")
        print(f" 清理目录: {results['cleaned_dirs']}")
        print(f" 清理文件: {results['cleaned_files']}")
        print(f" 释放空间: {self._format_size(results['freed_space'])}")
        if results['errors']:
            print(f" 错误: {len(results['errors'])}")
            for error in results['errors']:
                print(f" {error}")
        if not dry_run and results['freed_space'] > 0:
            print("\n✅ 清理完成!")
        return results
def main():
    """Command-line entry point for the cleanup tool.

    Parses ``--dry-run``, ``--yes``/``-y`` and ``--project-root``, runs the
    cleaner against the chosen root, and returns a process exit status:
    0 when the run reported no errors, 1 otherwise (missing root,
    interruption, unexpected exception, or per-target errors).
    """
    cli = argparse.ArgumentParser(description="Deep2048项目清理工具")
    cli.add_argument("--dry-run", action="store_true", help="预览模式,不实际删除文件")
    cli.add_argument("--yes", "-y", action="store_true", help="自动确认,不询问")
    cli.add_argument("--project-root", help="项目根目录路径")
    options = cli.parse_args()

    # An explicit --project-root wins; otherwise fall back to the directory
    # two levels above this file.
    root = Path(options.project_root) if options.project_root else Path(__file__).parent.parent
    if not root.exists():
        print(f"❌ 项目根目录不存在: {root}")
        return 1

    cleaner = ProjectCleaner(root)
    try:
        outcome = cleaner.clean_project(
            dry_run=options.dry_run,
            interactive=not options.yes,
        )
    except KeyboardInterrupt:
        print("\n❌ 用户中断清理操作")
        return 1
    except Exception as exc:
        # Top-level boundary: report the failure with a traceback and signal
        # it through the exit status instead of crashing.
        print(f"❌ 清理过程中出现错误: {exc}")
        import traceback
        traceback.print_exc()
        return 1
    return 1 if outcome['errors'] else 0
# Script entry point. SystemExit is raised directly rather than calling the
# exit() helper, which is injected by the `site` module and is not available
# when the interpreter runs without it (e.g. `python -S`).
if __name__ == "__main__":
    raise SystemExit(main())