Explore the code behind the innovation. Automation, networking, and media solutions.
"""
荣耀云登录-截图机器人(Windows 兼容版)
"""
import time
import random
import logging
from datetime import datetime
from pathlib import Path
import schedule
from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
# ---------- 参数 ----------
LOGIN_URL = ("https://hnid-drcn.cloud.hihonor.com/CAS/portal/loginAuth.html?...") # URL truncated for brevity in display
ACCOUNT = "###########"
PASSWORD = "######"
BASE_DIR = Path(__file__).parent.resolve()
SCREENSHOT_DIR = BASE_DIR / "screenshots"
LOG_FILE = BASE_DIR / "honor_spy.log"
INTERVAL_MINUTES = 8
PAGE_TIMEOUT = 30_000
RETRY_MAX = 3
# ---------- 日志 ----------
SCREENSHOT_DIR.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.FileHandler(LOG_FILE, encoding="utf-8", delay=True),
logging.StreamHandler()]
)
log = logging.getLogger("honor_spy")
# ---------- 工具 ----------
def timestamp_filename() -> str:
# ✅ Windows 兼容:小时和分钟去前导零用 %#H 和 %#M
return datetime.now().strftime("%Y年%m月%d日%#H点%#M分.png")
def random_sleep(a: float = 0.5, b: float = 1.5):
time.sleep(random.uniform(a, b))
# ---------- 主流程 ----------
def login_and_shot(page):
log.info("打开登录页")
page.goto(LOGIN_URL, timeout=PAGE_TIMEOUT)
log.info("输入账号")
account = page.locator('input[ht="input_pwdlogin_account"]')
account.click()
account.fill(ACCOUNT)
random_sleep()
log.info("输入密码")
pwd = page.locator('input[ht="input_pwdlogin_pwd"]')
pwd.click()
pwd.fill(PASSWORD)
random_sleep()
log.info("点击登录")
page.locator('div[ht="click_pwdlogin_submitLogin"]').click()
log.info("等待进入查找设备页")
loc_btn = page.locator('div.operation_button[aria-label="定位设备按钮"]')
loc_btn.wait_for(state="visible", timeout=PAGE_TIMEOUT)
log.info("点击【定位设备】5次")
for _ in range(5):
loc_btn.click()
time.sleep(0.8)
filename = SCREENSHOT_DIR / timestamp_filename()
page.screenshot(path=filename, full_page=True)
log.info("已截图 -> %s", filename)
# ---------- 定时任务 ----------
def job():
log.info("=== 定时任务开始 ===")
for attempt in range(1, RETRY_MAX + 1):
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True, args=["--disable-gpu", "--no-sandbox"])
context = browser.new_context(
viewport={"width": 1280, "height": 720},
user_agent=("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
)
page = context.new_page()
login_and_shot(page)
browser.close()
break
except PWTimeout:
log.warning("第 %s 次尝试超时,重试...", attempt)
except Exception as e:
log.exception("第 %s 次尝试异常: %s", attempt, e)
else:
log.error("已达到最大重试次数,放弃本次截图")
log.info("=== 定时任务结束 ===")
# ---------- 入口 ----------
if __name__ == "__main__":
log.info("荣耀云截图机器人启动,每 %s 分钟执行一次", INTERVAL_MINUTES)
schedule.every(INTERVAL_MINUTES).minutes.do(job)
job() # 立即跑一次
while True:
schedule.run_pending()
time.sleep(1)
import os
import socket
import zipfile
import io
import time
from flask import Flask, request, send_from_directory, jsonify, render_template_string, send_file
# ================= 配置区域 =================
UPLOAD_FOLDER = 'CYBER_DRIVE_STORAGE'
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 1024 * 1024 # 无限制
# ================= 修复版前端代码 (刷新功能) =================
HTML_TEMPLATE = """
极域 · 赛博传输终端
CYBER DRIVE
Drop Files Here
"""
@app.route('/')
def index():
return render_template_string(HTML_TEMPLATE)
@app.route('/list_files')
def list_files_api():
try:
files = []
paths = sorted(os.listdir(app.config['UPLOAD_FOLDER']), key=lambda x: os.path.getmtime(os.path.join(app.config['UPLOAD_FOLDER'], x)), reverse=True)
for f in paths:
if not f.startswith('.'):
fp = os.path.join(app.config['UPLOAD_FOLDER'], f)
if os.path.isfile(fp):
files.append({'name': f, 'size': os.path.getsize(fp)})
return jsonify(files)
except: return jsonify([])
@app.route('/upload', methods=['POST'])
def upload_file():
if 'files[]' not in request.files: return jsonify({'error': 'No file'}), 400
for f in request.files.getlist('files[]'):
if f.filename: f.save(os.path.join(app.config['UPLOAD_FOLDER'], f.filename))
return jsonify({'success': True})
@app.route('/download/')
def download_file(filename):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename)
def get_ip():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('10.255.255.255', 1))
IP = s.getsockname()[0]
s.close()
except: IP = '127.0.0.1'
return IP
if __name__ == '__main__':
print(f"\n >>> 修复版已启动: http://{get_ip()}:5000 <<<\n")
app.run(host='0.0.0.0', port=5000, threaded=True)