mirror of
https://gitea.com/Lydanne/buildx.git
synced 2025-07-09 21:17:09 +08:00
build: add experimental support for print flag
Print flag can be used to make additional information requests about the build and print their results. Currently Dockerfile supports: outline, targets, subrequests.describe Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
227
vendor/google.golang.org/grpc/stream.go
generated
vendored
227
vendor/google.golang.org/grpc/stream.go
generated
vendored
@ -36,6 +36,7 @@ import (
|
||||
"google.golang.org/grpc/internal/channelz"
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
imetadata "google.golang.org/grpc/internal/metadata"
|
||||
iresolver "google.golang.org/grpc/internal/resolver"
|
||||
"google.golang.org/grpc/internal/serviceconfig"
|
||||
"google.golang.org/grpc/internal/transport"
|
||||
@ -166,6 +167,11 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
}
|
||||
|
||||
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
|
||||
if md, _, ok := metadata.FromOutgoingContextRaw(ctx); ok {
|
||||
if err := imetadata.Validate(md); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
}
|
||||
if channelz.IsOn() {
|
||||
cc.incrCallsStarted()
|
||||
defer func() {
|
||||
@ -297,14 +303,28 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
|
||||
}
|
||||
cs.binlog = binarylog.GetMethodLogger(method)
|
||||
|
||||
if err := cs.newAttemptLocked(false /* isTransparent */); err != nil {
|
||||
cs.attempt, err = cs.newAttemptLocked(false /* isTransparent */)
|
||||
if err != nil {
|
||||
cs.finish(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
op := func(a *csAttempt) error { return a.newStream() }
|
||||
// Pick the transport to use and create a new stream on the transport.
|
||||
// Assign cs.attempt upon success.
|
||||
op := func(a *csAttempt) error {
|
||||
if err := a.getTransport(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := a.newStream(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Because this operation is always called either here (while creating
|
||||
// the clientStream) or by the retry code while locked when replaying
|
||||
// the operation, it is safe to access cs.attempt directly.
|
||||
cs.attempt = a
|
||||
return nil
|
||||
}
|
||||
if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
|
||||
cs.finish(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -343,9 +363,15 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
|
||||
return cs, nil
|
||||
}
|
||||
|
||||
// newAttemptLocked creates a new attempt with a transport.
|
||||
// If it succeeds, then it replaces clientStream's attempt with this new attempt.
|
||||
func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
|
||||
// newAttemptLocked creates a new csAttempt without a transport or stream.
|
||||
func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error) {
|
||||
if err := cs.ctx.Err(); err != nil {
|
||||
return nil, toRPCErr(err)
|
||||
}
|
||||
if err := cs.cc.ctx.Err(); err != nil {
|
||||
return nil, ErrClientConnClosing
|
||||
}
|
||||
|
||||
ctx := newContextWithRPCInfo(cs.ctx, cs.callInfo.failFast, cs.callInfo.codec, cs.cp, cs.comp)
|
||||
method := cs.callHdr.Method
|
||||
sh := cs.cc.dopts.copts.StatsHandler
|
||||
@ -379,27 +405,6 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
|
||||
ctx = trace.NewContext(ctx, trInfo.tr)
|
||||
}
|
||||
|
||||
newAttempt := &csAttempt{
|
||||
ctx: ctx,
|
||||
beginTime: beginTime,
|
||||
cs: cs,
|
||||
dc: cs.cc.dopts.dc,
|
||||
statsHandler: sh,
|
||||
trInfo: trInfo,
|
||||
}
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
// This attempt is not set in the clientStream, so it's finish won't
|
||||
// be called. Call it here for stats and trace in case they are not
|
||||
// nil.
|
||||
newAttempt.finish(retErr)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return toRPCErr(err)
|
||||
}
|
||||
|
||||
if cs.cc.parsedTarget.Scheme == "xds" {
|
||||
// Add extra metadata (metadata that will be added by transport) to context
|
||||
// so the balancer can see them.
|
||||
@ -407,16 +412,32 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (retErr error) {
|
||||
"content-type", grpcutil.ContentType(cs.callHdr.ContentSubtype),
|
||||
))
|
||||
}
|
||||
t, done, err := cs.cc.getTransport(ctx, cs.callInfo.failFast, cs.callHdr.Method)
|
||||
|
||||
return &csAttempt{
|
||||
ctx: ctx,
|
||||
beginTime: beginTime,
|
||||
cs: cs,
|
||||
dc: cs.cc.dopts.dc,
|
||||
statsHandler: sh,
|
||||
trInfo: trInfo,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (a *csAttempt) getTransport() error {
|
||||
cs := a.cs
|
||||
|
||||
var err error
|
||||
a.t, a.done, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
|
||||
if err != nil {
|
||||
if de, ok := err.(dropError); ok {
|
||||
err = de.error
|
||||
a.drop = true
|
||||
}
|
||||
return err
|
||||
}
|
||||
if trInfo != nil {
|
||||
trInfo.firstLine.SetRemoteAddr(t.RemoteAddr())
|
||||
if a.trInfo != nil {
|
||||
a.trInfo.firstLine.SetRemoteAddr(a.t.RemoteAddr())
|
||||
}
|
||||
newAttempt.t = t
|
||||
newAttempt.done = done
|
||||
cs.attempt = newAttempt
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -425,12 +446,21 @@ func (a *csAttempt) newStream() error {
|
||||
cs.callHdr.PreviousAttempts = cs.numRetries
|
||||
s, err := a.t.NewStream(a.ctx, cs.callHdr)
|
||||
if err != nil {
|
||||
// Return without converting to an RPC error so retry code can
|
||||
// inspect.
|
||||
return err
|
||||
nse, ok := err.(*transport.NewStreamError)
|
||||
if !ok {
|
||||
// Unexpected.
|
||||
return err
|
||||
}
|
||||
|
||||
if nse.AllowTransparentRetry {
|
||||
a.allowTransparentRetry = true
|
||||
}
|
||||
|
||||
// Unwrap and convert error.
|
||||
return toRPCErr(nse.Err)
|
||||
}
|
||||
cs.attempt.s = s
|
||||
cs.attempt.p = &parser{r: s}
|
||||
a.s = s
|
||||
a.p = &parser{r: s}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -456,7 +486,7 @@ type clientStream struct {
|
||||
|
||||
retryThrottler *retryThrottler // The throttler active when the RPC began.
|
||||
|
||||
binlog *binarylog.MethodLogger // Binary logger, can be nil.
|
||||
binlog binarylog.MethodLogger // Binary logger, can be nil.
|
||||
// serverHeaderBinlogged is a boolean for whether server header has been
|
||||
// logged. Server header will be logged when the first time one of those
|
||||
// happens: stream.Header(), stream.Recv().
|
||||
@ -508,6 +538,11 @@ type csAttempt struct {
|
||||
|
||||
statsHandler stats.Handler
|
||||
beginTime time.Time
|
||||
|
||||
// set for newStream errors that may be transparently retried
|
||||
allowTransparentRetry bool
|
||||
// set for pick errors that are returned as a status
|
||||
drop bool
|
||||
}
|
||||
|
||||
func (cs *clientStream) commitAttemptLocked() {
|
||||
@ -527,41 +562,21 @@ func (cs *clientStream) commitAttempt() {
|
||||
// shouldRetry returns nil if the RPC should be retried; otherwise it returns
|
||||
// the error that should be returned by the operation. If the RPC should be
|
||||
// retried, the bool indicates whether it is being retried transparently.
|
||||
func (cs *clientStream) shouldRetry(err error) (bool, error) {
|
||||
if cs.attempt.s == nil {
|
||||
// Error from NewClientStream.
|
||||
nse, ok := err.(*transport.NewStreamError)
|
||||
if !ok {
|
||||
// Unexpected, but assume no I/O was performed and the RPC is not
|
||||
// fatal, so retry indefinitely.
|
||||
return true, nil
|
||||
}
|
||||
func (a *csAttempt) shouldRetry(err error) (bool, error) {
|
||||
cs := a.cs
|
||||
|
||||
// Unwrap and convert error.
|
||||
err = toRPCErr(nse.Err)
|
||||
|
||||
// Never retry DoNotRetry errors, which indicate the RPC should not be
|
||||
// retried due to max header list size violation, etc.
|
||||
if nse.DoNotRetry {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// In the event of a non-IO operation error from NewStream, we never
|
||||
// attempted to write anything to the wire, so we can retry
|
||||
// indefinitely.
|
||||
if !nse.DoNotTransparentRetry {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
if cs.finished || cs.committed {
|
||||
// RPC is finished or committed; cannot retry.
|
||||
if cs.finished || cs.committed || a.drop {
|
||||
// RPC is finished or committed or was dropped by the picker; cannot retry.
|
||||
return false, err
|
||||
}
|
||||
if a.s == nil && a.allowTransparentRetry {
|
||||
return true, nil
|
||||
}
|
||||
// Wait for the trailers.
|
||||
unprocessed := false
|
||||
if cs.attempt.s != nil {
|
||||
<-cs.attempt.s.Done()
|
||||
unprocessed = cs.attempt.s.Unprocessed()
|
||||
if a.s != nil {
|
||||
<-a.s.Done()
|
||||
unprocessed = a.s.Unprocessed()
|
||||
}
|
||||
if cs.firstAttempt && unprocessed {
|
||||
// First attempt, stream unprocessed: transparently retry.
|
||||
@ -573,14 +588,14 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
|
||||
|
||||
pushback := 0
|
||||
hasPushback := false
|
||||
if cs.attempt.s != nil {
|
||||
if !cs.attempt.s.TrailersOnly() {
|
||||
if a.s != nil {
|
||||
if !a.s.TrailersOnly() {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// TODO(retry): Move down if the spec changes to not check server pushback
|
||||
// before considering this a failure for throttling.
|
||||
sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
|
||||
sps := a.s.Trailer()["grpc-retry-pushback-ms"]
|
||||
if len(sps) == 1 {
|
||||
var e error
|
||||
if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
|
||||
@ -597,10 +612,10 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
|
||||
}
|
||||
|
||||
var code codes.Code
|
||||
if cs.attempt.s != nil {
|
||||
code = cs.attempt.s.Status().Code()
|
||||
if a.s != nil {
|
||||
code = a.s.Status().Code()
|
||||
} else {
|
||||
code = status.Convert(err).Code()
|
||||
code = status.Code(err)
|
||||
}
|
||||
|
||||
rp := cs.methodConfig.RetryPolicy
|
||||
@ -645,19 +660,24 @@ func (cs *clientStream) shouldRetry(err error) (bool, error) {
|
||||
}
|
||||
|
||||
// Returns nil if a retry was performed and succeeded; error otherwise.
|
||||
func (cs *clientStream) retryLocked(lastErr error) error {
|
||||
func (cs *clientStream) retryLocked(attempt *csAttempt, lastErr error) error {
|
||||
for {
|
||||
cs.attempt.finish(toRPCErr(lastErr))
|
||||
isTransparent, err := cs.shouldRetry(lastErr)
|
||||
attempt.finish(toRPCErr(lastErr))
|
||||
isTransparent, err := attempt.shouldRetry(lastErr)
|
||||
if err != nil {
|
||||
cs.commitAttemptLocked()
|
||||
return err
|
||||
}
|
||||
cs.firstAttempt = false
|
||||
if err := cs.newAttemptLocked(isTransparent); err != nil {
|
||||
attempt, err = cs.newAttemptLocked(isTransparent)
|
||||
if err != nil {
|
||||
// Only returns error if the clientconn is closed or the context of
|
||||
// the stream is canceled.
|
||||
return err
|
||||
}
|
||||
if lastErr = cs.replayBufferLocked(); lastErr == nil {
|
||||
// Note that the first op in the replay buffer always sets cs.attempt
|
||||
// if it is able to pick a transport and create a stream.
|
||||
if lastErr = cs.replayBufferLocked(attempt); lastErr == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -667,7 +687,10 @@ func (cs *clientStream) Context() context.Context {
|
||||
cs.commitAttempt()
|
||||
// No need to lock before using attempt, since we know it is committed and
|
||||
// cannot change.
|
||||
return cs.attempt.s.Context()
|
||||
if cs.attempt.s != nil {
|
||||
return cs.attempt.s.Context()
|
||||
}
|
||||
return cs.ctx
|
||||
}
|
||||
|
||||
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
|
||||
@ -697,7 +720,7 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
|
||||
cs.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
if err := cs.retryLocked(err); err != nil {
|
||||
if err := cs.retryLocked(a, err); err != nil {
|
||||
cs.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
@ -728,7 +751,7 @@ func (cs *clientStream) Header() (metadata.MD, error) {
|
||||
cs.binlog.Log(logEntry)
|
||||
cs.serverHeaderBinlogged = true
|
||||
}
|
||||
return m, err
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (cs *clientStream) Trailer() metadata.MD {
|
||||
@ -746,10 +769,9 @@ func (cs *clientStream) Trailer() metadata.MD {
|
||||
return cs.attempt.s.Trailer()
|
||||
}
|
||||
|
||||
func (cs *clientStream) replayBufferLocked() error {
|
||||
a := cs.attempt
|
||||
func (cs *clientStream) replayBufferLocked(attempt *csAttempt) error {
|
||||
for _, f := range cs.buffer {
|
||||
if err := f(a); err != nil {
|
||||
if err := f(attempt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -797,22 +819,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
||||
if len(payload) > *cs.callInfo.maxSendMessageSize {
|
||||
return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
|
||||
}
|
||||
msgBytes := data // Store the pointer before setting to nil. For binary logging.
|
||||
op := func(a *csAttempt) error {
|
||||
err := a.sendMsg(m, hdr, payload, data)
|
||||
// nil out the message and uncomp when replaying; they are only needed for
|
||||
// stats which is disabled for subsequent attempts.
|
||||
m, data = nil, nil
|
||||
return err
|
||||
return a.sendMsg(m, hdr, payload, data)
|
||||
}
|
||||
err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
|
||||
if cs.binlog != nil && err == nil {
|
||||
cs.binlog.Log(&binarylog.ClientMessage{
|
||||
OnClientSide: true,
|
||||
Message: msgBytes,
|
||||
Message: data,
|
||||
})
|
||||
}
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
func (cs *clientStream) RecvMsg(m interface{}) error {
|
||||
@ -1364,8 +1381,10 @@ func (as *addrConnStream) finish(err error) {
|
||||
|
||||
// ServerStream defines the server-side behavior of a streaming RPC.
|
||||
//
|
||||
// All errors returned from ServerStream methods are compatible with the
|
||||
// status package.
|
||||
// Errors returned from ServerStream methods are compatible with the status
|
||||
// package. However, the status code will often not match the RPC status as
|
||||
// seen by the client application, and therefore, should not be relied upon for
|
||||
// this purpose.
|
||||
type ServerStream interface {
|
||||
// SetHeader sets the header metadata. It may be called multiple times.
|
||||
// When call multiple times, all the provided metadata will be merged.
|
||||
@ -1428,7 +1447,7 @@ type serverStream struct {
|
||||
|
||||
statsHandler stats.Handler
|
||||
|
||||
binlog *binarylog.MethodLogger
|
||||
binlog binarylog.MethodLogger
|
||||
// serverHeaderBinlogged indicates whether server header has been logged. It
|
||||
// will happen when one of the following two happens: stream.SendHeader(),
|
||||
// stream.Send().
|
||||
@ -1448,11 +1467,20 @@ func (ss *serverStream) SetHeader(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
err := imetadata.Validate(md)
|
||||
if err != nil {
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
return ss.s.SetHeader(md)
|
||||
}
|
||||
|
||||
func (ss *serverStream) SendHeader(md metadata.MD) error {
|
||||
err := ss.t.WriteHeader(ss.s, md)
|
||||
err := imetadata.Validate(md)
|
||||
if err != nil {
|
||||
return status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
err = ss.t.WriteHeader(ss.s, md)
|
||||
if ss.binlog != nil && !ss.serverHeaderBinlogged {
|
||||
h, _ := ss.s.Header()
|
||||
ss.binlog.Log(&binarylog.ServerHeader{
|
||||
@ -1467,6 +1495,9 @@ func (ss *serverStream) SetTrailer(md metadata.MD) {
|
||||
if md.Len() == 0 {
|
||||
return
|
||||
}
|
||||
if err := imetadata.Validate(md); err != nil {
|
||||
logger.Errorf("stream: failed to validate md when setting trailer, err: %v", err)
|
||||
}
|
||||
ss.s.SetTrailer(md)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user