mirror of
				https://gitea.com/Lydanne/buildx.git
				synced 2025-11-04 18:13:42 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			381 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			381 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
 | 
						|
 | 
						|
package metric // import "go.opentelemetry.io/otel/sdk/metric"
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"go.opentelemetry.io/otel"
 | 
						|
	"go.opentelemetry.io/otel/internal/global"
 | 
						|
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 | 
						|
)
 | 
						|
 | 
						|
// Built-in timing defaults for a PeriodicReader. newPeriodicReaderConfig
// hands these to envDuration as the default values.
const (
	defaultTimeout  = 30 * time.Second
	defaultInterval = 60 * time.Second
)
 | 
						|
 | 
						|
// periodicReaderConfig contains configuration options for a PeriodicReader.
 | 
						|
type periodicReaderConfig struct {
 | 
						|
	interval  time.Duration
 | 
						|
	timeout   time.Duration
 | 
						|
	producers []Producer
 | 
						|
}
 | 
						|
 | 
						|
// newPeriodicReaderConfig returns a periodicReaderConfig configured with
 | 
						|
// options.
 | 
						|
func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
 | 
						|
	c := periodicReaderConfig{
 | 
						|
		interval: envDuration(envInterval, defaultInterval),
 | 
						|
		timeout:  envDuration(envTimeout, defaultTimeout),
 | 
						|
	}
 | 
						|
	for _, o := range options {
 | 
						|
		c = o.applyPeriodic(c)
 | 
						|
	}
 | 
						|
	return c
 | 
						|
}
 | 
						|
 | 
						|
// PeriodicReaderOption applies a configuration option value to a PeriodicReader.
 | 
						|
type PeriodicReaderOption interface {
 | 
						|
	applyPeriodic(periodicReaderConfig) periodicReaderConfig
 | 
						|
}
 | 
						|
 | 
						|
// periodicReaderOptionFunc applies a set of options to a periodicReaderConfig.
 | 
						|
type periodicReaderOptionFunc func(periodicReaderConfig) periodicReaderConfig
 | 
						|
 | 
						|
// applyPeriodic returns a periodicReaderConfig with option(s) applied.
 | 
						|
func (o periodicReaderOptionFunc) applyPeriodic(conf periodicReaderConfig) periodicReaderConfig {
 | 
						|
	return o(conf)
 | 
						|
}
 | 
						|
 | 
						|
// WithTimeout configures the time a PeriodicReader waits for an export to
 | 
						|
// complete before canceling it. This includes an export which occurs as part
 | 
						|
// of Shutdown or ForceFlush if the user passed context does not have a
 | 
						|
// deadline. If the user passed context does have a deadline, it will be used
 | 
						|
// instead.
 | 
						|
//
 | 
						|
// This option overrides any value set for the
 | 
						|
// OTEL_METRIC_EXPORT_TIMEOUT environment variable.
 | 
						|
//
 | 
						|
// If this option is not used or d is less than or equal to zero, 30 seconds
 | 
						|
// is used as the default.
 | 
						|
func WithTimeout(d time.Duration) PeriodicReaderOption {
 | 
						|
	return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
 | 
						|
		if d <= 0 {
 | 
						|
			return conf
 | 
						|
		}
 | 
						|
		conf.timeout = d
 | 
						|
		return conf
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// WithInterval configures the intervening time between exports for a
 | 
						|
// PeriodicReader.
 | 
						|
//
 | 
						|
// This option overrides any value set for the
 | 
						|
// OTEL_METRIC_EXPORT_INTERVAL environment variable.
 | 
						|
//
 | 
						|
// If this option is not used or d is less than or equal to zero, 60 seconds
 | 
						|
// is used as the default.
 | 
						|
func WithInterval(d time.Duration) PeriodicReaderOption {
 | 
						|
	return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
 | 
						|
		if d <= 0 {
 | 
						|
			return conf
 | 
						|
		}
 | 
						|
		conf.interval = d
 | 
						|
		return conf
 | 
						|
	})
 | 
						|
}
 | 
						|
 | 
						|
// NewPeriodicReader returns a Reader that collects and exports metric data to
 | 
						|
// the exporter at a defined interval. By default, the returned Reader will
 | 
						|
// collect and export data every 60 seconds, and will cancel any attempts that
 | 
						|
// exceed 30 seconds, collect and export combined. The collect and export time
 | 
						|
// are not counted towards the interval between attempts.
 | 
						|
//
 | 
						|
// The Collect method of the returned Reader continues to gather and return
 | 
						|
// metric data to the user. It will not automatically send that data to the
 | 
						|
// exporter. That is left to the user to accomplish.
 | 
						|
func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader {
 | 
						|
	conf := newPeriodicReaderConfig(options)
 | 
						|
	ctx, cancel := context.WithCancel(context.Background())
 | 
						|
	r := &PeriodicReader{
 | 
						|
		interval: conf.interval,
 | 
						|
		timeout:  conf.timeout,
 | 
						|
		exporter: exporter,
 | 
						|
		flushCh:  make(chan chan error),
 | 
						|
		cancel:   cancel,
 | 
						|
		done:     make(chan struct{}),
 | 
						|
		rmPool: sync.Pool{
 | 
						|
			New: func() interface{} {
 | 
						|
				return &metricdata.ResourceMetrics{}
 | 
						|
			}},
 | 
						|
	}
 | 
						|
	r.externalProducers.Store(conf.producers)
 | 
						|
 | 
						|
	go func() {
 | 
						|
		defer func() { close(r.done) }()
 | 
						|
		r.run(ctx, conf.interval)
 | 
						|
	}()
 | 
						|
 | 
						|
	return r
 | 
						|
}
 | 
						|
 | 
						|
// PeriodicReader is a Reader that continuously collects and exports metric
 | 
						|
// data at a set interval.
 | 
						|
type PeriodicReader struct {
 | 
						|
	sdkProducer atomic.Value
 | 
						|
 | 
						|
	mu                sync.Mutex
 | 
						|
	isShutdown        bool
 | 
						|
	externalProducers atomic.Value
 | 
						|
 | 
						|
	interval time.Duration
 | 
						|
	timeout  time.Duration
 | 
						|
	exporter Exporter
 | 
						|
	flushCh  chan chan error
 | 
						|
 | 
						|
	done         chan struct{}
 | 
						|
	cancel       context.CancelFunc
 | 
						|
	shutdownOnce sync.Once
 | 
						|
 | 
						|
	rmPool sync.Pool
 | 
						|
}
 | 
						|
 | 
						|
// Compile time check the periodicReader implements Reader and is comparable.
 | 
						|
var _ = map[Reader]struct{}{&PeriodicReader{}: {}}
 | 
						|
 | 
						|
// newTicker allows testing override.
 | 
						|
var newTicker = time.NewTicker
 | 
						|
 | 
						|
// run continuously collects and exports metric data at the specified
 | 
						|
// interval. This will run until ctx is canceled or times out.
 | 
						|
func (r *PeriodicReader) run(ctx context.Context, interval time.Duration) {
 | 
						|
	ticker := newTicker(interval)
 | 
						|
	defer ticker.Stop()
 | 
						|
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ticker.C:
 | 
						|
			err := r.collectAndExport(ctx)
 | 
						|
			if err != nil {
 | 
						|
				otel.Handle(err)
 | 
						|
			}
 | 
						|
		case errCh := <-r.flushCh:
 | 
						|
			errCh <- r.collectAndExport(ctx)
 | 
						|
			ticker.Reset(interval)
 | 
						|
		case <-ctx.Done():
 | 
						|
			return
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// register registers p as the producer of this reader.
 | 
						|
func (r *PeriodicReader) register(p sdkProducer) {
 | 
						|
	// Only register once. If producer is already set, do nothing.
 | 
						|
	if !r.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
 | 
						|
		msg := "did not register periodic reader"
 | 
						|
		global.Error(errDuplicateRegister, msg)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// temporality reports the Temporality for the instrument kind provided.
 | 
						|
func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality {
 | 
						|
	return r.exporter.Temporality(kind)
 | 
						|
}
 | 
						|
 | 
						|
// aggregation returns what Aggregation to use for kind.
 | 
						|
func (r *PeriodicReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive  // import-shadow for method scoped by type.
 | 
						|
	return r.exporter.Aggregation(kind)
 | 
						|
}
 | 
						|
 | 
						|
// collectAndExport gather all metric data related to the periodicReader r from
 | 
						|
// the SDK and exports it with r's exporter.
 | 
						|
func (r *PeriodicReader) collectAndExport(ctx context.Context) error {
 | 
						|
	ctx, cancel := context.WithTimeout(ctx, r.timeout)
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	// TODO (#3047): Use a sync.Pool or persistent pointer instead of allocating rm every Collect.
 | 
						|
	rm := r.rmPool.Get().(*metricdata.ResourceMetrics)
 | 
						|
	err := r.Collect(ctx, rm)
 | 
						|
	if err == nil {
 | 
						|
		err = r.export(ctx, rm)
 | 
						|
	}
 | 
						|
	r.rmPool.Put(rm)
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// Collect gathers all metric data related to the Reader from
 | 
						|
// the SDK and other Producers and stores the result in rm. The metric
 | 
						|
// data is not exported to the configured exporter, it is left to the caller to
 | 
						|
// handle that if desired.
 | 
						|
//
 | 
						|
// Collect will return an error if called after shutdown.
 | 
						|
// Collect will return an error if rm is a nil ResourceMetrics.
 | 
						|
// Collect will return an error if the context's Done channel is closed.
 | 
						|
//
 | 
						|
// This method is safe to call concurrently.
 | 
						|
func (r *PeriodicReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
 | 
						|
	if rm == nil {
 | 
						|
		return errors.New("periodic reader: *metricdata.ResourceMetrics is nil")
 | 
						|
	}
 | 
						|
	// TODO (#3047): When collect is updated to accept output as param, pass rm.
 | 
						|
	return r.collect(ctx, r.sdkProducer.Load(), rm)
 | 
						|
}
 | 
						|
 | 
						|
// collect unwraps p as a produceHolder and returns its produce results.
 | 
						|
func (r *PeriodicReader) collect(ctx context.Context, p interface{}, rm *metricdata.ResourceMetrics) error {
 | 
						|
	if p == nil {
 | 
						|
		return ErrReaderNotRegistered
 | 
						|
	}
 | 
						|
 | 
						|
	ph, ok := p.(produceHolder)
 | 
						|
	if !ok {
 | 
						|
		// The atomic.Value is entirely in the periodicReader's control so
 | 
						|
		// this should never happen. In the unforeseen case that this does
 | 
						|
		// happen, return an error instead of panicking so a users code does
 | 
						|
		// not halt in the processes.
 | 
						|
		err := fmt.Errorf("periodic reader: invalid producer: %T", p)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	err := ph.produce(ctx, rm)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	var errs []error
 | 
						|
	for _, producer := range r.externalProducers.Load().([]Producer) {
 | 
						|
		externalMetrics, err := producer.Produce(ctx)
 | 
						|
		if err != nil {
 | 
						|
			errs = append(errs, err)
 | 
						|
		}
 | 
						|
		rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
 | 
						|
	}
 | 
						|
 | 
						|
	global.Debug("PeriodicReader collection", "Data", rm)
 | 
						|
 | 
						|
	return unifyErrors(errs)
 | 
						|
}
 | 
						|
 | 
						|
// export exports metric data m using r's exporter.
 | 
						|
func (r *PeriodicReader) export(ctx context.Context, m *metricdata.ResourceMetrics) error {
 | 
						|
	return r.exporter.Export(ctx, m)
 | 
						|
}
 | 
						|
 | 
						|
// ForceFlush flushes pending telemetry.
 | 
						|
//
 | 
						|
// This method is safe to call concurrently.
 | 
						|
func (r *PeriodicReader) ForceFlush(ctx context.Context) error {
 | 
						|
	// Prioritize the ctx timeout if it is set.
 | 
						|
	if _, ok := ctx.Deadline(); !ok {
 | 
						|
		var cancel context.CancelFunc
 | 
						|
		ctx, cancel = context.WithTimeout(ctx, r.timeout)
 | 
						|
		defer cancel()
 | 
						|
	}
 | 
						|
 | 
						|
	errCh := make(chan error, 1)
 | 
						|
	select {
 | 
						|
	case r.flushCh <- errCh:
 | 
						|
		select {
 | 
						|
		case err := <-errCh:
 | 
						|
			if err != nil {
 | 
						|
				return err
 | 
						|
			}
 | 
						|
			close(errCh)
 | 
						|
		case <-ctx.Done():
 | 
						|
			return ctx.Err()
 | 
						|
		}
 | 
						|
	case <-r.done:
 | 
						|
		return ErrReaderShutdown
 | 
						|
	case <-ctx.Done():
 | 
						|
		return ctx.Err()
 | 
						|
	}
 | 
						|
	return r.exporter.ForceFlush(ctx)
 | 
						|
}
 | 
						|
 | 
						|
// Shutdown flushes pending telemetry and then stops the export pipeline.
 | 
						|
//
 | 
						|
// This method is safe to call concurrently.
 | 
						|
func (r *PeriodicReader) Shutdown(ctx context.Context) error {
 | 
						|
	err := ErrReaderShutdown
 | 
						|
	r.shutdownOnce.Do(func() {
 | 
						|
		// Prioritize the ctx timeout if it is set.
 | 
						|
		if _, ok := ctx.Deadline(); !ok {
 | 
						|
			var cancel context.CancelFunc
 | 
						|
			ctx, cancel = context.WithTimeout(ctx, r.timeout)
 | 
						|
			defer cancel()
 | 
						|
		}
 | 
						|
 | 
						|
		// Stop the run loop.
 | 
						|
		r.cancel()
 | 
						|
		<-r.done
 | 
						|
 | 
						|
		// Any future call to Collect will now return ErrReaderShutdown.
 | 
						|
		ph := r.sdkProducer.Swap(produceHolder{
 | 
						|
			produce: shutdownProducer{}.produce,
 | 
						|
		})
 | 
						|
 | 
						|
		if ph != nil { // Reader was registered.
 | 
						|
			// Flush pending telemetry.
 | 
						|
			m := r.rmPool.Get().(*metricdata.ResourceMetrics)
 | 
						|
			err = r.collect(ctx, ph, m)
 | 
						|
			if err == nil {
 | 
						|
				err = r.export(ctx, m)
 | 
						|
			}
 | 
						|
			r.rmPool.Put(m)
 | 
						|
		}
 | 
						|
 | 
						|
		sErr := r.exporter.Shutdown(ctx)
 | 
						|
		if err == nil || err == ErrReaderShutdown {
 | 
						|
			err = sErr
 | 
						|
		}
 | 
						|
 | 
						|
		r.mu.Lock()
 | 
						|
		defer r.mu.Unlock()
 | 
						|
		r.isShutdown = true
 | 
						|
		// release references to Producer(s)
 | 
						|
		r.externalProducers.Store([]Producer{})
 | 
						|
	})
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// MarshalLog returns logging data about the PeriodicReader.
 | 
						|
func (r *PeriodicReader) MarshalLog() interface{} {
 | 
						|
	r.mu.Lock()
 | 
						|
	down := r.isShutdown
 | 
						|
	r.mu.Unlock()
 | 
						|
	return struct {
 | 
						|
		Type       string
 | 
						|
		Exporter   Exporter
 | 
						|
		Registered bool
 | 
						|
		Shutdown   bool
 | 
						|
		Interval   time.Duration
 | 
						|
		Timeout    time.Duration
 | 
						|
	}{
 | 
						|
		Type:       "PeriodicReader",
 | 
						|
		Exporter:   r.exporter,
 | 
						|
		Registered: r.sdkProducer.Load() != nil,
 | 
						|
		Shutdown:   down,
 | 
						|
		Interval:   r.interval,
 | 
						|
		Timeout:    r.timeout,
 | 
						|
	}
 | 
						|
}
 |