vendor: update buildkit to v0.16.0-rc2

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi
2024-09-09 16:29:01 -07:00
parent 9cfa25ab40
commit 7213b2a814
37 changed files with 583 additions and 167 deletions

View File

@ -535,8 +535,8 @@ const minBatchSize = 1000
// size is too low to give stream goroutines a chance to fill it up.
//
// Upon exiting, if the error causing the exit is not an I/O error, run()
// flushes and closes the underlying connection. Otherwise, the connection is
// left open to allow the I/O error to be encountered by the reader instead.
// flushes the underlying connection. The connection is always left open to
// allow different closing behavior on the client and server.
func (l *loopyWriter) run() (err error) {
defer func() {
if l.logger.V(logLevel) {
@ -544,7 +544,6 @@ func (l *loopyWriter) run() (err error) {
}
if !isIOError(err) {
l.framer.writer.Flush()
l.conn.Close()
}
l.cbuf.finish()
}()

View File

@ -35,7 +35,6 @@ import (
"sync"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@ -45,6 +44,7 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// NewServerHandlerTransport returns a ServerTransport handling gRPC from

View File

@ -59,6 +59,8 @@ import (
// atomically.
var clientConnectionCounter uint64
var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
@ -449,7 +451,13 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.run()
if err := t.loopy.run(); !isIOError(err) {
// Immediately close the connection, as the loopy writer returns
// when there are no more active streams and we were draining (the
// server sent a GOAWAY). For I/O errors, the reader will hit it
// after draining any remaining incoming data.
t.conn.Close()
}
close(t.writerDone)
}()
return t, nil
@ -568,7 +576,7 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
}
if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
var k string
for k, vv := range md {
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
@ -1323,10 +1331,8 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
for streamID, stream := range t.activeStreams {
if streamID > id && streamID <= upperLimit {
// The stream was unprocessed by the server.
if streamID > id && streamID <= upperLimit {
atomic.StoreUint32(&stream.unprocessed, 1)
streamsToClose = append(streamsToClose, stream)
}
atomic.StoreUint32(&stream.unprocessed, 1)
streamsToClose = append(streamsToClose, stream)
}
}
t.mu.Unlock()

View File

@ -32,13 +32,13 @@ import (
"sync/atomic"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@ -322,8 +322,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
t.loopy.run()
err := t.loopy.run()
close(t.loopyWriterDone)
if !isIOError(err) {
// Close the connection if a non-I/O error occurs (for I/O errors
// the reader will also encounter the error and close). Wait 1
// second before closing the connection, or when the reader is done
// (i.e. the client already closed the connection or a connection
// error occurred). This avoids the potential problem where there
// is unread data on the receive side of the connection, which, if
// closed, would lead to a TCP RST instead of FIN, and the client
// encountering errors. For more info:
// https://github.com/grpc/grpc-go/issues/5358
select {
case <-t.readerDone:
case <-time.After(time.Second):
}
t.conn.Close()
}
}()
go t.keepalive()
return t, nil
@ -609,8 +625,8 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
defer func() {
<-t.loopyWriterDone
close(t.readerDone)
<-t.loopyWriterDone
}()
for {
t.controlBuf.throttle()
@ -636,10 +652,6 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
}
continue
}
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close(err)
return
}
t.Close(err)
return
}
@ -960,7 +972,12 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
}
}
if err := t.writeHeaderLocked(s); err != nil {
return status.Convert(err).Err()
switch e := err.(type) {
case ConnectionError:
return status.Error(codes.Unavailable, e.Desc)
default:
return status.Convert(err).Err()
}
}
return nil
}
@ -1324,6 +1341,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
return false, err
}
t.framer.writer.Flush()
if retErr != nil {
return false, retErr
}
@ -1344,7 +1362,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, err
}
go func() {
timer := time.NewTimer(time.Minute)
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
select {
case <-t.drainEvent.Done():