修复无法正确获取订阅连接的bug

This commit is contained in:
ermaozi 2026-06-26 11:23:09 +08:00
parent 78676478d9
commit ededd6f138

145
main.py
View File

@ -1,3 +1,4 @@
import base64
import os import os
import re import re
import smtplib import smtplib
@ -41,28 +42,36 @@ def _extract_urls(summary):
return urls, decoded return urls, decoded
def _pick_url(urls, mode): _NODE_SCHEME_RE = re.compile(r"(?:vmess|vless|trojan|ss|ssr|hysteria2?|tuic)://")
if mode == "v2ray":
for suffix in (".txt", ".json"):
for url in urls:
if url.lower().endswith(suffix):
return url
if mode == "clash":
for suffix in (".yaml", ".yml"):
for url in urls:
if url.lower().endswith(suffix):
return url
return ""
def _pick_urls(urls, mode): def _b64decode(text):
matched = [] compact = re.sub(r"\s+", "", text)
suffixes = (".txt", ".json") if mode == "v2ray" else (".yaml", ".yml") if not compact:
for suffix in suffixes: return ""
for url in urls: try:
if url.lower().endswith(suffix) and url not in matched: # binascii.Error 是 ValueError 的子类,统一捕获即可
matched.append(url) raw = base64.b64decode(compact + "=" * (-len(compact) % 4))
return matched except ValueError:
return ""
return raw.decode("utf-8", "ignore")
def _detect_kind(text):
"""根据下载内容判断订阅类型:'clash' / 'v2ray',无法识别返回 None。"""
sample = text.strip()
if not sample:
return None
# clash 配置为 YAML包含 proxies/proxy-groups 字段
if re.search(r"^(?:proxies|proxy-groups)\s*:", sample, re.MULTILINE):
return "clash"
# v2ray 订阅为节点 URI 列表,可能是明文或 base64 编码
if _NODE_SCHEME_RE.search(sample):
return "v2ray"
decoded = _b64decode(sample)
if decoded and _NODE_SCHEME_RE.search(decoded):
return "v2ray"
return None
def _download_with_retry(urls): def _download_with_retry(urls):
@ -108,19 +117,25 @@ def _build_session():
return session return session
def _download_candidates(session, urls): def _classify_subscriptions(session, urls):
if not urls: """逐个下载候选链接并按内容判断类型,返回 {'v2ray': (req, url), 'clash': (req, url)}。"""
return None, None found = {}
for url in urls: for url in urls:
if "v2ray" in found and "clash" in found:
break
try: try:
req = session.get(url, verify=False, timeout=20) req = session.get(url, verify=False, timeout=20)
except requests.RequestException as e: except requests.RequestException as e:
write_log(f"请求失败:{url} - {e}", "WARN") write_log(f"请求失败:{url} - {e}", "WARN")
continue continue
if req.status_code in ok_code: if req.status_code not in ok_code:
return req, url write_log(f"请求失败:{url} - {req.status_code}", "WARN")
write_log(f"请求失败:{url} - {req.status_code}", "WARN") continue
return None, urls[0] kind = _detect_kind(req.text)
if kind and kind not in found:
found[kind] = (req, url)
write_log(f"识别到 {kind} 订阅:{url}", "INFO")
return found
def get_subscribe_url(): def get_subscribe_url():
dirs = './subscribe' dirs = './subscribe'
@ -161,62 +176,40 @@ def get_subscribe_url():
write_log("暂时没有可用的订阅更新", "WARN") write_log("暂时没有可用的订阅更新", "WARN")
return return
urls, decoded_summary = _extract_urls(summary) urls, _ = _extract_urls(summary)
v2ray_url = _pick_url(urls, "v2ray") # 链接已无固定后缀,需下载内容后再判断是 v2ray 还是 clash
clash_url = _pick_url(urls, "clash") classified = _classify_subscriptions(session, urls)
v2ray_candidates = _pick_urls(urls, "v2ray")
clash_candidates = _pick_urls(urls, "clash")
# 兼容旧页面结构,通用提取失败时再尝试历史规则
if not v2ray_url:
v2ray_list = re.findall(r">V2Ray/XRay -&gt; (.*?)</span>", summary)
if not v2ray_list:
v2ray_list = re.findall(r">V2Ray/XRay -> (.*?)</span>", decoded_summary)
if any(v2ray_list):
v2ray_url = v2ray_list[-1].replace('amp;', '')
if v2ray_url not in v2ray_candidates:
v2ray_candidates.append(v2ray_url)
if not clash_url:
clash_list = re.findall(r">clash -&gt; (.*?)</span>", summary)
if not clash_list:
clash_list = re.findall(r">clash -> (.*?)</span>", decoded_summary)
if any(clash_list) and not clash_list[-1].startswith("订阅地址生成失败"):
clash_url = clash_list[-1].replace('amp;', '')
if clash_url not in clash_candidates:
clash_candidates.append(clash_url)
# 获取普通订阅链接 # 获取普通订阅链接
if v2ray_url: v2ray_entry = classified.get("v2ray")
v2ray_req, used_v2ray_url = _download_candidates(session, v2ray_candidates) if v2ray_entry:
if not v2ray_req: v2ray_req, _ = v2ray_entry
cache_file = dirs + '/v2ray.txt' update_list.append(f"v2ray: {v2ray_req.status_code}")
if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0: with open(dirs + '/v2ray.txt', 'w', encoding="utf-8") as f:
update_list.append("v2ray: cache") f.write(v2ray_req.text)
write_log(f"获取 v2ray 订阅失败,已保留本地缓存:{used_v2ray_url}", "WARN") else:
else: cache_file = dirs + '/v2ray.txt'
write_log(f"获取 v2ray 订阅失败:{used_v2ray_url}", "WARN") if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0:
update_list.append("v2ray: cache")
write_log("未获取到 v2ray 订阅,已保留本地缓存", "WARN")
else: else:
update_list.append(f"v2ray: {v2ray_req.status_code}") write_log("未获取到 v2ray 订阅", "WARN")
with open(dirs + '/v2ray.txt', 'w', encoding="utf-8") as f:
f.write(v2ray_req.text)
# 获取clash订阅链接 # 获取clash订阅链接
if clash_url and not clash_url.startswith("订阅地址生成失败"): clash_entry = classified.get("clash")
clash_req, used_clash_url = _download_candidates(session, clash_candidates) if clash_entry:
if not clash_req: clash_req, _ = clash_entry
cache_file = dirs + '/clash.yml' update_list.append(f"clash: {clash_req.status_code}")
if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0: with open(dirs + '/clash.yml', 'w', encoding="utf-8") as f:
update_list.append("clash: cache") f.write(clash_req.content.decode("utf-8"))
write_log(f"获取 clash 订阅失败,已保留本地缓存:{used_clash_url}", "WARN") else:
else: cache_file = dirs + '/clash.yml'
write_log(f"获取 clash 订阅失败:{used_clash_url}", "WARN") if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0:
update_list.append("clash: cache")
write_log("未获取到 clash 订阅,已保留本地缓存", "WARN")
else: else:
update_list.append(f"clash: {clash_req.status_code}") write_log("未获取到 clash 订阅", "WARN")
with open(dirs + '/clash.yml', 'w', encoding="utf-8") as f:
clash_content = clash_req.content.decode("utf-8")
f.write(clash_content)
if update_list: if update_list:
file_pat = re.compile(r"v2ray\.txt|clash\.yml") file_pat = re.compile(r"v2ray\.txt|clash\.yml")
if file_pat.search(os.popen("git status").read()): if file_pat.search(os.popen("git status").read()):