nsq源码分析(3):nsqd之数据持久化

xiaoxiao2021-02-28  85

nsq源码分析(3):nsqd之数据持久化

nsqd持久化使用 go-diskqueue 包实现

diskqueue包实现: 当nsqd进程退出时,将内存中的数据写入到磁盘 当nsqd进程启动时,将磁盘中的数据读入到内存 在nsqd运行过程中,定时(默认2秒)将内存中的数据写入到磁盘

go get github.com/nsqio/go-diskqueue

元数据和数据文件

nsqd.867.dat -> nsqd.dat nsqd.dat top1.diskqueue.000000.dat # top1队列(topic)下的数据文件 top1.diskqueue.meta.dat # top1队列(topic)下的元数据文件

diskqueue对外接口

go-diskqueue/diskqueue.go

type Interface interface { Put([]byte) error // 消息生产接口 ReadChan() chan []byte // 消息消费接口 // this is expected to be an *unbuffered* channel Close() error // 队列关闭接口 Delete() error // 消息删除接口 Depth() int64 // 消息长度接口 Empty() error // 清空队列接口 }

diskqueue结构体

// diskQueue实现了先入先出规则的队列服务 type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) // 数据文件相关信息 readPos int64 // 记录数据文件读的位置 writePos int64 // 记录数据文件写的位置 readFileNum int64 // 当前读文件的编号 writeFileNum int64 // 当前写文件的编号 depth int64 // 队列中消息的数量 sync.RWMutex // instantiation time metadata // 元数据相关 name string // 元数据名称 dataPath string // 元数据的数据目录 maxBytesPerFile int64 // 每个文件的最大字节数,默认100M // currently this cannot change once created minMsgSize int32 // 一条消息的最小长度 maxMsgSize int32 // 一条消息的最大长度 syncEvery int64 // 当写入的消息达到syncEvery时则执行sync操作 // number of writes per fsync syncTimeout time.Duration // 每隔syncTimeout时间执行同步一次 // duration of time per fsync exitFlag int32 // 队列退出标识。比如当删除队列时会将该队列标记为1,阻止其他线程操作该队列 needSync bool // 是否需要同步 // keeps track of the position where we have read // (but not yet sent over readChan) // 读操作是为了投递消息给客户端,如果投递失败则继续使用当前的读取位置再次尝试投递消息 nextReadPos int64 // 记录正在投递的消息的位置 nextReadFileNum int64 // 记录正在投递的消息的文件编号 readFile *os.File // 读文件句柄 writeFile *os.File // 写文件句柄 reader *bufio.Reader // 读文件操作的缓存区 writeBuf bytes.Buffer // 写文件操作的缓存区 // exposed via ReadChan() readChan chan []byte // 获取消息的channel // internal channels writeChan chan []byte // 写入消息的channel writeResponseChan chan error // 返回写入消息的状态 emptyChan chan int // 清空消息的channel emptyResponseChan chan error // 返回清空队列的状态 exitChan chan int // 队列退出的channel exitSyncChan chan int // 队列退出的同步channel,确保ioLoop先退出 logger Logger }

diskqueue实例化过程

func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logger Logger) Interface { d := diskQueue{ name: name, dataPath: dataPath, maxBytesPerFile: maxBytesPerFile, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, readChan: make(chan []byte), writeChan: make(chan []byte), writeResponseChan: make(chan error), emptyChan: make(chan int), emptyResponseChan: make(chan error), exitChan: make(chan int), exitSyncChan: make(chan int), syncEvery: syncEvery, syncTimeout: syncTimeout, logger: logger, } // no need to lock here, nothing else could possibly be touching this instance // 加载元数据信息(这里不需要添加锁,因为只调用一次) err := d.retrieveMetaData() if err != nil && !os.IsNotExist(err) { d.logf("ERROR: diskqueue(%s) failed to retrieveMetaData - %s", d.name, err) } // 队列的定时读写操作 go d.ioLoop() return &d }

如上实例化操作过程中,看到有两步操作,调用d.retrieveMetaData()和d.ioLoop()

加载元数据信息

// retrieveMetaData:从元数据文件中恢复队列的状态。如果元数据文件不存在返回err,如果存在则加载元数据文件中的内容 // top1.diskqueue.meta.dat 元数据文件内容格式只有三行("%d\n%d,%d\n%d,%d\n") // 2 # 队列中消息的数量 // 0,0 # 读文件的编号,偏移位置 // 0,76 # 写文件的编号,偏移位置 func (d *diskQueue) retrieveMetaData() error { var f *os.File var err error fileName := d.metaDataFileName() f, err = os.OpenFile(fileName, os.O_RDONLY, 0600) if err != nil { return err } defer f.Close() var depth int64 _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &depth, &d.readFileNum, &d.readPos, &d.writeFileNum, &d.writePos) if err != nil { return err } atomic.StoreInt64(&d.depth, depth) d.nextReadFileNum = d.readFileNum d.nextReadPos = d.readPos return nil }

队列的定时读写操作

在ioLoop中使用select IO多路复用,频繁读写操作避免了使用锁的频繁调用

// ioLoop:队列的定时读写操作 // 定时调用sync函数将内存中的消息刷新到磁盘 func (d *diskQueue) ioLoop() { var dataRead []byte var err error var count int64 // 计数器变量 var r chan []byte syncTicker := time.NewTicker(d.syncTimeout) for { // dont sync all the time :) // count计数器打到d.syncEvery的数量时,设置d.needSync为true则执行同步操作 if count == d.syncEvery { d.needSync = true } // needSync变量控制是否需要同步,同步完成后该变量置为false if d.needSync { err = d.sync() if err != nil { d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err) } count = 0 } // 检测当前是否有数据需要被读取 // 条件成立:执行d.readOne()并将结果放入dataRead中,然后设置r为d.readChan // 条件不成立:将r设置为nil if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { if d.nextReadPos == d.readPos { dataRead, err = d.readOne() if err != nil { d.logf("ERROR: reading from diskqueue(%s) at %d of %s - %s", d.name, d.readPos, d.fileName(d.readFileNum), err) d.handleReadError() continue } } r = d.readChan } else { r = nil } select { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read // 在注释中作者写了这是一个Golang的特性 // 如果r不为空,则会将dataRead送入go channel。进入d.readChan的消息通过ReadChan函数向外暴露,最终被Topic/Channel的消息循环读取。 // 而如果r为空,则这个分支会被跳过。这个特性的使用统一了select的逻辑,简化了当数据为空时的判断。 case r <- dataRead: // 消息投递 count++ // moveForward sets needSync flag if a file is removed // 消息投递成功后的操作 d.moveForward() case <-d.emptyChan: // 执行清空操作时,文件全被删除,count计数器重置为0 d.emptyResponseChan <- d.deleteAllFiles() count = 0 case dataWrite := <-d.writeChan: // 消息写入则count计数器自增 count++ d.writeResponseChan <- d.writeOne(dataWrite) case <-syncTicker.C: // 每隔syncTimeout时间同步一次 if count == 0 { // avoid sync when there's no activity continue } d.needSync = true case <-d.exitChan: // 退出ioLook goto exit } } exit: d.logf("DISKQUEUE(%s): closing ... ioLoop", d.name) syncTicker.Stop() d.exitSyncChan <- 1 }

Put接口实现

// Put:写入消息并返回写入的结果 func (d *diskQueue) Put(data []byte) error { d.RLock() defer d.RUnlock() if d.exitFlag == 1 { return errors.New("exiting") } d.writeChan <- data return <-d.writeResponseChan }

在ioLoop中调用d.writeOne

case dataWrite := <-d.writeChan: // 消息写入则count计数器自增 count++ d.writeResponseChan <- d.writeOne(dataWrite)

writeOne:将消息写入磁盘

func (d *diskQueue) writeOne(data []byte) error { var err error // 文件不存在则创建并设置新文件的偏移位置为0 if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err } d.logf("DISKQUEUE(%s): writeOne() opened %s", d.name, curFileName) if d.writePos > 0 { _, err = d.writeFile.Seek(d.writePos, 0) if err != nil { d.writeFile.Close() d.writeFile = nil return err } } } // 获取消息的长度 dataLen := int32(len(data)) // 检查消息的最小最大长度 if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { return fmt.Errorf("invalid message write size (%d) maxMsgSize=%d", dataLen, d.maxMsgSize) } // 清空缓冲区 d.writeBuf.Reset() // 使用binary.Read编码 err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { return err } // 写入消息到缓冲区 _, err = d.writeBuf.Write(data) if err != nil { return err } // only write to the file once // 将缓冲区数据写入文件 _, err = d.writeFile.Write(d.writeBuf.Bytes()) if err != nil { d.writeFile.Close() d.writeFile = nil return err } // 将d.writePos写入位置偏移4+dataLen长度作为下次写入位置,加4是因为消息长度本身也占4字节 totalBytes := int64(4 + dataLen) d.writePos += totalBytes atomic.AddInt64(&d.depth, 1) // 如果当前写入位置大于每个文件的大小则下次写入时更换新文件,防止单个数据文件过大 if d.writePos > d.maxBytesPerFile { // 重置文件编号和偏移位置 d.writeFileNum++ d.writePos = 0 // sync every time we start writing to a new file // 将内存中数据同步到磁盘并更新元数据文件 err = d.sync() if err != nil { d.logf("ERROR: diskqueue(%s) failed to sync - %s", d.name, err) } if d.writeFile != nil { d.writeFile.Close() d.writeFile = nil } } return err }

sync:将内存中数据同步到磁盘并更新元数据文件

func (d *diskQueue) sync() error { if d.writeFile != nil { err := d.writeFile.Sync() if err != nil { d.writeFile.Close() d.writeFile = nil return err } } err := d.persistMetaData() if err != nil { return err } d.needSync = false return nil }

persistMetaData:将队列状态信息保存到元数据文件

// 逻辑跟retrieveMetaData类似 func (d *diskQueue) persistMetaData() error { var f *os.File var err error fileName := d.metaDataFileName() tmpFileName := fmt.Sprintf("%s.%d.tmp", fileName, rand.Int()) // write to tmp file f, err = os.OpenFile(tmpFileName, os.O_RDWR|os.O_CREATE, 0600) if err != nil { return err } _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", atomic.LoadInt64(&d.depth), d.readFileNum, d.readPos, d.writeFileNum, d.writePos) if err != nil { f.Close() return err } f.Sync() f.Close() // atomically rename return os.Rename(tmpFileName, fileName) }

ReadChan接口实现

直接给调用方返回readChan管道 // ReadChan:读取消息 func (d *diskQueue) ReadChan() chan []byte { return d.readChan }

在ioLoop中写入消息到readChan管道

case r <- dataRead: // 消息投递 count++ // moveForward sets needSync flag if a file is removed // 消息投递成功后的操作 d.moveForward()

消息投递成功后的操作

// moveForward:当消息投递成功后, // 将保存在d.nextReadPos和d.nextReadFileNum中的值赋值给d.readPos和d.readFileNum func (d *diskQueue) moveForward() { oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos depth := atomic.AddInt64(&d.depth, -1) // see if we need to clean up the old file // 删除已经读完的旧文件 if oldReadFileNum != d.nextReadFileNum { // sync every time we start reading from a new file d.needSync = true fn := d.fileName(oldReadFileNum) err := os.Remove(fn) if err != nil { d.logf("ERROR: failed to Remove(%s) - %s", fn, err) } } // 检查文件是否有错 d.checkTailCorruption(depth) } // 检查文件是否有错 // 如果有错误,则调用skipToNextRWFile重置读取和写入的文件编号和位置。 func (d *diskQueue) checkTailCorruption(depth int64) { if d.readFileNum < d.writeFileNum || d.readPos < d.writePos { return } // we've reached the end of the diskqueue // if depth isn't 0 something went wrong if depth != 0 { if depth < 0 { d.logf( "ERROR: diskqueue(%s) negative depth at tail (%d), metadata corruption, resetting 0...", d.name, depth) } else if depth > 0 { d.logf( "ERROR: diskqueue(%s) positive depth at tail (%d), data loss, resetting 0...", d.name, depth) } // force set depth 0 atomic.StoreInt64(&d.depth, 0) d.needSync = true } if d.readFileNum != d.writeFileNum || d.readPos != d.writePos { if d.readFileNum > d.writeFileNum { d.logf( "ERROR: diskqueue(%s) readFileNum > writeFileNum (%d > %d), corruption, skipping to next writeFileNum and resetting 0...", d.name, d.readFileNum, d.writeFileNum) } if d.readPos > d.writePos { d.logf( "ERROR: diskqueue(%s) readPos > writePos (%d > %d), corruption, skipping to next writeFileNum and resetting 0...", d.name, d.readPos, d.writePos) } d.skipToNextRWFile() d.needSync = true } }

skipToNextRWFile:重置读取和写入的文件编号和位置

func (d *diskQueue) skipToNextRWFile() error { var err error // 关闭读文件句柄 if d.readFile != nil { d.readFile.Close() d.readFile = nil } // 关闭读文件句柄 if d.writeFile != nil { d.writeFile.Close() d.writeFile = nil } for i := d.readFileNum; i <= d.writeFileNum; i++ { // 获取数据文件名称 fn := d.fileName(i) innerErr := os.Remove(fn) if innerErr != nil && !os.IsNotExist(innerErr) { d.logf("ERROR: diskqueue(%s) failed to remove data file - %s", d.name, innerErr) err = innerErr } } // 重新初始化数据文件的信息 d.writeFileNum++ d.writePos = 0 d.readFileNum = d.writeFileNum d.readPos = 0 d.nextReadFileNum = d.writeFileNum d.nextReadPos = 0 atomic.StoreInt64(&d.depth, 0) return err }

Close和Delete接口实现

// Close:关闭队列 // 关闭队列并持久化数据到磁盘 func (d *diskQueue) Close() error { err := d.exit(false) if err != nil { return err } return d.sync() } // Delete:删除队列 // 删除队列并删除该队列中的数据 func (d *diskQueue) Delete() error { return d.exit(true) } // exit:退出操作,标识exitFlag为1 // deleted为true:删除该队列中的数据 // deleted为false:持久化数据到磁盘 func (d *diskQueue) exit(deleted bool) error { d.Lock() defer d.Unlock() d.exitFlag = 1 if deleted { d.logf("DISKQUEUE(%s): deleting", d.name) } else { d.logf("DISKQUEUE(%s): closing", d.name) } // 退出队列并等待ioLoop先退出 close(d.exitChan) // ensure that ioLoop has exited <-d.exitSyncChan // 关闭读文件句柄 if d.readFile != nil { d.readFile.Close() d.readFile = nil } // 关闭写文件句柄 if d.writeFile != nil { d.writeFile.Close() d.writeFile = nil } return nil }

Depth接口实现

原子操作,返回队列中消息的数量

func (d *diskQueue) Depth() int64 { return atomic.LoadInt64(&d.depth) }

Empty接口实现

// Empty:清空队列中的数据 func (d *diskQueue) Empty() error { d.RLock() defer d.RUnlock() if d.exitFlag == 1 { return errors.New("exiting") } d.logf("DISKQUEUE(%s): emptying", d.name) d.emptyChan <- 1 return <-d.emptyResponseChan }

在ioLoop中执行清空操作

case <-d.emptyChan: // 执行清空操作时,文件全被删除,count计数器重置为0 d.emptyResponseChan <- d.deleteAllFile count = 0

移除所有数据文件

// 当触发Empty操作时,移除所有元数据文件 func (d *diskQueue) deleteAllFiles() error { err := d.skipToNextRWFile() innerErr := os.Remove(d.metaDataFileName()) if innerErr != nil && !os.IsNotExist(innerErr) { d.logf("ERROR: diskqueue(%s) failed to remove metadata file - %s", d.name, innerErr) return innerErr } return err }
转载请注明原文地址: https://www.6miu.com/read-85118.html

最新回复(0)