vendor: github.com/moby/buildkit 25bec7145b39 (v0.14.0-dev)

Signed-off-by: CrazyMax <1951866+crazy-max@users.noreply.github.com>
This commit is contained in:
CrazyMax
2024-03-28 17:43:43 +01:00
parent 8abef59087
commit de5efcb03b
31 changed files with 402 additions and 232 deletions

View File

@ -31,9 +31,6 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// maxResets is the no.of times the Copy() method can tolerate a reset of the body
const maxResets = 5
var ErrReset = errors.New("writer has been reset")
var bufPool = sync.Pool{
@ -160,7 +157,7 @@ func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected dig
}
}
for i := 0; i < maxResets; i++ {
for i := 0; ; i++ {
if i >= 1 {
log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset")
}
@ -201,9 +198,6 @@ func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected dig
}
return nil
}
log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets)
return fmt.Errorf("failed to copy after %d retries", maxResets)
}
// CopyReaderAt copies to a writer from a given reader at for the given

View File

@ -284,7 +284,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
req.body = func() (io.ReadCloser, error) {
pr, pw := io.Pipe()
pushw.setPipe(pw)
return io.NopCloser(pr), nil
return pr, nil
}
req.size = desc.Size
@ -292,7 +292,6 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
resp, err := req.doWithRetries(ctx, nil)
if err != nil {
pushw.setError(err)
pushw.Close()
return
}
@ -302,7 +301,7 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str
err := remoteserrors.NewUnexpectedStatusErr(resp)
log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response")
pushw.setError(err)
pushw.Close()
return
}
pushw.setResponse(resp)
}()
@ -335,10 +334,12 @@ type pushWriter struct {
pipe *io.PipeWriter
pipeC chan *io.PipeWriter
respC chan *http.Response
done chan struct{}
closeOnce sync.Once
errC chan error
pipeC chan *io.PipeWriter
respC chan *http.Response
errC chan error
isManifest bool
@ -356,19 +357,51 @@ func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker S
pipeC: make(chan *io.PipeWriter, 1),
respC: make(chan *http.Response, 1),
errC: make(chan error, 1),
done: make(chan struct{}),
isManifest: isManifest,
}
}
func (pw *pushWriter) setPipe(p *io.PipeWriter) {
pw.pipeC <- p
select {
case <-pw.done:
case pw.pipeC <- p:
}
}
func (pw *pushWriter) setError(err error) {
pw.errC <- err
select {
case <-pw.done:
case pw.errC <- err:
}
}
func (pw *pushWriter) setResponse(resp *http.Response) {
pw.respC <- resp
select {
case <-pw.done:
case pw.respC <- resp:
}
}
func (pw *pushWriter) replacePipe(p *io.PipeWriter) error {
if pw.pipe == nil {
pw.pipe = p
return nil
}
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p
// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
}
func (pw *pushWriter) Write(p []byte) (n int, err error) {
@ -378,26 +411,18 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
}
if pw.pipe == nil {
p, ok := <-pw.pipeC
if !ok {
select {
case <-pw.done:
return 0, io.ErrClosedPipe
case p := <-pw.pipeC:
pw.replacePipe(p)
}
pw.pipe = p
} else {
select {
case p, ok := <-pw.pipeC:
if !ok {
return 0, io.ErrClosedPipe
}
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p
// If content has already been written, the bytes
// cannot be written and the caller must reset
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return 0, content.ErrReset
case <-pw.done:
return 0, io.ErrClosedPipe
case p := <-pw.pipeC:
return 0, pw.replacePipe(p)
default:
}
}
@ -407,9 +432,13 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) {
// if the pipe is closed, we might have the original error on the error
// channel - so we should try and get it
select {
case err2 := <-pw.errC:
err = err2
default:
case <-pw.done:
case err = <-pw.errC:
pw.Close()
case p := <-pw.pipeC:
return 0, pw.replacePipe(p)
case resp := <-pw.respC:
pw.setResponse(resp)
}
}
status.Offset += int64(n)
@ -422,7 +451,7 @@ func (pw *pushWriter) Close() error {
// Ensure pipeC is closed but handle `Close()` being
// called multiple times without panicking
pw.closeOnce.Do(func() {
close(pw.pipeC)
close(pw.done)
})
if pw.pipe != nil {
status, err := pw.tracker.GetStatus(pw.ref)
@ -462,30 +491,18 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di
// TODO: timeout waiting for response
var resp *http.Response
select {
case <-pw.done:
return io.ErrClosedPipe
case err := <-pw.errC:
pw.Close()
return err
case resp = <-pw.respC:
defer resp.Body.Close()
case p, ok := <-pw.pipeC:
case p := <-pw.pipeC:
// check whether the pipe has changed in the commit, because sometimes Write
// can complete successfully, but the pipe may have changed. In that case, the
// content needs to be reset.
if !ok {
return io.ErrClosedPipe
}
pw.pipe.CloseWithError(content.ErrReset)
pw.pipe = p
// If content has already been written, the bytes
// cannot be written again and the caller must reset
status, err := pw.tracker.GetStatus(pw.ref)
if err != nil {
return err
}
status.Offset = 0
status.UpdatedAt = time.Now()
pw.tracker.SetStatus(pw.ref, status)
return content.ErrReset
return pw.replacePipe(p)
}
// 201 is specified return status, some registries return

View File

@ -23,7 +23,7 @@ var (
Package = "github.com/containerd/containerd"
// Version holds the complete version number. Filled in at linking time.
Version = "1.7.13+unknown"
Version = "1.7.14+unknown"
// Revision is filled with the VCS (e.g. git) revision being used to build
// the program at linking time.

View File

@ -71,6 +71,42 @@ func WithUnaryClientInterceptor(i UnaryClientInterceptor) ClientOpts {
}
}
// WithChainUnaryClientInterceptor sets the provided chain of client interceptors
func WithChainUnaryClientInterceptor(interceptors ...UnaryClientInterceptor) ClientOpts {
return func(c *Client) {
if len(interceptors) == 0 {
return
}
if c.interceptor != nil {
interceptors = append([]UnaryClientInterceptor{c.interceptor}, interceptors...)
}
c.interceptor = func(
ctx context.Context,
req *Request,
reply *Response,
info *UnaryClientInfo,
final Invoker,
) error {
return interceptors[0](ctx, req, reply, info,
chainUnaryInterceptors(interceptors[1:], final, info))
}
}
}
func chainUnaryInterceptors(interceptors []UnaryClientInterceptor, final Invoker, info *UnaryClientInfo) Invoker {
if len(interceptors) == 0 {
return final
}
return func(
ctx context.Context,
req *Request,
reply *Response,
) error {
return interceptors[0](ctx, req, reply, info,
chainUnaryInterceptors(interceptors[1:], final, info))
}
}
// NewClient creates a new ttrpc client using the given connection
func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx, cancel := context.WithCancel(context.Background())
@ -85,13 +121,16 @@ func NewClient(conn net.Conn, opts ...ClientOpts) *Client {
ctx: ctx,
userCloseFunc: func() {},
userCloseWaitCh: make(chan struct{}),
interceptor: defaultClientInterceptor,
}
for _, o := range opts {
o(c)
}
if c.interceptor == nil {
c.interceptor = defaultClientInterceptor
}
go c.run()
return c
}
@ -286,7 +325,7 @@ func (c *Client) Close() error {
return nil
}
// UserOnCloseWait is used to blocks untils the user's on-close callback
// UserOnCloseWait is used to block until the user's on-close callback
// finishes.
func (c *Client) UserOnCloseWait(ctx context.Context) error {
select {

View File

@ -16,7 +16,10 @@
package ttrpc
import "errors"
import (
"context"
"errors"
)
type serverConfig struct {
handshaker Handshaker
@ -44,9 +47,40 @@ func WithServerHandshaker(handshaker Handshaker) ServerOpt {
func WithUnaryServerInterceptor(i UnaryServerInterceptor) ServerOpt {
return func(c *serverConfig) error {
if c.interceptor != nil {
return errors.New("only one interceptor allowed per server")
return errors.New("only one unchained interceptor allowed per server")
}
c.interceptor = i
return nil
}
}
// WithChainUnaryServerInterceptor sets the provided chain of server interceptors
func WithChainUnaryServerInterceptor(interceptors ...UnaryServerInterceptor) ServerOpt {
return func(c *serverConfig) error {
if len(interceptors) == 0 {
return nil
}
if c.interceptor != nil {
interceptors = append([]UnaryServerInterceptor{c.interceptor}, interceptors...)
}
c.interceptor = func(
ctx context.Context,
unmarshal Unmarshaler,
info *UnaryServerInfo,
method Method) (interface{}, error) {
return interceptors[0](ctx, unmarshal, info,
chainUnaryServerInterceptors(info, method, interceptors[1:]))
}
return nil
}
}
func chainUnaryServerInterceptors(info *UnaryServerInfo, method Method, interceptors []UnaryServerInterceptor) Method {
if len(interceptors) == 0 {
return method
}
return func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
return interceptors[0](ctx, unmarshal, info,
chainUnaryServerInterceptors(info, method, interceptors[1:]))
}
}

View File

@ -140,7 +140,11 @@ func (s *serviceSet) handle(ctx context.Context, req *Request, respond func(*sta
respond(st, p, stream.StreamingServer, true)
}()
if req.Payload != nil {
// Empty proto messages serialized to 0 payloads,
// so signatures like: rpc Stream(google.protobuf.Empty) returns (stream Data);
// don't get invoked here, which causes hang on client side.
// See https://github.com/containerd/ttrpc/issues/126
if req.Payload != nil || !info.StreamingClient {
unmarshal := func(obj interface{}) error {
return protoUnmarshal(req.Payload, obj)
}