Go语言Redis Stream实战新一代消息队列1. Redis Stream概述Redis Stream是Redis 5.0引入的数据结构提供了消息持久化、消费组、消息ID等特性是一种轻量级的消息队列方案。2. Stream客户端实现package redisstream import ( context fmt time github.com/redis/go-redis/v9 ) type StreamClient struct { client *redis.Client } func NewStreamClient(addr, password string, db int) (*StreamClient, error) { client : redis.NewClient(redis.Options{ Addr: addr, Password: password, DB: db, }) ctx : context.Background() if err : client.Ping(ctx).Err(); err ! nil { return nil, err } return StreamClient{client: client}, nil } func (s *StreamClient) XAdd(ctx context.Context, stream string, values map[string]interface{}) (string, error) { return s.client.XAdd(ctx, redis.XAddArgs{ Stream: stream, Values: values, }).Result() } func (s *StreamClient) XRead(ctx context.Context, stream string, count int64) ([]redis.XMessage, error) { return s.client.XRead(ctx, redis.XReadArgs{ Streams: []string{stream, 0}, Count: count, Block: time.Second * 5, }).Result() } func (s *StreamClient) XGroupCreate(ctx context.Context, stream, group, start string) error { return s.client.XGroupCreateMkStream(ctx, stream, group, start).Err() } func (s *StreamClient) XReadGroup(ctx context.Context, stream, group, consumer string, count int64) ([]redis.XMessage, error) { return s.client.XReadGroup(ctx, redis.XReadGroupArgs{ Group: group, Consumer: consumer, Streams: []string{stream, }, Count: count, Block: time.Second * 5, }).Result() } func (s *StreamClient) XAck(ctx context.Context, stream, group string, ids ...string) error { return s.client.XAck(ctx, stream, group, ids...).Err() }3. 总结Redis Stream提供了轻量级的消息队列功能适合对消息可靠性要求不是特别高的场景具有部署简单、性能高等优点。