mirror of
				https://gitea.com/Lydanne/buildx.git
				synced 2025-11-04 18:13:42 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			176 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			176 lines
		
	
	
		
			5.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package session
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"net"
 | 
						|
	"strings"
 | 
						|
 | 
						|
	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
 | 
						|
	"github.com/moby/buildkit/identity"
 | 
						|
	"github.com/moby/buildkit/util/grpcerrors"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
 | 
						|
	"go.opentelemetry.io/otel/propagation"
 | 
						|
	"go.opentelemetry.io/otel/trace"
 | 
						|
	"google.golang.org/grpc"
 | 
						|
	"google.golang.org/grpc/health"
 | 
						|
	"google.golang.org/grpc/health/grpc_health_v1"
 | 
						|
)
 | 
						|
 | 
						|
const (
 | 
						|
	headerSessionID        = "X-Docker-Expose-Session-Uuid"
 | 
						|
	headerSessionName      = "X-Docker-Expose-Session-Name"
 | 
						|
	headerSessionSharedKey = "X-Docker-Expose-Session-Sharedkey"
 | 
						|
	headerSessionMethod    = "X-Docker-Expose-Session-Grpc-Method"
 | 
						|
)
 | 
						|
 | 
						|
var propagators = propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
 | 
						|
 | 
						|
// Dialer returns a connection that can be used by the session
 | 
						|
type Dialer func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error)
 | 
						|
 | 
						|
// Attachable defines a feature that can be exposed on a session
 | 
						|
type Attachable interface {
 | 
						|
	Register(*grpc.Server)
 | 
						|
}
 | 
						|
 | 
						|
// Session is a long running connection between client and a daemon
 | 
						|
type Session struct {
 | 
						|
	id         string
 | 
						|
	name       string
 | 
						|
	sharedKey  string
 | 
						|
	ctx        context.Context
 | 
						|
	cancelCtx  func()
 | 
						|
	done       chan struct{}
 | 
						|
	grpcServer *grpc.Server
 | 
						|
	conn       net.Conn
 | 
						|
}
 | 
						|
 | 
						|
// NewSession returns a new long running session
 | 
						|
func NewSession(ctx context.Context, name, sharedKey string) (*Session, error) {
 | 
						|
	id := identity.NewID()
 | 
						|
 | 
						|
	var unary []grpc.UnaryServerInterceptor
 | 
						|
	var stream []grpc.StreamServerInterceptor
 | 
						|
 | 
						|
	serverOpts := []grpc.ServerOption{}
 | 
						|
 | 
						|
	if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() {
 | 
						|
		unary = append(unary, filterServer(otelgrpc.UnaryServerInterceptor(otelgrpc.WithTracerProvider(span.TracerProvider()), otelgrpc.WithPropagators(propagators))))
 | 
						|
		stream = append(stream, otelgrpc.StreamServerInterceptor(otelgrpc.WithTracerProvider(span.TracerProvider()), otelgrpc.WithPropagators(propagators)))
 | 
						|
	}
 | 
						|
 | 
						|
	unary = append(unary, grpcerrors.UnaryServerInterceptor)
 | 
						|
	stream = append(stream, grpcerrors.StreamServerInterceptor)
 | 
						|
 | 
						|
	if len(unary) == 1 {
 | 
						|
		serverOpts = append(serverOpts, grpc.UnaryInterceptor(unary[0]))
 | 
						|
	} else if len(unary) > 1 {
 | 
						|
		serverOpts = append(serverOpts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(unary...)))
 | 
						|
	}
 | 
						|
 | 
						|
	if len(stream) == 1 {
 | 
						|
		serverOpts = append(serverOpts, grpc.StreamInterceptor(stream[0]))
 | 
						|
	} else if len(stream) > 1 {
 | 
						|
		serverOpts = append(serverOpts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(stream...)))
 | 
						|
	}
 | 
						|
 | 
						|
	s := &Session{
 | 
						|
		id:         id,
 | 
						|
		name:       name,
 | 
						|
		sharedKey:  sharedKey,
 | 
						|
		grpcServer: grpc.NewServer(serverOpts...),
 | 
						|
	}
 | 
						|
 | 
						|
	grpc_health_v1.RegisterHealthServer(s.grpcServer, health.NewServer())
 | 
						|
 | 
						|
	return s, nil
 | 
						|
}
 | 
						|
 | 
						|
// Allow enables a given service to be reachable through the grpc session
 | 
						|
func (s *Session) Allow(a Attachable) {
 | 
						|
	a.Register(s.grpcServer)
 | 
						|
}
 | 
						|
 | 
						|
// ID returns unique identifier for the session
 | 
						|
func (s *Session) ID() string {
 | 
						|
	return s.id
 | 
						|
}
 | 
						|
 | 
						|
// Run activates the session
 | 
						|
func (s *Session) Run(ctx context.Context, dialer Dialer) error {
 | 
						|
	ctx, cancel := context.WithCancel(ctx)
 | 
						|
	s.cancelCtx = cancel
 | 
						|
	s.done = make(chan struct{})
 | 
						|
 | 
						|
	defer cancel()
 | 
						|
	defer close(s.done)
 | 
						|
 | 
						|
	meta := make(map[string][]string)
 | 
						|
	meta[headerSessionID] = []string{s.id}
 | 
						|
	meta[headerSessionName] = []string{s.name}
 | 
						|
	meta[headerSessionSharedKey] = []string{s.sharedKey}
 | 
						|
 | 
						|
	for name, svc := range s.grpcServer.GetServiceInfo() {
 | 
						|
		for _, method := range svc.Methods {
 | 
						|
			meta[headerSessionMethod] = append(meta[headerSessionMethod], MethodURL(name, method.Name))
 | 
						|
		}
 | 
						|
	}
 | 
						|
	conn, err := dialer(ctx, "h2c", meta)
 | 
						|
	if err != nil {
 | 
						|
		return errors.Wrap(err, "failed to dial gRPC")
 | 
						|
	}
 | 
						|
	s.conn = conn
 | 
						|
	serve(ctx, s.grpcServer, conn)
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
// Close closes the session
 | 
						|
func (s *Session) Close() error {
 | 
						|
	if s.cancelCtx != nil && s.done != nil {
 | 
						|
		if s.conn != nil {
 | 
						|
			s.conn.Close()
 | 
						|
		}
 | 
						|
		s.grpcServer.Stop()
 | 
						|
		<-s.done
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func (s *Session) context() context.Context {
 | 
						|
	return s.ctx
 | 
						|
}
 | 
						|
 | 
						|
func (s *Session) closed() bool {
 | 
						|
	select {
 | 
						|
	case <-s.context().Done():
 | 
						|
		return true
 | 
						|
	default:
 | 
						|
		return false
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// MethodURL returns a gRPC method URL for service and method name
 | 
						|
func MethodURL(s, m string) string {
 | 
						|
	return "/" + s + "/" + m
 | 
						|
}
 | 
						|
 | 
						|
// updates needed in opentelemetry-contrib to avoid this
 | 
						|
func filterServer(intercept grpc.UnaryServerInterceptor) grpc.UnaryServerInterceptor {
 | 
						|
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
 | 
						|
		if strings.HasSuffix(info.FullMethod, "Health/Check") {
 | 
						|
			return handler(ctx, req)
 | 
						|
		}
 | 
						|
		return intercept(ctx, req, info, handler)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func filterClient(intercept grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
 | 
						|
	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
 | 
						|
		if strings.HasSuffix(method, "Health/Check") {
 | 
						|
			return invoker(ctx, method, req, reply, cc, opts...)
 | 
						|
		}
 | 
						|
		return intercept(ctx, method, req, reply, cc, invoker, opts...)
 | 
						|
	}
 | 
						|
}
 |