mirror of
https://github.com/schroinerxy/Sing-box.git
synced 2026-06-21 20:35:47 +08:00
272 lines
10 KiB
Python
272 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
import signal
|
|
import time
|
|
import stat
|
|
import subprocess
|
|
import urllib.request
|
|
import urllib.error
|
|
import http.server
|
|
import socketserver
|
|
import threading
|
|
import platform
|
|
from pathlib import Path
|
|
|
|
# 环境变量配置
|
|
PORT = int(os.environ.get('PORT', 3000)) # http服务端口
|
|
SUB_PATH = os.environ.get('SUB_PATH', 'sub') # 订阅token
|
|
config = {
|
|
'UUID': os.environ.get('UUID', 'b5c445d1-8e59-465f-af0c-1f4193d15693'), # 节点UUID,使用哪吒v1时在不不同的平台部署需要修改,否则agent会覆盖
|
|
'NEZHA_SERVER': os.environ.get('NEZHA_SERVER', ''), # 哪吒面板地址,v1格式: nezha.xxx.com:8008 v0格式: nezha.xxx.com
|
|
'NEZHA_PORT': os.environ.get('NEZHA_PORT', ''), # 哪吒v1请留空,哪吒v0 agent端口
|
|
'NEZHA_KEY': os.environ.get('NEZHA_KEY', ''), # 哪吒v1的NZ_CLIENT_SECRET或哪吒v0-agent密钥
|
|
'ARGO_DOMAIN': os.environ.get('ARGO_DOMAIN', ''), # 固定隧道域名,留空即启用临时隧道
|
|
'ARGO_AUTH': os.environ.get('ARGO_AUTH', ''), # 固定隧道token或json,留空即启用临时隧道,json获取:https://json.zone.id
|
|
'ARGO_PORT': os.environ.get('ARGO_PORT', '8001'), # argo端口 使用固定隧道token,cloudflare后台设置的端口需和这里对应
|
|
'CFIP': os.environ.get('CFIP', 'saas.sin.fan'), # 优选域名或优选ip
|
|
'CFPORT': os.environ.get('CFPORT', '443'), # 优选域名或优选ip对应端口
|
|
'NAME': os.environ.get('NAME', ''), # 节点备注
|
|
'S5_PORT': os.environ.get('S5_PORT', ''), # socks5端口,支持多端口玩具可填写,否则不动
|
|
'HY2_PORT': os.environ.get('HY2_PORT', ''), # Hy2 端口,支持多端口玩具可填写,否则不动
|
|
'TUIC_PORT': os.environ.get('TUIC_PORT', ''), # Tuic 端口,支持多端口玩具可填写,否则不动
|
|
'ANYTLS_PORT': os.environ.get('ANYTLS_PORT', ''), # AnyTLS 端口,支持多端口玩具可填写,否则不动
|
|
'REALITY_PORT': os.environ.get('REALITY_PORT', ''), # Reality 端口,支持多端口玩具可填写,否则不动
|
|
'ANYREALITY_PORT': os.environ.get('ANYREALITY_PORT', ''), # AnyReality 端口,支持多端口玩具可填写,否则不动
|
|
'CHAT_ID': os.environ.get('CHAT_ID', ''), # TG chat_id,可在https://t.me/laowang_serv00_bot 获取
|
|
'BOT_TOKEN': os.environ.get('BOT_TOKEN', ''), # TG bot_token, 使用自己的bot需要填写,使用上方的bot不用填写,不会给别人发送
|
|
'UPLOAD_URL': os.environ.get('UPLOAD_URL', ''), # 节点上传地址,需部署merge-sub订阅器项目,例如:https://merge.xxx.com
|
|
'FILE_PATH': os.environ.get('FILE_PATH', '.cache'), # sub,.txt节点存放目录
|
|
'DISABLE_ARGO': os.environ.get('DISABLE_ARGO', 'false'), # 是否禁用argo, true为禁用,false为不禁用,默认开启
|
|
}
|
|
|
|
def sleep(ms):
|
|
time.sleep(ms / 1000)
|
|
|
|
def get_architecture():
|
|
"""获取系统架构"""
|
|
arch = platform.machine().lower()
|
|
system = platform.system().lower()
|
|
|
|
if system in ['linux', 'darwin']:
|
|
if arch in ['x86_64', 'amd64']:
|
|
return 'amd64'
|
|
elif arch in ['aarch64', 'arm64']:
|
|
return 'arm64'
|
|
|
|
raise Exception(f"Unsupported architecture: {system} {arch}")
|
|
|
|
def download_file(url, dest_path):
|
|
print(f"Downloading from: {url}")
|
|
opener = urllib.request.build_opener()
|
|
opener.addheaders = [
|
|
('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'),
|
|
('Accept', '*/*'),
|
|
('Connection', 'keep-alive')
|
|
]
|
|
|
|
urllib.request.install_opener(opener)
|
|
|
|
try:
|
|
# 下载文件
|
|
urllib.request.urlretrieve(url, dest_path)
|
|
# print("Download completed!")
|
|
except urllib.error.HTTPError as e:
|
|
if os.path.exists(dest_path):
|
|
os.unlink(dest_path)
|
|
raise Exception(f"Download failed (HTTP {e.code}): {e.reason}")
|
|
except urllib.error.URLError as e:
|
|
if os.path.exists(dest_path):
|
|
os.unlink(dest_path)
|
|
raise Exception(f"Download failed (URL error): {str(e)}")
|
|
except Exception as e:
|
|
if os.path.exists(dest_path):
|
|
os.unlink(dest_path)
|
|
raise Exception(f"Download failed: {str(e)}")
|
|
|
|
def set_executable(file_path):
|
|
"""设置文件可执行权限"""
|
|
try:
|
|
current_permissions = os.stat(file_path).st_mode
|
|
os.chmod(file_path, current_permissions | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
|
|
except Exception as e:
|
|
raise Exception(f"Failed to set executable permission: {str(e)}")
|
|
|
|
def delete_file(file_path):
|
|
"""删除文件"""
|
|
try:
|
|
if os.path.exists(file_path):
|
|
os.unlink(file_path)
|
|
except Exception as e:
|
|
print(f"Failed to delete file {file_path}: {str(e)}")
|
|
|
|
class CustomHTTPRequestHandler(http.server.SimpleHTTPRequestHandler):
|
|
"""自定义HTTP请求处理器"""
|
|
|
|
def do_GET(self):
|
|
if self.path == '/':
|
|
self.handle_root()
|
|
elif self.path == f'/{SUB_PATH}':
|
|
self.handle_sub()
|
|
elif self.path == '/ps':
|
|
self.handle_ps()
|
|
else:
|
|
self.send_error(404, '404 Not Found')
|
|
|
|
def handle_root(self):
|
|
"""处理根路径请求"""
|
|
try:
|
|
html_path = os.path.join(os.path.dirname(__file__), 'index.html')
|
|
if os.path.exists(html_path):
|
|
with open(html_path, 'r', encoding='utf-8') as f:
|
|
data = f.read()
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', 'text/html')
|
|
self.end_headers()
|
|
self.wfile.write(data.encode('utf-8'))
|
|
else:
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', 'text/html; charset=utf-8')
|
|
self.end_headers()
|
|
self.wfile.write("Hello world!<br><br>You can access /{SUB_PATH}(Default: /sub) to get your nodes!".encode('utf-8'))
|
|
except Exception as e:
|
|
self.send_error(500, str(e))
|
|
|
|
def handle_sub(self):
|
|
"""处理订阅路径请求"""
|
|
sub_file_path = os.path.join(config['FILE_PATH'], 'sub.txt')
|
|
try:
|
|
if os.path.exists(sub_file_path):
|
|
with open(sub_file_path, 'r', encoding='utf-8') as f:
|
|
data = f.read()
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', 'text/plain')
|
|
self.end_headers()
|
|
self.wfile.write(data.encode('utf-8'))
|
|
else:
|
|
self.send_error(404, f'Sub file not found at: {sub_file_path}')
|
|
except Exception as e:
|
|
self.send_error(500, str(e))
|
|
|
|
def handle_ps(self):
|
|
"""处理进程列表请求"""
|
|
try:
|
|
result = subprocess.run(['ps', 'aux'], capture_output=True, text=True, check=True)
|
|
self.send_response(200)
|
|
self.send_header('Content-Type', 'text/plain')
|
|
self.end_headers()
|
|
self.wfile.write(result.stdout.encode('utf-8'))
|
|
except subprocess.CalledProcessError as e:
|
|
self.send_error(500, f'Error executing ps command: {str(e)}')
|
|
|
|
def log_message(self, format, *args):
|
|
"""覆盖日志方法,不输出访问日志"""
|
|
pass
|
|
|
|
def start_http_server():
|
|
"""启动HTTP服务器"""
|
|
handler = CustomHTTPRequestHandler
|
|
|
|
# 创建服务器,允许地址重用
|
|
socketserver.TCPServer.allow_reuse_address = True
|
|
httpd = socketserver.TCPServer(('0.0.0.0', PORT), handler)
|
|
|
|
server_thread = threading.Thread(target=httpd.serve_forever)
|
|
server_thread.daemon = True
|
|
server_thread.start()
|
|
|
|
return httpd
|
|
|
|
def cleanup(binary_path):
|
|
"""清理二进制文件"""
|
|
delete_file(binary_path)
|
|
|
|
def main():
|
|
"""主函数"""
|
|
binary_path = None
|
|
httpd = None
|
|
process = None
|
|
|
|
try:
|
|
# 启动HTTP服务器
|
|
httpd = start_http_server()
|
|
|
|
# 获取架构并下载
|
|
arch = get_architecture()
|
|
download_url = 'https://amd64.eooce.com/sbsh' if arch == 'amd64' else 'https://arm64.eooce.com/sbsh'
|
|
|
|
# print(f"Using download link: {download_url}")
|
|
binary_path = os.path.join(os.getcwd(), 'sbsh')
|
|
|
|
download_file(download_url, binary_path)
|
|
if not os.path.exists(binary_path):
|
|
raise Exception('Download failed, file does not exist at the specified path')
|
|
|
|
set_executable(binary_path)
|
|
|
|
# 准备环境变量
|
|
env = os.environ.copy()
|
|
env.update({k: str(v) for k, v in config.items()})
|
|
|
|
# 启动子进程
|
|
process = subprocess.Popen(
|
|
[binary_path],
|
|
env=env,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
universal_newlines=True,
|
|
bufsize=1
|
|
)
|
|
|
|
# 实时输出子进程日志
|
|
def log_output():
|
|
for line in process.stdout:
|
|
print(line, end='')
|
|
|
|
log_thread = threading.Thread(target=log_output)
|
|
log_thread.daemon = True
|
|
log_thread.start()
|
|
|
|
sleep(18000)
|
|
print('\nLogs will be deleted in 90 seconds, you can copy the above nodes!')
|
|
|
|
sleep(90000)
|
|
|
|
cleanup(binary_path)
|
|
|
|
# 清除控制台
|
|
os.system('clear' if os.name == 'posix' else 'cls')
|
|
|
|
print('✅ App is running')
|
|
print(f'🌐 HTTP server is running on {PORT}')
|
|
|
|
# 保持主线程运行
|
|
try:
|
|
while True:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
print("\nShutting down...")
|
|
|
|
except Exception as e:
|
|
print(f"\n❌ An error occurred: {str(e)}")
|
|
if binary_path:
|
|
cleanup(binary_path)
|
|
sys.exit(1)
|
|
finally:
|
|
if process and process.poll() is None:
|
|
process.terminate()
|
|
if httpd:
|
|
httpd.shutdown()
|
|
|
|
def signal_handler(signum, frame):
|
|
print("\nReceived signal to terminate")
|
|
sys.exit(0)
|
|
|
|
if __name__ == "__main__":
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
main()
|