BY Y!an - 2025年10月23日

300 行代码实现一个消息队列

消息队列其实并不复杂,你也能实现一个 Tiny MQ

300 行代码实现一个消息队列

在做词焙查词接口时,如果词库不存在这个单词,则需要调用外部 API 更新词库,而目前用的大模型 API 调用时长较长(超过 5 秒)且不可控,故需要异步去做这件事。

这种场景很适合使用消息队列(RabbitMQ、Kafka、Redis PUB/SUB 等等),但为了使架构尽可能简单,不想轻易引入中间件,加上考虑到 Go channel 很适合在不同协程之间传递消息,于是使用 channel 封装了一个简易的消息队列:支持多队列、支持多消费者,而且整个封装才不到 300 行代码。

消息定义

消息队列,首先要有消息。最简单的一个消息就是 ID + 数据:

type Msg struct {
	ID   uint64
	Data any
}

消息 ID 用一个 uint64 类型的自增序列足矣。

队列定义

有了消息,接下来就可以定义队列了。

最简单的队列用一个 slice 用于保存消息,然再加上 EnqueueDequeue 即可:

type Queue struct {
	msgs []*Msg
}

func (q *Queue) Enqueue(msg *Msg) error
func (q *Queue) Dequeue() *Msg

但这样在并发的时候会出现问题,你可能会想到用一个 sync.Lock 来加锁,可行,但可以利用 chan 来实现无锁队列,具体原理是:新增 inout 两个 chan,然后在同一个协程里将 in 接收到的消息放入 msgs,用 out 来发送出队消息。

Queue 的定义如下:

type Queue struct {
	wg sync.WaitGroup

	in  chan *Msg
	out chan *Msg

	msgs []*Msg
}

// 消息入队
func (q *Queue) Enqueue(msg *Msg) error {
	if q.in == nil {
		return errors.New("queue: queue is closed")
	}
	q.in <- msg
	return nil
}

// 消息出队
func (q *Queue) Dequeue() <-chan *Msg {
	return q.out
}

// 启动队列处理
func (q *Queue) Start() {
	q.wg.Add(1)
	go q.run()
}

// 等待队列消费
func (q *Queue) Wait() {
	q.wg.Wait()
}

// 关闭队列(只能出队,不能入队)
func (q *Queue) Close() {
	if q.in != nil {
		close(q.in)
	}
}

// 消息转发
func (q *Queue) run() {
	defer q.wg.Done()

	var nextMsg *Msg
	var out chan *Msg

	for {
		if len(q.msgs) > 0 {
			nextMsg = q.msgs[0]
			out = q.out
		} else {
			nextMsg = nil
			out = nil
		}

		select {
		case msg, ok := <-q.in:
			if !ok {
				// channel is closed
				q.in = nil
			} else {
				q.msgs = append(q.msgs, msg)
			}
		case out <- nextMsg:
			q.msgs = q.msgs[1:]
		}

		if q.in == nil && len(q.msgs) == 0 {
			close(q.out)
			return
		}
	}
}

然后再增加一个 New 函数:

func NewQueue(inBufSize, outBufSize int) *Queue {
	if inBufSize <= 0 {
		inBufSize = 1
	}
	if outBufSize <= 0 {
		outBufSize = 1
	}

	return &Queue{
		in:   make(chan *Msg, inBufSize),
		out:  make(chan *Msg, outBufSize),
		msgs: make([]*Msg, 0, 1024),
	}
}

消息队列

有了队列,那我们就可以实现消息队列了。前面我们提到,这个消息队列要实现的两个功能:支持多队列支持多消费者。那也就是说一个 MQ 实例里面允许有多个 Queue 实例,即 map[string]*Queue,然后每个队列是支持多个消费者的,即每个 Queue 启动之后需要开启多个消费者协程。

先定义消费者函数签名:

type ConsumerFunc func(ctx context.Context, msg *Msg) error

再把 QueueConsumerFunc 绑定到同一个结构体:

type Q struct {
	mu sync.Mutex

	// 消息队列
	queue *Queue
	// 消费者函数
	consumer ConsumerFunc
	// 并发数
	concurrency int
	// 当前消息序号
	seq uint64
}

然后就可以实现我们的 MQ 了:

type MQ struct {
	wg sync.WaitGroup
	mu sync.Mutex

	running bool

	queues map[string]*Q

	// 先前的文章有介绍过 contract.Logger,这里不影响代码阅读,不再赘述,
	// 感兴趣可以看上篇文章:《告别 Redis/MySQL:用一百行 Go 代码实现持久化 Set》,链接见文末
	log contract.Logger
}

func NewMQ(log contract.Logger) *MQ {
	return &MQ{
		queues: make(map[string]*Q),
		log:    log,
	}
}

// 注册一个队列
func (m *MQ) RegisterQueue(name string, consumer ConsumerFunc, concurrency int) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.running {
		return errors.New("mq: cannot register queue while MQ is running")
	}

	// 避免重复注册
	if _, exists := m.queues[name]; exists {
		return errors.New("mq: queue already registered")
	}

	m.queues[name] = &Q{
		queue:       NewQueue(concurrency, concurrency),
		consumer:    consumer,
		concurrency: concurrency,
	}
	return nil
}

// 发布消息
func (m *MQ) Publish(queueName string, data any) error {
	m.mu.Lock()
	q, exists := m.queues[queueName]
	m.mu.Unlock()

	if !exists {
		return errors.New("mq: queue not found")
	}

	q.mu.Lock()
	defer q.mu.Unlock()
	msg := &Msg{
		ID:   q.seq,
		Data: data,
	}
	q.seq++

	return q.queue.Enqueue(msg)
}

// 启动 MQ 处理
func (s *MQ) Start(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.running {
		return errors.New("mq: MQ is already running")
	}

	s.running = true

	for name, q := range s.queues {
		q.queue.Start()
		for workerID := 0; workerID < q.concurrency; workerID++ {
			s.wg.Add(1)
			go s.consume(ctx, name, workerID, q)
		}
		s.log.Infof("Started MQ queue %s with concurrency %d", name, q.concurrency)
	}
	return nil
}

func (s *MQ) Stop() {
	s.mu.Lock()

	if !s.running {
		s.mu.Unlock()
		return
	}

	s.running = false

	for _, q := range s.queues {
		q.queue.Close()
	}
	s.mu.Unlock()

	s.wg.Wait()
}

func (s *MQ) consume(ctx context.Context, queueName string, workerID int, q *Q) {
	defer s.wg.Done()

	for {
		defer func() {
			if r := recover(); r != nil {
				s.log.Errorf("Recovered from panic in queue[%s] worker[%d]: %v", queueName, workerID, r)
			}
		}()

		select {
		case <-ctx.Done():
			s.log.Infof("Context canceled. Stopping consumer for queue[%s] worker[%d]", queueName, workerID)
			q.queue.Close()

		case msg, ok := <-q.queue.Dequeue():
			if !ok {
				// 队列已关闭,退出
				s.log.Infof("Queue[%s] worker[%d] is stopping as the queue is closed", queueName, workerID)
				return
			}
			if err := q.consumer(ctx, msg); err != nil {
				// 处理消息失败,记录日志或进行其他处理
				s.log.Errorf("Failed to process message ID[%d] from queue[%s] worker[%d]: %v", msg.ID, queueName, workerID, err)
			}
		}
	}
}

至此,我们这个简易消息队列就实现了。使用方法也很简单:

func main() {
	logger := NewLogger("debug")
	mq := NewMQ(logger)

	ctx := context.TODO()

	consumer1 := func(ctx context.Context, msg *Msg) error {
		logger.Infof("Consumer1 processing message ID: %d, Data: %v", msg.ID, msg.Data)
		return
	}

	err := mq.RegisterQueue("queue1", consumer1, 2)
	if err != nil {
		logger.Errorf("Failed to register queue1: %v", err)
		return
	}

	err = mq.Start(ctx)
	if err != nil {
		logger.Errorf("Failed to start MQ: %v", err)
		return
	}

	// 然后生产者发布消息:
	err = mq.Publish("queue1", "message for queue1")
	if err != nil {
		logger.Errorf("Failed to push message to queue1: %v", err)
	}
}

扩展

这个简易消息队列目前其实存在一个问题,就是当进程意外退出的时候,队列可能没有被清空,有一定概率丢数据。因为我现在的场景是允许丢数据的,所以暂时没加上这个逻辑,不过其实也不麻烦,最简单的方式只需要在 Close() 的时候把 msgs 直接 dump 到磁盘,启动 Queue 的时候也先从磁盘加载先前的数据,可以参考上一篇文章的原理;当然对可靠性要求更高的也可以用上 WAL(Write-ahead logging),总之先跑起来,然后根据问题继续迭代



如果你觉得文章对你有些帮助,可以请我喝杯咖啡 ↓