vendor: update buildkit to 862b22d7

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi
2022-12-13 12:00:06 -08:00
parent 0e293a4ec9
commit 12ec931237
18 changed files with 1566 additions and 366 deletions

View File

@ -35,8 +35,13 @@ func (mr *MultiReader) Reader(ctx context.Context) Reader {
isBehind := len(mr.sent) > 0
if !isBehind {
mr.writers[w] = closeWriter
select {
case <-mr.done:
isBehind = true
default:
if !isBehind {
mr.writers[w] = closeWriter
}
}
go func() {
@ -74,9 +79,6 @@ func (mr *MultiReader) Reader(ctx context.Context) Reader {
case <-ctx.Done():
close()
return
case <-mr.done:
close()
return
default:
}
}

View File

@ -118,12 +118,22 @@ func (pr *progressReader) Read(ctx context.Context) ([]*Progress, error) {
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-done:
case <-ctx.Done():
pr.mu.Lock()
pr.cond.Broadcast()
pr.mu.Unlock()
prdone := pr.ctx.Done()
for {
select {
case <-done:
return
case <-ctx.Done():
pr.mu.Lock()
pr.cond.Broadcast()
pr.mu.Unlock()
return
case <-prdone:
pr.mu.Lock()
pr.cond.Broadcast()
pr.mu.Unlock()
prdone = nil
}
}
}()
pr.mu.Lock()

View File

@ -24,6 +24,7 @@ type detector struct {
}
var ServiceName string
var Recorder *TraceRecorder
var detectors map[string]detector
var once sync.Once
@ -80,6 +81,11 @@ func detect() error {
return err
}
if Recorder != nil {
Recorder.SpanExporter = exp
exp = Recorder
}
if exp == nil {
return nil
}
@ -98,6 +104,10 @@ func detect() error {
sp := sdktrace.NewBatchSpanProcessor(exp)
if Recorder != nil {
Recorder.flush = sp.ForceFlush
}
sdktp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sp), sdktrace.WithResource(res))
closers = append(closers, sdktp.Shutdown)

View File

@ -0,0 +1,115 @@
package detect
import (
"context"
"sync"
"time"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.opentelemetry.io/otel/trace"
)
type TraceRecorder struct {
sdktrace.SpanExporter
mu sync.Mutex
m map[trace.TraceID]*stubs
listeners map[trace.TraceID]int
flush func(context.Context) error
}
type stubs struct {
spans []tracetest.SpanStub
last time.Time
}
func NewTraceRecorder() *TraceRecorder {
tr := &TraceRecorder{
m: map[trace.TraceID]*stubs{},
listeners: map[trace.TraceID]int{},
}
go func() {
t := time.NewTimer(60 * time.Second)
for {
<-t.C
tr.gc()
t.Reset(50 * time.Second)
}
}()
return tr
}
func (r *TraceRecorder) Record(traceID trace.TraceID) func() []tracetest.SpanStub {
if r.flush != nil {
r.flush(context.TODO())
}
r.mu.Lock()
defer r.mu.Unlock()
r.listeners[traceID]++
var once sync.Once
var spans []tracetest.SpanStub
return func() []tracetest.SpanStub {
once.Do(func() {
r.mu.Lock()
defer r.mu.Unlock()
if v, ok := r.m[traceID]; ok {
spans = v.spans
}
r.listeners[traceID]--
if r.listeners[traceID] == 0 {
delete(r.listeners, traceID)
}
})
return spans
}
}
func (r *TraceRecorder) gc() {
r.mu.Lock()
defer r.mu.Unlock()
now := time.Now()
for k, s := range r.m {
if _, ok := r.listeners[k]; ok {
continue
}
if now.Sub(s.last) > 60*time.Second {
delete(r.m, k)
}
}
}
func (r *TraceRecorder) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
r.mu.Lock()
now := time.Now()
for _, s := range spans {
ss := tracetest.SpanStubFromReadOnlySpan(s)
v, ok := r.m[ss.SpanContext.TraceID()]
if !ok {
v = &stubs{}
r.m[s.SpanContext().TraceID()] = v
}
v.last = now
v.spans = append(v.spans, ss)
}
r.mu.Unlock()
if r.SpanExporter == nil {
return nil
}
return r.SpanExporter.ExportSpans(ctx, spans)
}
func (r *TraceRecorder) Shutdown(ctx context.Context) error {
if r.SpanExporter == nil {
return nil
}
return r.SpanExporter.Shutdown(ctx)
}