mirror of
				https://gitea.com/Lydanne/buildx.git
				synced 2025-11-04 18:13:42 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			268 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			268 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
//
 | 
						|
// Written by Maxim Khitrov (November 2012)
 | 
						|
//
 | 
						|
 | 
						|
// Package flowrate provides the tools for monitoring and limiting the flow rate
 | 
						|
// of an arbitrary data stream.
 | 
						|
package flowrate
 | 
						|
 | 
						|
import (
 | 
						|
	"math"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
)
 | 
						|
 | 
						|
// Monitor monitors and limits the transfer rate of a data stream.
 | 
						|
type Monitor struct {
 | 
						|
	mu      sync.Mutex    // Mutex guarding access to all internal fields
 | 
						|
	active  bool          // Flag indicating an active transfer
 | 
						|
	start   time.Duration // Transfer start time (clock() value)
 | 
						|
	bytes   int64         // Total number of bytes transferred
 | 
						|
	samples int64         // Total number of samples taken
 | 
						|
 | 
						|
	rSample float64 // Most recent transfer rate sample (bytes per second)
 | 
						|
	rEMA    float64 // Exponential moving average of rSample
 | 
						|
	rPeak   float64 // Peak transfer rate (max of all rSamples)
 | 
						|
	rWindow float64 // rEMA window (seconds)
 | 
						|
 | 
						|
	sBytes int64         // Number of bytes transferred since sLast
 | 
						|
	sLast  time.Duration // Most recent sample time (stop time when inactive)
 | 
						|
	sRate  time.Duration // Sampling rate
 | 
						|
 | 
						|
	tBytes int64         // Number of bytes expected in the current transfer
 | 
						|
	tLast  time.Duration // Time of the most recent transfer of at least 1 byte
 | 
						|
}
 | 
						|
 | 
						|
// New creates a new flow control monitor. Instantaneous transfer rate is
 | 
						|
// measured and updated for each sampleRate interval. windowSize determines the
 | 
						|
// weight of each sample in the exponential moving average (EMA) calculation.
 | 
						|
// The exact formulas are:
 | 
						|
//
 | 
						|
// 	sampleTime = currentTime - prevSampleTime
 | 
						|
// 	sampleRate = byteCount / sampleTime
 | 
						|
// 	weight     = 1 - exp(-sampleTime/windowSize)
 | 
						|
// 	newRate    = weight*sampleRate + (1-weight)*oldRate
 | 
						|
//
 | 
						|
// The default values for sampleRate and windowSize (if <= 0) are 100ms and 1s,
 | 
						|
// respectively.
 | 
						|
func New(sampleRate, windowSize time.Duration) *Monitor {
 | 
						|
	if sampleRate = clockRound(sampleRate); sampleRate <= 0 {
 | 
						|
		sampleRate = 5 * clockRate
 | 
						|
	}
 | 
						|
	if windowSize <= 0 {
 | 
						|
		windowSize = 1 * time.Second
 | 
						|
	}
 | 
						|
	now := clock()
 | 
						|
	return &Monitor{
 | 
						|
		active:  true,
 | 
						|
		start:   now,
 | 
						|
		rWindow: windowSize.Seconds(),
 | 
						|
		sLast:   now,
 | 
						|
		sRate:   sampleRate,
 | 
						|
		tLast:   now,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Update records the transfer of n bytes and returns n. It should be called
 | 
						|
// after each Read/Write operation, even if n is 0.
 | 
						|
func (m *Monitor) Update(n int) int {
 | 
						|
	m.mu.Lock()
 | 
						|
	m.update(n)
 | 
						|
	m.mu.Unlock()
 | 
						|
	return n
 | 
						|
}
 | 
						|
 | 
						|
// IO is a convenience method intended to wrap io.Reader and io.Writer method
 | 
						|
// execution. It calls m.Update(n) and then returns (n, err) unmodified.
 | 
						|
func (m *Monitor) IO(n int, err error) (int, error) {
 | 
						|
	return m.Update(n), err
 | 
						|
}
 | 
						|
 | 
						|
// Done marks the transfer as finished and prevents any further updates or
 | 
						|
// limiting. Instantaneous and current transfer rates drop to 0. Update, IO, and
 | 
						|
// Limit methods become NOOPs. It returns the total number of bytes transferred.
 | 
						|
func (m *Monitor) Done() int64 {
 | 
						|
	m.mu.Lock()
 | 
						|
	if now := m.update(0); m.sBytes > 0 {
 | 
						|
		m.reset(now)
 | 
						|
	}
 | 
						|
	m.active = false
 | 
						|
	m.tLast = 0
 | 
						|
	n := m.bytes
 | 
						|
	m.mu.Unlock()
 | 
						|
	return n
 | 
						|
}
 | 
						|
 | 
						|
// timeRemLimit is the maximum Status.TimeRem value.
 | 
						|
const timeRemLimit = 999*time.Hour + 59*time.Minute + 59*time.Second
 | 
						|
 | 
						|
// Status represents the current Monitor status. All transfer rates are in bytes
 | 
						|
// per second rounded to the nearest byte.
 | 
						|
type Status struct {
 | 
						|
	Active   bool          // Flag indicating an active transfer
 | 
						|
	Start    time.Time     // Transfer start time
 | 
						|
	Duration time.Duration // Time period covered by the statistics
 | 
						|
	Idle     time.Duration // Time since the last transfer of at least 1 byte
 | 
						|
	Bytes    int64         // Total number of bytes transferred
 | 
						|
	Samples  int64         // Total number of samples taken
 | 
						|
	InstRate int64         // Instantaneous transfer rate
 | 
						|
	CurRate  int64         // Current transfer rate (EMA of InstRate)
 | 
						|
	AvgRate  int64         // Average transfer rate (Bytes / Duration)
 | 
						|
	PeakRate int64         // Maximum instantaneous transfer rate
 | 
						|
	BytesRem int64         // Number of bytes remaining in the transfer
 | 
						|
	TimeRem  time.Duration // Estimated time to completion
 | 
						|
	Progress Percent       // Overall transfer progress
 | 
						|
}
 | 
						|
 | 
						|
// Status returns current transfer status information. The returned value
 | 
						|
// becomes static after a call to Done.
 | 
						|
func (m *Monitor) Status() Status {
 | 
						|
	m.mu.Lock()
 | 
						|
	now := m.update(0)
 | 
						|
	s := Status{
 | 
						|
		Active:   m.active,
 | 
						|
		Start:    clockToTime(m.start),
 | 
						|
		Duration: m.sLast - m.start,
 | 
						|
		Idle:     now - m.tLast,
 | 
						|
		Bytes:    m.bytes,
 | 
						|
		Samples:  m.samples,
 | 
						|
		PeakRate: round(m.rPeak),
 | 
						|
		BytesRem: m.tBytes - m.bytes,
 | 
						|
		Progress: percentOf(float64(m.bytes), float64(m.tBytes)),
 | 
						|
	}
 | 
						|
	if s.BytesRem < 0 {
 | 
						|
		s.BytesRem = 0
 | 
						|
	}
 | 
						|
	if s.Duration > 0 {
 | 
						|
		rAvg := float64(s.Bytes) / s.Duration.Seconds()
 | 
						|
		s.AvgRate = round(rAvg)
 | 
						|
		if s.Active {
 | 
						|
			s.InstRate = round(m.rSample)
 | 
						|
			s.CurRate = round(m.rEMA)
 | 
						|
			if s.BytesRem > 0 {
 | 
						|
				if tRate := 0.8*m.rEMA + 0.2*rAvg; tRate > 0 {
 | 
						|
					ns := float64(s.BytesRem) / tRate * 1e9
 | 
						|
					if ns > float64(timeRemLimit) {
 | 
						|
						ns = float64(timeRemLimit)
 | 
						|
					}
 | 
						|
					s.TimeRem = clockRound(time.Duration(ns))
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	m.mu.Unlock()
 | 
						|
	return s
 | 
						|
}
 | 
						|
 | 
						|
// Limit restricts the instantaneous (per-sample) data flow to rate bytes per
 | 
						|
// second. It returns the maximum number of bytes (0 <= n <= want) that may be
 | 
						|
// transferred immediately without exceeding the limit. If block == true, the
 | 
						|
// call blocks until n > 0. want is returned unmodified if want < 1, rate < 1,
 | 
						|
// or the transfer is inactive (after a call to Done).
 | 
						|
//
 | 
						|
// At least one byte is always allowed to be transferred in any given sampling
 | 
						|
// period. Thus, if the sampling rate is 100ms, the lowest achievable flow rate
 | 
						|
// is 10 bytes per second.
 | 
						|
//
 | 
						|
// For usage examples, see the implementation of Reader and Writer in io.go.
 | 
						|
func (m *Monitor) Limit(want int, rate int64, block bool) (n int) {
 | 
						|
	if want < 1 || rate < 1 {
 | 
						|
		return want
 | 
						|
	}
 | 
						|
	m.mu.Lock()
 | 
						|
 | 
						|
	// Determine the maximum number of bytes that can be sent in one sample
 | 
						|
	limit := round(float64(rate) * m.sRate.Seconds())
 | 
						|
	if limit <= 0 {
 | 
						|
		limit = 1
 | 
						|
	}
 | 
						|
 | 
						|
	// If block == true, wait until m.sBytes < limit
 | 
						|
	if now := m.update(0); block {
 | 
						|
		for m.sBytes >= limit && m.active {
 | 
						|
			now = m.waitNextSample(now)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	// Make limit <= want (unlimited if the transfer is no longer active)
 | 
						|
	if limit -= m.sBytes; limit > int64(want) || !m.active {
 | 
						|
		limit = int64(want)
 | 
						|
	}
 | 
						|
	m.mu.Unlock()
 | 
						|
 | 
						|
	if limit < 0 {
 | 
						|
		limit = 0
 | 
						|
	}
 | 
						|
	return int(limit)
 | 
						|
}
 | 
						|
 | 
						|
// SetTransferSize specifies the total size of the data transfer, which allows
 | 
						|
// the Monitor to calculate the overall progress and time to completion.
 | 
						|
func (m *Monitor) SetTransferSize(bytes int64) {
 | 
						|
	if bytes < 0 {
 | 
						|
		bytes = 0
 | 
						|
	}
 | 
						|
	m.mu.Lock()
 | 
						|
	m.tBytes = bytes
 | 
						|
	m.mu.Unlock()
 | 
						|
}
 | 
						|
 | 
						|
// update accumulates the transferred byte count for the current sample until
 | 
						|
// clock() - m.sLast >= m.sRate. The monitor status is updated once the current
 | 
						|
// sample is done.
 | 
						|
func (m *Monitor) update(n int) (now time.Duration) {
 | 
						|
	if !m.active {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	if now = clock(); n > 0 {
 | 
						|
		m.tLast = now
 | 
						|
	}
 | 
						|
	m.sBytes += int64(n)
 | 
						|
	if sTime := now - m.sLast; sTime >= m.sRate {
 | 
						|
		t := sTime.Seconds()
 | 
						|
		if m.rSample = float64(m.sBytes) / t; m.rSample > m.rPeak {
 | 
						|
			m.rPeak = m.rSample
 | 
						|
		}
 | 
						|
 | 
						|
		// Exponential moving average using a method similar to *nix load
 | 
						|
		// average calculation. Longer sampling periods carry greater weight.
 | 
						|
		if m.samples > 0 {
 | 
						|
			w := math.Exp(-t / m.rWindow)
 | 
						|
			m.rEMA = m.rSample + w*(m.rEMA-m.rSample)
 | 
						|
		} else {
 | 
						|
			m.rEMA = m.rSample
 | 
						|
		}
 | 
						|
		m.reset(now)
 | 
						|
	}
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
// reset clears the current sample state in preparation for the next sample.
 | 
						|
func (m *Monitor) reset(sampleTime time.Duration) {
 | 
						|
	m.bytes += m.sBytes
 | 
						|
	m.samples++
 | 
						|
	m.sBytes = 0
 | 
						|
	m.sLast = sampleTime
 | 
						|
}
 | 
						|
 | 
						|
// waitNextSample sleeps for the remainder of the current sample. The lock is
 | 
						|
// released and reacquired during the actual sleep period, so it's possible for
 | 
						|
// the transfer to be inactive when this method returns.
 | 
						|
func (m *Monitor) waitNextSample(now time.Duration) time.Duration {
 | 
						|
	const minWait = 5 * time.Millisecond
 | 
						|
	current := m.sLast
 | 
						|
 | 
						|
	// sleep until the last sample time changes (ideally, just one iteration)
 | 
						|
	for m.sLast == current && m.active {
 | 
						|
		d := current + m.sRate - now
 | 
						|
		m.mu.Unlock()
 | 
						|
		if d < minWait {
 | 
						|
			d = minWait
 | 
						|
		}
 | 
						|
		time.Sleep(d)
 | 
						|
		m.mu.Lock()
 | 
						|
		now = m.update(0)
 | 
						|
	}
 | 
						|
	return now
 | 
						|
}
 |