mirror of
				https://gitea.com/Lydanne/buildx.git
				synced 2025-10-31 08:03:43 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			240 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			240 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package otgrpc
 | |
| 
 | |
| import (
 | |
| 	"github.com/opentracing/opentracing-go"
 | |
| 	"github.com/opentracing/opentracing-go/ext"
 | |
| 	"github.com/opentracing/opentracing-go/log"
 | |
| 	"golang.org/x/net/context"
 | |
| 	"google.golang.org/grpc"
 | |
| 	"google.golang.org/grpc/metadata"
 | |
| 	"io"
 | |
| 	"runtime"
 | |
| 	"sync/atomic"
 | |
| )
 | |
| 
 | |
| // OpenTracingClientInterceptor returns a grpc.UnaryClientInterceptor suitable
 | |
| // for use in a grpc.Dial call.
 | |
| //
 | |
| // For example:
 | |
| //
 | |
| //     conn, err := grpc.Dial(
 | |
| //         address,
 | |
| //         ...,  // (existing DialOptions)
 | |
| //         grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(tracer)))
 | |
| //
 | |
| // All gRPC client spans will inject the OpenTracing SpanContext into the gRPC
 | |
| // metadata; they will also look in the context.Context for an active
 | |
| // in-process parent Span and establish a ChildOf reference if such a parent
 | |
| // Span could be found.
 | |
| func OpenTracingClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.UnaryClientInterceptor {
 | |
| 	otgrpcOpts := newOptions()
 | |
| 	otgrpcOpts.apply(optFuncs...)
 | |
| 	return func(
 | |
| 		ctx context.Context,
 | |
| 		method string,
 | |
| 		req, resp interface{},
 | |
| 		cc *grpc.ClientConn,
 | |
| 		invoker grpc.UnaryInvoker,
 | |
| 		opts ...grpc.CallOption,
 | |
| 	) error {
 | |
| 		var err error
 | |
| 		var parentCtx opentracing.SpanContext
 | |
| 		if parent := opentracing.SpanFromContext(ctx); parent != nil {
 | |
| 			parentCtx = parent.Context()
 | |
| 		}
 | |
| 		if otgrpcOpts.inclusionFunc != nil &&
 | |
| 			!otgrpcOpts.inclusionFunc(parentCtx, method, req, resp) {
 | |
| 			return invoker(ctx, method, req, resp, cc, opts...)
 | |
| 		}
 | |
| 		clientSpan := tracer.StartSpan(
 | |
| 			method,
 | |
| 			opentracing.ChildOf(parentCtx),
 | |
| 			ext.SpanKindRPCClient,
 | |
| 			gRPCComponentTag,
 | |
| 		)
 | |
| 		defer clientSpan.Finish()
 | |
| 		ctx = injectSpanContext(ctx, tracer, clientSpan)
 | |
| 		if otgrpcOpts.logPayloads {
 | |
| 			clientSpan.LogFields(log.Object("gRPC request", req))
 | |
| 		}
 | |
| 		err = invoker(ctx, method, req, resp, cc, opts...)
 | |
| 		if err == nil {
 | |
| 			if otgrpcOpts.logPayloads {
 | |
| 				clientSpan.LogFields(log.Object("gRPC response", resp))
 | |
| 			}
 | |
| 		} else {
 | |
| 			SetSpanTags(clientSpan, err, true)
 | |
| 			clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
 | |
| 		}
 | |
| 		if otgrpcOpts.decorator != nil {
 | |
| 			otgrpcOpts.decorator(clientSpan, method, req, resp, err)
 | |
| 		}
 | |
| 		return err
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // OpenTracingStreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
 | |
| // for use in a grpc.Dial call. The interceptor instruments streaming RPCs by creating
 | |
| // a single span to correspond to the lifetime of the RPC's stream.
 | |
| //
 | |
| // For example:
 | |
| //
 | |
| //     conn, err := grpc.Dial(
 | |
| //         address,
 | |
| //         ...,  // (existing DialOptions)
 | |
| //         grpc.WithStreamInterceptor(otgrpc.OpenTracingStreamClientInterceptor(tracer)))
 | |
| //
 | |
| // All gRPC client spans will inject the OpenTracing SpanContext into the gRPC
 | |
| // metadata; they will also look in the context.Context for an active
 | |
| // in-process parent Span and establish a ChildOf reference if such a parent
 | |
| // Span could be found.
 | |
| func OpenTracingStreamClientInterceptor(tracer opentracing.Tracer, optFuncs ...Option) grpc.StreamClientInterceptor {
 | |
| 	otgrpcOpts := newOptions()
 | |
| 	otgrpcOpts.apply(optFuncs...)
 | |
| 	return func(
 | |
| 		ctx context.Context,
 | |
| 		desc *grpc.StreamDesc,
 | |
| 		cc *grpc.ClientConn,
 | |
| 		method string,
 | |
| 		streamer grpc.Streamer,
 | |
| 		opts ...grpc.CallOption,
 | |
| 	) (grpc.ClientStream, error) {
 | |
| 		var err error
 | |
| 		var parentCtx opentracing.SpanContext
 | |
| 		if parent := opentracing.SpanFromContext(ctx); parent != nil {
 | |
| 			parentCtx = parent.Context()
 | |
| 		}
 | |
| 		if otgrpcOpts.inclusionFunc != nil &&
 | |
| 			!otgrpcOpts.inclusionFunc(parentCtx, method, nil, nil) {
 | |
| 			return streamer(ctx, desc, cc, method, opts...)
 | |
| 		}
 | |
| 
 | |
| 		clientSpan := tracer.StartSpan(
 | |
| 			method,
 | |
| 			opentracing.ChildOf(parentCtx),
 | |
| 			ext.SpanKindRPCClient,
 | |
| 			gRPCComponentTag,
 | |
| 		)
 | |
| 		ctx = injectSpanContext(ctx, tracer, clientSpan)
 | |
| 		cs, err := streamer(ctx, desc, cc, method, opts...)
 | |
| 		if err != nil {
 | |
| 			clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
 | |
| 			SetSpanTags(clientSpan, err, true)
 | |
| 			clientSpan.Finish()
 | |
| 			return cs, err
 | |
| 		}
 | |
| 		return newOpenTracingClientStream(cs, method, desc, clientSpan, otgrpcOpts), nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func newOpenTracingClientStream(cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan opentracing.Span, otgrpcOpts *options) grpc.ClientStream {
 | |
| 	finishChan := make(chan struct{})
 | |
| 
 | |
| 	isFinished := new(int32)
 | |
| 	*isFinished = 0
 | |
| 	finishFunc := func(err error) {
 | |
| 		// The current OpenTracing specification forbids finishing a span more than
 | |
| 		// once. Since we have multiple code paths that could concurrently call
 | |
| 		// `finishFunc`, we need to add some sort of synchronization to guard against
 | |
| 		// multiple finishing.
 | |
| 		if !atomic.CompareAndSwapInt32(isFinished, 0, 1) {
 | |
| 			return
 | |
| 		}
 | |
| 		close(finishChan)
 | |
| 		defer clientSpan.Finish()
 | |
| 		if err != nil {
 | |
| 			clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
 | |
| 			SetSpanTags(clientSpan, err, true)
 | |
| 		}
 | |
| 		if otgrpcOpts.decorator != nil {
 | |
| 			otgrpcOpts.decorator(clientSpan, method, nil, nil, err)
 | |
| 		}
 | |
| 	}
 | |
| 	go func() {
 | |
| 		select {
 | |
| 		case <-finishChan:
 | |
| 			// The client span is being finished by another code path; hence, no
 | |
| 			// action is necessary.
 | |
| 		case <-cs.Context().Done():
 | |
| 			finishFunc(cs.Context().Err())
 | |
| 		}
 | |
| 	}()
 | |
| 	otcs := &openTracingClientStream{
 | |
| 		ClientStream: cs,
 | |
| 		desc:         desc,
 | |
| 		finishFunc:   finishFunc,
 | |
| 	}
 | |
| 
 | |
| 	// The `ClientStream` interface allows one to omit calling `Recv` if it's
 | |
| 	// known that the result will be `io.EOF`. See
 | |
| 	// http://stackoverflow.com/q/42915337
 | |
| 	// In such cases, there's nothing that triggers the span to finish. We,
 | |
| 	// therefore, set a finalizer so that the span and the context goroutine will
 | |
| 	// at least be cleaned up when the garbage collector is run.
 | |
| 	runtime.SetFinalizer(otcs, func(otcs *openTracingClientStream) {
 | |
| 		otcs.finishFunc(nil)
 | |
| 	})
 | |
| 	return otcs
 | |
| }
 | |
| 
 | |
| type openTracingClientStream struct {
 | |
| 	grpc.ClientStream
 | |
| 	desc       *grpc.StreamDesc
 | |
| 	finishFunc func(error)
 | |
| }
 | |
| 
 | |
| func (cs *openTracingClientStream) Header() (metadata.MD, error) {
 | |
| 	md, err := cs.ClientStream.Header()
 | |
| 	if err != nil {
 | |
| 		cs.finishFunc(err)
 | |
| 	}
 | |
| 	return md, err
 | |
| }
 | |
| 
 | |
| func (cs *openTracingClientStream) SendMsg(m interface{}) error {
 | |
| 	err := cs.ClientStream.SendMsg(m)
 | |
| 	if err != nil {
 | |
| 		cs.finishFunc(err)
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (cs *openTracingClientStream) RecvMsg(m interface{}) error {
 | |
| 	err := cs.ClientStream.RecvMsg(m)
 | |
| 	if err == io.EOF {
 | |
| 		cs.finishFunc(nil)
 | |
| 		return err
 | |
| 	} else if err != nil {
 | |
| 		cs.finishFunc(err)
 | |
| 		return err
 | |
| 	}
 | |
| 	if !cs.desc.ServerStreams {
 | |
| 		cs.finishFunc(nil)
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (cs *openTracingClientStream) CloseSend() error {
 | |
| 	err := cs.ClientStream.CloseSend()
 | |
| 	if err != nil {
 | |
| 		cs.finishFunc(err)
 | |
| 	}
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func injectSpanContext(ctx context.Context, tracer opentracing.Tracer, clientSpan opentracing.Span) context.Context {
 | |
| 	md, ok := metadata.FromOutgoingContext(ctx)
 | |
| 	if !ok {
 | |
| 		md = metadata.New(nil)
 | |
| 	} else {
 | |
| 		md = md.Copy()
 | |
| 	}
 | |
| 	mdWriter := metadataReaderWriter{md}
 | |
| 	err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, mdWriter)
 | |
| 	// We have no better place to record an error than the Span itself :-/
 | |
| 	if err != nil {
 | |
| 		clientSpan.LogFields(log.String("event", "Tracer.Inject() failed"), log.Error(err))
 | |
| 	}
 | |
| 	return metadata.NewOutgoingContext(ctx, md)
 | |
| }
 | 
