281 lines
9.1 KiB
Python
281 lines
9.1 KiB
Python
"""
|
||
项目清理工具
|
||
|
||
清理临时文件、旧数据目录和不必要的文件
|
||
"""
|
||
|
||
import os
|
||
import shutil
|
||
from pathlib import Path
|
||
import argparse
|
||
|
||
|
||
class ProjectCleaner:
    """Find and delete temporary artifacts below a project root.

    Patterns in ``temp_dirs`` are matched with ``Path.glob`` against the
    direct children of ``project_root`` and only directories are kept.
    Patterns in ``temp_files`` are matched with ``Path.rglob`` (recursively)
    and only regular files are kept.
    """

    def __init__(self, project_root: Path = None):
        """Set up the cleaner and its pattern lists.

        Args:
            project_root: Root directory to clean (``Path`` or ``str``).
                When falsy, the parent of the directory containing this
                file is used.
        """
        # Path() makes a plain string usable with .glob()/.rglob() below.
        self.project_root = (
            Path(project_root) if project_root else Path(__file__).parent.parent
        )

        # Directory patterns, matched against direct children of the root.
        self.temp_dirs = [
            # Data directories that use the old naming scheme.
            "*_data",
            "*_logs",
            "*_checkpoints",
            "training_data",
            "demo_training_data",
            "test_mcts_data",
            "benchmark_training_data",
            "gameplay_training_data",
            "demo_mcts_training",
            "test_batch_data",
            "test_l0_data",

            # Python caches.
            "__pycache__",
            ".pytest_cache",

            # Temporary-file style suffixes.
            # NOTE(review): because the scan keeps only directories for this
            # list, these match directories named e.g. "x.tmp", not regular
            # files. Confirm whether they were meant to be in temp_files.
            "*.tmp",
            "*.temp",
            "*.bak",
            "*.backup",
        ]

        # File patterns, matched recursively below the root.
        self.temp_files = [
            "*.pyc",
            "*.pyo",
            "*.log",
            "*.pkl",
            "*.pickle",
            "*.prof",
            "*.profile",
            "mcts_*.png",
        ]

    def _relative(self, path: Path) -> Path:
        """Return ``path`` relative to the project root for display.

        Falls back to ``path`` unchanged when it does not lie below the
        root, so printing a target never raises.
        """
        try:
            return path.relative_to(self.project_root)
        except ValueError:
            return path

    def scan_cleanup_targets(self) -> dict:
        """Collect the directories and files that match the cleanup patterns.

        Each path is reported once even if several patterns match it, and
        files that live inside a targeted directory are not listed again:
        they disappear together with that directory.

        Returns:
            A dict with ``'directories'`` and ``'files'`` (lists of dicts
            holding ``'path'``, ``'size'`` and the first matching
            ``'pattern'``) and ``'total_size'`` (sum of all sizes in bytes).
        """
        targets = {
            'directories': [],
            'files': [],
            'total_size': 0
        }

        # Directories: several patterns can match the same path (for example
        # "training_data" also matches "*_data"), so remember what was seen.
        seen_dirs = set()
        for pattern in self.temp_dirs:
            for path in self.project_root.glob(pattern):
                if path in seen_dirs or not path.is_dir():
                    continue
                seen_dirs.add(path)
                size = self._get_dir_size(path)
                targets['directories'].append({
                    'path': path,
                    'size': size,
                    'pattern': pattern
                })
                targets['total_size'] += size

        # Files: skip anything below a targeted directory. Its size is
        # already part of that directory's size, and it will no longer exist
        # once the directory has been removed.
        seen_files = set()
        for pattern in self.temp_files:
            for path in self.project_root.rglob(pattern):
                if path in seen_files or not path.is_file():
                    continue
                if any(parent in seen_dirs for parent in path.parents):
                    continue
                seen_files.add(path)
                size = path.stat().st_size
                targets['files'].append({
                    'path': path,
                    'size': size,
                    'pattern': pattern
                })
                targets['total_size'] += size

        return targets

    def _get_dir_size(self, path: Path) -> int:
        """Return the summed size in bytes of all regular files below ``path``.

        Best effort: if the walk hits an ``OSError`` (which includes
        ``PermissionError``), the size accumulated so far is returned.
        """
        total_size = 0
        try:
            for item in path.rglob('*'):
                if item.is_file():
                    total_size += item.stat().st_size
        except OSError:
            # The size is informational only; an unreadable entry must not
            # abort the scan.
            pass
        return total_size

    def _format_size(self, size_bytes: int) -> str:
        """Format a byte count with one decimal and a B/KB/MB/GB/TB unit."""
        for unit in ['B', 'KB', 'MB', 'GB']:
            if size_bytes < 1024:
                return f"{size_bytes:.1f} {unit}"
            size_bytes /= 1024
        return f"{size_bytes:.1f} TB"

    def preview_cleanup(self) -> dict:
        """Scan for targets, print a summary, and return the scan result.

        All directories are listed; only the ten largest files are listed,
        followed by a count of the remaining ones.

        Returns:
            The dict produced by ``scan_cleanup_targets``.
        """
        targets = self.scan_cleanup_targets()

        print("🔍 扫描清理目标...")
        print("=" * 50)

        if targets['directories']:
            print(f"📁 目录 ({len(targets['directories'])} 个):")
            for item in targets['directories']:
                rel_path = self._relative(item['path'])
                size_str = self._format_size(item['size'])
                print(f" {rel_path} ({size_str})")

        if targets['files']:
            print(f"\n📄 文件 ({len(targets['files'])} 个):")
            # Largest first; show only the top ten.
            sorted_files = sorted(targets['files'], key=lambda x: x['size'], reverse=True)
            for item in sorted_files[:10]:
                rel_path = self._relative(item['path'])
                size_str = self._format_size(item['size'])
                print(f" {rel_path} ({size_str})")

            if len(targets['files']) > 10:
                print(f" ... 还有 {len(targets['files']) - 10} 个文件")

        total_size_str = self._format_size(targets['total_size'])
        print(f"\n💾 总大小: {total_size_str}")

        return targets

    def _remove_items(self, items, remove, label, counter_key, results, dry_run):
        """Delete (or, in a dry run, only report) each item in ``items``.

        ``remove`` is called with the item's path, ``label`` is the noun
        used in the printed messages, and ``counter_key`` names the
        ``results`` counter to increment on success.
        """
        marker = '[预览]' if dry_run else '✅'
        for item in items:
            try:
                if not dry_run:
                    remove(item['path'])

                rel_path = self._relative(item['path'])
                size_str = self._format_size(item['size'])
                print(f"{marker} 删除{label}: {rel_path} ({size_str})")

                results[counter_key] += 1
                results['freed_space'] += item['size']
            except Exception as e:
                # One failing item is recorded and reported; the remaining
                # items are still processed.
                error_msg = f"删除{label}失败 {item['path']}: {e}"
                results['errors'].append(error_msg)
                print(f"❌ {error_msg}")

    def clean_targets(self, targets: dict, dry_run: bool = False) -> dict:
        """Delete the given targets, directories first and then files.

        Args:
            targets: A dict shaped like the result of
                ``scan_cleanup_targets``.
            dry_run: When true, nothing is deleted; the actions are only
                printed and counted.

        Returns:
            A dict with ``'cleaned_dirs'``, ``'cleaned_files'``,
            ``'freed_space'`` (bytes) and ``'errors'`` (list of messages).
        """
        results = {
            'cleaned_dirs': 0,
            'cleaned_files': 0,
            'freed_space': 0,
            'errors': []
        }

        action = "预览" if dry_run else "清理"
        print(f"\n🧹 {action}清理操作...")
        print("=" * 50)

        self._remove_items(
            targets['directories'], shutil.rmtree, "目录",
            'cleaned_dirs', results, dry_run,
        )
        self._remove_items(
            targets['files'], lambda path: path.unlink(), "文件",
            'cleaned_files', results, dry_run,
        )

        return results

    def clean_project(self, dry_run: bool = False, interactive: bool = True) -> dict:
        """Run the whole workflow: preview, optional confirmation, cleanup.

        Args:
            dry_run: When true, only report what would be deleted.
            interactive: When true (and not a dry run), ask for confirmation
                on stdin before deleting anything.

        Returns:
            The result dict of ``clean_targets``; all counters are zero when
            there was nothing to clean or the user declined.
        """
        print("🧹 Deep2048 项目清理工具")
        print("=" * 50)

        targets = self.preview_cleanup()

        # Decide on the presence of targets, not on their byte size: empty
        # directories and zero-byte files still need to be removed.
        if not targets['directories'] and not targets['files']:
            print("\n✨ 项目已经很干净了!")
            return {'cleaned_dirs': 0, 'cleaned_files': 0, 'freed_space': 0, 'errors': []}

        if interactive and not dry_run:
            total_size_str = self._format_size(targets['total_size'])
            response = input(f"\n❓ 确定要清理这些文件吗?(将释放 {total_size_str}) [y/N]: ")
            if response.lower() not in ['y', 'yes']:
                print("❌ 清理操作已取消")
                return {'cleaned_dirs': 0, 'cleaned_files': 0, 'freed_space': 0, 'errors': []}

        results = self.clean_targets(targets, dry_run)

        print("\n📊 清理结果:")
        print(f" 清理目录: {results['cleaned_dirs']} 个")
        print(f" 清理文件: {results['cleaned_files']} 个")
        print(f" 释放空间: {self._format_size(results['freed_space'])}")

        if results['errors']:
            print(f" 错误: {len(results['errors'])} 个")
            for error in results['errors']:
                print(f" {error}")

        # Report completion whenever something was removed, even if the
        # removed items were empty.
        if not dry_run and (results['cleaned_dirs'] or results['cleaned_files']):
            print("\n✅ 清理完成!")

        return results
|
||
|
||
|
||
def main(argv=None):
    """Command-line entry point of the cleanup tool.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the behaviour of a plain
            ``main()`` call.

    Returns:
        ``0`` when the cleanup finished without errors; ``1`` when the
        project root is missing or not a directory, when any deletion
        failed, when the user interrupted the run, or when an unexpected
        exception occurred.
    """
    parser = argparse.ArgumentParser(description="Deep2048项目清理工具")
    parser.add_argument("--dry-run", action="store_true", help="预览模式,不实际删除文件")
    parser.add_argument("--yes", "-y", action="store_true", help="自动确认,不询问")
    parser.add_argument("--project-root", help="项目根目录路径")

    args = parser.parse_args(argv)

    # Resolve the project root: explicit option first, otherwise the parent
    # of the directory that contains this file.
    if args.project_root:
        project_root = Path(args.project_root)
    else:
        project_root = Path(__file__).parent.parent

    if not project_root.exists():
        print(f"❌ 项目根目录不存在: {project_root}")
        return 1

    # A regular file passes exists() but cannot be scanned; without this
    # check the tool would report such a "root" as already clean.
    if not project_root.is_dir():
        print(f"❌ 项目根目录不是目录: {project_root}")
        return 1

    cleaner = ProjectCleaner(project_root)

    try:
        results = cleaner.clean_project(
            dry_run=args.dry_run,
            interactive=not args.yes
        )
        return 1 if results['errors'] else 0

    except KeyboardInterrupt:
        print("\n❌ 用户中断清理操作")
        return 1
    except Exception as e:
        # Top-level boundary: report the failure with a traceback and turn
        # it into a non-zero exit status instead of letting it propagate.
        print(f"❌ 清理过程中出现错误: {e}")
        import traceback
        traceback.print_exc()
        return 1
|
||
|
||
|
||
if __name__ == "__main__":
    # Use SystemExit rather than the exit() helper: exit() is installed by
    # the site module for interactive use and is not guaranteed to exist
    # (for example under "python -S"), and this form needs no extra import.
    raise SystemExit(main())
|