以太坊源码深入分析(10)-- 以太坊Bloom过滤器实现原理及应用场景分析

xiaoxiao2025-09-02  5

上一节分析reciept产生过程的时候提到:reciept会为日志数据生成一个Bloom过滤器,那Bloom过滤器是用来干嘛的呢?有什么用呢?

一,Bloom过滤器的数据结构和reciept创建Bloom的过程 type Bloom [BloomByteLength]byte BloomByteLength = 256 Bloom 就是一个256个字节数组。一共2048位。

我们看看怎么把庞大的收据日志数据放到bloom过滤器里面的。

func CreateBloom(receipts Receipts) Bloom { bloomBin := new(big.Int) for _, receipt := range receipts { bloomBin.Or(bloomBin, LogsBloom(receipt.Logs)) } return BytesToBloom(bloomBin.Bytes()) } func LogsBloom(logs []*Log) *big.Int { bin := new(big.Int) for _, log := range logs { bin.Or(bin, bloom9(log.Address.Bytes())) for _, b := range log.Topics { bin.Or(bin, bloom9(b[:])) } } return bin } func bloom9(b []byte) *big.Int { b = crypto.Keccak256(b[:]) r := new(big.Int) for i := 0; i < 6; i += 2 { t := big.NewInt(1) b := (uint(b[i+1]) + (uint(b[i]) << 8)) & 2047 r.Or(r, t.Lsh(t, b)) } return r }

1,先看看bloom9(b []byte)算法函数。 1.1, 首先将传入的数据,进行hash256的运算,得到一个32字节的hash 1.2,然后取第0和第1字节的值合成一个2字节无符号的int,和2047做按位与运算,得到一个小于2048的值b,这个值就表示bloom里面第b位的值为1。同理取第2,3 和第4,5字节合成另外两个无符号int,增加在bloom里面的命中率。 1.3,也就是说对于任何一个输入,如果它对应的三个下标的值不都为1,那么它肯定不在这个区块中。 当如如果对应的三位都为1,也不能说明一定在这个区块中。 这就是布隆过滤器的特性。 1.4,这三个数取或,得到一个bigInt,代表这个传参数据的bloom9值。

2,LogsBloom(logs []*Log)方法把日志数据转成对应的bloom9值,包括日志的合约地址以及每个日志Topic

3,CreateBloom(receipts Receipts)方法创建收据的bloom 3.1,创建一个空的bigInt bloomBin,遍历receipts,取得receipt里的日志,调用LogsBloom(receipt.Logs)将取得所有日志的bloom值按位或和到bloomBin。这意味着bloomBin包括了所有日志的bloom9数据。 3.2,调用BytesToBloom(bloomBin.Bytes())方法,把bloomBin加入区块的bloom过滤器中,这时Bloom过滤器就有了本次交易的所有收据。 3.3,需要说明的是Bloom过滤器只是提供一个查找数据是否存在的工具,它本身不包含任何数据。

4, BloomLookup()方法查找对应的数据是否在bloom过滤器里面。

func BloomLookup(bin Bloom, topic bytesBacked) bool { bloom := bin.Big() cmp := bloom9(topic.Bytes()[:]) return bloom.And(bloom, cmp).Cmp(cmp) == 0 }

先将传入的数据转成bloom9值,传入的bloomBin 转成bigInt。根据按位与操作,判断传入的值是否在Bloom过滤器里面。

二,Bloom过滤器的实际应用 bloom过滤器是用来快速的查找log的,那以太坊是如何用bloom过滤器来查找的呢? 想要要找某一条log,如果从区块链的头区块开始,根据区块头的hash依次开始查找的话是效率比较低的,每个区块写在本地数据库是散列存储的, 会增加很多io请求,io请求的速度很慢的。如何能快速的找到目的区块,这时候就要用到Chain_Indexer。以太坊的BloomIndexer具体实现了Chain_Indexer,可以认为是Chain_Indexer的派生类。 Chain_Indexer的初始化:

func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBackend, section, confirm uint64, throttling time.Duration, kind string) *ChainIndexer { c := &ChainIndexer{ chainDb: chainDb, indexDb: indexDb, backend: backend, update: make(chan struct{}, 1), quit: make(chan chan error), sectionSize: section, confirmsReq: confirm, throttling: throttling, log: log.New("type", kind), } // Initialize database dependent fields and start the updater c.loadValidSections() go c.updateLoop() return c }

chainDb是整个区块链的Db indexDb是这个BloomIndexer的Db sectionSize等于4096,把每4096个区块划到一个section中 loadValidSections,取得indexDb里面存放的section的数量 c.updateLoop是chainIndexer 更新的主循环,有新的区块,或者有新的没有在indexDb里面存放的section产生都会send到c.updateLoop的goroutine里面去。

func (c *ChainIndexer) updateLoop() { var ( updating bool updated time.Time ) for { select { case errc := <-c.quit: // Chain indexer terminating, report no failure and abort errc <- nil return case <-c.update: // Section headers completed (or rolled back), update the index c.lock.Lock() if c.knownSections > c.storedSections { // Periodically print an upgrade log message to the user if time.Since(updated) > 8*time.Second { if c.knownSections > c.storedSections+1 { updating = true c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) } updated = time.Now() } // Cache the current section count and head to allow unlocking the mutex section := c.storedSections var oldHead common.Hash if section > 0 { oldHead = c.SectionHead(section - 1) } // Process the newly defined section in the background c.lock.Unlock() newHead, err := c.processSection(section, oldHead) if err != nil { c.log.Error("Section processing failed", "error", err) } c.lock.Lock() // If processing succeeded and no reorgs occcurred, mark the section completed if err == nil && oldHead == c.SectionHead(section-1) { c.setSectionHead(section, newHead) c.setValidSections(section + 1) if c.storedSections == c.knownSections && updating { updating = false c.log.Info("Finished upgrading chain index") } c.cascadedHead = c.storedSections*c.sectionSize - 1 for _, child := range c.children { c.log.Trace("Cascading chain index update", "head", c.cascadedHead) child.newHead(c.cascadedHead, false) } } else { // If processing failed, don't retry until further notification c.log.Debug("Chain index processing failed", "section", section, "err", err) c.knownSections = c.storedSections } } // If there are still further sections to process, reschedule if c.knownSections > c.storedSections { time.AfterFunc(c.throttling, func() { select { case c.update <- struct{}{}: default: } }) } c.lock.Unlock() } } }

1,c.updateLoop收到update的通知后,看是否有已知的未写入indexDb的section。 2,调用c.processSection(section, oldHead)生成新的section

func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (common.Hash, error) { c.log.Trace("Processing new chain section", "section", section) // Reset and partial processing if err := c.backend.Reset(section, lastHead); err != nil { c.setValidSections(0) return common.Hash{}, err } for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { hash := GetCanonicalHash(c.chainDb, number) if hash == (common.Hash{}) { return common.Hash{}, fmt.Errorf("canonical block #%d unknown", number) } header := GetHeader(c.chainDb, hash, number) if header == nil { return common.Hash{}, fmt.Errorf("block #%d [%x…] not found", number, hash[:4]) } else if header.ParentHash != lastHead { return common.Hash{}, fmt.Errorf("chain reorged during section processing") } c.backend.Process(header) lastHead = header.Hash() } if err := c.backend.Commit(); err != nil { c.log.Error("Section commit failed", "error", err) return common.Hash{}, err } return lastHead, nil }

2.1,调用c.backend.Reset(section, lastHead)产生一个待组装的section,每个section中存在一个bloom过滤器。 2.2,把number等于section * c.sectionSize到(section+1)*c.sectionSize的block依次加入到待组装的section中。并把这些block的header.bloom加入到section的bloom过滤器中。 2.3,调用c.backend.Commit(),把新的section写入db。返回最近的那block的header。

3,更新sectionHead和ValidSctions,如果还有新的没有在db里面的section的话,在throttling时间后在循环更新一次。

三,外部调用接口查找log的流程 PublicFilterAPI提供了给外部rpc调用的过滤查找接口。比如GetLogs()方法

func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]*types.Log, error) { // Convert the RPC block numbers into internal representations if crit.FromBlock == nil { crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } if crit.ToBlock == nil { crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } // Create and run the filter to get all the logs filter := New(api.backend, crit.FromBlock.Int64(), crit.ToBlock.Int64(), crit.Addresses, crit.Topics) logs, err := filter.Logs(ctx) if err != nil { return nil, err } return returnLogs(logs), err }

1,FilterCriteria是外部请求的过滤条件,可以根据起始区块,日志的合约地址,日志topics的hash值来设置过滤条件。 2,以太坊内部根据FilterCriteria,创建一个过滤器,把合约地址和topics的hash作为bloombit的匹配器的匹配条件。 3,调用filter.Logs(ctx)来获取日志

func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { // Figure out the limits of the filter range header, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) if header == nil { return nil, nil } head := header.Number.Uint64() if f.begin == -1 { f.begin = int64(head) } end := uint64(f.end) if f.end == -1 { end = head } // Gather all indexed logs, and finish with non indexed ones var ( logs []*types.Log err error ) size, sections := f.backend.BloomStatus() if indexed := sections * size; indexed > uint64(f.begin) { if indexed > end { logs, err = f.indexedLogs(ctx, end) } else { logs, err = f.indexedLogs(ctx, indexed-1) } if err != nil { return logs, err } } rest, err := f.unindexedLogs(ctx, end) logs = append(logs, rest...) return logs, err }

3.1,如果没有设定起始位置,就认为从最新区块的header开始。找到开始位置区块对应的的section,如果开始位置在section里面就走f.indexedLogs()在chainIndexer里面找log,如果不是就调用f.unindexedLogs()不再chainIndexer里面找。 3.2,f.unindexedLogs()相对简单。

func (f *Filter) unindexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { var logs []*types.Log for ; f.begin <= int64(end); f.begin++ { header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(f.begin)) if header == nil || err != nil { return logs, err } if bloomFilter(header.Bloom, f.addresses, f.topics) { found, err := f.checkMatches(ctx, header) if err != nil { return logs, err } logs = append(logs, found...) } } return logs, nil }

3.2.1,因为没有并入section的区块都是比较新的区块,数量也不多。直接从最新的区块开始遍历查找就可以了。 3.2.2,bloomFilter(header.Bloom, f.addresses, f.topics)方法,根据合约地址和topics的bloom9值在header的bloom过滤器中按位与操作,看是否在这个区块中。 3.2.3,如果找到这个block,调用checkMatches方法在block里面查找对应的log

func (f *Filter) checkMatches(ctx context.Context, header *types.Header) (logs []*types.Log, err error) { // Get the logs of the block logsList, err := f.backend.GetLogs(ctx, header.Hash()) if err != nil { return nil, err } var unfiltered []*types.Log for _, logs := range logsList { unfiltered = append(unfiltered, logs...) } logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) if len(logs) > 0 { // We have matching logs, check if we need to resolve full logs via the light client if logs[0].TxHash == (common.Hash{}) { receipts, err := f.backend.GetReceipts(ctx, header.Hash()) if err != nil { return nil, err } unfiltered = unfiltered[:0] for _, receipt := range receipts { unfiltered = append(unfiltered, receipt.Logs...) } logs = filterLogs(unfiltered, nil, nil, f.addresses, f.topics) } return logs, nil } return nil, nil }

3.2.3.1 调用ethApi的f.backend.GetLogs(ctx, header.Hash())方法,找到这个区块的所有收据下的所有日志。 3.2.3.2 调用filterLogs(unfiltered, nil, nil, f.addresses, f.topics),根据f.addresses, f.topics过滤出想要的logs。如果第一个log的hash是空的,需要通过light client重现获取一遍所有的日志,再走一下过滤。

3.3,f.indexedLogs() 在chainIndexer里面查找日志

func (f *Filter) indexedLogs(ctx context.Context, end uint64) ([]*types.Log, error) { // Create a matcher session and request servicing from the backend matches := make(chan uint64, 64) session, err := f.matcher.Start(ctx, uint64(f.begin), end, matches) if err != nil { return nil, err } defer session.Close() f.backend.ServiceFilter(ctx, session) // Iterate over the matches until exhausted or context closed var logs []*types.Log for { select { case number, ok := <-matches: // Abort if all matches have been fulfilled if !ok { err := session.Error() if err == nil { f.begin = int64(end) + 1 } return logs, err } f.begin = int64(number) + 1 // Retrieve the suggested block and pull any truly matching logs header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(number)) if header == nil || err != nil { return logs, err } found, err := f.checkMatches(ctx, header) if err != nil { return logs, err } logs = append(logs, found...) case <-ctx.Done(): return logs, ctx.Err() } } }

indexedLogs启动一个匹配器来查找Filter条件下对应的区块,这一节暂不分析f.matcher的工作原理。 找到对应区块,接下来的事情就和unindexedLogs的处理一样了。

总结: 以太坊的bloom过滤器大大的提高了查询的效率。以太坊先创建topics的bloom,再创建logs的bloom,再创建收据的bloom,在创建header的bloom,最后创建block的bloom,一步一步构建上去。于此对应的,在查找日志的过程正好相反,先在block的bloom里面找,再在header的bloom里面找,再在收据的bloom里面找,直到找到最终的日志。

 

转载请注明原文地址: https://www.6miu.com/read-5035612.html

最新回复(0)