电力自动化通信入门:手把手教你用Python模拟IEC104协议的数据采集与遥控
电力自动化通信实战Python模拟IEC104协议的数据采集与遥控在工业自动化领域电力系统的远程监控与控制是保障电网稳定运行的关键技术。IEC 60870-5-104简称IEC104作为电力自动化系统中广泛采用的通信协议实现了控制站与受控站之间的高效数据交互。本文将带您用Python构建一个完整的IEC104协议模拟环境无需真实硬件设备即可深入理解协议核心机制。1. IEC104协议基础与环境搭建IEC104协议采用客户端/服务器架构基于TCP/IP实现实时数据传输。控制站客户端负责发送控制命令和接收数据而受控站服务器则执行命令并上报设备状态。协议支持两种传输模式非平衡模式仅控制站可发起通信平衡模式常用双方均可主动传输数据开发环境配置# 安装必要库 pip install socket struct time random协议帧结构由APCI控制信息和ASDU应用数据组成。关键帧类型包括帧类型功能描述典型应用场景I格式信息传输遥测、遥信数据上报S格式确认帧接收序号确认U格式控制帧启动/停止数据传输2. TCP连接与基础控制帧实现建立可靠通信的第一步是TCP连接管理。以下是Python实现的服务器端基础框架import socket class IEC104Server: def __init__(self, host0.0.0.0, port2404): self.server_socket socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((host, port)) self.server_socket.listen(1) print(fServer listening on {host}:{port}) def handle_connection(self): conn, addr self.server_socket.accept() print(fConnected by {addr}) # 默认处于STOPDT状态 data_transmission_active False while True: data conn.recv(255) # APDU最大长度255字节 if not data: break # 解析APCI start_byte, length data[0], data[1] if start_byte ! 0x68: print(Invalid start byte) continue control_field data[2:6] frame_type control_field[0] 0x03 # 处理U格式帧 if frame_type 3: if control_field[0] 0x07: # STARTDT激活 print(Received STARTDT activate) conn.send(bytes([0x68, 0x04, 0x0B, 0x00, 0x00, 0x00])) # STARTDT确认 data_transmission_active True elif control_field[0] 0x13: # TESTFR激活 print(Received TESTFR activate) conn.send(bytes([0x68, 0x04, 0x83, 0x00, 0x00, 0x00])) # TESTFR确认注意实际实现中需要添加超时处理和心跳机制确保连接稳定性3. 遥信数据采集与上报单点遥信SIQ是电力系统中最基础的状态量表示开关、断路器等设备的开合状态。其数据结构包含状态值和质量标志SIQ字节结构 bit 0: 状态值 (0分/开, 1合/关) bit 1-3: 保留 bit 4: 封锁标志 (BL) bit 5: 取代标志 (SB) bit 6: 刷新标志 (NT) bit 7: 有效标志 (IV)实现周期性数据上报的服务器端代码def send_spontaneous_data(conn, io_address, value): # 构建ASDU type_id 0x01 # 单点遥信 vsq 0x01 # 单个信息对象 cot 0x03 # 突发传输原因 asdu_addr 0x0001 io_addr_bytes io_address.to_bytes(3, little) # 构建SIQ (假设质量标志全正常) siq value 0x01 # 组装APDU apci bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) # I格式序号0 asdu bytes([type_id, vsq, cot, asdu_addr 0xFF, (asdu_addr 8) 0xFF]) io_addr_bytes bytes([siq]) conn.send(apci asdu) # 在服务器主循环中添加 if data_transmission_active and random.random() 0.3: # 30%概率上报 io_address random.randint(1, 100) state random.randint(0, 1) send_spontaneous_data(conn, io_address, state)客户端解析SIQ数据的示例def parse_siq(asdu): type_id asdu[0] if type_id ! 0x01: return None io_address int.from_bytes(asdu[6:9], little) siq asdu[9] state siq 0x01 iv (siq 7) 0x01 # 有效性标志 return { address: io_address, state: ON if state else OFF, valid: iv 0, timestamp: time.time() }4. 遥控命令实现与安全机制遥控命令SCO允许控制站改变受控站设备状态典型应用包括断路器分合闸操作。IEC104支持两种控制模式直接执行模式单步完成命令下发与执行选择-执行模式先选择验证再确认执行选择-执行模式实现def handle_control_command(conn, data): type_id data[6] if type_id ! 0x2D: # 单命令C_SC_NA_1 return False cot data[8] io_address int.from_bytes(data[10:13], little) sco data[13] # 解析控制命令 execute (sco 0x80) 0 command sco 0x01 if cot 0x06: # 选择命令 print(fSelect command for address {io_address}) # 返回确认帧肯定确认 confirm_asdu bytes([0x2D, 0x01, 0x07, 0x01, 0x00]) data[10:14] conn.send(bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) confirm_asdu) return True elif cot 0x08: # 执行命令 print(fExecute {ON if command else OFF} for address {io_address}) # 更新设备状态并返回执行确认 confirm_asdu bytes([0x2D, 0x01, 0x0A, 0x01, 0x00]) data[10:14] conn.send(bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) confirm_asdu) return True return False安全提示实际系统中应实现操作权限验证、防误操作闭锁等安全机制客户端发送遥控命令的示例流程def send_control_command(conn, io_address, command, select_executeTrue): # 构建ASDU type_id 0x2D # 单命令 vsq 0x01 # 单个信息对象 cot 0x06 if select_execute else 0x08 # 选择/执行阶段 asdu_addr 0x0001 io_addr_bytes io_address.to_bytes(3, little) # 构建SCO (选择阶段设置最高位为1) sco 0x80 if select_execute else 0x00 sco | (command 0x01) # 组装APDU apci bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) # I格式序号0 asdu bytes([type_id, vsq, cot, asdu_addr 0xFF, (asdu_addr 8) 0xFF]) io_addr_bytes bytes([sco]) conn.send(apci asdu) # 等待确认 response conn.recv(255) if len(response) 6: return False # 验证确认帧 return response[6] type_id and response[8] (cot 1)5. 高级功能与性能优化完整的IEC104实现还需要考虑以下高级功能时钟同步机制def handle_time_sync(conn, data): type_id data[6] if type_id ! 0x67: # 时钟同步命令C_CS_NA_1 return False # 解析时间 (CP56Time2a格式) milliseconds int.from_bytes(data[10:12], little) minutes data[12] 0x3F hours data[13] 0x1F day data[14] 0x1F month data[15] 0x0F year data[16] 0x7F # 返回带时标的确认 current_time time.time() time_bytes convert_to_cp56time2a(current_time) confirm_asdu bytes([0x67, 0x01, 0x07, 0x01, 0x00]) time_bytes conn.send(bytes([0x68, 0x0E, 0x00, 0x00, 0x00, 0x00]) confirm_asdu) return True性能优化技巧使用缓冲区减少小包传输实现帧序号自动管理异步处理耗时操作连接状态监控与自动恢复class IEC104Buffer: def __init__(self): self.send_seq 0 self.recv_seq 0 self.send_buffer [] def add_frame(self, apdu): self.send_buffer.append(apdu) self.send_seq (self.send_seq 1) % 32768 def get_ack_frames(self, ack_seq): # 返回已确认的帧 acked [] while self.send_buffer and self.send_buffer[0][seq] ack_seq: acked.append(self.send_buffer.pop(0)) return acked通过这个Python实现我们构建了一个完整的IEC104协议模拟环境涵盖了从基础连接建立到数据采集、遥控命令等核心功能。这种模拟方法不仅适用于学习协议原理也可用于自动化测试系统的开发。