mirror of
https://gitea.com/Lydanne/buildx.git
synced 2025-07-10 05:27:07 +08:00
deps: update buildkit, vendor changes
Signed-off-by: Laura Brehm <laurabrehm@hey.com>
This commit is contained in:
625
vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
generated
vendored
Normal file
625
vendor/go.opentelemetry.io/otel/sdk/metric/pipeline.go
generated
vendored
Normal file
@ -0,0 +1,625 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package metric // import "go.opentelemetry.io/otel/sdk/metric"
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"go.opentelemetry.io/otel/internal/global"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/embedded"
|
||||
"go.opentelemetry.io/otel/sdk/instrumentation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
)
|
||||
|
||||
var (
|
||||
errCreatingAggregators = errors.New("could not create all aggregators")
|
||||
errIncompatibleAggregation = errors.New("incompatible aggregation")
|
||||
errUnknownAggregation = errors.New("unrecognized aggregation")
|
||||
)
|
||||
|
||||
// instrumentSync is a synchronization point between a pipeline and an
|
||||
// instrument's aggregate function.
|
||||
type instrumentSync struct {
|
||||
name string
|
||||
description string
|
||||
unit string
|
||||
compAgg aggregate.ComputeAggregation
|
||||
}
|
||||
|
||||
func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline {
|
||||
if res == nil {
|
||||
res = resource.Empty()
|
||||
}
|
||||
return &pipeline{
|
||||
resource: res,
|
||||
reader: reader,
|
||||
views: views,
|
||||
// aggregations is lazy allocated when needed.
|
||||
}
|
||||
}
|
||||
|
||||
// pipeline connects all of the instruments created by a meter provider to a Reader.
|
||||
// This is the object that will be `Reader.register()` when a meter provider is created.
|
||||
//
|
||||
// As instruments are created the instrument should be checked if it exists in
|
||||
// the views of a the Reader, and if so each aggregate function should be added
|
||||
// to the pipeline.
|
||||
type pipeline struct {
|
||||
resource *resource.Resource
|
||||
|
||||
reader Reader
|
||||
views []View
|
||||
|
||||
sync.Mutex
|
||||
aggregations map[instrumentation.Scope][]instrumentSync
|
||||
callbacks []func(context.Context) error
|
||||
multiCallbacks list.List
|
||||
}
|
||||
|
||||
// addSync adds the instrumentSync to pipeline p with scope. This method is not
|
||||
// idempotent. Duplicate calls will result in duplicate additions, it is the
|
||||
// callers responsibility to ensure this is called with unique values.
|
||||
func (p *pipeline) addSync(scope instrumentation.Scope, iSync instrumentSync) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
if p.aggregations == nil {
|
||||
p.aggregations = map[instrumentation.Scope][]instrumentSync{
|
||||
scope: {iSync},
|
||||
}
|
||||
return
|
||||
}
|
||||
p.aggregations[scope] = append(p.aggregations[scope], iSync)
|
||||
}
|
||||
|
||||
// addCallback registers a single instrument callback to be run when
|
||||
// `produce()` is called.
|
||||
func (p *pipeline) addCallback(cback func(context.Context) error) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
p.callbacks = append(p.callbacks, cback)
|
||||
}
|
||||
|
||||
type multiCallback func(context.Context) error
|
||||
|
||||
// addMultiCallback registers a multi-instrument callback to be run when
|
||||
// `produce()` is called.
|
||||
func (p *pipeline) addMultiCallback(c multiCallback) (unregister func()) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
e := p.multiCallbacks.PushBack(c)
|
||||
return func() {
|
||||
p.Lock()
|
||||
p.multiCallbacks.Remove(e)
|
||||
p.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// produce returns aggregated metrics from a single collection.
|
||||
//
|
||||
// This method is safe to call concurrently.
|
||||
func (p *pipeline) produce(ctx context.Context, rm *metricdata.ResourceMetrics) error {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
var errs multierror
|
||||
for _, c := range p.callbacks {
|
||||
// TODO make the callbacks parallel. ( #3034 )
|
||||
if err := c(ctx); err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
if err := ctx.Err(); err != nil {
|
||||
rm.Resource = nil
|
||||
rm.ScopeMetrics = rm.ScopeMetrics[:0]
|
||||
return err
|
||||
}
|
||||
}
|
||||
for e := p.multiCallbacks.Front(); e != nil; e = e.Next() {
|
||||
// TODO make the callbacks parallel. ( #3034 )
|
||||
f := e.Value.(multiCallback)
|
||||
if err := f(ctx); err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
if err := ctx.Err(); err != nil {
|
||||
// This means the context expired before we finished running callbacks.
|
||||
rm.Resource = nil
|
||||
rm.ScopeMetrics = rm.ScopeMetrics[:0]
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
rm.Resource = p.resource
|
||||
rm.ScopeMetrics = internal.ReuseSlice(rm.ScopeMetrics, len(p.aggregations))
|
||||
|
||||
i := 0
|
||||
for scope, instruments := range p.aggregations {
|
||||
rm.ScopeMetrics[i].Metrics = internal.ReuseSlice(rm.ScopeMetrics[i].Metrics, len(instruments))
|
||||
j := 0
|
||||
for _, inst := range instruments {
|
||||
data := rm.ScopeMetrics[i].Metrics[j].Data
|
||||
if n := inst.compAgg(&data); n > 0 {
|
||||
rm.ScopeMetrics[i].Metrics[j].Name = inst.name
|
||||
rm.ScopeMetrics[i].Metrics[j].Description = inst.description
|
||||
rm.ScopeMetrics[i].Metrics[j].Unit = inst.unit
|
||||
rm.ScopeMetrics[i].Metrics[j].Data = data
|
||||
j++
|
||||
}
|
||||
}
|
||||
rm.ScopeMetrics[i].Metrics = rm.ScopeMetrics[i].Metrics[:j]
|
||||
if len(rm.ScopeMetrics[i].Metrics) > 0 {
|
||||
rm.ScopeMetrics[i].Scope = scope
|
||||
i++
|
||||
}
|
||||
}
|
||||
|
||||
rm.ScopeMetrics = rm.ScopeMetrics[:i]
|
||||
|
||||
return errs.errorOrNil()
|
||||
}
|
||||
|
||||
// inserter facilitates inserting of new instruments from a single scope into a
|
||||
// pipeline.
|
||||
type inserter[N int64 | float64] struct {
|
||||
// aggregators is a cache that holds aggregate function inputs whose
|
||||
// outputs have been inserted into the underlying reader pipeline. This
|
||||
// cache ensures no duplicate aggregate functions are inserted into the
|
||||
// reader pipeline and if a new request during an instrument creation asks
|
||||
// for the same aggregate function input the same instance is returned.
|
||||
aggregators *cache[instID, aggVal[N]]
|
||||
|
||||
// views is a cache that holds instrument identifiers for all the
|
||||
// instruments a Meter has created, it is provided from the Meter that owns
|
||||
// this inserter. This cache ensures during the creation of instruments
|
||||
// with the same name but different options (e.g. description, unit) a
|
||||
// warning message is logged.
|
||||
views *cache[string, instID]
|
||||
|
||||
pipeline *pipeline
|
||||
}
|
||||
|
||||
func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] {
|
||||
if vc == nil {
|
||||
vc = &cache[string, instID]{}
|
||||
}
|
||||
return &inserter[N]{
|
||||
aggregators: &cache[instID, aggVal[N]]{},
|
||||
views: vc,
|
||||
pipeline: p,
|
||||
}
|
||||
}
|
||||
|
||||
// Instrument inserts the instrument inst with instUnit into a pipeline. All
|
||||
// views the pipeline contains are matched against, and any matching view that
|
||||
// creates a unique aggregate function will have its output inserted into the
|
||||
// pipeline and its input included in the returned slice.
|
||||
//
|
||||
// The returned aggregate function inputs are ensured to be deduplicated and
|
||||
// unique. If another view in another pipeline that is cached by this
|
||||
// inserter's cache has already inserted the same aggregate function for the
|
||||
// same instrument, that functions input instance is returned.
|
||||
//
|
||||
// If another instrument has already been inserted by this inserter, or any
|
||||
// other using the same cache, and it conflicts with the instrument being
|
||||
// inserted in this call, an aggregate function input matching the arguments
|
||||
// will still be returned but an Info level log message will also be logged to
|
||||
// the OTel global logger.
|
||||
//
|
||||
// If the passed instrument would result in an incompatible aggregate function,
|
||||
// an error is returned and that aggregate function output is not inserted nor
|
||||
// is its input returned.
|
||||
//
|
||||
// If an instrument is determined to use a Drop aggregation, that instrument is
|
||||
// not inserted nor returned.
|
||||
func (i *inserter[N]) Instrument(inst Instrument) ([]aggregate.Measure[N], error) {
|
||||
var (
|
||||
matched bool
|
||||
measures []aggregate.Measure[N]
|
||||
)
|
||||
|
||||
errs := &multierror{wrapped: errCreatingAggregators}
|
||||
seen := make(map[uint64]struct{})
|
||||
for _, v := range i.pipeline.views {
|
||||
stream, match := v(inst)
|
||||
if !match {
|
||||
continue
|
||||
}
|
||||
matched = true
|
||||
|
||||
in, id, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
if in == nil { // Drop aggregation.
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[id]; ok {
|
||||
// This aggregate function has already been added.
|
||||
continue
|
||||
}
|
||||
seen[id] = struct{}{}
|
||||
measures = append(measures, in)
|
||||
}
|
||||
|
||||
if matched {
|
||||
return measures, errs.errorOrNil()
|
||||
}
|
||||
|
||||
// Apply implicit default view if no explicit matched.
|
||||
stream := Stream{
|
||||
Name: inst.Name,
|
||||
Description: inst.Description,
|
||||
Unit: inst.Unit,
|
||||
}
|
||||
in, _, err := i.cachedAggregator(inst.Scope, inst.Kind, stream)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
if in != nil {
|
||||
// Ensured to have not seen given matched was false.
|
||||
measures = append(measures, in)
|
||||
}
|
||||
return measures, errs.errorOrNil()
|
||||
}
|
||||
|
||||
var aggIDCount uint64
|
||||
|
||||
// aggVal is the cached value in an aggregators cache.
|
||||
type aggVal[N int64 | float64] struct {
|
||||
ID uint64
|
||||
Measure aggregate.Measure[N]
|
||||
Err error
|
||||
}
|
||||
|
||||
// cachedAggregator returns the appropriate aggregate input and output
|
||||
// functions for an instrument configuration. If the exact instrument has been
|
||||
// created within the inst.Scope, those aggregate function instances will be
|
||||
// returned. Otherwise, new computed aggregate functions will be cached and
|
||||
// returned.
|
||||
//
|
||||
// If the instrument configuration conflicts with an instrument that has
|
||||
// already been created (e.g. description, unit, data type) a warning will be
|
||||
// logged at the "Info" level with the global OTel logger. Valid new aggregate
|
||||
// functions for the instrument configuration will still be returned without an
|
||||
// error.
|
||||
//
|
||||
// If the instrument defines an unknown or incompatible aggregation, an error
|
||||
// is returned.
|
||||
func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) {
|
||||
switch stream.Aggregation.(type) {
|
||||
case nil:
|
||||
// Undefined, nil, means to use the default from the reader.
|
||||
stream.Aggregation = i.pipeline.reader.aggregation(kind)
|
||||
switch stream.Aggregation.(type) {
|
||||
case nil, AggregationDefault:
|
||||
// If the reader returns default or nil use the default selector.
|
||||
stream.Aggregation = DefaultAggregationSelector(kind)
|
||||
default:
|
||||
// Deep copy and validate before using.
|
||||
stream.Aggregation = stream.Aggregation.copy()
|
||||
if err := stream.Aggregation.err(); err != nil {
|
||||
orig := stream.Aggregation
|
||||
stream.Aggregation = DefaultAggregationSelector(kind)
|
||||
global.Error(
|
||||
err, "using default aggregation instead",
|
||||
"aggregation", orig,
|
||||
"replacement", stream.Aggregation,
|
||||
)
|
||||
}
|
||||
}
|
||||
case AggregationDefault:
|
||||
stream.Aggregation = DefaultAggregationSelector(kind)
|
||||
}
|
||||
|
||||
if err := isAggregatorCompatible(kind, stream.Aggregation); err != nil {
|
||||
return nil, 0, fmt.Errorf(
|
||||
"creating aggregator with instrumentKind: %d, aggregation %v: %w",
|
||||
kind, stream.Aggregation, err,
|
||||
)
|
||||
}
|
||||
|
||||
id := i.instID(kind, stream)
|
||||
// If there is a conflict, the specification says the view should
|
||||
// still be applied and a warning should be logged.
|
||||
i.logConflict(id)
|
||||
|
||||
// If there are requests for the same instrument with different name
|
||||
// casing, the first-seen needs to be returned. Use a normalize ID for the
|
||||
// cache lookup to ensure the correct comparison.
|
||||
normID := id.normalize()
|
||||
cv := i.aggregators.Lookup(normID, func() aggVal[N] {
|
||||
b := aggregate.Builder[N]{
|
||||
Temporality: i.pipeline.reader.temporality(kind),
|
||||
}
|
||||
b.Filter = stream.AttributeFilter
|
||||
in, out, err := i.aggregateFunc(b, stream.Aggregation, kind)
|
||||
if err != nil {
|
||||
return aggVal[N]{0, nil, err}
|
||||
}
|
||||
if in == nil { // Drop aggregator.
|
||||
return aggVal[N]{0, nil, nil}
|
||||
}
|
||||
i.pipeline.addSync(scope, instrumentSync{
|
||||
// Use the first-seen name casing for this and all subsequent
|
||||
// requests of this instrument.
|
||||
name: stream.Name,
|
||||
description: stream.Description,
|
||||
unit: stream.Unit,
|
||||
compAgg: out,
|
||||
})
|
||||
id := atomic.AddUint64(&aggIDCount, 1)
|
||||
return aggVal[N]{id, in, err}
|
||||
})
|
||||
return cv.Measure, cv.ID, cv.Err
|
||||
}
|
||||
|
||||
// logConflict validates if an instrument with the same case-insensitive name
|
||||
// as id has already been created. If that instrument conflicts with id, a
|
||||
// warning is logged.
|
||||
func (i *inserter[N]) logConflict(id instID) {
|
||||
// The API specification defines names as case-insensitive. If there is a
|
||||
// different casing of a name it needs to be a conflict.
|
||||
name := id.normalize().Name
|
||||
existing := i.views.Lookup(name, func() instID { return id })
|
||||
if id == existing {
|
||||
return
|
||||
}
|
||||
|
||||
const msg = "duplicate metric stream definitions"
|
||||
args := []interface{}{
|
||||
"names", fmt.Sprintf("%q, %q", existing.Name, id.Name),
|
||||
"descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description),
|
||||
"kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind),
|
||||
"units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit),
|
||||
"numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number),
|
||||
}
|
||||
|
||||
// The specification recommends logging a suggested view to resolve
|
||||
// conflicts if possible.
|
||||
//
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#duplicate-instrument-registration
|
||||
if id.Unit != existing.Unit || id.Number != existing.Number {
|
||||
// There is no view resolution for these, don't make a suggestion.
|
||||
global.Warn(msg, args...)
|
||||
return
|
||||
}
|
||||
|
||||
var stream string
|
||||
if id.Name != existing.Name || id.Kind != existing.Kind {
|
||||
stream = `Stream{Name: "{{NEW_NAME}}"}`
|
||||
} else if id.Description != existing.Description {
|
||||
stream = fmt.Sprintf("Stream{Description: %q}", existing.Description)
|
||||
}
|
||||
|
||||
inst := fmt.Sprintf(
|
||||
"Instrument{Name: %q, Description: %q, Kind: %q, Unit: %q}",
|
||||
id.Name, id.Description, "InstrumentKind"+id.Kind.String(), id.Unit,
|
||||
)
|
||||
args = append(args, "suggested.view", fmt.Sprintf("NewView(%s, %s)", inst, stream))
|
||||
|
||||
global.Warn(msg, args...)
|
||||
}
|
||||
|
||||
func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
|
||||
var zero N
|
||||
return instID{
|
||||
Name: stream.Name,
|
||||
Description: stream.Description,
|
||||
Unit: stream.Unit,
|
||||
Kind: kind,
|
||||
Number: fmt.Sprintf("%T", zero),
|
||||
}
|
||||
}
|
||||
|
||||
// aggregateFunc returns new aggregate functions matching agg, kind, and
|
||||
// monotonic. If the agg is unknown or temporality is invalid, an error is
|
||||
// returned.
|
||||
func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg Aggregation, kind InstrumentKind) (meas aggregate.Measure[N], comp aggregate.ComputeAggregation, err error) {
|
||||
switch a := agg.(type) {
|
||||
case AggregationDefault:
|
||||
return i.aggregateFunc(b, DefaultAggregationSelector(kind), kind)
|
||||
case AggregationDrop:
|
||||
// Return nil in and out to signify the drop aggregator.
|
||||
case AggregationLastValue:
|
||||
meas, comp = b.LastValue()
|
||||
case AggregationSum:
|
||||
switch kind {
|
||||
case InstrumentKindObservableCounter:
|
||||
meas, comp = b.PrecomputedSum(true)
|
||||
case InstrumentKindObservableUpDownCounter:
|
||||
meas, comp = b.PrecomputedSum(false)
|
||||
case InstrumentKindCounter, InstrumentKindHistogram:
|
||||
meas, comp = b.Sum(true)
|
||||
default:
|
||||
// InstrumentKindUpDownCounter, InstrumentKindObservableGauge, and
|
||||
// instrumentKindUndefined or other invalid instrument kinds.
|
||||
meas, comp = b.Sum(false)
|
||||
}
|
||||
case AggregationExplicitBucketHistogram:
|
||||
var noSum bool
|
||||
switch kind {
|
||||
case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge:
|
||||
// The sum should not be collected for any instrument that can make
|
||||
// negative measurements:
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations
|
||||
noSum = true
|
||||
}
|
||||
meas, comp = b.ExplicitBucketHistogram(a.Boundaries, a.NoMinMax, noSum)
|
||||
case AggregationBase2ExponentialHistogram:
|
||||
var noSum bool
|
||||
switch kind {
|
||||
case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge:
|
||||
// The sum should not be collected for any instrument that can make
|
||||
// negative measurements:
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations
|
||||
noSum = true
|
||||
}
|
||||
meas, comp = b.ExponentialBucketHistogram(a.MaxSize, a.MaxScale, a.NoMinMax, noSum)
|
||||
|
||||
default:
|
||||
err = errUnknownAggregation
|
||||
}
|
||||
|
||||
return meas, comp, err
|
||||
}
|
||||
|
||||
// isAggregatorCompatible checks if the aggregation can be used by the instrument.
|
||||
// Current compatibility:
|
||||
//
|
||||
// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram |
|
||||
// |--------------------------|------|-----------|-----|-----------|-----------------------|
|
||||
// | Counter | ✓ | | ✓ | ✓ | ✓ |
|
||||
// | UpDownCounter | ✓ | | ✓ | ✓ | ✓ |
|
||||
// | Histogram | ✓ | | ✓ | ✓ | ✓ |
|
||||
// | Observable Counter | ✓ | | ✓ | ✓ | ✓ |
|
||||
// | Observable UpDownCounter | ✓ | | ✓ | ✓ | ✓ |
|
||||
// | Observable Gauge | ✓ | ✓ | | ✓ | ✓ |.
|
||||
func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error {
|
||||
switch agg.(type) {
|
||||
case AggregationDefault:
|
||||
return nil
|
||||
case AggregationExplicitBucketHistogram, AggregationBase2ExponentialHistogram:
|
||||
switch kind {
|
||||
case InstrumentKindCounter,
|
||||
InstrumentKindUpDownCounter,
|
||||
InstrumentKindHistogram,
|
||||
InstrumentKindObservableCounter,
|
||||
InstrumentKindObservableUpDownCounter,
|
||||
InstrumentKindObservableGauge:
|
||||
return nil
|
||||
default:
|
||||
return errIncompatibleAggregation
|
||||
}
|
||||
case AggregationSum:
|
||||
switch kind {
|
||||
case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter, InstrumentKindCounter, InstrumentKindHistogram, InstrumentKindUpDownCounter:
|
||||
return nil
|
||||
default:
|
||||
// TODO: review need for aggregation check after
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
|
||||
return errIncompatibleAggregation
|
||||
}
|
||||
case AggregationLastValue:
|
||||
if kind == InstrumentKindObservableGauge {
|
||||
return nil
|
||||
}
|
||||
// TODO: review need for aggregation check after
|
||||
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
|
||||
return errIncompatibleAggregation
|
||||
case AggregationDrop:
|
||||
return nil
|
||||
default:
|
||||
// This is used passed checking for default, it should be an error at this point.
|
||||
return fmt.Errorf("%w: %v", errUnknownAggregation, agg)
|
||||
}
|
||||
}
|
||||
|
||||
// pipelines is the group of pipelines connecting Readers with instrument
|
||||
// measurement.
|
||||
type pipelines []*pipeline
|
||||
|
||||
func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines {
|
||||
pipes := make([]*pipeline, 0, len(readers))
|
||||
for _, r := range readers {
|
||||
p := newPipeline(res, r, views)
|
||||
r.register(p)
|
||||
pipes = append(pipes, p)
|
||||
}
|
||||
return pipes
|
||||
}
|
||||
|
||||
func (p pipelines) registerCallback(cback func(context.Context) error) {
|
||||
for _, pipe := range p {
|
||||
pipe.addCallback(cback)
|
||||
}
|
||||
}
|
||||
|
||||
func (p pipelines) registerMultiCallback(c multiCallback) metric.Registration {
|
||||
unregs := make([]func(), len(p))
|
||||
for i, pipe := range p {
|
||||
unregs[i] = pipe.addMultiCallback(c)
|
||||
}
|
||||
return unregisterFuncs{f: unregs}
|
||||
}
|
||||
|
||||
type unregisterFuncs struct {
|
||||
embedded.Registration
|
||||
f []func()
|
||||
}
|
||||
|
||||
func (u unregisterFuncs) Unregister() error {
|
||||
for _, f := range u.f {
|
||||
f()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// resolver facilitates resolving aggregate functions an instrument calls to
|
||||
// aggregate measurements with while updating all pipelines that need to pull
|
||||
// from those aggregations.
|
||||
type resolver[N int64 | float64] struct {
|
||||
inserters []*inserter[N]
|
||||
}
|
||||
|
||||
func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) resolver[N] {
|
||||
in := make([]*inserter[N], len(p))
|
||||
for i := range in {
|
||||
in[i] = newInserter[N](p[i], vc)
|
||||
}
|
||||
return resolver[N]{in}
|
||||
}
|
||||
|
||||
// Aggregators returns the Aggregators that must be updated by the instrument
|
||||
// defined by key.
|
||||
func (r resolver[N]) Aggregators(id Instrument) ([]aggregate.Measure[N], error) {
|
||||
var measures []aggregate.Measure[N]
|
||||
|
||||
errs := &multierror{}
|
||||
for _, i := range r.inserters {
|
||||
in, err := i.Instrument(id)
|
||||
if err != nil {
|
||||
errs.append(err)
|
||||
}
|
||||
measures = append(measures, in...)
|
||||
}
|
||||
return measures, errs.errorOrNil()
|
||||
}
|
||||
|
||||
type multierror struct {
|
||||
wrapped error
|
||||
errors []string
|
||||
}
|
||||
|
||||
func (m *multierror) errorOrNil() error {
|
||||
if len(m.errors) == 0 {
|
||||
return nil
|
||||
}
|
||||
if m.wrapped == nil {
|
||||
return errors.New(strings.Join(m.errors, "; "))
|
||||
}
|
||||
return fmt.Errorf("%w: %s", m.wrapped, strings.Join(m.errors, "; "))
|
||||
}
|
||||
|
||||
func (m *multierror) append(err error) {
|
||||
m.errors = append(m.errors, err.Error())
|
||||
}
|
Reference in New Issue
Block a user