Zephyr RTOS 中k_msgq(消息队列)接口介绍
目录概述1 消息队列介绍1.1 核心消息队列函数列表1.2 消息队列的核心概念1.3 消息队列与其他 IPC 对比2 核心函数2.1 初始化消息队列2.2 发送消息k_msgq_put()2.3 接收消息k_msgq_get()2.4 消息队列管理函数3. 完整应用示例3.1 中断到线程通信经典模式3.2 多生产者-单消费者模式4 高级用法4.1 消息队列设计模式4.2 错误处理与恢复4.3 性能优化技巧4.4 调试函数与工具5 应用和总结5.1 常见问题与解决方案5.2 使用技巧概述在 Zephyr RTOS 中k_msgq消息队列是用于在线程间传递固定大小消息的核心进程间通信IPC机制。与传递字节流的管道或传递指针的 FIFO/LIFO 不同消息队列完整复制消息内容确保数据所有权清晰是最常用的线程间通信方式之一。通过合理使用消息队列可以在 Zephyr 系统中构建清晰、可靠、高效的线程间通信架构。对于大多数应用场景消息队列都是比管道、FIFO/LIFO 或邮箱更简单和安全的选择。1 消息队列介绍1.1 核心消息队列函数列表函数功能描述主要用途k_msgq_init()动态初始化消息队列运行时初始化需提供缓冲区k_msgq_alloc_init()从内存池初始化消息队列动态分配消息队列内存k_msgq_put()向队列发送消息生产者写入数据k_msgq_get()从队列接收消息消费者读取数据k_msgq_purge()清空队列所有消息重置队列状态k_msgq_num_used_get()获取已使用消息数监控队列状态k_msgq_num_free_get()获取空闲消息数检查队列容量k_msgq_get_attrs()获取队列属性调试和信息查询K_MSGQ_DEFINE()静态定义消息队列编译时定义最常用1.2 消息队列的核心概念消息队列是一个固定大小、固定消息长度的先进先出缓冲区。每条消息作为一个整体原子性地写入和读取消息队列结构 ┌────────────┬────────────┬────────────┬────────────┐ │ 消息槽 0 │ 消息槽 1 │ 消息槽 2 │ 消息槽 3 │ ← 队列深度4 │ (msg_size) │ (msg_size) │ (msg_size) │ (msg_size) │ └────────────┴────────────┴────────────┴────────────┘ ↑ ↑ 写入端 读取端 (k_msgq_put) (k_msgq_get)关键特性消息原子性每条消息的写入和读取是原子的不会出现部分消息数据复制发送时复制数据到队列接收时复制到用户缓冲区阻塞支持队列满时发送可阻塞队列空时接收可阻塞线程安全多线程并发访问安全中断限制中断中只能非阻塞发送K_NO_WAIT1.3 消息队列与其他 IPC 对比特性消息队列 (k_msgq)管道 (k_pipe)FIFO/LIFO (k_fifo/k_lifo)邮箱 (k_mbox)数据模型固定大小消息字节流指针数据块带元数据内存操作复制消息内容复制字节传递指针传递数据块引用原子性每条消息原子字节流无消息边界每个指针原子每个数据块原子阻塞行为发送/接收均可阻塞读取/写入均可阻塞获取可阻塞添加不阻塞发送/接收均可阻塞中断安全发送可用K_NO_WAIT限制较多添加安全 (k_fifo_put)限制较多内存开销中等预分配缓冲区低字节缓冲区最低仅指针高支持复杂操作典型用途结构化数据传递、事件通知串口数据、流处理对象池、简单通知大块数据传输、零拷贝需求2 核心函数2.1 初始化消息队列1静态初始化最常用、推荐#include zephyr/kernel.h /* 定义消息结构 */ struct sensor_data { float temperature; float humidity; uint32_t timestamp; }; /* 静态定义消息队列 - my_msgq: 队列变量名 - sizeof(struct sensor_data): 每条消息大小 - 10: 队列深度最多存储10条消息 - 4: 字节对齐通常为sizeof(uintptr_t) */ K_MSGQ_DEFINE(my_msgq, sizeof(struct sensor_data), 10, 4); void example_static_init(void) { printk(消息队列已静态初始化\n); printk( 消息大小: %zu 字节\n, sizeof(struct sensor_data)); printk( 队列深度: 10 条消息\n); printk( 总缓冲区: %zu 字节\n, sizeof(struct sensor_data) * 10); }2动态初始化/* 动态初始化消息队列 */ void example_dynamic_init(void) { struct k_msgq dynamic_msgq; struct sensor_data buffer[5]; /* 用户提供的缓冲区 */ /* 动态初始化 - dynamic_msgq: 消息队列控制块 - buffer: 存储消息的缓冲区 - sizeof(struct sensor_data): 每条消息大小 - 5: 队列深度与buffer大小匹配 */ k_msgq_init(dynamic_msgq, (char *)buffer, sizeof(struct sensor_data), 5); printk(消息队列已动态初始化\n); }2.2 发送消息k_msgq_put()1 函数原型int k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout);2参数说明msgq: 目标消息队列指针data: 指向要发送的消息数据timeout: 发送超时队列满时的行为控制3超时行为K_NO_WAIT:非阻塞队列满立即返回-ENOMSGK_FOREVER:永久阻塞直到队列有空位时间值:限时阻塞超时返回-EAGAIN4示例生产者线程/* 生产者周期性发送传感器数据 */ void sensor_producer_thread(void *arg1, void *arg2, void *arg3) { struct k_msgq *msgq (struct k_msgq *)arg1; struct sensor_data data; uint32_t sequence 0; int ret; while (1) { /* 准备消息数据 */ data.temperature read_temperature(); data.humidity read_humidity(); data.timestamp k_uptime_get(); /* 发送消息策略1永久阻塞*/ ret k_msgq_put(msgq, data, K_FOREVER); if (ret 0) { sequence; printk([P] 发送消息 #%u: %.1f°C, %.1f%%\n, sequence, data.temperature, data.humidity); } /* 策略2限时等待100ms*/ // ret k_msgq_put(msgq, data, K_MSEC(100)); // if (ret 0) { // /* 成功 */ // } else if (ret -EAGAIN) { // printk(队列满丢弃数据\n); // } /* 策略3非阻塞中断中必须使用*/ // ret k_msgq_put(msgq, data, K_NO_WAIT); // if (ret -ENOMSG) { // /* 队列满需要处理数据丢失 */ // } k_sleep(K_MSEC(1000)); /* 每秒采样一次 */ } }2.3 接收消息k_msgq_get()1函数原型int k_msgq_get(struct k_msgq *msgq, void *data, k_timeout_t timeout);2示例消费者线程/* 消费者处理传感器数据 */ void data_consumer_thread(void *arg1, void *arg2, void *arg3) { struct k_msgq *msgq (struct k_msgq *)arg1; struct sensor_data rx_data; int ret; printk(消费者线程启动\n); while (1) { /* 接收消息策略1永久阻塞等待*/ ret k_msgq_get(msgq, rx_data, K_FOREVER); if (ret 0) { /* 处理接收到的数据 */ printk([C] 收到: %.1f°C, %.1f%%, 时间: %u ms\n, rx_data.temperature, rx_data.humidity, rx_data.timestamp); /* 这里可以添加数据处理逻辑 */ process_sensor_data(rx_data); } /* 策略2限时等待带超时处理*/ // ret k_msgq_get(msgq, rx_data, K_MSEC(500)); // if (ret 0) { // /* 成功接收 */ // } else if (ret -EAGAIN) { // printk(等待超时无新数据\n); // /* 可以执行其他后台任务 */ // } /* 策略3非阻塞轮询 */ // ret k_msgq_get(msgq, rx_data, K_NO_WAIT); // if (ret -ENOMSG) { // k_sleep(K_MSEC(10)); /* 无数据时短暂休眠 */ // } } } void process_sensor_data(const struct sensor_data *data) { /* 示例处理简单阈值检查 */ if (data-temperature 30.0) { printk(警告温度过高\n); } if (data-humidity 80.0) { printk(警告湿度过高\n); } }2.4 消息队列管理函数/* 获取队列状态信息 */ void monitor_msgq_status(struct k_msgq *msgq, const char *name) { size_t used k_msgq_num_used_get(msgq); size_t free k_msgq_num_free_get(msgq); printk([%s] 状态: 已用%zu, 空闲%zu, 使用率%.1f%%\n, name, used, free, (used * 100.0) / (used free)); /* 获取详细属性 */ struct k_msgq_attrs attrs; k_msgq_get_attrs(msgq, attrs); printk( 消息大小: %zu 字节, 最大消息数: %zu\n, attrs.msg_size, attrs.max_msgs); } /* 清空队列中的所有消息 */ void reset_message_queue(struct k_msgq *msgq) { printk(清空消息队列...\n); k_msgq_purge(msgq); size_t used k_msgq_num_used_get(msgq); if (used 0) { printk(队列已清空\n); } }3. 完整应用示例3.1 中断到线程通信经典模式#include zephyr/kernel.h #include zephyr/device.h #include zephyr/drivers/gpio.h #include zephyr/sys/printk.h /* 定义按键事件消息 */ struct button_event { uint32_t press_count; uint8_t button_id; bool long_press; }; /* 静态定义消息队列 */ K_MSGQ_DEFINE(button_event_queue, sizeof(struct button_event), 5, 4); /* GPIO 设备 */ static const struct gpio_dt_spec button GPIO_DT_SPEC_GET(DT_ALIAS(sw0), gpios); static struct gpio_callback button_cb; /* 按键中断服务程序 */ void button_pressed_isr(const struct device *port, struct gpio_callback *cb, gpio_port_pins_t pins) { static uint32_t press_count 0; struct button_event evt; /* 去抖动检查简化版*/ static int64_t last_press_time 0; int64_t now k_uptime_get(); if (now - last_press_time 50) { /* 50ms 去抖 */ return; } last_press_time now; /* 准备事件数据 */ evt.press_count press_count; evt.button_id 1; evt.long_press false; /* 简化示例实际需要计时 */ /* 在中断中发送消息必须使用 K_NO_WAIT */ int ret k_msgq_put(button_event_queue, evt, K_NO_WAIT); if (ret ! 0) { /* 队列满事件丢失 */ static uint32_t lost_events 0; lost_events; if (lost_events % 10 0) { /* 每丢失10个事件报告一次避免频繁打印 */ printk(警告: 已丢失 %u 个按键事件\n, lost_events); } } } /* 事件处理线程 */ void button_event_handler(void *arg1, void *arg2, void *arg3) { struct button_event evt; printk(按键事件处理器启动\n); while (1) { /* 阻塞等待按键事件 */ if (k_msgq_get(button_event_queue, evt, K_FOREVER) 0) { /* 处理事件 */ printk(按键 #%d 按下 (总次数: %u)%s\n, evt.button_id, evt.press_count, evt.long_press ? [长按] : ); /* 这里可以触发相应的操作 */ handle_button_action(evt); } } } void handle_button_action(const struct button_event *evt) { /* 根据按键事件执行操作 */ switch (evt-button_id) { case 1: printk(执行主功能\n); break; default: break; } } /* 初始化函数 */ int init_button_system(void) { int ret; /* 检查设备是否就绪 */ if (!gpio_is_ready_dt(button)) { printk(按键设备未就绪\n); return -ENODEV; } /* 配置 GPIO 引脚 */ ret gpio_pin_configure_dt(button, GPIO_INPUT); if (ret 0) { return ret; } /* 配置中断 */ ret gpio_pin_interrupt_configure_dt(button, GPIO_INT_EDGE_TO_ACTIVE); if (ret 0) { return ret; } /* 初始化回调并添加 */ gpio_init_callback(button_cb, button_pressed_isr, BIT(button.pin)); gpio_add_callback(button.port, button_cb); /* 启动事件处理线程 */ k_thread_create(handler_thread, handler_stack, K_THREAD_STACK_SIZEOF(handler_stack), button_event_handler, NULL, NULL, NULL, 5, 0, K_NO_WAIT); printk(按键系统初始化完成\n); return 0; }3.2 多生产者-单消费者模式#include zephyr/kernel.h #include zephyr/random/rand32.h /* 定义工作命令消息 */ struct work_command { uint8_t command_id; uint32_t parameter; uint32_t priority; /* 1最高优先级 */ }; /* 静态定义工作队列 */ K_MSGQ_DEFINE(work_queue, sizeof(struct work_command), 20, 4); /* 多个生产者线程 */ void high_priority_producer(void *arg1, void *arg2, void *arg3) { struct work_command cmd; uint32_t count 0; while (1) { cmd.command_id 1; cmd.parameter count; cmd.priority 1; /* 高优先级 */ /* 发送高优先级命令 */ if (k_msgq_put(work_queue, cmd, K_MSEC(10)) ! 0) { printk(HP: 队列满丢弃命令\n); } else { printk(HP: 发送命令 #%u\n, count); } k_sleep(K_MSEC(200)); /* 每200ms发送一次 */ } } void low_priority_producer(void *arg1, void *arg2, void *arg3) { struct work_command cmd; uint32_t count 1000; while (1) { cmd.command_id 2; cmd.parameter count; cmd.priority 5; /* 低优先级 */ /* 发送低优先级命令 */ if (k_msgq_put(work_queue, cmd, K_MSEC(10)) ! 0) { printk(LP: 队列满丢弃命令\n); } k_sleep(K_MSEC(1000)); /* 每秒发送一次 */ } } /* 工作处理器消费者 */ void work_processor(void *arg1, void *arg2, void *arg3) { struct work_command cmd; uint32_t processed 0; printk(工作处理器启动\n); while (1) { /* 接收工作命令 */ if (k_msgq_get(work_queue, cmd, K_MSEC(500)) 0) { processed; /* 模拟命令处理 */ printk([%u] 处理命令: id%u, param%u, 优先级%u\n, processed, cmd.command_id, cmd.parameter, cmd.priority); /* 根据优先级模拟不同的处理时间 */ uint32_t process_time 50 / cmd.priority; /* ms */ k_sleep(K_MSEC(process_time)); /* 监控队列状态 */ if (processed % 10 0) { monitor_msgq_status(work_queue, 工作队列); } } else { /* 超时可以执行一些后台维护任务 */ static uint32_t idle_count 0; idle_count; if (idle_count % 5 0) { printk(处理器空闲执行维护任务\n); /* 执行垃圾回收、状态报告等 */ } } } } /* 主函数 */ int main(void) { printk( 多生产者-单消费者示例 \n); /* 启动生产者线程 */ k_thread_create(hp_producer_tid, hp_producer_stack, K_THREAD_STACK_SIZEOF(hp_producer_stack), high_priority_producer, NULL, NULL, NULL, 6, 0, K_NO_WAIT); k_thread_create(lp_producer_tid, lp_producer_stack, K_THREAD_STACK_SIZEOF(lp_producer_stack), low_priority_producer, NULL, NULL, NULL, 7, 0, K_NO_WAIT); /* 启动消费者线程 */ k_thread_create(processor_tid, processor_stack, K_THREAD_STACK_SIZEOF(processor_stack), work_processor, NULL, NULL, NULL, 5, 0, K_NO_WAIT); /* 主线程监控 */ while (1) { k_sleep(K_SECONDS(5)); printk( 系统运行状态 \n); monitor_msgq_status(work_queue, 主监控); } return 0; }4 高级用法4.1 消息队列设计模式/* 模式1请求-响应模式 */ struct request_msg { uint8_t req_id; void *data; size_t data_len; struct k_msgq *reply_queue; /* 回复队列 */ }; struct response_msg { uint8_t req_id; int status; void *result; }; /* 客户端发送请求 */ int send_request(struct k_msgq *req_queue, struct k_msgq *reply_queue, void *data, size_t len) { static uint8_t next_id 0; struct request_msg req; struct response_msg resp; req.req_id next_id; req.data data; req.data_len len; req.reply_queue reply_queue; /* 发送请求 */ if (k_msgq_put(req_queue, req, K_MSEC(100)) ! 0) { return -ETIMEDOUT; } /* 等待响应 */ if (k_msgq_get(reply_queue, resp, K_SECONDS(5)) ! 0) { return -ETIMEDOUT; } if (resp.req_id req.req_id resp.status 0) { return 0; /* 成功 */ } return -EIO; } /* 模式2批处理模式 */ #define BATCH_SIZE 8 struct batched_data { int samples[BATCH_SIZE]; uint8_t count; uint32_t timestamp; }; void batch_collector(struct k_msgq *input_queue, struct k_msgq *output_queue) { struct batched_data batch {.count 0}; int sample; int ret; while (1) { /* 收集单个样本 */ ret k_msgq_get(input_queue, sample, K_MSEC(10)); if (ret 0) { if (batch.count 0) { batch.timestamp k_uptime_get(); } batch.samples[batch.count] sample; /* 批次已满发送处理 */ if (batch.count BATCH_SIZE) { k_msgq_put(output_queue, batch, K_FOREVER); batch.count 0; /* 重置批次 */ } } else if (batch.count 0) { /* 超时且批次有部分数据发送不完整批次 */ k_msgq_put(output_queue, batch, K_FOREVER); batch.count 0; } } }4.2 错误处理与恢复/* 健壮的消息队列操作 */ #define MAX_SEND_RETRIES 3 #define SEND_TIMEOUT_MS 100 int robust_msgq_put(struct k_msgq *msgq, const void *data, const char *context) { int retry 0; int ret; while (retry MAX_SEND_RETRIES) { ret k_msgq_put(msgq, data, K_MSEC(SEND_TIMEOUT_MS)); if (ret 0) { return 0; /* 成功 */ } retry; /* 指数退避 */ k_sleep(K_MSEC(10 * (1 (retry - 1)))); printk(%s: 发送重试 %d/%d (错误: %d)\n, context, retry, MAX_SEND_RETRIES, ret); } /* 所有重试失败 */ printk(%s: 发送失败丢弃数据\n, context); return -EAGAIN; } /* 队列健康检查 */ int check_msgq_health(struct k_msgq *msgq, const char *name) { size_t used k_msgq_num_used_get(msgq); size_t free k_msgq_num_free_get(msgq); /* 检查队列是否接近满 */ if (used 0 free 0) { printk(警告: %s 队列已满!\n, name); return -ENOSPC; } /* 检查队列是否长时间满 */ static int64_t last_full_time 0; if (free 0) { int64_t now k_uptime_get(); if (last_full_time 0) { last_full_time now; } else if (now - last_full_time 5000) { /* 5秒 */ printk(警告: %s 队列持续满 5秒以上\n, name); return -EBUSY; } } else { last_full_time 0; } return 0; }4.3 性能优化技巧/* 技巧1避免动态内存分配 */ /* 使用静态消息池而不是每次分配 */ #define MSG_POOL_SIZE 20 struct my_message msg_pool[MSG_POOL_SIZE]; int msg_pool_index 0; struct my_message *alloc_message(void) { struct my_message *msg msg_pool[msg_pool_index]; msg_pool_index (msg_pool_index 1) % MSG_POOL_SIZE; return msg; } /* 技巧2内存对齐优化 */ /* 确保消息结构体正确对齐 */ struct __aligned(8) aligned_data { uint64_t timestamp; /* 需要8字节对齐 */ float values[4]; uint32_t flags; }; /* 技巧3批量操作减少上下文切换 */ void batch_send_sensor_data(struct k_msgq *msgq, const struct sensor_data *data_array, size_t count) { for (size_t i 0; i count; i) { /* 非阻塞发送能发多少发多少 */ if (k_msgq_put(msgq, data_array[i], K_NO_WAIT) ! 0) { printk(批处理: 发送 %zu/%zu 条数据后队列满\n, i, count); break; } } }4.4 调试函数与工具/* 调试检查消息队列状态 */ void debug_msgq_info(struct k_msgq *msgq, const char *name) { #ifdef CONFIG_MSGQ_DUMP struct k_msgq_attrs attrs; k_msgq_get_attrs(msgq, attrs); printk( %s 调试信息 \n, name); printk( 消息大小: %zu 字节\n, attrs.msg_size); printk( 最大消息数: %zu\n, attrs.max_msgs); printk( 已使用: %zu\n, k_msgq_num_used_get(msgq)); printk( 空闲: %zu\n, k_msgq_num_free_get(msgq)); /* 检查是否有等待的线程 */ #ifdef CONFIG_WAITQ_MAX_MONITOR_COUNT printk( 等待发送的线程: %p\n, msgq-send_waitq.waitq); printk( 等待接收的线程: %p\n, msgq-recv_waitq.waitq); #endif #endif } /* 定期监控所有消息队列 */ void monitor_all_msgqs(void) { /* 这里可以维护一个全局的消息队列列表 */ static struct k_msgq *registered_msgqs[10]; static size_t msgq_count 0; for (size_t i 0; i msgq_count; i) { debug_msgq_info(registered_msgqs[i], 监控); } }5 应用和总结5.1 常见问题与解决方案问题症状解决方案队列满导致数据丢失频繁返回-ENOMSG或-EAGAIN1. 增加队列深度2. 提高消费者处理速度3. 实现数据丢弃策略内存对齐错误硬件异常或数据损坏1. 使用__aligned属性2. 正确设置对齐参数3. 检查结构体填充死锁线程永久阻塞1. 设置合理超时2. 使用k_msgq_purge()清空队列3. 检查生产者-消费者速率匹配性能瓶颈CPU使用率高1. 减少消息频率2. 使用批处理3. 优化消息大小中断上下文限制中断中发送失败1. 确保使用K_NO_WAIT2. 增加队列深度减少满的情况3. 实现事件丢失处理5.2 使用技巧1 Zephyr 的消息队列 (k_msgq) 是一个可靠、线程安全、易于使用的线程间通信机制特别适合结构化数据传递传感器读数、控制命令、事件通知中断到线程通信外设中断快速传递数据到处理线程生产者-消费者模式多线程任务分发和处理请求-响应模式需要确认的交互式通信2关键优势数据所有权清晰复制语义无需担心数据生命周期原子性保证消息不会被分割灵活的阻塞控制支持阻塞、非阻塞、超时等待中断安全可在中断中非阻塞发送3使用建议优先选择静态初始化(K_MSGQ_DEFINE)合理设置队列深度根据生产消费速率差 安全余量始终检查返回值特别是k_msgq_put在中断中的调用注意内存对齐尤其是包含uint64_t、double的结构体监控队列状态定期检查使用率预防队列满