ypipe是一个无锁队列实现。链接两个线程,一个用于读取,一个用于写入。
ypipe_t中包含一个yqueue_t变量,用于保存数据。包含3个指针用于实现无锁队列: w:指向第一个未被冲刷的元素 r:指向第一个未被预获取的元素 f:指向第一个将将可以被冲刷的元素(数据未被压入)
绿色段为已经预读取的数据,读取线程可以获取该数据段数据。
inline bool read (T *value_) { //预获取数据 if (!check_read ()) return false; //读取一个数据 *value_ = queue.front (); queue.pop (); return true; }紫色段是已经写入管道的数据,读取线程可以预获取该段数据。预获取后,r指针将指向w处。
inline bool check_read () { //存在预获取数据 if (&queue.front () != r && r) return true; // 没有预获取数据,则获取更多数据,移动指针r指针到w处。如果没有数据可以预获取,则将c设置为NULL。 r = c.cas (&queue.front (), NULL); //没有可以读取数据,返回false,读取线程sleeping if (&queue.front () == r || !r) return false; return true; }蓝色段是已经写入管道数据,但是数据没有被冲刷进入管道,无法被读取线程读取。数据冲刷后,w指针将只想f处。
inline bool flush () { //没有可以冲刷数据 if (w == f) return true; if (c.cas (w, f) != w) { //当c为NULL是进入此处逻辑. c.set (f); w = f; return false; } w = f; return true; }红色段为写入的不完整数据,调用者可以回滚这段写入的数据,flush函数不会冲刷该段数据。
inline void write (const T &value_, bool incomplete_) { queue.back () = value_; queue.push (); //数据不完整,则不移动f指针。 if (!incomplete_) f = &queue.back (); } //回滚写入操作,可以回滚写入的不完整数据 inline bool unwrite (T *value_) { if (f == &queue.back ()) return false; queue.unpush (); *value_ = queue.back (); return true; }