GraphLite github地址 https://github.com/schencoding/GraphLite
图很适合进行分布式并行计算,比如最短路径、PageRank等问题。比较著名的图计算框架有Google的Pregel、CMU的GraphLab、Apache的Giraph等。GraphLite属于BSP(Bulk Synchronous Parallel,整体同步并行)模型。
BSP模型如下图:
也就是超步计算,apache giraph也属于此模式。
图着色在单机环境下的算法,最快一般是贪心算法,也就是每次去找不相邻的节点去着色,直到全部完成。我们在分布式并行计算环境下也要用贪心算法,每次找到不相邻的所有节点同时着色,在4个超步内完成一次着色,第一步根据出度的大小选择哪些节点可能要被着色,第二步处理冲突,第三步删除被选中节点和邻居节点之间的边,第四步给选中的节点染色。
#include <stdio.h> #include <string.h> #include <math.h> #include <list> #include "GraphLite.h" #define VERTEX_CLASS_NAME(name) GraphColorVertex##name #define NUM_VERTICES_COLORED 0 #define NUM_VERTICES_UNKNOWN 1 #define NUM_VERTICES_IN_SET 2 #define NUM_VERTICES_NOT_IN_SET 3 #define NUM_VERTICES_TENTATIVELY_IN_SET 4 enum Phase { LOTTERY, CONFLICT_RESOLUTION, EDGE_CLEANING, COLOR_ASSIGNMENT }; enum State { UNKNOWN, TENTATIVELY_IN_SET, NOT_IN_SET, IN_SET }; enum MessageType { WANTS_TO_BE_IN_SET, IS_IN_SET }; struct ColorVertex { long long id; int color; int isColored; enum State state; int isDeleted; int edgeNums; Phase phase; //list<long long> target; }; struct Message { long long senderId; enum MessageType message; }; long long V0; Phase phase; int colorId = 1; class VERTEX_CLASS_NAME(InputFormatter): public InputFormatter { public: int64_t getVertexNum() { unsigned long long n; sscanf(m_ptotal_vertex_line, "%lld", &n); m_total_vertex= n; return m_total_vertex; } int64_t getEdgeNum() { unsigned long long n; sscanf(m_ptotal_edge_line, "%lld", &n); m_total_edge= n; return m_total_edge; } int getVertexValueSize() { m_n_value_size = sizeof(ColorVertex); return m_n_value_size; } int getEdgeValueSize() { m_e_value_size = sizeof(int); return m_e_value_size; } int getMessageValueSize() { m_m_value_size = sizeof(struct Message); return m_m_value_size; } void loadGraph() { unsigned long long last_vertex; unsigned long long from; unsigned long long to; int weight = 0; ColorVertex vertex; int outdegree = 0; const char *line= getEdgeLine(); // Note: modify this if an edge weight is to be read // modify the 'weight' variable sscanf(line, "%lld %lld", &from, &to); addEdge(from, to, &weight); vertex.color = -1; vertex.isDeleted = 0; last_vertex = from; ++outdegree; for (int64_t i = 1; i < m_total_edge; ++i) { line= getEdgeLine(); // Note: modify this if an edge weight is to be read // modify the 'weight' variable sscanf(line, "%lld %lld", &from, &to); if (last_vertex != from) { 
addVertex(last_vertex, &vertex, outdegree); last_vertex = from; outdegree = 1; } else { ++outdegree; } addEdge(from, to, &weight); } addVertex(last_vertex, &vertex, outdegree); } }; class VERTEX_CLASS_NAME(OutputFormatter): public OutputFormatter { public: void writeResult() { int64_t vid; ColorVertex value; char s[1024]; for (ResultIterator r_iter; ! r_iter.done(); r_iter.next() ) { r_iter.getIdValue(vid, &value); int n = sprintf(s, "%lld: %d\n", (unsigned long long)vid, value.color); writeNextResLine(s, n); } } }; // An aggregator that records a double value tom compute sum class VERTEX_CLASS_NAME(Aggregator): public Aggregator<long long> { public: void init() { m_global = 0; m_local = 0; } void* getGlobal() { return &m_global; } void setGlobal(const void* p) { m_global = * (long long *)p; } void* getLocal() { return &m_local; } void merge(const void* p) { m_global += * (long long *)p; } void accumulate(const void* p) { m_local += * (long long *)p; } }; class VERTEX_CLASS_NAME(): public Vertex <ColorVertex, int, Message> { public: void compute(MessageIterator* pmsgs) { long long acc = 1; ColorVertex tmp = getValue(); Message msg; long long myId = getVertexId(); long long minId = myId; int numNeighborsMovedIntoSet = 0; double random; double probability; //list<long long>::iterator it; //OutEdgeIterator iter = getOutEdgeIterator(); //printf("fcefef\n"); if(getSuperstep() == 0) { tmp.id = getVertexId(); tmp.phase = LOTTERY; if(tmp.id == V0) { tmp.color = 0; tmp.isColored = 1; tmp.state = IN_SET; tmp.edgeNums = getOutEdgeIterator().size(); } else { tmp.color = -1; tmp.isColored = 0; tmp.state = UNKNOWN; tmp.edgeNums = getOutEdgeIterator().size(); } } else { if (getValue().isColored) { voteToHalt(); return; } State state = getValue().state; switch (tmp.phase) { case LOTTERY: printf("lottery\n"); switch (state) { case UNKNOWN: srand((unsigned)time(NULL)); random = (double)rand()/RAND_MAX/10000; if (tmp.edgeNums == 0) { tmp.state = IN_SET; } else if (random * 
tmp.edgeNums <= 1.0) { tmp.state = TENTATIVELY_IN_SET; msg.senderId = getVertexId(); msg.message = WANTS_TO_BE_IN_SET; sendMessageToAllNeighbors(msg); } break; default: break; } tmp.phase = CONFLICT_RESOLUTION; break; case CONFLICT_RESOLUTION: switch (state) { case TENTATIVELY_IN_SET: if(!pmsgs->done()) { for(; ! pmsgs->done(); pmsgs->next()) { if(pmsgs->getValue().message == WANTS_TO_BE_IN_SET) { long long neighborId = pmsgs->getValue().senderId; if(neighborId < minId) { minId = neighborId; } } } if(minId == myId) { tmp.state = IN_SET; tmp.isDeleted = 1; msg.senderId = getVertexId(); msg.message = IS_IN_SET; sendMessageToAllNeighbors(msg); } else { tmp.state = UNKNOWN; } } else { tmp.state = IN_SET; tmp.isDeleted = 1; msg.senderId = getVertexId(); msg.message = IS_IN_SET; sendMessageToAllNeighbors(msg); } break; default: break; } tmp.phase = EDGE_CLEANING; break; case EDGE_CLEANING: for(; ! pmsgs->done(); pmsgs->next()) { if(pmsgs->getValue().message == IS_IN_SET) { //tmp.target.remove(pmsgs->getValue().senderId); ++numNeighborsMovedIntoSet; } } if(numNeighborsMovedIntoSet > 0) { tmp.edgeNums -= numNeighborsMovedIntoSet; tmp.state = NOT_IN_SET; } tmp.phase = COLOR_ASSIGNMENT; break; case COLOR_ASSIGNMENT: if(state == IN_SET) { tmp.color = getSuperstep()/4; tmp.isColored = 1; accumulateAggr(0, &acc); } else { tmp.state = UNKNOWN; } tmp.phase = LOTTERY; break; default: break; } switch (getValue().state) { case UNKNOWN: accumulateAggr(NUM_VERTICES_UNKNOWN, &acc); break; case TENTATIVELY_IN_SET: accumulateAggr(NUM_VERTICES_TENTATIVELY_IN_SET, &acc); break; case NOT_IN_SET: accumulateAggr(NUM_VERTICES_NOT_IN_SET, &acc); break; case IN_SET: accumulateAggr(NUM_VERTICES_IN_SET, &acc); break; default: break; } } *mutableValue() = tmp; } }; class VERTEX_CLASS_NAME(Graph): public Graph { public: VERTEX_CLASS_NAME(Aggregator)* aggregator; public: // argv[0]: PageRankVertex.so // argv[1]: <input path> // argv[2]: <output path> void init(int argc, char* argv[]) { 
setNumHosts(5); setHost(0, "localhost", 1411); setHost(1, "localhost", 1421); setHost(2, "localhost", 1431); setHost(3, "localhost", 1441); setHost(4, "localhost", 1451); if (argc < 3) { printf ("Usage: %s <input path> <output path>\n", argv[0]); exit(1); } m_pin_path = argv[1]; m_pout_path = argv[2]; phase = LOTTERY; aggregator = new VERTEX_CLASS_NAME(Aggregator)[5]; regNumAggr(5); for(int i = 0; i < 5; i++) regAggr(i, &aggregator[i]); } void term() { delete[] aggregator; } }; /* STOP: do not change the code below. */ extern "C" Graph* create_graph() { Graph* pgraph = new VERTEX_CLASS_NAME(Graph); pgraph->m_pin_formatter = new VERTEX_CLASS_NAME(InputFormatter); pgraph->m_pout_formatter = new VERTEX_CLASS_NAME(OutputFormatter); pgraph->m_pver_base = new VERTEX_CLASS_NAME(); return pgraph; } extern "C" void destroy_graph(Graph* pobject) { delete ( VERTEX_CLASS_NAME()* )(pobject->m_pver_base); delete ( VERTEX_CLASS_NAME(OutputFormatter)* )(pobject->m_pout_formatter); delete ( VERTEX_CLASS_NAME(InputFormatter)* )(pobject->m_pin_formatter); delete ( VERTEX_CLASS_NAME(Graph)* )pobject; }