mirror of
				https://gitea.com/Lydanne/buildx.git
				synced 2025-11-04 18:13:42 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			262 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			262 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package progress
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"io"
 | 
						|
	"sort"
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/pkg/errors"
 | 
						|
)
 | 
						|
 | 
						|
// Progress package provides utility functions for using the context to capture
 | 
						|
// progress of a running function. All progress items written contain an ID
 | 
						|
// that is used to collapse unread messages.
 | 
						|
 | 
						|
type contextKeyT string
 | 
						|
 | 
						|
var contextKey = contextKeyT("buildkit/util/progress")
 | 
						|
 | 
						|
// FromContext returns a progress writer from a context.
 | 
						|
func FromContext(ctx context.Context, opts ...WriterOption) (Writer, bool, context.Context) {
 | 
						|
	v := ctx.Value(contextKey)
 | 
						|
	pw, ok := v.(*progressWriter)
 | 
						|
	if !ok {
 | 
						|
		if pw, ok := v.(*MultiWriter); ok {
 | 
						|
			return pw, true, ctx
 | 
						|
		}
 | 
						|
		return &noOpWriter{}, false, ctx
 | 
						|
	}
 | 
						|
	pw = newWriter(pw)
 | 
						|
	for _, o := range opts {
 | 
						|
		o(pw)
 | 
						|
	}
 | 
						|
	ctx = context.WithValue(ctx, contextKey, pw)
 | 
						|
	return pw, true, ctx
 | 
						|
}
 | 
						|
 | 
						|
type WriterOption func(Writer)
 | 
						|
 | 
						|
// NewContext returns a new context and a progress reader that captures all
 | 
						|
// progress items writtern to this context. Last returned parameter is a closer
 | 
						|
// function to signal that no new writes will happen to this context.
 | 
						|
func NewContext(ctx context.Context) (Reader, context.Context, func()) {
 | 
						|
	pr, pw, cancel := pipe()
 | 
						|
	ctx = WithProgress(ctx, pw)
 | 
						|
	return pr, ctx, cancel
 | 
						|
}
 | 
						|
 | 
						|
func WithProgress(ctx context.Context, pw Writer) context.Context {
 | 
						|
	return context.WithValue(ctx, contextKey, pw)
 | 
						|
}
 | 
						|
 | 
						|
func WithMetadata(key string, val interface{}) WriterOption {
 | 
						|
	return func(w Writer) {
 | 
						|
		if pw, ok := w.(*progressWriter); ok {
 | 
						|
			pw.meta[key] = val
 | 
						|
		}
 | 
						|
		if pw, ok := w.(*MultiWriter); ok {
 | 
						|
			pw.meta[key] = val
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
type Controller interface {
 | 
						|
	Start(context.Context) (context.Context, func(error))
 | 
						|
	Status(id string, action string) func()
 | 
						|
}
 | 
						|
 | 
						|
type Writer interface {
 | 
						|
	Write(id string, value interface{}) error
 | 
						|
	Close() error
 | 
						|
}
 | 
						|
 | 
						|
type Reader interface {
 | 
						|
	Read(context.Context) ([]*Progress, error)
 | 
						|
}
 | 
						|
 | 
						|
type Progress struct {
 | 
						|
	ID        string
 | 
						|
	Timestamp time.Time
 | 
						|
	Sys       interface{}
 | 
						|
	meta      map[string]interface{}
 | 
						|
}
 | 
						|
 | 
						|
type Status struct {
 | 
						|
	Action    string
 | 
						|
	Current   int
 | 
						|
	Total     int
 | 
						|
	Started   *time.Time
 | 
						|
	Completed *time.Time
 | 
						|
}
 | 
						|
 | 
						|
type progressReader struct {
 | 
						|
	ctx     context.Context
 | 
						|
	cond    *sync.Cond
 | 
						|
	mu      sync.Mutex
 | 
						|
	writers map[*progressWriter]struct{}
 | 
						|
	dirty   map[string]*Progress
 | 
						|
}
 | 
						|
 | 
						|
func (pr *progressReader) Read(ctx context.Context) ([]*Progress, error) {
 | 
						|
	done := make(chan struct{})
 | 
						|
	defer close(done)
 | 
						|
	go func() {
 | 
						|
		select {
 | 
						|
		case <-done:
 | 
						|
		case <-ctx.Done():
 | 
						|
			pr.mu.Lock()
 | 
						|
			pr.cond.Broadcast()
 | 
						|
			pr.mu.Unlock()
 | 
						|
		}
 | 
						|
	}()
 | 
						|
	pr.mu.Lock()
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ctx.Done():
 | 
						|
			pr.mu.Unlock()
 | 
						|
			return nil, ctx.Err()
 | 
						|
		default:
 | 
						|
		}
 | 
						|
		dmap := pr.dirty
 | 
						|
		if len(dmap) == 0 {
 | 
						|
			select {
 | 
						|
			case <-pr.ctx.Done():
 | 
						|
				if len(pr.writers) == 0 {
 | 
						|
					pr.mu.Unlock()
 | 
						|
					return nil, io.EOF
 | 
						|
				}
 | 
						|
			default:
 | 
						|
			}
 | 
						|
			pr.cond.Wait()
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		pr.dirty = make(map[string]*Progress)
 | 
						|
		pr.mu.Unlock()
 | 
						|
 | 
						|
		out := make([]*Progress, 0, len(dmap))
 | 
						|
		for _, p := range dmap {
 | 
						|
			out = append(out, p)
 | 
						|
		}
 | 
						|
 | 
						|
		sort.Slice(out, func(i, j int) bool {
 | 
						|
			return out[i].Timestamp.Before(out[j].Timestamp)
 | 
						|
		})
 | 
						|
 | 
						|
		return out, nil
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (pr *progressReader) append(pw *progressWriter) {
 | 
						|
	pr.mu.Lock()
 | 
						|
	defer pr.mu.Unlock()
 | 
						|
 | 
						|
	select {
 | 
						|
	case <-pr.ctx.Done():
 | 
						|
		return
 | 
						|
	default:
 | 
						|
		pr.writers[pw] = struct{}{}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func pipe() (*progressReader, *progressWriter, func()) {
 | 
						|
	ctx, cancel := context.WithCancel(context.Background())
 | 
						|
	pr := &progressReader{
 | 
						|
		ctx:     ctx,
 | 
						|
		writers: make(map[*progressWriter]struct{}),
 | 
						|
		dirty:   make(map[string]*Progress),
 | 
						|
	}
 | 
						|
	pr.cond = sync.NewCond(&pr.mu)
 | 
						|
	go func() {
 | 
						|
		<-ctx.Done()
 | 
						|
		pr.mu.Lock()
 | 
						|
		pr.cond.Broadcast()
 | 
						|
		pr.mu.Unlock()
 | 
						|
	}()
 | 
						|
	pw := &progressWriter{
 | 
						|
		reader: pr,
 | 
						|
	}
 | 
						|
	return pr, pw, cancel
 | 
						|
}
 | 
						|
 | 
						|
func newWriter(pw *progressWriter) *progressWriter {
 | 
						|
	meta := make(map[string]interface{})
 | 
						|
	for k, v := range pw.meta {
 | 
						|
		meta[k] = v
 | 
						|
	}
 | 
						|
	pw = &progressWriter{
 | 
						|
		reader: pw.reader,
 | 
						|
		meta:   meta,
 | 
						|
	}
 | 
						|
	pw.reader.append(pw)
 | 
						|
	return pw
 | 
						|
}
 | 
						|
 | 
						|
type progressWriter struct {
 | 
						|
	done   bool
 | 
						|
	reader *progressReader
 | 
						|
	meta   map[string]interface{}
 | 
						|
}
 | 
						|
 | 
						|
func (pw *progressWriter) Write(id string, v interface{}) error {
 | 
						|
	if pw.done {
 | 
						|
		return errors.Errorf("writing %s to closed progress writer", id)
 | 
						|
	}
 | 
						|
	return pw.writeRawProgress(&Progress{
 | 
						|
		ID:        id,
 | 
						|
		Timestamp: time.Now(),
 | 
						|
		Sys:       v,
 | 
						|
		meta:      pw.meta,
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
func (pw *progressWriter) WriteRawProgress(p *Progress) error {
 | 
						|
	meta := p.meta
 | 
						|
	if len(pw.meta) > 0 {
 | 
						|
		meta = map[string]interface{}{}
 | 
						|
		for k, v := range p.meta {
 | 
						|
			meta[k] = v
 | 
						|
		}
 | 
						|
		for k, v := range pw.meta {
 | 
						|
			if _, ok := meta[k]; !ok {
 | 
						|
				meta[k] = v
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	p.meta = meta
 | 
						|
	return pw.writeRawProgress(p)
 | 
						|
}
 | 
						|
 | 
						|
func (pw *progressWriter) writeRawProgress(p *Progress) error {
 | 
						|
	pw.reader.mu.Lock()
 | 
						|
	pw.reader.dirty[p.ID] = p
 | 
						|
	pw.reader.cond.Broadcast()
 | 
						|
	pw.reader.mu.Unlock()
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (pw *progressWriter) Close() error {
 | 
						|
	pw.reader.mu.Lock()
 | 
						|
	delete(pw.reader.writers, pw)
 | 
						|
	pw.reader.mu.Unlock()
 | 
						|
	pw.reader.cond.Broadcast()
 | 
						|
	pw.done = true
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (p *Progress) Meta(key string) (interface{}, bool) {
 | 
						|
	v, ok := p.meta[key]
 | 
						|
	return v, ok
 | 
						|
}
 | 
						|
 | 
						|
type noOpWriter struct{}
 | 
						|
 | 
						|
func (pw *noOpWriter) Write(_ string, _ interface{}) error {
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (pw *noOpWriter) Close() error {
 | 
						|
	return nil
 | 
						|
}
 |