vendor: update buildkit to 1e6032c

Signed-off-by: CrazyMax <crazy-max@users.noreply.github.com>
This commit is contained in:
CrazyMax
2022-02-15 19:12:00 +01:00
parent 1bcc3556fc
commit 22aaa260e7
85 changed files with 1199 additions and 1166 deletions

View File

@@ -0,0 +1,88 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package env // import "go.opentelemetry.io/otel/sdk/internal/env"
import (
"os"
"strconv"
"go.opentelemetry.io/otel/internal/global"
)
// Environment variable names
const (
// BatchSpanProcessorScheduleDelayKey
// Delay interval between two consecutive exports.
// i.e. 5000
BatchSpanProcessorScheduleDelayKey = "OTEL_BSP_SCHEDULE_DELAY"
// BatchSpanProcessorExportTimeoutKey
// Maximum allowed time to export data.
// i.e. 3000
BatchSpanProcessorExportTimeoutKey = "OTEL_BSP_EXPORT_TIMEOUT"
// BatchSpanProcessorMaxQueueSizeKey
// Maximum queue size
// i.e. 2048
BatchSpanProcessorMaxQueueSizeKey = "OTEL_BSP_MAX_QUEUE_SIZE"
// BatchSpanProcessorMaxExportBatchSizeKey
// Maximum batch size
// Note: Must be less than or equal to EnvBatchSpanProcessorMaxQueueSize
// i.e. 512
BatchSpanProcessorMaxExportBatchSizeKey = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE"
)
// IntEnvOr returns the int value of the environment variable with name key if
// it exists and the value is an int. Otherwise, defaultValue is returned.
func IntEnvOr(key string, defaultValue int) int {
value, ok := os.LookupEnv(key)
if !ok {
return defaultValue
}
intValue, err := strconv.Atoi(value)
if err != nil {
global.Info("Got invalid value, number value expected.", key, value)
return defaultValue
}
return intValue
}
// BatchSpanProcessorScheduleDelay returns the environment variable value for
// the OTEL_BSP_SCHEDULE_DELAY key if it exists, otherwise defaultValue is
// returned.
func BatchSpanProcessorScheduleDelay(defaultValue int) int {
return IntEnvOr(BatchSpanProcessorScheduleDelayKey, defaultValue)
}
// BatchSpanProcessorExportTimeout returns the environment variable value for
// the OTEL_BSP_EXPORT_TIMEOUT key if it exists, otherwise defaultValue is
// returned.
func BatchSpanProcessorExportTimeout(defaultValue int) int {
return IntEnvOr(BatchSpanProcessorExportTimeoutKey, defaultValue)
}
// BatchSpanProcessorMaxQueueSize returns the environment variable value for
// the OTEL_BSP_MAX_QUEUE_SIZE key if it exists, otherwise defaultValue is
// returned.
func BatchSpanProcessorMaxQueueSize(defaultValue int) int {
return IntEnvOr(BatchSpanProcessorMaxQueueSizeKey, defaultValue)
}
// BatchSpanProcessorMaxExportBatchSize returns the environment variable value for
// the OTEL_BSP_MAX_EXPORT_BATCH_SIZE key if it exists, otherwise defaultValue
// is returned.
func BatchSpanProcessorMaxExportBatchSize(defaultValue int) int {
return IntEnvOr(BatchSpanProcessorMaxExportBatchSizeKey, defaultValue)
}

View File

@@ -1,50 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package internal // import "go.opentelemetry.io/otel/sdk/internal"
import (
"strings"
"unicode"
)
const labelKeySizeLimit = 100
// Sanitize returns a string that is trunacated to 100 characters if it's too
// long, and replaces non-alphanumeric characters to underscores.
func Sanitize(s string) string {
if len(s) == 0 {
return s
}
if len(s) > labelKeySizeLimit {
s = s[:labelKeySizeLimit]
}
s = strings.Map(sanitizeRune, s)
if unicode.IsDigit(rune(s[0])) {
s = "key_" + s
}
if s[0] == '_' {
s = "key" + s
}
return s
}
// converts anything that is not a letter or digit to an underscore
func sanitizeRune(r rune) rune {
if unicode.IsLetter(r) || unicode.IsDigit(r) {
return r
}
// Everything else turns into an underscore
return '_'
}

View File

@@ -31,7 +31,7 @@ type config struct {
// Option is the interface that applies a configuration option.
type Option interface {
// apply sets the Option value of a config.
apply(*config)
apply(config) config
}
// WithAttributes adds attributes to the configured Resource.
@@ -56,8 +56,9 @@ type detectorsOption struct {
detectors []Detector
}
func (o detectorsOption) apply(cfg *config) {
func (o detectorsOption) apply(cfg config) config {
cfg.detectors = append(cfg.detectors, o.detectors...)
return cfg
}
// WithFromEnv adds attributes from environment variables to the configured resource.
@@ -82,8 +83,9 @@ func WithSchemaURL(schemaURL string) Option {
type schemaURLOption string
func (o schemaURLOption) apply(cfg *config) {
func (o schemaURLOption) apply(cfg config) config {
cfg.schemaURL = string(o)
return cfg
}
// WithOS adds all the OS attributes to the configured Resource.

View File

@@ -48,7 +48,7 @@ var errMergeConflictSchemaURL = errors.New("cannot merge resource due to conflic
func New(ctx context.Context, opts ...Option) (*Resource, error) {
cfg := config{}
for _, opt := range opts {
opt.apply(&cfg)
cfg = opt.apply(cfg)
}
resource, err := Detect(ctx, cfg.detectors...)
@@ -109,6 +109,17 @@ func (r *Resource) String() string {
return r.attrs.Encoded(attribute.DefaultEncoder())
}
// MarshalLog is the marshaling function used by the logging system to represent this exporter.
func (r *Resource) MarshalLog() interface{} {
return struct {
Attributes attribute.Set
SchemaURL string
}{
Attributes: r.attrs,
SchemaURL: r.schemaURL,
}
}
// Attributes returns a copy of attributes from the resource in a sorted order.
// To avoid allocating a new slice, use an iterator.
func (r *Resource) Attributes() []attribute.KeyValue {

View File

@@ -1,91 +0,0 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace // import "go.opentelemetry.io/otel/sdk/trace"
import (
"container/list"
"go.opentelemetry.io/otel/attribute"
)
// attributesMap is a capped map of attributes, holding the most recent attributes.
// Eviction is done via a LRU method, the oldest entry is removed to create room for a new entry.
// Updates are allowed and they refresh the usage of the key.
//
// This is based from https://github.com/hashicorp/golang-lru/blob/master/simplelru/lru.go
// With a subset of the its operations and specific for holding attribute.KeyValue
type attributesMap struct {
attributes map[attribute.Key]*list.Element
evictList *list.List
droppedCount int
capacity int
}
func newAttributesMap(capacity int) *attributesMap {
lm := &attributesMap{
attributes: make(map[attribute.Key]*list.Element),
evictList: list.New(),
capacity: capacity,
}
return lm
}
func (am *attributesMap) add(kv attribute.KeyValue) {
// Check for existing item
if ent, ok := am.attributes[kv.Key]; ok {
am.evictList.MoveToFront(ent)
ent.Value = &kv
return
}
// Add new item
entry := am.evictList.PushFront(&kv)
am.attributes[kv.Key] = entry
// Verify size not exceeded
if am.evictList.Len() > am.capacity {
am.removeOldest()
am.droppedCount++
}
}
// toKeyValue copies the attributesMap into a slice of attribute.KeyValue and
// returns it. If the map is empty, a nil is returned.
// TODO: Is it more efficient to return a pointer to the slice?
func (am *attributesMap) toKeyValue() []attribute.KeyValue {
len := am.evictList.Len()
if len == 0 {
return nil
}
attributes := make([]attribute.KeyValue, 0, len)
for ent := am.evictList.Back(); ent != nil; ent = ent.Prev() {
if value, ok := ent.Value.(*attribute.KeyValue); ok {
attributes = append(attributes, *value)
}
}
return attributes
}
// removeOldest removes the oldest item from the cache.
func (am *attributesMap) removeOldest() {
ent := am.evictList.Back()
if ent != nil {
am.evictList.Remove(ent)
kv := ent.Value.(*attribute.KeyValue)
delete(am.attributes, kv.Key)
}
}

View File

@@ -23,14 +23,15 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/internal/env"
"go.opentelemetry.io/otel/trace"
)
// Defaults for BatchSpanProcessorOptions.
const (
DefaultMaxQueueSize = 2048
DefaultBatchTimeout = 5000 * time.Millisecond
DefaultExportTimeout = 30000 * time.Millisecond
DefaultScheduleDelay = 5000
DefaultExportTimeout = 30000
DefaultMaxExportBatchSize = 512
)
@@ -89,11 +90,22 @@ var _ SpanProcessor = (*batchSpanProcessor)(nil)
//
// If the exporter is nil, the span processor will preform no action.
func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)
if maxExportBatchSize > maxQueueSize {
if DefaultMaxExportBatchSize > maxQueueSize {
maxExportBatchSize = maxQueueSize
} else {
maxExportBatchSize = DefaultMaxExportBatchSize
}
}
o := BatchSpanProcessorOptions{
BatchTimeout: DefaultBatchTimeout,
ExportTimeout: DefaultExportTimeout,
MaxQueueSize: DefaultMaxQueueSize,
MaxExportBatchSize: DefaultMaxExportBatchSize,
BatchTimeout: time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
MaxQueueSize: maxQueueSize,
MaxExportBatchSize: maxExportBatchSize,
}
for _, opt := range options {
opt(&o)
@@ -238,7 +250,7 @@ func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
}
if l := len(bsp.batch); l > 0 {
global.Debug("exporting spans", "count", len(bsp.batch))
global.Debug("exporting spans", "count", len(bsp.batch), "dropped", bsp.dropped)
err := bsp.e.ExportSpans(ctx, bsp.batch)
// A new batch is always created after exporting, even if the batch failed to be exported.
@@ -369,3 +381,16 @@ func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd R
}
return false
}
// MarshalLog is the marshaling function used by the logging system to represent this exporter.
func (bsp *batchSpanProcessor) MarshalLog() interface{} {
return struct {
Type string
SpanExporter SpanExporter
Config BatchSpanProcessorOptions
}{
Type: "BatchSpanProcessor",
SpanExporter: bsp.e,
Config: bsp.o,
}
}

View File

@@ -14,24 +14,25 @@
package trace // import "go.opentelemetry.io/otel/sdk/trace"
// evictedQueue is a FIFO queue with a configurable capacity.
type evictedQueue struct {
queue []interface{}
capacity int
droppedCount int
}
func newEvictedQueue(capacity int) *evictedQueue {
eq := &evictedQueue{
capacity: capacity,
queue: make([]interface{}, 0),
}
return eq
func newEvictedQueue(capacity int) evictedQueue {
// Do not pre-allocate queue, do this lazily.
return evictedQueue{capacity: capacity}
}
// add adds value to the evictedQueue eq. If eq is at capacity, the oldest
// queued value will be discarded and the drop count incremented.
func (eq *evictedQueue) add(value interface{}) {
if len(eq.queue) == eq.capacity {
eq.queue = eq.queue[1:]
// Drop first-in while avoiding allocating more capacity to eq.queue.
copy(eq.queue[:eq.capacity-1], eq.queue[1:])
eq.queue = eq.queue[:eq.capacity-1]
eq.droppedCount++
}
eq.queue = append(eq.queue, value)

View File

@@ -21,6 +21,7 @@ import (
"sync/atomic"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
@@ -52,14 +53,34 @@ type tracerProviderConfig struct {
resource *resource.Resource
}
// MarshalLog is the marshaling function used by the logging system to represent this exporter.
func (cfg tracerProviderConfig) MarshalLog() interface{} {
return struct {
SpanProcessors []SpanProcessor
SamplerType string
IDGeneratorType string
SpanLimits SpanLimits
Resource *resource.Resource
}{
SpanProcessors: cfg.processors,
SamplerType: fmt.Sprintf("%T", cfg.sampler),
IDGeneratorType: fmt.Sprintf("%T", cfg.idGenerator),
SpanLimits: cfg.spanLimits,
Resource: cfg.resource,
}
}
type TracerProvider struct {
mu sync.Mutex
namedTracer map[instrumentation.Library]*tracer
spanProcessors atomic.Value
sampler Sampler
idGenerator IDGenerator
spanLimits SpanLimits
resource *resource.Resource
// These fields are not protected by the lock mu. They are assumed to be
// immutable after creation of the TracerProvider.
sampler Sampler
idGenerator IDGenerator
spanLimits SpanLimits
resource *resource.Resource
}
var _ trace.TracerProvider = &TracerProvider{}
@@ -75,13 +96,13 @@ var _ trace.TracerProvider = &TracerProvider{}
// The passed opts are used to override these default values and configure the
// returned TracerProvider appropriately.
func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider {
o := &tracerProviderConfig{}
o := tracerProviderConfig{}
for _, opt := range opts {
opt.apply(o)
o = opt.apply(o)
}
ensureValidTracerProviderConfig(o)
o = ensureValidTracerProviderConfig(o)
tp := &TracerProvider{
namedTracer: make(map[instrumentation.Library]*tracer),
@@ -91,6 +112,8 @@ func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider {
resource: o.resource,
}
global.Info("TracerProvider created", "config", o)
for _, sp := range o.processors {
tp.RegisterSpanProcessor(sp)
}
@@ -125,6 +148,7 @@ func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.T
instrumentationLibrary: il,
}
p.namedTracer[il] = t
global.Info("Tracer created", "name", name, "version", c.InstrumentationVersion(), "schemaURL", c.SchemaURL())
}
return t
}
@@ -235,13 +259,13 @@ func (p *TracerProvider) Shutdown(ctx context.Context) error {
}
type TracerProviderOption interface {
apply(*tracerProviderConfig)
apply(tracerProviderConfig) tracerProviderConfig
}
type traceProviderOptionFunc func(*tracerProviderConfig)
type traceProviderOptionFunc func(tracerProviderConfig) tracerProviderConfig
func (fn traceProviderOptionFunc) apply(cfg *tracerProviderConfig) {
fn(cfg)
func (fn traceProviderOptionFunc) apply(cfg tracerProviderConfig) tracerProviderConfig {
return fn(cfg)
}
// WithSyncer registers the exporter with the TracerProvider using a
@@ -264,8 +288,9 @@ func WithBatcher(e SpanExporter, opts ...BatchSpanProcessorOption) TracerProvide
// WithSpanProcessor registers the SpanProcessor with a TracerProvider.
func WithSpanProcessor(sp SpanProcessor) TracerProviderOption {
return traceProviderOptionFunc(func(cfg *tracerProviderConfig) {
return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
cfg.processors = append(cfg.processors, sp)
return cfg
})
}
@@ -277,12 +302,13 @@ func WithSpanProcessor(sp SpanProcessor) TracerProviderOption {
// If this option is not used, the TracerProvider will use the
// resource.Default() Resource by default.
func WithResource(r *resource.Resource) TracerProviderOption {
return traceProviderOptionFunc(func(cfg *tracerProviderConfig) {
return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
var err error
cfg.resource, err = resource.Merge(resource.Environment(), r)
if err != nil {
otel.Handle(err)
}
return cfg
})
}
@@ -294,10 +320,11 @@ func WithResource(r *resource.Resource) TracerProviderOption {
// If this option is not used, the TracerProvider will use a random number
// IDGenerator by default.
func WithIDGenerator(g IDGenerator) TracerProviderOption {
return traceProviderOptionFunc(func(cfg *tracerProviderConfig) {
return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
if g != nil {
cfg.idGenerator = g
}
return cfg
})
}
@@ -309,10 +336,11 @@ func WithIDGenerator(g IDGenerator) TracerProviderOption {
// If this option is not used, the TracerProvider will use a
// ParentBased(AlwaysSample) Sampler by default.
func WithSampler(s Sampler) TracerProviderOption {
return traceProviderOptionFunc(func(cfg *tracerProviderConfig) {
return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
if s != nil {
cfg.sampler = s
}
return cfg
})
}
@@ -324,13 +352,14 @@ func WithSampler(s Sampler) TracerProviderOption {
// If this option is not used, the TracerProvider will use the default
// SpanLimits.
func WithSpanLimits(sl SpanLimits) TracerProviderOption {
return traceProviderOptionFunc(func(cfg *tracerProviderConfig) {
return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
cfg.spanLimits = sl
return cfg
})
}
// ensureValidTracerProviderConfig ensures that given TracerProviderConfig is valid.
func ensureValidTracerProviderConfig(cfg *tracerProviderConfig) {
func ensureValidTracerProviderConfig(cfg tracerProviderConfig) tracerProviderConfig {
if cfg.sampler == nil {
cfg.sampler = ParentBased(AlwaysSample())
}
@@ -341,4 +370,5 @@ func ensureValidTracerProviderConfig(cfg *tracerProviderConfig) {
if cfg.resource == nil {
cfg.resource = resource.Default()
}
return cfg
}

View File

@@ -187,7 +187,7 @@ func configureSamplersForParentBased(samplers []ParentBasedSamplerOption) sample
}
for _, so := range samplers {
so.apply(&c)
c = so.apply(c)
}
return c
@@ -201,7 +201,7 @@ type samplerConfig struct {
// ParentBasedSamplerOption configures the sampler for a particular sampling case.
type ParentBasedSamplerOption interface {
apply(*samplerConfig)
apply(samplerConfig) samplerConfig
}
// WithRemoteParentSampled sets the sampler for the case of sampled remote parent.
@@ -213,8 +213,9 @@ type remoteParentSampledOption struct {
s Sampler
}
func (o remoteParentSampledOption) apply(config *samplerConfig) {
func (o remoteParentSampledOption) apply(config samplerConfig) samplerConfig {
config.remoteParentSampled = o.s
return config
}
// WithRemoteParentNotSampled sets the sampler for the case of remote parent
@@ -227,8 +228,9 @@ type remoteParentNotSampledOption struct {
s Sampler
}
func (o remoteParentNotSampledOption) apply(config *samplerConfig) {
func (o remoteParentNotSampledOption) apply(config samplerConfig) samplerConfig {
config.remoteParentNotSampled = o.s
return config
}
// WithLocalParentSampled sets the sampler for the case of sampled local parent.
@@ -240,8 +242,9 @@ type localParentSampledOption struct {
s Sampler
}
func (o localParentSampledOption) apply(config *samplerConfig) {
func (o localParentSampledOption) apply(config samplerConfig) samplerConfig {
config.localParentSampled = o.s
return config
}
// WithLocalParentNotSampled sets the sampler for the case of local parent
@@ -254,8 +257,9 @@ type localParentNotSampledOption struct {
s Sampler
}
func (o localParentNotSampledOption) apply(config *samplerConfig) {
func (o localParentNotSampledOption) apply(config samplerConfig) samplerConfig {
config.localParentNotSampled = o.s
return config
}
func (pb parentBased) ShouldSample(p SamplingParameters) SamplingResult {

View File

@@ -115,3 +115,14 @@ func (ssp *simpleSpanProcessor) Shutdown(ctx context.Context) error {
func (ssp *simpleSpanProcessor) ForceFlush(context.Context) error {
return nil
}
// MarshalLog is the marshaling function used by the logging system to represent this exporter.
func (ssp *simpleSpanProcessor) MarshalLog() interface{} {
return struct {
Type string
Exporter SpanExporter
}{
Type: "SimpleSpanProcessor",
Exporter: ssp.exporter,
}
}

View File

@@ -54,6 +54,7 @@ type ReadOnlySpan interface {
// the span has not ended.
EndTime() time.Time
// Attributes returns the defining attributes of the span.
// The order of the returned attributes is not guaranteed to be stable across invocations.
Attributes() []attribute.KeyValue
// Links returns all the links the span has to other spans.
Links() []Link
@@ -126,35 +127,29 @@ type recordingSpan struct {
// childSpanCount holds the number of child spans created for this span.
childSpanCount int
// resource contains attributes representing an entity that produced this
// span.
resource *resource.Resource
// instrumentationLibrary defines the instrumentation library used to
// provide instrumentation.
instrumentationLibrary instrumentation.Library
// spanContext holds the SpanContext of this span.
spanContext trace.SpanContext
// attributes are capped at configured limit. When the capacity is reached
// an oldest entry is removed to create room for a new entry.
attributes *attributesMap
// attributes is a collection of user provided key/values. The collection
// is constrained by a configurable maximum held by the parent
// TracerProvider. When additional attributes are added after this maximum
// is reached these attributes the user is attempting to add are dropped.
// This dropped number of attributes is tracked and reported in the
// ReadOnlySpan exported when the span ends.
attributes []attribute.KeyValue
droppedAttributes int
// events are stored in FIFO queue capped by configured limit.
events *evictedQueue
events evictedQueue
// links are stored in FIFO queue capped by configured limit.
links *evictedQueue
links evictedQueue
// executionTracerTaskEnd ends the execution tracer span.
executionTracerTaskEnd func()
// tracer is the SDK tracer that created this span.
tracer *tracer
// spanLimits holds the limits to this span.
spanLimits SpanLimits
}
var _ ReadWriteSpan = (*recordingSpan)(nil)
@@ -205,11 +200,80 @@ func (s *recordingSpan) SetStatus(code codes.Code, description string) {
// will be overwritten with the value contained in attributes.
//
// If this span is not being recorded than this method does nothing.
//
// If adding attributes to the span would exceed the maximum amount of
// attributes the span is configured to have, the last added attributes will
// be dropped.
func (s *recordingSpan) SetAttributes(attributes ...attribute.KeyValue) {
if !s.IsRecording() {
return
}
s.copyToCappedAttributes(attributes...)
s.mu.Lock()
defer s.mu.Unlock()
// If adding these attributes could exceed the capacity of s perform a
// de-duplication and truncation while adding to avoid over allocation.
if len(s.attributes)+len(attributes) > s.tracer.provider.spanLimits.AttributeCountLimit {
s.addOverCapAttrs(attributes)
return
}
// Otherwise, add without deduplication. When attributes are read they
// will be deduplicated, optimizing the operation.
for _, a := range attributes {
if !a.Valid() {
// Drop all invalid attributes.
s.droppedAttributes++
continue
}
s.attributes = append(s.attributes, a)
}
}
// addOverCapAttrs adds the attributes attrs to the span s while
// de-duplicating the attributes of s and attrs and dropping attributes that
// exceed the capacity of s.
//
// This method assumes s.mu.Lock is held by the caller.
//
// This method should only be called when there is a possibility that adding
// attrs to s will exceed the capacity of s. Otherwise, attrs should be added
// to s without checking for duplicates and all retrieval methods of the
// attributes for s will de-duplicate as needed.
func (s *recordingSpan) addOverCapAttrs(attrs []attribute.KeyValue) {
// In order to not allocate more capacity to s.attributes than needed,
// prune and truncate this addition of attributes while adding.
// Do not set a capacity when creating this map. Benchmark testing has
// showed this to only add unused memory allocations in general use.
exists := make(map[attribute.Key]int)
s.dedupeAttrsFromRecord(&exists)
// Now that s.attributes is deduplicated, adding unique attributes up to
// the capacity of s will not over allocate s.attributes.
for _, a := range attrs {
if !a.Valid() {
// Drop all invalid attributes.
s.droppedAttributes++
continue
}
if idx, ok := exists[a.Key]; ok {
// Perform all updates before dropping, even when at capacity.
s.attributes[idx] = a
continue
}
if len(s.attributes) >= s.tracer.provider.spanLimits.AttributeCountLimit {
// Do not just drop all of the remaining attributes, make sure
// updates are checked and performed.
s.droppedAttributes++
} else {
s.attributes = append(s.attributes, a)
exists[a.Key] = len(s.attributes) - 1
}
}
}
// End ends the span. This method does nothing if the span is already ended or
@@ -336,9 +400,9 @@ func (s *recordingSpan) addEvent(name string, o ...trace.EventOption) {
// Discard over limited attributes
attributes := c.Attributes()
var discarded int
if len(attributes) > s.spanLimits.AttributePerEventCountLimit {
discarded = len(attributes) - s.spanLimits.AttributePerEventCountLimit
attributes = attributes[:s.spanLimits.AttributePerEventCountLimit]
if len(attributes) > s.tracer.provider.spanLimits.AttributePerEventCountLimit {
discarded = len(attributes) - s.tracer.provider.spanLimits.AttributePerEventCountLimit
attributes = attributes[:s.tracer.provider.spanLimits.AttributePerEventCountLimit]
}
s.mu.Lock()
defer s.mu.Unlock()
@@ -399,13 +463,45 @@ func (s *recordingSpan) EndTime() time.Time {
}
// Attributes returns the attributes of this span.
//
// The order of the returned attributes is not guaranteed to be stable.
func (s *recordingSpan) Attributes() []attribute.KeyValue {
s.mu.Lock()
defer s.mu.Unlock()
if s.attributes.evictList.Len() == 0 {
return []attribute.KeyValue{}
s.dedupeAttrs()
return s.attributes
}
// dedupeAttrs deduplicates the attributes of s to fit capacity.
//
// This method assumes s.mu.Lock is held by the caller.
func (s *recordingSpan) dedupeAttrs() {
// Do not set a capacity when creating this map. Benchmark testing has
// showed this to only add unused memory allocations in general use.
exists := make(map[attribute.Key]int)
s.dedupeAttrsFromRecord(&exists)
}
// dedupeAttrsFromRecord deduplicates the attributes of s to fit capacity
// using record as the record of unique attribute keys to their index.
//
// This method assumes s.mu.Lock is held by the caller.
func (s *recordingSpan) dedupeAttrsFromRecord(record *map[attribute.Key]int) {
// Use the fact that slices share the same backing array.
unique := s.attributes[:0]
for _, a := range s.attributes {
if idx, ok := (*record)[a.Key]; ok {
unique[idx] = a
} else {
unique = append(unique, a)
(*record)[a.Key] = len(unique) - 1
}
}
return s.attributes.toKeyValue()
// s.attributes have element types of attribute.KeyValue. These types are
// not pointers and they themselves do not contain pointer fields,
// therefore the duplicate values do not need to be zeroed for them to be
// garbage collected.
s.attributes = unique
}
// Links returns the links of this span.
@@ -440,7 +536,7 @@ func (s *recordingSpan) Status() Status {
func (s *recordingSpan) InstrumentationLibrary() instrumentation.Library {
s.mu.Lock()
defer s.mu.Unlock()
return s.instrumentationLibrary
return s.tracer.instrumentationLibrary
}
// Resource returns the Resource associated with the Tracer that created this
@@ -448,7 +544,7 @@ func (s *recordingSpan) InstrumentationLibrary() instrumentation.Library {
func (s *recordingSpan) Resource() *resource.Resource {
s.mu.Lock()
defer s.mu.Unlock()
return s.resource
return s.tracer.provider.resource
}
func (s *recordingSpan) addLink(link trace.Link) {
@@ -461,9 +557,9 @@ func (s *recordingSpan) addLink(link trace.Link) {
var droppedAttributeCount int
// Discard over limited attributes
if len(link.Attributes) > s.spanLimits.AttributePerLinkCountLimit {
droppedAttributeCount = len(link.Attributes) - s.spanLimits.AttributePerLinkCountLimit
link.Attributes = link.Attributes[:s.spanLimits.AttributePerLinkCountLimit]
if len(link.Attributes) > s.tracer.provider.spanLimits.AttributePerLinkCountLimit {
droppedAttributeCount = len(link.Attributes) - s.tracer.provider.spanLimits.AttributePerLinkCountLimit
link.Attributes = link.Attributes[:s.tracer.provider.spanLimits.AttributePerLinkCountLimit]
}
s.links.add(Link{link.SpanContext, link.Attributes, droppedAttributeCount})
@@ -474,7 +570,7 @@ func (s *recordingSpan) addLink(link trace.Link) {
func (s *recordingSpan) DroppedAttributes() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.attributes.droppedCount
return s.droppedAttributes
}
// DroppedLinks returns the number of links dropped by the span due to limits
@@ -514,20 +610,21 @@ func (s *recordingSpan) snapshot() ReadOnlySpan {
defer s.mu.Unlock()
sd.endTime = s.endTime
sd.instrumentationLibrary = s.instrumentationLibrary
sd.instrumentationLibrary = s.tracer.instrumentationLibrary
sd.name = s.name
sd.parent = s.parent
sd.resource = s.resource
sd.resource = s.tracer.provider.resource
sd.spanContext = s.spanContext
sd.spanKind = s.spanKind
sd.startTime = s.startTime
sd.status = s.status
sd.childSpanCount = s.childSpanCount
if s.attributes.evictList.Len() > 0 {
sd.attributes = s.attributes.toKeyValue()
sd.droppedAttributeCount = s.attributes.droppedCount
if len(s.attributes) > 0 {
s.dedupeAttrs()
sd.attributes = s.attributes
}
sd.droppedAttributeCount = s.droppedAttributes
if len(s.events.queue) > 0 {
sd.events = s.interfaceArrayToEventArray()
sd.droppedEventCount = s.events.droppedCount
@@ -555,18 +652,6 @@ func (s *recordingSpan) interfaceArrayToEventArray() []Event {
return eventArr
}
func (s *recordingSpan) copyToCappedAttributes(attributes ...attribute.KeyValue) {
s.mu.Lock()
defer s.mu.Unlock()
for _, a := range attributes {
// Ensure attributes conform to the specification:
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.0.1/specification/common/common.md#attributes
if a.Valid() {
s.attributes.add(a)
}
}
}
func (s *recordingSpan) addChild() {
if !s.IsRecording() {
return

View File

@@ -122,18 +122,22 @@ func (tr *tracer) newRecordingSpan(psc, sc trace.SpanContext, name string, sr Sa
}
s := &recordingSpan{
parent: psc,
spanContext: sc,
spanKind: trace.ValidateSpanKind(config.SpanKind()),
name: name,
startTime: startTime,
attributes: newAttributesMap(tr.provider.spanLimits.AttributeCountLimit),
events: newEvictedQueue(tr.provider.spanLimits.EventCountLimit),
links: newEvictedQueue(tr.provider.spanLimits.LinkCountLimit),
tracer: tr,
spanLimits: tr.provider.spanLimits,
resource: tr.provider.resource,
instrumentationLibrary: tr.instrumentationLibrary,
// Do not pre-allocate the attributes slice here! Doing so will
// allocate memory that is likely never going to be used, or if used,
// will be over-sized. The default Go compiler has been tested to
// dynamically allocate needed space very well. Benchmarking has shown
// it to be more performant than what we can predetermine here,
// especially for the common use case of few to no added
// attributes.
parent: psc,
spanContext: sc,
spanKind: trace.ValidateSpanKind(config.SpanKind()),
name: name,
startTime: startTime,
events: newEvictedQueue(tr.provider.spanLimits.EventCountLimit),
links: newEvictedQueue(tr.provider.spanLimits.LinkCountLimit),
tracer: tr,
}
for _, l := range config.Links() {