mirror of
https://gitea.com/Lydanne/buildx.git
synced 2025-05-18 00:47:48 +08:00

Removes gogo/protobuf from buildx and updates to a version of moby/buildkit where gogo is removed. This also changes how the proto files are generated. This is because newer versions of protobuf are more strict about name conflicts. If two files have the same name (even if they are relative paths) and are used in different protoc commands, they'll conflict in the registry. Since protobuf file generation doesn't work very well with `paths=source_relative`, this removes the `go:generate` expression and just relies on the dockerfile to perform the generation. Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
496 lines
13 KiB
Go
496 lines
13 KiB
Go
package build
|
|
|
|
import (
|
|
"context"
|
|
_ "crypto/sha256" // ensure digests can be computed
|
|
"encoding/json"
|
|
"io"
|
|
"sync"
|
|
|
|
controllerapi "github.com/docker/buildx/controller/pb"
|
|
"github.com/moby/buildkit/client"
|
|
"github.com/moby/buildkit/exporter/containerimage/exptypes"
|
|
gateway "github.com/moby/buildkit/frontend/gateway/client"
|
|
"github.com/moby/buildkit/solver/errdefs"
|
|
"github.com/moby/buildkit/solver/pb"
|
|
"github.com/moby/buildkit/solver/result"
|
|
specs "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/pkg/errors"
|
|
"github.com/sirupsen/logrus"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// NewResultHandle makes a call to client.Build, additionally returning a
|
|
// opaque ResultHandle alongside the standard response and error.
|
|
//
|
|
// This ResultHandle can be used to execute additional build steps in the same
|
|
// context as the build occurred, which can allow easy debugging of build
|
|
// failures and successes.
|
|
//
|
|
// If the returned ResultHandle is not nil, the caller must call Done() on it.
|
|
func NewResultHandle(ctx context.Context, cc *client.Client, opt client.SolveOpt, product string, buildFunc gateway.BuildFunc, ch chan *client.SolveStatus) (*ResultHandle, *client.SolveResponse, error) {
|
|
// Create a new context to wrap the original, and cancel it when the
|
|
// caller-provided context is cancelled.
|
|
//
|
|
// We derive the context from the background context so that we can forbid
|
|
// cancellation of the build request after <-done is closed (which we do
|
|
// before returning the ResultHandle).
|
|
baseCtx := ctx
|
|
ctx, cancel := context.WithCancelCause(context.Background())
|
|
done := make(chan struct{})
|
|
go func() {
|
|
select {
|
|
case <-baseCtx.Done():
|
|
cancel(baseCtx.Err())
|
|
case <-done:
|
|
// Once done is closed, we've recorded a ResultHandle, so we
|
|
// shouldn't allow cancelling the underlying build request anymore.
|
|
}
|
|
}()
|
|
|
|
// Create a new channel to forward status messages to the original.
|
|
//
|
|
// We do this so that we can discard status messages after the main portion
|
|
// of the build is complete. This is necessary for the solve error case,
|
|
// where the original gateway is kept open until the ResultHandle is
|
|
// closed - we don't want progress messages from operations in that
|
|
// ResultHandle to display after this function exits.
|
|
//
|
|
// Additionally, callers should wait for the progress channel to be closed.
|
|
// If we keep the session open and never close the progress channel, the
|
|
// caller will likely hang.
|
|
baseCh := ch
|
|
ch = make(chan *client.SolveStatus)
|
|
go func() {
|
|
for {
|
|
s, ok := <-ch
|
|
if !ok {
|
|
return
|
|
}
|
|
select {
|
|
case <-baseCh:
|
|
// base channel is closed, discard status messages
|
|
default:
|
|
baseCh <- s
|
|
}
|
|
}
|
|
}()
|
|
defer close(baseCh)
|
|
|
|
var resp *client.SolveResponse
|
|
var respErr error
|
|
var respHandle *ResultHandle
|
|
|
|
go func() {
|
|
defer cancel(context.Canceled) // ensure no dangling processes
|
|
|
|
var res *gateway.Result
|
|
var err error
|
|
resp, err = cc.Build(ctx, opt, product, func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
|
|
var err error
|
|
res, err = buildFunc(ctx, c)
|
|
|
|
if res != nil && err == nil {
|
|
// Force evaluation of the build result (otherwise, we likely
|
|
// won't get a solve error)
|
|
def, err2 := getDefinition(ctx, res)
|
|
if err2 != nil {
|
|
return nil, err2
|
|
}
|
|
res, err = evalDefinition(ctx, c, def)
|
|
}
|
|
|
|
if err != nil {
|
|
// Scenario 1: we failed to evaluate a node somewhere in the
|
|
// build graph.
|
|
//
|
|
// In this case, we construct a ResultHandle from this
|
|
// original Build session, and return it alongside the original
|
|
// build error. We then need to keep the gateway session open
|
|
// until the caller explicitly closes the ResultHandle.
|
|
|
|
var se *errdefs.SolveError
|
|
if errors.As(err, &se) {
|
|
respHandle = &ResultHandle{
|
|
done: make(chan struct{}),
|
|
solveErr: se,
|
|
gwClient: c,
|
|
gwCtx: ctx,
|
|
}
|
|
respErr = err // return original error to preserve stacktrace
|
|
close(done)
|
|
|
|
// Block until the caller closes the ResultHandle.
|
|
select {
|
|
case <-respHandle.done:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
}
|
|
return res, err
|
|
}, ch)
|
|
if respHandle != nil {
|
|
return
|
|
}
|
|
if err != nil {
|
|
// Something unexpected failed during the build, we didn't succeed,
|
|
// but we also didn't make it far enough to create a ResultHandle.
|
|
respErr = err
|
|
close(done)
|
|
return
|
|
}
|
|
|
|
// Scenario 2: we successfully built the image with no errors.
|
|
//
|
|
// In this case, the original gateway session has now been closed
|
|
// since the Build has been completed. So, we need to create a new
|
|
// gateway session to populate the ResultHandle. To do this, we
|
|
// need to re-evaluate the target result, in this new session. This
|
|
// should be instantaneous since the result should be cached.
|
|
|
|
def, err := getDefinition(ctx, res)
|
|
if err != nil {
|
|
respErr = err
|
|
close(done)
|
|
return
|
|
}
|
|
|
|
// NOTE: ideally this second connection should be lazily opened
|
|
opt := opt
|
|
opt.Ref = ""
|
|
opt.Exports = nil
|
|
opt.CacheExports = nil
|
|
opt.Internal = true
|
|
_, respErr = cc.Build(ctx, opt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
|
|
res, err := evalDefinition(ctx, c, def)
|
|
if err != nil {
|
|
// This should probably not happen, since we've previously
|
|
// successfully evaluated the same result with no issues.
|
|
return nil, errors.Wrap(err, "inconsistent solve result")
|
|
}
|
|
respHandle = &ResultHandle{
|
|
done: make(chan struct{}),
|
|
res: res,
|
|
gwClient: c,
|
|
gwCtx: ctx,
|
|
}
|
|
close(done)
|
|
|
|
// Block until the caller closes the ResultHandle.
|
|
select {
|
|
case <-respHandle.done:
|
|
case <-ctx.Done():
|
|
}
|
|
return nil, ctx.Err()
|
|
}, nil)
|
|
if respHandle != nil {
|
|
return
|
|
}
|
|
close(done)
|
|
}()
|
|
|
|
// Block until the other thread signals that it's completed the build.
|
|
select {
|
|
case <-done:
|
|
case <-baseCtx.Done():
|
|
if respErr == nil {
|
|
respErr = baseCtx.Err()
|
|
}
|
|
}
|
|
return respHandle, resp, respErr
|
|
}
|
|
|
|
// getDefinition converts a gateway result into a collection of definitions for
|
|
// each ref in the result.
|
|
func getDefinition(ctx context.Context, res *gateway.Result) (*result.Result[*pb.Definition], error) {
|
|
return result.ConvertResult(res, func(ref gateway.Reference) (*pb.Definition, error) {
|
|
st, err := ref.ToState()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
def, err := st.Marshal(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return def.ToPB(), nil
|
|
})
|
|
}
|
|
|
|
// evalDefinition performs the reverse of getDefinition, converting a
|
|
// collection of definitions into a gateway result.
|
|
func evalDefinition(ctx context.Context, c gateway.Client, defs *result.Result[*pb.Definition]) (*gateway.Result, error) {
|
|
// force evaluation of all targets in parallel
|
|
results := make(map[*pb.Definition]*gateway.Result)
|
|
resultsMu := sync.Mutex{}
|
|
eg, egCtx := errgroup.WithContext(ctx)
|
|
defs.EachRef(func(def *pb.Definition) error {
|
|
eg.Go(func() error {
|
|
res, err := c.Solve(egCtx, gateway.SolveRequest{
|
|
Evaluate: true,
|
|
Definition: def,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resultsMu.Lock()
|
|
results[def] = res
|
|
resultsMu.Unlock()
|
|
return nil
|
|
})
|
|
return nil
|
|
})
|
|
if err := eg.Wait(); err != nil {
|
|
return nil, err
|
|
}
|
|
res, _ := result.ConvertResult(defs, func(def *pb.Definition) (gateway.Reference, error) {
|
|
if res, ok := results[def]; ok {
|
|
return res.Ref, nil
|
|
}
|
|
return nil, nil
|
|
})
|
|
return res, nil
|
|
}
|
|
|
|
// ResultHandle is a build result with the client that built it.
|
|
type ResultHandle struct {
|
|
res *gateway.Result
|
|
solveErr *errdefs.SolveError
|
|
|
|
done chan struct{}
|
|
doneOnce sync.Once
|
|
|
|
gwClient gateway.Client
|
|
gwCtx context.Context
|
|
|
|
cleanups []func()
|
|
cleanupsMu sync.Mutex
|
|
}
|
|
|
|
func (r *ResultHandle) Done() {
|
|
r.doneOnce.Do(func() {
|
|
r.cleanupsMu.Lock()
|
|
cleanups := r.cleanups
|
|
r.cleanups = nil
|
|
r.cleanupsMu.Unlock()
|
|
for _, f := range cleanups {
|
|
f()
|
|
}
|
|
|
|
close(r.done)
|
|
<-r.gwCtx.Done()
|
|
})
|
|
}
|
|
|
|
func (r *ResultHandle) registerCleanup(f func()) {
|
|
r.cleanupsMu.Lock()
|
|
r.cleanups = append(r.cleanups, f)
|
|
r.cleanupsMu.Unlock()
|
|
}
|
|
|
|
func (r *ResultHandle) build(buildFunc gateway.BuildFunc) (err error) {
|
|
_, err = buildFunc(r.gwCtx, r.gwClient)
|
|
return err
|
|
}
|
|
|
|
func (r *ResultHandle) getContainerConfig(cfg *controllerapi.InvokeConfig) (containerCfg gateway.NewContainerRequest, _ error) {
|
|
if r.res != nil && r.solveErr == nil {
|
|
logrus.Debugf("creating container from successful build")
|
|
ccfg, err := containerConfigFromResult(r.res, cfg)
|
|
if err != nil {
|
|
return containerCfg, err
|
|
}
|
|
containerCfg = *ccfg
|
|
} else {
|
|
logrus.Debugf("creating container from failed build %+v", cfg)
|
|
ccfg, err := containerConfigFromError(r.solveErr, cfg)
|
|
if err != nil {
|
|
return containerCfg, errors.Wrapf(err, "no result nor error is available")
|
|
}
|
|
containerCfg = *ccfg
|
|
}
|
|
return containerCfg, nil
|
|
}
|
|
|
|
func (r *ResultHandle) getProcessConfig(cfg *controllerapi.InvokeConfig, stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) (_ gateway.StartRequest, err error) {
|
|
processCfg := newStartRequest(stdin, stdout, stderr)
|
|
if r.res != nil && r.solveErr == nil {
|
|
logrus.Debugf("creating container from successful build")
|
|
if err := populateProcessConfigFromResult(&processCfg, r.res, cfg); err != nil {
|
|
return processCfg, err
|
|
}
|
|
} else {
|
|
logrus.Debugf("creating container from failed build %+v", cfg)
|
|
if err := populateProcessConfigFromError(&processCfg, r.solveErr, cfg); err != nil {
|
|
return processCfg, err
|
|
}
|
|
}
|
|
return processCfg, nil
|
|
}
|
|
|
|
func containerConfigFromResult(res *gateway.Result, cfg *controllerapi.InvokeConfig) (*gateway.NewContainerRequest, error) {
|
|
if cfg.Initial {
|
|
return nil, errors.Errorf("starting from the container from the initial state of the step is supported only on the failed steps")
|
|
}
|
|
|
|
ps, err := exptypes.ParsePlatforms(res.Metadata)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ref, ok := res.FindRef(ps.Platforms[0].ID)
|
|
if !ok {
|
|
return nil, errors.Errorf("no reference found")
|
|
}
|
|
|
|
return &gateway.NewContainerRequest{
|
|
Mounts: []gateway.Mount{
|
|
{
|
|
Dest: "/",
|
|
MountType: pb.MountType_BIND,
|
|
Ref: ref,
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
func populateProcessConfigFromResult(req *gateway.StartRequest, res *gateway.Result, cfg *controllerapi.InvokeConfig) error {
|
|
imgData := res.Metadata[exptypes.ExporterImageConfigKey]
|
|
var img *specs.Image
|
|
if len(imgData) > 0 {
|
|
img = &specs.Image{}
|
|
if err := json.Unmarshal(imgData, img); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
user := ""
|
|
if !cfg.NoUser {
|
|
user = cfg.User
|
|
} else if img != nil {
|
|
user = img.Config.User
|
|
}
|
|
|
|
cwd := ""
|
|
if !cfg.NoCwd {
|
|
cwd = cfg.Cwd
|
|
} else if img != nil {
|
|
cwd = img.Config.WorkingDir
|
|
}
|
|
|
|
env := []string{}
|
|
if img != nil {
|
|
env = append(env, img.Config.Env...)
|
|
}
|
|
env = append(env, cfg.Env...)
|
|
|
|
args := []string{}
|
|
if cfg.Entrypoint != nil {
|
|
args = append(args, cfg.Entrypoint...)
|
|
} else if img != nil {
|
|
args = append(args, img.Config.Entrypoint...)
|
|
}
|
|
if !cfg.NoCmd {
|
|
args = append(args, cfg.Cmd...)
|
|
} else if img != nil {
|
|
args = append(args, img.Config.Cmd...)
|
|
}
|
|
|
|
req.Args = args
|
|
req.Env = env
|
|
req.User = user
|
|
req.Cwd = cwd
|
|
req.Tty = cfg.Tty
|
|
|
|
return nil
|
|
}
|
|
|
|
func containerConfigFromError(solveErr *errdefs.SolveError, cfg *controllerapi.InvokeConfig) (*gateway.NewContainerRequest, error) {
|
|
exec, err := execOpFromError(solveErr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var mounts []gateway.Mount
|
|
for i, mnt := range exec.Mounts {
|
|
rid := solveErr.Solve.MountIDs[i]
|
|
if cfg.Initial {
|
|
rid = solveErr.Solve.InputIDs[i]
|
|
}
|
|
mounts = append(mounts, gateway.Mount{
|
|
Selector: mnt.Selector,
|
|
Dest: mnt.Dest,
|
|
ResultID: rid,
|
|
Readonly: mnt.Readonly,
|
|
MountType: mnt.MountType,
|
|
CacheOpt: mnt.CacheOpt,
|
|
SecretOpt: mnt.SecretOpt,
|
|
SSHOpt: mnt.SSHOpt,
|
|
})
|
|
}
|
|
return &gateway.NewContainerRequest{
|
|
Mounts: mounts,
|
|
NetMode: exec.Network,
|
|
}, nil
|
|
}
|
|
|
|
func populateProcessConfigFromError(req *gateway.StartRequest, solveErr *errdefs.SolveError, cfg *controllerapi.InvokeConfig) error {
|
|
exec, err := execOpFromError(solveErr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
meta := exec.Meta
|
|
user := ""
|
|
if !cfg.NoUser {
|
|
user = cfg.User
|
|
} else {
|
|
user = meta.User
|
|
}
|
|
|
|
cwd := ""
|
|
if !cfg.NoCwd {
|
|
cwd = cfg.Cwd
|
|
} else {
|
|
cwd = meta.Cwd
|
|
}
|
|
|
|
env := append(meta.Env, cfg.Env...)
|
|
|
|
args := []string{}
|
|
if cfg.Entrypoint != nil {
|
|
args = append(args, cfg.Entrypoint...)
|
|
}
|
|
if cfg.Cmd != nil {
|
|
args = append(args, cfg.Cmd...)
|
|
}
|
|
if len(args) == 0 {
|
|
args = meta.Args
|
|
}
|
|
|
|
req.Args = args
|
|
req.Env = env
|
|
req.User = user
|
|
req.Cwd = cwd
|
|
req.Tty = cfg.Tty
|
|
|
|
return nil
|
|
}
|
|
|
|
func execOpFromError(solveErr *errdefs.SolveError) (*pb.ExecOp, error) {
|
|
if solveErr == nil {
|
|
return nil, errors.Errorf("no error is available")
|
|
}
|
|
switch op := solveErr.Solve.Op.GetOp().(type) {
|
|
case *pb.Op_Exec:
|
|
return op.Exec, nil
|
|
default:
|
|
return nil, errors.Errorf("invoke: unsupported error type")
|
|
}
|
|
// TODO: support other ops
|
|
}
|
|
|
|
func newStartRequest(stdin io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) gateway.StartRequest {
|
|
return gateway.StartRequest{
|
|
Stdin: stdin,
|
|
Stdout: stdout,
|
|
Stderr: stderr,
|
|
}
|
|
}
|