vendor: update buildkit to v0.19.0-rc1

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi
2025-01-14 14:20:26 -08:00
parent 630066bfc5
commit 44fa243d58
1910 changed files with 95196 additions and 50438 deletions

View File

@@ -8,7 +8,6 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -38,8 +37,8 @@ type Builder[N int64 | float64] struct {
// create new exemplar reservoirs for a new seen attribute set.
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.FilteredReservoir[N]
// DropReservoir reservoir will be used.
ReservoirFunc func() FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
@@ -50,12 +49,12 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}
func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] {
func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}
return exemplar.Drop
return DropReservoir
}
type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)

View File

@@ -1,16 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)
// Drop returns a [FilteredReservoir] that drops all measurements it is offered.
func Drop[N int64 | float64]() FilteredReservoir[N] { return &dropRes[N]{} }
// DropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func DropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }
type dropRes[N int64 | float64] struct{}
@@ -18,6 +19,6 @@ type dropRes[N int64 | float64] struct{}
func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}
// Collect resets dest. No exemplars will ever be returned.
func (r *dropRes[N]) Collect(dest *[]Exemplar) {
func (r *dropRes[N]) Collect(dest *[]exemplar.Exemplar) {
*dest = (*dest)[:0]
}

View File

@@ -6,7 +6,7 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggreg
import (
"sync"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

View File

@@ -12,7 +12,6 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -31,7 +30,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]
count uint64
min N
@@ -42,14 +41,14 @@ type expoHistogramDataPoint[N int64 | float64] struct {
noMinMax bool
noSum bool
scale int
scale int32
posBuckets expoBuckets
negBuckets expoBuckets
zeroCount uint64
}
func newExpoHistogramDataPoint[N int64 | float64](attrs attribute.Set, maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
func newExpoHistogramDataPoint[N int64 | float64](attrs attribute.Set, maxSize int, maxScale int32, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
f := math.MaxFloat64
max := N(f) // if N is int64, max will overflow to -9223372036854775808
min := N(-f)
@@ -119,11 +118,13 @@ func (p *expoHistogramDataPoint[N]) record(v N) {
}
// getBin returns the bin v should be recorded into.
func (p *expoHistogramDataPoint[N]) getBin(v float64) int {
frac, exp := math.Frexp(v)
func (p *expoHistogramDataPoint[N]) getBin(v float64) int32 {
frac, expInt := math.Frexp(v)
// 11-bit exponential.
exp := int32(expInt) // nolint: gosec
if p.scale <= 0 {
// Because of the choice of fraction is always 1 power of two higher than we want.
correction := 1
var correction int32 = 1
if frac == .5 {
// If v is an exact power of two the frac will be .5 and the exp
// will be one higher than we want.
@@ -131,7 +132,7 @@ func (p *expoHistogramDataPoint[N]) getBin(v float64) int {
}
return (exp - correction) >> (-p.scale)
}
return exp<<p.scale + int(math.Log(frac)*scaleFactors[p.scale]) - 1
return exp<<p.scale + int32(math.Log(frac)*scaleFactors[p.scale]) - 1
}
// scaleFactors are constants used in calculating the logarithm index. They are
@@ -162,20 +163,20 @@ var scaleFactors = [21]float64{
// scaleChange returns the magnitude of the scale change needed to fit bin in
// the bucket. If no scale change is needed 0 is returned.
func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int {
func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin int32, length int) int32 {
if length == 0 {
// No need to rescale if there are no buckets.
return 0
}
low := startBin
high := bin
low := int(startBin)
high := int(bin)
if startBin >= bin {
low = bin
high = startBin + length - 1
low = int(bin)
high = int(startBin) + length - 1
}
count := 0
var count int32
for high-low >= p.maxSize {
low = low >> 1
high = high >> 1
@@ -189,39 +190,39 @@ func (p *expoHistogramDataPoint[N]) scaleChange(bin, startBin, length int) int {
// expoBuckets is a set of buckets in an exponential histogram.
type expoBuckets struct {
startBin int
startBin int32
counts []uint64
}
// record increments the count for the given bin, and expands the buckets if needed.
// Size changes must be done before calling this function.
func (b *expoBuckets) record(bin int) {
func (b *expoBuckets) record(bin int32) {
if len(b.counts) == 0 {
b.counts = []uint64{1}
b.startBin = bin
return
}
endBin := b.startBin + len(b.counts) - 1
endBin := int(b.startBin) + len(b.counts) - 1
// if the new bin is inside the current range
if bin >= b.startBin && bin <= endBin {
if bin >= b.startBin && int(bin) <= endBin {
b.counts[bin-b.startBin]++
return
}
// if the new bin is before the current start add spaces to the counts
if bin < b.startBin {
origLen := len(b.counts)
newLength := endBin - bin + 1
newLength := endBin - int(bin) + 1
shift := b.startBin - bin
if newLength > cap(b.counts) {
b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
}
copy(b.counts[shift:origLen+shift], b.counts[:])
copy(b.counts[shift:origLen+int(shift)], b.counts[:])
b.counts = b.counts[:newLength]
for i := 1; i < shift; i++ {
for i := 1; i < int(shift); i++ {
b.counts[i] = 0
}
b.startBin = bin
@@ -229,17 +230,17 @@ func (b *expoBuckets) record(bin int) {
return
}
// if the new is after the end add spaces to the end
if bin > endBin {
if bin-b.startBin < cap(b.counts) {
if int(bin) > endBin {
if int(bin-b.startBin) < cap(b.counts) {
b.counts = b.counts[:bin-b.startBin+1]
for i := endBin + 1 - b.startBin; i < len(b.counts); i++ {
for i := endBin + 1 - int(b.startBin); i < len(b.counts); i++ {
b.counts[i] = 0
}
b.counts[bin-b.startBin] = 1
return
}
end := make([]uint64, bin-b.startBin-len(b.counts)+1)
end := make([]uint64, int(bin-b.startBin)-len(b.counts)+1)
b.counts = append(b.counts, end...)
b.counts[bin-b.startBin] = 1
}
@@ -247,7 +248,7 @@ func (b *expoBuckets) record(bin int) {
// downscale shrinks a bucket by a factor of 2*s. It will sum counts into the
// correct lower resolution bucket.
func (b *expoBuckets) downscale(delta int) {
func (b *expoBuckets) downscale(delta int32) {
// Example
// delta = 2
// Original offset: -6
@@ -262,19 +263,19 @@ func (b *expoBuckets) downscale(delta int) {
return
}
steps := 1 << delta
steps := int32(1) << delta
offset := b.startBin % steps
offset = (offset + steps) % steps // to make offset positive
for i := 1; i < len(b.counts); i++ {
idx := i + offset
if idx%steps == 0 {
b.counts[idx/steps] = b.counts[i]
idx := i + int(offset)
if idx%int(steps) == 0 {
b.counts[idx/int(steps)] = b.counts[i]
continue
}
b.counts[idx/steps] += b.counts[i]
b.counts[idx/int(steps)] += b.counts[i]
}
lastIdx := (len(b.counts) - 1 + offset) / steps
lastIdx := (len(b.counts) - 1 + int(offset)) / int(steps)
b.counts = b.counts[:lastIdx+1]
b.startBin = b.startBin >> delta
}
@@ -282,12 +283,12 @@ func (b *expoBuckets) downscale(delta int) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
maxSize: int(maxSize),
maxScale: int(maxScale),
maxScale: maxScale,
newRes: r,
limit: newLimiter[*expoHistogramDataPoint[N]](limit),
@@ -303,9 +304,9 @@ type expoHistogram[N int64 | float64] struct {
noSum bool
noMinMax bool
maxSize int
maxScale int
maxScale int32
newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
@@ -354,15 +355,15 @@ func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
hDPts[i].StartTime = e.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Scale = int32(val.scale)
hDPts[i].Scale = val.scale
hDPts[i].ZeroCount = val.zeroCount
hDPts[i].ZeroThreshold = 0.0
hDPts[i].PositiveBucket.Offset = int32(val.posBuckets.startBin)
hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts))
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
hDPts[i].NegativeBucket.Offset = int32(val.negBuckets.startBin)
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts))
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)
@@ -407,15 +408,15 @@ func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
hDPts[i].StartTime = e.start
hDPts[i].Time = t
hDPts[i].Count = val.count
hDPts[i].Scale = int32(val.scale)
hDPts[i].Scale = val.scale
hDPts[i].ZeroCount = val.zeroCount
hDPts[i].ZeroThreshold = 0.0
hDPts[i].PositiveBucket.Offset = int32(val.posBuckets.startBin)
hDPts[i].PositiveBucket.Offset = val.posBuckets.startBin
hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(val.posBuckets.counts), len(val.posBuckets.counts))
copy(hDPts[i].PositiveBucket.Counts, val.posBuckets.counts)
hDPts[i].NegativeBucket.Offset = int32(val.negBuckets.startBin)
hDPts[i].NegativeBucket.Offset = val.negBuckets.startBin
hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(val.negBuckets.counts), len(val.negBuckets.counts))
copy(hDPts[i].NegativeBucket.Counts, val.negBuckets.counts)

View File

@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)
// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
type FilteredExemplarReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]exemplar.Exemplar)
}
// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
type filteredExemplarReservoir[N int64 | float64] struct {
filter exemplar.Filter
reservoir exemplar.Reservoir
}
// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
// that are allowed by the filter.
func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
return &filteredExemplarReservoir[N]{
filter: f,
reservoir: r,
}
}
func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurement.
f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
}
}
func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) }

View File

@@ -11,13 +11,12 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type buckets[N int64 | float64] struct {
attrs attribute.Set
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]
counts []uint64
count uint64
@@ -48,13 +47,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64
newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
@@ -109,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute
// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,

View File

@@ -9,7 +9,6 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
@@ -17,10 +16,10 @@ import (
type datapoint[N int64 | float64] struct {
attrs attribute.Set
value N
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]
}
func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *lastValue[N] {
func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
@@ -33,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReserv
type lastValue[N int64 | float64] struct {
sync.Mutex
newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
@@ -115,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in
// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *precomputedLastValue[N] {
func newPrecomputedLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}

View File

@@ -9,25 +9,24 @@ import (
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
type sumValue[N int64 | float64] struct {
n N
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]
attrs attribute.Set
}
// valueMap is the storage for sums.
type valueMap[N int64 | float64] struct {
sync.Mutex
newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[sumValue[N]]
values map[attribute.Distinct]sumValue[N]
}
func newValueMap[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *valueMap[N] {
func newValueMap[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *valueMap[N] {
return &valueMap[N]{
newRes: r,
limit: newLimiter[sumValue[N]](limit),
@@ -55,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S
// newSum returns an aggregator that summarizes a set of measurements as their
// arithmetic sum. Each sum is scoped by attributes and the aggregation cycle
// the measurements were made in.
func newSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *sum[N] {
func newSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *sum[N] {
return &sum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
@@ -142,9 +141,9 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int {
}
// newPrecomputedSum returns an aggregator that summarizes a set of
// observatrions as their arithmetic sum. Each sum is scoped by attributes and
// observations as their arithmetic sum. Each sum is scoped by attributes and
// the aggregation cycle the measurements were made in.
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() exemplar.FilteredReservoir[N]) *precomputedSum[N] {
func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *precomputedSum[N] {
return &precomputedSum[N]{
valueMap: newValueMap[N](limit, r),
monotonic: monotonic,
@@ -152,7 +151,7 @@ func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() ex
}
}
// precomputedSum summarizes a set of observatrions as their arithmetic sum.
// precomputedSum summarizes a set of observations as their arithmetic sum.
type precomputedSum[N int64 | float64] struct {
*valueMap[N]

View File

@@ -1,6 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
// Package exemplar provides an implementation of the OpenTelemetry exemplar
// reservoir to be used in metric collection pipelines.
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"

View File

@@ -1,29 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import (
"time"
"go.opentelemetry.io/otel/attribute"
)
// Exemplar is a measurement sampled from a timeseries providing a typical
// example.
type Exemplar struct {
// FilteredAttributes are the attributes recorded with the measurement but
// filtered out of the timeseries' aggregated data.
FilteredAttributes []attribute.KeyValue
// Time is the time when the measurement was recorded.
Time time.Time
// Value is the measured value.
Value Value
// SpanID is the ID of the span that was active during the measurement. If
// no span was active or the span was not sampled this will be empty.
SpanID []byte `json:",omitempty"`
// TraceID is the ID of the trace the active span belonged to during the
// measurement. If no span was active or the span was not sampled this will
// be empty.
TraceID []byte `json:",omitempty"`
}

View File

@@ -1,29 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import (
"context"
"go.opentelemetry.io/otel/trace"
)
// Filter determines if a measurement should be offered.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
type Filter func(context.Context) bool
// SampledFilter is a [Filter] that will only offer measurements
// if the passed context associated with the measurement contains a sampled
// [go.opentelemetry.io/otel/trace.SpanContext].
func SampledFilter(ctx context.Context) bool {
return trace.SpanContextFromContext(ctx).IsSampled()
}
// AlwaysOnFilter is a [Filter] that always offers measurements.
func AlwaysOnFilter(ctx context.Context) bool {
return true
}

View File

@@ -1,49 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
)
// FilteredReservoir wraps a [Reservoir] with a filter.
type FilteredReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]Exemplar)
}
// filteredReservoir handles the pre-sampled exemplar of measurements made.
type filteredReservoir[N int64 | float64] struct {
filter Filter
reservoir Reservoir
}
// NewFilteredReservoir creates a [FilteredReservoir] which only offers values
// that are allowed by the filter.
func NewFilteredReservoir[N int64 | float64](f Filter, r Reservoir) FilteredReservoir[N] {
return &filteredReservoir[N]{
filter: f,
reservoir: r,
}
}
func (f *filteredReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurment.
f.reservoir.Offer(ctx, time.Now(), NewValue(val), attr)
}
}
func (f *filteredReservoir[N]) Collect(dest *[]Exemplar) { f.reservoir.Collect(dest) }

View File

@@ -1,46 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import (
"context"
"slices"
"sort"
"time"
"go.opentelemetry.io/otel/attribute"
)
// Histogram returns a [Reservoir] that samples the last measurement that falls
// within a histogram bucket. The histogram bucket upper-boundaries are define
// by bounds.
//
// The passed bounds will be sorted by this function.
func Histogram(bounds []float64) Reservoir {
slices.Sort(bounds)
return &histRes{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
}
}
type histRes struct {
*storage
// bounds are bucket bounds in ascending order.
bounds []float64
}
func (r *histRes) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) {
var x float64
switch v.Type() {
case Int64ValueType:
x = float64(v.Int64())
case Float64ValueType:
x = v.Float64()
default:
panic("unknown value type")
}
r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a)
}

View File

@@ -1,191 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import (
"context"
"math"
"math/rand"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
)
var (
// rng is used to make sampling decisions.
//
// Do not use crypto/rand. There is no reason for the decrease in performance
// given this is not a security sensitive decision.
rng = rand.New(rand.NewSource(time.Now().UnixNano()))
// Ensure concurrent safe accecess to rng and its underlying source.
rngMu sync.Mutex
)
// random returns, as a float64, a uniform pseudo-random number in the open
// interval (0.0,1.0).
func random() float64 {
// TODO: This does not return a uniform number. rng.Float64 returns a
// uniformly random int in [0,2^53) that is divided by 2^53. Meaning it
// returns multiples of 2^-53, and not all floating point numbers between 0
// and 1 (i.e. for values less than 2^-4 the 4 last bits of the significand
// are always going to be 0).
//
// An alternative algorithm should be considered that will actually return
// a uniform number in the interval (0,1). For example, since the default
// rand source provides a uniform distribution for Int63, this can be
// converted following the prototypical code of Mersenne Twister 64 (Takuji
// Nishimura and Makoto Matsumoto:
// http://www.math.sci.hiroshima-u.ac.jp/m-mat/MT/VERSIONS/C-LANG/mt19937-64.c)
//
// (float64(rng.Int63()>>11) + 0.5) * (1.0 / 4503599627370496.0)
//
// There are likely many other methods to explore here as well.
rngMu.Lock()
defer rngMu.Unlock()
f := rng.Float64()
for f == 0 {
f = rng.Float64()
}
return f
}
// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
// are k or less measurements made, the Reservoir will sample each one. If
// there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
func FixedSize(k int) Reservoir {
r := &randRes{storage: newStorage(k)}
r.reset()
return r
}
type randRes struct {
*storage
// count is the number of measurement seen.
count int64
// next is the next count that will store a measurement at a random index
// once the reservoir has been filled.
next int64
// w is the largest random number in a distribution that is used to compute
// the next next.
w float64
}
func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
// 481493 (https://dl.acm.org/doi/10.1145/198429.198435).
//
// A high-level overview of "Algorithm L":
// 0) Pre-calculate the random count greater than the storage size when
// an exemplar will be replaced.
// 1) Accept all measurements offered until the configured storage size is
// reached.
// 2) Loop:
// a) When the pre-calculate count is reached, replace a random
// existing exemplar with the offered measurement.
// b) Calculate the next random count greater than the existing one
// which will replace another exemplars
//
// The way a "replacement" count is computed is by looking at `n` number of
// independent random numbers each corresponding to an offered measurement.
// Of these numbers the smallest `k` (the same size as the storage
// capacity) of them are kept as a subset. The maximum value in this
// subset, called `w` is used to weight another random number generation
// for the next count that will be considered.
//
// By weighting the next count computation like described, it is able to
// perform a uniformly-weighted sampling algorithm based on the number of
// samples the reservoir has seen so far. The sampling will "slow down" as
// more and more samples are offered so as to reduce a bias towards those
// offered just prior to the end of the collection.
//
// This algorithm is preferred because of its balance of simplicity and
// performance. It will compute three random numbers (the bulk of
// computation time) for each item that becomes part of the reservoir, but
// it does not spend any time on items that do not. In particular it has an
// asymptotic runtime of O(k(1 + log(n/k)) where n is the number of
// measurements offered and k is the reservoir size.
//
// See https://en.wikipedia.org/wiki/Reservoir_sampling for an overview of
// this and other reservoir sampling algorithms. See
// https://github.com/MrAlias/reservoir-sampling for a performance
// comparison of reservoir sampling algorithms.
if int(r.count) < cap(r.store) {
r.store[r.count] = newMeasurement(ctx, t, n, a)
} else {
if r.count == r.next {
// Overwrite a random existing measurement with the one offered.
idx := int(rng.Int63n(int64(cap(r.store))))
r.store[idx] = newMeasurement(ctx, t, n, a)
r.advance()
}
}
r.count++
}
// reset resets r to the initial state.
func (r *randRes) reset() {
// This resets the number of exemplars known.
r.count = 0
// Random index inserts should only happen after the storage is full.
r.next = int64(cap(r.store))
// Initial random number in the series used to generate r.next.
//
// This is set before r.advance to reset or initialize the random number
// series. Without doing so it would always be 0 or never restart a new
// random number series.
//
// This maps the uniform random number in (0,1) to a geometric distribution
// over the same interval. The mean of the distribution is inversely
// proportional to the storage capacity.
r.w = math.Exp(math.Log(random()) / float64(cap(r.store)))
r.advance()
}
// advance updates the count at which the offered measurement will overwrite an
// existing exemplar.
func (r *randRes) advance() {
// Calculate the next value in the random number series.
//
// The current value of r.w is based on the max of a distribution of random
// numbers (i.e. `w = max(u_1,u_2,...,u_k)` for `k` equal to the capacity
// of the storage and each `u` in the interval (0,w)). To calculate the
// next r.w we use the fact that when the next exemplar is selected to be
// included in the storage an existing one will be dropped, and the
// corresponding random number in the set used to calculate r.w will also
// be replaced. The replacement random number will also be within (0,w),
// therefore the next r.w will be based on the same distribution (i.e.
// `max(u_1,u_2,...,u_k)`). Therefore, we can sample the next r.w by
// computing the next random number `u` and take r.w as `w * u^(1/k)`.
r.w *= math.Exp(math.Log(random()) / float64(cap(r.store)))
// Use the new random number in the series to calculate the count of the
// next measurement that will be stored.
//
// Given 0 < r.w < 1, each iteration will result in subsequent r.w being
// smaller. This translates here into the next next being selected against
// a distribution with a higher mean (i.e. the expected value will increase
// and replacements become less likely)
//
// Important to note, the new r.next will always be at least 1 more than
// the last r.next.
r.next += int64(math.Log(random())/math.Log(1-r.w)) + 1
}
func (r *randRes) Collect(dest *[]Exemplar) {
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
// measurements are offered, but it will also prioritize those new
// measurements that are made over the older collection cycle ones.
r.reset()
}

View File

@@ -1,32 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
)
// Reservoir holds the sampled exemplar of measurements made.
type Reservoir interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The val and attr
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
Offer(ctx context.Context, t time.Time, val Value, attr []attribute.KeyValue)
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
Collect(dest *[]Exemplar)
}

View File

@@ -1,95 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import (
"context"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// storage is an exemplar storage for [Reservoir] implementations.
type storage struct {
// store are the measurements sampled.
//
// This does not use []metricdata.Exemplar because it potentially would
// require an allocation for trace and span IDs in the hot path of Offer.
store []measurement
}
func newStorage(n int) *storage {
return &storage{store: make([]measurement, n)}
}
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *storage) Collect(dest *[]Exemplar) {
*dest = reset(*dest, len(r.store), len(r.store))
var n int
for _, m := range r.store {
if !m.valid {
continue
}
m.Exemplar(&(*dest)[n])
n++
}
*dest = (*dest)[:n]
}
// measurement is a measurement made by a telemetry system.
type measurement struct {
// FilteredAttributes are the attributes dropped during the measurement.
FilteredAttributes []attribute.KeyValue
// Time is the time when the measurement was made.
Time time.Time
// Value is the value of the measurement.
Value Value
// SpanContext is the SpanContext active when a measurement was made.
SpanContext trace.SpanContext
valid bool
}
// newMeasurement returns a new non-empty Measurement.
func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []attribute.KeyValue) measurement {
return measurement{
FilteredAttributes: droppedAttr,
Time: ts,
Value: v,
SpanContext: trace.SpanContextFromContext(ctx),
valid: true,
}
}
// Exemplar returns m as an [Exemplar].
func (m measurement) Exemplar(dest *Exemplar) {
dest.FilteredAttributes = m.FilteredAttributes
dest.Time = m.Time
dest.Value = m.Value
if m.SpanContext.HasTraceID() {
traceID := m.SpanContext.TraceID()
dest.TraceID = traceID[:]
} else {
dest.TraceID = dest.TraceID[:0]
}
if m.SpanContext.HasSpanID() {
spanID := m.SpanContext.SpanID()
dest.SpanID = spanID[:]
} else {
dest.SpanID = dest.SpanID[:0]
}
}
func reset[T any](s []T, length, capacity int) []T {
if cap(s) < capacity {
return make([]T, length, capacity)
}
return s[:length]
}

View File

@@ -1,57 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
import "math"
// ValueType identifies the type of value used in exemplar data.
type ValueType uint8
const (
// UnknownValueType should not be used. It represents a misconfigured
// Value.
UnknownValueType ValueType = 0
// Int64ValueType represents a Value with int64 data.
Int64ValueType ValueType = 1
// Float64ValueType represents a Value with float64 data.
Float64ValueType ValueType = 2
)
// Value is the value of data held by an exemplar.
type Value struct {
t ValueType
val uint64
}
// NewValue returns a new [Value] for the provided value.
func NewValue[N int64 | float64](value N) Value {
switch v := any(value).(type) {
case int64:
return Value{t: Int64ValueType, val: uint64(v)}
case float64:
return Value{t: Float64ValueType, val: math.Float64bits(v)}
}
return Value{}
}
// Type returns the [ValueType] of data held by v.
func (v Value) Type() ValueType { return v.t }
// Int64 returns the value of v as an int64. If the ValueType of v is not an
// Int64ValueType, 0 is returned.
func (v Value) Int64() int64 {
if v.t == Int64ValueType {
return int64(v.val)
}
return 0
}
// Float64 returns the value of v as an float64. If the ValueType of v is not
// an Float64ValueType, 0 is returned.
func (v Value) Float64() float64 {
if v.t == Float64ValueType {
return math.Float64frombits(v.val)
}
return 0
}

View File

@@ -10,39 +10,23 @@ package x // import "go.opentelemetry.io/otel/sdk/metric/internal/x"
import (
"os"
"strconv"
"strings"
)
var (
// Exemplars is an experimental feature flag that defines if exemplars
// should be recorded for metric data-points.
//
// To enable this feature set the OTEL_GO_X_EXEMPLAR environment variable
// to the case-insensitive string value of "true" (i.e. "True" and "TRUE"
// will also enable this).
Exemplars = newFeature("EXEMPLAR", func(v string) (string, bool) {
if strings.ToLower(v) == "true" {
return v, true
}
return "", false
})
// CardinalityLimit is an experimental feature flag that defines if
// cardinality limits should be applied to the recorded metric data-points.
//
// To enable this feature set the OTEL_GO_X_CARDINALITY_LIMIT environment
// variable to the integer limit value you want to use.
//
// Setting OTEL_GO_X_CARDINALITY_LIMIT to a value less than or equal to 0
// will disable the cardinality limits.
CardinalityLimit = newFeature("CARDINALITY_LIMIT", func(v string) (int, bool) {
n, err := strconv.Atoi(v)
if err != nil {
return 0, false
}
return n, true
})
)
// CardinalityLimit is an experimental feature flag that defines if
// cardinality limits should be applied to the recorded metric data-points.
//
// To enable this feature set the OTEL_GO_X_CARDINALITY_LIMIT environment
// variable to the integer limit value you want to use.
//
// Setting OTEL_GO_X_CARDINALITY_LIMIT to a value less than or equal to 0
// will disable the cardinality limits.
var CardinalityLimit = newFeature("CARDINALITY_LIMIT", func(v string) (int, bool) {
n, err := strconv.Atoi(v)
if err != nil {
return 0, false
}
return n, true
})
// Feature is an experimental feature control flag. It provides a uniform way
// to interact with these feature flags and parse their values.