mirror of
				https://gitea.com/Lydanne/buildx.git
				synced 2025-10-26 13:43:48 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			261 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			261 lines
		
	
	
		
			7.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /*
 | |
| Copyright 2014 The Kubernetes Authors.
 | |
| 
 | |
| Licensed under the Apache License, Version 2.0 (the "License");
 | |
| you may not use this file except in compliance with the License.
 | |
| You may obtain a copy of the License at
 | |
| 
 | |
|     http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
| Unless required by applicable law or agreed to in writing, software
 | |
| distributed under the License is distributed on an "AS IS" BASIS,
 | |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
| See the License for the specific language governing permissions and
 | |
| limitations under the License.
 | |
| */
 | |
| 
 | |
| package watch
 | |
| 
 | |
| import (
 | |
| 	"sync"
 | |
| 
 | |
| 	"k8s.io/apimachinery/pkg/runtime"
 | |
| 	"k8s.io/apimachinery/pkg/runtime/schema"
 | |
| )
 | |
| 
 | |
| // FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch
 | |
| // channel is full.
 | |
| type FullChannelBehavior int
 | |
| 
 | |
| const (
 | |
| 	WaitIfChannelFull FullChannelBehavior = iota
 | |
| 	DropIfChannelFull
 | |
| )
 | |
| 
 | |
| // Buffer the incoming queue a little bit even though it should rarely ever accumulate
 | |
| // anything, just in case a few events are received in such a short window that
 | |
| // Broadcaster can't move them onto the watchers' queues fast enough.
 | |
| const incomingQueueLength = 25
 | |
| 
 | |
| // Broadcaster distributes event notifications among any number of watchers. Every event
 | |
| // is delivered to every watcher.
 | |
| type Broadcaster struct {
 | |
| 	// TODO: see if this lock is needed now that new watchers go through
 | |
| 	// the incoming channel.
 | |
| 	lock sync.Mutex
 | |
| 
 | |
| 	watchers     map[int64]*broadcasterWatcher
 | |
| 	nextWatcher  int64
 | |
| 	distributing sync.WaitGroup
 | |
| 
 | |
| 	incoming chan Event
 | |
| 
 | |
| 	// How large to make watcher's channel.
 | |
| 	watchQueueLength int
 | |
| 	// If one of the watch channels is full, don't wait for it to become empty.
 | |
| 	// Instead just deliver it to the watchers that do have space in their
 | |
| 	// channels and move on to the next event.
 | |
| 	// It's more fair to do this on a per-watcher basis than to do it on the
 | |
| 	// "incoming" channel, which would allow one slow watcher to prevent all
 | |
| 	// other watchers from getting new events.
 | |
| 	fullChannelBehavior FullChannelBehavior
 | |
| }
 | |
| 
 | |
| // NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
 | |
| // It is guaranteed that events will be distributed in the order in which they occur,
 | |
| // but the order in which a single event is distributed among all of the watchers is unspecified.
 | |
| func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
 | |
| 	m := &Broadcaster{
 | |
| 		watchers:            map[int64]*broadcasterWatcher{},
 | |
| 		incoming:            make(chan Event, incomingQueueLength),
 | |
| 		watchQueueLength:    queueLength,
 | |
| 		fullChannelBehavior: fullChannelBehavior,
 | |
| 	}
 | |
| 	m.distributing.Add(1)
 | |
| 	go m.loop()
 | |
| 	return m
 | |
| }
 | |
| 
 | |
| const internalRunFunctionMarker = "internal-do-function"
 | |
| 
 | |
| // a function type we can shoehorn into the queue.
 | |
| type functionFakeRuntimeObject func()
 | |
| 
 | |
| func (obj functionFakeRuntimeObject) GetObjectKind() schema.ObjectKind {
 | |
| 	return schema.EmptyObjectKind
 | |
| }
 | |
| func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
 | |
| 	if obj == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	// funcs are immutable. Hence, just return the original func.
 | |
| 	return obj
 | |
| }
 | |
| 
 | |
| // Execute f, blocking the incoming queue (and waiting for it to drain first).
 | |
| // The purpose of this terrible hack is so that watchers added after an event
 | |
| // won't ever see that event, and will always see any event after they are
 | |
| // added.
 | |
| func (b *Broadcaster) blockQueue(f func()) {
 | |
| 	var wg sync.WaitGroup
 | |
| 	wg.Add(1)
 | |
| 	b.incoming <- Event{
 | |
| 		Type: internalRunFunctionMarker,
 | |
| 		Object: functionFakeRuntimeObject(func() {
 | |
| 			defer wg.Done()
 | |
| 			f()
 | |
| 		}),
 | |
| 	}
 | |
| 	wg.Wait()
 | |
| }
 | |
| 
 | |
| // Watch adds a new watcher to the list and returns an Interface for it.
 | |
| // Note: new watchers will only receive new events. They won't get an entire history
 | |
| // of previous events.
 | |
| func (m *Broadcaster) Watch() Interface {
 | |
| 	var w *broadcasterWatcher
 | |
| 	m.blockQueue(func() {
 | |
| 		m.lock.Lock()
 | |
| 		defer m.lock.Unlock()
 | |
| 		id := m.nextWatcher
 | |
| 		m.nextWatcher++
 | |
| 		w = &broadcasterWatcher{
 | |
| 			result:  make(chan Event, m.watchQueueLength),
 | |
| 			stopped: make(chan struct{}),
 | |
| 			id:      id,
 | |
| 			m:       m,
 | |
| 		}
 | |
| 		m.watchers[id] = w
 | |
| 	})
 | |
| 	return w
 | |
| }
 | |
| 
 | |
| // WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
 | |
| // queuedEvents down the new watch before beginning to send ordinary events from Broadcaster.
 | |
| // The returned watch will have a queue length that is at least large enough to accommodate
 | |
| // all of the items in queuedEvents.
 | |
| func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
 | |
| 	var w *broadcasterWatcher
 | |
| 	m.blockQueue(func() {
 | |
| 		m.lock.Lock()
 | |
| 		defer m.lock.Unlock()
 | |
| 		id := m.nextWatcher
 | |
| 		m.nextWatcher++
 | |
| 		length := m.watchQueueLength
 | |
| 		if n := len(queuedEvents) + 1; n > length {
 | |
| 			length = n
 | |
| 		}
 | |
| 		w = &broadcasterWatcher{
 | |
| 			result:  make(chan Event, length),
 | |
| 			stopped: make(chan struct{}),
 | |
| 			id:      id,
 | |
| 			m:       m,
 | |
| 		}
 | |
| 		m.watchers[id] = w
 | |
| 		for _, e := range queuedEvents {
 | |
| 			w.result <- e
 | |
| 		}
 | |
| 	})
 | |
| 	return w
 | |
| }
 | |
| 
 | |
| // stopWatching stops the given watcher and removes it from the list.
 | |
| func (m *Broadcaster) stopWatching(id int64) {
 | |
| 	m.lock.Lock()
 | |
| 	defer m.lock.Unlock()
 | |
| 	w, ok := m.watchers[id]
 | |
| 	if !ok {
 | |
| 		// No need to do anything, it's already been removed from the list.
 | |
| 		return
 | |
| 	}
 | |
| 	delete(m.watchers, id)
 | |
| 	close(w.result)
 | |
| }
 | |
| 
 | |
| // closeAll disconnects all watchers (presumably in response to a Shutdown call).
 | |
| func (m *Broadcaster) closeAll() {
 | |
| 	m.lock.Lock()
 | |
| 	defer m.lock.Unlock()
 | |
| 	for _, w := range m.watchers {
 | |
| 		close(w.result)
 | |
| 	}
 | |
| 	// Delete everything from the map, since presence/absence in the map is used
 | |
| 	// by stopWatching to avoid double-closing the channel.
 | |
| 	m.watchers = map[int64]*broadcasterWatcher{}
 | |
| }
 | |
| 
 | |
| // Action distributes the given event among all watchers.
 | |
| func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
 | |
| 	m.incoming <- Event{action, obj}
 | |
| }
 | |
| 
 | |
| // Shutdown disconnects all watchers (but any queued events will still be distributed).
 | |
| // You must not call Action or Watch* after calling Shutdown. This call blocks
 | |
| // until all events have been distributed through the outbound channels. Note
 | |
| // that since they can be buffered, this means that the watchers might not
 | |
| // have received the data yet as it can remain sitting in the buffered
 | |
| // channel.
 | |
| func (m *Broadcaster) Shutdown() {
 | |
| 	close(m.incoming)
 | |
| 	m.distributing.Wait()
 | |
| }
 | |
| 
 | |
| // loop receives from m.incoming and distributes to all watchers.
 | |
| func (m *Broadcaster) loop() {
 | |
| 	// Deliberately not catching crashes here. Yes, bring down the process if there's a
 | |
| 	// bug in watch.Broadcaster.
 | |
| 	for event := range m.incoming {
 | |
| 		if event.Type == internalRunFunctionMarker {
 | |
| 			event.Object.(functionFakeRuntimeObject)()
 | |
| 			continue
 | |
| 		}
 | |
| 		m.distribute(event)
 | |
| 	}
 | |
| 	m.closeAll()
 | |
| 	m.distributing.Done()
 | |
| }
 | |
| 
 | |
| // distribute sends event to all watchers. Blocking.
 | |
| func (m *Broadcaster) distribute(event Event) {
 | |
| 	m.lock.Lock()
 | |
| 	defer m.lock.Unlock()
 | |
| 	if m.fullChannelBehavior == DropIfChannelFull {
 | |
| 		for _, w := range m.watchers {
 | |
| 			select {
 | |
| 			case w.result <- event:
 | |
| 			case <-w.stopped:
 | |
| 			default: // Don't block if the event can't be queued.
 | |
| 			}
 | |
| 		}
 | |
| 	} else {
 | |
| 		for _, w := range m.watchers {
 | |
| 			select {
 | |
| 			case w.result <- event:
 | |
| 			case <-w.stopped:
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // broadcasterWatcher handles a single watcher of a broadcaster
 | |
| type broadcasterWatcher struct {
 | |
| 	result  chan Event
 | |
| 	stopped chan struct{}
 | |
| 	stop    sync.Once
 | |
| 	id      int64
 | |
| 	m       *Broadcaster
 | |
| }
 | |
| 
 | |
| // ResultChan returns a channel to use for waiting on events.
 | |
| func (mw *broadcasterWatcher) ResultChan() <-chan Event {
 | |
| 	return mw.result
 | |
| }
 | |
| 
 | |
| // Stop stops watching and removes mw from its list.
 | |
| func (mw *broadcasterWatcher) Stop() {
 | |
| 	mw.stop.Do(func() {
 | |
| 		close(mw.stopped)
 | |
| 		mw.m.stopWatching(mw.id)
 | |
| 	})
 | |
| }
 | 
