mirror of
https://gitea.com/Lydanne/buildx.git
synced 2025-05-18 00:47:48 +08:00
build: move main solve request into main gateway call
Now, we always perform the full solve request in the main gateway call. This ensures that progress works properly, and makes the lifetime semantics much clearer. NewResultContext abstracts the details of a successful/failed build, to always return a single ResultContext, even though the details of how a gateway is created is different: - For a failed build, we can just keep the gateway open. - For a successful build, we immediately open another gateway and re-evaluate the build definition in that gateway. This should give an instant cache hit (since the build was just successful). Signed-off-by: Justin Chadwell <me@jedevc.com>
This commit is contained in:
parent
2dae553d18
commit
8d822fb06c
@ -885,7 +885,7 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
|
||||
|
||||
cc := c
|
||||
var printRes map[string][]byte
|
||||
rr, err := c.Build(ctx, so, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
|
||||
buildFunc := func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
|
||||
if opt.PrintFunc != nil {
|
||||
if _, ok := req.FrontendOpt["frontend.caps"]; !ok {
|
||||
req.FrontendOpt["frontend.caps"] = "moby.buildkit.frontend.subrequests+forward"
|
||||
@ -915,7 +915,6 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
|
||||
}
|
||||
|
||||
if fallback {
|
||||
fmt.Println("falling back!")
|
||||
req.FrontendOpt["build-arg:BUILDKIT_SYNTAX"] = printFallbackImage
|
||||
res2, err2 := c.Solve(ctx, req)
|
||||
if err2 != nil {
|
||||
@ -931,16 +930,16 @@ func BuildWithResultHandler(ctx context.Context, nodes []builder.Node, opt map[s
|
||||
}
|
||||
|
||||
results.Set(resultKey(dp.driverIndex, k), res)
|
||||
if resultHandleFunc != nil {
|
||||
resultCtx, err := NewResultContext(ctx, cc, so, res)
|
||||
if err == nil {
|
||||
resultHandleFunc(dp.driverIndex, resultCtx)
|
||||
} else {
|
||||
logrus.Warnf("failed to record result: %s", err)
|
||||
}
|
||||
}
|
||||
return res, nil
|
||||
}, ch)
|
||||
}
|
||||
var rr *client.SolveResponse
|
||||
if resultHandleFunc != nil {
|
||||
var resultCtx *ResultContext
|
||||
resultCtx, rr, err = NewResultContext(ctx, cc, so, "buildx", buildFunc, ch)
|
||||
resultHandleFunc(dp.driverIndex, resultCtx)
|
||||
} else {
|
||||
rr, err = c.Build(ctx, so, "buildx", buildFunc, ch)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
335
build/result.go
335
build/result.go
@ -6,8 +6,6 @@ import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
controllerapi "github.com/docker/buildx/controller/pb"
|
||||
"github.com/moby/buildkit/client"
|
||||
@ -22,14 +20,185 @@ import (
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func NewResultContext(ctx context.Context, c *client.Client, solveOpt client.SolveOpt, res *gateway.Result) (*ResultContext, error) {
|
||||
def, err := getDefinition(ctx, res)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// NewResultContext wraps a call to client.Build, additionally returning a
|
||||
// ResultContext alongside the standard response and error.
|
||||
//
|
||||
// This ResultContext can be used to execute additional build steps in the same
|
||||
// context as the build occurred, which can allow easy debugging of build
|
||||
// failures and successes.
|
||||
//
|
||||
// If the returned ResultContext is not nil, the caller must call Done() on it.
|
||||
func NewResultContext(ctx context.Context, cc *client.Client, opt client.SolveOpt, product string, buildFunc gateway.BuildFunc, ch chan *client.SolveStatus) (*ResultContext, *client.SolveResponse, error) {
|
||||
// Create a new context to wrap the original, and cancel it when the
|
||||
// caller-provided context is cancelled.
|
||||
//
|
||||
// We derive the context from the background context so that we can forbid
|
||||
// cancellation of the build request after <-done is closed (which we do
|
||||
// before returning the ResultContext).
|
||||
baseCtx := ctx
|
||||
ctx, cancel := context.WithCancelCause(context.Background())
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
select {
|
||||
case <-baseCtx.Done():
|
||||
cancel(baseCtx.Err())
|
||||
case <-done:
|
||||
// Once done is closed, we've recorded a ResultContext, so we
|
||||
// shouldn't allow cancelling the underlying build request anymore.
|
||||
}
|
||||
}()
|
||||
|
||||
// Create a new channel to forward status messages to the original.
|
||||
//
|
||||
// We do this so that we can discard status messages after the main portion
|
||||
// of the build is complete. This is necessary for the solve error case,
|
||||
// where the original gateway is kept open until the ResultContext is
|
||||
// closed - we don't want progress messages from operations in that
|
||||
// ResultContext to display after this function exits.
|
||||
//
|
||||
// Additionally, callers should wait for the progress channel to be closed.
|
||||
// If we keep the session open and never close the progress channel, the
|
||||
// caller will likely hang.
|
||||
baseCh := ch
|
||||
ch = make(chan *client.SolveStatus)
|
||||
go func() {
|
||||
for {
|
||||
s, ok := <-ch
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-baseCh:
|
||||
// base channel is closed, discard status messages
|
||||
default:
|
||||
baseCh <- s
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(baseCh)
|
||||
|
||||
var resCtx *ResultContext
|
||||
var resp *client.SolveResponse
|
||||
var respErr error
|
||||
|
||||
go func() {
|
||||
defer cancel(context.Canceled) // ensure no dangling processes
|
||||
|
||||
var res *gateway.Result
|
||||
var err error
|
||||
resp, err = cc.Build(ctx, opt, product, func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
|
||||
var err error
|
||||
res, err = buildFunc(ctx, c)
|
||||
|
||||
if res != nil && err == nil {
|
||||
// Force evaluation of the build result (otherwise, we likely
|
||||
// won't get a solve error)
|
||||
def, err2 := getDefinition(ctx, res)
|
||||
if err2 != nil {
|
||||
return nil, err2
|
||||
}
|
||||
res, err = evalDefinition(ctx, c, def)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
// Scenario 1: we failed to evaluate a node somewhere in the
|
||||
// build graph.
|
||||
//
|
||||
// In this case, we construct a ResultContext from this
|
||||
// original Build session, and return it alongside the original
|
||||
// build error. We then need to keep the gateway session open
|
||||
// until the caller explicitly closes the ResultContext.
|
||||
|
||||
var se *errdefs.SolveError
|
||||
if errors.As(err, &se) {
|
||||
resCtx = &ResultContext{
|
||||
done: make(chan struct{}),
|
||||
solveErr: se,
|
||||
gwClient: c,
|
||||
gwCtx: ctx,
|
||||
}
|
||||
respErr = se
|
||||
close(done)
|
||||
|
||||
// Block until the caller closes the ResultContext.
|
||||
select {
|
||||
case <-resCtx.done:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
}
|
||||
return res, err
|
||||
}, ch)
|
||||
if resCtx != nil {
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
// Something unexpected failed during the build, we didn't succeed,
|
||||
// but we also didn't make it far enough to create a ResultContext.
|
||||
respErr = err
|
||||
close(done)
|
||||
return
|
||||
}
|
||||
|
||||
// Scenario 2: we successfully built the image with no errors.
|
||||
//
|
||||
// In this case, the original gateway session has now been closed
|
||||
// since the Build has been completed. So, we need to create a new
|
||||
// gateway session to populate the ResultContext. To do this, we
|
||||
// need to re-evaluate the target result, in this new session. This
|
||||
// should be instantaneous since the result should be cached.
|
||||
|
||||
def, err := getDefinition(ctx, res)
|
||||
if err != nil {
|
||||
respErr = err
|
||||
close(done)
|
||||
return
|
||||
}
|
||||
|
||||
// NOTE: ideally this second connection should be lazily opened
|
||||
opt := opt
|
||||
opt.Ref = ""
|
||||
_, respErr = cc.Build(ctx, opt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
|
||||
res, err := evalDefinition(ctx, c, def)
|
||||
if err != nil {
|
||||
// This should probably not happen, since we've previously
|
||||
// successfully evaluated the same result with no issues.
|
||||
return nil, errors.Wrap(err, "inconsistent solve result")
|
||||
}
|
||||
resCtx = &ResultContext{
|
||||
done: make(chan struct{}),
|
||||
res: res,
|
||||
gwClient: c,
|
||||
gwCtx: ctx,
|
||||
}
|
||||
close(done)
|
||||
|
||||
// Block until the caller closes the ResultContext.
|
||||
select {
|
||||
case <-resCtx.done:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return nil, ctx.Err()
|
||||
}, nil)
|
||||
if resCtx != nil {
|
||||
return
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Block until the other thread signals that it's completed the build.
|
||||
select {
|
||||
case <-done:
|
||||
case <-baseCtx.Done():
|
||||
if respErr == nil {
|
||||
respErr = baseCtx.Err()
|
||||
}
|
||||
}
|
||||
return getResultAt(ctx, c, solveOpt, def, nil)
|
||||
return resCtx, resp, respErr
|
||||
}
|
||||
|
||||
// getDefinition converts a gateway result into a collection of definitions for
|
||||
// each ref in the result.
|
||||
func getDefinition(ctx context.Context, res *gateway.Result) (*result.Result[*pb.Definition], error) {
|
||||
return result.ConvertResult(res, func(ref gateway.Reference) (*pb.Definition, error) {
|
||||
st, err := ref.ToState()
|
||||
@ -44,118 +213,39 @@ func getDefinition(ctx context.Context, res *gateway.Result) (*result.Result[*pb
|
||||
})
|
||||
}
|
||||
|
||||
func getResultAt(ctx context.Context, c *client.Client, solveOpt client.SolveOpt, targets *result.Result[*pb.Definition], statusChan chan *client.SolveStatus) (*ResultContext, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
// forward SolveStatus
|
||||
done := new(atomic.Bool)
|
||||
defer done.Store(true)
|
||||
ch := make(chan *client.SolveStatus)
|
||||
go func() {
|
||||
for {
|
||||
s := <-ch
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
if done.Load() {
|
||||
// Do not forward if the function returned because statusChan is possibly closed
|
||||
continue
|
||||
}
|
||||
select {
|
||||
case statusChan <- s:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// get result
|
||||
resultCtxCh := make(chan *ResultContext)
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
solveOpt := solveOpt
|
||||
solveOpt.Ref = ""
|
||||
buildDoneCh := make(chan struct{})
|
||||
_, err := c.Build(context.Background(), solveOpt, "buildx", func(ctx context.Context, c gateway.Client) (*gateway.Result, error) {
|
||||
doneErr := errors.Errorf("done")
|
||||
ctx, cancel := context.WithCancelCause(ctx)
|
||||
defer cancel(doneErr)
|
||||
|
||||
// force evaluation of all targets in parallel
|
||||
results := make(map[*pb.Definition]*gateway.Result)
|
||||
resultsMu := sync.Mutex{}
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
targets.EachRef(func(def *pb.Definition) error {
|
||||
eg.Go(func() error {
|
||||
res2, err := c.Solve(egCtx, gateway.SolveRequest{
|
||||
Evaluate: true,
|
||||
Definition: def,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resultsMu.Lock()
|
||||
results[def] = res2
|
||||
resultsMu.Unlock()
|
||||
return nil
|
||||
})
|
||||
return nil
|
||||
// evalDefinition performs the reverse of getDefinition, converting a
|
||||
// collection of definitions into a gateway result.
|
||||
func evalDefinition(ctx context.Context, c gateway.Client, defs *result.Result[*pb.Definition]) (*gateway.Result, error) {
|
||||
// force evaluation of all targets in parallel
|
||||
results := make(map[*pb.Definition]*gateway.Result)
|
||||
resultsMu := sync.Mutex{}
|
||||
eg, egCtx := errgroup.WithContext(ctx)
|
||||
defs.EachRef(func(def *pb.Definition) error {
|
||||
eg.Go(func() error {
|
||||
res, err := c.Solve(egCtx, gateway.SolveRequest{
|
||||
Evaluate: true,
|
||||
Definition: def,
|
||||
})
|
||||
resultCtx := ResultContext{}
|
||||
if err := eg.Wait(); err != nil {
|
||||
var se *errdefs.SolveError
|
||||
if errors.As(err, &se) {
|
||||
resultCtx.solveErr = se
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
res2, _ := result.ConvertResult(targets, func(def *pb.Definition) (gateway.Reference, error) {
|
||||
if res, ok := results[def]; ok {
|
||||
return res.Ref, nil
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
|
||||
// Record the client and ctx as well so that containers can be created from the SolveError.
|
||||
resultCtx.res = res2
|
||||
resultCtx.gwClient = c
|
||||
resultCtx.gwCtx = ctx
|
||||
resultCtx.gwDone = func() {
|
||||
cancel(doneErr)
|
||||
// wait for Build() completion(or timeout) to ensure the Build's finalizing and avoiding an error "context canceled"
|
||||
select {
|
||||
case <-buildDoneCh:
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
}
|
||||
select {
|
||||
case resultCtxCh <- &resultCtx:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
// wait for cleanup or cancel
|
||||
<-ctx.Done()
|
||||
if context.Cause(ctx) != doneErr { // doneErr is not an error.
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
return nil, nil
|
||||
}, ch)
|
||||
close(buildDoneCh)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case resultCtx := <-resultCtxCh:
|
||||
return resultCtx, nil
|
||||
case err := <-errCh:
|
||||
resultsMu.Lock()
|
||||
results[def] = res
|
||||
resultsMu.Unlock()
|
||||
return nil
|
||||
})
|
||||
return nil
|
||||
})
|
||||
if err := eg.Wait(); err != nil {
|
||||
return nil, err
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
res, _ := result.ConvertResult(defs, func(def *pb.Definition) (gateway.Reference, error) {
|
||||
if res, ok := results[def]; ok {
|
||||
return res.Ref, nil
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// ResultContext is a build result with the client that built it.
|
||||
@ -163,17 +253,18 @@ type ResultContext struct {
|
||||
res *gateway.Result
|
||||
solveErr *errdefs.SolveError
|
||||
|
||||
gwClient gateway.Client
|
||||
gwCtx context.Context
|
||||
gwDone func()
|
||||
gwDoneOnce sync.Once
|
||||
done chan struct{}
|
||||
doneOnce sync.Once
|
||||
|
||||
gwClient gateway.Client
|
||||
gwCtx context.Context
|
||||
|
||||
cleanups []func()
|
||||
cleanupsMu sync.Mutex
|
||||
}
|
||||
|
||||
func (r *ResultContext) Done() {
|
||||
r.gwDoneOnce.Do(func() {
|
||||
r.doneOnce.Do(func() {
|
||||
r.cleanupsMu.Lock()
|
||||
cleanups := r.cleanups
|
||||
r.cleanups = nil
|
||||
@ -181,7 +272,9 @@ func (r *ResultContext) Done() {
|
||||
for _, f := range cleanups {
|
||||
f()
|
||||
}
|
||||
r.gwDone()
|
||||
|
||||
close(r.done)
|
||||
<-r.gwCtx.Done()
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user