基于Nostr协议的私信机器人框架:构建去中心化社交自动化服务
1. 项目概述一个去中心化社交的自动化信使最近在捣鼓Nostr协议想实现一些自动化交互比如自动回复、关键词监控或者简单的机器人服务。在GitHub上翻找时遇到了一个挺有意思的项目dhalsim/nostr-dm-agent。光看名字nostr-dm-agent核心指向很明确——这是一个基于Nostr协议、专门处理私信Direct Message DM的代理或智能体。Nostr本身是一个极简的、去中心化的社交网络协议它没有中心服务器你的身份是一对密钥你的“推文”在Nostr里叫“Note”和社交关系通过分布式的中继器Relay网络进行广播和存储。私信功能在Nostr中同样重要它允许用户进行点对点的加密通信。然而原生的客户端或库在处理自动化、高并发的私信任务时往往显得力不从心需要开发者自己管理连接、处理事件流、实现重连逻辑和消息队列。nostr-dm-agent的出现就是为了封装这些底层复杂性提供一个稳定、可扩展的私信处理框架让你能更专注于业务逻辑本身。简单来说你可以把它想象成一个专门为Nostr私信打造的“机器人框架”或“消息中间件”。它帮你处理与多个中继器的连接、订阅私信事件、解密消息、管理会话状态然后以事件驱动的方式将处理好的消息“喂”给你的业务逻辑代码。无论你是想做一个自动客服机器人、一个群发通知工具还是一个基于私信的自动化工作流触发器这个项目都提供了一个相当不错的起点。2. 核心架构与设计思路拆解2.1 为什么需要专门的DM Agent在深入代码之前我们先聊聊为什么在Nostr生态中一个专门的私信代理是有必要的。如果你直接用现有的Nostr客户端库比如nostr-sdk写一个监听私信的脚本很快会遇到几个典型问题连接管理复杂为了消息的可靠性和低延迟通常需要同时连接多个中继器。手动管理这些连接的建立、维持、断开重连以及负载均衡代码会变得冗长且容易出错。事件流处理繁琐Nostr协议基于事件Event。你需要持续监听特定类型的事件对于私信是kind 4并从中过滤出发给自己的消息。这涉及到持续的订阅Subscription管理。消息解密是门槛Nostr的私信使用接收者的公钥进行加密。处理加密消息的解密流程特别是当你有多个密钥对例如一个主身份几个子身份时逻辑会变得复杂。状态与上下文管理一个实用的机器人需要维护会话状态。比如用户问“天气怎么样”机器人回复“请问您想查询哪个城市”然后等待用户的下一条消息。这种简单的多轮对话就需要在内存或外部存储中维护一个会话上下文将后续消息与之前的询问关联起来。可扩展性与容错性当消息量增大时如何避免阻塞如何处理失败的消息发送如何优雅地重启服务而不丢失正在处理的消息这些都是在生产环境中必须考虑的问题。nostr-dm-agent的设计目标正是为了解决上述痛点。它将连接管理、事件订阅、消息加解密、会话管理这些“脏活累活”抽象成一个独立的服务层向上提供一个清晰、简洁的API或事件接口让开发者只需关心“收到消息后做什么”以及“要发送什么消息”。2.2 项目整体架构窥探虽然我没有直接运行这个特定仓库的代码项目可能还在迭代但基于其命名、常见的Nostr机器人模式以及类似项目的设计我们可以推断出其核心架构通常包含以下几个层次中继器连接池这是最底层。Agent会维护一个到多个Nostr中继器如wss://relay.damus.io,wss://nostr.wine的WebSocket连接池。它负责连接的建立、心跳保持、自动重连并可能根据中继器的响应速度或可靠性进行智能切换。事件订阅与过滤器管理Agent会向连接的中继器发起一个或多个订阅REQ请求过滤器Filter的核心是{“kinds”: [4], “#p”: [my_public_key]}。这意味着“请通知我所有kind为4私信且标签#p中包含我公钥的事件”。#p标签在Nostr私信事件中用于标识接收者。消息处理流水线这是Agent的核心。当从中继器收到一个符合条件的私信事件后流水线启动验证检查事件的签名sig是否有效确保消息未被篡改。解密使用自己的私钥解密事件内容content字段。这里通常涉及NIP-04标准定义的加密方式secp256k1 ECDH AES-256-CBC。解析与会话关联将解密后的明文通常是JSON字符串解析为结构化数据。然后根据发送者的公钥pubkey和可能的额外标签如#e引用事件ID用于线程对话找到或创建一个“会话”Session。触发业务逻辑将解析好的消息、发送者信息、会话上下文等封装成一个标准格式的“消息对象”通过回调函数Callback、事件发射器EventEmitter或消息队列Message Queue传递给上层业务处理器。业务逻辑处理器这是开发者需要编写代码的部分。Agent会以插件化或配置化的方式加载这些处理器。处理器接收消息对象执行逻辑如调用外部API查询天气、从数据库获取信息、运行一段脚本然后生成回复内容。发送队列与回执管理业务处理器生成的回复会被交给发送模块。发送模块可能需要使用接收者的公钥加密回复内容。构造一个符合Nostr标准的kind 4事件并签名。将事件放入发送队列通过连接池广播到中继器网络。可选地监听中继器返回的OK指令作为发送回执实现至少一次at-least-once的发送保证。状态与存储Agent需要持久化一些状态例如已处理过的事件ID防止重复处理、活跃的会话上下文、用户的偏好设置等。这可以通过内存、文件、SQLite或Redis等外部数据库来实现。注意以上是基于通用模式的分析。dhalsim/nostr-dm-agent的具体实现可能有所不同可能更轻量或更复杂。但其核心价值——将Nostr私信通信的复杂性封装起来——是确定的。3. 核心模块深度解析与实操要点3.1 密钥管理与安全实践安全是私信代理的生命线。私钥泄露意味着你的机器人身份完全失控。1. 私钥的存储与加载绝对不要将私钥硬编码在源代码中或提交到版本控制系统。常见的做法是环境变量NOSTR_PRIVATE_KEY nsec1...。这是最简单、最通用的方式适合容器化部署。配置文件使用.env文件通过dotenv库加载但确保.env在.gitignore中。密钥管理服务在生产环境中考虑使用云服务商的密钥管理服务如AWS KMS, GCP Secret Manager在运行时动态获取。在代码中加载私钥后通常会将其转换为十六进制格式供加密、签名函数使用。# 示例从环境变量加载并转换私钥 (Python伪代码) import os from nostr.key import PrivateKey private_key_hex os.getenv(NOSTR_PRIVATE_KEY) if private_key_hex.startswith(nsec1): # 如果是bech32编码需要解码 private_key PrivateKey.from_nsec(private_key_hex) else: # 假设已经是十六进制字符串 private_key PrivateKey(bytes.fromhex(private_key_hex)) # 获取公钥用于构造过滤器 public_key_hex private_key.public_key.hex()2. 多身份支持一个Agent可能需要代表多个身份不同的密钥对运行。例如一个客服系统可能有“技术支持A”、“技术支持B”等多个机器人账号。nostr-dm-agent应该支持配置一个密钥列表或一个密钥库。在处理消息时需要根据消息的接收者#p标签来决定使用哪个私钥进行解密。这要求架构上有一个KeyManager模块来管理多个密钥对。3. 解密过程详解Nostr NIP-04 加密流程是发送方使用自己的私钥和接收方的公钥通过ECDH椭圆曲线迪菲-赫尔曼算法计算出一个共享密钥。使用这个共享密钥派生出一个AES-256-CBC加密所需的密钥和初始化向量IV。对消息明文进行AES-256-CBC加密。最终事件内容格式为[加密算法标识]?[初始化向量]?[密文]通常像[nip04, iv, ciphertext]的JSON序列化字符串。因此Agent的解密模块必须严格实现这一流程。一个常见的坑是IV的处理必须确保从事件内容中正确提取并解码。3.2 会话管理与上下文保持无状态的HTTP请求很好处理但对话是有状态的。用户的第一句话和第二句话可能属于同一个任务。1. 会话标识最自然的会话标识是“对话双方”。在Nostr中就是“发送者公钥 接收者公钥机器人公钥”的组合。这个组合是唯一的。你可以用f{sender_pk}_{receiver_pk}这样的字符串作为会话ID。2. 上下文存储内存存储最简单用一个字典sessions {}来存。但服务重启后所有上下文丢失。只适合开发测试。外部存储生产环境必须使用外部存储。Redis是极佳选择因为它支持设置过期时间TTL可以自动清理闲置过久的会话。将会话ID作为Key上下文对象JSON序列化后作为Value存储起来。# 示例使用Redis存储会话 (Python伪代码) import redis import json import time redis_client redis.Redis(hostlocalhost, port6379, db0) SESSION_TTL 1800 # 30分钟无活动后过期 def get_session(session_id): data redis_client.get(session_id) return json.loads(data) if data else {step: initial, data: {}} def update_session(session_id, context): redis_client.setex(session_id, SESSION_TTL, json.dumps(context))上下文内容上下文里应该存什么这取决于你的业务逻辑。通常包括current_step当前处于对话的哪一步如awaiting_city_name。user_data用户在此会话中已提供的数据如{city: Beijing}。created_at/last_active时间戳用于清理。message_history_id关联的Nostr事件ID用于构建对话线程。3. 超时与清理必须实现会话清理机制防止内存或存储泄漏。可以启动一个后台定时任务定期扫描并删除last_active时间超过阈值的会话。3.3 事件处理与业务逻辑的松耦合Agent的核心设计原则应该是“通信”与“业务”分离。Agent只负责可靠地收发、解析、封装消息不关心消息的具体内容。业务逻辑应该以“插件”、“处理器”或“技能”的形式存在。1. 处理器注册机制Agent可以提供一个register_handler方法允许开发者注册针对不同消息类型的处理器。# 伪代码示例 class DmAgent: def __init__(self): self.handlers [] def register_handler(self, handler_func): self.handlers.append(handler_func) async def on_message(self, decrypted_msg, session): for handler in self.handlers: # 处理器可以返回一个回复或者修改会话上下文 reply await handler(decrypted_msg, session) if reply: await self.send_reply(session.sender_pk, reply) break # 或者设计成多个处理器可链式调用2. 消息路由更高级的设计是引入消息路由。处理器可以声明自己关心哪些关键词、命令或消息模式。Agent在收到消息后根据内容匹配到最合适的处理器。这类似于聊天机器人框架如Hubot、Botpress的设计。# 伪代码基于命令的路由 handlers { ‘/weather’: weather_handler, ‘/help’: help_handler, ‘default’: echo_handler, # 默认处理器例如复读或提示 } msg_content decrypted_msg.get(‘content’, ‘’).strip() if msg_content.startswith(‘/’): command msg_content.split()[0] handler handlers.get(command, handlers[‘default’]) else: handler handlers[‘default’] await handler(decrypted_msg, session)4. 从零开始构建一个简易版Nostr DM Agent为了彻底理解其原理我们不妨用Python假设使用nostr-sdk和asyncio勾勒一个最简化的、可运行的DM Agent核心骨架。请注意这是一个教育性质的示例省略了完整的错误处理和生产级特性。4.1 环境准备与依赖安装首先创建一个新的Python虚拟环境并安装核心依赖。# 创建项目目录并进入 mkdir simple-nostr-dm-agent cd simple-nostr-dm-agent python -m venv venv # 激活虚拟环境 (Linux/macOS) source venv/bin/activate # 激活虚拟环境 (Windows) # venv\Scripts\activate # 安装依赖 pip install nostr-sdk-asyncio redis # 假设我们使用nostr-sdk和redis存储会话 pip install python-dotenv # 用于加载环境变量4.2 核心代理类实现我们创建一个simple_agent.py文件。import asyncio import json import logging from typing import Dict, Optional, Callable, Awaitable from nostr_sdk import Client, Keys, Filter, Kind, HandleNotification, Event, RelayMessage, init_logger, LogLevel from nostr_sdk import NostrError import redis.asyncio as redis from dotenv import load_dotenv import os import time # 加载 .env 文件中的环境变量 load_dotenv() logging.basicConfig(levellogging.INFO) logger logging.getLogger(__name__) # 定义消息和会话的数据结构 class NostrMessage: def __init__(self, event: Event, decrypted_content: str): self.id event.id() self.sender_pubkey event.pubkey() self.content decrypted_content self.created_at event.created_at() self.raw_event event class Session: def __init__(self, session_id: str, sender_pk: str, receiver_pk: str): self.id session_id self.sender_pk sender_pk self.receiver_pk receiver_pk self.context {step: start, data: {}} self.last_active time.time() def update_context(self, key, value): self.context[key] value self.last_active time.time() class SimpleDmAgent: def __init__(self, private_key_hex: str, relays: list): 初始化Agent。 :param private_key_hex: 机器人的私钥十六进制字符串 :param relays: 要连接的中继器URL列表 self.keys Keys.parse(private_key_hex) self.relays relays self.client None self.handlers [] # 业务逻辑处理器列表 # Redis连接用于会话存储 self.redis_client redis.Redis(hostlocalhost, port6379, db0, decode_responsesTrue) self.session_ttl 1800 # 30分钟 async def connect(self): 连接到Nostr中继网络 self.client Client(self.keys) for relay in self.relays: try: await self.client.add_relay(relay) logger.info(f已添加中继器: {relay}) except Exception as e: logger.error(f添加中继器 {relay} 失败: {e}) await self.client.connect() async def _get_session(self, sender_pk: str) - Optional[Session]: 从Redis获取或创建会话 session_id f{sender_pk}_{self.keys.public_key().to_hex()} data await self.redis_client.get(session_id) if data: ctx json.loads(data) session Session(session_id, sender_pk, self.keys.public_key().to_hex()) session.context ctx session.last_active time.time() return session else: # 创建新会话 new_session Session(session_id, sender_pk, self.keys.public_key().to_hex()) await self._save_session(new_session) return new_session async def _save_session(self, session: Session): 保存会话到Redis await self.redis_client.setex( session.id, self.session_ttl, json.dumps(session.context) ) async def _decrypt_message(self, event: Event) - Optional[str]: 尝试解密NIP-04加密消息 try: # nostr-sdk 提供了便捷的解密方法 decrypted await self.client.decrypt(event.pubkey(), event.content()) return decrypted except NostrError as e: logger.warning(f解密来自 {event.pubkey()} 的消息失败: {e}) return None async def _handle_dm_event(self, event: Event): 处理单个私信事件的核心方法 # 1. 解密 decrypted_content await self._decrypt_message(event) if not decrypted_content: return logger.info(f收到来自 {event.pubkey()[:16]}... 的私信: {decrypted_content[:50]}...) # 2. 获取或创建会话 session await self._get_session(event.pubkey()) if not session: logger.error(无法创建或获取会话) return # 3. 封装消息对象 msg NostrMessage(event, decrypted_content) # 4. 调用所有注册的业务处理器 reply_content None for handler in self.handlers: try: # 处理器可以返回一个字符串作为回复也可以修改session.context handler_reply await handler(msg, session) if handler_reply and isinstance(handler_reply, str): reply_content handler_reply break # 假设一个处理器处理了就停止也可设计为管道 except Exception as e: logger.error(f处理器 {handler.__name__} 执行出错: {e}) # 5. 保存更新后的会话上下文 await self._save_session(session) # 6. 如果有回复则发送 if reply_content: await self._send_reply(event.pubkey(), reply_content) async def _send_reply(self, recipient_pubkey_hex: str, content: str): 发送加密回复 try: # 使用接收者的公钥加密内容 encrypted_msg await self.client.encrypt(recipient_pubkey_hex, content) # 构建并发送 kind 4 事件 event Event(Kind(4), encrypted_msg) await self.client.send_event(event) logger.info(f已发送回复给 {recipient_pubkey_hex[:16]}...) except Exception as e: logger.error(f发送回复失败: {e}) def register_handler(self, handler: Callable[[NostrMessage, Session], Awaitable[Optional[str]]]): 注册一个业务逻辑处理器 self.handlers.append(handler) logger.info(f已注册处理器: {handler.__name__}) async def start_listening(self): 开始监听私信 if not self.client: await self.connect() # 构建过滤器只订阅发给我的 kind 4 事件 filter Filter().kind(Kind(4)).pubkey(self.keys.public_key()) subscription_id dm_subscription # 设置通知处理回调 async def handle_notification(relay_message: RelayMessage): if isinstance(relay_message, Event): event relay_message if event.kind() 4: # 确保是私信 # 异步处理避免阻塞 asyncio.create_task(self._handle_dm_event(event)) # 添加中继器通知处理器具体API取决于nostr-sdk版本 # 这里是一个通用逻辑示意实际需要查阅nostr-sdk-asyncio的文档 # 通常是通过 client.subscribe([filter]) 并设置一个全局的通知回调 # 以下为示例性代码可能需要调整 try: await self.client.subscribe([filter], subscription_id) logger.info(已开始监听私信...) # 保持运行 await asyncio.Future() # 永久等待直到被取消 except Exception as e: logger.error(f监听过程中出错: {e}) async def cleanup(self): 清理资源 if self.client: await self.client.disconnect() await self.redis_client.close()4.3 编写你的第一个业务处理器现在让我们创建一个handlers.py文件编写几个简单的处理器。# handlers.py import asyncio from typing import Optional from simple_agent import NostrMessage, Session async def echo_handler(msg: NostrMessage, session: Session) - Optional[str]: 复读机处理器回复用户发送的相同内容 return fEcho: {msg.content} async def greeting_handler(msg: NostrMessage, session: Session) - Optional[str]: 问候处理器识别问候语并回复 content_lower msg.content.lower().strip() greetings [hi, hello, hey, 你好, 嗨] if any(greet in content_lower for greet in greetings): return fHello there! Your public key is {msg.sender_pubkey[:16]}... # 如果不匹配返回None让其他处理器处理 return None async def weather_handler(msg: NostrMessage, session: Session) - Optional[str]: 简单的多轮对话天气查询处理器 current_step session.context.get(step) if current_step start: if weather in msg.content.lower(): session.update_context(step, awaiting_city) # 注意这里只更新了session.context返回值是给用户的提示 return Sure! Which citys weather would you like to know? return None # 不是查询天气交给其他处理器 elif current_step awaiting_city: city msg.content.strip() session.update_context(step, start) # 重置步骤 session.update_context(last_city, city) # 这里应该调用一个真实的天气API此处模拟 await asyncio.sleep(0.5) # 模拟网络延迟 return fThe weather in {city} is sunny and 25°C. (This is a simulation)4.4 运行你的Agent最后创建一个main.py来启动一切。# main.py import asyncio import os from simple_agent import SimpleDmAgent import handlers async def main(): # 从环境变量读取私钥和中继器列表 private_key os.getenv(NOSTR_PRIVATE_KEY) # 例如nsec1... 或 hex if not private_key: raise ValueError(请设置环境变量 NOSTR_PRIVATE_KEY) relays [ wss://relay.damus.io, wss://nostr.wine, wss://relay.snort.social, # 可以添加更多中继器 ] # 1. 初始化Agent agent SimpleDmAgent(private_key, relays) # 2. 注册业务处理器注意顺序先匹配的先执行 agent.register_handler(handlers.greeting_handler) agent.register_handler(handlers.weather_handler) agent.register_handler(handlers.echo_handler) # 兜底处理器 # 3. 连接并开始监听 try: await agent.connect() logger.info(Agent启动成功开始监听...) await agent.start_listening() except KeyboardInterrupt: logger.info(收到中断信号正在关闭...) except Exception as e: logger.error(fAgent运行出错: {e}) finally: await agent.cleanup() if __name__ __main__: asyncio.run(main())在运行前请确保将你的Nostr私钥十六进制格式或nsec格式设置到环境变量NOSTR_PRIVATE_KEY中。本地运行着一个Redis服务器redis-server或者修改SimpleDmAgent初始化部分将会话存储切换到其他方式如内存字典仅用于测试。根据你使用的nostr-sdk版本可能需要调整_handle_dm_event和start_listening中的具体API调用。上述代码是一个概念性框架展示了核心流程。运行程序python main.py如果一切顺利你的机器人就会上线开始监听并处理发给它的私信了。你可以用另一个Nostr客户端如Damus、Amethyst向这个机器人的公钥发送加密私信进行测试。5. 生产环境部署与高级话题一个玩具级的Agent和能稳定运行的生产级服务之间还有很大差距。基于dhalsim/nostr-dm-agent这类项目的设计思想我们可以探讨如何将其加固。5.1 可靠性保障重连、幂等与死信队列中继器连接稳定性网络是不稳定的。Agent必须实现健壮的重连逻辑。不仅仅是连接断开时重连还要处理中继器无响应、订阅过期等情况。通常需要为每个中继器连接维护一个状态机连接中、已连接、断开、重试中并采用指数退避策略进行重连。消息处理的幂等性Nostr网络可能因为多个中继器而收到重复的事件。你的处理器必须能够处理重复消息而不产生副作用比如重复扣款、重复发送通知。最有效的方法是在处理事件前检查事件ID是否已经在“已处理ID集合”中。这个集合可以放在Redis里并设置一个合理的过期时间比如24小时。死信队列对于处理失败的消息如解密失败、业务逻辑异常、外部API调用超时不要简单地丢弃或无限重试。应该将其放入一个“死信队列”可以是Redis List或专门的MQ并记录失败原因。之后可以手动检查这些失败消息进行分析和修复。这为调试和问题追溯提供了极大便利。5.2 性能与扩展性考虑异步与并发现代Nostr库如Rust的nostr-sdkPython的nostr-sdk-asyncio都基于异步IO。确保你的整个处理链路网络IO、解密、业务逻辑、发送都是非阻塞的。对于CPU密集型的业务逻辑如复杂的文本处理考虑将其放入单独的线程池中运行避免阻塞事件循环。水平扩展如果消息量非常大单个Agent进程可能成为瓶颈。此时可以考虑水平扩展多个Agent实例让多个Agent实例连接相同的中继器使用相同的密钥。这需要解决消息去重的问题所有实例都会收到相同事件。一个方案是引入一个外部的、分布式的“领导者选举”或“分片”机制。例如使用Redis分布式锁让实例竞争处理特定发送者公钥pubkey的消息实现基于发送者的分片。消息队列解耦Agent核心只负责接收和解密消息然后将解密后的消息投递到一个内部消息队列如RabbitMQ、Kafka、Redis Stream。然后由一群独立的“业务逻辑工作进程”从队列中消费并处理消息。这样接收能力和处理能力可以独立扩展。5.3 监控、日志与可观测性没有监控的系统就是在黑暗中飞行。结构化日志不要只用print。使用logging模块输出结构化的JSON日志方便被ELKElasticsearch, Logstash, Kibana或Loki等日志系统收集和查询。记录关键事件连接状态变化、收到消息、处理开始/结束、发送消息、错误异常。指标监控暴露关键指标可以使用Prometheus客户端库。重要的指标包括nostr_connections_total当前活跃的中继器连接数。nostr_messages_received_total接收到的消息总数按类型。nostr_messages_processed_total成功处理的消息总数。nostr_message_processing_duration_seconds消息处理耗时直方图。nostr_send_errors_total消息发送失败次数。健康检查端点为Agent提供一个HTTP健康检查端点例如/health返回连接状态、队列深度等。这便于容器编排平台如Kubernetes判断服务是否健康。5.4 与dhalsim/nostr-dm-agent的对比与选型思考我们构建的简易版Agent实现了核心流程。而dhalsim/nostr-dm-agent作为一个开源项目很可能提供了更多开箱即用的特性更完善的连接管理可能内置了更智能的中继器选择、连接池和负载均衡。更丰富的配置可能通过YAML或TOML文件提供灵活的配置包括中继器列表、重试策略、日志级别等。插件化系统可能定义了标准的插件接口方便社区贡献各种功能插件如命令处理、自动转发、内容审核。内置的实用处理器可能自带了一些基础处理器如帮助命令、状态查询、消息转发等。更优的错误处理与恢复在生产中打磨过的错误处理逻辑总是更可靠。那么是应该直接用现成的项目还是自己造轮子使用dhalsim/nostr-dm-agent如果你的需求是快速搭建一个稳定、功能丰富的Nostr私信机器人并且其设计符合你的架构理念比如它采用的编程语言、插件体系那么直接使用它是最高效的选择。你需要做的是学习它的配置方式并为其编写业务插件。自己实现如果你的需求非常特殊比如需要深度定制通信协议、与现有系统做极紧密的集成、或有极致的性能要求或者你想通过造轮子来深入学习Nostr协议和异步编程那么从零开始或基于我们的简易框架扩展是一个很好的学习过程。但你需要准备好应对前面提到的所有生产环境挑战。无论选择哪条路理解我们在本文中拆解的核心架构、安全要点和问题解决方案都将帮助你更好地使用或构建一个属于自己的、强大的Nostr私信自动化代理。去中心化社交的自动化浪潮才刚刚开始拥有一个属于自己的“数字信使”无疑是探索这片新大陆的利器。