Kubernetes source code: reading and understanding streamwatcher.go in the watch package (part 5)

xiaoxiao  2021-02-28

This is the last file in the watch package.

streamwatcher.go: let's look at the data structures first.

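// Decoder is the source of events: each Decode call yields one event or an error.
type Decoder interface {
	Decode() (action EventType, object runtime.Object, err error)
	Close()
}

// StreamWatcher forwards decoded events to the result channel.
type StreamWatcher struct {
	sync.Mutex
	source  Decoder
	result  chan Event
	stopped bool
}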

Now let's look at the methods.

StreamWatcher has two methods, ResultChan() and Stop(), which implement Interface.

func (sw *StreamWatcher) ResultChan() <-chan Event {
	return sw.result
}

// Stop implements Interface.
func (sw *StreamWatcher) Stop() {
	// Call Close() exactly once by locking and setting a flag.
	sw.Lock()
	defer sw.Unlock()
	if !sw.stopped {
		sw.stopped = true
		sw.source.Close()
	}
}

NewStreamWatcher instantiates a StreamWatcher and starts a goroutine that runs receive.

func NewStreamWatcher(d Decoder) *StreamWatcher {
	sw := &StreamWatcher{
		source: d,
		// It's easy for a consumer to add buffering via an extra
		// goroutine/channel, but impossible for them to remove it,
		// so nonbuffered is better.
		result: make(chan Event),
	}
	go sw.receive()
	return sw
}

func (sw *StreamWatcher) stopping() bool {
	sw.Lock()
	defer sw.Unlock()
	return sw.stopped
}

The receive method reads results from the decoder in a loop and sends them down the result channel. Each call to the decoder's Decode method returns (action EventType, object runtime.Object, err error).

// receive reads result from the decoder in a loop and sends down the result channel.
func (sw *StreamWatcher) receive() {
	defer close(sw.result)
	defer sw.Stop()
	defer utilruntime.HandleCrash()
	for {
		action, obj, err := sw.source.Decode()
		if err != nil {
			// Ignore expected error.
			if sw.stopping() {
				return
			}
			switch err {
			case io.EOF:
				// watch closed normally
			case io.ErrUnexpectedEOF:
				glog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
			default:
				msg := "Unable to decode an event from the watch stream: %v"
				if net.IsProbableEOF(err) {
					glog.V(5).Infof(msg, err)
				} else {
					glog.Errorf(msg, err)
				}
			}
			return
		}
		sw.result <- Event{
			Type:   action,
			Object: obj,
		}
	}
}
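The interplay between receive, Stop and the result channel is easier to see in a runnable miniature. The sketch below is a simplified re-creation and not the Kubernetes code itself: `miniWatcher`, `countingDecoder` and `consume` are hypothetical names, the types are stand-ins, and error logging and crash handling are left out. It tries to show two things: a consumer can simply range over ResultChan() until receive closes it, and Close() on the decoder runs only once even when Stop() is called again.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// Simplified stand-ins for the watch package types (assumption).
type EventType string

type Event struct {
	Type   EventType
	Object interface{}
}

type Decoder interface {
	Decode() (EventType, interface{}, error)
	Close()
}

// countingDecoder (hypothetical) yields a fixed number of ADDED events,
// then io.EOF, and counts how many times Close is called.
type countingDecoder struct {
	mu     sync.Mutex
	left   int
	closes int
}

func (d *countingDecoder) Decode() (EventType, interface{}, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.left == 0 {
		return "", nil, io.EOF
	}
	d.left--
	return "ADDED", d.left, nil
}

func (d *countingDecoder) Close() {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.closes++
}

func (d *countingDecoder) closeCount() int {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.closes
}

// miniWatcher follows the same pattern as StreamWatcher, minus logging.
type miniWatcher struct {
	sync.Mutex
	source  Decoder
	result  chan Event
	stopped bool
}

func newMiniWatcher(d Decoder) *miniWatcher {
	w := &miniWatcher{source: d, result: make(chan Event)}
	go w.receive()
	return w
}

func (w *miniWatcher) ResultChan() <-chan Event { return w.result }

// Stop closes the decoder exactly once, guarded by the mutex and the flag.
func (w *miniWatcher) Stop() {
	w.Lock()
	defer w.Unlock()
	if !w.stopped {
		w.stopped = true
		w.source.Close()
	}
}

// receive forwards events until Decode fails. Deferred calls run in
// reverse order, so Stop runs before the result channel is closed.
func (w *miniWatcher) receive() {
	defer close(w.result)
	defer w.Stop()
	for {
		action, obj, err := w.source.Decode()
		if err != nil {
			return
		}
		w.result <- Event{Type: action, Object: obj}
	}
}

// consume drains the watcher and returns how many events arrived.
func consume(w *miniWatcher) int {
	n := 0
	for range w.ResultChan() {
		n++
	}
	return n
}

func main() {
	d := &countingDecoder{left: 3}
	w := newMiniWatcher(d)
	fmt.Println("events:", consume(w))
	w.Stop() // a second Stop is a no-op
	fmt.Println("Close calls:", d.closeCount())
}
```

Because the result channel is unbuffered, each send in receive waits until the consumer reads the event, so the decoder is never read ahead of the consumer by more than one event.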

Summary

Put simply, StreamWatcher reads data from a Decoder and sends it to the result channel.

type StreamWatcher struct {
	sync.Mutex
	source  Decoder
	result  chan Event
	stopped bool
}
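The comment in NewStreamWatcher says that a consumer can easily add buffering with an extra goroutine and channel. A minimal sketch of that idea follows; `withBuffer` is a hypothetical helper that does not exist in the watch package, and `Event` is a simplified stand-in type.

```go
package main

import "fmt"

// Event is a simplified stand-in for watch.Event (assumption).
type Event struct {
	Type   string
	Object interface{}
}

// withBuffer (hypothetical helper) copies events from an unbuffered source
// channel into a buffered one, so the producer can run ahead of a slow
// consumer by up to size events. The output is closed when the input is.
func withBuffer(in <-chan Event, size int) <-chan Event {
	out := make(chan Event, size)
	go func() {
		defer close(out)
		for ev := range in {
			out <- ev
		}
	}()
	return out
}

func main() {
	src := make(chan Event) // unbuffered, like the result channel
	buffered := withBuffer(src, 2)

	// Both sends complete before anything is read from buffered,
	// because the copying goroutine parks the events in the buffer.
	src <- Event{Type: "ADDED"}
	src <- Event{Type: "MODIFIED"}
	close(src)

	for ev := range buffered {
		fmt.Println(ev.Type)
	}
}
```

With a real watcher, the channel returned by ResultChan() would take the place of `src`.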

Further reading: a file that uses NewStreamWatcher

k8s.io\client-go\rest\request.go uses this function at line 694. I will come back and read that file carefully later.

return watch.NewStreamWatcher(restclientwatch.NewDecoder(decoder, r.serializers.Decoder)), nil

When reposting, please cite the original address: https://www.6miu.com/read-77575.html
