魔方财务批量拉取产品信息教程
使用魔方财务有时候经常上级【变化了ip】或者批量【补时间】什么的我们这里因为我们的财务换过域名导致上级无法给我们推送需要我们手动拉取信息一个两个还好几百个怎么办本教程就是【欧云服务器】专门给大家做的教程首先感谢极点云开发这个拉取工具的付出脚本是在我们电脑上执行的不是主控服务器相当于操控我们电脑帮我们执行多次更换ip的请求。1、安装python3.6或以上版本脚本要求我们电脑具备python3.6或以上版本。我们这里推荐3.10.11.访问python搜索安装即可https://www.python.org/downloads/windows/我们电脑上双击安装装好了 有个是否允许加长路径啥的要允许。2、电脑执行函数的安装2.1 在脚本所在文件夹右键打开终端2.2 只需要安装requests库使用pip安装只有第一次要安装后面跳过这一步pip install requests这里很多人装不上报错因为与win11的自带python冲突那就执行这个py -m pip install requests2.3 执行脚本假设你的脚本文件叫mfcw_laqu.py或者别的名字放在C:\Users\666\Downloads目录下方法一此时可以在 步骤2.1打开的终端 输入命令执行脚本py mfcw_laqu.py方法二因为你安装了python事实上你应该可以直接双击就能打开脚本文件然后跟着提示输入就行了3、token、cookie的获取方法打开并登陆魔方云后台然后按f12勾选保留日志然后刷新下页面找到如下文件特征其实很多文件都包含token\cookie的随便点点看有的话 就可以复制了都是一样的这个cookie也不是一直不变的魔方云每次登录都是不一样的但只要没有让你重新登陆就都是有效的python的代码如下可以创建个txt文件复制进去保存修改名字为.py结尾的就行了import requests import json import time import urllib3 # 禁用SSL警告 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class HostManager: def __init__(self, base_url, cookie, admin_path): self.base_url base_url.rstrip(/) self.admin_path admin_path self.cookies {PHPSESSID: cookie} self.session requests.Session() self.session.cookies.update(self.cookies) self.session.headers.update({ User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36, Accept: application/json, text/plain, */*, Content-Type: application/json;charsetUTF-8, Origin: base_url, Referer: f{base_url}/{admin_path}/, Sec-Fetch-Dest: empty, Sec-Fetch-Mode: cors, Sec-Fetch-Site: same-origin }) def get_host_ids(self, ip_segment, max_count1000): 根据IP段获取主机ID列表 Args: ip_segment: IP段如 1.1.1 max_count: 最大获取数量默认1000 Returns: list: 主机ID列表 request_time int(time.time() * 1000) # 构建请求URL url f{self.base_url}/{self.admin_path}/host/list params { queryConditions: ip, username: , uid: , page: 1, pagecount: max_count, order: id, sort: DESC, product_type: , server: , product: , payment: , billingcycle: , domainstatus: , domain: , ip: ip_segment, nextduedate: , request_time: request_time, languagesys: CN } try: print(f正在获取IP段 {ip_segment} 的主机列表...) response self.session.get(url, paramsparams, verifyFalse) response.raise_for_status() data response.json() if data.get(status) 200: host_list data.get(data, {}).get(list, []) ids [host.get(id) for host in host_list if host.get(id)] print(f✅ 成功获取 {len(ids)} 个主机ID) return ids else: print(f❌ 获取主机列表失败: {data}) return [] except Exception as e: print(f❌ 获取主机ID时发生错误: {e}) return [] def sync_host(self, host_id, delay0.1): 同步单个主机 Args: host_id: 主机ID delay: 请求间隔延迟秒避免请求过快 Returns: dict: 同步结果 request_time int(time.time() * 1000) url f{self.base_url}/{self.admin_path}/provision/default params { request_time: request_time, languagesys: CN } payload { id: host_id, func: sync } try: # 添加延迟避免请求过快 if delay 0: time.sleep(delay) response self.session.post( url, paramsparams, jsonpayload, verifyFalse ) response.raise_for_status() result response.json() return { host_id: host_id, status: result.get(status), data: result.get(data), message: result.get(message, ), success: result.get(status) 200 } except Exception as e: return { host_id: host_id, status: error, data: None, message: str(e), success: False } def batch_sync_hosts(self, ip_segment, max_count1000, delay0.1): 批量同步指定IP段的所有主机 Args: ip_segment: IP段 max_count: 最大主机数量 delay: 请求间隔秒 Returns: list: 所有同步结果 print(f\n 开始处理IP段: {ip_segment}) # 1. 获取主机ID列表 host_ids self.get_host_ids(ip_segment, max_count) if not host_ids: print(❌ 未找到任何主机ID程序结束) return [] print(f 找到 {len(host_ids)} 个主机开始同步...) # 2. 批量同步主机 results [] for i, host_id in enumerate(host_ids, 1): print(f 正在同步第 {i}/{len(host_ids)} 个主机 (ID: {host_id})) result self.sync_host(host_id, delay) results.append(result) # 输出当前结果 if result[success]: print(f ✅ 主机 {host_id} 同步成功: {result.get(message, 无返回消息)}) else: print(f ❌ 主机 {host_id} 同步失败: {result.get(message, 未知错误)}) # 3. 统计结果 success_count sum(1 for r in results if r[success]) failure_count len(results) - success_count print(f\n 同步完成) print(f✅ 成功: {success_count} 个) print(f❌ 失败: {failure_count} 个) return results def wait_for_exit(): 等待用户按键退出 print(\n *50) print(程序执行完毕) input(按Enter键退出...) def get_user_input(): 获取用户输入的所有配置参数 print( * 50) print( 主机批量同步工具) print( * 50) # 基础配置 base_url input(请输入基础URL (默认: https://www.baidu.com): ).strip() if not base_url: base_url http://baidu.com admin_path input(请输入后台路径 (默认: JD123): ).strip() if not admin_path: admin_path JD123 cookie input(请输入PHPSESSID Cookie: (不含phpsessid)).strip() if not cookie: print(❌ PHPSESSID 不能为空) return None # 查询参数 ip_segment input(请输入要查询的IP段 (例如: 1.1.1): ).strip() if not ip_segment: print(❌ IP段不能为空) return None try: max_count int(input(请输入最大获取数量 (默认1000): ) or 1000) delay float(input(请输入请求间隔秒数 (默认0.1): ) or 0.1) except ValueError: print(❌ 输入格式错误使用默认值) max_count 1000 delay 0.1 return { base_url: base_url, admin_path: admin_path, cookie: cookie, ip_segment: ip_segment, max_count: max_count, delay: delay } def main(): # 获取用户输入 config get_user_input() if not config: return # 创建管理器实例 manager HostManager( base_urlconfig[base_url], cookieconfig[cookie], admin_pathconfig[admin_path] ) # 执行批量同步 results manager.batch_sync_hosts( ip_segmentconfig[ip_segment], max_countconfig[max_count], delayconfig[delay] ) # 保存结果到文件 if results: timestamp int(time.time()) ip_segment_safe config[ip_segment].replace(., _) filename fsync_results_{ip_segment_safe}_{timestamp}.json with open(filename, w, encodingutf-8) as f: json.dump(results, f, ensure_asciiFalse, indent2) print(f\n 详细结果已保存到: {filename}) # 显示简要统计 success_ids [r[host_id] for r in results if r[success]] failed_ids [r[host_id] for r in results if not r[success]] if success_ids: print(f✅ 成功的主机ID: {success_ids}) if failed_ids: print(f❌ 失败的主机ID: {failed_ids}) wait_for_exit() if __name__ __main__: main()