Implementing a Message Queue in 300 Lines of Code
Message queues aren't rocket science: you can build a tiny MQ too!
While working on the dictionary lookup interface for Cipei (词焙), I had to handle words that are missing from the dictionary: in that case an external API must be called to update the dictionary. Calls to large language model APIs are currently slow (more than 5 seconds) and their latency is unpredictable, so the update has to run asynchronously.
This scenario is a good fit for a message queue (RabbitMQ, Kafka, Redis Pub/Sub, etc.). However, I wanted to keep the architecture as simple as possible and avoid introducing middleware without a strong reason. Since Go channels are well suited to passing messages between goroutines, I wrapped them in a simple message queue that supports multiple queues and multiple consumers. The whole thing is less than 300 lines of code.
Message Definition
A message queue first needs messages. The simplest message is just an ID + data:
type Msg struct {
	ID   uint64
	Data any
}
An auto-incrementing uint64 sequence is enough for the message ID.
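As a rough illustration of such a sequence, here is a minimal sketch of a concurrency-safe counter. The idGen type is a hypothetical helper and not part of the article's code, which increments a plain field under a mutex instead; atomic.Uint64 requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// idGen is a hypothetical helper, not part of the article's code.
type idGen struct {
	next atomic.Uint64
}

// Next returns 0, 1, 2, ... and is safe for concurrent use.
func (g *idGen) Next() uint64 {
	return g.next.Add(1) - 1
}

func main() {
	var g idGen
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			g.Next()
		}()
	}
	wg.Wait()
	fmt.Println(g.Next()) // IDs 0..99 were handed out above, so this prints 100
}
```

A uint64 counter will not realistically overflow: at a million messages per second it would take several hundred thousand years to wrap around.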
Queue Definition
With messages, next we can define the queue.
The simplest queue uses a slice to store messages, then adds Enqueue and Dequeue:
type Queue struct {
	msgs []*Msg
}
func (q *Queue) Enqueue(msg *Msg) error
func (q *Queue) Dequeue() *Msg
But this is not safe for concurrent use. You might think of protecting the slice with a sync.Mutex, which is feasible, but channels offer another way: let a single goroutine own the slice, so the slice itself needs no lock. The principle is: add an in channel and an out channel, and in one goroutine append the messages received from in to msgs while sending the oldest message to out for dequeuing.
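For comparison, the mutex-based alternative mentioned above could be sketched as follows. MutexQueue is a hypothetical name used only for this illustration. The article's own queue, shown next, takes the channel route instead.

```go
package main

import (
	"fmt"
	"sync"
)

type Msg struct {
	ID   uint64
	Data any
}

// MutexQueue is a hypothetical name for the lock-based alternative.
type MutexQueue struct {
	mu   sync.Mutex
	msgs []*Msg
}

// Enqueue appends a message under the lock.
func (q *MutexQueue) Enqueue(msg *Msg) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.msgs = append(q.msgs, msg)
}

// Dequeue removes and returns the oldest message, or nil if the queue is empty.
func (q *MutexQueue) Dequeue() *Msg {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.msgs) == 0 {
		return nil
	}
	msg := q.msgs[0]
	q.msgs = q.msgs[1:]
	return msg
}

func main() {
	q := &MutexQueue{}
	q.Enqueue(&Msg{ID: 0, Data: "hello"})
	fmt.Println(q.Dequeue().Data) // hello
	fmt.Println(q.Dequeue())      // <nil>
}
```

The drawback of this variant is that Dequeue cannot block: a consumer has to poll, or the queue needs a sync.Cond on top. A channel gives blocking receives for free.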
The channel-based Queue is defined as follows:
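type Queue struct {
	wg     sync.WaitGroup
	mu     sync.RWMutex // guards closed; msgs is owned by run() and needs no lock
	closed bool
	in     chan *Msg
	out    chan *Msg
	msgs   []*Msg
}

// Enqueue adds a message to the queue
func (q *Queue) Enqueue(msg *Msg) error {
	// Holding the read lock while sending keeps Close from closing q.in
	// in the middle of a send, which would panic
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.closed {
		return errors.New("queue: queue is closed")
	}
	q.in <- msg
	return nil
}

// Dequeue returns the out channel for messages
func (q *Queue) Dequeue() <-chan *Msg {
	return q.out
}

// Start launches queue processing
func (q *Queue) Start() {
	q.wg.Add(1)
	go q.run()
}

// Wait waits for the queue to be consumed
func (q *Queue) Wait() {
	q.wg.Wait()
}

// Close closes the queue (can only dequeue, cannot enqueue).
// It is safe to call more than once.
func (q *Queue) Close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return
	}
	q.closed = true
	close(q.in)
}

// Message forwarding
func (q *Queue) run() {
	defer q.wg.Done()
	// Work on a local copy so that no other goroutine ever sees q.in change
	in := q.in
	for {
		// With no buffered messages, out stays nil and the send case is disabled
		var nextMsg *Msg
		var out chan *Msg
		if len(q.msgs) > 0 {
			nextMsg = q.msgs[0]
			out = q.out
		}
		select {
		case msg, ok := <-in:
			if !ok {
				// channel is closed; a nil channel disables this case
				in = nil
			} else {
				q.msgs = append(q.msgs, msg)
			}
		case out <- nextMsg:
			q.msgs[0] = nil // drop the reference so the message can be collected
			q.msgs = q.msgs[1:]
		}
		if in == nil && len(q.msgs) == 0 {
			close(q.out)
			return
		}
	}
}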
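The run loop leans on a property of Go channels: a select case on a nil channel is never ready. That is why setting out to nil while there are no buffered messages disables the send case. The standalone sketch below demonstrates the behavior; tryRecv is a hypothetical helper written only for this demonstration.

```go
package main

import "fmt"

// tryRecv reports what a non-blocking select over the two channels does.
func tryRecv(a, b <-chan int) string {
	select {
	case v := <-a:
		return fmt.Sprintf("a:%d", v)
	case v := <-b:
		return fmt.Sprintf("b:%d", v)
	default:
		return "nothing ready"
	}
}

func main() {
	var disabled chan int // nil channel: its case can never be selected
	ready := make(chan int, 1)
	ready <- 42

	fmt.Println(tryRecv(disabled, ready)) // b:42
	fmt.Println(tryRecv(disabled, ready)) // nothing ready
}
```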
Then add a New function:
func NewQueue(inBufSize, outBufSize int) *Queue {
	if inBufSize <= 0 {
		inBufSize = 1
	}
	if outBufSize <= 0 {
		outBufSize = 1
	}
	return &Queue{
		in:   make(chan *Msg, inBufSize),
		out:  make(chan *Msg, outBufSize),
		msgs: make([]*Msg, 0, 1024),
	}
}
Message Queue
With queues, we can implement a message queue. As mentioned earlier, this message queue needs two features: supporting multiple queues and supporting multiple consumers. This means an MQ instance allows multiple Queue instances, i.e., map[string]*Queue, and each queue supports multiple consumers, meaning multiple consumer goroutines need to be started after each Queue is launched.
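The multiple-consumer idea can be sketched in isolation: several goroutines receive from the same channel, and each message is delivered to exactly one of them. The fanOut function below is a hypothetical helper for illustration only, using plain ints instead of *Msg.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut starts n workers that all receive from the same channel and
// returns how many messages each worker handled.
func fanOut(msgs <-chan int, n int) []int {
	counts := make([]int, n)
	var wg sync.WaitGroup
	for w := 0; w < n; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for range msgs {
				counts[id]++ // each worker only touches its own slot
			}
		}(w)
	}
	wg.Wait()
	return counts
}

func main() {
	msgs := make(chan int, 10)
	for i := 0; i < 10; i++ {
		msgs <- i
	}
	close(msgs)

	total := 0
	for id, c := range fanOut(msgs, 3) {
		fmt.Printf("worker %d handled %d messages\n", id, c)
		total += c
	}
	fmt.Println("total:", total) // always 10; the split between workers varies
}
```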
First, define the consumer function signature:
type ConsumerFunc func(ctx context.Context, msg *Msg) error
Then bind Queue and ConsumerFunc to the same struct:
type Q struct {
	mu sync.Mutex
	// Message queue
	queue *Queue
	// Consumer function
	consumer ConsumerFunc
	// Concurrency
	concurrency int
	// Current message sequence number
	seq uint64
}
Then we can implement our MQ:
type MQ struct {
	wg      sync.WaitGroup
	mu      sync.Mutex
	running bool
	queues  map[string]*Q
	// A previous article introduced contract.Logger. It doesn't affect reading the code here,
	// so I won't repeat it. If you're interested, you can check the last article:
	// "Farewell Redis/MySQL: Implementing a Persistent Set in 100 Lines of Go", link at the end.
	log contract.Logger
}

func NewMQ(log contract.Logger) *MQ {
	return &MQ{
		queues: make(map[string]*Q),
		log:    log,
	}
}

// RegisterQueue registers a queue; it must be called before Start
func (m *MQ) RegisterQueue(name string, consumer ConsumerFunc, concurrency int) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.running {
		return errors.New("mq: cannot register queue while MQ is running")
	}
	if consumer == nil {
		return errors.New("mq: consumer must not be nil")
	}
	// At least one worker is needed, otherwise nothing would ever be consumed
	if concurrency <= 0 {
		concurrency = 1
	}
	// Avoid duplicate registration
	if _, exists := m.queues[name]; exists {
		return errors.New("mq: queue already registered")
	}
	m.queues[name] = &Q{
		queue:       NewQueue(concurrency, concurrency),
		consumer:    consumer,
		concurrency: concurrency,
	}
	return nil
}

// Publish publishes a message
func (m *MQ) Publish(queueName string, data any) error {
	m.mu.Lock()
	q, exists := m.queues[queueName]
	m.mu.Unlock()
	if !exists {
		return errors.New("mq: queue not found")
	}
	q.mu.Lock()
	defer q.mu.Unlock()
	msg := &Msg{
		ID:   q.seq,
		Data: data,
	}
	q.seq++
	return q.queue.Enqueue(msg)
}

// Start launches MQ processing
func (s *MQ) Start(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.running {
		return errors.New("mq: MQ is already running")
	}
	s.running = true
	for name, q := range s.queues {
		q.queue.Start()
		for workerID := 0; workerID < q.concurrency; workerID++ {
			s.wg.Add(1)
			go s.consume(ctx, name, workerID, q)
		}
		s.log.Infof("Started MQ queue %s with concurrency %d", name, q.concurrency)
	}
	return nil
}

// Stop closes all queues and waits until the workers have drained them
func (s *MQ) Stop() {
	s.mu.Lock()
	if !s.running {
		s.mu.Unlock()
		return
	}
	s.running = false
	for _, q := range s.queues {
		q.queue.Close()
	}
	s.mu.Unlock()
	s.wg.Wait()
}
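
func (s *MQ) consume(ctx context.Context, queueName string, workerID int, q *Q) {
	defer s.wg.Done()
	done := ctx.Done()
	for {
		select {
		case <-done:
			s.log.Infof("Context canceled. Stopping consumer for queue[%s] worker[%d]", queueName, workerID)
			// Stop accepting new messages, then keep draining what is already queued
			q.queue.Close()
			// A nil channel is never ready, so this case runs only once per worker
			// instead of spinning on the canceled context
			done = nil
		case msg, ok := <-q.queue.Dequeue():
			if !ok {
				// Queue closed, exit
				s.log.Infof("Queue[%s] worker[%d] is stopping as the queue is closed", queueName, workerID)
				return
			}
			s.handle(ctx, queueName, workerID, q, msg)
		}
	}
}

// handle processes a single message. The recover is deferred here rather than
// inside the loop in consume, so a panicking consumer is logged without ending
// the worker and no deferred calls pile up across iterations.
func (s *MQ) handle(ctx context.Context, queueName string, workerID int, q *Q, msg *Msg) {
	defer func() {
		if r := recover(); r != nil {
			s.log.Errorf("Recovered from panic in queue[%s] worker[%d]: %v", queueName, workerID, r)
		}
	}()
	if err := q.consumer(ctx, msg); err != nil {
		// Failed to process message, log it or handle otherwise
		s.log.Errorf("Failed to process message ID[%d] from queue[%s] worker[%d]: %v", msg.ID, queueName, workerID, err)
	}
}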
That completes our simple message queue. Using it is also very easy:
func main() {
	logger := NewLogger("debug")
	mq := NewMQ(logger)
	ctx := context.Background()
	consumer1 := func(ctx context.Context, msg *Msg) error {
		logger.Infof("Consumer1 processing message ID: %d, Data: %v", msg.ID, msg.Data)
		return nil
	}
	err := mq.RegisterQueue("queue1", consumer1, 2)
	if err != nil {
		logger.Errorf("Failed to register queue1: %v", err)
		return
	}
	err = mq.Start(ctx)
	if err != nil {
		logger.Errorf("Failed to start MQ: %v", err)
		return
	}
	// Then the producer publishes messages:
	err = mq.Publish("queue1", "message for queue1")
	if err != nil {
		logger.Errorf("Failed to push message to queue1: %v", err)
	}
	// Stop closes the queues and waits until the workers have drained them;
	// without it, main could exit before the message is consumed
	mq.Stop()
}
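The code above depends on contract.Logger and NewLogger, which come from the previous article and are not shown here. To try the queue on its own, a stand-in along the following lines should be enough. This is only a sketch: the Logger interface is an assumed subset containing the two methods the MQ code actually calls, and stdLogger is a hypothetical implementation built on the standard library.

```go
package main

import (
	"log"
	"os"
)

// Logger is an assumed subset of contract.Logger: only the two methods
// that the MQ code calls.
type Logger interface {
	Infof(format string, args ...any)
	Errorf(format string, args ...any)
}

// stdLogger is a hypothetical stand-in built on the standard library.
type stdLogger struct {
	l *log.Logger
}

// NewLogger ignores the level argument in this sketch.
func NewLogger(level string) Logger {
	return &stdLogger{l: log.New(os.Stderr, "", log.LstdFlags)}
}

func (s *stdLogger) Infof(format string, args ...any) {
	s.l.Printf("INFO  "+format, args...)
}

func (s *stdLogger) Errorf(format string, args ...any) {
	s.l.Printf("ERROR "+format, args...)
}

func main() {
	logger := NewLogger("debug")
	logger.Infof("Started MQ queue %s with concurrency %d", "queue1", 2)
	logger.Errorf("Failed to process message ID[%d]: %v", 7, "timeout")
}
```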
Extension
This simple message queue still has one issue: messages live only in memory, so any that are still queued when the process exits are lost. My current scenario tolerates data loss, so I haven't added persistence yet. It isn't difficult, though. The simplest approach is to dump msgs to disk in Close() and load the previous data from disk when the Queue starts, following the same principle as the previous article. Note that this only covers a graceful shutdown; to survive a crash, where Close() never runs, a WAL (write-ahead log) could be used. In short, get it running first, then iterate based on the issues you encounter.