Sebastiaan van Stijn c855277d53
vendor: github.com/moby/buildkit 5ae9b23c40a9 (master / v0.13.0-dev)
full diff:

- 36ef4d8c0d...f098008783
- d5c1d785b0...5ae9b23c40

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2023-11-15 15:59:23 +01:00

81 lines
1.6 KiB
Go

package uploadprovider
import (
"io"
"path"
"sync"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session/upload"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// New returns an Uploader whose reader registry is empty and ready for use.
func New() *Uploader {
	up := new(Uploader)
	up.m = make(map[string]io.Reader)
	return up
}
// Uploader keeps a set of readers, each registered under a generated ID,
// and streams a reader's content to whoever pulls that ID.
type Uploader struct {
// mu is held by Pull while it looks up and removes entries of m.
mu sync.Mutex
// m maps a generated ID to the reader registered for it via Add.
m map[string]io.Reader
}
// Add registers r under a freshly generated ID and returns a URL of the form
// "http://buildkit-session/<id>". Pull uses the last path element of the
// requested URL path as the lookup key, so this URL identifies r.
//
// Add may be called concurrently with Pull and with other Add calls.
func (hp *Uploader) Add(r io.Reader) string {
	id := identity.NewID()

	// Pull reads and deletes from hp.m under hp.mu; the insert must take the
	// same lock, otherwise concurrent map access is a data race.
	hp.mu.Lock()
	hp.m[id] = r
	hp.mu.Unlock()

	return "http://buildkit-session/" + id
}
// Register attaches this Uploader to server by passing both to
// upload.RegisterUploadServer.
func (hp *Uploader) Register(server *grpc.Server) {
upload.RegisterUploadServer(server, hp)
}
// Pull streams the content of a previously added reader to the client.
//
// The reader is selected by the base name of the first "urlpath" value in the
// incoming stream metadata. The entry is removed from the registry before
// streaming starts, so each added reader can be pulled at most once. An error
// is returned when no reader is registered under the requested key, or when
// copying to the stream fails.
func (hp *Uploader) Pull(stream upload.Upload_PullServer) error {
	// Missing metadata is tolerated: md is then nil, indexing it yields no
	// values, and the lookup below simply misses.
	md, _ := metadata.FromIncomingContext(stream.Context())

	var key string
	if vals := md["urlpath"]; len(vals) > 0 {
		key = vals[0]
	}
	key = path.Base(key)

	// Look up and claim the reader in one critical section; the lock is
	// released before streaming so a slow transfer does not block others.
	hp.mu.Lock()
	src, found := hp.m[key]
	if found {
		delete(hp.m, key)
	}
	hp.mu.Unlock()

	if !found {
		return errors.Errorf("no http response from session for %s", key)
	}

	_, err := io.Copy(&writer{stream}, src)
	return err
}
// writer wraps a grpc.ServerStream; its Write method sends the written bytes
// over the stream as upload.BytesMessage values.
type writer struct {
grpc.ServerStream
}
// Write sends dt over the underlying gRPC stream as one or more
// upload.BytesMessage values, none carrying more than maxChunkSize bytes.
//
// It returns the number of bytes handed to SendMsg successfully and the first
// error encountered, if any. On success the count always equals len(dt), as
// the io.Writer contract requires. An empty dt still results in a single
// empty message being sent.
func (w *writer) Write(dt []byte) (int, error) {
	// avoid sending too big messages on grpc stream
	const maxChunkSize = 3 * 1024 * 1024

	// Track the total explicitly so that multi-chunk writes report every byte
	// sent; under-reporting would make io.Copy fail with io.ErrShortWrite.
	written := 0
	for len(dt) > maxChunkSize {
		if err := w.SendMsg(&upload.BytesMessage{Data: dt[:maxChunkSize]}); err != nil {
			return written, err
		}
		written += maxChunkSize
		dt = dt[maxChunkSize:]
	}

	// Final (or only) chunk: at most maxChunkSize bytes, possibly empty.
	if err := w.SendMsg(&upload.BytesMessage{Data: dt}); err != nil {
		return written, err
	}
	return written + len(dt), nil
}