vendor: update buildkit to 8effd45b
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
vendor/google.golang.org/grpc/server.go (generated, vendored) | 102
@@ -42,6 +42,7 @@ import (
     "google.golang.org/grpc/grpclog"
     "google.golang.org/grpc/internal/binarylog"
     "google.golang.org/grpc/internal/channelz"
+    "google.golang.org/grpc/internal/grpcrand"
     "google.golang.org/grpc/internal/grpcsync"
     "google.golang.org/grpc/internal/transport"
     "google.golang.org/grpc/keepalive"
@@ -87,6 +88,12 @@ type service struct {
     mdata interface{}
 }
 
+type serverWorkerData struct {
+    st     transport.ServerTransport
+    wg     *sync.WaitGroup
+    stream *transport.Stream
+}
+
 // Server is a gRPC server to serve RPC requests.
 type Server struct {
     opts serverOptions
@@ -107,6 +114,8 @@ type Server struct {
 
     channelzID int64 // channelz unique identification number
     czData     *channelzData
+
+    serverWorkerChannels []chan *serverWorkerData
 }
 
 type serverOptions struct {
@@ -133,6 +142,7 @@ type serverOptions struct {
     connectionTimeout time.Duration
     maxHeaderListSize *uint32
     headerTableSize   *uint32
+    numServerWorkers  uint32
 }
 
 var defaultServerOptions = serverOptions{
@@ -335,7 +345,7 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption {
 }
 
 // ChainStreamInterceptor returns a ServerOption that specifies the chained interceptor
-// for stream RPCs. The first interceptor will be the outer most,
+// for streaming RPCs. The first interceptor will be the outer most,
 // while the last interceptor will be the inner most wrapper around the real call.
 // All stream interceptors added by this method will be chained.
 func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption {
@@ -410,6 +420,66 @@ func HeaderTableSize(s uint32) ServerOption {
     })
 }
 
+// NumStreamWorkers returns a ServerOption that sets the number of worker
+// goroutines that should be used to process incoming streams. Setting this to
+// zero (default) will disable workers and spawn a new goroutine for each
+// stream.
+//
+// This API is EXPERIMENTAL.
+func NumStreamWorkers(numServerWorkers uint32) ServerOption {
+    // TODO: If/when this API gets stabilized (i.e. stream workers become the
+    // only way streams are processed), change the behavior of the zero value to
+    // a sane default. Preliminary experiments suggest that a value equal to the
+    // number of CPUs available is most performant; requires thorough testing.
+    return newFuncServerOption(func(o *serverOptions) {
+        o.numServerWorkers = numServerWorkers
+    })
+}
+
+// serverWorkerResetThreshold defines how often the stack must be reset. Every
+// N requests, by spawning a new goroutine in its place, a worker can reset its
+// stack so that large stacks don't live in memory forever. 2^16 should allow
+// each goroutine stack to live for at least a few seconds in a typical
+// workload (assuming a QPS of a few thousand requests/sec).
+const serverWorkerResetThreshold = 1 << 16
+
+// serverWorkers blocks on a *transport.Stream channel forever and waits for
+// data to be fed by serveStreams. This allows different requests to be
+// processed by the same goroutine, removing the need for expensive stack
+// re-allocations (see the runtime.morestack problem [1]).
+//
+// [1] https://github.com/golang/go/issues/18138
+func (s *Server) serverWorker(ch chan *serverWorkerData) {
+    // To make sure all server workers don't reset at the same time, choose a
+    // random number of iterations before resetting.
+    threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
+    for completed := 0; completed < threshold; completed++ {
+        data, ok := <-ch
+        if !ok {
+            return
+        }
+        s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
+        data.wg.Done()
+    }
+    go s.serverWorker(ch)
+}
+
+// initServerWorkers creates worker goroutines and channels to process incoming
+// connections to reduce the time spent overall on runtime.morestack.
+func (s *Server) initServerWorkers() {
+    s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers)
+    for i := uint32(0); i < s.opts.numServerWorkers; i++ {
+        s.serverWorkerChannels[i] = make(chan *serverWorkerData)
+        go s.serverWorker(s.serverWorkerChannels[i])
+    }
+}
+
+func (s *Server) stopServerWorkers() {
+    for i := uint32(0); i < s.opts.numServerWorkers; i++ {
+        close(s.serverWorkerChannels[i])
+    }
+}
+
 // NewServer creates a gRPC server which has no service registered and has not
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
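
Note (not part of the diff): the comments above describe a worker that replaces itself with a fresh goroutine every ~2^16 requests, so a stack grown while handling one large request is not held in memory forever (the runtime.morestack issue linked above). Below is a minimal standalone sketch of that self-resetting worker pattern; the names (worker, resetThreshold, the func() job queue) are illustrative and not gRPC API.

package main

import (
    "fmt"
    "sync"
)

// resetThreshold plays the role of serverWorkerResetThreshold: after this many
// jobs a worker respawns itself so its (possibly grown) stack can be reclaimed.
// Kept tiny here only so the reset actually happens in a short demo.
const resetThreshold = 1 << 4

// worker drains jobs from ch; after resetThreshold jobs it starts a replacement
// goroutine on the same channel and returns, releasing its own stack.
func worker(ch chan func()) {
    for completed := 0; completed < resetThreshold; completed++ {
        job, ok := <-ch
        if !ok {
            return // channel closed: the pool is shutting down
        }
        job()
    }
    go worker(ch) // replace ourselves with a fresh goroutine (fresh stack)
}

func main() {
    ch := make(chan func())
    go worker(ch)

    var wg sync.WaitGroup
    for i := 0; i < 100; i++ {
        i := i
        wg.Add(1)
        ch <- func() { defer wg.Done(); fmt.Println("job", i) }
    }
    wg.Wait()
    close(ch) // stops the current worker, mirroring stopServerWorkers above
}

Closing the channel is the whole shutdown story: the receive returns ok == false and the worker exits, which is exactly what stopServerWorkers relies on in the hunk above.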
@@ -434,6 +504,10 @@ func NewServer(opt ...ServerOption) *Server {
         s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
     }
 
+    if s.opts.numServerWorkers > 0 {
+        s.initServerWorkers()
+    }
+
     if channelz.IsOn() {
         s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
     }
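
Note (not part of the diff): with the hunk above, NewServer only spins up the pool when numServerWorkers is non-zero, so callers opt in through the NumStreamWorkers option. A hedged usage sketch follows; the listener address is made up, service registration is omitted, and sizing by runtime.NumCPU follows the TODO comment in the NumStreamWorkers hunk rather than any documented recommendation.

package main

import (
    "log"
    "net"
    "runtime"

    "google.golang.org/grpc"
)

func main() {
    lis, err := net.Listen("tcp", ":50051") // illustrative address
    if err != nil {
        log.Fatalf("listen: %v", err)
    }

    // NumStreamWorkers is the experimental option added in this diff; the
    // TODO comment there suggests a value around the number of CPUs.
    srv := grpc.NewServer(grpc.NumStreamWorkers(uint32(runtime.NumCPU())))

    // Register generated services here before serving.

    if err := srv.Serve(lis); err != nil {
        log.Fatalf("serve: %v", err)
    }
}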
@@ -739,12 +813,27 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
 func (s *Server) serveStreams(st transport.ServerTransport) {
     defer st.Close()
     var wg sync.WaitGroup
+
+    var roundRobinCounter uint32
     st.HandleStreams(func(stream *transport.Stream) {
         wg.Add(1)
-        go func() {
-            defer wg.Done()
-            s.handleStream(st, stream, s.traceInfo(st, stream))
-        }()
+        if s.opts.numServerWorkers > 0 {
+            data := &serverWorkerData{st: st, wg: &wg, stream: stream}
+            select {
+            case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data:
+            default:
+                // If all stream workers are busy, fallback to the default code path.
+                go func() {
+                    s.handleStream(st, stream, s.traceInfo(st, stream))
+                    wg.Done()
+                }()
+            }
+        } else {
+            go func() {
+                defer wg.Done()
+                s.handleStream(st, stream, s.traceInfo(st, stream))
+            }()
+        }
     }, func(ctx context.Context, method string) context.Context {
         if !EnableTracing {
             return ctx
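
Note (not part of the diff): the serveStreams change picks a worker channel round-robin and uses a select with a default branch, so a fully busy pool never blocks HandleStreams; it simply falls back to the old one-goroutine-per-stream path. A stripped-down sketch of that dispatch shape, using illustrative names (job, dispatch, workers) rather than gRPC types:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type job struct{ id int }

// dispatch hands a job to one of the worker channels chosen round-robin; if
// that worker is busy (the unbuffered send would block), it falls back to a
// dedicated goroutine, mirroring the default branch in serveStreams.
func dispatch(workers []chan job, rr *uint32, j job, wg *sync.WaitGroup, handle func(job)) {
    wg.Add(1)
    select {
    case workers[atomic.AddUint32(rr, 1)%uint32(len(workers))] <- j:
        // a pooled worker picked it up; that worker calls wg.Done
    default:
        go func() { defer wg.Done(); handle(j) }() // fallback path
    }
}

func main() {
    handle := func(j job) { fmt.Println("handled", j.id) }

    var wg sync.WaitGroup
    workers := make([]chan job, 4)
    for i := range workers {
        workers[i] = make(chan job)
        go func(ch chan job) {
            for j := range ch {
                handle(j)
                wg.Done()
            }
        }(workers[i])
    }

    var rr uint32
    for i := 0; i < 20; i++ {
        dispatch(workers, &rr, job{id: i}, &wg, handle)
    }
    wg.Wait()
    for _, ch := range workers {
        close(ch)
    }
}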
@@ -1507,6 +1596,9 @@ func (s *Server) Stop() {
     for c := range st {
         c.Close()
     }
+    if s.opts.numServerWorkers > 0 {
+        s.stopServerWorkers()
+    }
 
     s.mu.Lock()
     if s.events != nil {