Add dial-stdio command

This allows the buildx CLI to act a proxy to the configured instance.
It allows external code to use buildx itself as a driver for connecting
to buildkitd instances.

Instance and node selection should follow the same semantics as as
`buildx build`, including taking into account the `BUILDX_BUILDER` env
var and the `--builder` global flag.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2023-11-13 23:57:12 +00:00
parent d0c4bed484
commit 760244ee3e
12 changed files with 461 additions and 27 deletions

62
build/dial.go Normal file
View File

@ -0,0 +1,62 @@
package build
import (
"context"
stderrors "errors"
"net"
"github.com/containerd/containerd/platforms"
"github.com/docker/buildx/builder"
"github.com/docker/buildx/util/progress"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
func Dial(ctx context.Context, nodes []builder.Node, pw progress.Writer, platform *v1.Platform) (net.Conn, error) {
nodes, err := filterAvailableNodes(nodes)
if err != nil {
return nil, err
}
if len(nodes) == 0 {
return nil, errors.New("no nodes available")
}
var pls []v1.Platform
if platform != nil {
pls = []v1.Platform{*platform}
}
opts := map[string]Options{"default": {Platforms: pls}}
resolved, err := resolveDrivers(ctx, nodes, opts, pw)
if err != nil {
return nil, err
}
var dialError error
for _, ls := range resolved {
for _, rn := range ls {
if platform != nil {
p := *platform
var found bool
for _, pp := range rn.platforms {
if platforms.Only(p).Match(pp) {
found = true
break
}
}
if !found {
continue
}
}
conn, err := nodes[rn.driverIndex].Driver.Dial(ctx)
if err == nil {
return conn, nil
}
dialError = stderrors.Join(err)
}
}
return nil, errors.Wrap(dialError, "no nodes available")
}

132
commands/dial_stdio.go Normal file
View File

@ -0,0 +1,132 @@
package commands
import (
"io"
"net"
"os"
"github.com/containerd/containerd/platforms"
"github.com/docker/buildx/build"
"github.com/docker/buildx/builder"
"github.com/docker/buildx/util/progress"
"github.com/docker/cli/cli/command"
"github.com/moby/buildkit/util/appcontext"
"github.com/moby/buildkit/util/progress/progressui"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)
type stdioOptions struct {
builder string
platform string
progress string
}
func runDialStdio(dockerCli command.Cli, opts stdioOptions) error {
ctx := appcontext.Context()
contextPathHash, _ := os.Getwd()
b, err := builder.New(dockerCli,
builder.WithName(opts.builder),
builder.WithContextPathHash(contextPathHash),
)
if err != nil {
return err
}
if err = updateLastActivity(dockerCli, b.NodeGroup); err != nil {
return errors.Wrapf(err, "failed to update builder last activity time")
}
nodes, err := b.LoadNodes(ctx)
if err != nil {
return err
}
printer, err := progress.NewPrinter(ctx, os.Stderr, progressui.DisplayMode(opts.progress), progress.WithPhase("dial-stdio"), progress.WithDesc("builder: "+b.Name, "builder:"+b.Name))
if err != nil {
return err
}
var p *v1.Platform
if opts.platform != "" {
pp, err := platforms.Parse(opts.platform)
if err != nil {
return errors.Wrapf(err, "invalid platform %q", opts.platform)
}
p = &pp
}
defer printer.Wait()
return progress.Wrap("Proxying to builder", printer.Write, func(sub progress.SubLogger) error {
var conn net.Conn
err := sub.Wrap("Dialing builder", func() error {
conn, err = build.Dial(ctx, nodes, printer, p)
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
defer conn.Close()
go func() {
<-ctx.Done()
closeWrite(conn)
}()
var eg errgroup.Group
eg.Go(func() error {
_, err := io.Copy(conn, os.Stdin)
closeWrite(conn)
return err
})
eg.Go(func() error {
_, err := io.Copy(os.Stdout, conn)
closeRead(conn)
return err
})
return eg.Wait()
})
}
func closeRead(conn net.Conn) error {
if c, ok := conn.(interface{ CloseRead() error }); ok {
return c.CloseRead()
}
return conn.Close()
}
func closeWrite(conn net.Conn) error {
if c, ok := conn.(interface{ CloseWrite() error }); ok {
return c.CloseWrite()
}
return conn.Close()
}
func dialStdioCmd(dockerCli command.Cli, rootOpts *rootOptions) *cobra.Command {
opts := stdioOptions{}
cmd := &cobra.Command{
Use: "dial-stdio",
Short: "Proxy current stdio streams to builder instance",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, args []string) error {
opts.builder = rootOpts.builder
return runDialStdio(dockerCli, opts)
},
}
flags := cmd.Flags()
cmd.Flags()
flags.StringVar(&opts.platform, "platform", os.Getenv("DOCKER_DEFAULT_PLATFORM"), "Target platform: this is used for node selection")
flags.StringVar(&opts.progress, "progress", "quiet", "Set type of progress output (auto, plain, tty).")
return cmd
}

View File

@ -83,6 +83,7 @@ func addCommands(cmd *cobra.Command, dockerCli command.Cli) {
buildCmd(dockerCli, opts, nil), buildCmd(dockerCli, opts, nil),
bakeCmd(dockerCli, opts), bakeCmd(dockerCli, opts),
createCmd(dockerCli), createCmd(dockerCli),
dialStdioCmd(dockerCli, opts),
rmCmd(dockerCli, opts), rmCmd(dockerCli, opts),
lsCmd(dockerCli), lsCmd(dockerCli),
useCmd(dockerCli, opts), useCmd(dockerCli, opts),

View File

@ -9,21 +9,22 @@ Extended build capabilities with BuildKit
### Subcommands ### Subcommands
| Name | Description | | Name | Description |
|:-------------------------------------|:---------------------------------------| |:-------------------------------------|:------------------------------------------------|
| [`bake`](buildx_bake.md) | Build from a file | | [`bake`](buildx_bake.md) | Build from a file |
| [`build`](buildx_build.md) | Start a build | | [`build`](buildx_build.md) | Start a build |
| [`create`](buildx_create.md) | Create a new builder instance | | [`create`](buildx_create.md) | Create a new builder instance |
| [`debug`](buildx_debug.md) | Start debugger (EXPERIMENTAL) | | [`debug`](buildx_debug.md) | Start debugger (EXPERIMENTAL) |
| [`du`](buildx_du.md) | Disk usage | | [`dial-stdio`](buildx_dial-stdio.md) | Proxy current stdio streams to builder instance |
| [`imagetools`](buildx_imagetools.md) | Commands to work on images in registry | | [`du`](buildx_du.md) | Disk usage |
| [`inspect`](buildx_inspect.md) | Inspect current builder instance | | [`imagetools`](buildx_imagetools.md) | Commands to work on images in registry |
| [`ls`](buildx_ls.md) | List builder instances | | [`inspect`](buildx_inspect.md) | Inspect current builder instance |
| [`prune`](buildx_prune.md) | Remove build cache | | [`ls`](buildx_ls.md) | List builder instances |
| [`rm`](buildx_rm.md) | Remove one or more builder instances | | [`prune`](buildx_prune.md) | Remove build cache |
| [`stop`](buildx_stop.md) | Stop builder instance | | [`rm`](buildx_rm.md) | Remove one or more builder instances |
| [`use`](buildx_use.md) | Set the current builder instance | | [`stop`](buildx_stop.md) | Stop builder instance |
| [`version`](buildx_version.md) | Show buildx version information | | [`use`](buildx_use.md) | Set the current builder instance |
| [`version`](buildx_version.md) | Show buildx version information |
### Options ### Options

View File

@ -0,0 +1,47 @@
# docker buildx dial-stdio
<!---MARKER_GEN_START-->
Proxy current stdio streams to builder instance
### Options
| Name | Type | Default | Description |
|:-------------|:---------|:--------|:-------------------------------------------------|
| `--builder` | `string` | | Override the configured builder instance |
| `--platform` | `string` | | Target platform: this is used for node selection |
| `--progress` | `string` | `quiet` | Set type of progress output (auto, plain, tty). |
<!---MARKER_GEN_END-->
## Description
dial-stdio uses the stdin and stdout streams of the command to proxy to the configured builder instance.
It is not intended to be used by humans, but rather by other tools that want to interact with the builder instance via BuildKit API.
## Examples
Example go program that uses the dial-stdio command wire up a buildkit client.
This is for example use only and may not be suitable for production use.
```go
client.New(ctx, "", client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
c1, c2 := net.Pipe()
cmd := exec.Command("docker", "buildx", "dial-stdio")
cmd.Stdin = c1
cmd.Stdout = c1
if err := cmd.Start(); err != nil {
c1.Close()
c2.Close()
return nil, err
}
go func() {
cmd.Wait()
c2.Close()
}()
return c2
}))
```

View File

@ -384,13 +384,20 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil return nil
} }
func (d *Driver) Client(ctx context.Context) (*client.Client, error) { func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
_, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"}) _, conn, err := d.exec(ctx, []string{"buildctl", "dial-stdio"})
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn = demuxConn(conn) conn = demuxConn(conn)
return conn, nil
}
func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
conn, err := d.Dial(ctx)
if err != nil {
return nil, err
}
exp, _, err := detect.Exporter() exp, _, err := detect.Exporter()
if err != nil { if err != nil {

View File

@ -57,10 +57,14 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil return nil
} }
func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta)
}
func (d *Driver) Client(ctx context.Context) (*client.Client, error) { func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
opts := []client.ClientOpt{ opts := []client.ClientOpt{
client.WithContextDialer(func(context.Context, string) (net.Conn, error) { client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return d.DockerAPI.DialHijack(ctx, "/grpc", "h2c", d.DialMeta) return d.Dial(ctx)
}), client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) { }), client.WithSessionDialer(func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) {
return d.DockerAPI.DialHijack(ctx, "/session", proto, meta) return d.DockerAPI.DialHijack(ctx, "/session", proto, meta)
}), }),

View File

@ -59,6 +59,7 @@ type Driver interface {
Version(context.Context) (string, error) Version(context.Context) (string, error)
Stop(ctx context.Context, force bool) error Stop(ctx context.Context, force bool) error
Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error
Dial(ctx context.Context) (net.Conn, error)
Client(ctx context.Context) (*client.Client, error) Client(ctx context.Context) (*client.Client, error)
Features(ctx context.Context) map[Feature]bool Features(ctx context.Context) map[Feature]bool
HostGatewayIP(ctx context.Context) (net.IP, error) HostGatewayIP(ctx context.Context) (net.IP, error)

View File

@ -189,7 +189,7 @@ func (d *Driver) Rm(ctx context.Context, force, rmVolume, rmDaemon bool) error {
return nil return nil
} }
func (d *Driver) Client(ctx context.Context) (*client.Client, error) { func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
restClient := d.clientset.CoreV1().RESTClient() restClient := d.clientset.CoreV1().RESTClient()
restClientConfig, err := d.KubeClientConfig.ClientConfig() restClientConfig, err := d.KubeClientConfig.ClientConfig()
if err != nil { if err != nil {
@ -208,7 +208,10 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return conn, nil
}
func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
exp, _, err := detect.Exporter() exp, _, err := detect.Exporter()
if err != nil { if err != nil {
return nil, err return nil, err
@ -216,7 +219,7 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
var opts []client.ClientOpt var opts []client.ClientOpt
opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) { opts = append(opts, client.WithContextDialer(func(context.Context, string) (net.Conn, error) {
return conn, nil return d.Dial(ctx)
})) }))
if td, ok := exp.(client.TracerDelegate); ok { if td, ok := exp.(client.TracerDelegate); ok {
opts = append(opts, client.WithTracerDelegate(td)) opts = append(opts, client.WithTracerDelegate(td))

View File

@ -2,14 +2,18 @@ package remote
import ( import (
"context" "context"
"errors" "crypto/tls"
"crypto/x509"
"net" "net"
"os"
"strings"
"time" "time"
"github.com/docker/buildx/driver" "github.com/docker/buildx/driver"
"github.com/docker/buildx/util/progress" "github.com/docker/buildx/util/progress"
"github.com/moby/buildkit/client" "github.com/moby/buildkit/client"
"github.com/moby/buildkit/util/tracing/detect" "github.com/moby/buildkit/util/tracing/detect"
"github.com/pkg/errors"
) )
type Driver struct { type Driver struct {
@ -82,14 +86,61 @@ func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
opts = append(opts, client.WithTracerDelegate(td)) opts = append(opts, client.WithTracerDelegate(td))
} }
if d.tlsOpts != nil { opts = append(opts, client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
opts = append(opts, []client.ClientOpt{ return d.Dial(ctx)
client.WithServerConfig(d.tlsOpts.serverName, d.tlsOpts.caCert), }))
client.WithCredentials(d.tlsOpts.cert, d.tlsOpts.key),
}...) return client.New(ctx, "", opts...)
}
func (d *Driver) Dial(ctx context.Context) (net.Conn, error) {
network, addr, ok := strings.Cut(d.InitConfig.EndpointAddr, "://")
if !ok {
return nil, errors.Errorf("invalid endpoint address: %s", d.InitConfig.EndpointAddr)
} }
return client.New(ctx, d.InitConfig.EndpointAddr, opts...) dialer := &net.Dialer{}
conn, err := dialer.DialContext(ctx, network, addr)
if err != nil {
return nil, errors.WithStack(err)
}
if d.tlsOpts != nil {
cfg, err := loadTLS(d.tlsOpts)
if err != nil {
return nil, errors.Wrap(err, "error loading tls config")
}
conn = tls.Client(conn, cfg)
}
return conn, nil
}
func loadTLS(opts *tlsOpts) (*tls.Config, error) {
cfg := &tls.Config{
ServerName: opts.serverName,
RootCAs: x509.NewCertPool(),
}
if opts.caCert != "" {
ca, err := os.ReadFile(opts.caCert)
if err != nil {
return nil, errors.Wrap(err, "could not read ca certificate")
}
if ok := cfg.RootCAs.AppendCertsFromPEM(ca); !ok {
return nil, errors.New("failed to append ca certs")
}
}
if opts.cert != "" || opts.key != "" {
cert, err := tls.LoadX509KeyPair(opts.cert, opts.key)
if err != nil {
return nil, errors.Wrap(err, "could not read certificate/key")
}
cfg.Certificates = append(cfg.Certificates, cert)
}
return cfg, nil
} }
func (d *Driver) Features(ctx context.Context) map[driver.Feature]bool { func (d *Driver) Features(ctx context.Context) map[driver.Feature]bool {

124
tests/dialstdio.go Normal file
View File

@ -0,0 +1,124 @@
package tests
import (
"bytes"
"context"
"net"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
"github.com/docker/buildx/util/progress"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
gwclient "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/util/progress/progressui"
"github.com/moby/buildkit/util/testutil/integration"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var dialstdioTests = []func(t *testing.T, sb integration.Sandbox){
testDialStdio,
}
func testDialStdio(t *testing.T, sb integration.Sandbox) {
do := func(t *testing.T, pipe func(t *testing.T, cmd *exec.Cmd) net.Conn) {
errBuf := bytes.NewBuffer(nil)
defer func() {
if t.Failed() {
t.Log(errBuf.String())
}
}()
var cmd *exec.Cmd
c, err := client.New(sb.Context(), "", client.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
cmd = buildxCmd(sb, withArgs("dial-stdio", "--progress", "auto"))
conn := pipe(t, cmd)
cmd.Stderr = errBuf
if err := cmd.Start(); err != nil {
return nil, errors.Wrap(err, errBuf.String())
}
return conn, nil
}))
require.NoError(t, err)
defer func() {
c.Close()
// Since the client is closed (and as such the connection shutdown), the buildx command should exit cleanly.
chErr := make(chan error, 1)
go func() {
chErr <- cmd.Wait()
}()
select {
case <-time.After(10 * time.Second):
t.Error("timeout waiting for buildx command to exit")
case <-chErr:
assert.NoError(t, err)
}
}()
_, err = c.Info(sb.Context())
require.NoError(t, err)
require.Contains(t, errBuf.String(), "builder: "+sb.Address())
dir := t.TempDir()
f, err := os.CreateTemp(dir, "log")
require.NoError(t, err)
defer f.Close()
defer func() {
if t.Failed() {
dt, _ := os.ReadFile(f.Name())
t.Log(string(dt))
}
}()
p, err := progress.NewPrinter(sb.Context(), f, progressui.AutoMode)
require.NoError(t, err)
ch, chDone := progress.NewChannel(p)
done := func() {
select {
case <-sb.Context().Done():
case <-chDone:
}
}
_, err = c.Build(sb.Context(), client.SolveOpt{
Exports: []client.ExportEntry{
{Type: "local", OutputDir: dir},
},
}, "", func(ctx context.Context, gwc gwclient.Client) (*gwclient.Result, error) {
def, err := llb.Scratch().File(llb.Mkfile("hello", 0o600, []byte("world"))).Marshal(ctx)
if err != nil {
return nil, err
}
return gwc.Solve(ctx, gwclient.SolveRequest{
Definition: def.ToPB(),
})
}, ch)
done()
require.NoError(t, err)
dt, err := os.ReadFile(filepath.Join(dir, "hello"))
require.NoError(t, err)
require.Equal(t, "world", string(dt))
}
t.Run("conn=netpipe", func(t *testing.T) {
t.Parallel()
do(t, func(t *testing.T, cmd *exec.Cmd) net.Conn {
c1, c2 := net.Pipe()
cmd.Stdin = c1
cmd.Stdout = c1
return c2
})
})
}

View File

@ -29,6 +29,7 @@ func TestIntegration(t *testing.T) {
tests = append(tests, versionTests...) tests = append(tests, versionTests...)
tests = append(tests, createTests...) tests = append(tests, createTests...)
tests = append(tests, rmTests...) tests = append(tests, rmTests...)
tests = append(tests, dialstdioTests...)
testIntegration(t, tests...) testIntegration(t, tests...)
} }