xWork/app.py
2026-01-07 10:23:26 +08:00

151 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import Flask, render_template, request, Response, stream_with_context, jsonify, send_file
import subprocess
import os
import requests
import signal
app = Flask(__name__)
# Module-level handle to the migration subprocess: assigned by the /run
# streaming generator once the child is spawned and reset to None when that
# generator finishes. Read by /stop and by the "already running" guard in /run.
current_process = None
# Default configuration: each entry is read from the environment variable of
# the same name and falls back to the literal listed here when it is unset.
# NOTE(review): the token and password fallbacks are literal credentials
# committed to source — confirm they are placeholders, or rotate them and
# supply the real values through the environment only.
DEFAULT_CONFIG = {
    name: os.environ.get(name, fallback)
    for name, fallback in (
        ("OLD_GITLAB_URL", "http://172.25.254.5:10088"),
        ("OLD_TOKEN", "YJKiyTUEsfCpQ9yMSrwn"),
        ("NEW_GITLAB_URL", "http://172.23.24.8:32272"),
        ("NEW_TOKEN", "glpat-jT8miNczJBRh9xRQbNoc"),
        ("DEFAULT_PASSWORD", "Password123!@#"),
    )
}
@app.route('/', methods=['GET'])
def index():
    """Render ``index.html`` with the default configuration as ``config``.

    NOTE(review): DEFAULT_CONFIG carries token and password values; confirm
    the template is meant to receive them.
    """
    template_context = {"config": DEFAULT_CONFIG}
    return render_template('index.html', **template_context)
@app.route('/check_connection', methods=['POST'])
def check_connection():
    """Check whether a GitLab instance is reachable with the given token.

    Reads ``url`` and ``token`` from the form data, issues a GET to
    ``<url>/api/v4/user`` with the token in the ``Private-Token`` header and
    a 5 second timeout, and answers with a JSON object holding ``status``
    (``"success"`` or ``"error"``) and a human-readable ``message``.
    """
    def failure(text):
        # Every error path shares the same JSON envelope.
        return jsonify({"status": "error", "message": text})

    base_url = request.form.get('url')
    private_token = request.form.get('token')
    if not (base_url and private_token):
        return failure("URL 或 Token 为空")

    try:
        # The authenticated /user endpoint is probed rather than /version,
        # which older GitLab versions may not permit for this token.
        endpoint = base_url.rstrip('/') + "/api/v4/user"
        reply = requests.get(endpoint, headers={"Private-Token": private_token}, timeout=5)
        code = reply.status_code
        if code == 401:
            return failure("认证失败: Token 无效")
        if code != 200:
            return failure(f"连接失败 (HTTP {code})")
        profile = reply.json()
        return jsonify({
            "status": "success",
            "message": f"连接成功! 当前用户: {profile.get('username')} ({profile.get('name')})",
        })
    except Exception as exc:
        # Network errors and malformed response bodies are reported to the
        # caller instead of surfacing as a server error.
        return failure(f"请求异常: {str(exc)}")
@app.route('/download_log', methods=['GET'])
def download_log():
    """Send the server-side migration log as a file download.

    Looks for ``migration.log`` in the process's current working directory
    and answers with a plain-text 404 when that path does not exist.
    """
    log_path = os.path.join(os.getcwd(), 'migration.log')
    if not os.path.exists(log_path):
        return "Log file not found", 404
    return send_file(log_path, as_attachment=True, download_name='migration.log')
@app.route('/stop', methods=['POST'])
def stop_migration():
    """Ask the currently running migration subprocess to stop.

    Sends SIGTERM to the process via ``Popen.terminate()``; it does not wait
    for the process to exit.

    Returns:
        A JSON object with ``status`` and ``message``. ``status`` is
        ``"error"`` when no process is running or when sending the signal
        raised an exception.
    """
    # Take a single snapshot of the global: the /run streaming generator
    # resets it to None when the child exits, which could otherwise happen
    # between the liveness check and the terminate() call below.
    process = current_process
    if process is None or process.poll() is not None:
        return jsonify({"status": "error", "message": "当前没有正在运行的任务"})

    try:
        process.terminate()
    except Exception as e:
        return jsonify({"status": "error", "message": f"停止失败: {str(e)}"})
    else:
        return jsonify({"status": "success", "message": "任务已发送停止信号"})
@app.route('/run', methods=['POST'])
def run_migration():
    """Start the migration script and stream its output as Server-Sent Events.

    Form fields read: ``mode`` (default ``'all'``), ``old_url``,
    ``old_token``, ``new_url``, ``new_token`` and ``default_password``.
    ``mode`` is passed as the single argument to ``./gitlab_migration.sh``;
    the other fields are handed to the script as environment variables.

    Returns:
        A ``text/event-stream`` response, or a JSON error with HTTP 400 when
        a previously started process is still running.
    """
    global current_process
    mode = request.form.get('mode', 'all')

    # Refuse to start a second run while the previous child is still alive.
    if current_process and current_process.poll() is None:
        return jsonify({"status": "error", "message": "已有任务正在运行,请先停止"}), 400

    # The child inherits the server's full environment; the submitted form
    # values are layered on top of it.
    env = os.environ.copy()
    env.update({
        'OLD_GITLAB_URL': request.form.get('old_url', ''),
        'OLD_TOKEN': request.form.get('old_token', ''),
        'NEW_GITLAB_URL': request.form.get('new_url', ''),
        'NEW_TOKEN': request.form.get('new_token', ''),
        'DEFAULT_PASSWORD': request.form.get('default_password', ''),
        'PYTHONUNBUFFERED': '1',  # force unbuffered output from Python children
        'MAX_JOBS': '5',  # concurrency setting consumed by the script
    })

    # How long an aborted child gets to exit after SIGTERM before SIGKILL.
    grace_seconds = 5

    def generate():
        """Spawn the script and yield one SSE ``data:`` frame per output line."""
        global current_process
        cmd = ["./gitlab_migration.sh", mode]
        yield f"data: 🚀 开始执行迁移任务: {mode} ...\n\n"
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                env=env,
                text=True,
                bufsize=1,  # line buffered
            )
            current_process = process
            # Resources are managed by hand rather than with ``with`` so the
            # Popen object can be shared through the module-level global.
            try:
                for line in process.stdout:
                    if line:
                        yield f"data: {line}\n\n"
                return_code = process.wait()
                if return_code == 0:
                    yield "data: ✅ 迁移任务完成!\n\n"
                elif return_code == -signal.SIGTERM:
                    # A negative return code means "killed by that signal".
                    yield "data: 🛑 迁移任务已手动停止。\n\n"
                else:
                    yield f"data: ❌ 迁移任务失败 (Exit Code: {return_code})\n\n"
            finally:
                if process.stdout:
                    process.stdout.close()
                if process.poll() is None:
                    # Abort path (client disconnected or streaming failed):
                    # stop the child and wait for it to really exit, so it is
                    # reaped and current_process is not cleared while the
                    # script is still running.
                    process.terminate()
                    try:
                        process.wait(timeout=grace_seconds)
                    except subprocess.TimeoutExpired:
                        process.kill()
                        process.wait()
        except Exception as e:
            yield f"data: ❌ 系统错误: {str(e)}\n\n"
        finally:
            current_process = None
        yield "event: close\ndata: closed\n\n"

    return Response(stream_with_context(generate()), mimetype='text/event-stream')
if __name__ == "__main__":
    # Keep debug disabled; it is recommended to be off in production.
    app.run(debug=False, port=5000, host="0.0.0.0")