以太坊源码深入分析(5)-- Ethereum服务和以太坊P2P协议发送广播源码分析

xiaoxiao2025-08-24  124

在“以太坊源码深入分析(2)”一文中,我们提到Ethereum作为一个service,被Node 注册进去。Node start的时候会启动其注册的所有服务,Ethereum service也是一样。

一,ethereum service的初始化和启动 初始化方法

func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { if config.SyncMode == downloader.LightSync { return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum") } if !config.SyncMode.IsValid() { return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode) } chainDb, err := CreateDB(ctx, config, "chaindata") if err != nil { return nil, err } stopDbUpgrade := upgradeDeduplicateData(chainDb) chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis) if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok { return nil, genesisErr } log.Info("Initialised chain configuration", "config", chainConfig) eth := &Ethereum{ config: config, chainDb: chainDb, chainConfig: chainConfig, eventMux: ctx.EventMux, accountManager: ctx.AccountManager, engine: CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb), shutdownChan: make(chan bool), stopDbUpgrade: stopDbUpgrade, networkId: config.NetworkId, gasPrice: config.GasPrice, etherbase: config.Etherbase, bloomRequests: make(chan chan *bloombits.Retrieval), bloomIndexer: NewBloomIndexer(chainDb, params.BloomBitsBlocks), } log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId) if !config.SkipBcVersionCheck { bcVersion := core.GetBlockChainVersion(chainDb) if bcVersion != core.BlockChainVersion && bcVersion != 0 { return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). 
Run geth upgradedb.\n", bcVersion, core.BlockChainVersion) } core.WriteBlockChainVersion(chainDb, core.BlockChainVersion) } var ( vmConfig = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording} cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout} ) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) if err != nil { return nil, err } // Rewind the chain in case of an incompatible config upgrade. if compat, ok := genesisErr.(*params.ConfigCompatError); ok { log.Warn("Rewinding chain to upgrade configuration", "err", compat) eth.blockchain.SetHead(compat.RewindTo) core.WriteChainConfig(chainDb, genesisHash, chainConfig) } eth.bloomIndexer.Start(eth.blockchain) if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { return nil, err } eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) eth.miner.SetExtra(makeExtraData(config.ExtraData)) eth.ApiBackend = &EthApiBackend{eth, nil} gpoParams := config.GPO if gpoParams.Default == nil { gpoParams.Default = config.GasPrice } eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams) return eth, nil }

1,如果config.SyncMode 是 downloader.LightSync,New直接返回错误,轻节点走的是les/backend.go的初始化方法(les.LightEthereum)。 2,chainDb, err := CreateDB(ctx, config, "chaindata")打开leveldb,leveldb是eth存储数据库。 3,stopDbUpgrade := upgradeDeduplicateData(chainDb) 检查chainDb版本,如果需要的话,启动后台进程进行升级。 4,chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)装载创世区块。 根据节点条件判断是从数据库里面读取,还是从默认配置文件读取,还是从自定义配置文件读取,或者是从代码里面获取默认值。并返回区块链的config和创世块的hash。 5,装载Ethereum struct的各个成员。eventMux和accountManager 是Node 启动 eth service的时候传入的。eventMux可以认为是一个全局的事件多路复用器,accountManager认为是一个全局的账户管理器。engine创建共识引擎。etherbase 配置此Ethereum的主账号地址。初始化bloomRequests 通道和bloom索引器(bloomIndexer)。 6,在config.SkipBcVersionCheck为false时,判断客户端版本号和数据库版本号是否一致,不一致则返回错误。 7,eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) 初始化eth的blockchain,也就是eth的区块链 8,如果第4步返回的genesisErr是*params.ConfigCompatError(链配置不兼容),eth.blockchain.SetHead(compat.RewindTo) 把区块链回滚到compat.RewindTo,并通过core.WriteChainConfig写入新的链配置 9,eth.bloomIndexer.Start(eth.blockchain)启动bloomIndexer 10,eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) 初始化eth 区块链的交易池,存储本地生产的和P2P网络同步过来的交易。 11,eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb)初始化以太坊协议管理器,用于区块链P2P通讯 12, miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) 初始化矿工 13,eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams) 创建预言最新gasprice的预言机。

ethereum service的初始化配置了不少东西,基本上涉及到了以太坊区块链系统的所有内容,后续一一分解各个模块。

启动方法

func (s *Ethereum) Start(srvr *p2p.Server) error { // Start the bloom bits servicing goroutines s.startBloomHandlers() // Start the RPC service s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion()) // Figure out a max peers count based on the server limits maxPeers := srvr.MaxPeers if s.config.LightServ > 0 { if s.config.LightPeers >= srvr.MaxPeers { return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers) } maxPeers -= s.config.LightPeers } // Start the networking layer and the light server if requested s.protocolManager.Start(maxPeers) if s.lesServer != nil { s.lesServer.Start(srvr) } return nil }

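首先启动bloom过滤器的处理goroutine(s.startBloomHandlers()),然后创建eth 的net 相关Api(s.netRPCService),供RPC 服务使用。 s.protocolManager.Start(maxPeers) 设置最大同步节点数(如果开启了LightServ,会先从srvr.MaxPeers中减去LightPeers),并启动eth P2P通讯。 最后,只要配置了lesServer(s.lesServer != nil)就会同时启动lesServer,而不是等ethereum service 出问题了才启动。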

二,ProtocolManager 以太坊P2P通讯协议管理 首先分析一下同在eth目录下的eth/handler.go。 ProtocolManager 的初始化方法

func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ networkId: networkId, eventMux: mux, txpool: txpool, blockchain: blockchain, chainconfig: config, peers: newPeerSet(), newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } // Figure out whether to allow fast sync or not if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { log.Warn("Blockchain not empty, fast sync disabled") mode = downloader.FullSync } if mode == downloader.FastSync { manager.fastSync = uint32(1) } // Initiate a sub-protocol for every implemented version we can handle manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) for i, version := range ProtocolVersions { // Skip protocol version if incompatible with the mode of operation if mode == downloader.FastSync && version < eth63 { continue } // Compatible; initialise the sub-protocol version := version // Closure for the run manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{ Name: ProtocolName, Version: version, Length: ProtocolLengths[i], Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { peer := manager.newPeer(int(version), p, rw) select { case manager.newPeerCh <- peer: manager.wg.Add(1) defer manager.wg.Done() return manager.handle(peer) case <-manager.quitSync: return p2p.DiscQuitting } }, NodeInfo: func() interface{} { return manager.NodeInfo() }, PeerInfo: func(id discover.NodeID) interface{} { if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { return p.Info() } return nil }, }) } if len(manager.SubProtocols) == 0 { return nil, errIncompatibleConfig } // Construct the different synchronisation mechanisms manager.downloader = 
downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) validator := func(header *types.Header) error { return engine.VerifyHeader(blockchain, header, true) } heighter := func() uint64 { return blockchain.CurrentBlock().NumberU64() } inserter := func(blocks types.Blocks) (int, error) { // If fast sync is running, deny importing weird blocks if atomic.LoadUint32(&manager.fastSync) == 1 { log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) return 0, nil } atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import return manager.blockchain.InsertChain(blocks) } manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) return manager, nil }

1,peers 为以太坊临近的同步网络节点,newPeerCh、noMorePeers、txsyncCh、quitSync对应同步的通知 2,manager.SubProtocols 创建以太坊 P2P server 的 通讯协议,通常只有一个值。manager.SubProtocols,在Node start的时候传给以太坊 P2P server并同时start P2P server。协议里面三个函数指针(Run、NodeInfo、PeerInfo)非常重要,后面会用到。 3,manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) 创建了一个下载器,从远程网络节点中获取hashes和blocks。 4,manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)收集网络其他以太坊节点发过来的同步通知,进行验证,并做出相应的处理。初始化传入的几个参数 都是用于处理同步区块链数据的函数指针

Ethereum service 启动的时候会同时启动ProtocolManager,ProtocolManager的start()方法:

func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers // broadcast transactions pm.txCh = make(chan core.TxPreEvent, txChanSize) pm.txSub = pm.txpool.SubscribeTxPreEvent(pm.txCh) go pm.txBroadcastLoop() // broadcast mined blocks pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) go pm.minedBroadcastLoop() // start sync handlers go pm.syncer() go pm.txsyncLoop() }

1,创建一个新交易的订阅通道,并启动交易广播的goroutine 2,创建一个挖矿的订阅通道,并启动挖矿广播的goroutine 注:同为订阅通道为什么pm.txSub和pm.minedBlockSub的实现不一样?深入代码会发现pm.txSub用的是event/feed通知方式,pm.minedBlockSub用的是event/TypeMuxEvent通知方式,event/TypeMuxEvent方式将要被Deprecated。 3,pm.syncer() 启动同步goroutine,定时的和网络其他节点同步,并处理网络节点的相关通知 4,pm.txsyncLoop() 启动交易同步goroutine,把新的交易均匀的同步给网络节点

三,ProtocolManager主动向网络节点广播 ProtocolManager Start()方法里面的4个goroutine都是处理ProtocolManager向以太坊网络节点进行广播的。

1,pm.txBroadcastLoop()方法

func (self *ProtocolManager) txBroadcastLoop() { for { select { case event := <-self.txCh: self.BroadcastTx(event.Tx.Hash(), event.Tx) // Err() channel will be closed when unsubscribing. case <-self.txSub.Err(): return } } }

core/tx_pool.go 产生新的交易的时候会send self.txCh,这时候会激活 self.BroadcastTx(event.Tx.Hash(), event.Tx)

func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { // Broadcast transaction to a batch of peers not knowing about it peers := pm.peers.PeersWithoutTx(hash) //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { peer.SendTransactions(types.Transactions{tx}) } log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers)) }

向缓存的没有这个交易hash的网络节点广播此次交易。

2,pm.minedBroadcastLoop()方法

// Mined broadcast loop func (self *ProtocolManager) minedBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.minedBlockSub.Chan() { switch ev := obj.Data.(type) { case core.NewMinedBlockEvent: self.BroadcastBlock(ev.Block, true) // First propagate block to peers self.BroadcastBlock(ev.Block, false) // Only then announce to the rest } } }

收到miner.go 里面NewMinedBlockEvent 挖到新区块的事件通知,激活self.BroadcastBlock(ev.Block, true)

func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { hash := block.Hash() peers := pm.peers.PeersWithoutBlock(hash) // If propagation is requested, send to a subset of the peer if propagate { // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) var td *big.Int if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1)) } else { log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) return } // Send the block to a subset of our peers transfer := peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range transfer { peer.SendNewBlock(block, td) } log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) return } // Otherwise if the block is indeed in out own chain, announce it if pm.blockchain.HasBlock(hash, block.NumberU64()) { for _, peer := range peers { peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()}) } log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) } }

如果propagate为true 向网络节点广播整个挖到的block,为false 只广播挖到的区块的hash值和number值。广播的区块还包括这个区块打包的所有交易。

3,pm.syncer() 方法

func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms pm.fetcher.Start() defer pm.fetcher.Stop() defer pm.downloader.Terminate() // Wait for different events to fire synchronisation operations forceSync := time.NewTicker(forceSyncCycle) defer forceSync.Stop() for { select { case <-pm.newPeerCh: // Make sure we have peers to select from, then sync if pm.peers.Len() < minDesiredPeerCount { break } go pm.synchronise(pm.peers.BestPeer()) case <-forceSync.C: // Force a sync even if not enough peers are present go pm.synchronise(pm.peers.BestPeer()) case <-pm.noMorePeers: return } } }

pm.fetcher.Start()启动 fetcher,辅助同步区块数据

当P2P server执行 ProtocolManager 的p2p.Protocol 的Run指针的时候会send pm.newPeerCh,这时候选择最优的网络节点(TD 总难度最大的)启动pm.synchronise(pm.peers.BestPeer()) goroutine。

// synchronise tries to sync up our local block chain with a remote peer. func (pm *ProtocolManager) synchronise(peer *peer) { // Short circuit if no peers are available if peer == nil { return } // Make sure the peer's TD is higher than our own currentBlock := pm.blockchain.CurrentBlock() td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) pHead, pTd := peer.Head() if pTd.Cmp(td) <= 0 { return } // Otherwise try to sync with the downloader mode := downloader.FullSync if atomic.LoadUint32(&pm.fastSync) == 1 { // Fast sync was explicitly requested, and explicitly granted mode = downloader.FastSync } else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 { // The database seems empty as the current block is the genesis. Yet the fast // block is ahead, so fast sync was enabled for this node at a certain point. // The only scenario where this can happen is if the user manually (or via a // bad block) rolled back a fast sync node below the sync point. In this case // however it's safe to reenable fast sync. atomic.StoreUint32(&pm.fastSync, 1) mode = downloader.FastSync } // Run the sync cycle, and disable fast sync if we've went past the pivot block if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { return } if atomic.LoadUint32(&pm.fastSync) == 1 { log.Info("Fast sync complete, auto disabling") atomic.StoreUint32(&pm.fastSync, 0) } atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 { // We've completed a sync cycle, notify all peers of new state. This path is // essential in star-topology networks where a gateway node needs to notify // all its out-of-date peers of the availability of a new block. This failure // scenario will most often crop up in private and hackathon networks with // degenerate connectivity, but it should be healthy for the mainnet too to // more reliably update peers or the local TD state. 
go pm.BroadcastBlock(head, false) } }

如果最优的网络节点的TD值大于本地最新区块的TD值,调用pm.downloader.Synchronise(peer.id, pHead, pTd, mode)进行同步。同步完成后再调用go pm.BroadcastBlock(head, false),把自己最新的区块状态广播出去。

4,pm.txsyncLoop()方法

func (pm *ProtocolManager) txsyncLoop() { var ( pending = make(map[discover.NodeID]*txsync) sending = false // whether a send is active pack = new(txsync) // the pack that is being sent done = make(chan error, 1) // result of the send ) // send starts a sending a pack of transactions from the sync. send := func(s *txsync) { // Fill pack with transactions up to the target size. size := common.StorageSize(0) pack.p = s.p pack.txs = pack.txs[:0] for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { pack.txs = append(pack.txs, s.txs[i]) size += s.txs[i].Size() } // Remove the transactions that will be sent. s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] if len(s.txs) == 0 { delete(pending, s.p.ID()) } // Send the pack in the background. s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size) sending = true go func() { done <- pack.p.SendTransactions(pack.txs) }() } // pick chooses the next pending sync. pick := func() *txsync { if len(pending) == 0 { return nil } n := rand.Intn(len(pending)) + 1 for _, s := range pending { if n--; n == 0 { return s } } return nil } for { select { case s := <-pm.txsyncCh: pending[s.p.ID()] = s if !sending { send(s) } case err := <-done: sending = false // Stop tracking peers that cause send failures. if err != nil { pack.p.Log().Debug("Transaction send failed", "err", err) delete(pending, pack.p.ID()) } // Schedule the next send. if s := pick(); s != nil { send(s) } case <-pm.quitSync: return } } }

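txsyncLoop从pm.txsyncCh接收针对某个网络节点的待同步交易集合(txsync),每次最多按txsyncPackSize的大小打包一批交易,通过SendTransactions发送给该节点;同一时刻只有一个发送在进行,发送完成后从剩余的待同步节点中随机挑选下一个继续发送,发送失败的节点则不再跟踪。这样本地持有的交易就能分批同步给网络中的其他节点。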

总结一下 这四个goroutine 基本上就在不停的做广播区块、广播交易,同步到区块、同步到交易,再广播区块、广播交易。

转载请注明原文地址: https://www.6miu.com/read-5035218.html

最新回复(0)