monitor: Enable to exec into the container

Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
This commit is contained in:
Kohei Tokunaga
2023-02-14 21:16:29 +09:00
parent eefe27ff42
commit e8f55a3cf7
14 changed files with 1249 additions and 481 deletions

View File

@ -75,15 +75,28 @@ func (c *Client) Disconnect(ctx context.Context, key string) error {
return err
}
func (c *Client) Invoke(ctx context.Context, ref string, containerConfig pb.ContainerConfig, in io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
if ref == "" {
func (c *Client) ListProcesses(ctx context.Context, ref string) (infos []*pb.ProcessInfo, retErr error) {
res, err := c.client().ListProcesses(ctx, &pb.ListProcessesRequest{Ref: ref})
if err != nil {
return nil, err
}
return res.Infos, nil
}
func (c *Client) DisconnectProcess(ctx context.Context, ref, pid string) error {
_, err := c.client().DisconnectProcess(ctx, &pb.DisconnectProcessRequest{Ref: ref, ProcessID: pid})
return err
}
func (c *Client) Invoke(ctx context.Context, ref string, pid string, invokeConfig pb.InvokeConfig, in io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
if ref == "" || pid == "" {
return errors.New("build reference must be specified")
}
stream, err := c.client().Invoke(ctx)
if err != nil {
return err
}
return attachIO(ctx, stream, &pb.InitMessage{Ref: ref, ContainerConfig: &containerConfig}, ioAttachConfig{
return attachIO(ctx, stream, &pb.InitMessage{Ref: ref, ProcessID: pid, InvokeConfig: &invokeConfig}, ioAttachConfig{
stdin: in,
stdout: stdout,
stderr: stderr,

View File

@ -4,16 +4,17 @@ import (
"context"
"io"
"sync"
"sync/atomic"
"time"
"github.com/docker/buildx/build"
"github.com/docker/buildx/controller/pb"
"github.com/docker/buildx/controller/processes"
"github.com/docker/buildx/util/ioset"
"github.com/docker/buildx/version"
controlapi "github.com/moby/buildkit/api/services/control"
"github.com/moby/buildkit/client"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
@ -27,16 +28,47 @@ func NewServer(buildFunc BuildFunc) *Server {
type Server struct {
buildFunc BuildFunc
session map[string]session
session map[string]*session
sessionMu sync.Mutex
}
type session struct {
statusChan chan *client.SolveStatus
result *build.ResultContext
inputPipe *io.PipeWriter
curInvokeCancel func()
curBuildCancel func()
buildOnGoing atomic.Bool
statusChan chan *client.SolveStatus
cancelBuild func()
inputPipe *io.PipeWriter
result *build.ResultContext
processes *processes.Manager
}
func (s *session) cancelRunningProcesses() {
s.processes.CancelRunningProcesses()
}
func (m *Server) ListProcesses(ctx context.Context, req *pb.ListProcessesRequest) (res *pb.ListProcessesResponse, err error) {
m.sessionMu.Lock()
defer m.sessionMu.Unlock()
s, ok := m.session[req.Ref]
if !ok {
return nil, errors.Errorf("unknown ref %q", req.Ref)
}
res = new(pb.ListProcessesResponse)
for _, p := range s.processes.ListProcesses() {
res.Infos = append(res.Infos, p)
}
return res, nil
}
func (m *Server) DisconnectProcess(ctx context.Context, req *pb.DisconnectProcessRequest) (res *pb.DisconnectProcessResponse, err error) {
m.sessionMu.Lock()
defer m.sessionMu.Unlock()
s, ok := m.session[req.Ref]
if !ok {
return nil, errors.Errorf("unknown ref %q", req.Ref)
}
return res, s.processes.DeleteProcess(req.ProcessID)
}
func (m *Server) Info(ctx context.Context, req *pb.InfoRequest) (res *pb.InfoResponse, err error) {
@ -75,12 +107,10 @@ func (m *Server) Disconnect(ctx context.Context, req *pb.DisconnectRequest) (res
m.sessionMu.Lock()
if s, ok := m.session[key]; ok {
if s.curBuildCancel != nil {
s.curBuildCancel()
}
if s.curInvokeCancel != nil {
s.curInvokeCancel()
if s.cancelBuild != nil {
s.cancelBuild()
}
s.cancelRunningProcesses()
}
delete(m.session, key)
m.sessionMu.Unlock()
@ -92,12 +122,10 @@ func (m *Server) Close() error {
m.sessionMu.Lock()
for k := range m.session {
if s, ok := m.session[k]; ok {
if s.curBuildCancel != nil {
s.curBuildCancel()
}
if s.curInvokeCancel != nil {
s.curInvokeCancel()
if s.cancelBuild != nil {
s.cancelBuild()
}
s.cancelRunningProcesses()
}
}
m.sessionMu.Unlock()
@ -110,19 +138,30 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
return nil, errors.New("build: empty key")
}
// Prepare status channel and session if not exists
// Prepare status channel and session
m.sessionMu.Lock()
if m.session == nil {
m.session = make(map[string]session)
m.session = make(map[string]*session)
}
s, ok := m.session[ref]
if ok && m.session[ref].statusChan != nil {
m.sessionMu.Unlock()
return &pb.BuildResponse{}, errors.New("build or status ongoing or status didn't call")
if ok {
if !s.buildOnGoing.CompareAndSwap(false, true) {
m.sessionMu.Unlock()
return &pb.BuildResponse{}, errors.New("build ongoing")
}
s.cancelRunningProcesses()
s.result = nil
} else {
s = &session{}
s.buildOnGoing.Store(true)
}
s.processes = processes.NewManager()
statusChan := make(chan *client.SolveStatus)
s.statusChan = statusChan
m.session[ref] = session{statusChan: statusChan}
inR, inW := io.Pipe()
defer inR.Close()
s.inputPipe = inW
m.session[ref] = s
m.sessionMu.Unlock()
defer func() {
close(statusChan)
@ -130,23 +169,11 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
s, ok := m.session[ref]
if ok {
s.statusChan = nil
s.buildOnGoing.Store(false)
}
m.sessionMu.Unlock()
}()
// Prepare input stream pipe
inR, inW := io.Pipe()
m.sessionMu.Lock()
if s, ok := m.session[ref]; ok {
s.inputPipe = inW
m.session[ref] = s
} else {
m.sessionMu.Unlock()
return nil, errors.Errorf("build: unknown key %v", ref)
}
m.sessionMu.Unlock()
defer inR.Close()
// Build the specified request
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -154,7 +181,7 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
m.sessionMu.Lock()
if s, ok := m.session[ref]; ok {
s.result = res
s.curBuildCancel = cancel
s.cancelBuild = cancel
m.session[ref] = s
} else {
m.sessionMu.Unlock()
@ -298,56 +325,51 @@ func (m *Server) Input(stream pb.Controller_InputServer) (err error) {
}
func (m *Server) Invoke(srv pb.Controller_InvokeServer) error {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
containerIn, containerOut := ioset.Pipe()
waitInvokeDoneCh := make(chan struct{})
var cancelOnce sync.Once
curInvokeCancel := func() {
cancelOnce.Do(func() { containerOut.Close(); containerIn.Close(); cancel() })
<-waitInvokeDoneCh
}
defer curInvokeCancel()
defer func() { containerOut.Close(); containerIn.Close() }()
var cfg *pb.ContainerConfig
var resultCtx *build.ResultContext
initDoneCh := make(chan struct{})
initDoneCh := make(chan *processes.Process)
initErrCh := make(chan error)
eg, egCtx := errgroup.WithContext(ctx)
eg, egCtx := errgroup.WithContext(context.TODO())
srvIOCtx, srvIOCancel := context.WithCancel(egCtx)
eg.Go(func() error {
return serveIO(egCtx, srv, func(initMessage *pb.InitMessage) (retErr error) {
defer srvIOCancel()
return serveIO(srvIOCtx, srv, func(initMessage *pb.InitMessage) (retErr error) {
defer func() {
if retErr != nil {
initErrCh <- retErr
}
close(initDoneCh)
}()
ref := initMessage.Ref
cfg = initMessage.ContainerConfig
cfg := initMessage.InvokeConfig
// Register cancel callback
m.sessionMu.Lock()
if s, ok := m.session[ref]; ok {
if cancel := s.curInvokeCancel; cancel != nil {
logrus.Warnf("invoke: cancelling ongoing invoke of %q", ref)
cancel()
}
s.curInvokeCancel = curInvokeCancel
m.session[ref] = s
} else {
s, ok := m.session[ref]
if !ok {
m.sessionMu.Unlock()
return errors.Errorf("invoke: unknown key %v", ref)
}
m.sessionMu.Unlock()
// Get the target result to invoke a container from
m.sessionMu.Lock()
if _, ok := m.session[ref]; !ok || m.session[ref].result == nil {
m.sessionMu.Unlock()
return errors.Errorf("unknown reference: %q", ref)
pid := initMessage.ProcessID
if pid == "" {
return errors.Errorf("invoke: specify process ID")
}
resultCtx = m.session[ref].result
m.sessionMu.Unlock()
proc, ok := s.processes.Get(pid)
if !ok {
// Start a new process.
if cfg == nil {
return errors.New("no container config is provided")
}
var err error
proc, err = s.processes.StartProcess(pid, s.result, cfg)
if err != nil {
return err
}
}
// Attach containerIn to this process
proc.ForwardIO(&containerIn, srvIOCancel)
initDoneCh <- proc
return nil
}, &ioServerConfig{
stdin: containerOut.Stdin,
@ -356,43 +378,31 @@ func (m *Server) Invoke(srv pb.Controller_InvokeServer) error {
// TODO: signal, resize
})
})
eg.Go(func() error {
defer containerIn.Close()
defer cancel()
eg.Go(func() (rErr error) {
defer srvIOCancel()
// Wait for init done
var proc *processes.Process
select {
case <-initDoneCh:
case p := <-initDoneCh:
proc = p
case err := <-initErrCh:
return err
case <-egCtx.Done():
return egCtx.Err()
}
if cfg == nil {
return errors.New("no container config is provided")
}
if resultCtx == nil {
return errors.New("no result is provided")
}
ccfg := build.ContainerConfig{
ResultCtx: resultCtx,
Entrypoint: cfg.Entrypoint,
Cmd: cfg.Cmd,
Env: cfg.Env,
Tty: cfg.Tty,
Stdin: containerIn.Stdin,
Stdout: containerIn.Stdout,
Stderr: containerIn.Stderr,
}
if !cfg.NoUser {
ccfg.User = &cfg.User
}
if !cfg.NoCwd {
ccfg.Cwd = &cfg.Cwd
}
return build.Invoke(egCtx, ccfg)
})
err := eg.Wait()
close(waitInvokeDoneCh)
curInvokeCancel()
return err
// Wait for IO done
select {
case <-srvIOCtx.Done():
return srvIOCtx.Err()
case err := <-proc.Done():
return err
case <-egCtx.Done():
return egCtx.Err()
}
})
return eg.Wait()
}
func toControlStatus(s *client.SolveStatus) *pb.StatusResponse {