Thrift使用入门-RPC服务

xiaoxiao2021-02-28  7

Thrift是一个跨语言的服务部署框架,除了提供性能优异的序列化/反序列化功能,还提供了RPC服务,相对于Protocol buffers,它支持的语言更加广泛。本文介绍使用Thrift的RPC实现client和server间的通信。对于Thrift的介绍和语法说明可以参见文献[1-2]。

本文通过定义的Thrift语法结构,并使用Thrift编译器自动生成的server端服务接口代码,以及client端桩代码,旨在实现:client通过调用server端的提供的服务接口,查询给定人员的city信息;client端请求的参数包括多个人员的ID和姓名;server端处理后通过预先定义的数据结构向client给出响应。

1.定义请求和响应的数据结构,以及服务接口

//InfoQuery.thrift namespace cpp com.test.infoquery //在cpp中,这是自动生成代码的命名空间 namespace java com.test.infoquery //在java中,这是自动生成代码的package名 struct RequestData { 1: required i32 id; 2: required string name; } //client端发送的请求参数结构 struct Instruction { 1: required string taskid; 2: required list<RequestData> requestData; } struct ResponseData { 1: required string name; 2: required string city; } //服务端响应后的 返回结构体 struct InstructionResult { 1: required string taskid; 2: list<ResponseData> responseData; } //定义服务 service WTWServ { /* *reqService:定义的服务名称,client通过调用该服务向service端发送请求 *Instruction:client发送请求,提供的请求参数 *InstructionResult:serve端响应的返回值 */ InstructionResult reqService(1: Instruction ins ); }

通过”thrift –gen ”将自动生成序列化以及RPC调用的客户端和服务端接口代码等。其中”WTWServ_server.skeleton.cpp”提供了简单的服务端代码,可以在其基础上修改已满足自定义的需求。

# thrift --gen cpp InfoQuery.thrift # tree gen-cpp/ gen-cpp/ ├── InfoQuery_constants.cpp ├── InfoQuery_constants.h ├── InfoQuery_types.cpp ├── InfoQuery_types.h ├── WTWServ.cpp ├── WTWServ.h └── WTWServ_server.skeleton.cpp

2.client端代码

#include "WTWServ.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <iostream> #include <thrift/transport/TSocket.h>//add using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using boost::shared_ptr; using namespace ::com::test::infoquery; using namespace std; bool sendQuery(Instruction & inst, WTWServClient & client) { InstructionResult ret; //用于接收来自server端的反馈 try{ client.reqService(ret,inst); //调用定义的service发送请求,并获得相应 }catch(TProtocolException& tx){ cout<<"Connect TProtocol error! "<<tx.what()<<endl; return false; }catch(TException& tx){ cout<<"Send error! "<<tx.what()<<endl; return false; } cout<<"Get the result:"<<endl; for(size_t i = 0 ;i < (ret.responseData).size() ; ++i){ cout<<"name:"<<(ret.responseData)[i].name<<"; city:"<<(ret.responseData)[i].city<<endl; } return true; } int main(int argc, char **argv) { int port = 9090; string strHost = "127.0.0.1"; shared_ptr<TSocket> socket(new TSocket(strHost,port)); shared_ptr<TTransport> transport(new TFramedTransport(socket)); shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); WTWServClient client(protocol); try { transport->open(); cout<<"connect established"<<endl; }catch(TException& tx){ cout<<"Connect error! "<<tx.what()<<endl; return -1; } RequestData s1,s2; vector<RequestData> vecInfo; s1.id = 100; s1.name = "jacky"; vecInfo.push_back(s1); s2.id = 200; s2.name = "king"; vecInfo.push_back(s2); Instruction inst; inst.__set_taskid("query_task_01"); inst.__set_requestData(vecInfo); if(sendQuery(inst,client)){ cout<<"query success ..."<<endl; } return 0; }

首先,看下传输通道TCP链接的建立以及客户端的创建

shared_ptr<TSocket> socket(new TSocket(strHost,port)); shared_ptr<TTransport> transport(new TFramedTransport(socket)); shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport)); WTWServClient client(protocol); transport->open();

使用目的主机的IP以及端口构造一个socket,TSocket是采用TCP协议进行数据传输,此时的socket对象还并未建立真正的链接。TFramedTransport将对socket对象进行包装,以提高传输效率。TFramedTransport将被待发送的数据进行缓存,直到flush操作被调用,才会将完整的二进制数据块发送出去,它使得另一端的接收者总是可以读取固定长度的数据。类似的传输方式还有TBufferedTransport,TMemoryBuffer参见头文件

"thrift/transport/TBufferTransports.h"

TProtocol对上述包装后(给定传输方式的socket)传输对象定义使用的编码方式。TBinaryProtocol是二进制传输,所有待传输的数据将以基本的二进制格式进行传输。类似的编码还有TJSONProtocol,TCompactProtocol,TDebugProtocol。调用TTransport的open()方法,才会建立真正的TCP链接。 WTWServClient类是Thrift根据定义传输数据结构为客户端自动生成的。参见”WTWServ.h”。该类的一个重要方法是reqService

void reqService(InstructionResult& _return, const Instruction& ins);

void reqService(InstructionResult& _return, const Instruction& ins); 它是它是一个阻塞的方法调用,提供请求参数,并阻塞的等待server端的返回。在定义的sendQuery函数中,给出了其使用方法。 RequestData和Instruction类是Thrift根据自定义的数据结构”InfoQuery.thrift”自动生成的。它的成员变量默认都是公有属性,可以直接访问,同时也可以通过以”__set”开始的公有方法设置,它们位于”InfoQuery_types.h”。

3.server端代码

将自动生成的”WTWServ_server.skeleton.cpp”重命名为”server.cpp”,并修改其中的实现以满足自定义的需求。

#include "WTWServ.h" #include <thrift/protocol/TBinaryProtocol.h> #include <thrift/server/TSimpleServer.h> #include <thrift/transport/TServerSocket.h> #include <thrift/transport/TBufferTransports.h> #include <iostream> #include <thrift/concurrency/ThreadManager.h> #include <thrift/concurrency/PosixThreadFactory.h> #include <thrift/server/TThreadPoolServer.h> #include <thrift/server/TThreadedServer.h> using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; using namespace ::apache::thrift::server; using namespace std; using boost::shared_ptr; using namespace ::com::test::infoquery; using namespace ::apache::thrift::concurrency; class WTWServHandler : virtual public WTWServIf { public: WTWServHandler() { } void reqService(InstructionResult& _return, const Instruction& ins) { cout<<"Query task id:"<<ins.taskid<<endl; _return.taskid = ins.taskid; vector<ResponseData> vecInfo; for(size_t i = 0 ;i < (ins.requestData).size() ; ++i){ ResponseData rsp; rsp.name = (ins.requestData)[i].name; if((ins.requestData)[i].id == 100){ rsp.city = "Nanjing"; } else { rsp.city = "NULL"; } vecInfo.push_back(rsp); } _return.__set_responseData(vecInfo); cout<<"Query deal success ..."<<endl; } }; int main(int argc, char **argv) { int port = 9090; shared_ptr<WTWServHandler> handler(new WTWServHandler()); shared_ptr<TProcessor> processor(new WTWServProcessor(handler)); shared_ptr<TServerTransport> serverTransport(new TServerSocket(port)); shared_ptr<TTransportFactory> transportFactory(new TFramedTransportFactory()); shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory()); shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(15); shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory> (new PosixThreadFactory()); threadManager->threadFactory(threadFactory); threadManager->start(); TThreadPoolServer server(processor, serverTransport, transportFactory, protocolFactory, threadManager); server.serve(); cout<<"service start..."<<endl; return 0; }

WTWServHandler类是Thrift根据”InfoQuery.thrift”定义的service自动生成的。需要实现其中的reqService方法,来自定义对客户端请求的处理。 下面通过创建server对象需要的参数,建立创建server需要的哪些对象。 TThreadPoolServer server(processor, serverTransport, transportFactory, protocolFactory, threadManager); TProcessor:请求/响应动作的处理者,它依赖于输入和输出的数据流; TServerTransport:服务端传输使用的传输方式,TServerSocket是指服务端将以TCP的指定port端口对外开放服务; TTransportFactory:传输方式,对应于client端TFramedTransport传输方式; TBinaryProtocolFactory:编码方式,即底层以二进制形式传输 ThreadManager:是为服务端设置的线程管理者,通过PosixThreadFactory为其创建线程对象

4.运行

# tree . ├── client │ ├── client.cpp │ ├── InfoQuery_constants.cpp │ ├── InfoQuery_constants.h │ ├── InfoQuery_types.cpp │ ├── InfoQuery_types.h │ ├── WTWServ.cpp │ └── WTWServ.h ├── InfoQuery.thrift └── server ├── InfoQuery_constants.cpp ├── InfoQuery_constants.h ├── InfoQuery_types.cpp ├── InfoQuery_types.h ├── server.cpp ├── WTWServ.cpp └── WTWServ.h

到client和server目录下分别编译代码:

g++ -g *.cpp -o server -DHAVE_NETINET_IN_H -I ./ -lthrift -lpthread -lrt

先启动server端,然后启动client端:


1.Thirft框架介绍 2.Thrift使用指南

转载请注明原文地址: https://www.6miu.com/read-1400206.html

最新回复(0)