mirror of
https://gitea.com/Lydanne/buildx.git
synced 2025-07-09 21:17:09 +08:00
imagetools inspect: add --format flag
Signed-off-by: CrazyMax <crazy-max@users.noreply.github.com>
This commit is contained in:
156
vendor/github.com/moby/buildkit/util/contentutil/buffer.go
generated
vendored
Normal file
156
vendor/github.com/moby/buildkit/util/contentutil/buffer.go
generated
vendored
Normal file
@ -0,0 +1,156 @@
|
||||
package contentutil
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Buffer is a content provider and ingester that keeps data in memory
|
||||
type Buffer interface {
|
||||
content.Provider
|
||||
content.Ingester
|
||||
}
|
||||
|
||||
// NewBuffer returns a new buffer
|
||||
func NewBuffer() Buffer {
|
||||
return &buffer{
|
||||
buffers: map[digest.Digest][]byte{},
|
||||
refs: map[string]struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
type buffer struct {
|
||||
mu sync.Mutex
|
||||
buffers map[digest.Digest][]byte
|
||||
refs map[string]struct{}
|
||||
}
|
||||
|
||||
func (b *buffer) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
|
||||
var wOpts content.WriterOpts
|
||||
for _, opt := range opts {
|
||||
if err := opt(&wOpts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
b.mu.Lock()
|
||||
if _, ok := b.refs[wOpts.Ref]; ok {
|
||||
return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %s locked", wOpts.Ref)
|
||||
}
|
||||
b.mu.Unlock()
|
||||
return &bufferedWriter{
|
||||
main: b,
|
||||
digester: digest.Canonical.Digester(),
|
||||
buffer: bytes.NewBuffer(nil),
|
||||
expected: wOpts.Desc.Digest,
|
||||
releaseRef: func() {
|
||||
b.mu.Lock()
|
||||
delete(b.refs, wOpts.Ref)
|
||||
b.mu.Unlock()
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *buffer) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
|
||||
r, err := b.getBytesReader(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &readerAt{Reader: r, Closer: ioutil.NopCloser(r), size: int64(r.Len())}, nil
|
||||
}
|
||||
|
||||
func (b *buffer) getBytesReader(ctx context.Context, dgst digest.Digest) (*bytes.Reader, error) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
|
||||
if dt, ok := b.buffers[dgst]; ok {
|
||||
return bytes.NewReader(dt), nil
|
||||
}
|
||||
|
||||
return nil, errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
|
||||
}
|
||||
|
||||
func (b *buffer) addValue(k digest.Digest, dt []byte) {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
b.buffers[k] = dt
|
||||
}
|
||||
|
||||
type bufferedWriter struct {
|
||||
main *buffer
|
||||
ref string
|
||||
offset int64
|
||||
total int64
|
||||
startedAt time.Time
|
||||
updatedAt time.Time
|
||||
buffer *bytes.Buffer
|
||||
expected digest.Digest
|
||||
digester digest.Digester
|
||||
releaseRef func()
|
||||
}
|
||||
|
||||
func (w *bufferedWriter) Write(p []byte) (n int, err error) {
|
||||
n, err = w.buffer.Write(p)
|
||||
w.digester.Hash().Write(p[:n])
|
||||
w.offset += int64(len(p))
|
||||
w.updatedAt = time.Now()
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (w *bufferedWriter) Close() error {
|
||||
if w.buffer != nil {
|
||||
w.releaseRef()
|
||||
w.buffer = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *bufferedWriter) Status() (content.Status, error) {
|
||||
return content.Status{
|
||||
Ref: w.ref,
|
||||
Offset: w.offset,
|
||||
Total: w.total,
|
||||
StartedAt: w.startedAt,
|
||||
UpdatedAt: w.updatedAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (w *bufferedWriter) Digest() digest.Digest {
|
||||
return w.digester.Digest()
|
||||
}
|
||||
|
||||
func (w *bufferedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opt ...content.Opt) error {
|
||||
if w.buffer == nil {
|
||||
return errors.Errorf("can't commit already committed or closed")
|
||||
}
|
||||
if s := int64(w.buffer.Len()); size > 0 && size != s {
|
||||
return errors.Errorf("unexpected commit size %d, expected %d", s, size)
|
||||
}
|
||||
dgst := w.digester.Digest()
|
||||
if expected != "" && expected != dgst {
|
||||
return errors.Errorf("unexpected digest: %v != %v", dgst, expected)
|
||||
}
|
||||
if w.expected != "" && w.expected != dgst {
|
||||
return errors.Errorf("unexpected digest: %v != %v", dgst, w.expected)
|
||||
}
|
||||
w.main.addValue(dgst, w.buffer.Bytes())
|
||||
return w.Close()
|
||||
}
|
||||
|
||||
func (w *bufferedWriter) Truncate(size int64) error {
|
||||
if size != 0 {
|
||||
return errors.New("Truncate: unsupported size")
|
||||
}
|
||||
w.offset = 0
|
||||
w.digester.Hash().Reset()
|
||||
w.buffer.Reset()
|
||||
return nil
|
||||
}
|
94
vendor/github.com/moby/buildkit/util/contentutil/copy.go
generated
vendored
Normal file
94
vendor/github.com/moby/buildkit/util/contentutil/copy.go
generated
vendored
Normal file
@ -0,0 +1,94 @@
|
||||
package contentutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/moby/buildkit/util/resolver/limited"
|
||||
"github.com/moby/buildkit/util/resolver/retryhandler"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispecs.Descriptor, ref string, logger func([]byte)) error {
|
||||
if _, err := retryhandler.New(limited.FetchHandler(ingester, &localFetcher{provider}, ref), logger)(ctx, desc); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type localFetcher struct {
|
||||
content.Provider
|
||||
}
|
||||
|
||||
func (f *localFetcher) Fetch(ctx context.Context, desc ocispecs.Descriptor) (io.ReadCloser, error) {
|
||||
r, err := f.Provider.ReaderAt(ctx, desc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &rc{ReaderAt: r}, nil
|
||||
}
|
||||
|
||||
type rc struct {
|
||||
content.ReaderAt
|
||||
offset int64
|
||||
}
|
||||
|
||||
func (r *rc) Read(b []byte) (int, error) {
|
||||
n, err := r.ReadAt(b, r.offset)
|
||||
r.offset += int64(n)
|
||||
if n > 0 && err == io.EOF {
|
||||
err = nil
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (r *rc) Seek(offset int64, whence int) (int64, error) {
|
||||
switch whence {
|
||||
case io.SeekStart:
|
||||
r.offset = offset
|
||||
case io.SeekCurrent:
|
||||
r.offset += offset
|
||||
case io.SeekEnd:
|
||||
r.offset = r.Size() - offset
|
||||
}
|
||||
return r.offset, nil
|
||||
}
|
||||
|
||||
func CopyChain(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispecs.Descriptor) error {
|
||||
var m sync.Mutex
|
||||
manifestStack := []ocispecs.Descriptor{}
|
||||
|
||||
filterHandler := images.HandlerFunc(func(ctx context.Context, desc ocispecs.Descriptor) ([]ocispecs.Descriptor, error) {
|
||||
switch desc.MediaType {
|
||||
case images.MediaTypeDockerSchema2Manifest, ocispecs.MediaTypeImageManifest,
|
||||
images.MediaTypeDockerSchema2ManifestList, ocispecs.MediaTypeImageIndex:
|
||||
m.Lock()
|
||||
manifestStack = append(manifestStack, desc)
|
||||
m.Unlock()
|
||||
return nil, images.ErrStopHandler
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
})
|
||||
handlers := []images.Handler{
|
||||
images.ChildrenHandler(provider),
|
||||
filterHandler,
|
||||
retryhandler.New(limited.FetchHandler(ingester, &localFetcher{provider}, ""), func(_ []byte) {}),
|
||||
}
|
||||
|
||||
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
for i := len(manifestStack) - 1; i >= 0; i-- {
|
||||
if err := Copy(ctx, ingester, provider, manifestStack[i], "", nil); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
73
vendor/github.com/moby/buildkit/util/contentutil/fetcher.go
generated
vendored
Normal file
73
vendor/github.com/moby/buildkit/util/contentutil/fetcher.go
generated
vendored
Normal file
@ -0,0 +1,73 @@
|
||||
package contentutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/remotes"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func FromFetcher(f remotes.Fetcher) content.Provider {
|
||||
return &fetchedProvider{
|
||||
f: f,
|
||||
}
|
||||
}
|
||||
|
||||
type fetchedProvider struct {
|
||||
f remotes.Fetcher
|
||||
}
|
||||
|
||||
func (p *fetchedProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
|
||||
rc, err := p.f.Fetch(ctx, desc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &readerAt{Reader: rc, Closer: rc, size: desc.Size}, nil
|
||||
}
|
||||
|
||||
type readerAt struct {
|
||||
io.Reader
|
||||
io.Closer
|
||||
size int64
|
||||
offset int64
|
||||
}
|
||||
|
||||
func (r *readerAt) ReadAt(b []byte, off int64) (int, error) {
|
||||
if ra, ok := r.Reader.(io.ReaderAt); ok {
|
||||
return ra.ReadAt(b, off)
|
||||
}
|
||||
|
||||
if r.offset != off {
|
||||
if seeker, ok := r.Reader.(io.Seeker); ok {
|
||||
if _, err := seeker.Seek(off, io.SeekStart); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
r.offset = off
|
||||
} else {
|
||||
return 0, errors.Errorf("unsupported offset")
|
||||
}
|
||||
}
|
||||
|
||||
var totalN int
|
||||
for len(b) > 0 {
|
||||
n, err := r.Reader.Read(b)
|
||||
if err == io.EOF && n == len(b) {
|
||||
err = nil
|
||||
}
|
||||
r.offset += int64(n)
|
||||
totalN += n
|
||||
b = b[n:]
|
||||
if err != nil {
|
||||
return totalN, err
|
||||
}
|
||||
}
|
||||
return totalN, nil
|
||||
}
|
||||
|
||||
func (r *readerAt) Size() int64 {
|
||||
return r.size
|
||||
}
|
92
vendor/github.com/moby/buildkit/util/contentutil/multiprovider.go
generated
vendored
Normal file
92
vendor/github.com/moby/buildkit/util/contentutil/multiprovider.go
generated
vendored
Normal file
@ -0,0 +1,92 @@
|
||||
package contentutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// NewMultiProvider creates a new mutable provider with a base provider
|
||||
func NewMultiProvider(base content.Provider) *MultiProvider {
|
||||
return &MultiProvider{
|
||||
base: base,
|
||||
sub: map[digest.Digest]content.Provider{},
|
||||
}
|
||||
}
|
||||
|
||||
// MultiProvider is a provider backed by a mutable map of providers
|
||||
type MultiProvider struct {
|
||||
mu sync.RWMutex
|
||||
base content.Provider
|
||||
sub map[digest.Digest]content.Provider
|
||||
}
|
||||
|
||||
func (mp *MultiProvider) SnapshotLabels(descs []ocispecs.Descriptor, index int) map[string]string {
|
||||
if len(descs) < index {
|
||||
return nil
|
||||
}
|
||||
desc := descs[index]
|
||||
type snapshotLabels interface {
|
||||
SnapshotLabels([]ocispecs.Descriptor, int) map[string]string
|
||||
}
|
||||
|
||||
mp.mu.RLock()
|
||||
if p, ok := mp.sub[desc.Digest]; ok {
|
||||
mp.mu.RUnlock()
|
||||
if cd, ok := p.(snapshotLabels); ok {
|
||||
return cd.SnapshotLabels(descs, index)
|
||||
}
|
||||
} else {
|
||||
mp.mu.RUnlock()
|
||||
}
|
||||
if cd, ok := mp.base.(snapshotLabels); ok {
|
||||
return cd.SnapshotLabels(descs, index)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mp *MultiProvider) CheckDescriptor(ctx context.Context, desc ocispecs.Descriptor) error {
|
||||
type checkDescriptor interface {
|
||||
CheckDescriptor(context.Context, ocispecs.Descriptor) error
|
||||
}
|
||||
|
||||
mp.mu.RLock()
|
||||
if p, ok := mp.sub[desc.Digest]; ok {
|
||||
mp.mu.RUnlock()
|
||||
if cd, ok := p.(checkDescriptor); ok {
|
||||
return cd.CheckDescriptor(ctx, desc)
|
||||
}
|
||||
} else {
|
||||
mp.mu.RUnlock()
|
||||
}
|
||||
if cd, ok := mp.base.(checkDescriptor); ok {
|
||||
return cd.CheckDescriptor(ctx, desc)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReaderAt returns a content.ReaderAt
|
||||
func (mp *MultiProvider) ReaderAt(ctx context.Context, desc ocispecs.Descriptor) (content.ReaderAt, error) {
|
||||
mp.mu.RLock()
|
||||
if p, ok := mp.sub[desc.Digest]; ok {
|
||||
mp.mu.RUnlock()
|
||||
return p.ReaderAt(ctx, desc)
|
||||
}
|
||||
mp.mu.RUnlock()
|
||||
if mp.base == nil {
|
||||
return nil, errors.Wrapf(errdefs.ErrNotFound, "content %v", desc.Digest)
|
||||
}
|
||||
return mp.base.ReaderAt(ctx, desc)
|
||||
}
|
||||
|
||||
// Add adds a new child provider for a specific digest
|
||||
func (mp *MultiProvider) Add(dgst digest.Digest, p content.Provider) {
|
||||
mp.mu.Lock()
|
||||
defer mp.mu.Unlock()
|
||||
mp.sub[dgst] = p
|
||||
}
|
122
vendor/github.com/moby/buildkit/util/contentutil/pusher.go
generated
vendored
Normal file
122
vendor/github.com/moby/buildkit/util/contentutil/pusher.go
generated
vendored
Normal file
@ -0,0 +1,122 @@
|
||||
package contentutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/remotes"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func FromPusher(p remotes.Pusher) content.Ingester {
|
||||
var mu sync.Mutex
|
||||
c := sync.NewCond(&mu)
|
||||
return &pushingIngester{
|
||||
mu: &mu,
|
||||
c: c,
|
||||
p: p,
|
||||
active: map[digest.Digest]struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
type pushingIngester struct {
|
||||
p remotes.Pusher
|
||||
|
||||
mu *sync.Mutex
|
||||
c *sync.Cond
|
||||
active map[digest.Digest]struct{}
|
||||
}
|
||||
|
||||
// Writer implements content.Ingester. desc.MediaType must be set for manifest blobs.
|
||||
func (i *pushingIngester) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
|
||||
var wOpts content.WriterOpts
|
||||
for _, opt := range opts {
|
||||
if err := opt(&wOpts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if wOpts.Ref == "" {
|
||||
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
|
||||
}
|
||||
|
||||
st := time.Now()
|
||||
|
||||
i.mu.Lock()
|
||||
for {
|
||||
if time.Since(st) > time.Hour {
|
||||
i.mu.Unlock()
|
||||
return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %v locked", wOpts.Desc.Digest)
|
||||
}
|
||||
if _, ok := i.active[wOpts.Desc.Digest]; ok {
|
||||
i.c.Wait()
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
i.active[wOpts.Desc.Digest] = struct{}{}
|
||||
i.mu.Unlock()
|
||||
|
||||
var once sync.Once
|
||||
release := func() {
|
||||
once.Do(func() {
|
||||
i.mu.Lock()
|
||||
delete(i.active, wOpts.Desc.Digest)
|
||||
i.c.Broadcast()
|
||||
i.mu.Unlock()
|
||||
})
|
||||
}
|
||||
|
||||
// pusher requires desc.MediaType to determine the PUT URL, especially for manifest blobs.
|
||||
contentWriter, err := i.p.Push(ctx, wOpts.Desc)
|
||||
if err != nil {
|
||||
release()
|
||||
return nil, err
|
||||
}
|
||||
runtime.SetFinalizer(contentWriter, func(_ content.Writer) {
|
||||
release()
|
||||
})
|
||||
return &writer{
|
||||
Writer: contentWriter,
|
||||
contentWriterRef: wOpts.Ref,
|
||||
release: release,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type writer struct {
|
||||
content.Writer // returned from pusher.Push
|
||||
contentWriterRef string // ref passed for Writer()
|
||||
release func()
|
||||
}
|
||||
|
||||
func (w *writer) Status() (content.Status, error) {
|
||||
st, err := w.Writer.Status()
|
||||
if err != nil {
|
||||
return st, err
|
||||
}
|
||||
if w.contentWriterRef != "" {
|
||||
st.Ref = w.contentWriterRef
|
||||
}
|
||||
return st, nil
|
||||
}
|
||||
|
||||
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
err := w.Writer.Commit(ctx, size, expected, opts...)
|
||||
if w.release != nil {
|
||||
w.release()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *writer) Close() error {
|
||||
err := w.Writer.Close()
|
||||
if w.release != nil {
|
||||
w.release()
|
||||
}
|
||||
return err
|
||||
}
|
109
vendor/github.com/moby/buildkit/util/contentutil/refs.go
generated
vendored
Normal file
109
vendor/github.com/moby/buildkit/util/contentutil/refs.go
generated
vendored
Normal file
@ -0,0 +1,109 @@
|
||||
package contentutil
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/remotes"
|
||||
"github.com/containerd/containerd/remotes/docker"
|
||||
"github.com/moby/buildkit/version"
|
||||
"github.com/moby/locker"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func ProviderFromRef(ref string) (ocispecs.Descriptor, content.Provider, error) {
|
||||
headers := http.Header{}
|
||||
headers.Set("User-Agent", version.UserAgent())
|
||||
remote := docker.NewResolver(docker.ResolverOptions{
|
||||
Client: http.DefaultClient,
|
||||
Headers: headers,
|
||||
})
|
||||
|
||||
name, desc, err := remote.Resolve(context.TODO(), ref)
|
||||
if err != nil {
|
||||
return ocispecs.Descriptor{}, nil, err
|
||||
}
|
||||
|
||||
fetcher, err := remote.Fetcher(context.TODO(), name)
|
||||
if err != nil {
|
||||
return ocispecs.Descriptor{}, nil, err
|
||||
}
|
||||
return desc, FromFetcher(fetcher), nil
|
||||
}
|
||||
|
||||
func IngesterFromRef(ref string) (content.Ingester, error) {
|
||||
headers := http.Header{}
|
||||
headers.Set("User-Agent", version.UserAgent())
|
||||
remote := docker.NewResolver(docker.ResolverOptions{
|
||||
Client: http.DefaultClient,
|
||||
Headers: headers,
|
||||
})
|
||||
|
||||
p, err := remote.Pusher(context.TODO(), ref)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ingester{
|
||||
locker: locker.New(),
|
||||
pusher: &pusher{p},
|
||||
}, nil
|
||||
}
|
||||
|
||||
type pusher struct {
|
||||
remotes.Pusher
|
||||
}
|
||||
|
||||
type ingester struct {
|
||||
locker *locker.Locker
|
||||
pusher remotes.Pusher
|
||||
}
|
||||
|
||||
func (w *ingester) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
|
||||
var wo content.WriterOpts
|
||||
for _, o := range opts {
|
||||
if err := o(&wo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if wo.Ref == "" {
|
||||
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
|
||||
}
|
||||
w.locker.Lock(wo.Ref)
|
||||
var once sync.Once
|
||||
unlock := func() {
|
||||
once.Do(func() {
|
||||
w.locker.Unlock(wo.Ref)
|
||||
})
|
||||
}
|
||||
writer, err := w.pusher.Push(ctx, wo.Desc)
|
||||
if err != nil {
|
||||
unlock()
|
||||
return nil, err
|
||||
}
|
||||
return &lockedWriter{unlock: unlock, Writer: writer}, nil
|
||||
}
|
||||
|
||||
type lockedWriter struct {
|
||||
unlock func()
|
||||
content.Writer
|
||||
}
|
||||
|
||||
func (w *lockedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
err := w.Writer.Commit(ctx, size, expected, opts...)
|
||||
if err == nil {
|
||||
w.unlock()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *lockedWriter) Close() error {
|
||||
err := w.Writer.Close()
|
||||
w.unlock()
|
||||
return err
|
||||
}
|
Reference in New Issue
Block a user