mirror of
https://gitea.com/Lydanne/buildx.git
synced 2025-07-09 21:17:09 +08:00
vendor: google.golang.org/grpc v1.58.3
Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
345
vendor/google.golang.org/grpc/server.go
generated
vendored
345
vendor/google.golang.org/grpc/server.go
generated
vendored
@ -43,8 +43,8 @@ import (
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/binarylog"
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
"google.golang.org/grpc/internal/transport"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
@ -74,10 +74,10 @@ func init() {
|
||||
srv.drainServerTransports(addr)
|
||||
}
|
||||
internal.AddGlobalServerOptions = func(opt ...ServerOption) {
|
||||
extraServerOptions = append(extraServerOptions, opt...)
|
||||
globalServerOptions = append(globalServerOptions, opt...)
|
||||
}
|
||||
internal.ClearGlobalServerOptions = func() {
|
||||
extraServerOptions = nil
|
||||
globalServerOptions = nil
|
||||
}
|
||||
internal.BinaryLogger = binaryLogger
|
||||
internal.JoinServerOptions = newJoinServerOption
|
||||
@ -86,7 +86,7 @@ func init() {
|
||||
var statusOK = status.New(codes.OK, "")
|
||||
var logger = grpclog.Component("core")
|
||||
|
||||
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
|
||||
type methodHandler func(srv any, ctx context.Context, dec func(any) error, interceptor UnaryServerInterceptor) (any, error)
|
||||
|
||||
// MethodDesc represents an RPC service's method specification.
|
||||
type MethodDesc struct {
|
||||
@ -99,26 +99,20 @@ type ServiceDesc struct {
|
||||
ServiceName string
|
||||
// The pointer to the service interface. Used to check whether the user
|
||||
// provided implementation satisfies the interface requirements.
|
||||
HandlerType interface{}
|
||||
HandlerType any
|
||||
Methods []MethodDesc
|
||||
Streams []StreamDesc
|
||||
Metadata interface{}
|
||||
Metadata any
|
||||
}
|
||||
|
||||
// serviceInfo wraps information about a service. It is very similar to
|
||||
// ServiceDesc and is constructed from it for internal purposes.
|
||||
type serviceInfo struct {
|
||||
// Contains the implementation for the methods in this service.
|
||||
serviceImpl interface{}
|
||||
serviceImpl any
|
||||
methods map[string]*MethodDesc
|
||||
streams map[string]*StreamDesc
|
||||
mdata interface{}
|
||||
}
|
||||
|
||||
type serverWorkerData struct {
|
||||
st transport.ServerTransport
|
||||
wg *sync.WaitGroup
|
||||
stream *transport.Stream
|
||||
mdata any
|
||||
}
|
||||
|
||||
// Server is a gRPC server to serve RPC requests.
|
||||
@ -145,7 +139,7 @@ type Server struct {
|
||||
channelzID *channelz.Identifier
|
||||
czData *channelzData
|
||||
|
||||
serverWorkerChannels []chan *serverWorkerData
|
||||
serverWorkerChannel chan func()
|
||||
}
|
||||
|
||||
type serverOptions struct {
|
||||
@ -170,20 +164,24 @@ type serverOptions struct {
|
||||
initialConnWindowSize int32
|
||||
writeBufferSize int
|
||||
readBufferSize int
|
||||
sharedWriteBuffer bool
|
||||
connectionTimeout time.Duration
|
||||
maxHeaderListSize *uint32
|
||||
headerTableSize *uint32
|
||||
numServerWorkers uint32
|
||||
recvBufferPool SharedBufferPool
|
||||
}
|
||||
|
||||
var defaultServerOptions = serverOptions{
|
||||
maxConcurrentStreams: math.MaxUint32,
|
||||
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
|
||||
maxSendMessageSize: defaultServerMaxSendMessageSize,
|
||||
connectionTimeout: 120 * time.Second,
|
||||
writeBufferSize: defaultWriteBufSize,
|
||||
readBufferSize: defaultReadBufSize,
|
||||
recvBufferPool: nopBufferPool{},
|
||||
}
|
||||
var extraServerOptions []ServerOption
|
||||
var globalServerOptions []ServerOption
|
||||
|
||||
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
|
||||
type ServerOption interface {
|
||||
@ -233,6 +231,20 @@ func newJoinServerOption(opts ...ServerOption) ServerOption {
|
||||
return &joinServerOption{opts: opts}
|
||||
}
|
||||
|
||||
// SharedWriteBuffer allows reusing per-connection transport write buffer.
|
||||
// If this option is set to true every connection will release the buffer after
|
||||
// flushing the data on the wire.
|
||||
//
|
||||
// # Experimental
|
||||
//
|
||||
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
||||
// later release.
|
||||
func SharedWriteBuffer(val bool) ServerOption {
|
||||
return newFuncServerOption(func(o *serverOptions) {
|
||||
o.sharedWriteBuffer = val
|
||||
})
|
||||
}
|
||||
|
||||
// WriteBufferSize determines how much data can be batched before doing a write
|
||||
// on the wire. The corresponding memory allocation for this buffer will be
|
||||
// twice the size to keep syscalls low. The default value for this buffer is
|
||||
@ -273,9 +285,9 @@ func InitialConnWindowSize(s int32) ServerOption {
|
||||
|
||||
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
|
||||
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
|
||||
if kp.Time > 0 && kp.Time < time.Second {
|
||||
if kp.Time > 0 && kp.Time < internal.KeepaliveMinServerPingTime {
|
||||
logger.Warning("Adjusting keepalive ping interval to minimum period of 1s")
|
||||
kp.Time = time.Second
|
||||
kp.Time = internal.KeepaliveMinServerPingTime
|
||||
}
|
||||
|
||||
return newFuncServerOption(func(o *serverOptions) {
|
||||
@ -387,6 +399,9 @@ func MaxSendMsgSize(m int) ServerOption {
|
||||
// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
|
||||
// of concurrent streams to each ServerTransport.
|
||||
func MaxConcurrentStreams(n uint32) ServerOption {
|
||||
if n == 0 {
|
||||
n = math.MaxUint32
|
||||
}
|
||||
return newFuncServerOption(func(o *serverOptions) {
|
||||
o.maxConcurrentStreams = n
|
||||
})
|
||||
@ -552,6 +567,27 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
|
||||
})
|
||||
}
|
||||
|
||||
// RecvBufferPool returns a ServerOption that configures the server
|
||||
// to use the provided shared buffer pool for parsing incoming messages. Depending
|
||||
// on the application's workload, this could result in reduced memory allocation.
|
||||
//
|
||||
// If you are unsure about how to implement a memory pool but want to utilize one,
|
||||
// begin with grpc.NewSharedBufferPool.
|
||||
//
|
||||
// Note: The shared buffer pool feature will not be active if any of the following
|
||||
// options are used: StatsHandler, EnableTracing, or binary logging. In such
|
||||
// cases, the shared buffer pool will be ignored.
|
||||
//
|
||||
// # Experimental
|
||||
//
|
||||
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
|
||||
// later release.
|
||||
func RecvBufferPool(bufferPool SharedBufferPool) ServerOption {
|
||||
return newFuncServerOption(func(o *serverOptions) {
|
||||
o.recvBufferPool = bufferPool
|
||||
})
|
||||
}
|
||||
|
||||
// serverWorkerResetThreshold defines how often the stack must be reset. Every
|
||||
// N requests, by spawning a new goroutine in its place, a worker can reset its
|
||||
// stack so that large stacks don't live in memory forever. 2^16 should allow
|
||||
@ -560,47 +596,40 @@ func NumStreamWorkers(numServerWorkers uint32) ServerOption {
|
||||
const serverWorkerResetThreshold = 1 << 16
|
||||
|
||||
// serverWorkers blocks on a *transport.Stream channel forever and waits for
|
||||
// data to be fed by serveStreams. This allows different requests to be
|
||||
// data to be fed by serveStreams. This allows multiple requests to be
|
||||
// processed by the same goroutine, removing the need for expensive stack
|
||||
// re-allocations (see the runtime.morestack problem [1]).
|
||||
//
|
||||
// [1] https://github.com/golang/go/issues/18138
|
||||
func (s *Server) serverWorker(ch chan *serverWorkerData) {
|
||||
// To make sure all server workers don't reset at the same time, choose a
|
||||
// random number of iterations before resetting.
|
||||
threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
|
||||
for completed := 0; completed < threshold; completed++ {
|
||||
data, ok := <-ch
|
||||
func (s *Server) serverWorker() {
|
||||
for completed := 0; completed < serverWorkerResetThreshold; completed++ {
|
||||
f, ok := <-s.serverWorkerChannel
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
|
||||
data.wg.Done()
|
||||
f()
|
||||
}
|
||||
go s.serverWorker(ch)
|
||||
go s.serverWorker()
|
||||
}
|
||||
|
||||
// initServerWorkers creates worker goroutines and channels to process incoming
|
||||
// initServerWorkers creates worker goroutines and a channel to process incoming
|
||||
// connections to reduce the time spent overall on runtime.morestack.
|
||||
func (s *Server) initServerWorkers() {
|
||||
s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
|
||||
s.serverWorkerChannel = make(chan func())
|
||||
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
|
||||
s.serverWorkerChannels[i] = make(chan *serverWorkerData)
|
||||
go s.serverWorker(s.serverWorkerChannels[i])
|
||||
go s.serverWorker()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) stopServerWorkers() {
|
||||
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
|
||||
close(s.serverWorkerChannels[i])
|
||||
}
|
||||
close(s.serverWorkerChannel)
|
||||
}
|
||||
|
||||
// NewServer creates a gRPC server which has no service registered and has not
|
||||
// started to accept requests yet.
|
||||
func NewServer(opt ...ServerOption) *Server {
|
||||
opts := defaultServerOptions
|
||||
for _, o := range extraServerOptions {
|
||||
for _, o := range globalServerOptions {
|
||||
o.apply(&opts)
|
||||
}
|
||||
for _, o := range opt {
|
||||
@ -634,7 +663,7 @@ func NewServer(opt ...ServerOption) *Server {
|
||||
|
||||
// printf records an event in s's event log, unless s has been stopped.
|
||||
// REQUIRES s.mu is held.
|
||||
func (s *Server) printf(format string, a ...interface{}) {
|
||||
func (s *Server) printf(format string, a ...any) {
|
||||
if s.events != nil {
|
||||
s.events.Printf(format, a...)
|
||||
}
|
||||
@ -642,7 +671,7 @@ func (s *Server) printf(format string, a ...interface{}) {
|
||||
|
||||
// errorf records an error in s's event log, unless s has been stopped.
|
||||
// REQUIRES s.mu is held.
|
||||
func (s *Server) errorf(format string, a ...interface{}) {
|
||||
func (s *Server) errorf(format string, a ...any) {
|
||||
if s.events != nil {
|
||||
s.events.Errorf(format, a...)
|
||||
}
|
||||
@ -657,14 +686,14 @@ type ServiceRegistrar interface {
|
||||
// once the server has started serving.
|
||||
// desc describes the service and its methods and handlers. impl is the
|
||||
// service implementation which is passed to the method handlers.
|
||||
RegisterService(desc *ServiceDesc, impl interface{})
|
||||
RegisterService(desc *ServiceDesc, impl any)
|
||||
}
|
||||
|
||||
// RegisterService registers a service and its implementation to the gRPC
|
||||
// server. It is called from the IDL generated code. This must be called before
|
||||
// invoking Serve. If ss is non-nil (for legacy code), its type is checked to
|
||||
// ensure it implements sd.HandlerType.
|
||||
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
|
||||
func (s *Server) RegisterService(sd *ServiceDesc, ss any) {
|
||||
if ss != nil {
|
||||
ht := reflect.TypeOf(sd.HandlerType).Elem()
|
||||
st := reflect.TypeOf(ss)
|
||||
@ -675,7 +704,7 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
|
||||
s.register(sd, ss)
|
||||
}
|
||||
|
||||
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
|
||||
func (s *Server) register(sd *ServiceDesc, ss any) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.printf("RegisterService(%q)", sd.ServiceName)
|
||||
@ -716,7 +745,7 @@ type MethodInfo struct {
|
||||
type ServiceInfo struct {
|
||||
Methods []MethodInfo
|
||||
// Metadata is the metadata specified in ServiceDesc when registering service.
|
||||
Metadata interface{}
|
||||
Metadata any
|
||||
}
|
||||
|
||||
// GetServiceInfo returns a map from service names to ServiceInfo.
|
||||
@ -897,7 +926,7 @@ func (s *Server) drainServerTransports(addr string) {
|
||||
s.mu.Lock()
|
||||
conns := s.conns[addr]
|
||||
for st := range conns {
|
||||
st.Drain()
|
||||
st.Drain("")
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
@ -917,6 +946,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
|
||||
InitialConnWindowSize: s.opts.initialConnWindowSize,
|
||||
WriteBufferSize: s.opts.writeBufferSize,
|
||||
ReadBufferSize: s.opts.readBufferSize,
|
||||
SharedWriteBuffer: s.opts.sharedWriteBuffer,
|
||||
ChannelzParentID: s.channelzID,
|
||||
MaxHeaderListSize: s.opts.maxHeaderListSize,
|
||||
HeaderTableSize: s.opts.headerTableSize,
|
||||
@ -945,26 +975,26 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
|
||||
defer st.Close(errors.New("finished serving streams for the server transport"))
|
||||
var wg sync.WaitGroup
|
||||
|
||||
var roundRobinCounter uint32
|
||||
streamQuota := newHandlerQuota(s.opts.maxConcurrentStreams)
|
||||
st.HandleStreams(func(stream *transport.Stream) {
|
||||
wg.Add(1)
|
||||
|
||||
streamQuota.acquire()
|
||||
f := func() {
|
||||
defer streamQuota.release()
|
||||
defer wg.Done()
|
||||
s.handleStream(st, stream, s.traceInfo(st, stream))
|
||||
}
|
||||
|
||||
if s.opts.numServerWorkers > 0 {
|
||||
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
|
||||
select {
|
||||
case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
|
||||
case s.serverWorkerChannel <- f:
|
||||
return
|
||||
default:
|
||||
// If all stream workers are busy, fallback to the default code path.
|
||||
go func() {
|
||||
s.handleStream(st, stream, s.traceInfo(st, stream))
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
} else {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
s.handleStream(st, stream, s.traceInfo(st, stream))
|
||||
}()
|
||||
}
|
||||
go f()
|
||||
}, func(ctx context.Context, method string) context.Context {
|
||||
if !EnableTracing {
|
||||
return ctx
|
||||
@ -1053,7 +1083,7 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
|
||||
if s.drain {
|
||||
// Transport added after we drained our existing conns: drain it
|
||||
// immediately.
|
||||
st.Drain()
|
||||
st.Drain("")
|
||||
}
|
||||
|
||||
if s.conns[addr] == nil {
|
||||
@ -1103,7 +1133,7 @@ func (s *Server) incrCallsFailed() {
|
||||
atomic.AddInt64(&s.czData.callsFailed, 1)
|
||||
}
|
||||
|
||||
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
|
||||
func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg any, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
|
||||
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
|
||||
if err != nil {
|
||||
channelz.Error(logger, s.channelzID, "grpc: server failed to encode response: ", err)
|
||||
@ -1150,7 +1180,7 @@ func chainUnaryServerInterceptors(s *Server) {
|
||||
}
|
||||
|
||||
func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
|
||||
return func(ctx context.Context, req any, info *UnaryServerInfo, handler UnaryHandler) (any, error) {
|
||||
return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
|
||||
}
|
||||
}
|
||||
@ -1159,7 +1189,7 @@ func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info
|
||||
if curr == len(interceptors)-1 {
|
||||
return finalHandler
|
||||
}
|
||||
return func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return func(ctx context.Context, req any) (any, error) {
|
||||
return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
|
||||
}
|
||||
}
|
||||
@ -1196,7 +1226,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
defer func() {
|
||||
if trInfo != nil {
|
||||
if err != nil && err != io.EOF {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
trInfo.tr.Finish()
|
||||
@ -1252,7 +1282,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
logEntry.PeerAddr = peer.Addr
|
||||
}
|
||||
for _, binlog := range binlogs {
|
||||
binlog.Log(logEntry)
|
||||
binlog.Log(ctx, logEntry)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1263,6 +1293,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
var comp, decomp encoding.Compressor
|
||||
var cp Compressor
|
||||
var dc Decompressor
|
||||
var sendCompressorName string
|
||||
|
||||
// If dc is set and matches the stream's compression, use it. Otherwise, try
|
||||
// to find a matching registered compressor for decomp.
|
||||
@ -1283,12 +1314,18 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
|
||||
if s.opts.cp != nil {
|
||||
cp = s.opts.cp
|
||||
stream.SetSendCompress(cp.Type())
|
||||
sendCompressorName = cp.Type()
|
||||
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
|
||||
// Legacy compressor not specified; attempt to respond with same encoding.
|
||||
comp = encoding.GetCompressor(rc)
|
||||
if comp != nil {
|
||||
stream.SetSendCompress(rc)
|
||||
sendCompressorName = comp.Name()
|
||||
}
|
||||
}
|
||||
|
||||
if sendCompressorName != "" {
|
||||
if err := stream.SetSendCompress(sendCompressorName); err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1296,7 +1333,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if len(shs) != 0 || len(binlogs) != 0 {
|
||||
payInfo = &payloadInfo{}
|
||||
}
|
||||
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
|
||||
d, err := recvAndDecompress(&parser{r: stream, recvBufferPool: s.opts.recvBufferPool}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
|
||||
if err != nil {
|
||||
if e := t.WriteStatus(stream, status.Convert(err)); e != nil {
|
||||
channelz.Warningf(logger, s.channelzID, "grpc: Server.processUnaryRPC failed to write status: %v", e)
|
||||
@ -1306,17 +1343,18 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
if channelz.IsOn() {
|
||||
t.IncrMsgRecv()
|
||||
}
|
||||
df := func(v interface{}) error {
|
||||
df := func(v any) error {
|
||||
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
|
||||
}
|
||||
for _, sh := range shs {
|
||||
sh.HandleRPC(stream.Context(), &stats.InPayload{
|
||||
RecvTime: time.Now(),
|
||||
Payload: v,
|
||||
WireLength: payInfo.wireLength + headerLen,
|
||||
Data: d,
|
||||
Length: len(d),
|
||||
RecvTime: time.Now(),
|
||||
Payload: v,
|
||||
Length: len(d),
|
||||
WireLength: payInfo.compressedLength + headerLen,
|
||||
CompressedLength: payInfo.compressedLength,
|
||||
Data: d,
|
||||
})
|
||||
}
|
||||
if len(binlogs) != 0 {
|
||||
@ -1324,7 +1362,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
Message: d,
|
||||
}
|
||||
for _, binlog := range binlogs {
|
||||
binlog.Log(cm)
|
||||
binlog.Log(stream.Context(), cm)
|
||||
}
|
||||
}
|
||||
if trInfo != nil {
|
||||
@ -1357,7 +1395,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
Header: h,
|
||||
}
|
||||
for _, binlog := range binlogs {
|
||||
binlog.Log(sh)
|
||||
binlog.Log(stream.Context(), sh)
|
||||
}
|
||||
}
|
||||
st := &binarylog.ServerTrailer{
|
||||
@ -1365,7 +1403,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
Err: appErr,
|
||||
}
|
||||
for _, binlog := range binlogs {
|
||||
binlog.Log(st)
|
||||
binlog.Log(stream.Context(), st)
|
||||
}
|
||||
}
|
||||
return appErr
|
||||
@ -1375,6 +1413,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
}
|
||||
opts := &transport.Options{Last: true}
|
||||
|
||||
// Server handler could have set new compressor by calling SetSendCompressor.
|
||||
// In case it is set, we need to use it for compressing outbound message.
|
||||
if stream.SendCompress() != sendCompressorName {
|
||||
comp = encoding.GetCompressor(stream.SendCompress())
|
||||
}
|
||||
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
|
||||
if err == io.EOF {
|
||||
// The entire stream is done (for unary RPC only).
|
||||
@ -1402,8 +1445,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
Err: appErr,
|
||||
}
|
||||
for _, binlog := range binlogs {
|
||||
binlog.Log(sh)
|
||||
binlog.Log(st)
|
||||
binlog.Log(stream.Context(), sh)
|
||||
binlog.Log(stream.Context(), st)
|
||||
}
|
||||
}
|
||||
return err
|
||||
@ -1417,8 +1460,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
Message: reply,
|
||||
}
|
||||
for _, binlog := range binlogs {
|
||||
binlog.Log(sh)
|
||||
binlog.Log(sm)
|
||||
binlog.Log(stream.Context(), sh)
|
||||
binlog.Log(stream.Context(), sm)
|
||||
}
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
@ -1430,17 +1473,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
// TODO: Should we be logging if writing status failed here, like above?
|
||||
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
|
||||
// error or allow the stats handler to see it?
|
||||
err = t.WriteStatus(stream, statusOK)
|
||||
if len(binlogs) != 0 {
|
||||
st := &binarylog.ServerTrailer{
|
||||
Trailer: stream.Trailer(),
|
||||
Err: appErr,
|
||||
}
|
||||
for _, binlog := range binlogs {
|
||||
binlog.Log(st)
|
||||
binlog.Log(stream.Context(), st)
|
||||
}
|
||||
}
|
||||
return err
|
||||
return t.WriteStatus(stream, statusOK)
|
||||
}
|
||||
|
||||
// chainStreamServerInterceptors chains all stream server interceptors into one.
|
||||
@ -1465,7 +1507,7 @@ func chainStreamServerInterceptors(s *Server) {
|
||||
}
|
||||
|
||||
func chainStreamInterceptors(interceptors []StreamServerInterceptor) StreamServerInterceptor {
|
||||
return func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
|
||||
return func(srv any, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
|
||||
return interceptors[0](srv, ss, info, getChainStreamHandler(interceptors, 0, info, handler))
|
||||
}
|
||||
}
|
||||
@ -1474,7 +1516,7 @@ func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, inf
|
||||
if curr == len(interceptors)-1 {
|
||||
return finalHandler
|
||||
}
|
||||
return func(srv interface{}, stream ServerStream) error {
|
||||
return func(srv any, stream ServerStream) error {
|
||||
return interceptors[curr+1](srv, stream, info, getChainStreamHandler(interceptors, curr+1, info, finalHandler))
|
||||
}
|
||||
}
|
||||
@ -1501,7 +1543,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
||||
ctx: ctx,
|
||||
t: t,
|
||||
s: stream,
|
||||
p: &parser{r: stream},
|
||||
p: &parser{r: stream, recvBufferPool: s.opts.recvBufferPool},
|
||||
codec: s.getCodec(stream.ContentSubtype()),
|
||||
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
|
||||
maxSendMessageSize: s.opts.maxSendMessageSize,
|
||||
@ -1515,7 +1557,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
||||
if trInfo != nil {
|
||||
ss.mu.Lock()
|
||||
if err != nil && err != io.EOF {
|
||||
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
|
||||
ss.trInfo.tr.SetError()
|
||||
}
|
||||
ss.trInfo.tr.Finish()
|
||||
@ -1574,7 +1616,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
||||
logEntry.PeerAddr = peer.Addr
|
||||
}
|
||||
for _, binlog := range ss.binlogs {
|
||||
binlog.Log(logEntry)
|
||||
binlog.Log(stream.Context(), logEntry)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1597,12 +1639,18 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
||||
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
|
||||
if s.opts.cp != nil {
|
||||
ss.cp = s.opts.cp
|
||||
stream.SetSendCompress(s.opts.cp.Type())
|
||||
ss.sendCompressorName = s.opts.cp.Type()
|
||||
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
|
||||
// Legacy compressor not specified; attempt to respond with same encoding.
|
||||
ss.comp = encoding.GetCompressor(rc)
|
||||
if ss.comp != nil {
|
||||
stream.SetSendCompress(rc)
|
||||
ss.sendCompressorName = rc
|
||||
}
|
||||
}
|
||||
|
||||
if ss.sendCompressorName != "" {
|
||||
if err := stream.SetSendCompress(ss.sendCompressorName); err != nil {
|
||||
return status.Errorf(codes.Internal, "grpc: failed to set send compressor: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1612,7 +1660,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
||||
trInfo.tr.LazyLog(&trInfo.firstLine, false)
|
||||
}
|
||||
var appErr error
|
||||
var server interface{}
|
||||
var server any
|
||||
if info != nil {
|
||||
server = info.serviceImpl
|
||||
}
|
||||
@ -1640,16 +1688,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
||||
ss.trInfo.tr.SetError()
|
||||
ss.mu.Unlock()
|
||||
}
|
||||
t.WriteStatus(ss.s, appStatus)
|
||||
if len(ss.binlogs) != 0 {
|
||||
st := &binarylog.ServerTrailer{
|
||||
Trailer: ss.s.Trailer(),
|
||||
Err: appErr,
|
||||
}
|
||||
for _, binlog := range ss.binlogs {
|
||||
binlog.Log(st)
|
||||
binlog.Log(stream.Context(), st)
|
||||
}
|
||||
}
|
||||
t.WriteStatus(ss.s, appStatus)
|
||||
// TODO: Should we log an error from WriteStatus here and below?
|
||||
return appErr
|
||||
}
|
||||
@ -1658,17 +1706,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
||||
ss.trInfo.tr.LazyLog(stringer("OK"), false)
|
||||
ss.mu.Unlock()
|
||||
}
|
||||
err = t.WriteStatus(ss.s, statusOK)
|
||||
if len(ss.binlogs) != 0 {
|
||||
st := &binarylog.ServerTrailer{
|
||||
Trailer: ss.s.Trailer(),
|
||||
Err: appErr,
|
||||
}
|
||||
for _, binlog := range ss.binlogs {
|
||||
binlog.Log(st)
|
||||
binlog.Log(stream.Context(), st)
|
||||
}
|
||||
}
|
||||
return err
|
||||
return t.WriteStatus(ss.s, statusOK)
|
||||
}
|
||||
|
||||
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
|
||||
@ -1679,13 +1726,13 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
||||
pos := strings.LastIndex(sm, "/")
|
||||
if pos == -1 {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
|
||||
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []any{sm}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
|
||||
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
|
||||
@ -1726,7 +1773,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
||||
}
|
||||
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
|
||||
if trInfo != nil {
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
trInfo.tr.LazyLog(&fmtStringer{"%v", []any{err}}, true)
|
||||
trInfo.tr.SetError()
|
||||
}
|
||||
channelz.Warningf(logger, s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
|
||||
@ -1846,7 +1893,7 @@ func (s *Server) GracefulStop() {
|
||||
if !s.drain {
|
||||
for _, conns := range s.conns {
|
||||
for st := range conns {
|
||||
st.Drain()
|
||||
st.Drain("graceful_stop")
|
||||
}
|
||||
}
|
||||
s.drain = true
|
||||
@ -1935,6 +1982,60 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetSendCompressor sets a compressor for outbound messages from the server.
|
||||
// It must not be called after any event that causes headers to be sent
|
||||
// (see ServerStream.SetHeader for the complete list). Provided compressor is
|
||||
// used when below conditions are met:
|
||||
//
|
||||
// - compressor is registered via encoding.RegisterCompressor
|
||||
// - compressor name must exist in the client advertised compressor names
|
||||
// sent in grpc-accept-encoding header. Use ClientSupportedCompressors to
|
||||
// get client supported compressor names.
|
||||
//
|
||||
// The context provided must be the context passed to the server's handler.
|
||||
// It must be noted that compressor name encoding.Identity disables the
|
||||
// outbound compression.
|
||||
// By default, server messages will be sent using the same compressor with
|
||||
// which request messages were sent.
|
||||
//
|
||||
// It is not safe to call SetSendCompressor concurrently with SendHeader and
|
||||
// SendMsg.
|
||||
//
|
||||
// # Experimental
|
||||
//
|
||||
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
|
||||
// later release.
|
||||
func SetSendCompressor(ctx context.Context, name string) error {
|
||||
stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
|
||||
if !ok || stream == nil {
|
||||
return fmt.Errorf("failed to fetch the stream from the given context")
|
||||
}
|
||||
|
||||
if err := validateSendCompressor(name, stream.ClientAdvertisedCompressors()); err != nil {
|
||||
return fmt.Errorf("unable to set send compressor: %w", err)
|
||||
}
|
||||
|
||||
return stream.SetSendCompress(name)
|
||||
}
|
||||
|
||||
// ClientSupportedCompressors returns compressor names advertised by the client
|
||||
// via grpc-accept-encoding header.
|
||||
//
|
||||
// The context provided must be the context passed to the server's handler.
|
||||
//
|
||||
// # Experimental
|
||||
//
|
||||
// Notice: This function is EXPERIMENTAL and may be changed or removed in a
|
||||
// later release.
|
||||
func ClientSupportedCompressors(ctx context.Context) ([]string, error) {
|
||||
stream, ok := ServerTransportStreamFromContext(ctx).(*transport.Stream)
|
||||
if !ok || stream == nil {
|
||||
return nil, fmt.Errorf("failed to fetch the stream from the given context %v", ctx)
|
||||
}
|
||||
|
||||
return strings.Split(stream.ClientAdvertisedCompressors(), ","), nil
|
||||
}
|
||||
|
||||
// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
|
||||
// When called more than once, all the provided metadata will be merged.
|
||||
//
|
||||
@ -1969,3 +2070,53 @@ type channelzServer struct {
|
||||
func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
|
||||
return c.s.channelzMetric()
|
||||
}
|
||||
|
||||
// validateSendCompressor returns an error when given compressor name cannot be
|
||||
// handled by the server or the client based on the advertised compressors.
|
||||
func validateSendCompressor(name, clientCompressors string) error {
|
||||
if name == encoding.Identity {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !grpcutil.IsCompressorNameRegistered(name) {
|
||||
return fmt.Errorf("compressor not registered %q", name)
|
||||
}
|
||||
|
||||
for _, c := range strings.Split(clientCompressors, ",") {
|
||||
if c == name {
|
||||
return nil // found match
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("client does not support compressor %q", name)
|
||||
}
|
||||
|
||||
// atomicSemaphore implements a blocking, counting semaphore. acquire should be
|
||||
// called synchronously; release may be called asynchronously.
|
||||
type atomicSemaphore struct {
|
||||
n atomic.Int64
|
||||
wait chan struct{}
|
||||
}
|
||||
|
||||
func (q *atomicSemaphore) acquire() {
|
||||
if q.n.Add(-1) < 0 {
|
||||
// We ran out of quota. Block until a release happens.
|
||||
<-q.wait
|
||||
}
|
||||
}
|
||||
|
||||
func (q *atomicSemaphore) release() {
|
||||
// N.B. the "<= 0" check below should allow for this to work with multiple
|
||||
// concurrent calls to acquire, but also note that with synchronous calls to
|
||||
// acquire, as our system does, n will never be less than -1. There are
|
||||
// fairness issues (queuing) to consider if this was to be generalized.
|
||||
if q.n.Add(1) <= 0 {
|
||||
// An acquire was waiting on us. Unblock it.
|
||||
q.wait <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func newHandlerQuota(n uint32) *atomicSemaphore {
|
||||
a := &atomicSemaphore{wait: make(chan struct{}, 1)}
|
||||
a.n.Store(int64(n))
|
||||
return a
|
||||
}
|
||||
|
Reference in New Issue
Block a user