BY Y!an - 2025年9月8日

告别 Redis/MySQL:用一百行 Go 代码实现持久化 Set

杀鸡焉用牛刀!

问题出现

在做词焙词库更新的时候遇到一个问题:如果某一个单词是一个非法的单词,那就需要进行标记,之后再次遇到的时候可以直接跳过。

这个方案要实现的话,可能第一时间会想到用 Redis 的 Set;或者数据库里加一张表,一行一个非法单词。

但是词焙本身是没有用到 Redis 的,如果要用还得配置下内存淘汰策略;这么简单的需求放数据库的话又有点杀鸡用牛刀了。

所以我选择了直接使用内存 + 定期持久化到文件,整个技术方案不难,加起来就一百行左右的代码。

基础功能

虽然整个技术方案不复杂,但我们也拆解需求,逐步完善。

业务侧关心的方法:Add(新增)和 Exist(判断是否存在),至于怎么持久化的细节是业务侧不太关心的,所以我们先实现最简单的集合:

type PersistentList struct {
	records map[string]struct{} // 内存中的数据集合

	mu sync.RWMutex  // 用于并发控制的读写锁
}

// Add 将 value 添加到缓存中。
func (c *PersistentList) Add(value string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, found := c.records[value]; !found {
		c.records[value] = struct{}{}
	}
}

// Exist 检查 value 是否存在于缓存中。
func (c *PersistentList) Exist(value string) bool {
	c.mu.RLock()
	_, found := c.records[value]
	c.mu.RUnlock()
	return found
}

到这里,最简单的内存集合就实现了,接下来我们增加持久化功能!

持久化

我们可以将集合逐行保存到文本文件,这么做有两个好处:一是进程启动时加载集合的逻辑很简单,二是持久化的时候即使每 Add 一个元素就直接写文件,因为是追加写,性能也还是可以的。

修改结构体定义,新增 peddingfile,以及再增加一个 chan 用于发送退出信号:

type PersistentList struct {
	records map[string]struct{} // 内存中的数据集合
	pending []string            // 待持久化的新数据

	mu    sync.RWMutex  // 用于并发控制的读写锁
	file  *os.File      // 持久化文件句柄
	close chan struct{} // 用于关闭后台任务的信号

	log contract.Logger
}

我们将读取数据和写入数据这两个逻辑分开实现:

从文件加载数据

如前面所说,加载数据是逐行读取然后放进 records 里就行,我们把这个逻辑放在实例化时进行:

// contract.Logger 的定义见文末
func NewPersistentList(dataFile string, log contract.Logger) (*PersistentList, error) {
	file, err := os.OpenFile(dataFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
	if err != nil {
		return nil, err
	}

	cache := &PersistentList{
		records: make(map[string]struct{}),
		file:    file,
		close:   make(chan struct{}),
		log:     log,
	}

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		word := scanner.Text()
		if word != "" {
			cache.records[word] = struct{}{}
		}
	}

	if err := scanner.Err(); err != nil && err != io.EOF {
		file.Close()
		return nil, err
	}

	log.Infof("Loaded %d data into persistent cache from %s", len(cache.records), dataFile)
	return cache, nil
}

持久化数据到文件

我们可以每 Add 一个数据就追加写文件,但这里做一个小小的过度设计,批量刷盘,具体做法就是每次 Add 时追加到 pending 数组,后台每分钟写一次磁盘:

// 修改 Add 方法:
func (c *PersistentList) Add(value string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if _, found := c.records[value]; !found {
		c.records[value] = struct{}{}
		c.pending = append(c.pending, value) // 新增了这行
	}
}

// 新增刷盘方法:
func (c *PersistentList) syncToFile() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if len(c.pending) == 0 {
		return nil
	}

	count := len(c.pending)
	writer := bufio.NewWriter(c.file)
	for _, word := range c.pending {
		if _, err := writer.WriteString(word + "\n"); err != nil {
			return err
		}
	}

	if err := writer.Flush(); err != nil {
		return err
	}

	// 清空待刷盘数据,准备下一次写入
	c.pending = nil

	c.log.Infof("Successfully synced %d new keys to file.", count)
	return nil
}

// 新增后台任务方法(给进程启动时以协程异步执行):
func (c *PersistentList) Serve(ctx context.Context) error {
	const syncInterval = 1 * time.Minute
	ticker := time.NewTicker(syncInterval)
	defer ticker.Stop()
	defer c.file.Close()

	for {
		select {
		case <-ctx.Done():
			c.log.Warn("Context canceled. Performing final sync...")
			return c.syncToFile()
		case <-ticker.C:
			if err := c.syncToFile(); err != nil {
				return err
			}
		case <-c.close:
			c.log.Warn("Received close signal. Performing final sync...")
			return c.syncToFile()
		}
	}
}

到这里,一个带有持久化功能的 Set 就已经实现了,可以这样使用:

func main() {
	myset, err := NewPersistentList("/path/to/setfile")
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.TODO())
	go myset.Serve(ctx)

	myset.Add("asdfgh")
	fmt.Println(myset.Exist("asdfgh"))
	fmt.Println(myset.Exist("qwerty"))

	cancel() // 退出进程时通知刷盘
}

但是上面这个 context.WithCancel 的方法还是略麻烦了,还记得我们刚刚定义的 close 这个 chan 吗,我们可以增加一个 Shutdown 方法:

func (c *PersistentList) Shutdown() {
	c.close <- struct{}{}
}

这样就可以直接使用 myset.Shutdown() 来关闭了

定义

contract.Logger 定义如下:

package contract

type Logger interface {
	Debug(msg string)
	Debugf(msg string, args ...interface{})

	Info(msg string)
	Infof(msg string, args ...interface{})

	Warn(msg string)
	Warnf(msg string, args ...interface{})

	Error(msg string)
	Errorf(msg string, args ...interface{})

	Close() error
}

如果你觉得文章对你有些帮助,可以请我喝杯咖啡 ↓