fabirc源码解析6中讲述了peer结点如何创建和注册grpc服务,接下来的几篇文章将对peer注册的各个服务进行详述。该篇讲述ChaincodeSupport服务,ChaincodeSupport服务为每个peer提供了chaincode操作的支持。registerChaincodeSupport(peerServer.Server())一句,位于/fabric/peer/node/start.Go文件中的serve函数中,给peerServer注册了ChaincodeSupport服务。
ChaincodeSupport的服务原型和生成的go定义在/fabric/protos/peer/下的chaincode_shim.proto和chaincode_shim.pb.go中,核心的实现代码在/fabric/core/chaincode/chaincode_support.go中。主要的定义的是一个rpc Register(stream ChaincodeMessage) returns (stream ChaincodeMessage) {}服务。该服务实现客户端和服务器端ChaincodeMessage类型流数据的交换。用于服务端流数据交换的grpc流服务接口象为/fabric/protos/peer/chaincode_shim.pb.go中的ChaincodeSupport_RegisterServer,在/fabric/core/Container/ccintf/ccintf.go中有对应用于容器内部间的流接口ChaincodeStream。
ChaincodeSupport的服务是一个全局单例,该单例对象定义在chaincode_support.go中,var theChaincodeSupport *ChaincodeSupport。ChaincodeSupport对象自身存储一系列配置值,而接收和处理客户端ChaincodeMessage类型消息的任务其实是委托给了一个个Handler对象。
//生成的收发的数据类型 type ChaincodeMessage struct { Type ChaincodeMessage_Type Timestamp *google_protobuf1.Timestamp Payload []byte Txid string Proposal *SignedProposal ChaincodeEvent *ChaincodeEvent } //proto中ChaincodeSupport服务原型 service ChaincodeSupport { rpc Register(stream ChaincodeMessage) returns (stream ChaincodeMessage) {} } //生成的服务端流接口 type ChaincodeSupport_RegisterServer interface { Send(*ChaincodeMessage) error Recv() (*ChaincodeMessage, error) grpc.ClientStream } 12345678910111213141516171819 12345678910111213141516171819FSM是finite state machine的缩写,有限状态机,是ChaincodeSupport服务使用到的一个第三方库,在github.com/looplab/fsm可以下载。FSM将一个事物从状态A向状态B的转化看作一个事件,并可以设置在进入/离开某个状态时自动调用的时机函数。每个状态事件、状态、时机函数都用字符串关键字表示。在此简单介绍一下用法:
//创建一个状态机 //三个参数:1.默认状态 2.定义状态事件 3.定义状态转变时调用的函数 fsm := fsm.NewFSM( "green", fsm.Events{ //状态事件的名称 该事件的起始状态Src 该事件的结束状态Dst //即:状态事件warn(警告事件)表示事物的状态从状态green到状态yellow {Name: "warn", Src: []string{"green"}, Dst: "yellow"}, {Name: "panic", Src: []string{"yellow"}, Dst: "red"}, {Name: "calm", Src: []string{"red"}, Dst: "yellow"}, }, //状态事件调用函数,在此称为 时机函数。关键字用'_'隔开,格式是:"调用时机_事件或状态" //before表示在该事件或状态发生之前调用该函数,如"before_warn",表示在warn //这个状态事件发生前调用这个函数。"before_yellow"表示进入yellow状态之前调用 //该函数。 //依此类推,after表示在...之后,enter表示在进入...之时,leave表示在离开... //之时。 fsm.Callbacks{ //fsm内定义的状态事件函数,关键字指定的是XXX_event和XXX_state //表示任一的状态或状态事件 "before_event": func(e *fsm.Event) { fmt.Println("before_event") }, "leave_state": func(e *fsm.Event) { fmt.Println("leave_state") }, //根据自定义状态或事件所定义的状态事件函数 "before_yellow": func(e *fsm.Event) { fmt.Println("before_yellow") }, "before_warn": func(e *fsm.Event) { fmt.Println("before_warn") }, }, ) //打印当前状态,输出是默认状态green fmt.Println(fsm.Current()) //触发warn状态事件,状态将会从green转变到yellow //同时触发"before_warn"、"before_yellow"、"before_event"、"leave_state"函数 fsm.Event("warn") //打印当前状态,输出状态是yellow fmt.Println(fsm.Current()) 123456789101112131415161718192021222324252627282930313233343536373839404142 123456789101112131415161718192021222324252627282930313233343536373839404142
任何项目中,服务是以所能提供的操作为中心的,ChaincodeSupport服务的操作(即可被外部调用的函数)有Launch,Register,Execute,HandleChaincodeStream,Stop。
追溯ChaincodeSupport对象挂载的Register函数,最终调用的是/fabric/core/chaincode/handler.go中的HandleChaincodeStream函数。在HandleChaincodeStream函数中:
handler := newChaincodeSupportHandler(chaincodeSupport, stream) handler.processStream() 12 12创建了一个Handler,然后调用Handler的processStream函数对客户端发送的流数据进行了处理。这两个函数都在同文件中实现。newChaincodeSupportHandler函数所传入的两个参数值得注意,一个是chaincodeSupport,一个是stream。前者是Register服务所在的ChaincodeSupport对象自身,赋值给了Hanlder对象成员chaincodeSupport,为的是让Handler对象处理接收数据时能够使用ChaincodeSupport对象的服务;后者是Register服务的grpc流接口,赋值给了Handler对象成员ChatStream,为的是Handler能够从客户端接收到数据。后文还会提到这点。
newChaincodeSupportHandler创建并初始化了一个Handler,初始化的成员有: * ChatStream - grpc流服务接口,是用Register函数传进来的。 * chaincodeSupport - chaincodeSupport自身。 * nextState - 状态通道。 * FSM - 状态机,参看上文。 * policyChecker - 策略检查器,将在对应主题文章中详述。
processStream用recv标识、| errc | msgAvil | nextState | keepalivetimer |四个频道、select三者相互配合,形成了对客户端消息的接收控制。然后调用HandleMessage、serialSend、serialSendAsync处理接收到的消息。
errc - 错误频道msgAvil - ChaincodeMessage频道nextState - 包含ChaincodeMessage的频道keepalivetime - 心跳频道流程如下:
HandleMessage处理ChaincodeMessage数据的方式完全是由Handler中的状态机FSM驱动的。在newChaincodeSupportHandler有大段代码是初始化其状态机的:
v.FSM = fsm.NewFSM(createdstate,fsm.Events{...},fsm.Callbacks{...}) 1 1状态机FSM所注册的状态事件有:
///fabric/protos/peer/chaincode_shim.pb.go中定义 //REGISTER即pb.ChaincodeMessage_REGISTER.String()对应的字符串值,下同 //REGISTER事件表示从状态createdstate到状态establishedstate,下略。 REGISTER Src: []string{createdstate}, Dst: establishedstate READY PUT_STATE DEL_STATE INVOKE_CHAI COMPLETED GET_STATE GET_STATE_B GET_QUERY_R GET_HISTORY QUERY_STATE QUERY_STATE ERROR RESPONSE INIT TRANSACTION RESPONSE INIT TRANSACTION 12345678910111213141516171819202122 12345678910111213141516171819202122状态机FSM所涉及的事件状态有:
//在/fabric/core/chaincode/handler.go中以常量的形式定义 createdstate = "created" establishedstate = "established" readystate = "ready" endstate = "end" 12345 12345状态机FSM 状态事件所调用的时机函数为:
//在REGISTER事件发生之前调用beforeRegisterEvent,下同。 "before_REGISTER" : beforeRegisterEvent "before_COMPLETED" : beforeCompletedEvent "after_GET_STATE" : afterGetState "after_GET_STATE_BY_RANGE" : afterGetStateByRange "after_GET_QUERY_RESULT" : afterGetQueryResult "after_GET_HISTORY_FOR_KEY" : afterGetHistoryForKey "after_QUERY_STATE_NEXT" : afterQueryStateNext "after_QUERY_STATE_CLOSE" : afterQueryStateClose "after_PUT_STATE" : enterBusyState "after_DEL_STATE" : enterBusyState "after_INVOKE_CHAINCODE" : enterBusyState //表示在进入established状态之时调用enterEstablishedState,下同。 "enter_established" : enterEstablishedState "enter_ready" : enterReadyState "enter_end" : enterEndState 12345678910111213141516 12345678910111213141516在HandleMessage函数中,对传入的数据msg简单验证后,eventErr := handler.FSM.Event(msg.Type.String(), msg)触发了状态机的状态事件,进而触发了对应的时机函数。
以“REGISTER类型的ChaincodeMessage”为例。客户端通过grpc发送REGISTER类型的ChaincodeMessage信息,服务端通过msgAvil频道接收后传入HandlerMessage函数,状态机对应执行REGISTER状态事件,从状态createdstate向状态establishedstate转变,同时在转变之前自动触发beforeRegisterEvent时机函数完成注册。当状态进入establishedstate时,又接着触发了“enter_established”所对应的enterEstablishedState时机函数去通知客户端注册已经正确完成。
在beforeRegisterEvent函数中,err = handler.chaincodeSupport.registerHandler(handler)完成了注册,使用的是前文所提到的在创建Handler时传入进来的ChaincodeSupport对象的registerHandler函数。所谓的注册,也不过是将Handler对象赋值给ChaincodeSupport对象中的runningChaincodes中的chaincodeMap映射:chainID作key,以Handler对象为成员handler值的chaincodeRTEnv对象作value。
都是使用Handler中grpc服务端流接口ChatStream成员发送ChaincodeMessage消息的函数,两者都将应答ChaincodeMessage信息发送给客户端,也都实现了将所发送的ChaincodeMessage信息串行化的目的。区别在于serialSend是阻塞发送,而serialSendAsync是利用新启goroutine进行非阻塞发送,且这些非阻塞的goroutine中任何一个发生发送消息的错误,都会利用errc频道将错误发送给processStream函数。
不同类型的ChaincodeMessage的消息,能够触发状态机不同的状态事件,处理数据,完成Chaincode上的操作。有关其他类型事件以及具体的实现,在此不再赘述。强调一句,ChaincodeSupport服务是以状态机驱动的为chaincode提供支持的一项服务