以太坊小知识(二)——区块生成机制

xiaoxiao2021-02-28  23

下面的代码都是基于go-ethereum v1.6.7。

Q:以太坊go-ethereum中,当一个miner找到一个新区块时,会将结果Result发送到管道returnCh中。那接下来的逻辑是怎样的呢?

//miner/agent.go func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) { if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil { log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash()) //send操作 self.returnCh <- &Result{work, result} } else { //... } }

那是否有miners监听该管道returnCh呢?答案是:Yes!worker的wait方法一直迭代管道recv。只要拿到不为nil的数据,则将新区块写入本地数据库中,同时发送事件NewMinedBlockEvent。

下面负责持久化,发送相应事件等功能的协程一直处于阻塞之中。

//miner/worker.go func (self *worker) wait() { for { mustCommitNewWork := true for result := range self.recv { //从self实例的管道中接收结果 atomic.AddInt32(&self.atWork, -1) if result == nil { continue } block := result.Block work := result.Work if self.fullValidation { //... go self.mux.Post(core.NewMinedBlockEvent{Block: block}) //发送事件NewMinedBlockEvent } else { //... stat, err := self.chain.WriteBlock(block) //持久化新区块到本地数据库中 //... } //... // broadcast before waiting for validation go func(block *types.Block, logs []*types.Log, receipts []*types.Receipt) { self.mux.Post(core.NewMinedBlockEvent{Block: block}) //发送NewMinedBlockEvent事件,/eth/handler.go#(pm *ProtocolManager) Start()方法会订阅该事件并同时开一个协程【go pm.minedBroadcastLoop()】监听该事件 self.mux.Post(core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) if stat == core.CanonStatTy { self.mux.Post(core.ChainHeadEvent{Block: block}) //发送ChainHeadEvent事件,/miner/worker.go#(self *worker) update() 方法会读取该事件 self.mux.Post(logs) } //... }(block, work.state.Logs(), work.receipts) //匿名函数调用 } // Insert the block into the set of pending ones to wait for confirmations //矿工将自己开采的区块放进一个循环列表中,待达到五个确认后,再移除 self.unconfirmed.Insert(block.NumberU64(), block.Hash()) if mustCommitNewWork { self.commitNewWork() } } } }

newWorker函数很关键!我们仔细看看这个函数做了哪些功能:

// /miner/worker.go func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker { //1、创建worker实例 worker := &worker{ config: config, engine: engine, eth: eth, mux: mux, chainDb: eth.ChainDb(), recv: make(chan *Result, resultQueueSize), chain: eth.BlockChain(), proc: eth.BlockChain().Validator(), //processer possibleUncles: make(map[common.Hash]*types.Block), coinbase: coinbase, txQueue: make(map[common.Hash]*types.Transaction), agents: make(map[Agent]struct{}), unconfirmed: newUnconfirmedBlocks(eth.BlockChain(), 5), fullValidation: false, } //2、订阅三个事件 worker.events = worker.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{}) //3、监听三个事件,并根据事件类型做相应的动作 go worker.update() //4、等待本地产生的新区块,负责将其持久化到本地数据库中,并提交下一轮的新任务给Agent实例 go worker.wait() //5、提交新任务给CpuAgent,CpuAgent接收到任务后,负责执行挖矿逻辑,再将找到的新区块发送到管道中,这个时候,go worker.wait()协程开始工作 worker.commitNewWork() return worker }

newWorker函数创建一个Worker实例;订阅三个事件;开了两个goroutine,一个负责监听订阅的三个事件并根据事件类型做出相应的行为【go worker.update()】,一个负责将自己生成的新区块(为何不是从网络中传过来的其他节点的呢?因为wait方法中self.recv中的结果是自己发送的,这部分逻辑可以在/miner/agent.go#mine方法中找到!)持久化到本地数据库中,并调用commitNewWork()方法生成新的区块【go worker.wait()】,最后是提交新任务给Agent实例。

func (self *worker) update() { for event := range self.events.Chan() { // A real event arrived, process interesting content switch ev := event.Data.(type) { // type switch case core.ChainHeadEvent: self.commitNewWork() //接收到新区块被开采出来的通知后,自己本地立马开始下一轮的区块竞争 case core.ChainSideEvent: self.uncleMu.Lock() self.possibleUncles[ev.Block.Hash()] = ev.Block self.uncleMu.Unlock() case core.TxPreEvent: // Apply transaction to the pending state if we're not mining if atomic.LoadInt32(&self.mining) == 0 { self.currentMu.Lock() acc, _ := types.Sender(self.current.signer, ev.Tx) txs := map[common.Address]types.Transactions{acc: {ev.Tx}} txset := types.NewTransactionsByPriceAndNonce(txs) self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.currentMu.Unlock() } } } }

(self *worker) commitNewWork() 方法负责提交新任务给所有的Agent实例,具体是最后的push方法。

func (self *worker) commitNewWork() { //... if err := self.engine.Prepare(self.chain, header); err != nil { log.Error("Failed to prepare header for mining", "err", err) return } //... // Could potentially happen if starting to mine in an odd state. err := self.makeCurrent(parent, header) if err != nil { log.Error("Failed to create mining context", "err", err) return } // Create the current work task and check any fork transitions needed work := self.current //... pending, err := self.eth.TxPool().Pending() //从交易池的pending队列中拿交易 //... txs := types.NewTransactionsByPriceAndNonce(pending) work.commitTransactions(self.mux, txs, self.chain, self.coinbase) self.eth.TxPool().RemoveBatch(work.failedTxs) //... // Create the new block to seal with the consensus engine if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil { log.Error("Failed to finalize block for sealing", "err", err) return } // We only care about logging if we're actually mining. if atomic.LoadInt32(&self.mining) == 1 { log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) self.unconfirmed.Shift(work.Block.NumberU64() - 1) } self.push(work) //worker实例将新任务发送给Agent }

Agent实例监听新任务的逻辑在/miner/agent.go中:

func (self *CpuAgent) update() { out: for { select { case work := <-self.workCh: //从workCh管道拿新的任务 self.mu.Lock() if self.quitCurrentOp != nil { close(self.quitCurrentOp) } self.quitCurrentOp = make(chan struct{}) go self.mine(work, self.quitCurrentOp) //拿到任务,开始挖矿 self.mu.Unlock() case <-self.stop: self.mu.Lock() if self.quitCurrentOp != nil { close(self.quitCurrentOp) self.quitCurrentOp = nil } self.mu.Unlock() break out } } //... }

下面是newWorker函数的简化功能图。

worker.commitNewWork【开始】<-------------------------------回到--- go worker.update() <---go bc.postChainEvents【发送ChainHeadEvent事件】 <---- (bc *Blockchain) InsertChain(...) <----? 【何处调用InsertChain方法是个谜?】 | v self.push(work) 【发送新的任务到Agent实例】 | self.mux.Post(core.ChainHeadEvent{Block: block}) 【发送事件】 v ^ (self *CpuAgent) update() | | go func(...) v ^ go self.mine(work, self.quitCurrentOp)【执行挖矿】 | | |--------------------| | | go worker.wait() | | |--------------------| v ^ self.returnCh <- &Result{work, result} <----------------------- |

那在哪里订阅该事件了呢?

func (pm *ProtocolManager) Start() { // broadcast transactions ... // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) //订阅事件 go pm.minedBroadcastLoop() // start sync handlers ... }

这些订阅操作都是在启动节点的时候就要启动的服务,下面是函数调用栈。

startNode(...) [/cmd/geth/main.go] ^ | StartNode(...) [/cmd/utils/cmd.go] ^ | (n *Node) Start(...) [/node/node.go] ^ | (s *Ethereum) Start(...) [/eth/backend.go] ^ | (pm *ProtocolManager) Start() [/eth/hanlder.go] //启动四大协程

调用(self *CpuAgent) mine(…)的函数调用链:

路线1 路线2 startNode(...) [/geth/main.go] ^ | (s *Ethereum) StartMining(...) [/eth/backend.go] ^ | New(...). [/miner/miner.go] (self *Miner) Start(...) [/miner/miner.go] ^ ^ | | (self *Miner) (self *worker) Register(...) start() [/miner/miner.go] [/miner/worker.go] \ / \ / | (self *CpuAgent) Start() [/miner/agent.go] //2个地方调用 ^ | (self *CpuAgent) update() [/miner/agent.go] ^ | (self *CpuAgent) mine(...) [/miner/agent.go] //开采新区块的处理逻辑

下面是设置管道的逻辑:

//[/miner/worker.go] func (self *worker) register(agent Agent) { //... agent.SetReturnCh(self.recv) //这里做了双向管道变量的地址向单向管道指针变量的赋值 }

(self *CpuAgent) SetReturnCh的逻辑具体如下:

//[/miner/agent.go] func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch } //当作为参数的时候,会隐式转换

A: wait方法负责从self.recv中接收挖矿的结果Result,然后持久化到本地数据库中,同时发送事件,最后通过eth协议广播给peers(这部分逻辑在四大协程中)。

转载请注明原文地址: https://www.6miu.com/read-2600117.html

最新回复(0)