vendor: update buildkit to opentelemetry support

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi
2021-06-15 21:02:39 -07:00
parent 6ba080d337
commit 334c93fbbe
829 changed files with 89541 additions and 24438 deletions

View File

@@ -0,0 +1,429 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package connection
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/cenkalti/backoff/v4"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/encoding/gzip"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
type Connection struct {
// Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines.
lastConnectErrPtr unsafe.Pointer
// mu protects the Connection as it is accessed by the
// exporter goroutines and background Connection goroutine
mu sync.Mutex
cc *grpc.ClientConn
// these fields are read-only after constructor is finished
cfg otlpconfig.Config
SCfg otlpconfig.SignalConfig
metadata metadata.MD
newConnectionHandler func(cc *grpc.ClientConn)
// these channels are created once
disconnectedCh chan bool
backgroundConnectionDoneCh chan struct{}
stopCh chan struct{}
// this is for tests, so they can replace the closing
// routine without a worry of modifying some global variable
// or changing it back to original after the test is done
closeBackgroundConnectionDoneCh func(ch chan struct{})
}
func NewConnection(cfg otlpconfig.Config, sCfg otlpconfig.SignalConfig, handler func(cc *grpc.ClientConn)) *Connection {
c := new(Connection)
c.newConnectionHandler = handler
c.cfg = cfg
c.SCfg = sCfg
if len(c.SCfg.Headers) > 0 {
c.metadata = metadata.New(c.SCfg.Headers)
}
c.closeBackgroundConnectionDoneCh = func(ch chan struct{}) {
close(ch)
}
return c
}
func (c *Connection) StartConnection(ctx context.Context) error {
c.stopCh = make(chan struct{})
c.disconnectedCh = make(chan bool, 1)
c.backgroundConnectionDoneCh = make(chan struct{})
if err := c.connect(ctx); err == nil {
c.setStateConnected()
} else {
c.SetStateDisconnected(err)
}
go c.indefiniteBackgroundConnection()
// TODO: proper error handling when initializing connections.
// We can report permanent errors, e.g., invalid settings.
return nil
}
func (c *Connection) LastConnectError() error {
errPtr := (*error)(atomic.LoadPointer(&c.lastConnectErrPtr))
if errPtr == nil {
return nil
}
return *errPtr
}
func (c *Connection) saveLastConnectError(err error) {
var errPtr *error
if err != nil {
errPtr = &err
}
atomic.StorePointer(&c.lastConnectErrPtr, unsafe.Pointer(errPtr))
}
func (c *Connection) SetStateDisconnected(err error) {
c.saveLastConnectError(err)
select {
case c.disconnectedCh <- true:
default:
}
c.newConnectionHandler(nil)
}
func (c *Connection) setStateConnected() {
c.saveLastConnectError(nil)
}
func (c *Connection) Connected() bool {
return c.LastConnectError() == nil
}
const defaultConnReattemptPeriod = 10 * time.Second
func (c *Connection) indefiniteBackgroundConnection() {
defer func() {
c.closeBackgroundConnectionDoneCh(c.backgroundConnectionDoneCh)
}()
connReattemptPeriod := c.cfg.ReconnectionPeriod
if connReattemptPeriod <= 0 {
connReattemptPeriod = defaultConnReattemptPeriod
}
// No strong seeding required, nano time can
// already help with pseudo uniqueness.
rng := rand.New(rand.NewSource(time.Now().UnixNano() + rand.Int63n(1024)))
// maxJitterNanos: 70% of the connectionReattemptPeriod
maxJitterNanos := int64(0.7 * float64(connReattemptPeriod))
for {
// Otherwise these will be the normal scenarios to enable
// reconnection if we trip out.
// 1. If we've stopped, return entirely
// 2. Otherwise block until we are disconnected, and
// then retry connecting
select {
case <-c.stopCh:
return
case <-c.disconnectedCh:
// Quickly check if we haven't stopped at the
// same time.
select {
case <-c.stopCh:
return
default:
}
// Normal scenario that we'll wait for
}
if err := c.connect(context.Background()); err == nil {
c.setStateConnected()
} else {
// this code is unreachable in most cases
// c.connect does not establish Connection
c.SetStateDisconnected(err)
}
// Apply some jitter to avoid lockstep retrials of other
// collector-exporters. Lockstep retrials could result in an
// innocent DDOS, by clogging the machine's resources and network.
jitter := time.Duration(rng.Int63n(maxJitterNanos))
select {
case <-c.stopCh:
return
case <-time.After(connReattemptPeriod + jitter):
}
}
}
func (c *Connection) connect(ctx context.Context) error {
cc, err := c.dialToCollector(ctx)
if err != nil {
return err
}
c.setConnection(cc)
c.newConnectionHandler(cc)
return nil
}
// setConnection sets cc as the client Connection and returns true if
// the Connection state changed.
func (c *Connection) setConnection(cc *grpc.ClientConn) bool {
c.mu.Lock()
defer c.mu.Unlock()
// If previous clientConn is same as the current then just return.
// This doesn't happen right now as this func is only called with new ClientConn.
// It is more about future-proofing.
if c.cc == cc {
return false
}
// If the previous clientConn was non-nil, close it
if c.cc != nil {
_ = c.cc.Close()
}
c.cc = cc
return true
}
func (c *Connection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) {
dialOpts := []grpc.DialOption{}
if c.cfg.ServiceConfig != "" {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(c.cfg.ServiceConfig))
}
if c.SCfg.GRPCCredentials != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(c.SCfg.GRPCCredentials))
} else if c.SCfg.Insecure {
dialOpts = append(dialOpts, grpc.WithInsecure())
}
if c.SCfg.Compression == otlpconfig.GzipCompression {
dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)))
}
if len(c.cfg.DialOptions) != 0 {
dialOpts = append(dialOpts, c.cfg.DialOptions...)
}
ctx, cancel := c.ContextWithStop(ctx)
defer cancel()
ctx = c.ContextWithMetadata(ctx)
return grpc.DialContext(ctx, c.SCfg.Endpoint, dialOpts...)
}
func (c *Connection) ContextWithMetadata(ctx context.Context) context.Context {
if c.metadata.Len() > 0 {
return metadata.NewOutgoingContext(ctx, c.metadata)
}
return ctx
}
func (c *Connection) Shutdown(ctx context.Context) error {
close(c.stopCh)
// Ensure that the backgroundConnector returns
select {
case <-c.backgroundConnectionDoneCh:
case <-ctx.Done():
return ctx.Err()
}
c.mu.Lock()
cc := c.cc
c.cc = nil
c.mu.Unlock()
if cc != nil {
return cc.Close()
}
return nil
}
func (c *Connection) ContextWithStop(ctx context.Context) (context.Context, context.CancelFunc) {
// Unify the parent context Done signal with the Connection's
// stop channel.
ctx, cancel := context.WithCancel(ctx)
go func(ctx context.Context, cancel context.CancelFunc) {
select {
case <-ctx.Done():
// Nothing to do, either cancelled or deadline
// happened.
case <-c.stopCh:
cancel()
}
}(ctx, cancel)
return ctx, cancel
}
func (c *Connection) DoRequest(ctx context.Context, fn func(context.Context) error) error {
expBackoff := newExponentialBackoff(c.cfg.RetrySettings)
for {
err := fn(ctx)
if err == nil {
// request succeeded.
return nil
}
if !c.cfg.RetrySettings.Enabled {
return err
}
// We have an error, check gRPC status code.
st := status.Convert(err)
if st.Code() == codes.OK {
// Not really an error, still success.
return nil
}
// Now, this is this a real error.
if !shouldRetry(st.Code()) {
// It is not a retryable error, we should not retry.
return err
}
// Need to retry.
throttle := getThrottleDuration(st)
backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
// throw away the batch
err = fmt.Errorf("max elapsed time expired: %w", err)
return err
}
var delay time.Duration
if backoffDelay > throttle {
delay = backoffDelay
} else {
if expBackoff.GetElapsedTime()+throttle > expBackoff.MaxElapsedTime {
err = fmt.Errorf("max elapsed time expired when respecting server throttle: %w", err)
return err
}
// Respect server throttling.
delay = throttle
}
// back-off, but get interrupted when shutting down or request is cancelled or timed out.
err = func() error {
dt := time.NewTimer(delay)
defer dt.Stop()
select {
case <-ctx.Done():
return ctx.Err()
case <-c.stopCh:
return fmt.Errorf("interrupted due to shutdown: %w", err)
case <-dt.C:
}
return nil
}()
if err != nil {
return err
}
}
}
func shouldRetry(code codes.Code) bool {
switch code {
case codes.OK:
// Success. This function should not be called for this code, the best we
// can do is tell the caller not to retry.
return false
case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
// These are retryable errors.
return true
case codes.Unknown,
codes.InvalidArgument,
codes.Unauthenticated,
codes.PermissionDenied,
codes.NotFound,
codes.AlreadyExists,
codes.FailedPrecondition,
codes.Unimplemented,
codes.Internal:
// These are fatal errors, don't retry.
return false
default:
// Don't retry on unknown codes.
return false
}
}
func getThrottleDuration(status *status.Status) time.Duration {
// See if throttling information is available.
for _, detail := range status.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
// We are throttled. Wait before retrying as requested by the server.
return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
}
return 0
}
}
return 0
}
func newExponentialBackoff(rs otlpconfig.RetrySettings) *backoff.ExponentialBackOff {
// Do not use NewExponentialBackOff since it calls Reset and the code here must
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
expBackoff := &backoff.ExponentialBackOff{
InitialInterval: rs.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: rs.MaxInterval,
MaxElapsedTime: rs.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
expBackoff.Reset()
return expBackoff
}

View File

@@ -0,0 +1,195 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig
import (
"crypto/tls"
"fmt"
"io/ioutil"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"
"go.opentelemetry.io/otel"
)
var httpSchemeRegexp = regexp.MustCompile(`(?i)^(http://|https://)`)
func ApplyGRPCEnvConfigs(cfg *Config) {
e := EnvOptionsReader{
GetEnv: os.Getenv,
ReadFile: ioutil.ReadFile,
}
e.ApplyGRPCEnvConfigs(cfg)
}
func ApplyHTTPEnvConfigs(cfg *Config) {
e := EnvOptionsReader{
GetEnv: os.Getenv,
ReadFile: ioutil.ReadFile,
}
e.ApplyHTTPEnvConfigs(cfg)
}
type EnvOptionsReader struct {
GetEnv func(string) string
ReadFile func(filename string) ([]byte, error)
}
func (e *EnvOptionsReader) ApplyHTTPEnvConfigs(cfg *Config) {
opts := e.GetOptionsFromEnv()
for _, opt := range opts {
opt.ApplyHTTPOption(cfg)
}
}
func (e *EnvOptionsReader) ApplyGRPCEnvConfigs(cfg *Config) {
opts := e.GetOptionsFromEnv()
for _, opt := range opts {
opt.ApplyGRPCOption(cfg)
}
}
func (e *EnvOptionsReader) GetOptionsFromEnv() []GenericOption {
var opts []GenericOption
// Endpoint
if v, ok := e.getEnvValue("ENDPOINT"); ok {
if isInsecureEndpoint(v) {
opts = append(opts, WithInsecure())
} else {
opts = append(opts, WithSecure())
}
opts = append(opts, WithEndpoint(trimSchema(v)))
}
if v, ok := e.getEnvValue("TRACES_ENDPOINT"); ok {
if isInsecureEndpoint(v) {
opts = append(opts, WithInsecure())
} else {
opts = append(opts, WithSecure())
}
opts = append(opts, WithEndpoint(trimSchema(v)))
}
// Certificate File
if path, ok := e.getEnvValue("CERTIFICATE"); ok {
if tls, err := e.readTLSConfig(path); err == nil {
opts = append(opts, WithTLSClientConfig(tls))
} else {
otel.Handle(fmt.Errorf("failed to configure otlp exporter certificate '%s': %w", path, err))
}
}
if path, ok := e.getEnvValue("TRACES_CERTIFICATE"); ok {
if tls, err := e.readTLSConfig(path); err == nil {
opts = append(opts, WithTLSClientConfig(tls))
} else {
otel.Handle(fmt.Errorf("failed to configure otlp traces exporter certificate '%s': %w", path, err))
}
}
// Headers
if h, ok := e.getEnvValue("HEADERS"); ok {
opts = append(opts, WithHeaders(stringToHeader(h)))
}
if h, ok := e.getEnvValue("TRACES_HEADERS"); ok {
opts = append(opts, WithHeaders(stringToHeader(h)))
}
// Compression
if c, ok := e.getEnvValue("COMPRESSION"); ok {
opts = append(opts, WithCompression(stringToCompression(c)))
}
if c, ok := e.getEnvValue("TRACES_COMPRESSION"); ok {
opts = append(opts, WithCompression(stringToCompression(c)))
}
// Timeout
if t, ok := e.getEnvValue("TIMEOUT"); ok {
if d, err := strconv.Atoi(t); err == nil {
opts = append(opts, WithTimeout(time.Duration(d)*time.Millisecond))
}
}
if t, ok := e.getEnvValue("TRACES_TIMEOUT"); ok {
if d, err := strconv.Atoi(t); err == nil {
opts = append(opts, WithTimeout(time.Duration(d)*time.Millisecond))
}
}
return opts
}
func isInsecureEndpoint(endpoint string) bool {
return strings.HasPrefix(strings.ToLower(endpoint), "http://") || strings.HasPrefix(strings.ToLower(endpoint), "unix://")
}
func trimSchema(endpoint string) string {
return httpSchemeRegexp.ReplaceAllString(endpoint, "")
}
// getEnvValue gets an OTLP environment variable value of the specified key using the GetEnv function.
// This function already prepends the OTLP prefix to all key lookup.
func (e *EnvOptionsReader) getEnvValue(key string) (string, bool) {
v := strings.TrimSpace(e.GetEnv(fmt.Sprintf("OTEL_EXPORTER_OTLP_%s", key)))
return v, v != ""
}
func (e *EnvOptionsReader) readTLSConfig(path string) (*tls.Config, error) {
b, err := e.ReadFile(path)
if err != nil {
return nil, err
}
return CreateTLSConfig(b)
}
func stringToCompression(value string) Compression {
switch value {
case "gzip":
return GzipCompression
}
return NoCompression
}
func stringToHeader(value string) map[string]string {
headersPairs := strings.Split(value, ",")
headers := make(map[string]string)
for _, header := range headersPairs {
nameValue := strings.SplitN(header, "=", 2)
if len(nameValue) < 2 {
continue
}
name, err := url.QueryUnescape(nameValue[0])
if err != nil {
continue
}
trimmedName := strings.TrimSpace(name)
value, err := url.QueryUnescape(nameValue[1])
if err != nil {
continue
}
trimmedValue := strings.TrimSpace(value)
headers[trimmedName] = trimmedValue
}
return headers
}

View File

@@ -0,0 +1,270 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/internal/otlpconfig"
import (
"crypto/tls"
"fmt"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
const (
// DefaultMaxAttempts describes how many times the driver
// should retry the sending of the payload in case of a
// retryable error.
DefaultMaxAttempts int = 5
// DefaultTracesPath is a default URL path for endpoint that
// receives spans.
DefaultTracesPath string = "/v1/traces"
// DefaultBackoff is a default base backoff time used in the
// exponential backoff strategy.
DefaultBackoff time.Duration = 300 * time.Millisecond
// DefaultTimeout is a default max waiting time for the backend to process
// each span batch.
DefaultTimeout time.Duration = 10 * time.Second
)
var (
// defaultRetrySettings is a default settings for the retry policy.
defaultRetrySettings = RetrySettings{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}
)
type (
SignalConfig struct {
Endpoint string
Insecure bool
TLSCfg *tls.Config
Headers map[string]string
Compression Compression
Timeout time.Duration
URLPath string
// gRPC configurations
GRPCCredentials credentials.TransportCredentials
}
Config struct {
// Signal specific configurations
Traces SignalConfig
// HTTP configurations
MaxAttempts int
Backoff time.Duration
// gRPC configurations
ReconnectionPeriod time.Duration
ServiceConfig string
DialOptions []grpc.DialOption
RetrySettings RetrySettings
}
)
func NewDefaultConfig() Config {
c := Config{
Traces: SignalConfig{
Endpoint: fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort),
URLPath: DefaultTracesPath,
Compression: NoCompression,
Timeout: DefaultTimeout,
},
MaxAttempts: DefaultMaxAttempts,
Backoff: DefaultBackoff,
RetrySettings: defaultRetrySettings,
}
return c
}
type (
// GenericOption applies an option to the HTTP or gRPC driver.
GenericOption interface {
ApplyHTTPOption(*Config)
ApplyGRPCOption(*Config)
// A private method to prevent users implementing the
// interface and so future additions to it will not
// violate compatibility.
private()
}
// HTTPOption applies an option to the HTTP driver.
HTTPOption interface {
ApplyHTTPOption(*Config)
// A private method to prevent users implementing the
// interface and so future additions to it will not
// violate compatibility.
private()
}
// GRPCOption applies an option to the gRPC driver.
GRPCOption interface {
ApplyGRPCOption(*Config)
// A private method to prevent users implementing the
// interface and so future additions to it will not
// violate compatibility.
private()
}
)
// genericOption is an option that applies the same logic
// for both gRPC and HTTP.
type genericOption struct {
fn func(*Config)
}
func (g *genericOption) ApplyGRPCOption(cfg *Config) {
g.fn(cfg)
}
func (g *genericOption) ApplyHTTPOption(cfg *Config) {
g.fn(cfg)
}
func (genericOption) private() {}
func newGenericOption(fn func(cfg *Config)) GenericOption {
return &genericOption{fn: fn}
}
// splitOption is an option that applies different logics
// for gRPC and HTTP.
type splitOption struct {
httpFn func(*Config)
grpcFn func(*Config)
}
func (g *splitOption) ApplyGRPCOption(cfg *Config) {
g.grpcFn(cfg)
}
func (g *splitOption) ApplyHTTPOption(cfg *Config) {
g.httpFn(cfg)
}
func (splitOption) private() {}
func newSplitOption(httpFn func(cfg *Config), grpcFn func(cfg *Config)) GenericOption {
return &splitOption{httpFn: httpFn, grpcFn: grpcFn}
}
// httpOption is an option that is only applied to the HTTP driver.
type httpOption struct {
fn func(*Config)
}
func (h *httpOption) ApplyHTTPOption(cfg *Config) {
h.fn(cfg)
}
func (httpOption) private() {}
func NewHTTPOption(fn func(cfg *Config)) HTTPOption {
return &httpOption{fn: fn}
}
// grpcOption is an option that is only applied to the gRPC driver.
type grpcOption struct {
fn func(*Config)
}
func (h *grpcOption) ApplyGRPCOption(cfg *Config) {
h.fn(cfg)
}
func (grpcOption) private() {}
func NewGRPCOption(fn func(cfg *Config)) GRPCOption {
return &grpcOption{fn: fn}
}
// Generic Options
func WithEndpoint(endpoint string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Endpoint = endpoint
})
}
func WithCompression(compression Compression) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Compression = compression
})
}
func WithURLPath(urlPath string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.URLPath = urlPath
})
}
func WithRetry(settings RetrySettings) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.RetrySettings = settings
})
}
func WithTLSClientConfig(tlsCfg *tls.Config) GenericOption {
return newSplitOption(func(cfg *Config) {
cfg.Traces.TLSCfg = tlsCfg.Clone()
}, func(cfg *Config) {
cfg.Traces.GRPCCredentials = credentials.NewTLS(tlsCfg)
})
}
func WithInsecure() GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Insecure = true
})
}
func WithSecure() GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Insecure = false
})
}
func WithHeaders(headers map[string]string) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Headers = headers
})
}
func WithTimeout(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Traces.Timeout = duration
})
}
func WithMaxAttempts(maxAttempts int) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.MaxAttempts = maxAttempts
})
}
func WithBackoff(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Backoff = duration
})
}

View File

@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/otlpconfig"
import "time"
const (
// DefaultCollectorPort is the port the Exporter will attempt connect to
// if no collector port is provided.
DefaultCollectorPort uint16 = 4317
// DefaultCollectorHost is the host address the Exporter will attempt
// connect to if no collector address is provided.
DefaultCollectorHost string = "localhost"
)
// Compression describes the compression used for payloads sent to the
// collector.
type Compression int
const (
// NoCompression tells the driver to send payloads without
// compression.
NoCompression Compression = iota
// GzipCompression tells the driver to send payloads after
// compressing them with gzip.
GzipCompression
)
// Marshaler describes the kind of message format sent to the collector
type Marshaler int
const (
// MarshalProto tells the driver to send using the protobuf binary format.
MarshalProto Marshaler = iota
// MarshalJSON tells the driver to send using json format.
MarshalJSON
)
// RetrySettings defines configuration for retrying batches in case of export failure
// using an exponential backoff.
type RetrySettings struct {
// Enabled indicates whether to not retry sending batches in case of export failure.
Enabled bool
// InitialInterval the time to wait after the first failure before retrying.
InitialInterval time.Duration
// MaxInterval is the upper bound on backoff interval. Once this value is reached the delay between
// consecutive retries will always be `MaxInterval`.
MaxInterval time.Duration
// MaxElapsedTime is the maximum amount of time (including retries) spent trying to send a request/batch.
// Once this value is reached, the data is discarded.
MaxElapsedTime time.Duration
}

View File

@@ -0,0 +1,34 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otlpconfig
import (
"crypto/tls"
"crypto/x509"
"errors"
)
// CreateTLSConfig creates a tls.Config from a raw certificate bytes
// to verify a server certificate.
func CreateTLSConfig(certBytes []byte) (*tls.Config, error) {
cp := x509.NewCertPool()
if ok := cp.AppendCertsFromPEM(certBytes); !ok {
return nil, errors.New("failed to append certificate to the cert pool")
}
return &tls.Config{
RootCAs: cp,
}, nil
}

View File

@@ -0,0 +1,141 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracetransform
import (
"reflect"
"go.opentelemetry.io/otel/attribute"
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
"go.opentelemetry.io/otel/sdk/resource"
)
// Attributes transforms a slice of KeyValues into a slice of OTLP attribute key-values.
func Attributes(attrs []attribute.KeyValue) []*commonpb.KeyValue {
if len(attrs) == 0 {
return nil
}
out := make([]*commonpb.KeyValue, 0, len(attrs))
for _, kv := range attrs {
out = append(out, toAttribute(kv))
}
return out
}
// ResourceAttributes transforms a Resource into a slice of OTLP attribute key-values.
func ResourceAttributes(resource *resource.Resource) []*commonpb.KeyValue {
if resource.Len() == 0 {
return nil
}
out := make([]*commonpb.KeyValue, 0, resource.Len())
for iter := resource.Iter(); iter.Next(); {
out = append(out, toAttribute(iter.Attribute()))
}
return out
}
func toAttribute(v attribute.KeyValue) *commonpb.KeyValue {
result := &commonpb.KeyValue{
Key: string(v.Key),
Value: new(commonpb.AnyValue),
}
switch v.Value.Type() {
case attribute.BOOL:
result.Value.Value = &commonpb.AnyValue_BoolValue{
BoolValue: v.Value.AsBool(),
}
case attribute.INT64:
result.Value.Value = &commonpb.AnyValue_IntValue{
IntValue: v.Value.AsInt64(),
}
case attribute.FLOAT64:
result.Value.Value = &commonpb.AnyValue_DoubleValue{
DoubleValue: v.Value.AsFloat64(),
}
case attribute.STRING:
result.Value.Value = &commonpb.AnyValue_StringValue{
StringValue: v.Value.AsString(),
}
case attribute.ARRAY:
result.Value.Value = &commonpb.AnyValue_ArrayValue{
ArrayValue: &commonpb.ArrayValue{
Values: arrayValues(v),
},
}
default:
result.Value.Value = &commonpb.AnyValue_StringValue{
StringValue: "INVALID",
}
}
return result
}
func arrayValues(kv attribute.KeyValue) []*commonpb.AnyValue {
a := kv.Value.AsArray()
aType := reflect.TypeOf(a)
var valueFunc func(reflect.Value) *commonpb.AnyValue
switch aType.Elem().Kind() {
case reflect.Bool:
valueFunc = func(v reflect.Value) *commonpb.AnyValue {
return &commonpb.AnyValue{
Value: &commonpb.AnyValue_BoolValue{
BoolValue: v.Bool(),
},
}
}
case reflect.Int, reflect.Int64:
valueFunc = func(v reflect.Value) *commonpb.AnyValue {
return &commonpb.AnyValue{
Value: &commonpb.AnyValue_IntValue{
IntValue: v.Int(),
},
}
}
case reflect.Uintptr:
valueFunc = func(v reflect.Value) *commonpb.AnyValue {
return &commonpb.AnyValue{
Value: &commonpb.AnyValue_IntValue{
IntValue: int64(v.Uint()),
},
}
}
case reflect.Float64:
valueFunc = func(v reflect.Value) *commonpb.AnyValue {
return &commonpb.AnyValue{
Value: &commonpb.AnyValue_DoubleValue{
DoubleValue: v.Float(),
},
}
}
case reflect.String:
valueFunc = func(v reflect.Value) *commonpb.AnyValue {
return &commonpb.AnyValue{
Value: &commonpb.AnyValue_StringValue{
StringValue: v.String(),
},
}
}
}
results := make([]*commonpb.AnyValue, aType.Len())
for i, aValue := 0, reflect.ValueOf(a); i < aValue.Len(); i++ {
results[i] = valueFunc(aValue.Index(i))
}
return results
}

View File

@@ -0,0 +1,31 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracetransform
import (
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
"go.opentelemetry.io/otel/sdk/instrumentation"
)
func InstrumentationLibrary(il instrumentation.Library) *commonpb.InstrumentationLibrary {
if il == (instrumentation.Library{}) {
return nil
}
return &commonpb.InstrumentationLibrary{
Name: il.Name,
Version: il.Version,
}
}

View File

@@ -0,0 +1,29 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracetransform
import (
resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
"go.opentelemetry.io/otel/sdk/resource"
)
// Resource transforms a Resource into an OTLP Resource.
func Resource(r *resource.Resource) *resourcepb.Resource {
if r == nil {
return nil
}
return &resourcepb.Resource{Attributes: ResourceAttributes(r)}
}

View File

@@ -0,0 +1,218 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tracetransform
import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
"go.opentelemetry.io/otel/sdk/instrumentation"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)
const (
maxEventsPerSpan = 128
)
// Spans transforms a slice of OpenTelemetry spans into a slice of OTLP
// ResourceSpans.
func Spans(sdl []tracesdk.ReadOnlySpan) []*tracepb.ResourceSpans {
if len(sdl) == 0 {
return nil
}
rsm := make(map[attribute.Distinct]*tracepb.ResourceSpans)
type ilsKey struct {
r attribute.Distinct
il instrumentation.Library
}
ilsm := make(map[ilsKey]*tracepb.InstrumentationLibrarySpans)
var resources int
for _, sd := range sdl {
if sd == nil {
continue
}
rKey := sd.Resource().Equivalent()
iKey := ilsKey{
r: rKey,
il: sd.InstrumentationLibrary(),
}
ils, iOk := ilsm[iKey]
if !iOk {
// Either the resource or instrumentation library were unknown.
ils = &tracepb.InstrumentationLibrarySpans{
InstrumentationLibrary: InstrumentationLibrary(sd.InstrumentationLibrary()),
Spans: []*tracepb.Span{},
}
}
ils.Spans = append(ils.Spans, span(sd))
ilsm[iKey] = ils
rs, rOk := rsm[rKey]
if !rOk {
resources++
// The resource was unknown.
rs = &tracepb.ResourceSpans{
Resource: Resource(sd.Resource()),
InstrumentationLibrarySpans: []*tracepb.InstrumentationLibrarySpans{ils},
}
rsm[rKey] = rs
continue
}
// The resource has been seen before. Check if the instrumentation
// library lookup was unknown because if so we need to add it to the
// ResourceSpans. Otherwise, the instrumentation library has already
// been seen and the append we did above will be included it in the
// InstrumentationLibrarySpans reference.
if !iOk {
rs.InstrumentationLibrarySpans = append(rs.InstrumentationLibrarySpans, ils)
}
}
// Transform the categorized map into a slice
rss := make([]*tracepb.ResourceSpans, 0, resources)
for _, rs := range rsm {
rss = append(rss, rs)
}
return rss
}
// span transforms a Span into an OTLP span.
func span(sd tracesdk.ReadOnlySpan) *tracepb.Span {
if sd == nil {
return nil
}
tid := sd.SpanContext().TraceID()
sid := sd.SpanContext().SpanID()
s := &tracepb.Span{
TraceId: tid[:],
SpanId: sid[:],
TraceState: sd.SpanContext().TraceState().String(),
Status: status(sd.Status().Code, sd.Status().Description),
StartTimeUnixNano: uint64(sd.StartTime().UnixNano()),
EndTimeUnixNano: uint64(sd.EndTime().UnixNano()),
Links: links(sd.Links()),
Kind: spanKind(sd.SpanKind()),
Name: sd.Name(),
Attributes: Attributes(sd.Attributes()),
Events: spanEvents(sd.Events()),
DroppedAttributesCount: uint32(sd.DroppedAttributes()),
DroppedEventsCount: uint32(sd.DroppedEvents()),
DroppedLinksCount: uint32(sd.DroppedLinks()),
}
if psid := sd.Parent().SpanID(); psid.IsValid() {
s.ParentSpanId = psid[:]
}
return s
}
// status transform a span code and message into an OTLP span status.
func status(status codes.Code, message string) *tracepb.Status {
var c tracepb.Status_StatusCode
switch status {
case codes.Error:
c = tracepb.Status_STATUS_CODE_ERROR
default:
c = tracepb.Status_STATUS_CODE_OK
}
return &tracepb.Status{
Code: c,
Message: message,
}
}
// links transforms span Links to OTLP span links.
func links(links []trace.Link) []*tracepb.Span_Link {
if len(links) == 0 {
return nil
}
sl := make([]*tracepb.Span_Link, 0, len(links))
for _, otLink := range links {
// This redefinition is necessary to prevent otLink.*ID[:] copies
// being reused -- in short we need a new otLink per iteration.
otLink := otLink
tid := otLink.SpanContext.TraceID()
sid := otLink.SpanContext.SpanID()
sl = append(sl, &tracepb.Span_Link{
TraceId: tid[:],
SpanId: sid[:],
Attributes: Attributes(otLink.Attributes),
})
}
return sl
}
// spanEvents transforms span Events to an OTLP span events.
func spanEvents(es []tracesdk.Event) []*tracepb.Span_Event {
if len(es) == 0 {
return nil
}
evCount := len(es)
if evCount > maxEventsPerSpan {
evCount = maxEventsPerSpan
}
events := make([]*tracepb.Span_Event, 0, evCount)
nEvents := 0
// Transform message events
for _, e := range es {
if nEvents >= maxEventsPerSpan {
break
}
nEvents++
events = append(events,
&tracepb.Span_Event{
Name: e.Name,
TimeUnixNano: uint64(e.Time.UnixNano()),
Attributes: Attributes(e.Attributes),
// TODO (rghetia) : Add Drop Counts when supported.
},
)
}
return events
}
// spanKind transforms a SpanKind to an OTLP span kind.
func spanKind(kind trace.SpanKind) tracepb.Span_SpanKind {
switch kind {
case trace.SpanKindInternal:
return tracepb.Span_SPAN_KIND_INTERNAL
case trace.SpanKindClient:
return tracepb.Span_SPAN_KIND_CLIENT
case trace.SpanKindServer:
return tracepb.Span_SPAN_KIND_SERVER
case trace.SpanKindProducer:
return tracepb.Span_SPAN_KIND_PRODUCER
case trace.SpanKindConsumer:
return tracepb.Span_SPAN_KIND_CONSUMER
default:
return tracepb.Span_SPAN_KIND_UNSPECIFIED
}
}