buildx/monitor/monitor.go
Jonathan A. Sternberg 2f1be25b8f
controller: remove controller grpc service
Remove the controller grpc service along with associated code related to
sessions or remote controllers.

Data types that are still used with complicated dependency chains have
been kept in the same package for a future refactor.

Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
2025-04-30 13:46:58 -05:00

340 lines
9.2 KiB
Go

package monitor
import (
"context"
"fmt"
"io"
"sort"
"sync"
"sync/atomic"
"text/tabwriter"
"github.com/containerd/console"
"github.com/docker/buildx/build"
cbuild "github.com/docker/buildx/controller/build"
"github.com/docker/buildx/controller/control"
controllerapi "github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/monitor/commands"
"github.com/docker/buildx/monitor/types"
"github.com/docker/buildx/util/ioset"
"github.com/docker/buildx/util/progress"
"github.com/google/shlex"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/term"
)
// MonitorBuildResult pairs the solve response of a build with the error that
// build returned. The monitor records one of these for every build it runs.
type MonitorBuildResult struct {
	// Resp is the solve response returned by the controller's Build call.
	Resp *client.SolveResponse
	// Err is the error returned by the same Build call (nil on success).
	Err error
}
// RunMonitor provides an interactive session for running and managing containers via specified IO.
//
// It pauses the progress printer for the duration of the session, immediately
// starts a process from invokeConfig, and then serves monitor commands each
// time the monitor side of the multiplexed IO is enabled. The controller c is
// closed when RunMonitor returns.
//
// Note that invokeConfig is modified in place: Rollback and Initial are reset
// before the first process is started.
//
// The returned MonitorBuildResult is the outcome of the last build run through
// the monitor (nil if none was recorded). The returned error is non-nil when
// pausing progress fails or when reading a command line fails with anything
// other than io.EOF.
func RunMonitor(ctx context.Context, curRef string, options *cbuild.Options, invokeConfig *controllerapi.InvokeConfig, c control.BuildxController, stdin io.ReadCloser, stdout io.WriteCloser, stderr console.File, progress *progress.Printer) (*MonitorBuildResult, error) {
	// Always close the controller on exit; a close failure is only logged.
	defer func() {
		if err := c.Close(); err != nil {
			logrus.Warnf("close error: %v", err)
		}
	}()

	if err := progress.Pause(); err != nil {
		return nil, err
	}
	defer progress.Unpause()

	// Mux output 0: the monitor console. The hooks signal the command loop
	// below whenever this side is enabled or disabled.
	monitorIn, monitorOut := ioset.Pipe()
	defer func() {
		monitorIn.Close()
	}()
	monitorEnableCh := make(chan struct{})
	monitorDisableCh := make(chan struct{})
	monitorOutCtx := ioset.MuxOut{
		Out:         monitorOut,
		EnableHook:  func() { monitorEnableCh <- struct{}{} },
		DisableHook: func() { monitorDisableCh <- struct{}{} },
	}

	// Mux output 1: the container process.
	containerIn, containerOut := ioset.Pipe()
	defer func() {
		containerIn.Close()
	}()
	containerOutCtx := ioset.MuxOut{
		Out: containerOut,
		// send newline to hopefully get the prompt; TODO: better UI (e.g. reprinting the last line)
		EnableHook:  func() { containerOut.Stdin.Write([]byte("\n")) },
		DisableHook: func() {},
	}

	invokeForwarder := ioset.NewForwarder()
	invokeForwarder.SetIn(&containerIn)

	m := &monitor{
		BuildxController: c,
		invokeIO:         invokeForwarder,
		// NOTE(review): the literal 1 presumably selects the container output
		// as the initial one — confirm against ioset.NewMuxIO.
		muxIO: ioset.NewMuxIO(ioset.In{
			Stdin:  io.NopCloser(stdin),
			Stdout: nopCloser{stdout},
			Stderr: nopCloser{stderr},
		}, []ioset.MuxOut{monitorOutCtx, containerOutCtx}, 1, func(prev int, res int) string {
			if prev == 0 && res == 0 {
				// No toggle happened because container I/O isn't enabled.
				return "Process isn't attached (previous \"exec\" exited?). Use \"attach\" for attaching or \"rollback\" or \"exec\" for running new one.\n"
			}
			return "Switched IO\n"
		}),
	}
	m.ref.Store(curRef)

	// Start container automatically
	fmt.Fprintf(stdout, "Launching interactive container. Press Ctrl-a-c to switch to monitor console\n")
	invokeConfig.Rollback = false
	invokeConfig.Initial = false
	id := m.Rollback(ctx, invokeConfig)
	fmt.Fprintf(stdout, "Interactive container was restarted with process %q. Press Ctrl-a-c to switch to the new container\n", id)

	availableCommands := []types.Command{
		commands.NewReloadCmd(m, stdout, progress, options, invokeConfig),
		commands.NewRollbackCmd(m, invokeConfig, stdout),
		commands.NewAttachCmd(m, stdout),
		commands.NewExecCmd(m, invokeConfig, stdout),
		commands.NewPsCmd(m, stdout),
	}
	registeredCommands := make(map[string]types.Command)
	for _, c := range availableCommands {
		registeredCommands[c.Info().Name] = c
	}
	// Help text for the commands handled directly by the loop below.
	additionalHelpMessages := map[string]string{
		"help": "shows this message. Optionally pass a command name as an argument to print the detailed usage.",
		"exit": "exits monitor",
	}

	// Serve monitor commands
	monitorForwarder := ioset.NewForwarder()
	monitorForwarder.SetIn(&monitorIn)
	for {
		// Wait until the monitor console is enabled.
		<-monitorEnableCh
		in, out := ioset.Pipe()
		monitorForwarder.SetOut(&out)
		doneCh, errCh := make(chan struct{}), make(chan error)
		go func() {
			defer close(doneCh)
			defer in.Close()
			// Close the pipe when the context ends so ReadLine stops blocking.
			go func() {
				<-ctx.Done()
				in.Close()
			}()
			t := term.NewTerminal(readWriter{in.Stdin, in.Stdout}, "(buildx) ")
			for {
				l, err := t.ReadLine()
				if err != nil {
					// io.EOF ends the session quietly; anything else is
					// reported to the caller of RunMonitor.
					if err != io.EOF {
						errCh <- err
					}
					return
				}
				args, err := shlex.Split(l)
				if err != nil {
					// End the message with a newline so that the following
					// output starts on its own line.
					fmt.Fprintf(stdout, "monitor: failed to parse command: %v\n", err)
					continue
				}
				if len(args) == 0 {
					continue
				}

				// Builtin commands
				switch args[0] {
				case "":
					// nop
					continue
				case "exit":
					return
				case "help":
					if len(args) >= 2 {
						printHelpMessageOfCommand(stdout, args[1], registeredCommands, additionalHelpMessages)
						continue
					}
					printHelpMessage(stdout, registeredCommands, additionalHelpMessages)
					continue
				default:
				}

				// Registered commands
				cmdname := args[0]
				if cm, ok := registeredCommands[cmdname]; ok {
					if err := cm.Exec(ctx, args); err != nil {
						fmt.Fprintf(stdout, "%s: %v\n", cmdname, err)
					}
				} else {
					fmt.Fprintf(stdout, "monitor: unknown command: %q\n", l)
					printHelpMessage(stdout, registeredCommands, additionalHelpMessages)
				}
			}
		}()
		select {
		case <-doneCh:
			m.close()
			return m.lastBuildResult, nil
		case err := <-errCh:
			m.close()
			return m.lastBuildResult, err
		case <-monitorDisableCh:
			// The monitor console was disabled: detach its output and wait
			// for the next enable signal.
		}
		monitorForwarder.SetOut(nil)
	}
}
// printHelpMessageOfCommand writes the help text of the registered command
// called name to out: its short help message, followed by the long one when
// that is non-empty. If name is not a registered command, it reports that and
// falls back to printing the overview of all commands.
func printHelpMessageOfCommand(out io.Writer, name string, registeredCommands map[string]types.Command, additional map[string]string) {
	cmd, found := registeredCommands[name]
	if !found {
		fmt.Fprintf(out, "monitor: no help message for %q\n", name)
		printHelpMessage(out, registeredCommands, additional)
		return
	}

	fmt.Fprintln(out, cmd.Info().HelpMessage)
	if long := cmd.Info().HelpMessageLong; long != "" {
		fmt.Fprintln(out, long)
	}
}
// printHelpMessage writes an alphabetically sorted, tab-aligned overview of
// all commands to out. Names come from both registeredCommands and
// additional; when a name is present in both, the registered command's help
// message is the one shown.
func printHelpMessage(out io.Writer, registeredCommands map[string]types.Command, additional map[string]string) {
	names := make([]string, 0, len(registeredCommands)+len(additional))
	for n := range registeredCommands {
		names = append(names, n)
	}
	for n := range additional {
		names = append(names, n)
	}
	sort.Strings(names)

	fmt.Fprintln(out, "Available commands are:")

	tw := tabwriter.NewWriter(out, 0, 8, 0, '\t', 0)
	defer tw.Flush()
	for _, n := range names {
		// Start from the additional text and let a registered command
		// override it.
		text, known := additional[n]
		if cmd, registered := registeredCommands[n]; registered {
			text, known = cmd.Info().HelpMessage, true
		}
		if !known {
			continue
		}
		fmt.Fprintf(tw, " %s\t%s\n", n, text)
	}
}
// readWriter combines a separate io.Reader and io.Writer into a single value
// that satisfies io.ReadWriter through the embedded fields.
type readWriter struct {
	io.Reader
	io.Writer
}
// monitor wraps a BuildxController with the state of one interactive monitor
// session.
type monitor struct {
	control.BuildxController
	// ref holds the reference string stored by RunMonitor (curRef).
	ref atomic.Value
	// muxIO multiplexes the user's IO between the monitor console (output 0)
	// and the container process (output 1).
	muxIO *ioset.MuxIO
	// invokeIO forwards the container side of the mux to the current invoke.
	invokeIO *ioset.Forwarder
	// invokeCancel cancels the invoke that is currently running and waits for
	// it to finish; nil until the first invoke has set it.
	invokeCancel func()
	// attachedPid holds the ID (a string) of the attached process, or "" once
	// that process has finished.
	attachedPid atomic.Value
	// lastBuildResult is the outcome of the most recent Build call, if any.
	lastBuildResult *MonitorBuildResult
}
// Build delegates to the wrapped controller's Build and records the response
// and error as the monitor's last build result before returning them. The
// build inputs reported by the controller are discarded, so the returned
// input is always nil.
func (m *monitor) Build(ctx context.Context, options *cbuild.Options, in io.ReadCloser, progress progress.Writer) (resp *client.SolveResponse, input *build.Inputs, err error) {
	solveResp, _, buildErr := m.BuildxController.Build(ctx, options, in, progress)

	// Record build result
	m.lastBuildResult = &MonitorBuildResult{
		Resp: solveResp,
		Err:  buildErr,
	}
	return solveResp, nil, buildErr
}
// Rollback starts a new process from cfg under a freshly generated ID, with
// the Rollback flag set, and returns that ID.
//
// NOTE(review): cfg is a pointer and is modified in place, so the caller's
// config keeps Rollback == true after this call — confirm that is intended.
func (m *monitor) Rollback(ctx context.Context, cfg *controllerapi.InvokeConfig) string {
	newPid := identity.NewID()
	cfg.Rollback = true
	return m.startInvoke(ctx, newPid, cfg)
}
// Exec starts a new process from cfg under a freshly generated ID and returns
// that ID.
func (m *monitor) Exec(ctx context.Context, cfg *controllerapi.InvokeConfig) string {
	newPid := identity.NewID()
	return m.startInvoke(ctx, newPid, cfg)
}
// Attach starts an invoke for the given pid using an empty InvokeConfig. The
// ID returned by startInvoke is discarded because the caller supplied it.
func (m *monitor) Attach(ctx context.Context, pid string) {
	emptyCfg := &controllerapi.InvokeConfig{}
	_ = m.startInvoke(ctx, pid, emptyCfg)
}
// Detach finishes the current attach by calling the registered cancel
// function. It does nothing when no invoke has registered one yet.
func (m *monitor) Detach() {
	if m.invokeCancel == nil {
		return
	}
	m.invokeCancel() // Finish existing attach
}
// AttachedPID returns the ID of the currently attached process. It returns ""
// when no process is attached, including the case where no ID has been stored
// yet.
func (m *monitor) AttachedPID() string {
	// atomic.Value.Load returns nil before the first Store; the comma-ok
	// assertion turns that into "" instead of panicking.
	pid, _ := m.attachedPid.Load().(string)
	return pid
}
// close shuts the monitor down by detaching from the current invoke, if any.
func (m *monitor) close() {
	m.Detach()
}
// startInvoke finishes any existing attach, records pid as the attached
// process, and runs the invoke for pid in a background goroutine. It returns
// pid without waiting for the invoke to finish.
//
// When cfg has neither an entrypoint nor a command, cfg is modified in place
// to launch "sh". Invoke errors are only logged: cancellation at debug level,
// everything else at error level.
func (m *monitor) startInvoke(ctx context.Context, pid string, cfg *controllerapi.InvokeConfig) string {
	if m.invokeCancel != nil {
		m.invokeCancel() // Finish existing attach
	}
	if len(cfg.Entrypoint) == 0 && len(cfg.Cmd) == 0 {
		cfg.Entrypoint = []string{"sh"} // launch shell by default
		cfg.Cmd = []string{}
		cfg.NoCmd = false
	}

	// Record the pid before the goroutine starts. Otherwise an invoke that
	// fails immediately could run its cleanup first and leave the ID of an
	// already finished process recorded as attached.
	m.attachedPid.Store(pid)

	go func() {
		// Start a new invoke
		if err := m.invoke(ctx, pid, cfg); err != nil {
			if errors.Is(err, context.Canceled) {
				logrus.Debugf("process canceled: %v", err)
			} else {
				logrus.Errorf("invoke: %v", err)
			}
		}
		// Clear the attached pid only if it is still ours. Doing the check
		// and the store as one atomic step keeps a newer attach from being
		// overwritten.
		m.attachedPid.CompareAndSwap(pid, "")
	}()
	return pid
}
// invoke connects the process pid to the container side of the multiplexed IO
// and blocks until the controller's Invoke call returns, passing on its error.
//
// While it runs, m.invokeCancel is set to a function that cancels the invoke
// context, detaches the IO, and waits for Invoke to return. The same function
// also runs when invoke itself returns.
func (m *monitor) invoke(ctx context.Context, pid string, cfg *controllerapi.InvokeConfig) error {
	// Output 1 is the container side of the mux (see the MuxOut order passed
	// to NewMuxIO in RunMonitor); keep it enabled only for this call.
	m.muxIO.Enable(1)
	defer m.muxIO.Disable(1)
	if err := m.muxIO.SwitchTo(1); err != nil {
		return errors.Errorf("failed to switch to process IO: %v", err)
	}
	invokeCtx, invokeCancel := context.WithCancelCause(ctx)

	// Fresh pipe for this invoke: the forwarder feeds containerOut, the
	// process uses the containerIn ends.
	containerIn, containerOut := ioset.Pipe()
	m.invokeIO.SetOut(&containerOut)
	// Closed once Invoke has returned.
	waitInvokeDoneCh := make(chan struct{})
	var cancelOnce sync.Once
	invokeCancelAndDetachFn := func() {
		// The teardown runs at most once, but every caller waits below until
		// Invoke has actually returned.
		cancelOnce.Do(func() {
			containerIn.Close()
			m.invokeIO.SetOut(nil)
			invokeCancel(errors.WithStack(context.Canceled))
		})
		<-waitInvokeDoneCh
	}
	// Deferred calls run after close(waitInvokeDoneCh) below, so this call
	// does not block on the wait.
	defer invokeCancelAndDetachFn()
	m.invokeCancel = invokeCancelAndDetachFn

	err := m.Invoke(invokeCtx, pid, cfg, containerIn.Stdin, containerIn.Stdout, containerIn.Stderr)
	close(waitInvokeDoneCh)

	return err
}
// nopCloser adapts an io.Writer to io.WriteCloser with a Close method that
// does nothing, so the wrapped writer is never closed through this wrapper.
type nopCloser struct {
	io.Writer
}

// Close always reports success and leaves the wrapped writer untouched.
func (nopCloser) Close() error {
	return nil
}