Analysis of Octopus, a Distributed Persistent-Memory File System (ATC '17), Part 2

xiaoxiao, 2021-02-28

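Tsinghua Research Project: Octopus Source Code Analysis (Part 2)
  Preface
  Paper Abstract
  Design Framework
  Source Code Analysis
    include header files: Configuration.hpp, common.hpp, bitmap.hpp, debug.hpp, lock.h, table.hpp, global.h, mempool.hpp, hashtable.hpp, storage.hpp, filesystem.hpp, TxManager.hpp, RdmaSocket.hpp, RPCClient.hpp, RPCServer.hpp, nrfs.h, libnrfs.h
    src source files
    File dependency analysis
    Function dependency analysis
    Typical workflow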

Tsinghua Research Project: Octopus Source Code Analysis (Part 2)

Preface

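For our project work, our team read through Octopus, a fairly novel piece of file system research from Tsinghua University. Octopus is a distributed in-memory file system built on persistent memory (NVM) and remote direct memory access (RDMA). Youyou Lu of Tsinghua has since released the code as open source; it is linked from the original post.

The work was published at ATC '17 (a CCF class-A venue); the paper is also linked from the original post.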

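By studying this excellent work from our peers at Tsinghua, we hope to advance our own research on distributed persistent-memory file systems. I have already presented the paper to teammates in slides, but a detailed blog write-up of the paper itself may come later. This series focuses on the design of the Octopus source code (occasionally tying in the paper, which interested readers may want to read first) and summarizes Octopus's framework, structure, design components, and the novel aspects of its code.

The series has three parts. The first, Paper Abstract, is a brief introduction to the Octopus system. The second, Design Framework, gives a high-level view of Octopus's components, what each one does, and how they relate to one another. The third, Source Code Analysis, is the core of this blog. There we first cover the headers (in the include folder) to understand Octopus's storage layout, table structures, main data structures, memory pool partitioning, and so on. We then turn to the implementation code (in the src folder), matching function names against the headers to see how each method is implemented, what the benefit is, and what trade-offs were made. Finally, we use the dependencies between source files and between functions to examine Octopus's innovations and limitations, leaving room for further discussion.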

Paper Abstract

(See the previous post in this series.)

Design Framework

(See the previous post in this series.)

Source Code Analysis

include header files

Configuration.hpp

This is a configuration header; it mainly covers the file system's configuration information. The code with my own annotations follows:

#ifndef CONFIGURATION_HEADER
#define CONFIGURATION_HEADER
#include <iostream>
/* For the properties and features of ptree, see
   http://www.boost.org/doc/libs/1_65_1/doc/html/property_tree.html */
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/xml_parser.hpp>
#include <boost/typeof/typeof.hpp>
#include <unordered_map>
#include <string>
using namespace std;
using namespace boost::property_tree;

class Configuration {
private:
    ptree pt;
    /* Two-way mapping between IP (identifies a node) and ID (identifies a persistent region of the file system). */
    unordered_map<uint16_t, string> id2ip;
    unordered_map<string, uint16_t> ip2id;
    /* Number of servers. */
    int ServerCount;
public:
    Configuration();
    ~Configuration();
    string getIPbyID(uint16_t id);
    uint16_t getIDbyIP(string ip);
    unordered_map<uint16_t, string> getInstance();
    int getServerCount();
};
#endif
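To make the two-way lookup concrete, here is a minimal sketch of the same idea without Boost. `NodeDirectory` is a hypothetical stand-in, not the Octopus class: it is filled from an in-memory list rather than an XML file, and its "not found" return values (empty string, ID 0) are assumptions of this sketch.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for Configuration: built from an in-memory list
// instead of an XML file parsed with boost::property_tree.
class NodeDirectory {
public:
    explicit NodeDirectory(const std::vector<std::pair<uint16_t, std::string>>& nodes) {
        for (const auto& node : nodes) {
            id2ip_[node.first] = node.second;   // ID -> IP
            ip2id_[node.second] = node.first;   // IP -> ID
        }
        // Both maps have the same size only if IDs and IPs are unique.
        assert(id2ip_.size() == ip2id_.size());
    }

    // Returns an empty string for an unknown ID (assumption of this sketch).
    std::string getIPbyID(uint16_t id) const {
        auto it = id2ip_.find(id);
        return it == id2ip_.end() ? std::string() : it->second;
    }

    // Returns 0 for an unknown IP (assumes real IDs start at 1).
    uint16_t getIDbyIP(const std::string& ip) const {
        auto it = ip2id_.find(ip);
        return it == ip2id_.end() ? static_cast<uint16_t>(0) : it->second;
    }

    int getServerCount() const { return static_cast<int>(id2ip_.size()); }

private:
    std::unordered_map<uint16_t, std::string> id2ip_;
    std::unordered_map<std::string, uint16_t> ip2id_;
};
```

Keeping both maps costs a little memory but makes lookups in either direction constant time on average, which is presumably why the header stores `id2ip` and `ip2id` side by side.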

common.hpp

This header mainly defines the basic metadata structures. The code with my own annotations follows:

#ifndef COMMON_HEADER
#define COMMON_HEADER
#include <stdint.h>
#include <time.h>

/* Data type alias: nrfs->int, nrfsFile->char* */
typedef int nrfs;
typedef char* nrfsFile;

#define MAX_MESSAGE_BLOCK_COUNT 10 /* Max count of block index in a message. */

/* File position tuple:
 * 1. Node ID (which server the data is on)
 * 2. Offset (determines the start address)
 * 3. Size (determines the end address)
 */
typedef struct {
    uint16_t node_id;
    uint64_t offset;
    uint64_t size;
} file_pos_tuple;

/* File position information (batched):
 * 1. File length
 * 2. Array of file position tuples
 */
struct file_pos_info {
    uint32_t len;
    file_pos_tuple tuple[MAX_MESSAGE_BLOCK_COUNT];
};

/* File attributes:
 * 1. Mode (file/directory)
 * 2. Size
 * 3. Time (access/creation)
 */
struct nrfsfileattr {
    uint32_t mode; /* 0 - file, 1 - directory */
    uint64_t size;
    uint32_t time;
};

/** Definitions. **/
#define MAX_PATH_LENGTH 255 /* Max length of path. */

/** Definitions. **/
#define MAX_FILE_EXTENT_COUNT 20 /* Max extent count in meta of a file. */
#define BLOCK_SIZE (1 * 1024 * 1024) /* Current block size in bytes. */
#define MAX_FILE_NAME_LENGTH 50 /* Max file name length. */
#define MAX_DIRECTORY_COUNT 60 /* Max directory count. */

/** Classes and structures. **/
typedef uint64_t NodeHash; /* Node hash. */

/* File meta tuple:
 * 1. Node hash
 * 2. Index of the start block
 * 3. Number of blocks occupied
 */
typedef struct {
    NodeHash hashNode; /* Node hash array of extent. */
    uint32_t indexExtentStartBlock; /* Index array of start block in an extent. */
    uint32_t countExtentBlock; /* Count array of blocks in an extent. */
} FileMetaTuple;

/* File meta:
 * 1. Last modified time
 * 2. Number of extents
 * 3. Size of extents
 * 4. Array of file meta tuples
 */
typedef struct /* File meta structure. */
{
    time_t timeLastModified; /* Last modified time. */
    uint64_t count; /* Count of extents. (not required and might have consistency problem with size) */
    uint64_t size; /* Size of extents. */
    FileMetaTuple tuple[MAX_FILE_EXTENT_COUNT];
} FileMeta;

/* Directory meta tuple:
 * 1. Entry name
 * 2. Whether the entry is a directory (the plural field name is puzzling)
 */
typedef struct {
    char names[MAX_FILE_NAME_LENGTH];
    bool isDirectories;
} DirectoryMetaTuple;

/* Directory meta:
 * 1. Number of names
 * 2. Array of directory meta tuples
 */
typedef struct /* Directory meta structure. */
{
    uint64_t count; /* Count of names. */
    DirectoryMetaTuple tuple[MAX_DIRECTORY_COUNT];
} DirectoryMeta;

typedef DirectoryMeta nrfsfilelist;

/* Note: temp is divided by 1000 below, so diff is effectively compared in
   microseconds even though the function name says nanoseconds. */
static inline void NanosecondSleep(struct timespec *preTime, uint64_t diff) {
    struct timespec now;
    uint64_t temp;
    temp = 0;
    while (temp < diff) {
        clock_gettime(CLOCK_MONOTONIC, &now);
        temp = (now.tv_sec - preTime->tv_sec) * 1000000000 + now.tv_nsec - preTime->tv_nsec;
        temp = temp / 1000;
    }
}
#endif
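As a rough illustration of how these structures could fit together, the sketch below turns one extent (`FileMetaTuple`) into a byte range (`file_pos_tuple`) and appends it to a `file_pos_info`. The helpers `extentToPosition` and `appendPosition` are hypothetical and do not appear in Octopus. The sketch also assumes that block indexes are multiplied by `BLOCK_SIZE` to obtain byte offsets, that `len` counts the tuples in use, and that the caller already knows the node ID (how it is derived from `hashNode` is not shown in this header).

```cpp
#include <cassert>
#include <cstdint>

// Local copies of the relevant definitions from common.hpp.
#define MAX_MESSAGE_BLOCK_COUNT 10
#define BLOCK_SIZE (1 * 1024 * 1024)
typedef uint64_t NodeHash;
typedef struct { uint16_t node_id; uint64_t offset; uint64_t size; } file_pos_tuple;
struct file_pos_info { uint32_t len; file_pos_tuple tuple[MAX_MESSAGE_BLOCK_COUNT]; };
typedef struct {
    NodeHash hashNode;
    uint32_t indexExtentStartBlock;
    uint32_t countExtentBlock;
} FileMetaTuple;

// Hypothetical helper: convert an extent (in blocks) to a byte range.
// The casts widen to 64 bits before multiplying so large block indexes
// do not overflow 32-bit arithmetic.
inline file_pos_tuple extentToPosition(const FileMetaTuple& extent, uint16_t nodeId) {
    file_pos_tuple position;
    position.node_id = nodeId;
    position.offset = static_cast<uint64_t>(extent.indexExtentStartBlock) * BLOCK_SIZE;
    position.size = static_cast<uint64_t>(extent.countExtentBlock) * BLOCK_SIZE;
    return position;
}

// Hypothetical helper: append a tuple, treating len as the number of
// tuples in use (an assumption). Returns false when the array is full.
inline bool appendPosition(file_pos_info& info, const file_pos_tuple& position) {
    if (info.len >= MAX_MESSAGE_BLOCK_COUNT) {
        return false;
    }
    info.tuple[info.len] = position;
    info.len++;
    return true;
}
```

The fixed-size array means a single message can describe at most `MAX_MESSAGE_BLOCK_COUNT` ranges, so a larger request would have to be split across messages under these assumptions.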

bitmap.hpp

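This header defines the bitmap class. The bitmap records the space occupied by a file's inode (the persistent-memory blocks taken up by metadata or data). The code with my own annotations follows: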

/*** Bitmap header. ***/
/** Version 1 + modification for table class. **/

/** Redundance check. **/
#ifndef BITMAP_HEADER
#define BITMAP_HEADER

/** Included files. **/
#include <stdint.h> /* Standard integers. E.g. uint8_t */
#include <stdlib.h> /* Standard library for memory allocation and exit function. */
#include <stdio.h> /* Standard I/O operations. E.g. fprintf() */
#include <string.h> /* Header for memory operations. E.g. memset() */

/* Class members:
 * 1. Pointer to the byte array;
 * 2. Number of free bits;
 * 3. Total number of bits.
 * Class methods:
 * 1. Get the status (free/occupied) of the bit at a position;
 * 2. Set the bit at a position (free -> occupied);
 * 3. Clear the bit at a position (occupied -> free);
 * 4. Find the position of a free bit;
 * 5. Count the free bits;
 * 6. Count the total bits;
 * Constructor and destructor.
 */
/** Classes. **/
class Bitmap
{
private:
    uint8_t *bytes; /* Byte array to hold bitmap. */
    uint64_t varCountFree; /* Count variable of free bits. */
    uint64_t varCountTotal; /* Count variable of total bits. */

public:
    bool get(uint64_t pos, bool *status); /* Get status of a bit. */
    bool set(uint64_t pos); /* Set a bit. */
    bool clear(uint64_t pos); /* Clear a bit. */
    bool findFree(uint64_t *pos); /* Find first free bit. */
    uint64_t countFree(); /* Count of free bits. */
    uint64_t countTotal(); /* Count of total bits. */
    Bitmap(uint64_t count, char *buffer); /* Constructor of bitmap. Read buffer to initialize current status. */
    ~Bitmap(); /* Destructor of bitmap. Do not free buffer. */
};

/** Redundance check. **/
#endif
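Since the header only declares the interface, the following is a minimal sketch of how such a bitmap could be implemented over a caller-owned buffer. `SimpleBitmap` is a hypothetical class, not the Octopus implementation. It assumes bits are numbered from the most significant bit of each byte, which matches the `(1 << (7 - offset))` test used in table.hpp, and that the bit count is a multiple of 8.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, simplified bitmap over a caller-owned buffer.
// Bit 0 is the most significant bit of byte 0 (assumption).
class SimpleBitmap {
public:
    SimpleBitmap(uint64_t count, uint8_t* buffer)
        : bytes_(buffer), total_(count), free_(0) {
        assert(count % 8 == 0);
        // Read the existing buffer contents to initialise the free count.
        for (uint64_t pos = 0; pos < total_; pos++) {
            if (!test(pos)) free_++;
        }
    }

    bool get(uint64_t pos, bool* status) const {
        if (pos >= total_ || status == nullptr) return false;
        *status = test(pos);
        return true;
    }

    bool set(uint64_t pos) {      // free -> occupied
        if (pos >= total_) return false;
        if (!test(pos)) {
            bytes_[pos / 8] |= mask(pos);
            free_--;
        }
        return true;
    }

    bool clear(uint64_t pos) {    // occupied -> free
        if (pos >= total_) return false;
        if (test(pos)) {
            bytes_[pos / 8] &= static_cast<uint8_t>(~mask(pos));
            free_++;
        }
        return true;
    }

    bool findFree(uint64_t* pos) const {   // linear scan for the first free bit
        if (pos == nullptr) return false;
        for (uint64_t i = 0; i < total_; i++) {
            if (!test(i)) { *pos = i; return true; }
        }
        return false;
    }

    uint64_t countFree() const { return free_; }
    uint64_t countTotal() const { return total_; }

private:
    static uint8_t mask(uint64_t pos) {
        return static_cast<uint8_t>(1u << (7 - pos % 8));
    }
    bool test(uint64_t pos) const { return (bytes_[pos / 8] & mask(pos)) != 0; }

    uint8_t* bytes_;   // not owned, never freed here
    uint64_t total_;
    uint64_t free_;
};
```

Because the bitmap lives in a buffer the caller supplies, the same bytes can sit in persistent memory and be re-read after a restart, which is consistent with the "Read buffer to initialize current status" comment on the real constructor.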

debug.hpp

This header mainly contains debugging (logic-testing) functions. The code follows:

/*** Debug header. ***/
/** Version 1 + Functional Model Modification **/

/** Redundance check. **/
#ifndef DEBUG_HEADER
#define DEBUG_HEADER

/** Included files. **/
#include <stdio.h> /* Standard I/O operations. E.g. vprintf() */
#include <stdarg.h> /* Standard argument operations. E.g. va_list */
#include <sys/time.h> /* Time functions. E.g. gettimeofday() */

/** Definitions. **/
#define MAX_FORMAT_LEN 255
#define DEBUG false
#define TITLE false
#define TIMER false
#define CUR false

/** Classes. **/
class Debug
{
private:
    static long startTime; /* Last start time in milliseconds. */

public:
    static void debugTitle(const char *str); /* Print debug title string. */
    static void debugItem(const char *format, ...); /* Print debug item string. */
    static void debugCur(const char *format, ...); /* Print debug item string. */
    static void notifyInfo(const char *format, ...); /* Print normal notification. */
    static void notifyError(const char *format, ...); /* Print error information. */
    static void startTimer(const char*); /* Start timer and display information. */
    static void endTimer(); /* End timer and display information. */
};

/** Redundance check. **/
#endif

lock.h

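The main purpose of this header is to provide a lock service for file reads and writes, so that conflicting data accesses are avoided. The source follows: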

#ifndef LOCK_HEADER
#define LOCK_HEADER
#include <stdint.h>
#include <unistd.h>

class LockService {
private:
    uint16_t WriteID;
    uint16_t ReadID;
    uint64_t MetaDataBaseAddress;
public:
    LockService(uint64_t MetaDataBaseAddress);
    ~LockService();
    uint64_t WriteLock(uint16_t NodeID, uint64_t Address);
    bool WriteUnlock(uint64_t key, uint16_t NodeID, uint64_t Address);
    uint64_t ReadLock(uint16_t NodeID, uint64_t Address);
    bool ReadUnlock(uint64_t key, uint16_t NodeID, uint64_t Address);
};
#endif
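The header shows that locks are taken on an address and return a 64-bit key that is later needed to unlock, but the implementation is not shown here. As a generic illustration only, the sketch below implements a key-based reader/writer lock on one 64-bit word with compare-and-swap. The lock word layout (writer key in the upper 32 bits, reader count in the lower 32 bits) and all four function names are assumptions of this sketch, not the Octopus scheme.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical lock word layout (NOT taken from Octopus):
//   upper 32 bits: writer key (0 means no writer)
//   lower 32 bits: number of active readers

// Try to take the write lock. Returns the key on success, 0 on failure.
inline uint64_t tryWriteLock(std::atomic<uint64_t>& word, uint16_t nodeID) {
    // nodeID + 1 keeps the key non-zero even for node 0.
    const uint64_t key = (static_cast<uint64_t>(nodeID) + 1) << 32;
    uint64_t expected = 0;   // succeeds only with no readers and no writer
    return word.compare_exchange_strong(expected, key) ? key : 0;
}

// Release the write lock. Fails if the caller does not hold the matching key.
inline bool writeUnlock(std::atomic<uint64_t>& word, uint64_t key) {
    uint64_t expected = key;
    return word.compare_exchange_strong(expected, 0);
}

// Try to take a read lock. Fails while a writer holds the word.
inline bool tryReadLock(std::atomic<uint64_t>& word) {
    uint64_t current = word.load();
    while ((current >> 32) == 0) {   // no writer present
        // On failure, current is reloaded and the writer check repeats.
        if (word.compare_exchange_weak(current, current + 1)) {
            return true;
        }
    }
    return false;
}

// Release one read lock; the caller must hold one.
inline void readUnlock(std::atomic<uint64_t>& word) {
    word.fetch_sub(1);
}
```

Returning a key from the lock call and demanding it back on unlock, as `WriteLock` and `WriteUnlock` do in the header, lets the lock owner be verified without keeping any per-client state next to the lock word.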

table.hpp

This header defines the table template. The code with my own annotations follows:

/*** Table header in file system. ***/
/** Version 1. **/
/* FIXME: Ignore malloc() failure. */

/** Redundance check. **/
#ifndef TABLE_HEADER
#define TABLE_HEADER

/** Included files. **/
#include <stdint.h> /* Standard integers. E.g. uint16_t */
#include <stdlib.h> /* Standard library. E.g. exit() */
#include <string.h> /* String operations. E.g. memcmp() */
#include <stdio.h> /* Standard I/O. */
#include <mutex> /* Mutex operations. */
#include "bitmap.hpp" /* Bitmap class. */
#include "debug.hpp" /* Debug class. */

/** Design. **/
/*
    +------------+---------------------------+
    |   Bitmap   |           Items           |
    +------------+---------------------------+
          - Memory overview of buffer -

    Use bitmap to check if a position is occupied.

    +---+---+---+---+---+---+------+---+---+---+---+
    | 0 | 0 | 0 | 1 | 0 | 1 | .... | 0 | 0 | 0 | 0 |
    +---+---+---+---+---+---+------+---+---+---+---+
                - Structure of bitmap -

    Use free bit chain to allocate a new item. (The free bit chain is generated from bitmap.)

    +--------------+-------------------------+              +--------------+----------+
    |   Position   |  Next free bit pointer -|--> .... ---> |   Position   |   NULL   |
    +--------------+-------------------------+              +--------------+----------+
                                  - Free bit chain -

    index
           +--------------+
       0   |    Item 0    |
           +--------------+
       1   |    Item 1    |
           +--------------+        - item array -
       2   |    Item 2    |
           +--------------+
           |     ....     |
           +--------------+
       n   |    Item n    |
           +--------------+
*/

/* Free bit unit:
 * 1. Position (relative offset);
 * 2. Pointer to the next free bit.
 */
/** Structures. **/
typedef struct { /* Free bit structure. */
    uint64_t position; /* Position of bit. */
    void *nextFreeBit; /* Next free bit. */
} FreeBit;

/* Table template:
 * Data members:
 * 1. Pointer to the bitmap;
 * 2. Mutex guarding bitmap access;
 * 3. Pointer to the file (meta)data items;
 * 4. Head pointer of the free bit chain;
 * 5. Tail pointer of the free bit chain;
 * 6. Amount of buffer used.
 * Member methods:
 * 1. Create an item (allocate space);
 * 2. Get an item by position (pos/index), optionally with its address;
 * 3. Store data at a given position (index/pos);
 * 4. Remove an item (and update the free bit chain).
 */
/** Classes. **/
template<typename T> class Table
{
private:
    Bitmap *bitmapItems; /* Bitmap for items. */
    std::mutex mutexBitmapItems; /* Mutex for bitmap for items. */
    T *items; /* Items of table. */
    FreeBit *headFreeBit; /* Head free bit in the chain. */
    FreeBit *tailFreeBit; /* Tail free bit in the chain. */

public:
    uint64_t sizeBufferUsed; /* Size of used bytes in buffer. */
    bool create(uint64_t *index, T *item); /* Create an item. */
    bool create(uint64_t *index); /* Create an item. No item will be assigned. */
    bool get(uint64_t index, T *item); /* Get an item. */
    bool get(uint64_t index, T *item, uint64_t *address);
    bool put(uint64_t index, T *item); /* Put an item. */
    bool put(uint64_t index, T *item, uint64_t *address);
    bool remove(uint64_t index); /* Remove an item. */
    uint64_t countSavedItems(); /* Saved items count. */
    uint64_t countTotalItems(); /* Total items count. */
    Table(char *buffer, uint64_t count); /* Constructor of table. */
    ~Table(); /* Destructor of table. */
};

/* Create an item in table. (no item will be assigned)
   @param   index   Index of created item in table.
   @return          If creation failed return false. Otherwise return true. */
template<typename T> bool Table<T>::create(uint64_t *index)
{
    if (index == NULL) {
        return false; /* Fail due to null index. */
    } else {
        bool result;
        mutexBitmapItems.lock(); /* Lock table bitmap. */
        {
            if (headFreeBit == NULL) { /* If there is no free bit in bitmap. */
                /* Annotator's note: this early return leaves mutexBitmapItems locked. */
                return false; /* Fail due to out of free bit. */
            } else {
                *index = headFreeBit->position; /* Get index from free bit. */
                FreeBit *currentFreeBit = headFreeBit; /* Get current free bit. */
                headFreeBit = (FreeBit *)(headFreeBit->nextFreeBit); /* Move current free bit out of free bit chain. */
                free(currentFreeBit); /* Release current free bit as used. */
                if (bitmapItems->set(*index) == false) { /* Occupy the position first. Need not to roll back. */
                    result = false; /* Fail due to bitmap set error. No recovery here. */
                } else {
                    result = true; /* Succeed. */
                }
            }
        }
        mutexBitmapItems.unlock(); /* Unlock table bitmap. */
        //printf("Table::create (block): *index = %lu, &(items[*index]) = %lx\n", *index, &(items[*index]));
        return result; /* Return specific result. */
    }
}

/* Create an item in table.
   @param   index   Index of created item in table.
   @param   item    Item to put in table.
   @return          If creation failed return false. Otherwise return true. */
template<typename T> bool Table<T>::create(uint64_t *index, T *item)
{
    if ((index == NULL) || (item == NULL)) {
        return false; /* Fail due to null index or null item. */
    } else {
        bool result;
        mutexBitmapItems.lock(); /* Lock table bitmap. */
        {
            if (headFreeBit == NULL) { /* If there is no free bit in bitmap. */
                /* Annotator's note: this early return leaves mutexBitmapItems locked. */
                return false; /* Fail due to out of free bit. */
            } else {
                *index = headFreeBit->position; /* Get index from free bit. */
                FreeBit *currentFreeBit = headFreeBit; /* Get current free bit. */
                headFreeBit = (FreeBit *)(headFreeBit->nextFreeBit); /* Move current free bit out of free bit chain. */
                free(currentFreeBit); /* Release current free bit as used. */
                if (bitmapItems->set(*index) == false) { /* Occupy the position first. Need not to roll back. */
                    result = false; /* Fail due to bitmap set error. No recovery here. */
                } else {
                    items[*index] = *item; /* Save item in items. */
                    result = true; /* Succeed. */
                }
            }
        }
        mutexBitmapItems.unlock(); /* Unlock table bitmap. */
        return result; /* Return specific result. */
    }
}

/* Get an item from table.
   @param   index   Index of item.
   @param   item    Buffer of item.
   @return          If item does not exist or other errors occur return false. Otherwise return true. */
template<typename T> bool Table<T>::get(uint64_t index, T *item)
{
    //printf("Table::get: index = %lu\n", index);
    if (item == NULL) {
        return false; /* Fail due to null item buffer. */
    } else {
        bool result;
        mutexBitmapItems.lock(); /* Though currently there is no bitmap reading or writing, other operations such as delete might affect item reading. */
        {
            bool status; /* Status of existence of item. */
            if (bitmapItems->get(index, &status) == false) {
                result = false; /* Fail due to item get error. */
            } else {
                if (status == false) {
                    result = false; /* Fail due to no item. */
                } else {
                    *item = items[index]; /* Get item and put it into buffer. */
                    result = true; /* Succeed. */
                }
            }
        }
        mutexBitmapItems.unlock(); /* Unlock table bitmap. */
        return result; /* Return specific result. */
    }
}

template<typename T> bool Table<T>::get(uint64_t index, T *item, uint64_t *address)
{
    //printf("Table::get: index = %lu\n", index);
    if (item == NULL) {
        return false; /* Fail due to null item buffer. */
    } else {
        bool result;
        mutexBitmapItems.lock(); /* Though currently there is no bitmap reading or writing, other operations such as delete might affect item reading. */
        {
            bool status; /* Status of existence of item. */
            if (bitmapItems->get(index, &status) == false) {
                result = false; /* Fail due to item get error. */
            } else {
                if (status == false) {
                    result = false; /* Fail due to no item. */
                } else {
                    *item = items[index]; /* Get item and put it into buffer. */
                    *address = (uint64_t)(&items[index]);
                    Debug::debugItem("get, address = %lx", *address);
                    result = true; /* Succeed. */
                }
            }
        }
        mutexBitmapItems.unlock(); /* Unlock table bitmap. */
        return result; /* Return specific result. */
    }
}

/* Put an item. If item has not been created then return false.
   @param   index   Index of item.
   @param   item    Item to put in.
   @return          If index has not been created or other errors occur return false, otherwise return true. */
template<typename T> bool Table<T>::put(uint64_t index, T *item)
{
    //printf("Table::put: index = %lu\n", index);
    if (item == NULL) {
        return false; /* Fail due to null item. */
    } else {
        bool result;
        mutexBitmapItems.lock(); /* Lock table bitmap. */
        {
            bool status; /* Status of existence of item. */
            if (bitmapItems->get(index, &status) == false) {
                result = false; /* Fail due to item get error. */
            } else {
                if (status == false) {
                    result = false; /* Fail due to no item. */
                } else {
                    items[index] = *item; /* Save item into table. */
                    result = true; /* Succeed. */
                }
            }
        }
        mutexBitmapItems.unlock(); /* Unlock table bitmap. */
        return result; /* Return specific result. */
    }
}

/* Get address of item.
   @param   index   Index of item.
   @param   item    Item to put in.
   @return          If index has not been created or other errors occur return false, otherwise return true. */
template<typename T> bool Table<T>::put(uint64_t index, T *item, uint64_t *address)
{
    bool result;
    mutexBitmapItems.lock(); /* Lock table bitmap. */
    {
        bool status; /* Status of existence of item. */
        if (bitmapItems->get(index, &status) == false) {
            result = false; /* Fail due to item get error. */
        } else {
            if (status == false) {
                result = false; /* Fail due to no item. */
            } else {
                *address = (uint64_t)&items[index]; /* Save item into table. */
                Debug::debugItem("put, address = %lx", *address);
                //items[index] = *item;
                result = true; /* Succeed. */
            }
        }
    }
    mutexBitmapItems.unlock(); /* Unlock table bitmap. */
    return result; /* Return specific result. */
}

/* Remove an item.
   @param   index   Index of item to remove.
   @return          If error occurs return false, otherwise return true. */
template<typename T> bool Table<T>::remove(uint64_t index)
{
    bool result;
    //printf("Table::remove: index = %lu\n", index);
    mutexBitmapItems.lock(); /* Lock table bitmap. */
    {
        bool status; /* Status of existence of item. */
        if (bitmapItems->get(index, &status) == false) {
            result = false; /* Fail due to bitmap get error. */
        } else {
            if (status == false) {
                result = false; /* Fail due to no item. */
            } else {
                if (bitmapItems->clear(index) == false) {
                    result = false; /* Fail due to bitmap clear error. */
                } else {
                    FreeBit *currentFreeBit = (FreeBit *)malloc(sizeof(FreeBit)); /* New free bit. */
                    currentFreeBit->nextFreeBit = headFreeBit; /* Add current free bit to free bit chain. */
                    currentFreeBit->position = index; /* Index of newly released item. */
                    headFreeBit = currentFreeBit; /* Update head free bit. */
                    result = true; /* Succeed. */
                }
            }
        }
    }
    mutexBitmapItems.unlock(); /* Unlock table bitmap. */
    return result; /* Return specific result. */
}

/* Get saved items count.
   @return  Return count of saved items. */
template<typename T> uint64_t Table<T>::countSavedItems()
{
    return (bitmapItems->countTotal() - bitmapItems->countFree()); /* Return count of saved items. */
}

/* Get total items count.
   @return  Return count of total items. */
template<typename T> uint64_t Table<T>::countTotalItems()
{
    return (bitmapItems->countTotal()); /* Return count of total items. */
}

/* Constructor of table. Initialize bitmap, items array and free bit chain.
   @param   buffer  Buffer of whole table (including bitmap and items).
   @param   count   Count of items in table (can be divided by 8 due to bitmap requirement). */
template<typename T> Table<T>::Table(char *buffer, uint64_t count)
{
    if (buffer == NULL) {
        fprintf(stderr, "Table: buffer is null.\n");
        exit(EXIT_FAILURE); /* Fail due to null buffer pointer. */
    } else {
        if (count % 8 != 0) {
            fprintf(stderr, "Table: count should be times of eight.\n");
            exit(EXIT_FAILURE); /* Fail due to count alignment. */
        } else {
            bitmapItems = new Bitmap(count, buffer + count * sizeof(T)); /* Initialize item bitmap. */
            items = (T *)(buffer); /* Initialize items array. */
            sizeBufferUsed = count / 8 + count * sizeof(T); /* Size of used bytes in buffer. */
            /* Initialize free bit chain. */
            headFreeBit = NULL; /* Initialize head free bit pointer. */
            FreeBit *currentFreeBit;
            // for (uint64_t index = 0; index < count / 8; index++) { /* Search in all bytes in array. */
            //     for (uint64_t offset = 0; offset <= 7; offset++) { /* Search in all offsets in byte. */
            //         if ((buffer[index] & (1 << (7 - offset))) == 0) { /* Judge if bit is cleared. */
            //             currentFreeBit = (FreeBit *)malloc(sizeof(FreeBit)); /* Create a new free bit. */
            //             currentFreeBit->position = index * 8 + offset; /* Assign position. */
            //             currentFreeBit->nextFreeBit = headFreeBit; /* Push current free bit before head. */
            //             headFreeBit = currentFreeBit; /* Update head free bit. */
            //         }
            //     }
            // }
            for (int index = count / 8 - 1; index >= 0; index--) { /* Search in all bytes in array. */
                for (int offset = 7; offset >= 0; offset--) { /* Search in all offsets in byte. */
                    if ((buffer[index] & (1 << (7 - offset))) == 0) { /* Judge if bit is cleared. */
                        currentFreeBit = (FreeBit *)malloc(sizeof(FreeBit)); /* Create a new free bit. */
                        currentFreeBit->position = index * 8 + offset; /* Assign position. */
                        currentFreeBit->nextFreeBit = headFreeBit; /* Push current free bit before head. */
                        headFreeBit = currentFreeBit; /* Update head free bit. */
                    }
                }
            }
        }
    }
}

/* Destructor of table. */
template<typename T> Table<T>::~Table()
{
    FreeBit *currentFreeBit;
    while (headFreeBit != NULL) {
        currentFreeBit = headFreeBit;
        headFreeBit = (FreeBit *)(headFreeBit->nextFreeBit); /* Move to next. */
        free(currentFreeBit); /* Release free bit. */
    }
    delete bitmapItems; /* Release table bitmap. */
}

/** Redundance check. **/
#endif
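In both `create` overloads above, the branch taken when `headFreeBit == NULL` returns while `mutexBitmapItems` is still locked. One common way to avoid that class of problem is to hold the mutex through `std::lock_guard`, which releases it on every return path. The sketch below shows the same allocate-from-free-list idea in that style. `SlotAllocator` is a hypothetical class that uses vectors in place of the bitmap and the malloc'ed `FreeBit` chain; it is not a patch to Octopus.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical slot allocator mirroring Table<T>::create / remove,
// with a vector used as a LIFO free list.
class SlotAllocator {
public:
    explicit SlotAllocator(uint64_t count) : used_(count, false) {
        // Push indexes in reverse so the lowest index is handed out first,
        // like the reverse scan in the Table constructor.
        for (uint64_t i = count; i > 0; i--) {
            freeList_.push_back(i - 1);
        }
    }

    bool create(uint64_t* index) {
        if (index == nullptr) return false;
        std::lock_guard<std::mutex> guard(mutex_);  // released on every return path
        if (freeList_.empty()) return false;        // out of slots, mutex still released
        *index = freeList_.back();
        freeList_.pop_back();
        used_[*index] = true;
        return true;
    }

    bool remove(uint64_t index) {
        std::lock_guard<std::mutex> guard(mutex_);
        if (index >= used_.size() || !used_[index]) return false;
        used_[index] = false;
        freeList_.push_back(index);                 // freed slot is reused first
        return true;
    }

private:
    std::mutex mutex_;
    std::vector<bool> used_;          // plays the role of the bitmap
    std::vector<uint64_t> freeList_;  // plays the role of the free bit chain
};
```

As in the original, allocation and release are both constant time because the free list is only touched at one end; the bitmap is needed for validating an index, not for finding a free one.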

global.h

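This header mainly defines some important global macros, the message types (the file system operation interface), and auxiliary metadata structures. The code with my own annotations follows: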

/** Redundance check. **/
#ifndef GLOBAL_HEADER
#define GLOBAL_HEADER

#include <stdint.h>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <sys/syscall.h>
#include <unistd.h>
#include "common.hpp"
using namespace std;

/* Get the ID of the current thread. */
static inline uint32_t gettid() {
    return (uint32_t)syscall(SYS_gettid);
}

/* Global macros:
 * 1. Client message size (4096 bytes)
 * 2. Maximum number of clients served (1024)
 * 3. Server message size (4096 bytes)
 * 4. Number of server message slots (8)
 * 5. Total metadata size (1 GB)
 * 6. Local log size (40 MB)
 * 7. Distributed log size (1 MB)
 * 8. Whether 2PC or Collect-Dispatch is used as the distributed transaction mechanism
 */
#define CLIENT_MESSAGE_SIZE 4096
#define MAX_CLIENT_NUMBER 1024
#define SERVER_MASSAGE_SIZE CLIENT_MESSAGE_SIZE
#define SERVER_MASSAGE_NUM 8
#define METADATA_SIZE (1024 * 1024 * 1024)
#define LOCALLOGSIZE (40 * 1024 * 1024)
#define DISTRIBUTEDLOGSIZE (1024 * 1024)
// #define TRANSACTION_2PC 1
#define TRANSACTION_CD 1

/* Message types:
 * 1. Add metadata to a directory
 * 2. Remove metadata from a directory
 * 3. Create an inode and add metadata
 * 4. Create an inode only
 * 5. Get attributes (metadata)
 * 6. Access (time? permission?)
 * 7. Create a directory
 * 8. Read a directory
 * 9. Extent read
 * 10. Extent write
 * 11. Update metadata
 * 12. Extent read end
 * 13. Truncate (log? file?)
 * 14. Remove a directory
 * 15. Remove (possibly recursive?)
 * 16. Free block space
 * 17. Rename
 * 18. Response (network communication)
 * 19. Disconnect (network communication)
 * 20. Test
 * 21. Raw write (RDMA write?)
 * 22. Raw read (RDMA read?)
 * 23. Commit a transaction
 * 24. Read directory metadata
 * 25. Invalid (file? log?)
 */
/** Enumerators and structures. **/
typedef enum { /* Message enumerator. */
    MESSAGE_ADDMETATODIRECTORY,
    MESSAGE_REMOVEMETAFROMDIRECTORY,
    MESSAGE_MKNODWITHMETA,
    MESSAGE_MKNOD,
    MESSAGE_GETATTR,
    MESSAGE_ACCESS,
    MESSAGE_MKDIR,
    MESSAGE_READDIR,
    MESSAGE_EXTENTREAD,
    MESSAGE_EXTENTWRITE,
    MESSAGE_UPDATEMETA,
    MESSAGE_EXTENTREADEND,
    MESSAGE_TRUNCATE,
    MESSAGE_RMDIR,
    MESSAGE_REMOVE,
    MESSAGE_FREEBLOCK,
    MESSAGE_RENAME,
    MESSAGE_RESPONSE,
    MESSAGE_DISCONNECT,
    MESSAGE_TEST,
    MESSAGE_RAWWRITE,
    MESSAGE_RAWREAD,
    MESSAGE_DOCOMMIT,
    MESSAGE_READDIRECTORYMETA,
    MESSAGE_INVALID
} Message;

typedef struct { /* Extra information structure. */
    uint16_t sourceNodeID; /* Source node ID. */
    uint64_t taskID; /* Task ID. */
    uint64_t sizeReceiveBuffer; /* Size of receive buffer. */
} ExtraInformation;

typedef struct : ExtraInformation { /* General send buffer structure. */
    Message message; /* Message type. */
    char path[MAX_PATH_LENGTH]; /* Path. */
} GeneralSendBuffer;

typedef struct : ExtraInformation {
    Message message;
    uint64_t startBlock;
    uint64_t countBlock;
} BlockFreeSendBuffer;

typedef struct : ExtraInformation { /* General receive buffer structure. */
    Message message; /* Message type. */
    bool result; /* Result. */
} GeneralReceiveBuffer;

typedef struct : ExtraInformation { /* addMetaToDirectory send buffer structure. */
    Message message; /* Message type. */
    char path[MAX_PATH_LENGTH]; /* Path. */
    char name[MAX_FILE_NAME_LENGTH]; /* Name to add. */
    bool isDirectory; /* Is directory or not. */
} AddMetaToDirectorySendBuffer;

typedef struct : ExtraInformation { /* removeMetaFromDirectory send buffer structure. */
    Message message; /* Message type. */
    char path[MAX_PATH_LENGTH]; /* Path. */
    char name[MAX_FILE_NAME_LENGTH]; /* Name to add. */
} RemoveMetaFromDirectorySendBuffer;

typedef struct : ExtraInformation { /* extentRead send buffer structure. */
    Message message; /* Message type. */
    char path[MAX_PATH_LENGTH]; /* Path. */
    uint64_t size; /* Size to read. */
    uint64_t offset; /* Offset to read. */
} ExtentReadSendBuffer;

typedef struct : ExtraInformation { /* extentWrite send buffer structure. */
    Message message; /* Message type. */
    char path[MAX_PATH_LENGTH]; /* Path. */
    uint64_t size; /* Size to read. */
    uint64_t offset; /* Offset to read. */
} ExtentWriteSendBuffer;

typedef struct : ExtraInformation { /* updateMeta send buffer structure. */
    Message message; /* Message type. */
    uint64_t offset;
    uint64_t key; /* Key to unlock. */
} UpdateMetaSendBuffer;

typedef struct : ExtraInformation { /* extentReadEnd send buffer structure. */
    Message message; /* Message type. */
    uint64_t offset;
    uint64_t key; /* Key to unlock. */
} ExtentReadEndSendBuffer;

typedef struct : ExtraInformation { /* mknodWithMeta send buffer structure. */
    Message message; /* Message type. */
    char path[MAX_PATH_LENGTH]; /* Path. */
    FileMeta metaFile; /* File meta. */
} MakeNodeWithMetaSendBuffer;

typedef struct : ExtraInformation { /* truncate send buffer structure. */
    Message message; /* Message type. */
    char path[MAX_PATH_LENGTH]; /* Path. */
    uint64_t size; /* Size to kept after truncation. */
} TruncateSendBuffer;

typedef struct : ExtraInformation { /* rename send buffer structure. */
    Message message; /* Message type. */
    char pathOld[MAX_PATH_LENGTH]; /* Old path. */
    char pathNew[MAX_PATH_LENGTH]; /* New path. */
} RenameSendBuffer;

typedef struct : ExtraInformation { /* getattr receive buffer structure. */
    Message message; /* Message type. */
    bool result; /* Result. */
    FileMeta attribute; /* Attribute. */
} GetAttributeReceiveBuffer;

typedef struct : ExtraInformation { /* readdir receive buffer structure. */
    Message message; /* Message type. */
    bool result; /* Result. */
    nrfsfilelist list; /* List. */
} ReadDirectoryReceiveBuffer;

typedef struct : ExtraInformation { /* extentRead receive buffer structure. */
    Message message; /* Message type. */
    bool result; /* Result. */
    uint64_t offset;
    uint64_t key; /* Key to unlock. */
    file_pos_info fpi; /* File position information. */
} ExtentReadReceiveBuffer;

typedef struct : ExtraInformation { /* extentWrite receive buffer structure. */
    Message message; /* Message type. */
    bool result; /* Result. */
    uint64_t offset;
    //FileMeta metaFile; /* File meta. */
    uint64_t key; /* Key to unlock. */
    file_pos_info fpi; /* File position information. */
} ExtentWriteReceiveBuffer;

typedef struct : GeneralReceiveBuffer {
    uint64_t TxID;
    uint64_t srcBuffer;
    uint64_t desBuffer;
    uint64_t size;
    uint64_t key;
    uint64_t offset;
} UpdataDirectoryMetaReceiveBuffer;

typedef struct : UpdataDirectoryMetaReceiveBuffer {
    char path[MAX_PATH_LENGTH];
} DoRemoteCommitSendBuffer;

typedef struct : GeneralReceiveBuffer {
    uint64_t hashAddress;
    uint64_t metaAddress;
    uint16_t parentNodeID;
    DirectoryMeta meta;
} ReadDirectoryMetaReceiveBuffer;

/* A global queue manager. */
template <typename T>
class Queue {
private:
    std::vector<T> queue;
    std::mutex m;
    std::condition_variable cond;
    uint8_t offset = 0;
public:
    Queue(){}
    ~Queue(){}
    T pop() {
        std::unique_lock<std::mutex> mlock(m);
        while (queue.empty()) {
            cond.wait(mlock);
        }
        auto item = queue.front();
        queue.erase(queue.begin());
        return item;
    }
    T PopPolling() {
        while (offset == 0);
        auto item = queue.front();
        queue.erase(queue.begin());
        __sync_fetch_and_sub(&offset, 1);
        return item;
    }
    void push(T item) {
        std::unique_lock<std::mutex> mlock(m);
        queue.push_back(item);
        mlock.unlock();
        cond.notify_one();
    }
    void PushPolling(T item) {
        queue.push_back(item);
        __sync_fetch_and_add(&offset, 1);
    }
};

/** Redundance check. **/
#endif
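The `Queue` template at the end of global.h pairs a mutex with a condition variable for its blocking `push`/`pop`. As a self-contained sketch of that pattern, the hypothetical `BlockingQueue` below uses `std::deque` so that removing the front element is constant time, and passes a predicate to `wait` to handle spurious wake-ups. It deliberately leaves out the polling variants, which in the quoted code appear to touch the vector without holding the mutex.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical blocking FIFO queue following the push/pop pattern of Queue<T>.
template <typename T>
class BlockingQueue {
public:
    void push(T item) {
        {
            std::lock_guard<std::mutex> guard(m_);
            items_.push_back(std::move(item));
        }                        // unlock before notifying, as the original does
        cond_.notify_one();
    }

    // Blocks until an item is available, then returns the oldest one.
    T pop() {
        std::unique_lock<std::mutex> lock(m_);
        cond_.wait(lock, [this] { return !items_.empty(); });
        T item = std::move(items_.front());
        items_.pop_front();      // O(1), unlike vector::erase(begin())
        return item;
    }

    bool empty() {
        std::lock_guard<std::mutex> guard(m_);
        return items_.empty();
    }

private:
    std::deque<T> items_;
    std::mutex m_;
    std::condition_variable cond_;
};
```

A worker thread would typically loop on `pop()` and dispatch on the message type, while receiver threads call `push()`; whether Octopus uses its `Queue` this way is not shown in this header.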

mempool.hpp

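This header mainly describes the structure of the persistent shared memory pool and how its data segments are laid out. The code with my own annotations follows: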

#ifndef MEMPOOL_HEADER
#define MEMPOOL_HEADER
#include <unordered_map>
#include <string.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <debug.hpp>
#include "global.h"

#define SHARE_MEMORY_KEY 78

/************************************************************************************************
+-------+-------+-----+-------+-------------+-------------+----------+------------+---------+
| Cli_1 | Cli_2 | ... | Cli_N | SERVER_SEND | SERVER_RECV | MetaData | Data_block | LogFile |
+-------+-------+-----+-------+-------------+-------------+----------+------------+---------+
                             /               \
  --------------------------/                 \---------------------------
 /                                                                        \
+-----------------------+-----------------------+-----+-----------------------+
|  Ser_1 (1, 2, ... 8)  |  Ser_2 (1, 2, ... 8)  | ... |  Ser_M (1, 2, ... 8)  |
+-----------------------+-----------------------+-----+-----------------------+
************************************************************************************************/

typedef unordered_map<uint32_t, int> Thread2ID;

/* Address segments of the memory pool:
 * 1. Base address of the memory pool
 * 2. Base address of the client region
 * 3. Base address of the server region
 * 4. Base address of the server send region
 * 5. Base address of the server receive region
 * 6. Base address of the file metadata
 * 7. Base address of the file data
 * 8. Start address of the local log
 * 9. Start address of the distributed log
 */
class MemoryManager {
private:
    uint64_t ServerCount;
    uint64_t MemoryBaseAddress;
    uint64_t ClientBaseAddress;
    uint64_t ServerSendBaseAddress;
    uint64_t ServerRecvBaseAddress;
    uint64_t MetadataBaseAddress;
    uint64_t DataBaseAddress;
    uint8_t *SendPoolPointer;
    uint64_t DMFSTotalSize;
    uint64_t LocalLogAddress;
    uint64_t DistributedLogAddress;
    int shmid;
    Thread2ID th2id;
public:
    MemoryManager(uint64_t mm, uint64_t ServerCount, int DataSize);
    ~MemoryManager();
    uint64_t getDmfsBaseAddress();
    uint64_t getDmfsTotalSize();
    uint64_t getMetadataBaseAddress();
    uint64_t getDataAddress();
    uint64_t getServerSendAddress(uint16_t NodeID, uint64_t *buffer);
    uint64_t getServerRecvAddress(uint16_t NodeID, uint16_t offset);
    uint64_t getClientMessageAddress(uint16_t NodeID);
    uint64_t getLocalLogAddress();
    uint64_t getDistributedLogAddress();
    void setID(int ID);
};
#endif
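The layout diagram suggests that each region simply starts where the previous one ends. The sketch below computes base addresses on that assumption, using the sizes from global.h. `PoolLayout` and `computeLayout` are hypothetical names, and several details are guesses rather than facts from the source: the per-region sizes (for example that each server gets 8 message slots in both the send and receive regions), the order of the two logs inside the LogFile region, and `dataSize` being given in bytes.

```cpp
#include <cassert>
#include <cstdint>

// Sizes mirrored from global.h (names here are local to this sketch).
constexpr uint64_t kClientMessageSize = 4096;            // CLIENT_MESSAGE_SIZE
constexpr uint64_t kMaxClientNumber = 1024;              // MAX_CLIENT_NUMBER
constexpr uint64_t kServerMessageSize = 4096;            // SERVER_MASSAGE_SIZE
constexpr uint64_t kServerMessageNum = 8;                // SERVER_MASSAGE_NUM
constexpr uint64_t kMetadataSize = 1024ull * 1024 * 1024;    // METADATA_SIZE
constexpr uint64_t kLocalLogSize = 40ull * 1024 * 1024;      // LOCALLOGSIZE
constexpr uint64_t kDistributedLogSize = 1024ull * 1024;     // DISTRIBUTEDLOGSIZE

// Hypothetical result type: base address of each region plus the end address.
struct PoolLayout {
    uint64_t client;
    uint64_t serverSend;
    uint64_t serverRecv;
    uint64_t metadata;
    uint64_t data;
    uint64_t localLog;
    uint64_t distributedLog;
    uint64_t end;
};

// Lay the regions out back to back in the order shown in the diagram.
inline PoolLayout computeLayout(uint64_t base, uint64_t serverCount, uint64_t dataSize) {
    const uint64_t serverRegion = kServerMessageSize * kServerMessageNum * serverCount;
    PoolLayout layout;
    layout.client = base;
    layout.serverSend = layout.client + kClientMessageSize * kMaxClientNumber;
    layout.serverRecv = layout.serverSend + serverRegion;
    layout.metadata = layout.serverRecv + serverRegion;
    layout.data = layout.metadata + kMetadataSize;
    layout.localLog = layout.data + dataSize;
    layout.distributedLog = layout.localLog + kLocalLogSize;
    layout.end = layout.distributedLog + kDistributedLogSize;
    return layout;
}
```

With a fixed layout like this, any node that knows the pool's base address can compute where a given client or server slot lives, which is the kind of property one-sided RDMA reads and writes depend on.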

hashtable.hpp

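This header covers the hashing the file system relies on: an address hash and a unique hash (SHA-256, which is not actually used). Their main purpose is to map files to different server nodes and to different persistent-memory regions within a node. The code follows: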

/*** Hash table header for index in file system. ***/ /** Version 2 + Crypto++ support. **/ /** Redundance check. **/ #ifndef HASHTABLE_HEADER #define HASHTABLE_HEADER /** Included files. **/ #include <stdint.h> /* Standard integers. E.g. uint16_t */ #include <assert.h> /* Assert function. E.g. assert() */ #include <stdlib.h> /* Standard library. E.g. exit() */ #include <string.h> /* String operations. E.g. memcmp() */ #include <stdio.h> /* Standard I/O. */ #include <mutex> /* Mutex operations. */ #include "bitmap.hpp" /* Bitmap class. */ #include "debug.hpp" /* Debug class. */ // #include "sha256.h" /* SHA-256 algorithm. */ #include "table.hpp" /* Table template. Use FreeBit structure. */ #include <cryptopp/sha.h> /* SHA-256 algorithm in Crypto++ library. */ /** Name space. **/ using namespace CryptoPP; /** Design. **/ /* Chosen algorithm: UniqueHash: SHA-256 AddressHash (describe in Verilog pseudocode): 256'b SHA-256[20:0] - Hash items - - Chained items - (Fixed size: 16 MB) (Size depend on buffer) (due to alignment) AddressHash | 8 bytes | 8 bytes | | 32 bytes | 8 bytes | 8 bytes | 8 bytes | +----------+---------------+ +--------------------+------------+-------------+--------------+ +--------------------+------------+----------------+ 0 * 16 bytes | Lock | Head index -|-->| UniqueHash(Path) | Meta index | isDirectory | Next index -|--> .... -->| UniqueHash(Path) | Meta Index | 0 | +----------+---------------+ +--------------------+------------+-------------+--------------+ +--------------------+------------+----------------+ 1 * 16 bytes | Lock | Head index -|-->| | | | -|--> .... 0 for end of chain +----------+---------------+ +--------------------+------------+-------------+--------------+ | | | | An item of chain | | .... | .... | | | | +----------+---------------+ 1M * 16 bytes | Lock | Head index -|--> .... 
+----------+---------------+ Here link and next are both index of chained items array +---------------------+--------------------------+-----------------------------------------+ - Total buffer - | Hash items | Bitmap of chained items | Chained items | +---------------------+--------------------------+-----------------------------------------+ | 16 MB | (count / 8) bytes | (count * sizeof(ChainedItem)) bytes | Position actually contains index of chained item. +--------------+-------------------------+ +--------------+----------+ | Position | Next free bit pointer -|--> .... --->| Position | NULL | +--------------+-------------------------+ +--------------+----------+ - Free bit chain - */ /** Definitions. **/ #define HASH_ADDRESS_BITS 20 /* 20 bits can hold 1048576 hash items. */ #define HASH_ITEMS_COUNT (1 << HASH_ADDRESS_BITS) /* Actual count of hash items. */ #define HASH_ITEMS_SIZE (HASH_ITEMS_COUNT * sizeof(HashItem)) /* Size of hash items in bytes. */ /** Structures. **/ typedef uint64_t AddressHash; /* Address hash definition for locate. */ /* 8-byte address is far enough. Acutally 20-bit (2.5-byte) is enough, but an 64-bit variable is better in address computation. */ typedef struct { /* Unique hash structure for identify a unique path. There might be collision. */ //char bytes[32]; /* 32-byte variable can hold all data in SHA-256 format. */ uint64_t value[4]; } UniqueHash; typedef struct { /* Hash item structure. Total 16 bytes. Currently container class and type of item are combined together. */ uint64_t key; /* Key for lock. */ uint64_t indexHead; /* Index of head chained item. Value 0 is reserved for no item. */ } HashItem; typedef struct { /* Chained item structure. Total 32 bytes. */ uint64_t indexNext; /* Index of next chained item. Value 0 is reserved for no item. */ uint64_t indexMeta; /* Index of meta. */ bool isDirectory; /* Mark directory if true. Mark file if false. Align to 8 bytes. */ UniqueHash hashUnique; /* 32-byte unique hash for path. 
Lack of completeness. */ } ChainedItem; /* Use version in table template. */ // typedef struct { /* Free bit structure. */ // uint64_t position; /* Position of bit. */ // void *nextFreeBit; /* Next free bit. */ // } FreeBit; /** Classes. **/ class HashTable { private: Bitmap *bitmapChainedItems; /* Bitmap for chained items. */ std::mutex mutexBitmapChainedItems; /* Mutex for bitmap for chained items. */ HashItem *itemsHash; /* Hash items of hash table. */ ChainedItem *itemsChained; /* Chained items of hash table. */ FreeBit *headFreeBit; /* Head free bit in the chain. */ public: static void getAddressHash(const char *buf, uint64_t len, AddressHash *hashAddress); /* Get address hash of specific string. */ static AddressHash getAddressHash(UniqueHash *hashUnique); /* Get address hash by unique hash. */ static void getUniqueHash(const char *buf, uint64_t len, UniqueHash *hashUnique); /* Get unique hash of specific string. */ uint64_t sizeBufferUsed; /* Size of used bytes in buffer. */ bool get(const char *path, uint64_t *indexMeta, bool *isDirectory); /* Get a chained item. */ bool get(UniqueHash *hashUnique, uint64_t *indexMeta, bool *isDirectory); /* Get a chained item by hash. */ bool put(const char *path, uint64_t indexMeta, bool isDirectory); /* Put a chained item. */ bool put(UniqueHash *hashUnique, uint64_t indexMeta, bool isDirectory); /* Put a chained item by hash. */ bool del(const char *path); /* Delete a hash item. */ bool del(UniqueHash *hashUnique); /* Delete a hash item by hash. */ uint64_t getSavedHashItemsCount(); /* Get saved hash items count. */ uint64_t getSavedChainedItemsCount(); /* Get saved chained items count. */ uint64_t getTotalHashItemsCount(); /* Get total hash items count. */ uint64_t getTotalChainedItemsCount(); /* Get total chained items count. */ uint64_t getMaxLengthOfChain(); /* Get max length of chain. */ HashTable(char *buffer, uint64_t count); /* Constructor of hash table. */ ~HashTable(); /* Destructor of hash table. 
*/ }; /** Redundance check. **/ #endif
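The design comment in this header describes a two-level structure: a fixed array of hash items, each holding a lock word and a head index, plus a pool of chained items linked by index, with 0 marking the end of a chain. The lookup path can be sketched as follows. This is only a minimal illustration, not the Octopus implementation: `TinyTable`, `addressOf`, and the 16-bucket size are hypothetical, the 256-bit digest is supplied by the caller instead of being computed with Crypto++'s SHA-256, taking the low bits of `value[0]` as the address hash is an assumption, and chained items are allocated with `push_back` instead of the bitmap and free-bit chain the header mentions.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

struct UniqueHash { uint64_t value[4]; };            // 256-bit digest of a path
struct HashItem   { uint64_t key; uint64_t indexHead; };
struct ChainedItem {
    uint64_t   indexNext;    // index of next chained item, 0 ends the chain
    uint64_t   indexMeta;    // index into the file or directory meta table
    bool       isDirectory;
    UniqueHash hashUnique;
};

constexpr unsigned kAddressBits = 4;                 // the real header uses 20
constexpr uint64_t kBuckets = 1ULL << kAddressBits;

class TinyTable {                                    // hypothetical name
    std::vector<HashItem>    buckets_;
    std::vector<ChainedItem> chain_;                 // slot 0 is reserved
public:
    TinyTable() : buckets_(kBuckets, HashItem{0, 0}), chain_(1) {}

    // Assumption: the address hash is the low bits of the first digest word.
    static uint64_t addressOf(const UniqueHash &h) {
        return h.value[0] & (kBuckets - 1);
    }

    bool get(const UniqueHash &h, uint64_t *indexMeta, bool *isDirectory) const {
        // Walk the chain that starts at the bucket's head index.
        for (uint64_t i = buckets_[addressOf(h)].indexHead; i != 0;
             i = chain_[i].indexNext) {
            if (std::memcmp(&chain_[i].hashUnique, &h, sizeof h) == 0) {
                *indexMeta = chain_[i].indexMeta;
                *isDirectory = chain_[i].isDirectory;
                return true;
            }
        }
        return false;
    }

    bool put(const UniqueHash &h, uint64_t indexMeta, bool isDirectory) {
        uint64_t ignoredMeta = 0;
        bool ignoredDir = false;
        if (get(h, &ignoredMeta, &ignoredDir)) return false;  // already present
        HashItem &bucket = buckets_[addressOf(h)];
        // Insert at the head: the new item points at the old head.
        chain_.push_back(ChainedItem{bucket.indexHead, indexMeta, isDirectory, h});
        bucket.indexHead = chain_.size() - 1;
        return true;
    }
};
```

Reserving slot 0 of the chained-item pool is what lets index 0 double as the "no item" marker, matching the "Value 0 is reserved for no item" comments on `indexHead` and `indexNext`.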

storage.hpp

This header describes how Octopus lays out its stored data. The code, with my own annotations, is shown below:

/*** Storage header. ***/
/** Version 2 + functional model modifications. **/

/** Redundance check. **/
#ifndef STORAGE_HEADER
#define STORAGE_HEADER

/** Included files. **/
#include <stdint.h>                     /* Standard integers. E.g. uint16_t */
#include "hashtable.hpp"                /* Hash table class. */
#include "table.hpp"                    /* Table template. */
#include "global.h"

/* A Block is defined as a byte (char) array; BLOCK_SIZE is 1 MB. */
typedef struct                          /* Block structure. */
{
    char bytes[BLOCK_SIZE];             /* Raw data. */
} Block;
/* Currently here is no back pointer in the block structure, which means consistency might be weak.
   If a meta is removed but the related blocks are not, then it will cost a lot of time to scan all
   files to determine which blocks need to be removed (E.g. rebuild a new bitmap from meta to
   represent latest blocks information and then compare it with the original block bitmap and fix).
   Besides, there is no checksum here, data correctness cannot be determined. */

/* Storage layout:
 * 1. Node count (used to adjust the start address of each data segment);
 * 2. Amount of buffer used;
 * 3. Hash table;
 * 4. File metadata table;
 * 5. Directory metadata table;
 * 6. Start address of the memory blocks (holding file data and perhaps the logs?)
 */
class Storage
{
private:
    uint64_t countNode;                         /* Count of nodes. */

public:
    uint64_t sizeBufferUsed;                    /* Size of used bytes in buffer. */
    HashTable *hashtable;                       /* Hash table. */
    Table<FileMeta> *tableFileMeta;             /* File meta table. */
    Table<DirectoryMeta> *tableDirectoryMeta;   /* Directory meta table. */
    Table<Block> *tableBlock;                   /* Block table. */
    NodeHash getNodeHash(UniqueHash *hashUnique);   /* Get node hash by unique hash. */
    //NodeHash getNodeHash(const char *buffer);     /* Get node hash. */
    Storage(char *buffer, char *bufferBlock, uint64_t countFile, uint64_t countDirectory,
            uint64_t countBlock, uint64_t countNode);   /* Constructor. */
    ~Storage();                                 /* Deconstructor. */
};

/** Redundance check. **/
#endif
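`Storage::getNodeHash` turns a path's unique hash into the identifier of the node responsible for it, which is why the class needs `countNode`. The header does not show the rule it uses, so the sketch below only illustrates one plausible placement scheme. Both the modulo rule and the 1-based node IDs are assumptions, and `nodeHashOf` and `isLocal` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

typedef uint64_t NodeHash;
struct UniqueHash { uint64_t value[4]; };   // 256-bit digest of a path

// Hypothetical placement rule: reduce one digest word modulo the node count
// and shift into the range 1..countNode (assumed 1-based node IDs).
inline NodeHash nodeHashOf(const UniqueHash &h, uint64_t countNode) {
    assert(countNode > 0);
    return (h.value[3] % countNode) + 1;
}

// A request can be served without the network only when the owning node
// is the local one (the role FileSystem::checkLocal appears to play).
inline bool isLocal(NodeHash hashNode, NodeHash hashLocalNode) {
    return hashNode == hashLocalNode;
}
```

Because the node is derived from the path hash alone, any client can compute where a file's metadata lives without consulting a central directory service.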

filesystem.hpp

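This header declares the basic prototype of the file system. In my view the code here is not well organized: the main API entry points are mixed in with internal helper functions, and the core file system structure is interleaved with auxiliary structures, which hurts readability. The code, with my own annotations, is shown below: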

/*** File system header. ***/
/** Version 2 + modifications for functional model. **/

/** Redundance check. **/
#ifndef FILESYSTEM_HEADER
#define FILESYSTEM_HEADER

/** Included files. **/
#include <stdint.h>                     /* Standard integers. E.g. uint16_t */
#include <stdlib.h>                     /* Standard library. */
#include <string.h>                     /* String operations. */
#include <stdio.h>                      /* Standard I/O. */
#include <time.h>                       /* Time functions. */
#include <mutex>                        /* Mutex functions. */
#include "storage.hpp"                  /* Storage class, definition of node hash and and hash functions. */
#include "debug.hpp"                    /* Debug class. */
#include "global.h"                     /* Global header. */
#include "lock.h"

/** Classes. **/
/* File system class:
 * Part 1, attributes
 *  1. Storage layout;
 *  2. Local node hash;
 *  3. Lock service;
 *  4. Address of the hash table;
 * Part 2, functions
 *  1. Check whether a node hash is local
 *  2. Get the parent directory
 *  3. Get the file name
 *  4. Send a message (needs the send buffer, the receive buffer, and their lengths)
 *  5. Fill in file position information
 *  6. Initialize the root (hash) node
 *  7. Add directory metadata
 *  8. Remove directory metadata
 *  9. Update directory metadata
 * 10. Make a node (several variants)
 * 11. Get attributes
 * 12. Check accessibility
 * 13. Make a directory
 * 14. Read a given directory
 * 15. Read a directory recursively
 * 16. Read directory metadata
 * 17. Extent read
 * 18. Extent write
 * 19. Update metadata
 * 20. Remove (a file)
 * 21. Free blocks
 * 22. Remove a directory
 * 23. Rename
 * 24. Write-lock / unlock a hash item
 * 25. Read-lock / unlock a hash item
 * 26. Update remote metadata
 */
class FileSystem
{
private:
    Storage *storage;                   /* Storage. */
    NodeHash hashLocalNode;             /* Local node hash. */
    LockService *lock;
    uint64_t addressHashTable;
    bool checkLocal(NodeHash hashNode); /* Check if node hash is local. */
    bool getParentDirectory(const char *path, char *parent);    /* Get parent directory. */
    bool getNameFromPath(const char *path, char *name);         /* Get file name from path. */
    bool sendMessage(NodeHash hashNode, void *bufferSend, uint64_t lengthSend,  /* Send message. */
                     void *bufferReceive, uint64_t lengthReceive);
    void fillFilePositionInformation(uint64_t size, uint64_t offset, file_pos_info *fpi,
                                     FileMeta *metaFile);   /* Fill file position information for read and write. */

public:
    void rootInitialize(NodeHash LocalNode);
    /* Internal functions. No parameter check. Must be called by message handler or functions in this class. */
    bool addMetaToDirectory(const char *path, const char *name, bool isDirectory,
                            uint64_t *TxID, uint64_t *srcBuffer, uint64_t *desBuffer,
                            uint64_t *size, uint64_t *key, uint64_t *offset);
                            /* Internal add meta to directory function. Might cause overhead. */
    bool removeMetaFromDirectory(const char *path, const char *name,
                                 uint64_t *TxID, uint64_t *srcBuffer, uint64_t *desBuffer,
                                 uint64_t *size, uint64_t *key, uint64_t *offset);
                                 /* Internal remove meta from directory function. Might cause overhead. */
    bool updateDirectoryMeta(const char *path, uint64_t TxID, uint64_t srcBuffer,
                             uint64_t desBuffer, uint64_t size, uint64_t key, uint64_t offset);
    bool mknodWithMeta(const char *path, FileMeta *metaFile);   /* Make node (file) with file meta. */

    /* External functions. */
    void parseMessage(char *bufferRequest, char *bufferResponse);   /* Parse message. */
    bool mknod(const char *path);                               /* Make node (file). */
    bool mknod2pc(const char *path);
    bool mknodcd(const char *path);
    bool getattr(const char *path, FileMeta *attribute);        /* Get attributes. */
    bool access(const char *path);                              /* Check accessibility. */
    bool mkdir(const char *path);                               /* Make directory. */
    bool mkdir2pc(const char *path);
    bool mkdircd(const char *path);
    bool readdir(const char *path, nrfsfilelist *list);         /* Read directory. */
    bool recursivereaddir(const char *path, int depth);
    bool readDirectoryMeta(const char *path, DirectoryMeta *meta, uint64_t *hashAddress,
                           uint64_t *metaAddress, uint16_t *parentNodeID);
    bool extentRead(const char *path, uint64_t size, uint64_t offset, file_pos_info *fpi,
                    uint64_t *key_offset, uint64_t *key);       /* Allocate read extent. */
    bool extentReadEnd(uint64_t key, char* path);
    bool extentWrite(const char *path, uint64_t size, uint64_t offset, file_pos_info *fpi,
                     uint64_t *key_offset, uint64_t *key);      /* Allocate write extent. Unlock is implemented in updateMeta. */
    bool updateMeta(const char *path, FileMeta *metaFile, uint64_t key);
                     /* Update meta. Only unlock path due to lock in extentWrite. */
    bool truncate(const char *path, uint64_t size);             /* Truncate. */
    bool remove(const char *path, FileMeta *metaFile);          /* Remove file or empty directory. */
    bool remove2pc(const char *path, FileMeta *metaFile);
    bool removecd(const char *path, FileMeta *metaFile);
    bool blockFree(uint64_t startBlock, uint64_t countBlock);
    bool rmdir(const char *path);                               /* Remove directory. */
    bool rename(const char *pathOld, const char *pathNew);      /* Rename file. */
    uint64_t lockWriteHashItem(NodeHash hashNode, AddressHash hashAddressIndex);    /* Lock hash item for write. */
    void unlockWriteHashItem(uint64_t key, NodeHash hashNode, AddressHash hashAddressIndex);    /* Unlock hash item. */
    uint64_t lockReadHashItem(NodeHash hashNode, AddressHash hashAddressIndex);     /* Lock hash item for read. */
    void unlockReadHashItem(uint64_t key, NodeHash hashNode, AddressHash hashAddressIndex);     /* Unlock hash item. */
    void updateRemoteMeta(uint16_t parentNodeID, DirectoryMeta *meta,
                          uint64_t parentMetaAddress, uint64_t parentHashAddress);
    FileSystem(char *buffer, char *bufferBlock, uint64_t countFile,     /* Constructor of file system. */
               uint64_t countDirectory, uint64_t countBlock,
               uint64_t countNode, NodeHash hashLocalNode);
    ~FileSystem();                                              /* Destructor of file system. */
};

/** Redundance check. **/
#endif
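`extentRead` and `extentWrite` hand the client a `file_pos_info`, a list of (node, offset, size) tuples telling it where the requested bytes live, and `fillFilePositionInformation` is the helper that builds that list from a file's extents. The sketch below shows how such a mapping can work. It is an illustration under assumptions, not the code in `src`: `Extent`, `Piece`, and `mapRange` are hypothetical names that loosely mirror `FileMetaTuple` and `file_pos_tuple`, and the returned offsets are assumed to be byte offsets into the owning node's block area.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

constexpr uint64_t kBlockSize = 1ULL * 1024 * 1024;   // BLOCK_SIZE in common.hpp

struct Extent { uint16_t nodeId; uint32_t startBlock; uint32_t blockCount; };
struct Piece  { uint16_t nodeId; uint64_t offset; uint64_t size; };

// Map the logical byte range [offset, offset + size) of a file onto the
// extents that back it. Returns one Piece per extent touched; if the range
// runs past the end of the file, the pieces cover fewer than `size` bytes.
std::vector<Piece> mapRange(const std::vector<Extent> &extents,
                            uint64_t offset, uint64_t size) {
    std::vector<Piece> out;
    uint64_t extentBegin = 0;          // logical offset where this extent starts
    for (const Extent &e : extents) {
        if (size == 0) break;
        const uint64_t extentBytes = uint64_t(e.blockCount) * kBlockSize;
        const uint64_t extentEnd = extentBegin + extentBytes;
        if (offset < extentEnd) {
            const uint64_t within = offset - extentBegin;   // skip inside extent
            const uint64_t n = std::min(size, extentBytes - within);
            out.push_back(Piece{e.nodeId,
                                uint64_t(e.startBlock) * kBlockSize + within, n});
            offset += n;               // continue at the start of the next extent
            size -= n;
        }
        extentBegin = extentEnd;
    }
    return out;
}
```

With such a list in hand, a client can move the data itself with one-sided RDMA reads or writes, one per piece, rather than routing the payload through the server's CPU.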

TxManager.hpp

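This header declares the transaction manager's basic data structures and functions, chiefly the local log entry and the distributed log entry. This part of the code is relatively simple and is shown below.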

#ifndef TXMANAGER_HEADER
#define TXMANAGER_HEADER
#include <stdint.h>
#include <mutex>
#include <string.h>
using namespace std;

#define CACHELINE_SIZE (64)
// Force data out to persistent memory to guarantee persistence and ordering.
#define _mm_clflush(addr)\
    asm volatile("clflush %0" : "+m" (*(volatile char *)(addr)))

typedef struct {
    uint64_t TxID;
    bool begin;
    bool prepare;
    bool commit;
} __attribute__((packed)) DistributedLogEntry;

typedef struct {
    uint64_t TxID;
    bool begin;
    char logData[4086];
    bool commit;
} __attribute__((packed)) LocalLogEntry;

class TxManager {
private:
    uint64_t LocalLogAddress;
    uint64_t DistributedLogAddress;
    uint64_t LocalLogIndex;
    uint64_t DistributedLogIndex;
    mutex LocalMutex;
    mutex DisMutex;

public:
    TxManager(uint64_t LocalLogAddress, uint64_t DistributedLogAddress);
    ~TxManager();
    void FlushData(uint64_t address, uint64_t size);
    uint64_t TxLocalBegin();
    void TxWriteData(uint64_t TxID, uint64_t address, uint64_t size);
    uint64_t getTxWriteDataAddress(uint64_t txID);
    void TxLocalCommit(uint64_t TxID, bool action);
    uint64_t TxDistributedBegin();
    void TxDistributedPrepare(uint64_t TxID, bool action);
    void TxDistributedCommit(uint64_t TxID, bool action);
};
#endif
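Two details of this header are easy to miss. First, the `logData[4086]` size is not arbitrary: with the packed attribute, 8 bytes of `TxID`, two 1-byte flags, and 4086 bytes of payload add up to exactly 4096 bytes per local log entry. Second, `FlushData(address, size)` has to issue one `clflush` per cache line that the range touches, not one per byte. The sketch below checks the first point at compile time and illustrates the second. `forEachCacheLine` is a hypothetical helper; it takes the flush action as a callback so the example stays portable, whereas the real `FlushData` presumably invokes the `_mm_clflush` macro directly. The `packed` attribute assumes GCC or Clang.

```cpp
#include <cassert>
#include <cstdint>

constexpr uint64_t kCacheLineSize = 64;    // CACHELINE_SIZE in the header

struct __attribute__((packed)) DistributedLogEntry {
    uint64_t TxID;
    bool begin;
    bool prepare;
    bool commit;
};

struct __attribute__((packed)) LocalLogEntry {
    uint64_t TxID;
    bool begin;
    char logData[4086];
    bool commit;
};

static_assert(sizeof(DistributedLogEntry) == 11, "8-byte ID plus three flags");
static_assert(sizeof(LocalLogEntry) == 4096, "one local entry fills 4 KB");

// Call flushLine(lineAddress) once for every cache line overlapping
// [address, address + size) and return how many lines were visited.
template <typename FlushFn>
uint64_t forEachCacheLine(uint64_t address, uint64_t size, FlushFn flushLine) {
    if (size == 0) return 0;
    const uint64_t first = address & ~(kCacheLineSize - 1);
    const uint64_t last  = (address + size - 1) & ~(kCacheLineSize - 1);
    uint64_t visited = 0;
    for (uint64_t line = first; line <= last; line += kCacheLineSize) {
        flushLine(line);
        ++visited;
    }
    return visited;
}
```

Placing `commit` as the last byte of the entry means a reader that sees `commit` set can, provided the flushes are issued in order, trust that the rest of the entry reached persistent memory first.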

RdmaSocket.hpp

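This header defines the many variables and functions needed for RDMA communication. I am not yet familiar with RDMA, so I will not describe this part in detail for now and will come back to fill it in after further study. One remark: in my view the RDMA communication layer offers limited room for research, because it relies on established communication primitives and protocols and leaves us few things to optimize (batched read/write and the like). The code is shown below: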

/*********************************************************************** * * * Tsinghua Univ, 2016 * ***********************************************************************/ #ifndef RDMASOCKET_HEADER #define RDMASOCKET_HEADER #include <infiniband/verbs.h> #include <sys/socket.h> #include <unordered_map> #include <stdlib.h> #include <unistd.h> #include <time.h> #include <netdb.h> #include <errno.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <string> #include <thread> #include <stdint.h> #include <assert.h> #include "Configuration.hpp" #include "debug.hpp" #include "global.h" /* * Important global information. */ #define MAX_POST_LIST 24 #define QPS_MAX_DEPTH 128 #define SIGNAL_BATCH 31 #define WORKER_NUMBER 2 #define QP_NUMBER (1 + WORKER_NUMBER) /* Important information of node-to-node connection */ typedef struct { struct ibv_qp *qp[QP_NUMBER]; struct ibv_cq *cq; uint32_t qpNum[QP_NUMBER]; uint64_t RegisteredMemory; uint32_t rkey; uint16_t lid; uint8_t gid[16]; int sock; uint16_t NodeID; uint64_t counter = 0; } PeerSockData; typedef struct { uint16_t NodeID; bool isServer; uint16_t GivenID; } ExchangeID; typedef struct { uint32_t rkey; uint32_t qpNum[QP_NUMBER]; uint16_t lid; uint8_t gid[16]; uint64_t RegisteredMemory; uint16_t NodeID; } ExchangeMeta; typedef struct { bool OpType; /* false - Read, true - Write */ uint16_t NodeID; uint64_t size; uint64_t bufferSend; uint64_t bufferReceive; } TransferTask; class RdmaSocket { private: // unordered_map<uint16_t, PeerSockData*> peers; PeerSockData* peers[1000]; char *DeviceName; uint32_t Port; /* Used by RDMA */ uint32_t ServerPort; /* Used by socket for data exchange */ int GidIndex; struct ibv_port_attr PortAttribute; bool isRunning; /* Indicate the state of system */ bool isServer; struct ibv_context *ctx; /* device handle */ struct ibv_pd *pd; /* Protection Domain handler */ struct ibv_cq **cq; /* Completion Queue */ int cqNum; /* Number of created CQ */ int cqPtr; /* Indicate 
the next cq entry, initialized as 0. */ uint64_t mm; /* Local Memory Region base address. */ uint64_t mmSize; /* Local Memory Region size. */ Configuration *conf; /* System Configuration. */ uint16_t MyNodeID; /* My NodeID, used for identification. */ uint16_t MaxNodeID; /* Max NodeID for client */ thread Listener; /* Wait for client connection */ uint8_t Mode; /* RC-0, UC-1, UD-2 */ int ServerCount; /* The total number of servers */ Queue<TransferTask *> queue[WORKER_NUMBER];/* Used for Data transfer. */ uint16_t TransferSignal; /* Used to notify compeletion of data transfer. */ thread worker[WORKER_NUMBER]; /* Performance Checker. */ uint64_t WriteSize[WORKER_NUMBER]; uint64_t ReadSize[WORKER_NUMBER]; uint64_t WriteTimeCost[WORKER_NUMBER]; uint64_t ReadTimeCost[WORKER_NUMBER]; bool WriteTest; bool CreateResources(); bool CreateQueuePair(PeerSockData *peer, int MaxWr); bool ModifyQPtoInit(struct ibv_qp *qp); bool ModifyQPtoRTR(struct ibv_qp *qp, uint32_t remote_qpn, uint16_t dlid, uint8_t *dgid); bool ModifyQPtoRTS(struct ibv_qp *qp); bool ConnectQueuePair(PeerSockData *peer); int DataSyncwithSocket(int sock, int size, char *LocalData, char *RemoteData); bool ResourcesDestroy(); void RdmaAccept(int fd); int SocketConnect(uint16_t NodeID); void ServerConnect(); bool DataTransferWorker(int id); public: struct ibv_mr *mr; /* Memory registration handler */ RdmaSocket(int _cqNum, uint64_t _mm, uint64_t _mmSize, Configuration* _conf, bool isServer, uint8_t Mode); ~RdmaSocket(); /* Called by server side to accept the connection of clients. */ void RdmaListen(); /* Called by client side to connect to each of server actively. 
*/ void RdmaConnect(); /* Called to check completion of RDMA operations */ int PollCompletion(uint16_t NodeID, int PollNumber, struct ibv_wc *wc); int PollWithCQ(int cqPtr, int PollNumber, struct ibv_wc *wc); int PollOnce(int cqPtr, int PollNumber, struct ibv_wc *wc); /* Used for synchronization based on socket communication */ void SyncTool(uint16_t NodeID); int getCQCount(); uint16_t getNodeID(); void WaitClientConnection(uint16_t NodeID); void RdmaQueryQueuePair(uint16_t NodeID); void NotifyPerformance(); PeerSockData* getPeerInformation(uint16_t NodeID); /** *RdmaSend - Send data with RDMA_SEND *@param NodeID, Node ID where to write the data. *@param SourceBuffer, Local *absolute* address that keep the sending data. *@param BufferSize, Size of data to be sent. *return true on success, false on error. **/ bool RdmaSend(uint16_t NodeID, uint64_t SourceBuffer, uint64_t BufferSize); bool _RdmaBatchSend(uint16_t NodeID, uint64_t SourceBuffer, uint64_t BufferSize, int BatchSize); /** *RdmaReceive - Receive the remote data with RDMA_RECV *@param NodeID, Node ID from which receive the data. *@param SourceBuffer, Local *absolute* address that keep the remote data. *@param BufferSize, Size of data to be receive. *return true on success, false on error. **/ bool RdmaReceive(uint16_t NodeID, uint64_t SourceBuffer, uint64_t BufferSize); bool _RdmaBatchReceive(uint16_t NodeID, uint64_t SourceBuffer, uint64_t BufferSize, int BatchSize); /** *RdmaRead - Read data with RDMA_READ *@param NodeID, Node ID where to read the data. *@param SourceBuffer, Local *absolute* address that keep the read data. *@param DesBuffer, Remote *relative* address where to read. *@param BufferSize, Size of data to be read. *return true on success, false on error. 
**/ bool RdmaRead(uint16_t NodeID, uint64_t SourceBuffer, uint64_t DesBuffer, uint64_t BufferSize, int TaskID); bool _RdmaBatchRead(uint16_t NodeID, uint64_t SourceBuffer, uint64_t DesBuffer, uint64_t BufferSize, int BatchSize); bool RemoteRead(uint64_t bufferSend, uint16_t NodeID, uint64_t bufferReceive, uint64_t size); bool InboundHamal(int TaskID, uint64_t bufferSend, uint16_t NodeID, uint64_t bufferReceive, uint64_t size); /** *RdmaWrite - WRITE data with RDMA_WRITE *@param NodeID, Node ID where to write the data. *@param SourceBuffer, Local *absolute* address that keep the sending data. *@param DesBuffer, Remote *relative* address to receive the data. *@param BufferSize, Size of data to be sent. *@param imm, -1 - RDMA_WRITE, otherwise, - RDMA_WRITE_WITH_IMM *return true on success, false on error. **/ bool RdmaWrite(uint16_t NodeID, uint64_t SourceBuffer, uint64_t DesBuffer, uint64_t BufferSize, uint32_t imm, int TaskID); bool _RdmaBatchWrite(uint16_t NodeID, uint64_t SourceBuffer, uint64_t DesBuffer, uint64_t BufferSize, uint32_t imm, int BatchSize); bool RemoteWrite(uint64_t bufferSend, uint16_t NodeID, uint64_t bufferReceive, uint64_t size); bool OutboundHamal(int TaskID, uint64_t bufferSend, uint16_t NodeID, uint64_t bufferReceive, uint64_t size); /** *RdmaFetchAndAdd - Fetch data from DesBuffer to SourceBuffer, and add with "Add" remotely. *@param NodeID, Node ID where to write the data. *@param SourceBuffer, Local *relative* address that keep the fetching data. *@param DesBuffer, Remote *relative* address to add atomically. *@param Add, A 64-bits number add to the DesBuffer. *return true on success, false on error. **/ bool RdmaFetchAndAdd(uint16_t NodeID, uint64_t SourceBuffer, uint64_t DesBuffer, uint64_t Add); /** *RdmaCompareAndSwap - Compare data at DesBuffer and value of "Compare", swap with "Swap" if not equal. *@param NodeID, Node ID where to write the data. *@param SourceBuffer, Local *relative* address that keep the fetching data. 
*@param DesBuffer, Remote *relative* address to compare&swap atomically. *@param Compare, A 64-bits number used to compare. *@param Swap, A 64-bits number used to swap. *return true on success, false on error. **/ bool RdmaCompareAndSwap(uint16_t NodeID, uint64_t SourceBuffer, uint64_t DesBuffer, uint64_t Compare, uint64_t Swap); }; #endif
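The two atomic operations at the end of this header, `RdmaFetchAndAdd` and `RdmaCompareAndSwap`, are the building blocks for remote locking. Their semantics can be illustrated without any RDMA hardware by letting a `std::atomic<uint64_t>` stand in for a 64-bit word in a remote node's registered memory. Everything in the sketch is a local stand-in with hypothetical names (`RemoteWord`, `fetchAndAdd`, `compareAndSwap`, `tryLock`); no verbs calls are made. Note that in the verbs API the compare-and-swap operation writes the new value only when the remote word equals the compare value, and it returns the original value either way, which suggests that the "if not equal" wording in the header comment is a slip.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Stand-in for a 64-bit word living in a remote node's registered memory.
using RemoteWord = std::atomic<uint64_t>;

// Atomically add `add` to the word and return the value it held before.
inline uint64_t fetchAndAdd(RemoteWord &word, uint64_t add) {
    return word.fetch_add(add);
}

// If the word equals `compare`, replace it with `swap`.
// Returns the value the word held before the operation in both cases.
inline uint64_t compareAndSwap(RemoteWord &word, uint64_t compare, uint64_t swap) {
    uint64_t expected = compare;
    word.compare_exchange_strong(expected, swap);
    return expected;   // unchanged on success, actual old value on failure
}

// Hypothetical try-lock built on CAS: 0 means unlocked, any other value
// identifies the current owner.
inline bool tryLock(RemoteWord &lockWord, uint64_t ownerKey) {
    assert(ownerKey != 0);
    return compareAndSwap(lockWord, 0, ownerKey) == 0;
}
```

A lock word updated this way can be taken by a client directly in the server's memory, which is presumably how the 8-byte `key` field of each `HashItem` is meant to be used by the `lockWriteHashItem` family in `filesystem.hpp`.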

RPCClient.hpp

This header defines the RPC client. Its key attributes are: the configuration, a socket instance, a memory manager instance, an isServer flag, and a task ID.

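The isServer flag looked odd to me at first: this is a Client class, so how could it also be a Server? Yet in Octopus's actual implementation a node can indeed act as both client and server. The code is shown below.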

#ifndef RPCCLINET_HREADER
#define RPCCLINET_HREADER
#include <thread>
#include "RdmaSocket.hpp"
#include "Configuration.hpp"
#include "mempool.hpp"
#include "global.h"

using namespace std;

class RPCClient {
private:
    Configuration *conf;
    RdmaSocket *socket;
    MemoryManager *mem;
    bool isServer;
    uint32_t taskID;

public:
    uint64_t mm;
    RPCClient(Configuration *conf, RdmaSocket *socket, MemoryManager *mem, uint64_t mm);
    RPCClient();
    ~RPCClient();
    RdmaSocket* getRdmaSocketInstance();
    Configuration* getConfInstance();
    bool RdmaCall(uint16_t DesNodeID, char *bufferSend, uint64_t lengthSend,
                  char *bufferReceive, uint64_t lengthReceive);
    uint64_t ContractSendBuffer(GeneralSendBuffer *send);
};
#endif

RPCServer.hpp

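This header defines the RPC server. Its main attributes are: worker threads, a socket instance, a memory manager instance, a transaction manager instance, an RPC client instance, the server count, a file system instance, and a thread-to-ID map.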

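What bothers me here is the presence of the RPC client and file system instances. I understand that the server will later call into both of them when needed, but this design weakens the separation between components and makes the class feel like a grab bag.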

#ifndef RPCSERVER_HREADER
#define RPCSERVER_HREADER
#include <thread>
#include <unordered_map>
#include <vector>
#include "RdmaSocket.hpp"
#include "Configuration.hpp"
#include "RPCClient.hpp"
#include "mempool.hpp"
#include "global.h"
#include "filesystem.hpp"
#include "TxManager.hpp"

using namespace std;

typedef unordered_map<uint32_t, int> Thread2ID;

typedef struct {
    uint64_t send;
    uint16_t NodeID;
    uint16_t offset;
} RPCTask;

class RPCServer {
private:
    thread *wk;
    Configuration *conf;
    RdmaSocket *socket;
    MemoryManager *mem;
    uint64_t mm;
    TxManager *tx;
    RPCClient *client;
    int ServerCount;
    FileSystem *fs;
    int cqSize;
    Thread2ID th2id;
    vector<RPCTask*> tasks;
    bool UnlockWait;
    void Worker(int id);
    void ProcessRequest(GeneralSendBuffer *send, uint16_t NodeID, uint16_t offset);
    void ProcessQueueRequest();

public:
    RPCServer(int cqSize);
    RdmaSocket* getRdmaSocketInstance();
    MemoryManager* getMemoryManagerInstance();
    RPCClient* getRPCClientInstance();
    TxManager* getTxManagerInstance();
    uint64_t ContractReceiveBuffer(GeneralSendBuffer *send, GeneralReceiveBuffer *recv);
    void RequestPoller(int id);
    int getIDbyTID();
    ~RPCServer();
};
#endif

nrfs.h

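This header appears to declare a set of client-facing interfaces (connect, open, read, write, and so on). It does not seem very important so far; I will come back and add more if I find anything new. The code is shown below: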

/**************************************************************** * * * * g++ -o nrfs nrfs.cpp test.cpp -std=c++14 -lpthread * ****************************************************************/ #ifndef NRFS_LIB_H #define NRFS_LIB_H #include <iostream> #include <stdio.h> #include <unistd.h> #include <stdint.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/shm.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <fcntl.h> #include "common.hpp" using namespace std; /** *nrfsConnect - Connect to a nrfs file system *@param host A string containing a ip address of native machine, *if you want to connect to local filesystem, host should be passed as "default". *@param port default is 10086. *@param size shared memory size **/ nrfs nrfsConnect(const char* host, int port, int size); /** *nrfsDisconnect - Disconnect to a nrfs file system *@param fs the configured filesystem handle. *return 0 on success, -1 on error. **/ int nrfsDisconnect(nrfs fs); /** *nrfsOpenFile - Open a nrfs file. * @param fs The configured filesystem handle. * @param path The full path to the file. * @param flags - an | of bits - supported flags are O_RDONLY, O_WRONLY, O_RDWR... * @return Returns the handle to the open file or NULL on error. **/ nrfsFile nrfsOpenFile(nrfs fs, const char* path, int flags); /** *nrfsCloseFile - Close an open file. * @param fs The configured filesystem handle. * @param path The full path to the file. * @return Returns 0 on success, -1 on error. **/ int nrfsCloseFile(nrfs fs, nrfsFile file); /** *nrfsMknod - create a file. * @param fs The configured filesystem handle. * @param path The full path to the file. * @return Returns 0 on success, -1 on error. **/ int nrfsMknod(nrfs fs, const char* path); /** *nrfsAccess - access a file. * @param fs The configured filesystem handle. * @param path The full path to the file. * @return Returns 0 on success, -1 on error. 
**/ int nrfsAccess(nrfs fs, const char* path); /** *nrfsGetAttribute - Get the attribute of the file. * @param fs The configured filesystem handle. * @param path The full path to the file. * @return Returns 0 on success, -1 on error. **/ int nrfsGetAttribute(nrfs fs, nrfsFile file, FileMeta *attr); /** *nrfsWrite - Write data into an open file. * @param fs The configured filesystem handle. * @param file The file handle. * @param buffer The data. * @param size The no. of bytes to write. * @param offset The offset of the file where to write. * @return Returns the number of bytes written, -1 on error. **/ int nrfsWrite(nrfs fs, nrfsFile file, const void* buffer, uint64_t size, uint64_t offset); /** *nrfsRead - Read data into an open file. * @param fs The configured filesystem handle. * @param file The file handle. * @param buffer The buffer to copy read bytes into. * @param size The no. of bytes to read. * @param offset The offset of the file where to read. * @return Returns the number of bytes actually read, -1 on error. **/ int nrfsRead(nrfs fs, nrfsFile file, void* buffer, uint64_t size, uint64_t offset); /** * nrfsCreateDirectory - Make the given file and all non-existent * parents into directories. * @param fs The configured filesystem handle. * @param path The path of the directory. * @return Returns 0 on success, -1 on error. */ int nrfsCreateDirectory(nrfs fs, const char* path); /** * nrfsDelete - Delete file. * @param fs The configured filesystem handle. * @param path The path of the directory. * @return Returns 0 on success, -1 on error. */ int nrfsDelete(nrfs fs, const char* path); int nrfsFreeBlock(uint16_t node_id, uint64_t startBlock, uint64_t countBlock); /** * nrfsRename - Rename file. * @param fs The configured filesystem handle. * @param oldPath The path of the source file. * @param newPath The path of the destination file. * @return Returns 0 on success, -1 on error. 
*/ int nrfsRename(nrfs fs, const char* oldpath, const char* newpath); /** * nrfsListDirectory - Get list of files/directories for a given * directory-path. * @param fs The configured filesystem handle. * @param path The path of the directory. * @return Returns the number of the entries or -1 if the path not exsit. */ int nrfsListDirectory(nrfs fs, const char* path, nrfsfilelist *list); /** * for performance test */ int nrfsTest(nrfs fs, int offset); /** * Ignore these two function. **/ int nrfsRawWrite(nrfs fs, nrfsFile file, const void* buffer, uint64_t size, uint64_t offset); int nrfsRawRead(nrfs fs, nrfsFile file, void* buffer, uint64_t size, uint64_t offset); int nrfsRawRPC(nrfs fs); #endif

libnrfs.h

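This header looks like a copy of the previous one, with each function name given a `lib` prefix. Presumably the authors did not tidy up the code here.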

#include "common.hpp" /** *nrfsConnect - Connect to a nrfs file system *@param host A string containing a ip address of native machine, *if you want to connect to local filesystem, host should be passed as "default". *@param port default is 10086. *@param size shared memory size **/ nrfs libnrfsConnect(const char* host, int port, int size); /** *nrfsDisconnect - Disconnect to a nrfs file system *@param fs the configured filesystem handle. *return 0 on success, -1 on error. **/ int libnrfsDisconnect(nrfs fs); /** *nrfsOpenFile - Open a nrfs file. * @param fs The configured filesystem handle. * @param path The full path to the file. * @param flags - an | of bits - supported flags are O_RDONLY, O_WRONLY, O_RDWR... * @return Returns the handle to the open file or NULL on error. **/ nrfsFile libnrfsOpenFile(nrfs fs, const char* path, int flags); /** *nrfsCloseFile - Close an open file. * @param fs The configured filesystem handle. * @param path The full path to the file. * @return Returns 0 on success, -1 on error. **/ int libnrfsCloseFile(nrfs fs, nrfsFile file); /** *nrfsMknod - create a file. * @param fs The configured filesystem handle. * @param path The full path to the file. * @return Returns 0 on success, -1 on error. **/ int libnrfsMknod(nrfs fs, const char* path); /** *nrfsAccess - access a file. * @param fs The configured filesystem handle. * @param path The full path to the file. * @return Returns 0 on success, -1 on error. **/ int libnrfsAccess(nrfs fs, const char* path); /** *nrfsGetAttribute - Get the attribute of the file. * @param fs The configured filesystem handle. * @param path The full path to the file. * @return Returns 0 on success, -1 on error. **/ int libnrfsGetAttribute(nrfs fs, nrfsFile file, FileMeta *attr); /** *nrfsWrite - Write data into an open file. * @param fs The configured filesystem handle. * @param file The file handle. * @param buffer The data. * @param size The no. of bytes to write. 
* @param offset The offset of the file where to write. * @return Returns the number of bytes written, -1 on error. **/ int libnrfsWrite(nrfs fs, nrfsFile file, const void* buffer, uint64_t size, uint64_t offset); /** *nrfsRead - Read data into an open file. * @param fs The configured filesystem handle. * @param file The file handle. * @param buffer The buffer to copy read bytes into. * @param size The no. of bytes to read. * @param offset The offset of the file where to read. * @return Returns the number of bytes actually read, -1 on error. **/ int libnrfsRead(nrfs fs, nrfsFile file, void* buffer, uint64_t size, uint64_t offset); /** * nrfsCreateDirectory - Make the given file and all non-existent * parents into directories. * @param fs The configured filesystem handle. * @param path The path of the directory. * @return Returns 0 on success, -1 on error. */ int libnrfsCreateDirectory(nrfs fs, const char* path); /** * nrfsDelete - Delete file. * @param fs The configured filesystem handle. * @param path The path of the directory. * @return Returns 0 on success, -1 on error. */ int libnrfsDelete(nrfs fs, const char* path); /** * nrfsRename - Rename file. * @param fs The configured filesystem handle. * @param oldPath The path of the source file. * @param newPath The path of the destination file. * @return Returns 0 on success, -1 on error. */ int libnrfsRename(nrfs fs, const char* oldpath, const char* newpath); /** * nrfsListDirectory - Get list of files/directories for a given * directory-path. * @param fs The configured filesystem handle. * @param path The path of the directory. * @return Returns the number of the entries or -1 if the path not exsit. */ int libnrfsListDirectory(nrfs fs, const char* path, nrfsfilelist *list); /** * for performance test */ int libnrfsTest(nrfs fs, int offset);

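src source files

(See the next post in this series.)

File dependency analysis

(See the next post in this series.)

Function dependency analysis

(See the next post in this series.)

Typical workflow

(See the next post in this series.)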
