vendor: update buildkit to v0.8

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi
2020-12-07 22:01:24 -08:00
parent 080e9981c7
commit 69a1419ab1
323 changed files with 20129 additions and 8394 deletions

View File

@ -2,6 +2,7 @@ package client
import (
"context"
"io"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/solver/pb"
@ -16,6 +17,64 @@ type Client interface {
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt) (digest.Digest, []byte, error)
BuildOpts() BuildOpts
Inputs(ctx context.Context) (map[string]llb.State, error)
NewContainer(ctx context.Context, req NewContainerRequest) (Container, error)
}
// NewContainerRequest encapsulates the requirements for a client to define a
// new container, without defining the initial process.
type NewContainerRequest struct {
Mounts []Mount
NetMode pb.NetMode
Platform *pb.Platform
Constraints *pb.WorkerConstraints
}
// Mount allows clients to specify a filesystem mount. A Reference to a
// previously solved Result is required.
type Mount struct {
Selector string
Dest string
ResultID string
Ref Reference
Readonly bool
MountType pb.MountType
CacheOpt *pb.CacheOpt
SecretOpt *pb.SecretOpt
SSHOpt *pb.SSHOpt
}
// Container is used to start new processes inside a container and release the
// container resources when done.
type Container interface {
Start(context.Context, StartRequest) (ContainerProcess, error)
Release(context.Context) error
}
// StartRequest encapsulates the arguments to define a process within a
// container.
type StartRequest struct {
Args []string
Env []string
User string
Cwd string
Tty bool
Stdin io.ReadCloser
Stdout, Stderr io.WriteCloser
SecurityMode pb.SecurityMode
}
// WinSize is same as executor.WinSize, copied here to prevent circular package
// dependencies.
type WinSize struct {
Rows uint32
Cols uint32
}
// ContainerProcess represents a process within a container.
type ContainerProcess interface {
Wait() error
Resize(ctx context.Context, size WinSize) error
// TODO Signal(ctx context.Context, sig os.Signal)
}
type Reference interface {
@ -46,6 +105,7 @@ type StatRequest struct {
// SolveRequest is same as frontend.SolveRequest but avoiding dependency
type SolveRequest struct {
Evaluate bool
Definition *pb.Definition
Frontend string
FrontendOpt map[string]string

View File

@ -3,10 +3,12 @@ package grpcclient
import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"time"
"github.com/gogo/googleapis/google/rpc"
@ -15,26 +17,33 @@ import (
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/frontend/gateway/client"
pb "github.com/moby/buildkit/frontend/gateway/pb"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/solver/errdefs"
opspb "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/apicaps"
"github.com/moby/buildkit/util/grpcerrors"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
fstypes "github.com/tonistiigi/fsutil/types"
"golang.org/x/sync/errgroup"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const frontendPrefix = "BUILDKIT_FRONTEND_OPT_"
type GrpcClient interface {
client.Client
Run(context.Context, client.BuildFunc) error
}
func New(ctx context.Context, opts map[string]string, session, product string, c pb.LLBBridgeClient, w []client.WorkerInfo) (GrpcClient, error) {
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
resp, err := c.Ping(ctx, &pb.PingRequest{})
pingCtx, pingCancel := context.WithTimeout(ctx, 15*time.Second)
defer pingCancel()
resp, err := c.Ping(pingCtx, &pb.PingRequest{})
if err != nil {
return nil, err
}
@ -56,6 +65,7 @@ func New(ctx context.Context, opts map[string]string, session, product string, c
caps: pb.Caps.CapSet(resp.FrontendAPICaps),
llbCaps: opspb.Caps.CapSet(resp.LLBCaps),
requests: map[string]*pb.SolveRequest{},
execMsgs: newMessageForwarder(ctx, c),
}, nil
}
@ -167,6 +177,13 @@ func (c *grpcClient) Run(ctx context.Context, f client.BuildFunc) (retError erro
}()
}
defer func() {
err = c.execMsgs.Release()
if err != nil && retError != nil {
retError = err
}
}()
if res, err = f(ctx, c); err != nil {
return err
}
@ -257,6 +274,7 @@ type grpcClient struct {
caps apicaps.CapSet
llbCaps apicaps.CapSet
requests map[string]*pb.SolveRequest
execMsgs *messageForwarder
}
func (c *grpcClient) requestForRef(ref client.Reference) (*pb.SolveRequest, error) {
@ -280,7 +298,7 @@ func (c *grpcClient) requestForRef(ref client.Reference) (*pb.SolveRequest, erro
return req, nil
}
func (c *grpcClient) Solve(ctx context.Context, creq client.SolveRequest) (*client.Result, error) {
func (c *grpcClient) Solve(ctx context.Context, creq client.SolveRequest) (res *client.Result, err error) {
if creq.Definition != nil {
for _, md := range creq.Definition.Metadata {
for cap := range md.Caps {
@ -326,13 +344,45 @@ func (c *grpcClient) Solve(ctx context.Context, creq client.SolveRequest) (*clie
req.ExporterAttr = []byte("{}")
}
if creq.Evaluate {
if c.caps.Supports(pb.CapGatewayEvaluateSolve) == nil {
req.Evaluate = creq.Evaluate
} else {
// If evaluate is not supported, fallback to running Stat(".") in order to
// trigger an evaluation of the result.
defer func() {
if res == nil {
return
}
var (
id string
ref client.Reference
)
ref, err = res.SingleRef()
if err != nil {
for refID := range res.Refs {
id = refID
break
}
} else {
id = ref.(*reference).id
}
_, err = c.client.StatFile(ctx, &pb.StatFileRequest{
Ref: id,
Path: ".",
})
}()
}
}
resp, err := c.client.Solve(ctx, req)
if err != nil {
return nil, err
}
res := &client.Result{}
res = &client.Result{}
if resp.Result == nil {
if id := resp.Ref; id != "" {
c.requests[id] = req
@ -427,7 +477,454 @@ func (c *grpcClient) Inputs(ctx context.Context) (map[string]llb.State, error) {
inputs[key] = llb.NewState(op)
}
return inputs, nil
}
// procMessageForwarder is created per container process to act as the
// communication channel between the process and the ExecProcess message
// stream.
type procMessageForwarder struct {
done chan struct{}
closeOnce sync.Once
msgs chan *pb.ExecMessage
}
func newProcMessageForwarder() *procMessageForwarder {
return &procMessageForwarder{
done: make(chan struct{}),
msgs: make(chan *pb.ExecMessage),
}
}
func (b *procMessageForwarder) Send(ctx context.Context, m *pb.ExecMessage) {
select {
case <-ctx.Done():
case <-b.done:
b.closeOnce.Do(func() {
close(b.msgs)
})
case b.msgs <- m:
}
}
func (b *procMessageForwarder) Recv(ctx context.Context) (m *pb.ExecMessage, ok bool) {
select {
case <-ctx.Done():
return nil, true
case <-b.done:
return nil, false
case m = <-b.msgs:
return m, true
}
}
func (b *procMessageForwarder) Close() {
close(b.done)
b.Recv(context.Background()) // flush any messages in queue
b.Send(context.Background(), nil) // ensure channel is closed
}
// messageForwarder manages a single grpc stream for ExecProcess to facilitate
// a pub/sub message channel for each new process started from the client
// connection.
type messageForwarder struct {
client pb.LLBBridgeClient
ctx context.Context
cancel func()
eg *errgroup.Group
mu sync.Mutex
pids map[string]*procMessageForwarder
stream pb.LLBBridge_ExecProcessClient
// startOnce used to only start the exec message forwarder once,
// so we only have one exec stream per client
startOnce sync.Once
// startErr tracks the error when initializing the stream, it will
// be returned on subsequent calls to Start
startErr error
}
func newMessageForwarder(ctx context.Context, client pb.LLBBridgeClient) *messageForwarder {
ctx, cancel := context.WithCancel(ctx)
eg, ctx := errgroup.WithContext(ctx)
return &messageForwarder{
client: client,
pids: map[string]*procMessageForwarder{},
ctx: ctx,
cancel: cancel,
eg: eg,
}
}
func (m *messageForwarder) Start() (err error) {
defer func() {
if err != nil {
m.startErr = err
}
}()
if m.startErr != nil {
return m.startErr
}
m.startOnce.Do(func() {
m.stream, err = m.client.ExecProcess(m.ctx)
if err != nil {
return
}
m.eg.Go(func() error {
for {
msg, err := m.stream.Recv()
if errors.Is(err, io.EOF) || grpcerrors.Code(err) == codes.Canceled {
return nil
}
logrus.Debugf("|<--- %s", debugMessage(msg))
if err != nil {
return err
}
m.mu.Lock()
msgs, ok := m.pids[msg.ProcessID]
m.mu.Unlock()
if !ok {
logrus.Debugf("Received exec message for unregistered process: %s", msg.String())
continue
}
msgs.Send(m.ctx, msg)
}
})
})
return err
}
func debugMessage(msg *pb.ExecMessage) string {
switch m := msg.GetInput().(type) {
case *pb.ExecMessage_Init:
return fmt.Sprintf("Init Message %s", msg.ProcessID)
case *pb.ExecMessage_File:
if m.File.EOF {
return fmt.Sprintf("File Message %s, fd=%d, EOF", msg.ProcessID, m.File.Fd)
}
return fmt.Sprintf("File Message %s, fd=%d, %d bytes", msg.ProcessID, m.File.Fd, len(m.File.Data))
case *pb.ExecMessage_Resize:
return fmt.Sprintf("Resize Message %s", msg.ProcessID)
case *pb.ExecMessage_Started:
return fmt.Sprintf("Started Message %s", msg.ProcessID)
case *pb.ExecMessage_Exit:
return fmt.Sprintf("Exit Message %s, code=%d, err=%s", msg.ProcessID, m.Exit.Code, m.Exit.Error)
case *pb.ExecMessage_Done:
return fmt.Sprintf("Done Message %s", msg.ProcessID)
}
return fmt.Sprintf("Unknown Message %s", msg.String())
}
func (m *messageForwarder) Send(msg *pb.ExecMessage) error {
m.mu.Lock()
_, ok := m.pids[msg.ProcessID]
defer m.mu.Unlock()
if !ok {
return errors.Errorf("process %s has ended, not sending message %#v", msg.ProcessID, msg.Input)
}
logrus.Debugf("|---> %s", debugMessage(msg))
return m.stream.Send(msg)
}
func (m *messageForwarder) Release() error {
m.cancel()
return m.eg.Wait()
}
func (m *messageForwarder) Register(pid string) *procMessageForwarder {
m.mu.Lock()
defer m.mu.Unlock()
sender := newProcMessageForwarder()
m.pids[pid] = sender
return sender
}
func (m *messageForwarder) Deregister(pid string) {
m.mu.Lock()
defer m.mu.Unlock()
sender, ok := m.pids[pid]
if !ok {
return
}
delete(m.pids, pid)
sender.Close()
}
type msgWriter struct {
mux *messageForwarder
fd uint32
processID string
}
func (w *msgWriter) Write(msg []byte) (int, error) {
err := w.mux.Send(&pb.ExecMessage{
ProcessID: w.processID,
Input: &pb.ExecMessage_File{
File: &pb.FdMessage{
Fd: w.fd,
Data: msg,
},
},
})
if err != nil {
return 0, err
}
return len(msg), nil
}
func (c *grpcClient) NewContainer(ctx context.Context, req client.NewContainerRequest) (client.Container, error) {
err := c.caps.Supports(pb.CapGatewayExec)
if err != nil {
return nil, err
}
id := identity.NewID()
var mounts []*opspb.Mount
for _, m := range req.Mounts {
resultID := m.ResultID
if m.Ref != nil {
ref, ok := m.Ref.(*reference)
if !ok {
return nil, errors.Errorf("unexpected type for reference, got %T", m.Ref)
}
resultID = ref.id
}
mounts = append(mounts, &opspb.Mount{
Dest: m.Dest,
Selector: m.Selector,
Readonly: m.Readonly,
MountType: m.MountType,
ResultID: resultID,
CacheOpt: m.CacheOpt,
SecretOpt: m.SecretOpt,
SSHOpt: m.SSHOpt,
})
}
logrus.Debugf("|---> NewContainer %s", id)
_, err = c.client.NewContainer(ctx, &pb.NewContainerRequest{
ContainerID: id,
Mounts: mounts,
Platform: req.Platform,
Constraints: req.Constraints,
})
if err != nil {
return nil, err
}
// ensure message forwarder is started, only sets up stream first time called
err = c.execMsgs.Start()
if err != nil {
return nil, err
}
return &container{
client: c.client,
id: id,
execMsgs: c.execMsgs,
}, nil
}
type container struct {
client pb.LLBBridgeClient
id string
execMsgs *messageForwarder
}
func (ctr *container) Start(ctx context.Context, req client.StartRequest) (client.ContainerProcess, error) {
pid := fmt.Sprintf("%s:%s", ctr.id, identity.NewID())
msgs := ctr.execMsgs.Register(pid)
init := &pb.InitMessage{
ContainerID: ctr.id,
Meta: &opspb.Meta{
Args: req.Args,
Env: req.Env,
Cwd: req.Cwd,
User: req.User,
},
Tty: req.Tty,
Security: req.SecurityMode,
}
if req.Stdin != nil {
init.Fds = append(init.Fds, 0)
}
if req.Stdout != nil {
init.Fds = append(init.Fds, 1)
}
if req.Stderr != nil {
init.Fds = append(init.Fds, 2)
}
err := ctr.execMsgs.Send(&pb.ExecMessage{
ProcessID: pid,
Input: &pb.ExecMessage_Init{
Init: init,
},
})
if err != nil {
return nil, err
}
msg, _ := msgs.Recv(ctx)
if msg == nil {
return nil, errors.Errorf("failed to receive started message")
}
started := msg.GetStarted()
if started == nil {
return nil, errors.Errorf("expecting started message, got %T", msg.GetInput())
}
eg, ctx := errgroup.WithContext(ctx)
done := make(chan struct{})
ctrProc := &containerProcess{
execMsgs: ctr.execMsgs,
id: pid,
eg: eg,
}
var stdinReader *io.PipeReader
ctrProc.eg.Go(func() error {
<-done
if stdinReader != nil {
return stdinReader.Close()
}
return nil
})
if req.Stdin != nil {
var stdinWriter io.WriteCloser
stdinReader, stdinWriter = io.Pipe()
// This go routine is intentionally not part of the errgroup because
// if os.Stdin is used for req.Stdin then this will block until
// the user closes the input, which will likely be after we are done
// with the container, so we can't Wait on it.
go func() {
io.Copy(stdinWriter, req.Stdin)
stdinWriter.Close()
}()
ctrProc.eg.Go(func() error {
m := &msgWriter{
mux: ctr.execMsgs,
processID: pid,
fd: 0,
}
_, err := io.Copy(m, stdinReader)
// ignore ErrClosedPipe, it is EOF for our usage.
if err != nil && !errors.Is(err, io.ErrClosedPipe) {
return err
}
// not an error so must be eof
return ctr.execMsgs.Send(&pb.ExecMessage{
ProcessID: pid,
Input: &pb.ExecMessage_File{
File: &pb.FdMessage{
Fd: 0,
EOF: true,
},
},
})
})
}
ctrProc.eg.Go(func() error {
var closeDoneOnce sync.Once
var exitError error
for {
msg, ok := msgs.Recv(ctx)
if !ok {
// no more messages, return
return exitError
}
if msg == nil {
// empty message from ctx cancel, so just start shutting down
// input, but continue processing more exit/done messages
closeDoneOnce.Do(func() {
close(done)
})
continue
}
if file := msg.GetFile(); file != nil {
var out io.WriteCloser
switch file.Fd {
case 1:
out = req.Stdout
case 2:
out = req.Stderr
}
if out == nil {
// if things are plumbed correctly this should never happen
return errors.Errorf("missing writer for output fd %d", file.Fd)
}
if len(file.Data) > 0 {
_, err := out.Write(file.Data)
if err != nil {
return err
}
}
} else if exit := msg.GetExit(); exit != nil {
// capture exit message to exitError so we can return it after
// the server sends the Done message
closeDoneOnce.Do(func() {
close(done)
})
if exit.Code == 0 {
continue
}
exitError = grpcerrors.FromGRPC(status.ErrorProto(&spb.Status{
Code: exit.Error.Code,
Message: exit.Error.Message,
Details: convertGogoAny(exit.Error.Details),
}))
if exit.Code != errdefs.ContainerdUnknownExitStatus {
exitError = &errdefs.ExitError{ExitCode: exit.Code, Err: exitError}
}
} else if serverDone := msg.GetDone(); serverDone != nil {
return exitError
} else {
return errors.Errorf("unexpected Exec Message for pid %s: %T", pid, msg.GetInput())
}
}
})
return ctrProc, nil
}
func (ctr *container) Release(ctx context.Context) error {
logrus.Debugf("|---> ReleaseContainer %s", ctr.id)
_, err := ctr.client.ReleaseContainer(ctx, &pb.ReleaseContainerRequest{
ContainerID: ctr.id,
})
return err
}
type containerProcess struct {
execMsgs *messageForwarder
id string
eg *errgroup.Group
}
func (ctrProc *containerProcess) Wait() error {
defer ctrProc.execMsgs.Deregister(ctrProc.id)
return ctrProc.eg.Wait()
}
func (ctrProc *containerProcess) Resize(_ context.Context, size client.WinSize) error {
return ctrProc.execMsgs.Send(&pb.ExecMessage{
ProcessID: ctrProc.id,
Input: &pb.ExecMessage_Resize{
Resize: &pb.ResizeMessage{
Cols: size.Cols,
Rows: size.Rows,
},
},
})
}
type reference struct {
@ -509,7 +1006,7 @@ func grpcClientConn(ctx context.Context) (context.Context, *grpc.ClientConn, err
return stdioConn(), nil
})
cc, err := grpc.DialContext(ctx, "", dialOpt, grpc.WithInsecure(), grpc.WithUnaryInterceptor(grpcerrors.UnaryClientInterceptor), grpc.WithStreamInterceptor(grpcerrors.StreamClientInterceptor))
cc, err := grpc.DialContext(ctx, "localhost", dialOpt, grpc.WithInsecure(), grpc.WithUnaryInterceptor(grpcerrors.UnaryClientInterceptor), grpc.WithStreamInterceptor(grpcerrors.StreamClientInterceptor))
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create grpc client")
}
@ -596,6 +1093,14 @@ func product() string {
return os.Getenv("BUILDKIT_EXPORTEDPRODUCT")
}
func convertGogoAny(in []*gogotypes.Any) []*any.Any {
out := make([]*any.Any, len(in))
for i := range in {
out[i] = &any.Any{TypeUrl: in[i].TypeUrl, Value: in[i].Value}
}
return out
}
func convertToGogoAny(in []*any.Any) []*gogotypes.Any {
out := make([]*gogotypes.Any, len(in))
for i := range in {

View File

@ -35,6 +35,18 @@ const (
// CapGatewaySolveMetadata can be used to check if solve calls from gateway reliably return metadata
CapGatewaySolveMetadata apicaps.CapID = "gateway.solve.metadata"
// CapGatewayExec is the capability to create and interact with new
// containers directly through the gateway
CapGatewayExec apicaps.CapID = "gateway.exec"
// CapFrontendCaps can be used to check that frontends define support for certain capabilities
CapFrontendCaps apicaps.CapID = "frontend.caps"
// CapGatewayEvaluateSolve is a capability to immediately unlazy solve
// results. This is generally used by the client to return and handle solve
// errors.
CapGatewayEvaluateSolve apicaps.CapID = "gateway.solve.evaluate"
)
func init() {
@ -136,4 +148,25 @@ func init() {
Enabled: true,
Status: apicaps.CapStatusExperimental,
})
Caps.Init(apicaps.Cap{
ID: CapGatewayExec,
Name: "gateway exec",
Enabled: true,
Status: apicaps.CapStatusExperimental,
})
Caps.Init(apicaps.Cap{
ID: CapFrontendCaps,
Name: "frontend capabilities",
Enabled: true,
Status: apicaps.CapStatusExperimental,
})
Caps.Init(apicaps.Cap{
ID: CapGatewayEvaluateSolve,
Name: "gateway evaluate solve",
Enabled: true,
Status: apicaps.CapStatusExperimental,
})
}

File diff suppressed because it is too large Load Diff

View File

@ -28,6 +28,10 @@ service LLBBridge {
rpc Return(ReturnRequest) returns (ReturnResponse);
// apicaps:CapFrontendInputs
rpc Inputs(InputsRequest) returns (InputsResponse);
rpc NewContainer(NewContainerRequest) returns (NewContainerResponse);
rpc ReleaseContainer(ReleaseContainerRequest) returns (ReleaseContainerResponse);
rpc ExecProcess(stream ExecMessage) returns (stream ExecMessage);
}
message Result {
@ -103,6 +107,8 @@ message SolveRequest {
// apicaps:CapFrontendInputs
map<string, pb.Definition> FrontendInputs = 13;
bool Evaluate = 14;
}
// CacheOptionsEntry corresponds to the control.CacheOptionsEntry
@ -162,3 +168,71 @@ message PongResponse{
repeated moby.buildkit.v1.apicaps.APICap LLBCaps = 2 [(gogoproto.nullable) = false];
repeated moby.buildkit.v1.types.WorkerRecord Workers = 3;
}
message NewContainerRequest {
string ContainerID = 1;
// For mount input values we can use random identifiers passed with ref
repeated pb.Mount Mounts = 2;
pb.NetMode Network = 3;
pb.Platform platform = 4;
pb.WorkerConstraints constraints = 5;
}
message NewContainerResponse{}
message ReleaseContainerRequest {
string ContainerID = 1;
}
message ReleaseContainerResponse{}
message ExecMessage {
string ProcessID = 1;
oneof Input {
// InitMessage sent from client to server will start a new process in a
// container
InitMessage Init = 2;
// FdMessage used from client to server for input (stdin) and
// from server to client for output (stdout, stderr)
FdMessage File = 3;
// ResizeMessage used from client to server for terminal resize events
ResizeMessage Resize = 4;
// StartedMessage sent from server to client after InitMessage to
// indicate the process has started.
StartedMessage Started = 5;
// ExitMessage sent from server to client will contain the exit code
// when the process ends.
ExitMessage Exit = 6;
// DoneMessage from server to client will be the last message for any
// process. Note that FdMessage might be sent after ExitMessage.
DoneMessage Done = 7;
}
}
message InitMessage{
string ContainerID = 1;
pb.Meta Meta = 2;
repeated uint32 Fds = 3;
bool Tty = 4;
pb.SecurityMode Security = 5;
}
message ExitMessage {
uint32 Code = 1;
google.rpc.Status Error = 2;
}
message StartedMessage{}
message DoneMessage{}
message FdMessage{
uint32 Fd = 1; // what fd the data was from
bool EOF = 2; // true if eof was reached
bytes Data = 3;
}
message ResizeMessage{
uint32 Rows = 1;
uint32 Cols = 2;
}