mirror of
https://gitea.com/Lydanne/buildx.git
synced 2025-07-12 22:47:09 +08:00
vendor: update buildkit to v0.18.0-rc1
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
87
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
generated
vendored
87
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/aggregate.go
generated
vendored
@ -1,16 +1,5 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||
|
||||
@ -19,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
@ -44,42 +34,73 @@ type Builder[N int64 | float64] struct {
|
||||
// Filter is the attribute filter the aggregate function will use on the
|
||||
// input of measurements.
|
||||
Filter attribute.Filter
|
||||
// ReservoirFunc is the factory function used by aggregate functions to
|
||||
// create new exemplar reservoirs for a new seen attribute set.
|
||||
//
|
||||
// If this is not provided a default factory function that returns an
|
||||
// exemplar.Drop reservoir will be used.
|
||||
ReservoirFunc func() exemplar.FilteredReservoir[N]
|
||||
// AggregationLimit is the cardinality limit of measurement attributes. Any
|
||||
// measurement for new attributes once the limit has been reached will be
|
||||
// aggregated into a single aggregate for the "otel.metric.overflow"
|
||||
// attribute.
|
||||
//
|
||||
// If AggregationLimit is less than or equal to zero there will not be an
|
||||
// aggregation limit imposed (i.e. unlimited attribute sets).
|
||||
AggregationLimit int
|
||||
}
|
||||
|
||||
func (b Builder[N]) filter(f Measure[N]) Measure[N] {
|
||||
func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] {
|
||||
if b.ReservoirFunc != nil {
|
||||
return b.ReservoirFunc
|
||||
}
|
||||
|
||||
return exemplar.Drop
|
||||
}
|
||||
|
||||
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
|
||||
|
||||
func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] {
|
||||
if b.Filter != nil {
|
||||
fltr := b.Filter // Copy to make it immutable after assignment.
|
||||
return func(ctx context.Context, n N, a attribute.Set) {
|
||||
fAttr, _ := a.Filter(fltr)
|
||||
f(ctx, n, fAttr)
|
||||
fAttr, dropped := a.Filter(fltr)
|
||||
f(ctx, n, fAttr, dropped)
|
||||
}
|
||||
}
|
||||
return f
|
||||
return func(ctx context.Context, n N, a attribute.Set) {
|
||||
f(ctx, n, a, nil)
|
||||
}
|
||||
}
|
||||
|
||||
// LastValue returns a last-value aggregate function input and output.
|
||||
//
|
||||
// The Builder.Temporality is ignored and delta is use always.
|
||||
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
|
||||
// Delta temporality is the only temporality that makes semantic sense for
|
||||
// a last-value aggregate.
|
||||
lv := newLastValue[N]()
|
||||
lv := newLastValue[N](b.AggregationLimit, b.resFunc())
|
||||
switch b.Temporality {
|
||||
case metricdata.DeltaTemporality:
|
||||
return b.filter(lv.measure), lv.delta
|
||||
default:
|
||||
return b.filter(lv.measure), lv.cumulative
|
||||
}
|
||||
}
|
||||
|
||||
return b.filter(lv.measure), func(dest *metricdata.Aggregation) int {
|
||||
// Ignore if dest is not a metricdata.Gauge. The chance for memory
|
||||
// reuse of the DataPoints is missed (better luck next time).
|
||||
gData, _ := (*dest).(metricdata.Gauge[N])
|
||||
lv.computeAggregation(&gData.DataPoints)
|
||||
*dest = gData
|
||||
|
||||
return len(gData.DataPoints)
|
||||
// PrecomputedLastValue returns a last-value aggregate function input and
|
||||
// output. The aggregation returned from the returned ComputeAggregation
|
||||
// function will always only return values from the previous collection cycle.
|
||||
func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) {
|
||||
lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc())
|
||||
switch b.Temporality {
|
||||
case metricdata.DeltaTemporality:
|
||||
return b.filter(lv.measure), lv.delta
|
||||
default:
|
||||
return b.filter(lv.measure), lv.cumulative
|
||||
}
|
||||
}
|
||||
|
||||
// PrecomputedSum returns a sum aggregate function input and output. The
|
||||
// arguments passed to the input are expected to be the precomputed sum values.
|
||||
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
|
||||
s := newPrecomputedSum[N](monotonic)
|
||||
s := newPrecomputedSum[N](monotonic, b.AggregationLimit, b.resFunc())
|
||||
switch b.Temporality {
|
||||
case metricdata.DeltaTemporality:
|
||||
return b.filter(s.measure), s.delta
|
||||
@ -90,7 +111,7 @@ func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregati
|
||||
|
||||
// Sum returns a sum aggregate function input and output.
|
||||
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
|
||||
s := newSum[N](monotonic)
|
||||
s := newSum[N](monotonic, b.AggregationLimit, b.resFunc())
|
||||
switch b.Temporality {
|
||||
case metricdata.DeltaTemporality:
|
||||
return b.filter(s.measure), s.delta
|
||||
@ -102,7 +123,7 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
|
||||
// ExplicitBucketHistogram returns a histogram aggregate function input and
|
||||
// output.
|
||||
func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
|
||||
h := newHistogram[N](boundaries, noMinMax, noSum)
|
||||
h := newHistogram[N](boundaries, noMinMax, noSum, b.AggregationLimit, b.resFunc())
|
||||
switch b.Temporality {
|
||||
case metricdata.DeltaTemporality:
|
||||
return b.filter(h.measure), h.delta
|
||||
@ -114,7 +135,7 @@ func (b Builder[N]) ExplicitBucketHistogram(boundaries []float64, noMinMax, noSu
|
||||
// ExponentialBucketHistogram returns a histogram aggregate function input and
|
||||
// output.
|
||||
func (b Builder[N]) ExponentialBucketHistogram(maxSize, maxScale int32, noMinMax, noSum bool) (Measure[N], ComputeAggregation) {
|
||||
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum)
|
||||
h := newExponentialHistogram[N](maxSize, maxScale, noMinMax, noSum, b.AggregationLimit, b.resFunc())
|
||||
switch b.Temporality {
|
||||
case metricdata.DeltaTemporality:
|
||||
return b.filter(h.measure), h.delta
|
||||
|
13
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go
generated
vendored
13
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/doc.go
generated
vendored
@ -1,16 +1,5 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
// Package aggregate provides aggregate types used compute aggregations and
|
||||
// cycle the state of metric measurements made by the SDK. These types and
|
||||
|
42
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go
generated
vendored
Normal file
42
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exemplar.go
generated
vendored
Normal file
@ -0,0 +1,42 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
var exemplarPool = sync.Pool{
|
||||
New: func() any { return new([]exemplar.Exemplar) },
|
||||
}
|
||||
|
||||
func collectExemplars[N int64 | float64](out *[]metricdata.Exemplar[N], f func(*[]exemplar.Exemplar)) {
|
||||
dest := exemplarPool.Get().(*[]exemplar.Exemplar)
|
||||
defer func() {
|
||||
*dest = (*dest)[:0]
|
||||
exemplarPool.Put(dest)
|
||||
}()
|
||||
|
||||
*dest = reset(*dest, len(*out), cap(*out))
|
||||
|
||||
f(dest)
|
||||
|
||||
*out = reset(*out, len(*dest), cap(*dest))
|
||||
for i, e := range *dest {
|
||||
(*out)[i].FilteredAttributes = e.FilteredAttributes
|
||||
(*out)[i].Time = e.Time
|
||||
(*out)[i].SpanID = e.SpanID
|
||||
(*out)[i].TraceID = e.TraceID
|
||||
|
||||
switch e.Value.Type() {
|
||||
case exemplar.Int64ValueType:
|
||||
(*out)[i].Value = N(e.Value.Int64())
|
||||
case exemplar.Float64ValueType:
|
||||
(*out)[i].Value = N(e.Value.Float64())
|
||||
}
|
||||
}
|
||||
}
|
104
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
generated
vendored
104
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/exponential_histogram.go
generated
vendored
@ -1,16 +1,5 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||
|
||||
@ -23,6 +12,7 @@ import (
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
@ -40,6 +30,9 @@ const (
|
||||
|
||||
// expoHistogramDataPoint is a single data point in an exponential histogram.
|
||||
type expoHistogramDataPoint[N int64 | float64] struct {
|
||||
attrs attribute.Set
|
||||
res exemplar.FilteredReservoir[N]
|
||||
|
||||
count uint64
|
||||
min N
|
||||
max N
|
||||
@ -56,7 +49,7 @@ type expoHistogramDataPoint[N int64 | float64] struct {
|
||||
zeroCount uint64
|
||||
}
|
||||
|
||||
func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
|
||||
func newExpoHistogramDataPoint[N int64 | float64](attrs attribute.Set, maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
|
||||
f := math.MaxFloat64
|
||||
max := N(f) // if N is int64, max will overflow to -9223372036854775808
|
||||
min := N(-f)
|
||||
@ -65,6 +58,7 @@ func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMa
|
||||
min = N(minInt64)
|
||||
}
|
||||
return &expoHistogramDataPoint[N]{
|
||||
attrs: attrs,
|
||||
min: max,
|
||||
max: min,
|
||||
maxSize: maxSize,
|
||||
@ -288,14 +282,16 @@ func (b *expoBuckets) downscale(delta int) {
|
||||
// newExponentialHistogram returns an Aggregator that summarizes a set of
|
||||
// measurements as an exponential histogram. Each histogram is scoped by attributes
|
||||
// and the aggregation cycle the measurements were made in.
|
||||
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool) *expoHistogram[N] {
|
||||
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] {
|
||||
return &expoHistogram[N]{
|
||||
noSum: noSum,
|
||||
noMinMax: noMinMax,
|
||||
maxSize: int(maxSize),
|
||||
maxScale: int(maxScale),
|
||||
|
||||
values: make(map[attribute.Set]*expoHistogramDataPoint[N]),
|
||||
newRes: r,
|
||||
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
|
||||
values: make(map[attribute.Distinct]*expoHistogramDataPoint[N]),
|
||||
|
||||
start: now(),
|
||||
}
|
||||
@ -309,13 +305,15 @@ type expoHistogram[N int64 | float64] struct {
|
||||
maxSize int
|
||||
maxScale int
|
||||
|
||||
values map[attribute.Set]*expoHistogramDataPoint[N]
|
||||
newRes func() exemplar.FilteredReservoir[N]
|
||||
limit limiter[*expoHistogramDataPoint[N]]
|
||||
values map[attribute.Distinct]*expoHistogramDataPoint[N]
|
||||
valuesMu sync.Mutex
|
||||
|
||||
start time.Time
|
||||
}
|
||||
|
||||
func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Set) {
|
||||
func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
|
||||
// Ignore NaN and infinity.
|
||||
if math.IsInf(float64(value), 0) || math.IsNaN(float64(value)) {
|
||||
return
|
||||
@ -324,12 +322,16 @@ func (e *expoHistogram[N]) measure(_ context.Context, value N, attr attribute.Se
|
||||
e.valuesMu.Lock()
|
||||
defer e.valuesMu.Unlock()
|
||||
|
||||
v, ok := e.values[attr]
|
||||
attr := e.limit.Attributes(fltrAttr, e.values)
|
||||
v, ok := e.values[attr.Equivalent()]
|
||||
if !ok {
|
||||
v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
|
||||
e.values[attr] = v
|
||||
v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum)
|
||||
v.res = e.newRes()
|
||||
|
||||
e.values[attr.Equivalent()] = v
|
||||
}
|
||||
v.record(value)
|
||||
v.res.Offer(ctx, value, droppedAttr)
|
||||
}
|
||||
|
||||
func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
|
||||
@ -347,33 +349,38 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
|
||||
hDPts := reset(h.DataPoints, n, n)
|
||||
|
||||
var i int
|
||||
for a, b := range e.values {
|
||||
hDPts[i].Attributes = a
|
||||
for _, val := range e.values {
|
||||
hDPts[i].Attributes = val.attrs
|
||||
hDPts[i].StartTime = e.start
|
||||
hDPts[i].Time = t
|
||||
hDPts[i].Count = b.count
|
||||
hDPts[i].Scale = int32(b.scale)
|
||||
hDPts[i].ZeroCount = b.zeroCount
|
||||
hDPts[i].Count = val.count
|
||||
hDPts[i].Scale = int32(val.scale)
|
||||
hDPts[i].ZeroCount = val.zeroCount
|
||||
hDPts[i].ZeroThreshold = 0.0
|
||||
|
||||
hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
|
||||
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
|
||||
copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
|
||||
hDPts[i].PositiveBucket.Offset = int32(val.posBuckets.startBin)
|
||||
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts))
|
||||
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
|
||||
|
||||
hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
|
||||
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
|
||||
hDPts[i].NegativeBucket.Offset = int32(val.negBuckets.startBin)
|
||||
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts))
|
||||
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
|
||||
|
||||
if !e.noSum {
|
||||
hDPts[i].Sum = b.sum
|
||||
hDPts[i].Sum = val.sum
|
||||
}
|
||||
if !e.noMinMax {
|
||||
hDPts[i].Min = metricdata.NewExtrema(b.min)
|
||||
hDPts[i].Max = metricdata.NewExtrema(b.max)
|
||||
hDPts[i].Min = metricdata.NewExtrema(val.min)
|
||||
hDPts[i].Max = metricdata.NewExtrema(val.max)
|
||||
}
|
||||
|
||||
delete(e.values, a)
|
||||
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
|
||||
|
||||
i++
|
||||
}
|
||||
// Unused attribute sets do not report.
|
||||
clear(e.values)
|
||||
|
||||
e.start = t
|
||||
h.DataPoints = hDPts
|
||||
*dest = h
|
||||
@ -395,30 +402,33 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
|
||||
hDPts := reset(h.DataPoints, n, n)
|
||||
|
||||
var i int
|
||||
for a, b := range e.values {
|
||||
hDPts[i].Attributes = a
|
||||
for _, val := range e.values {
|
||||
hDPts[i].Attributes = val.attrs
|
||||
hDPts[i].StartTime = e.start
|
||||
hDPts[i].Time = t
|
||||
hDPts[i].Count = b.count
|
||||
hDPts[i].Scale = int32(b.scale)
|
||||
hDPts[i].ZeroCount = b.zeroCount
|
||||
hDPts[i].Count = val.count
|
||||
hDPts[i].Scale = int32(val.scale)
|
||||
hDPts[i].ZeroCount = val.zeroCount
|
||||
hDPts[i].ZeroThreshold = 0.0
|
||||
|
||||
hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
|
||||
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
|
||||
copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
|
||||
hDPts[i].PositiveBucket.Offset = int32(val.posBuckets.startBin)
|
||||
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts))
|
||||
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
|
||||
|
||||
hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
|
||||
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
|
||||
hDPts[i].NegativeBucket.Offset = int32(val.negBuckets.startBin)
|
||||
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts))
|
||||
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
|
||||
|
||||
if !e.noSum {
|
||||
hDPts[i].Sum = b.sum
|
||||
hDPts[i].Sum = val.sum
|
||||
}
|
||||
if !e.noMinMax {
|
||||
hDPts[i].Min = metricdata.NewExtrema(b.min)
|
||||
hDPts[i].Max = metricdata.NewExtrema(b.max)
|
||||
hDPts[i].Min = metricdata.NewExtrema(val.min)
|
||||
hDPts[i].Max = metricdata.NewExtrema(val.max)
|
||||
}
|
||||
|
||||
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
|
||||
|
||||
i++
|
||||
// TODO (#3006): This will use an unbounded amount of memory if there
|
||||
// are unbounded number of attribute sets being aggregated. Attribute
|
||||
|
106
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
generated
vendored
106
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/histogram.go
generated
vendored
@ -1,30 +1,24 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
type buckets[N int64 | float64] struct {
|
||||
attrs attribute.Set
|
||||
res exemplar.FilteredReservoir[N]
|
||||
|
||||
counts []uint64
|
||||
count uint64
|
||||
total N
|
||||
@ -32,8 +26,8 @@ type buckets[N int64 | float64] struct {
|
||||
}
|
||||
|
||||
// newBuckets returns buckets with n bins.
|
||||
func newBuckets[N int64 | float64](n int) *buckets[N] {
|
||||
return &buckets[N]{counts: make([]uint64, n)}
|
||||
func newBuckets[N int64 | float64](attrs attribute.Set, n int) *buckets[N] {
|
||||
return &buckets[N]{attrs: attrs, counts: make([]uint64, n)}
|
||||
}
|
||||
|
||||
func (b *buckets[N]) sum(value N) { b.total += value }
|
||||
@ -54,28 +48,31 @@ type histValues[N int64 | float64] struct {
|
||||
noSum bool
|
||||
bounds []float64
|
||||
|
||||
values map[attribute.Set]*buckets[N]
|
||||
newRes func() exemplar.FilteredReservoir[N]
|
||||
limit limiter[*buckets[N]]
|
||||
values map[attribute.Distinct]*buckets[N]
|
||||
valuesMu sync.Mutex
|
||||
}
|
||||
|
||||
func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] {
|
||||
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] {
|
||||
// The responsibility of keeping all buckets correctly associated with the
|
||||
// passed boundaries is ultimately this type's responsibility. Make a copy
|
||||
// here so we can always guarantee this. Or, in the case of failure, have
|
||||
// complete control over the fix.
|
||||
b := make([]float64, len(bounds))
|
||||
copy(b, bounds)
|
||||
sort.Float64s(b)
|
||||
b := slices.Clone(bounds)
|
||||
slices.Sort(b)
|
||||
return &histValues[N]{
|
||||
noSum: noSum,
|
||||
bounds: b,
|
||||
values: make(map[attribute.Set]*buckets[N]),
|
||||
newRes: r,
|
||||
limit: newLimiter[*buckets[N]](limit),
|
||||
values: make(map[attribute.Distinct]*buckets[N]),
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate records the measurement value, scoped by attr, and aggregates it
|
||||
// into a histogram.
|
||||
func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set) {
|
||||
func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
|
||||
// This search will return an index in the range [0, len(s.bounds)], where
|
||||
// it will return len(s.bounds) if value is greater than the last element
|
||||
// of s.bounds. This aligns with the buckets in that the length of buckets
|
||||
@ -86,7 +83,8 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)
|
||||
s.valuesMu.Lock()
|
||||
defer s.valuesMu.Unlock()
|
||||
|
||||
b, ok := s.values[attr]
|
||||
attr := s.limit.Attributes(fltrAttr, s.values)
|
||||
b, ok := s.values[attr.Equivalent()]
|
||||
if !ok {
|
||||
// N+1 buckets. For example:
|
||||
//
|
||||
@ -95,22 +93,25 @@ func (s *histValues[N]) measure(_ context.Context, value N, attr attribute.Set)
|
||||
// Then,
|
||||
//
|
||||
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
|
||||
b = newBuckets[N](len(s.bounds) + 1)
|
||||
b = newBuckets[N](attr, len(s.bounds)+1)
|
||||
b.res = s.newRes()
|
||||
|
||||
// Ensure min and max are recorded values (not zero), for new buckets.
|
||||
b.min, b.max = value, value
|
||||
s.values[attr] = b
|
||||
s.values[attr.Equivalent()] = b
|
||||
}
|
||||
b.bin(idx, value)
|
||||
if !s.noSum {
|
||||
b.sum(value)
|
||||
}
|
||||
b.res.Offer(ctx, value, droppedAttr)
|
||||
}
|
||||
|
||||
// newHistogram returns an Aggregator that summarizes a set of measurements as
|
||||
// an histogram.
|
||||
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool) *histogram[N] {
|
||||
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] {
|
||||
return &histogram[N]{
|
||||
histValues: newHistValues[N](boundaries, noSum),
|
||||
histValues: newHistValues[N](boundaries, noSum, limit, r),
|
||||
noMinMax: noMinMax,
|
||||
start: now(),
|
||||
}
|
||||
@ -137,34 +138,35 @@ func (s *histogram[N]) delta(dest *metricdata.Aggregation) int {
|
||||
defer s.valuesMu.Unlock()
|
||||
|
||||
// Do not allow modification of our copy of bounds.
|
||||
bounds := make([]float64, len(s.bounds))
|
||||
copy(bounds, s.bounds)
|
||||
bounds := slices.Clone(s.bounds)
|
||||
|
||||
n := len(s.values)
|
||||
hDPts := reset(h.DataPoints, n, n)
|
||||
|
||||
var i int
|
||||
for a, b := range s.values {
|
||||
hDPts[i].Attributes = a
|
||||
for _, val := range s.values {
|
||||
hDPts[i].Attributes = val.attrs
|
||||
hDPts[i].StartTime = s.start
|
||||
hDPts[i].Time = t
|
||||
hDPts[i].Count = b.count
|
||||
hDPts[i].Count = val.count
|
||||
hDPts[i].Bounds = bounds
|
||||
hDPts[i].BucketCounts = b.counts
|
||||
hDPts[i].BucketCounts = val.counts
|
||||
|
||||
if !s.noSum {
|
||||
hDPts[i].Sum = b.total
|
||||
hDPts[i].Sum = val.total
|
||||
}
|
||||
|
||||
if !s.noMinMax {
|
||||
hDPts[i].Min = metricdata.NewExtrema(b.min)
|
||||
hDPts[i].Max = metricdata.NewExtrema(b.max)
|
||||
hDPts[i].Min = metricdata.NewExtrema(val.min)
|
||||
hDPts[i].Max = metricdata.NewExtrema(val.max)
|
||||
}
|
||||
|
||||
// Unused attribute sets do not report.
|
||||
delete(s.values, a)
|
||||
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
|
||||
|
||||
i++
|
||||
}
|
||||
// Unused attribute sets do not report.
|
||||
clear(s.values)
|
||||
// The delta collection cycle resets.
|
||||
s.start = t
|
||||
|
||||
@ -186,37 +188,37 @@ func (s *histogram[N]) cumulative(dest *metricdata.Aggregation) int {
|
||||
defer s.valuesMu.Unlock()
|
||||
|
||||
// Do not allow modification of our copy of bounds.
|
||||
bounds := make([]float64, len(s.bounds))
|
||||
copy(bounds, s.bounds)
|
||||
bounds := slices.Clone(s.bounds)
|
||||
|
||||
n := len(s.values)
|
||||
hDPts := reset(h.DataPoints, n, n)
|
||||
|
||||
var i int
|
||||
for a, b := range s.values {
|
||||
for _, val := range s.values {
|
||||
hDPts[i].Attributes = val.attrs
|
||||
hDPts[i].StartTime = s.start
|
||||
hDPts[i].Time = t
|
||||
hDPts[i].Count = val.count
|
||||
hDPts[i].Bounds = bounds
|
||||
|
||||
// The HistogramDataPoint field values returned need to be copies of
|
||||
// the buckets value as we will keep updating them.
|
||||
//
|
||||
// TODO (#3047): Making copies for bounds and counts incurs a large
|
||||
// memory allocation footprint. Alternatives should be explored.
|
||||
counts := make([]uint64, len(b.counts))
|
||||
copy(counts, b.counts)
|
||||
|
||||
hDPts[i].Attributes = a
|
||||
hDPts[i].StartTime = s.start
|
||||
hDPts[i].Time = t
|
||||
hDPts[i].Count = b.count
|
||||
hDPts[i].Bounds = bounds
|
||||
hDPts[i].BucketCounts = counts
|
||||
hDPts[i].BucketCounts = slices.Clone(val.counts)
|
||||
|
||||
if !s.noSum {
|
||||
hDPts[i].Sum = b.total
|
||||
hDPts[i].Sum = val.total
|
||||
}
|
||||
|
||||
if !s.noMinMax {
|
||||
hDPts[i].Min = metricdata.NewExtrema(b.min)
|
||||
hDPts[i].Max = metricdata.NewExtrema(b.max)
|
||||
hDPts[i].Min = metricdata.NewExtrema(val.min)
|
||||
hDPts[i].Max = metricdata.NewExtrema(val.max)
|
||||
}
|
||||
|
||||
collectExemplars(&hDPts[i].Exemplars, val.res.Collect)
|
||||
|
||||
i++
|
||||
// TODO (#3006): This will use an unbounded amount of memory if there
|
||||
// are unbounded number of attribute sets being aggregated. Attribute
|
||||
|
158
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
generated
vendored
158
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/lastvalue.go
generated
vendored
@ -1,16 +1,5 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||
|
||||
@ -20,49 +9,154 @@ import (
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
// datapoint is timestamped measurement data.
|
||||
type datapoint[N int64 | float64] struct {
|
||||
timestamp time.Time
|
||||
value N
|
||||
attrs attribute.Set
|
||||
value N
|
||||
res exemplar.FilteredReservoir[N]
|
||||
}
|
||||
|
||||
func newLastValue[N int64 | float64]() *lastValue[N] {
|
||||
return &lastValue[N]{values: make(map[attribute.Set]datapoint[N])}
|
||||
func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *lastValue[N] {
|
||||
return &lastValue[N]{
|
||||
newRes: r,
|
||||
limit: newLimiter[datapoint[N]](limit),
|
||||
values: make(map[attribute.Distinct]datapoint[N]),
|
||||
start: now(),
|
||||
}
|
||||
}
|
||||
|
||||
// lastValue summarizes a set of measurements as the last one made.
|
||||
type lastValue[N int64 | float64] struct {
|
||||
sync.Mutex
|
||||
|
||||
values map[attribute.Set]datapoint[N]
|
||||
newRes func() exemplar.FilteredReservoir[N]
|
||||
limit limiter[datapoint[N]]
|
||||
values map[attribute.Distinct]datapoint[N]
|
||||
start time.Time
|
||||
}
|
||||
|
||||
func (s *lastValue[N]) measure(ctx context.Context, value N, attr attribute.Set) {
|
||||
d := datapoint[N]{timestamp: now(), value: value}
|
||||
s.Lock()
|
||||
s.values[attr] = d
|
||||
s.Unlock()
|
||||
}
|
||||
|
||||
func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) {
|
||||
func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
attr := s.limit.Attributes(fltrAttr, s.values)
|
||||
d, ok := s.values[attr.Equivalent()]
|
||||
if !ok {
|
||||
d.res = s.newRes()
|
||||
}
|
||||
|
||||
d.attrs = attr
|
||||
d.value = value
|
||||
d.res.Offer(ctx, value, droppedAttr)
|
||||
|
||||
s.values[attr.Equivalent()] = d
|
||||
}
|
||||
|
||||
func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int {
|
||||
t := now()
|
||||
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
|
||||
// the DataPoints is missed (better luck next time).
|
||||
gData, _ := (*dest).(metricdata.Gauge[N])
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
n := s.copyDpts(&gData.DataPoints, t)
|
||||
// Do not report stale values.
|
||||
clear(s.values)
|
||||
// Update start time for delta temporality.
|
||||
s.start = t
|
||||
|
||||
*dest = gData
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int {
|
||||
t := now()
|
||||
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
|
||||
// the DataPoints is missed (better luck next time).
|
||||
gData, _ := (*dest).(metricdata.Gauge[N])
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
n := s.copyDpts(&gData.DataPoints, t)
|
||||
// TODO (#3006): This will use an unbounded amount of memory if there
|
||||
// are unbounded number of attribute sets being aggregated. Attribute
|
||||
// sets that become "stale" need to be forgotten so this will not
|
||||
// overload the system.
|
||||
*dest = gData
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
// copyDpts copies the datapoints held by s into dest. The number of datapoints
|
||||
// copied is returned.
|
||||
func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int {
|
||||
n := len(s.values)
|
||||
*dest = reset(*dest, n, n)
|
||||
|
||||
var i int
|
||||
for a, v := range s.values {
|
||||
(*dest)[i].Attributes = a
|
||||
// The event time is the only meaningful timestamp, StartTime is
|
||||
// ignored.
|
||||
(*dest)[i].Time = v.timestamp
|
||||
for _, v := range s.values {
|
||||
(*dest)[i].Attributes = v.attrs
|
||||
(*dest)[i].StartTime = s.start
|
||||
(*dest)[i].Time = t
|
||||
(*dest)[i].Value = v.value
|
||||
// Do not report stale values.
|
||||
delete(s.values, a)
|
||||
collectExemplars(&(*dest)[i].Exemplars, v.res.Collect)
|
||||
i++
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// newPrecomputedLastValue returns an aggregator that summarizes a set of
|
||||
// observations as the last one made.
|
||||
func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *precomputedLastValue[N] {
|
||||
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
|
||||
}
|
||||
|
||||
// precomputedLastValue summarizes a set of observations as the last one made.
|
||||
type precomputedLastValue[N int64 | float64] struct {
|
||||
*lastValue[N]
|
||||
}
|
||||
|
||||
func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int {
|
||||
t := now()
|
||||
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
|
||||
// the DataPoints is missed (better luck next time).
|
||||
gData, _ := (*dest).(metricdata.Gauge[N])
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
n := s.copyDpts(&gData.DataPoints, t)
|
||||
// Do not report stale values.
|
||||
clear(s.values)
|
||||
// Update start time for delta temporality.
|
||||
s.start = t
|
||||
|
||||
*dest = gData
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int {
|
||||
t := now()
|
||||
// Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of
|
||||
// the DataPoints is missed (better luck next time).
|
||||
gData, _ := (*dest).(metricdata.Gauge[N])
|
||||
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
n := s.copyDpts(&gData.DataPoints, t)
|
||||
// Do not report stale values.
|
||||
clear(s.values)
|
||||
*dest = gData
|
||||
|
||||
return n
|
||||
}
|
||||
|
42
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/limit.go
generated
vendored
Normal file
42
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/limit.go
generated
vendored
Normal file
@ -0,0 +1,42 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||
|
||||
import "go.opentelemetry.io/otel/attribute"
|
||||
|
||||
// overflowSet is the attribute set used to record a measurement when adding
|
||||
// another distinct attribute set to the aggregate would exceed the aggregate
|
||||
// limit.
|
||||
var overflowSet = attribute.NewSet(attribute.Bool("otel.metric.overflow", true))
|
||||
|
||||
// limiter limits aggregate values.
|
||||
type limiter[V any] struct {
|
||||
// aggLimit is the maximum number of metric streams that can be aggregated.
|
||||
//
|
||||
// Any metric stream with attributes distinct from any set already
|
||||
// aggregated once the aggLimit will be meet will instead be aggregated
|
||||
// into an "overflow" metric stream. That stream will only contain the
|
||||
// "otel.metric.overflow"=true attribute.
|
||||
aggLimit int
|
||||
}
|
||||
|
||||
// newLimiter returns a new Limiter with the provided aggregation limit.
|
||||
func newLimiter[V any](aggregation int) limiter[V] {
|
||||
return limiter[V]{aggLimit: aggregation}
|
||||
}
|
||||
|
||||
// Attributes checks if adding a measurement for attrs will exceed the
|
||||
// aggregation cardinality limit for the existing measurements. If it will,
|
||||
// overflowSet is returned. Otherwise, if it will not exceed the limit, or the
|
||||
// limit is not set (limit <= 0), attr is returned.
|
||||
func (l limiter[V]) Attributes(attrs attribute.Set, measurements map[attribute.Distinct]V) attribute.Set {
|
||||
if l.aggLimit > 0 {
|
||||
_, exists := measurements[attrs.Equivalent()]
|
||||
if !exists && len(measurements) >= l.aggLimit-1 {
|
||||
return overflowSet
|
||||
}
|
||||
}
|
||||
|
||||
return attrs
|
||||
}
|
104
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
generated
vendored
104
vendor/go.opentelemetry.io/otel/sdk/metric/internal/aggregate/sum.go
generated
vendored
@ -1,16 +1,5 @@
|
||||
// Copyright The OpenTelemetry Authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
|
||||
|
||||
@ -20,31 +9,55 @@ import (
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
)
|
||||
|
||||
type sumValue[N int64 | float64] struct {
|
||||
n N
|
||||
res exemplar.FilteredReservoir[N]
|
||||
attrs attribute.Set
|
||||
}
|
||||
|
||||
// valueMap is the storage for sums.
|
||||
type valueMap[N int64 | float64] struct {
|
||||
sync.Mutex
|
||||
values map[attribute.Set]N
|
||||
newRes func() exemplar.FilteredReservoir[N]
|
||||
limit limiter[sumValue[N]]
|
||||
values map[attribute.Distinct]sumValue[N]
|
||||
}
|
||||
|
||||
func newValueMap[N int64 | float64]() *valueMap[N] {
|
||||
return &valueMap[N]{values: make(map[attribute.Set]N)}
|
||||
func newValueMap[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *valueMap[N] {
|
||||
return &valueMap[N]{
|
||||
newRes: r,
|
||||
limit: newLimiter[sumValue[N]](limit),
|
||||
values: make(map[attribute.Distinct]sumValue[N]),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *valueMap[N]) measure(_ context.Context, value N, attr attribute.Set) {
|
||||
func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) {
|
||||
s.Lock()
|
||||
s.values[attr] += value
|
||||
s.Unlock()
|
||||
defer s.Unlock()
|
||||
|
||||
attr := s.limit.Attributes(fltrAttr, s.values)
|
||||
v, ok := s.values[attr.Equivalent()]
|
||||
if !ok {
|
||||
v.res = s.newRes()
|
||||
}
|
||||
|
||||
v.attrs = attr
|
||||
v.n += value
|
||||
v.res.Offer(ctx, value, droppedAttr)
|
||||
|
||||
s.values[attr.Equivalent()] = v
|
||||
}
|
||||
|
||||
// newSum returns an aggregator that summarizes a set of measurements as their
|
||||
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
|
||||
// the measurements were made in.
|
||||
func newSum[N int64 | float64](monotonic bool) *sum[N] {
|
||||
func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *sum[N] {
|
||||
return &sum[N]{
|
||||
valueMap: newValueMap[N](),
|
||||
valueMap: newValueMap[N](limit, r),
|
||||
monotonic: monotonic,
|
||||
start: now(),
|
||||
}
|
||||
@ -74,15 +87,16 @@ func (s *sum[N]) delta(dest *metricdata.Aggregation) int {
|
||||
dPts := reset(sData.DataPoints, n, n)
|
||||
|
||||
var i int
|
||||
for attr, value := range s.values {
|
||||
dPts[i].Attributes = attr
|
||||
for _, val := range s.values {
|
||||
dPts[i].Attributes = val.attrs
|
||||
dPts[i].StartTime = s.start
|
||||
dPts[i].Time = t
|
||||
dPts[i].Value = value
|
||||
// Do not report stale values.
|
||||
delete(s.values, attr)
|
||||
dPts[i].Value = val.n
|
||||
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
|
||||
i++
|
||||
}
|
||||
// Do not report stale values.
|
||||
clear(s.values)
|
||||
// The delta collection cycle resets.
|
||||
s.start = t
|
||||
|
||||
@ -108,11 +122,12 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
|
||||
dPts := reset(sData.DataPoints, n, n)
|
||||
|
||||
var i int
|
||||
for attr, value := range s.values {
|
||||
dPts[i].Attributes = attr
|
||||
for _, value := range s.values {
|
||||
dPts[i].Attributes = value.attrs
|
||||
dPts[i].StartTime = s.start
|
||||
dPts[i].Time = t
|
||||
dPts[i].Value = value
|
||||
dPts[i].Value = value.n
|
||||
collectExemplars(&dPts[i].Exemplars, value.res.Collect)
|
||||
// TODO (#3006): This will use an unbounded amount of memory if there
|
||||
// are unbounded number of attribute sets being aggregated. Attribute
|
||||
// sets that become "stale" need to be forgotten so this will not
|
||||
@ -129,9 +144,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
|
||||
// newPrecomputedSum returns an aggregator that summarizes a set of
|
||||
// observatrions as their arithmetic sum. Each sum is scoped by attributes and
|
||||
// the aggregation cycle the measurements were made in.
|
||||
func newPrecomputedSum[N int64 | float64](monotonic bool) *precomputedSum[N] {
|
||||
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *precomputedSum[N] {
|
||||
return &precomputedSum[N]{
|
||||
valueMap: newValueMap[N](),
|
||||
valueMap: newValueMap[N](limit, r),
|
||||
monotonic: monotonic,
|
||||
start: now(),
|
||||
}
|
||||
@ -144,12 +159,12 @@ type precomputedSum[N int64 | float64] struct {
|
||||
monotonic bool
|
||||
start time.Time
|
||||
|
||||
reported map[attribute.Set]N
|
||||
reported map[attribute.Distinct]N
|
||||
}
|
||||
|
||||
func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
|
||||
t := now()
|
||||
newReported := make(map[attribute.Set]N)
|
||||
newReported := make(map[attribute.Distinct]N)
|
||||
|
||||
// If *dest is not a metricdata.Sum, memory reuse is missed. In that case,
|
||||
// use the zero-value sData and hope for better alignment next cycle.
|
||||
@ -164,20 +179,20 @@ func (s *precomputedSum[N]) delta(dest *metricdata.Aggregation) int {
|
||||
dPts := reset(sData.DataPoints, n, n)
|
||||
|
||||
var i int
|
||||
for attr, value := range s.values {
|
||||
delta := value - s.reported[attr]
|
||||
for key, value := range s.values {
|
||||
delta := value.n - s.reported[key]
|
||||
|
||||
dPts[i].Attributes = attr
|
||||
dPts[i].Attributes = value.attrs
|
||||
dPts[i].StartTime = s.start
|
||||
dPts[i].Time = t
|
||||
dPts[i].Value = delta
|
||||
collectExemplars(&dPts[i].Exemplars, value.res.Collect)
|
||||
|
||||
newReported[attr] = value
|
||||
// Unused attribute sets do not report.
|
||||
delete(s.values, attr)
|
||||
newReported[key] = value.n
|
||||
i++
|
||||
}
|
||||
// Unused attribute sets are forgotten.
|
||||
// Unused attribute sets do not report.
|
||||
clear(s.values)
|
||||
s.reported = newReported
|
||||
// The delta collection cycle resets.
|
||||
s.start = t
|
||||
@ -204,16 +219,17 @@ func (s *precomputedSum[N]) cumulative(dest *metricdata.Aggregation) int {
|
||||
dPts := reset(sData.DataPoints, n, n)
|
||||
|
||||
var i int
|
||||
for attr, value := range s.values {
|
||||
dPts[i].Attributes = attr
|
||||
for _, val := range s.values {
|
||||
dPts[i].Attributes = val.attrs
|
||||
dPts[i].StartTime = s.start
|
||||
dPts[i].Time = t
|
||||
dPts[i].Value = value
|
||||
dPts[i].Value = val.n
|
||||
collectExemplars(&dPts[i].Exemplars, val.res.Collect)
|
||||
|
||||
// Unused attribute sets do not report.
|
||||
delete(s.values, attr)
|
||||
i++
|
||||
}
|
||||
// Unused attribute sets do not report.
|
||||
clear(s.values)
|
||||
|
||||
sData.DataPoints = dPts
|
||||
*dest = sData
|
||||
|
Reference in New Issue
Block a user