vendor: update buildkit to master@ae9d0f5

Signed-off-by: Justin Chadwell <me@jedevc.com>
This commit is contained in:
Justin Chadwell
2022-11-22 14:39:36 +00:00
parent 6e9b743296
commit 36e663edda
375 changed files with 14834 additions and 13552 deletions

View File

@ -140,13 +140,13 @@ type ClientStream interface {
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
// 1. Call Close on the ClientConn.
// 2. Cancel the context provided.
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
// client-streaming RPC, for instance, might use the helper function
// CloseAndRecv (note that CloseSend does not Recv, therefore is not
// guaranteed to release all resources).
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
// 1. Call Close on the ClientConn.
// 2. Cancel the context provided.
// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
// client-streaming RPC, for instance, might use the helper function
// CloseAndRecv (note that CloseSend does not Recv, therefore is not
// guaranteed to release all resources).
// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
@ -301,12 +301,13 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
if !cc.dopts.disableRetry {
cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
}
cs.binlog = binarylog.GetMethodLogger(method)
cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */)
if err != nil {
cs.finish(err)
return nil, err
if ml := binarylog.GetMethodLogger(method); ml != nil {
cs.binlogs = append(cs.binlogs, ml)
}
if cc.dopts.binaryLogger != nil {
if ml := cc.dopts.binaryLogger.GetMethodLogger(method); ml != nil {
cs.binlogs = append(cs.binlogs, ml)
}
}
// Pick the transport to use and create a new stream on the transport.
@ -328,7 +329,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
return nil, err
}
if cs.binlog != nil {
if len(cs.binlogs) != 0 {
md, _ := metadata.FromOutgoingContext(ctx)
logEntry := &binarylog.ClientHeader{
OnClientSide: true,
@ -342,7 +343,9 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
logEntry.Timeout = 0
}
}
cs.binlog.Log(logEntry)
for _, binlog := range cs.binlogs {
binlog.Log(logEntry)
}
}
if desc != unaryStreamDesc {
@ -374,9 +377,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
method := cs.callHdr.Method
sh := cs.cc.dopts.copts.StatsHandler
var beginTime time.Time
if sh != nil {
shs := cs.cc.dopts.copts.StatsHandlers
for _, sh := range shs {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: cs.callInfo.failFast})
beginTime = time.Now()
begin := &stats.Begin{
@ -414,12 +417,12 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
}
return &csAttempt{
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandler: sh,
trInfo: trInfo,
ctx: ctx,
beginTime: beginTime,
cs: cs,
dc: cs.cc.dopts.dc,
statsHandlers: shs,
trInfo: trInfo,
}, nil
}
@ -486,7 +489,7 @@ type clientStream struct {
retryThrottler *retryThrottler // The throttler active when the RPC began.
binlog binarylog.MethodLogger // Binary logger, can be nil.
binlogs []binarylog.MethodLogger
// serverHeaderBinlogged is a boolean for whether server header has been
// logged. Server header will be logged when the first time one of those
// happens: stream.Header(), stream.Recv().
@ -536,8 +539,8 @@ type csAttempt struct {
// and cleared when the finish method is called.
trInfo *traceInfo
statsHandler stats.Handler
beginTime time.Time
statsHandlers []stats.Handler
beginTime time.Time
// set for newStream errors that may be transparently retried
allowTransparentRetry bool
@ -704,6 +707,18 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
// already be status errors.
return toRPCErr(op(cs.attempt))
}
if len(cs.buffer) == 0 {
// For the first op, which controls creation of the stream and
// assigns cs.attempt, we need to create a new attempt inline
// before executing the first op. On subsequent ops, the attempt
// is created immediately before replaying the ops.
var err error
if cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */); err != nil {
cs.mu.Unlock()
cs.finish(err)
return err
}
}
a := cs.attempt
cs.mu.Unlock()
err := op(a)
@ -738,7 +753,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
cs.finish(err)
return nil, err
}
if cs.binlog != nil && !cs.serverHeaderBinlogged {
if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
// Only log if binary log is on and header has not been logged.
logEntry := &binarylog.ServerHeader{
OnClientSide: true,
@ -748,8 +763,10 @@ func (cs *clientStream) Header() (metadata.MD, error) {
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
cs.binlog.Log(logEntry)
cs.serverHeaderBinlogged = true
for _, binlog := range cs.binlogs {
binlog.Log(logEntry)
}
}
return m, nil
}
@ -823,38 +840,44 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
return a.sendMsg(m, hdr, payload, data)
}
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ClientMessage{
if len(cs.binlogs) != 0 && err == nil {
cm := &binarylog.ClientMessage{
OnClientSide: true,
Message: data,
})
}
for _, binlog := range cs.binlogs {
binlog.Log(cm)
}
}
return err
}
func (cs *clientStream) RecvMsg(m interface{}) error {
if cs.binlog != nil && !cs.serverHeaderBinlogged {
if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged {
// Call Header() to binary log header if it's not already logged.
cs.Header()
}
var recvInfo *payloadInfo
if cs.binlog != nil {
if len(cs.binlogs) != 0 {
recvInfo = &payloadInfo{}
}
err := cs.withRetry(func(a *csAttempt) error {
return a.recvMsg(m, recvInfo)
}, cs.commitAttemptLocked)
if cs.binlog != nil && err == nil {
cs.binlog.Log(&binarylog.ServerMessage{
if len(cs.binlogs) != 0 && err == nil {
sm := &binarylog.ServerMessage{
OnClientSide: true,
Message: recvInfo.uncompressedBytes,
})
}
for _, binlog := range cs.binlogs {
binlog.Log(sm)
}
}
if err != nil || !cs.desc.ServerStreams {
// err != nil or non-server-streaming indicates end of stream.
cs.finish(err)
if cs.binlog != nil {
if len(cs.binlogs) != 0 {
// finish will not log Trailer. Log Trailer here.
logEntry := &binarylog.ServerTrailer{
OnClientSide: true,
@ -867,7 +890,9 @@ func (cs *clientStream) RecvMsg(m interface{}) error {
if peer, ok := peer.FromContext(cs.Context()); ok {
logEntry.PeerAddr = peer.Addr
}
cs.binlog.Log(logEntry)
for _, binlog := range cs.binlogs {
binlog.Log(logEntry)
}
}
}
return err
@ -888,10 +913,13 @@ func (cs *clientStream) CloseSend() error {
return nil
}
cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
if cs.binlog != nil {
cs.binlog.Log(&binarylog.ClientHalfClose{
if len(cs.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{
OnClientSide: true,
})
}
for _, binlog := range cs.binlogs {
binlog.Log(chc)
}
}
// We never returned an error here for reasons.
return nil
@ -924,10 +952,13 @@ func (cs *clientStream) finish(err error) {
//
// Only one of cancel or trailer needs to be logged. In the cases where
// users don't call RecvMsg, users must have already canceled the RPC.
if cs.binlog != nil && status.Code(err) == codes.Canceled {
cs.binlog.Log(&binarylog.Cancel{
if len(cs.binlogs) != 0 && status.Code(err) == codes.Canceled {
c := &binarylog.Cancel{
OnClientSide: true,
})
}
for _, binlog := range cs.binlogs {
binlog.Log(c)
}
}
if err == nil {
cs.retryThrottler.successfulRPC()
@ -960,8 +991,8 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
}
return io.EOF
}
if a.statsHandler != nil {
a.statsHandler.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, outPayload(true, m, data, payld, time.Now()))
}
if channelz.IsOn() {
a.t.IncrMsgSent()
@ -971,7 +1002,7 @@ func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
cs := a.cs
if a.statsHandler != nil && payInfo == nil {
if len(a.statsHandlers) != 0 && payInfo == nil {
payInfo = &payloadInfo{}
}
@ -999,6 +1030,7 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
}
return io.EOF // indicates successful end of stream.
}
return toRPCErr(err)
}
if a.trInfo != nil {
@ -1008,8 +1040,8 @@ func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
}
a.mu.Unlock()
}
if a.statsHandler != nil {
a.statsHandler.HandleRPC(a.ctx, &stats.InPayload{
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, &stats.InPayload{
Client: true,
RecvTime: time.Now(),
Payload: m,
@ -1068,7 +1100,7 @@ func (a *csAttempt) finish(err error) {
ServerLoad: balancerload.Parse(tr),
})
}
if a.statsHandler != nil {
for _, sh := range a.statsHandlers {
end := &stats.End{
Client: true,
BeginTime: a.beginTime,
@ -1076,7 +1108,7 @@ func (a *csAttempt) finish(err error) {
Trailer: tr,
Error: err,
}
a.statsHandler.HandleRPC(a.ctx, end)
sh.HandleRPC(a.ctx, end)
}
if a.trInfo != nil && a.trInfo.tr != nil {
if err == nil {
@ -1445,9 +1477,9 @@ type serverStream struct {
maxSendMessageSize int
trInfo *traceInfo
statsHandler stats.Handler
statsHandler []stats.Handler
binlog binarylog.MethodLogger
binlogs []binarylog.MethodLogger
// serverHeaderBinlogged indicates whether server header has been logged. It
// will happen when one of the following two happens: stream.SendHeader(),
// stream.Send().
@ -1481,12 +1513,15 @@ func (ss *serverStream) SendHeader(md metadata.MD) error {
}
err = ss.t.WriteHeader(ss.s, md)
if ss.binlog != nil && !ss.serverHeaderBinlogged {
if len(ss.binlogs) != 0 && !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
ss.binlog.Log(&binarylog.ServerHeader{
sh := &binarylog.ServerHeader{
Header: h,
})
}
ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs {
binlog.Log(sh)
}
}
return err
}
@ -1543,20 +1578,28 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
}
if ss.binlog != nil {
if len(ss.binlogs) != 0 {
if !ss.serverHeaderBinlogged {
h, _ := ss.s.Header()
ss.binlog.Log(&binarylog.ServerHeader{
sh := &binarylog.ServerHeader{
Header: h,
})
}
ss.serverHeaderBinlogged = true
for _, binlog := range ss.binlogs {
binlog.Log(sh)
}
}
ss.binlog.Log(&binarylog.ServerMessage{
sm := &binarylog.ServerMessage{
Message: data,
})
}
for _, binlog := range ss.binlogs {
binlog.Log(sm)
}
}
if ss.statsHandler != nil {
ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
}
}
return nil
}
@ -1590,13 +1633,16 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
}()
var payInfo *payloadInfo
if ss.statsHandler != nil || ss.binlog != nil {
if len(ss.statsHandler) != 0 || len(ss.binlogs) != 0 {
payInfo = &payloadInfo{}
}
if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
if err == io.EOF {
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ClientHalfClose{})
if len(ss.binlogs) != 0 {
chc := &binarylog.ClientHalfClose{}
for _, binlog := range ss.binlogs {
binlog.Log(chc)
}
}
return err
}
@ -1605,20 +1651,25 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
}
return toRPCErr(err)
}
if ss.statsHandler != nil {
ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
Data: payInfo.uncompressedBytes,
WireLength: payInfo.wireLength + headerLen,
Length: len(payInfo.uncompressedBytes),
})
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), &stats.InPayload{
RecvTime: time.Now(),
Payload: m,
// TODO truncate large payload.
Data: payInfo.uncompressedBytes,
WireLength: payInfo.wireLength + headerLen,
Length: len(payInfo.uncompressedBytes),
})
}
}
if ss.binlog != nil {
ss.binlog.Log(&binarylog.ClientMessage{
if len(ss.binlogs) != 0 {
cm := &binarylog.ClientMessage{
Message: payInfo.uncompressedBytes,
})
}
for _, binlog := range ss.binlogs {
binlog.Log(cm)
}
}
return nil
}