nsqd持久化使用 go-diskqueue 包实现
diskqueue包实现: 当nsqd进程退出时,将内存中的数据写入到磁盘 当nsqd进程启动时,将磁盘中的数据读入到内存 在nsqd运行过程中,定时(默认2秒)将内存中的数据写入到磁盘
go get github.com/nsqio/go-diskqueuego-diskqueue/diskqueue.go
type Interface interface { Put([]byte) error // 消息生产接口 ReadChan() chan []byte // 消息消费接口 // this is expected to be an *unbuffered* channel Close() error // 队列关闭接口 Delete() error // 消息删除接口 Depth() int64 // 消息长度接口 Empty() error // 清空队列接口 }如上实例化操作过程中,看到有两步操作,调用d.retrieveMetaData()和d.ioLoop()
在ioLoop中使用select IO多路复用,频繁读写操作避免了使用锁的频繁调用
// ioLoop:队列的定时读写操作 // 定时调用sync函数将内存中的消息刷新到磁盘 func (d *diskQueue) ioLoop() { var dataRead []byte var err error var count int64 // 计数器变量 var r chan []byte syncTicker := time.NewTicker(d.syncTimeout) for { // dont sync all the time :) // count计数器打到d.syncEvery的数量时,设置d.needSync为true则执行同步操作 if count == d.syncEvery { d.needSync = true } // needSync变量控制是否需要同步,同步完成后该变量置为false if d.needSync { err = d.sync() if err != nil { d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err) } count = 0 } // 检测当前是否有数据需要被读取 // 条件成立:执行d.readOne()并将结果放入dataRead中,然后设置r为d.readChan // 条件不成立:将r设置为nil if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { if d.nextReadPos == d.readPos { dataRead, err = d.readOne() if err != nil { d.logf("ERROR: reading from diskqueue(%s) at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err) d.handleReadError() continue } } r = d.readChan } else { r = nil } select { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read // 在注释中作者写了这是一个Golang的特性 // 如果r不为空,则会将dataRead送入go channel。进入d.readChan的消息通过ReadChan函数向外暴露,最终被Topic/Channel的消息循环读取。 // 而如果r为空,则这个分支会被跳过。这个特性的使用统一了select的逻辑,简化了当数据为空时的判断。 case r <- dataRead: // 消息投递 count++ // moveForward sets needSync flag if a file is removed // 消息投递成功后的操作 d.moveForward() case <-d.emptyChan: // 执行清空操作时,文件全被删除,count计数器重置为0 d.emptyResponseChan <- d.deleteAllFiles() count = 0 case dataWrite := <-d.writeChan: // 消息写入则count计数器自增 count++ d.writeResponseChan <- d.writeOne(dataWrite) case <-syncTicker.C: // 每隔syncTimeout时间同步一次 if count == 0 { // avoid sync when there's no activity continue } d.needSync = true case <-d.exitChan: // 退出ioLook goto exit } } exit: d.logf("DISKQUEUE(%s): closing ... ioLoop", d.name) syncTicker.Stop() d.exitSyncChan <- 1 }在ioLoop中调用d.writeOne
case dataWrite := <-d.writeChan: // 消息写入则count计数器自增 count++ d.writeResponseChan <- d.writeOne(dataWrite)writeOne:将消息写入磁盘
func (d *diskQueue) writeOne(data []byte) error { var err error // 文件不存在则创建并设置新文件的偏移位置为0 if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err } d.logf("DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName) if d.writePos > 0 { _, err = d.writeFile.Seek(d.writePos, 0) if err != nil { d.writeFile.Close() d.writeFile = nil return err } } } // 获取消息的长度 dataLen := int32(len(data)) // 检查消息的最小最大长度 if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize) } // 清空缓冲区 d.writeBuf.Reset() // 使用binary.Read编码 err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { return err } // 写入消息到缓冲区 _, err = d.writeBuf.Write(data) if err != nil { return err } // only write to the file once // 将缓冲区数据写入文件 _, err = d.writeFile.Write(d.writeBuf.Bytes()) if err != nil { d.writeFile.Close() d.writeFile = nil return err } // 将d.writePos写入位置偏移4+dataLen长度作为下次写入位置,加4是因为消息长度本身也占4字节 totalBytes := int64(4 + dataLen) d.writePos += totalBytes atomic.AddInt64(&d.depth, 1) // 如果当前写入位置大于每个文件的大小则下次写入时更换新文件,防止单个数据文件过大 if d.writePos > d.maxBytesPerFile { // 重置文件编号和偏移位置 d.writeFileNum++ d.writePos = 0 // sync every time we start writing to a new file // 将内存中数据同步到磁盘并更新元数据文件 err = d.sync() if err != nil { d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err) } if d.writeFile != nil { d.writeFile.Close() d.writeFile = nil } } return err }sync:将内存中数据同步到磁盘并更新元数据文件
func (d *diskQueue) sync() error { if d.writeFile != nil { err := d.writeFile.Sync() if err != nil { d.writeFile.Close() d.writeFile = nil return err } } err := d.persistMetaData() if err != nil { return err } d.needSync = false return nil }persistMetaData:将队列状态信息保存到元数据文件
// 逻辑跟retrieveMetaData类似 func (d *diskQueue) persistMetaData() error { var f *os.File var err error fileName := d.metaDataFileName() tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int()) // write to tmp file f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err } _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", atomic.LoadInt64(&d.depth), d.readFileNum, d.readPos, d.writeFileNum, d.writePos) if err != nil { f.Close() return err } f.Sync() f.Close() // atomically rename return os.Rename(tmpFileName, fileName) }在ioLoop中写入消息到readChan管道
case r <- dataRead: // 消息投递 count++ // moveForward sets needSync flag if a file is removed // 消息投递成功后的操作 d.moveForward()消息投递成功后的操作
// moveForward:当消息投递成功后, // 将保存在d.nextReadPos和d.nextReadFileNum中的值赋值给d.readPos和d.readFileNum func (d *diskQueue) moveForward() { oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos depth := atomic.AddInt64(&d.depth, -1) // see if we need to clean up the old file // 删除已经读完的旧文件 if oldReadFileNum != d.nextReadFileNum { // sync every time we start reading from a new file d.needSync = true fn := d.fileName(oldReadFileNum) err := os.Remove(fn) if err != nil { d.logf("ERROR: failed to Remove(%s) - %s", fn, err) } } // 检查文件是否有错 d.checkTailCorruption(depth) } // 检查文件是否有错 // 如果有错误,则调用skipToNextRWFile重置读取和写入的文件编号和位置。 func (d *diskQueue) checkTailCorruption(depth int64) { if d.readFileNum < d.writeFileNum || d.readPos < d.writePos { return } // we've reached the end of the diskqueue // if depth isn't 0 something went wrong if depth != 0 { if depth < 0 { d.logf( "ERROR: diskqueue(%s) negative depth at tail (%d), metadata corruption, resetting 0...", d.name, depth) } else if depth > 0 { d.logf( "ERROR: diskqueue(%s) positive depth at tail (%d), data loss, resetting 0...", d.name, depth) } // force set depth 0 atomic.StoreInt64(&d.depth, 0) d.needSync = true } if d.readFileNum != d.writeFileNum || d.readPos != d.writePos { if d.readFileNum > d.writeFileNum { d.logf( "ERROR: diskqueue(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...", d.name, d.readFileNum, d.writeFileNum) } if d.readPos > d.writePos { d.logf( "ERROR: diskqueue(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...", d.name, d.readPos, d.writePos) } d.skipToNextRWFile() d.needSync = true } }skipToNextRWFile:重置读取和写入的文件编号和位置
func (d *diskQueue) skipToNextRWFile() error { var err error // 关闭读文件句柄 if d.readFile != nil { d.readFile.Close() d.readFile = nil } // 关闭读文件句柄 if d.writeFile != nil { d.writeFile.Close() d.writeFile = nil } for i := d.readFileNum; i <= d.writeFileNum; i++ { // 获取数据文件名称 fn := d.fileName(i) innerErr := os.Remove(fn) if innerErr != nil && !os.IsNotExist(innerErr) { d.logf("ERROR: diskqueue(%s) failed to remove data file - %s", d.name, innerErr) err = innerErr } } // 重新初始化数据文件的信息 d.writeFileNum++ d.writePos = 0 d.readFileNum = d.writeFileNum d.readPos = 0 d.nextReadFileNum = d.writeFileNum d.nextReadPos = 0 atomic.StoreInt64(&d.depth, 0) return err }原子操作,返回队列中消息的数量
func (d *diskQueue) Depth() int64 { return atomic.LoadInt64(&d.depth) }在ioLoop中执行清空操作
case <-d.emptyChan: // 执行清空操作时,文件全被删除,count计数器重置为0 d.emptyResponseChan <- d.deleteAllFile count = 0移除所有数据文件
// 当触发Empty操作时,移除所有元数据文件 func (d *diskQueue) deleteAllFiles() error { err := d.skipToNextRWFile() innerErr := os.Remove(d.metaDataFileName()) if innerErr != nil && !os.IsNotExist(innerErr) { d.logf("ERROR: diskqueue(%s) failed to remove metadata file - %s", d.name, innerErr) return innerErr } return err }