mirror of
				https://gitea.com/Lydanne/buildx.git
				synced 2025-10-25 13:13:45 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			137 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			137 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2014 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package watch
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	"sync"
 | |
| 
 | |
| 	"k8s.io/klog/v2"
 | |
| 
 | |
| 	"k8s.io/apimachinery/pkg/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/util/net"
 | |
| 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 | |
| )
 | |
| 
 | |
| // Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
 | |
| type Decoder interface {
 | |
| 	// Decode should return the type of event, the decoded object, or an error.
 | |
| 	// An error will cause StreamWatcher to call Close(). Decode should block until
 | |
| 	// it has data or an error occurs.
 | |
| 	Decode() (action EventType, object runtime.Object, err error)
 | |
| 
 | |
| 	// Close should close the underlying io.Reader, signalling to the source of
 | |
| 	// the stream that it is no longer being watched. Close() must cause any
 | |
| 	// outstanding call to Decode() to return with an error of some sort.
 | |
| 	Close()
 | |
| }
 | |
| 
 | |
| // Reporter hides the details of how an error is turned into a runtime.Object for
 | |
| // reporting on a watch stream since this package may not import a higher level report.
 | |
| type Reporter interface {
 | |
| 	// AsObject must convert err into a valid runtime.Object for the watch stream.
 | |
| 	AsObject(err error) runtime.Object
 | |
| }
 | |
| 
 | |
| // StreamWatcher turns any stream for which you can write a Decoder interface
 | |
| // into a watch.Interface.
 | |
| type StreamWatcher struct {
 | |
| 	sync.Mutex
 | |
| 	source   Decoder
 | |
| 	reporter Reporter
 | |
| 	result   chan Event
 | |
| 	done     chan struct{}
 | |
| }
 | |
| 
 | |
| // NewStreamWatcher creates a StreamWatcher from the given decoder.
 | |
| func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
 | |
| 	sw := &StreamWatcher{
 | |
| 		source:   d,
 | |
| 		reporter: r,
 | |
| 		// It's easy for a consumer to add buffering via an extra
 | |
| 		// goroutine/channel, but impossible for them to remove it,
 | |
| 		// so nonbuffered is better.
 | |
| 		result: make(chan Event),
 | |
| 		// If the watcher is externally stopped there is no receiver anymore
 | |
| 		// and the send operations on the result channel, especially the
 | |
| 		// error reporting might block forever.
 | |
| 		// Therefore a dedicated stop channel is used to resolve this blocking.
 | |
| 		done: make(chan struct{}),
 | |
| 	}
 | |
| 	go sw.receive()
 | |
| 	return sw
 | |
| }
 | |
| 
 | |
| // ResultChan implements Interface.
 | |
| func (sw *StreamWatcher) ResultChan() <-chan Event {
 | |
| 	return sw.result
 | |
| }
 | |
| 
 | |
| // Stop implements Interface.
 | |
| func (sw *StreamWatcher) Stop() {
 | |
| 	// Call Close() exactly once by locking and setting a flag.
 | |
| 	sw.Lock()
 | |
| 	defer sw.Unlock()
 | |
| 	// closing a closed channel always panics, therefore check before closing
 | |
| 	select {
 | |
| 	case <-sw.done:
 | |
| 	default:
 | |
| 		close(sw.done)
 | |
| 		sw.source.Close()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // receive reads result from the decoder in a loop and sends down the result channel.
 | |
| func (sw *StreamWatcher) receive() {
 | |
| 	defer utilruntime.HandleCrash()
 | |
| 	defer close(sw.result)
 | |
| 	defer sw.Stop()
 | |
| 	for {
 | |
| 		action, obj, err := sw.source.Decode()
 | |
| 		if err != nil {
 | |
| 			switch err {
 | |
| 			case io.EOF:
 | |
| 				// watch closed normally
 | |
| 			case io.ErrUnexpectedEOF:
 | |
| 				klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
 | |
| 			default:
 | |
| 				if net.IsProbableEOF(err) || net.IsTimeout(err) {
 | |
| 					klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
 | |
| 				} else {
 | |
| 					select {
 | |
| 					case <-sw.done:
 | |
| 					case sw.result <- Event{
 | |
| 						Type:   Error,
 | |
| 						Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
 | |
| 					}:
 | |
| 					}
 | |
| 				}
 | |
| 			}
 | |
| 			return
 | |
| 		}
 | |
| 		select {
 | |
| 		case <-sw.done:
 | |
| 			return
 | |
| 		case sw.result <- Event{
 | |
| 			Type:   action,
 | |
| 			Object: obj,
 | |
| 		}:
 | |
| 		}
 | |
| 	}
 | |
| }
 | 
