mirror of
				https://gitea.com/Lydanne/buildx.git
				synced 2025-11-04 18:13:42 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			215 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			215 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
// Copyright The OpenTelemetry Authors
 | 
						|
//
 | 
						|
// Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
// you may not use this file except in compliance with the License.
 | 
						|
// You may obtain a copy of the License at
 | 
						|
//
 | 
						|
//     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
//
 | 
						|
// Unless required by applicable law or agreed to in writing, software
 | 
						|
// distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
// See the License for the specific language governing permissions and
 | 
						|
// limitations under the License.
 | 
						|
 | 
						|
package metric // import "go.opentelemetry.io/otel/sdk/metric"
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"errors"
 | 
						|
	"fmt"
 | 
						|
	"sync"
 | 
						|
	"sync/atomic"
 | 
						|
 | 
						|
	"go.opentelemetry.io/otel/internal/global"
 | 
						|
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 | 
						|
)
 | 
						|
 | 
						|
// ManualReader is a simple Reader that allows an application to
 | 
						|
// read metrics on demand.
 | 
						|
type ManualReader struct {
 | 
						|
	sdkProducer  atomic.Value
 | 
						|
	shutdownOnce sync.Once
 | 
						|
 | 
						|
	mu                sync.Mutex
 | 
						|
	isShutdown        bool
 | 
						|
	externalProducers atomic.Value
 | 
						|
 | 
						|
	temporalitySelector TemporalitySelector
 | 
						|
	aggregationSelector AggregationSelector
 | 
						|
}
 | 
						|
 | 
						|
// Compile time check the manualReader implements Reader and is comparable.
 | 
						|
var _ = map[Reader]struct{}{&ManualReader{}: {}}
 | 
						|
 | 
						|
// NewManualReader returns a Reader which is directly called to collect metrics.
 | 
						|
func NewManualReader(opts ...ManualReaderOption) *ManualReader {
 | 
						|
	cfg := newManualReaderConfig(opts)
 | 
						|
	r := &ManualReader{
 | 
						|
		temporalitySelector: cfg.temporalitySelector,
 | 
						|
		aggregationSelector: cfg.aggregationSelector,
 | 
						|
	}
 | 
						|
	r.externalProducers.Store(cfg.producers)
 | 
						|
	return r
 | 
						|
}
 | 
						|
 | 
						|
// register stores the sdkProducer which enables the caller
 | 
						|
// to read metrics from the SDK on demand.
 | 
						|
func (mr *ManualReader) register(p sdkProducer) {
 | 
						|
	// Only register once. If producer is already set, do nothing.
 | 
						|
	if !mr.sdkProducer.CompareAndSwap(nil, produceHolder{produce: p.produce}) {
 | 
						|
		msg := "did not register manual reader"
 | 
						|
		global.Error(errDuplicateRegister, msg)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// temporality reports the Temporality for the instrument kind provided.
 | 
						|
func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality {
 | 
						|
	return mr.temporalitySelector(kind)
 | 
						|
}
 | 
						|
 | 
						|
// aggregation returns what Aggregation to use for kind.
 | 
						|
func (mr *ManualReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive  // import-shadow for method scoped by type.
 | 
						|
	return mr.aggregationSelector(kind)
 | 
						|
}
 | 
						|
 | 
						|
// Shutdown closes any connections and frees any resources used by the reader.
 | 
						|
//
 | 
						|
// This method is safe to call concurrently.
 | 
						|
func (mr *ManualReader) Shutdown(context.Context) error {
 | 
						|
	err := ErrReaderShutdown
 | 
						|
	mr.shutdownOnce.Do(func() {
 | 
						|
		// Any future call to Collect will now return ErrReaderShutdown.
 | 
						|
		mr.sdkProducer.Store(produceHolder{
 | 
						|
			produce: shutdownProducer{}.produce,
 | 
						|
		})
 | 
						|
		mr.mu.Lock()
 | 
						|
		defer mr.mu.Unlock()
 | 
						|
		mr.isShutdown = true
 | 
						|
		// release references to Producer(s)
 | 
						|
		mr.externalProducers.Store([]Producer{})
 | 
						|
		err = nil
 | 
						|
	})
 | 
						|
	return err
 | 
						|
}
 | 
						|
 | 
						|
// Collect gathers all metric data related to the Reader from
 | 
						|
// the SDK and other Producers and stores the result in rm.
 | 
						|
//
 | 
						|
// Collect will return an error if called after shutdown.
 | 
						|
// Collect will return an error if rm is a nil ResourceMetrics.
 | 
						|
// Collect will return an error if the context's Done channel is closed.
 | 
						|
//
 | 
						|
// This method is safe to call concurrently.
 | 
						|
func (mr *ManualReader) Collect(ctx context.Context, rm *metricdata.ResourceMetrics) error {
 | 
						|
	if rm == nil {
 | 
						|
		return errors.New("manual reader: *metricdata.ResourceMetrics is nil")
 | 
						|
	}
 | 
						|
	p := mr.sdkProducer.Load()
 | 
						|
	if p == nil {
 | 
						|
		return ErrReaderNotRegistered
 | 
						|
	}
 | 
						|
 | 
						|
	ph, ok := p.(produceHolder)
 | 
						|
	if !ok {
 | 
						|
		// The atomic.Value is entirely in the periodicReader's control so
 | 
						|
		// this should never happen. In the unforeseen case that this does
 | 
						|
		// happen, return an error instead of panicking so a users code does
 | 
						|
		// not halt in the processes.
 | 
						|
		err := fmt.Errorf("manual reader: invalid producer: %T", p)
 | 
						|
		return err
 | 
						|
	}
 | 
						|
 | 
						|
	err := ph.produce(ctx, rm)
 | 
						|
	if err != nil {
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	var errs []error
 | 
						|
	for _, producer := range mr.externalProducers.Load().([]Producer) {
 | 
						|
		externalMetrics, err := producer.Produce(ctx)
 | 
						|
		if err != nil {
 | 
						|
			errs = append(errs, err)
 | 
						|
		}
 | 
						|
		rm.ScopeMetrics = append(rm.ScopeMetrics, externalMetrics...)
 | 
						|
	}
 | 
						|
 | 
						|
	global.Debug("ManualReader collection", "Data", rm)
 | 
						|
 | 
						|
	return unifyErrors(errs)
 | 
						|
}
 | 
						|
 | 
						|
// MarshalLog returns logging data about the ManualReader.
 | 
						|
func (r *ManualReader) MarshalLog() interface{} {
 | 
						|
	r.mu.Lock()
 | 
						|
	down := r.isShutdown
 | 
						|
	r.mu.Unlock()
 | 
						|
	return struct {
 | 
						|
		Type       string
 | 
						|
		Registered bool
 | 
						|
		Shutdown   bool
 | 
						|
	}{
 | 
						|
		Type:       "ManualReader",
 | 
						|
		Registered: r.sdkProducer.Load() != nil,
 | 
						|
		Shutdown:   down,
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// manualReaderConfig contains configuration options for a ManualReader.
 | 
						|
type manualReaderConfig struct {
 | 
						|
	temporalitySelector TemporalitySelector
 | 
						|
	aggregationSelector AggregationSelector
 | 
						|
	producers           []Producer
 | 
						|
}
 | 
						|
 | 
						|
// newManualReaderConfig returns a manualReaderConfig configured with options.
 | 
						|
func newManualReaderConfig(opts []ManualReaderOption) manualReaderConfig {
 | 
						|
	cfg := manualReaderConfig{
 | 
						|
		temporalitySelector: DefaultTemporalitySelector,
 | 
						|
		aggregationSelector: DefaultAggregationSelector,
 | 
						|
	}
 | 
						|
	for _, opt := range opts {
 | 
						|
		cfg = opt.applyManual(cfg)
 | 
						|
	}
 | 
						|
	return cfg
 | 
						|
}
 | 
						|
 | 
						|
// ManualReaderOption applies a configuration option value to a ManualReader.
 | 
						|
type ManualReaderOption interface {
 | 
						|
	applyManual(manualReaderConfig) manualReaderConfig
 | 
						|
}
 | 
						|
 | 
						|
// WithTemporalitySelector sets the TemporalitySelector a reader will use to
 | 
						|
// determine the Temporality of an instrument based on its kind. If this
 | 
						|
// option is not used, the reader will use the DefaultTemporalitySelector.
 | 
						|
func WithTemporalitySelector(selector TemporalitySelector) ManualReaderOption {
 | 
						|
	return temporalitySelectorOption{selector: selector}
 | 
						|
}
 | 
						|
 | 
						|
type temporalitySelectorOption struct {
 | 
						|
	selector func(instrument InstrumentKind) metricdata.Temporality
 | 
						|
}
 | 
						|
 | 
						|
// applyManual returns a manualReaderConfig with option applied.
 | 
						|
func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualReaderConfig {
 | 
						|
	mrc.temporalitySelector = t.selector
 | 
						|
	return mrc
 | 
						|
}
 | 
						|
 | 
						|
// WithAggregationSelector sets the AggregationSelector a reader will use to
 | 
						|
// determine the aggregation to use for an instrument based on its kind. If
 | 
						|
// this option is not used, the reader will use the DefaultAggregationSelector
 | 
						|
// or the aggregation explicitly passed for a view matching an instrument.
 | 
						|
func WithAggregationSelector(selector AggregationSelector) ManualReaderOption {
 | 
						|
	return aggregationSelectorOption{selector: selector}
 | 
						|
}
 | 
						|
 | 
						|
type aggregationSelectorOption struct {
 | 
						|
	selector AggregationSelector
 | 
						|
}
 | 
						|
 | 
						|
// applyManual returns a manualReaderConfig with option applied.
 | 
						|
func (t aggregationSelectorOption) applyManual(c manualReaderConfig) manualReaderConfig {
 | 
						|
	c.aggregationSelector = t.selector
 | 
						|
	return c
 | 
						|
}
 |