get_subscribe/main.py
2026-06-29 03:04:59 +00:00

230 lines
7.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import os
import re
import smtplib
import sys
import time
import html
import xml.etree.ElementTree as ET
from email.mime.text import MIMEText
from email.utils import formataddr
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
requests.packages.urllib3.disable_warnings()
# HTTP status codes that are treated as a successful response.
ok_code = [200, 201, 202, 203, 204, 205, 206]
# Mail-domain filter list.
# One entry per line with trailing commas: a missing comma between two adjacent
# string literals silently concatenates them into a single bogus entry.
blackhole_list = [
    "cnr.cn",
    "cyberpolice.cn",
    "gov.cn",
    "samr.gov.cn",
    "12321.cn",
    "miit.gov.cn",
    "chinatcc.gov.cn",
]
def write_log(content, level="INFO"):
    """Print a log line and append it to the current month's log file.

    The line has the form ``[YYYY-mm-dd HH:MM:SS] [LEVEL] content`` and is
    appended to ``./log/<YYYY-mm>-update.log`` (UTF-8).

    Args:
        content: Message text to record.
        level: Severity label placed in the line (defaults to ``"INFO"``).
    """
    # Read the clock once so the timestamp in the line and the month in the
    # file name can never disagree across a month boundary.
    now = time.localtime()
    date_str = time.strftime('%Y-%m-%d %H:%M:%S', now)
    update_log = f"[{date_str}] [{level}] {content}\n"
    print(update_log)
    # Logging must not fail just because nobody created the directory yet.
    os.makedirs('./log', exist_ok=True)
    with open(f'./log/{time.strftime("%Y-%m", now)}-update.log', 'a', encoding="utf-8") as f:
        f.write(update_log)
def _extract_urls(summary):
decoded = html.unescape(summary)
urls = []
for raw_url in re.findall(r"https?://[^\s\"'<>]+", decoded):
url = raw_url.strip().rstrip('.,;)')
if url not in urls:
urls.append(url)
return urls, decoded
_NODE_SCHEME_RE = re.compile(r"(?:vmess|vless|trojan|ss|ssr|hysteria2?|tuic)://")
def _b64decode(text):
compact = re.sub(r"\s+", "", text)
if not compact:
return ""
try:
# binascii.Error 是 ValueError 的子类,统一捕获即可
raw = base64.b64decode(compact + "=" * (-len(compact) % 4))
except ValueError:
return ""
return raw.decode("utf-8", "ignore")
def _detect_kind(text):
    """Classify downloaded subscription content.

    Returns ``'clash'`` when a line starts with a ``proxies:`` or
    ``proxy-groups:`` key, ``'v2ray'`` when a node URI scheme is found either
    in the text itself or in its base64-decoded form, and ``None`` otherwise
    (including for blank input).
    """
    body = text.strip()
    if not body:
        return None

    # Clash configs are YAML documents carrying proxies / proxy-groups keys.
    has_clash_key = re.search(
        r"^(?:proxies|proxy-groups)\s*:", body, re.MULTILINE
    ) is not None
    if has_clash_key:
        return "clash"

    # v2ray subscriptions are node URI lists, in plain text ...
    if _NODE_SCHEME_RE.search(body) is not None:
        return "v2ray"

    # ... or wrapped in base64.
    unwrapped = _b64decode(body)
    has_encoded_nodes = bool(unwrapped) and _NODE_SCHEME_RE.search(unwrapped) is not None
    return "v2ray" if has_encoded_nodes else None
def _download_with_retry(urls):
    """Fetch the first URL that answers with an accepted status code.

    Every URL gets up to three GET attempts, made back to back.

    Args:
        urls: Candidate URLs, tried in order.

    Returns:
        ``(response, url)`` for the first accepted response, ``(None, None)``
        when ``urls`` is empty, or ``(None, urls[0])`` when every attempt fails.
    """
    if not urls:
        return None, None

    max_attempts = 3
    for candidate in urls:
        attempt = 0
        while attempt < max_attempts:
            attempt += 1
            try:
                # NOTE: TLS certificate verification is disabled for this request.
                response = requests.request(
                    "GET",
                    candidate,
                    verify=False,
                    timeout=20,
                    headers={"User-Agent": "Mozilla/5.0"},
                )
            except requests.RequestException as exc:
                print(f"请求 {candidate} 失败: {exc}")
                continue
            if response.status_code in ok_code:
                return response, candidate

    return None, urls[0]
def _build_session():
    """Create the HTTP session used for all downloads.

    The session retries GET requests (up to 3 times, backoff factor 1) on
    connection/read errors and on 429/500/502/503/504 responses, routes
    traffic through the proxy named by the ``SUBSCRIBE_PROXY`` environment
    variable when it is set to a non-blank value, and sends a
    ``Mozilla/5.0`` User-Agent.

    Returns:
        The configured ``requests.Session``.
    """
    retry_policy = Retry(
        total=3,
        connect=3,
        read=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    session = requests.Session()
    for prefix in ("http://", "https://"):
        session.mount(prefix, retrying_adapter)

    proxy_url = os.environ.get("SUBSCRIBE_PROXY", "").strip()
    if proxy_url:
        # The same proxy serves both schemes.
        session.proxies.update(dict.fromkeys(("http", "https"), proxy_url))

    session.headers["User-Agent"] = "Mozilla/5.0"
    return session
def _classify_subscriptions(session, urls):
    """Download candidate links one by one and classify them by content.

    Stops as soon as both a v2ray and a clash subscription have been found;
    for each kind only the first matching link is kept. Failed requests and
    rejected status codes are logged and skipped.

    Args:
        session: HTTP session used for the downloads.
        urls: Candidate links, tried in order.

    Returns:
        A dict mapping ``'v2ray'`` / ``'clash'`` to ``(response, url)`` for
        every kind that was found.
    """
    found = {}
    wanted_kinds = ("v2ray", "clash")

    for url in urls:
        if all(kind in found for kind in wanted_kinds):
            break

        try:
            # NOTE: TLS certificate verification is disabled for this request.
            response = session.get(url, verify=False, timeout=20)
        except requests.RequestException as exc:
            write_log(f"请求失败:{url} - {exc}", "WARN")
            continue

        status = response.status_code
        if status not in ok_code:
            write_log(f"请求失败:{url} - {status}", "WARN")
            continue

        kind = _detect_kind(response.text)
        if not kind or kind in found:
            continue

        found[kind] = (response, url)
        write_log(f"识别到 {kind} 订阅:{url}", "INFO")

    return found
def _decode_clash_payload(response):
    """Return the clash config text: strict UTF-8 first, then the library's own decoding."""
    try:
        return response.content.decode("utf-8")
    except UnicodeDecodeError:
        # A best-effort file is better than aborting the whole update.
        return response.text


def _store_subscription(kind, entry, cache_file, read_text):
    """Write a downloaded subscription to ``cache_file`` or report on the cached copy.

    Args:
        kind: Subscription label used in log lines and in the summary entry.
        entry: ``(response, url)`` tuple for a fresh download, or ``None``.
        cache_file: Destination path of the subscription file.
        read_text: Callable turning the response into the text to write.

    Returns:
        The summary entry (``"<kind>: <status>"`` or ``"<kind>: cache"``), or
        ``None`` when there is neither a download nor a non-empty cached file.
    """
    if entry:
        response, _ = entry
        # Produce the text BEFORE opening the file: opening with 'w' truncates
        # it, so a failure while decoding must not wipe the existing copy.
        text = read_text(response)
        with open(cache_file, 'w', encoding="utf-8") as f:
            f.write(text)
        return f"{kind}: {response.status_code}"
    if os.path.exists(cache_file) and os.path.getsize(cache_file) > 0:
        write_log(f"未获取到 {kind} 订阅,已保留本地缓存", "WARN")
        return f"{kind}: cache"
    write_log(f"未获取到 {kind} 订阅", "WARN")
    return None


def _fetch_candidate_urls(session):
    """Return the links found in the first RSS item's description, or ``None`` on failure."""
    try:
        rss_req = session.get(
            'https://www.cfmem.com/feeds/posts/default?alt=rss',
            timeout=20,
        )
    except requests.RequestException as ex:
        write_log(f"更新失败!拉取 RSS 异常: {ex}", "ERROR")
        return None
    if rss_req.status_code not in ok_code:
        write_log(f"更新失败!无法拉取原网站内容 - {rss_req.status_code}", "ERROR")
        return None
    try:
        root = ET.fromstring(rss_req.text)
    except ET.ParseError as ex:
        write_log(f"更新失败RSS 解析失败: {ex}", "ERROR")
        return None
    item = root.find("./channel/item")
    if item is None:
        write_log("更新失败RSS 中未找到可用条目", "ERROR")
        return None
    summary = item.findtext("description")
    if not summary:
        write_log("暂时没有可用的订阅更新", "WARN")
        return None
    urls, _ = _extract_urls(summary)
    return urls


def get_subscribe_url():
    """Refresh the local v2ray and clash subscription files from the RSS feed.

    Creates ``./subscribe`` and ``./log`` when missing, reads the first item
    of the feed, downloads the links in its description and classifies them by
    content. A recognised v2ray subscription is written to
    ``./subscribe/v2ray.txt`` and a clash one to ``./subscribe/clash.yml``;
    when a kind is not found, an existing non-empty file is kept. Finally
    ``git status`` is consulted to log whether either file changed.

    Returns:
        None. Every failure is logged and ends the run early.
    """
    dirs = './subscribe'
    log_dir = "./log"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(dirs, exist_ok=True)
    os.makedirs(log_dir, exist_ok=True)

    # The responses are fully read while the session is open, so it is safe to
    # close the session (and its pooled connections) before using them below.
    with _build_session() as session:
        urls = _fetch_candidate_urls(session)
        if urls is None:
            return
        # Links carry no fixed suffix, so the content must be downloaded
        # before it can be told apart as v2ray or clash.
        classified = _classify_subscriptions(session, urls)

    targets = (
        ("v2ray", dirs + '/v2ray.txt', lambda response: response.text),
        ("clash", dirs + '/clash.yml', _decode_clash_payload),
    )
    update_list = []
    for kind, cache_file, read_text in targets:
        label = _store_subscription(kind, classified.get(kind), cache_file, read_text)
        if label:
            update_list.append(label)

    if not update_list:
        write_log("未能获取新的更新内容", "WARN")
        return

    # Close the pipe deterministically instead of leaking it.
    with os.popen("git status") as pipe:
        git_status = pipe.read()
    if re.search(r"v2ray\.txt|clash\.yml", git_status):
        write_log(f"更新成功:{update_list}", "INFO")
    else:
        write_log("订阅暂未更新", "WARN")
def main():
    """Run a single subscription refresh."""
    get_subscribe_url()
# Script entry point: only run when executed directly, not on import.
if __name__ == "__main__":
    main()