controller: rename ref to sessionID and set buildRef back to ref

Signed-off-by: CrazyMax <1951866+crazy-max@users.noreply.github.com>
This commit is contained in:
CrazyMax
2024-10-24 15:37:18 +02:00
parent 1060328a96
commit eb15c667b9
11 changed files with 565 additions and 558 deletions

View File

@ -75,36 +75,36 @@ func (c *Client) List(ctx context.Context) (keys []string, retErr error) {
return res.Keys, nil
}
func (c *Client) Disconnect(ctx context.Context, key string) error {
if key == "" {
func (c *Client) Disconnect(ctx context.Context, sessionID string) error {
if sessionID == "" {
return nil
}
_, err := c.client().Disconnect(ctx, &pb.DisconnectRequest{Ref: key})
_, err := c.client().Disconnect(ctx, &pb.DisconnectRequest{SessionID: sessionID})
return err
}
func (c *Client) ListProcesses(ctx context.Context, ref string) (infos []*pb.ProcessInfo, retErr error) {
res, err := c.client().ListProcesses(ctx, &pb.ListProcessesRequest{Ref: ref})
func (c *Client) ListProcesses(ctx context.Context, sessionID string) (infos []*pb.ProcessInfo, retErr error) {
res, err := c.client().ListProcesses(ctx, &pb.ListProcessesRequest{SessionID: sessionID})
if err != nil {
return nil, err
}
return res.Infos, nil
}
func (c *Client) DisconnectProcess(ctx context.Context, ref, pid string) error {
_, err := c.client().DisconnectProcess(ctx, &pb.DisconnectProcessRequest{Ref: ref, ProcessID: pid})
func (c *Client) DisconnectProcess(ctx context.Context, sessionID, pid string) error {
_, err := c.client().DisconnectProcess(ctx, &pb.DisconnectProcessRequest{SessionID: sessionID, ProcessID: pid})
return err
}
func (c *Client) Invoke(ctx context.Context, ref string, pid string, invokeConfig *pb.InvokeConfig, in io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
if ref == "" || pid == "" {
return errors.New("build reference must be specified")
func (c *Client) Invoke(ctx context.Context, sessionID string, pid string, invokeConfig *pb.InvokeConfig, in io.ReadCloser, stdout io.WriteCloser, stderr io.WriteCloser) error {
if sessionID == "" || pid == "" {
return errors.New("build session ID must be specified")
}
stream, err := c.client().Invoke(ctx)
if err != nil {
return err
}
return attachIO(ctx, stream, &pb.InitMessage{Ref: ref, ProcessID: pid, InvokeConfig: invokeConfig}, ioAttachConfig{
return attachIO(ctx, stream, &pb.InitMessage{SessionID: sessionID, ProcessID: pid, InvokeConfig: invokeConfig}, ioAttachConfig{
stdin: in,
stdout: stdout,
stderr: stderr,
@ -112,8 +112,8 @@ func (c *Client) Invoke(ctx context.Context, ref string, pid string, invokeConfi
})
}
func (c *Client) Inspect(ctx context.Context, ref string) (*pb.InspectResponse, error) {
return c.client().Inspect(ctx, &pb.InspectRequest{Ref: ref})
func (c *Client) Inspect(ctx context.Context, sessionID string) (*pb.InspectResponse, error) {
return c.client().Inspect(ctx, &pb.InspectRequest{SessionID: sessionID})
}
func (c *Client) Build(ctx context.Context, options *pb.BuildOptions, in io.ReadCloser, progress progress.Writer) (string, *client.SolveResponse, *build.Inputs, error) {
@ -137,7 +137,7 @@ func (c *Client) Build(ctx context.Context, options *pb.BuildOptions, in io.Read
return ref, resp, nil, eg.Wait()
}
func (c *Client) build(ctx context.Context, ref string, options *pb.BuildOptions, in io.ReadCloser, statusChan chan *client.SolveStatus) (*client.SolveResponse, error) {
func (c *Client) build(ctx context.Context, sessionID string, options *pb.BuildOptions, in io.ReadCloser, statusChan chan *client.SolveStatus) (*client.SolveResponse, error) {
eg, egCtx := errgroup.WithContext(ctx)
done := make(chan struct{})
@ -146,8 +146,8 @@ func (c *Client) build(ctx context.Context, ref string, options *pb.BuildOptions
eg.Go(func() error {
defer close(done)
pbResp, err := c.client().Build(egCtx, &pb.BuildRequest{
Ref: ref,
Options: options,
SessionID: sessionID,
Options: options,
})
if err != nil {
return err
@ -159,7 +159,7 @@ func (c *Client) build(ctx context.Context, ref string, options *pb.BuildOptions
})
eg.Go(func() error {
stream, err := c.client().Status(egCtx, &pb.StatusRequest{
Ref: ref,
SessionID: sessionID,
})
if err != nil {
return err
@ -184,7 +184,7 @@ func (c *Client) build(ctx context.Context, ref string, options *pb.BuildOptions
if err := stream.Send(&pb.InputMessage{
Input: &pb.InputMessage_Init{
Init: &pb.InputInitMessage{
Ref: ref,
SessionID: sessionID,
},
},
}); err != nil {

View File

@ -43,9 +43,9 @@ func serveIO(attachCtx context.Context, srv msgStream, initFn func(*pb.InitMessa
if init == nil {
return errors.Errorf("unexpected message: %T; wanted init", msg.GetInput())
}
ref := init.Ref
if ref == "" {
return errors.New("no ref is provided")
sessionID := init.SessionID
if sessionID == "" {
return errors.New("no session ID is provided")
}
if err := initFn(init); err != nil {
return errors.Wrap(err, "failed to initialize IO server")

View File

@ -53,9 +53,9 @@ func (s *session) cancelRunningProcesses() {
func (m *Server) ListProcesses(ctx context.Context, req *pb.ListProcessesRequest) (res *pb.ListProcessesResponse, err error) {
m.sessionMu.Lock()
defer m.sessionMu.Unlock()
s, ok := m.session[req.Ref]
s, ok := m.session[req.SessionID]
if !ok {
return nil, errors.Errorf("unknown ref %q", req.Ref)
return nil, errors.Errorf("unknown session ID %q", req.SessionID)
}
res = new(pb.ListProcessesResponse)
res.Infos = append(res.Infos, s.processes.ListProcesses()...)
@ -65,9 +65,9 @@ func (m *Server) ListProcesses(ctx context.Context, req *pb.ListProcessesRequest
func (m *Server) DisconnectProcess(ctx context.Context, req *pb.DisconnectProcessRequest) (res *pb.DisconnectProcessResponse, err error) {
m.sessionMu.Lock()
defer m.sessionMu.Unlock()
s, ok := m.session[req.Ref]
s, ok := m.session[req.SessionID]
if !ok {
return nil, errors.Errorf("unknown ref %q", req.Ref)
return nil, errors.Errorf("unknown session ID %q", req.SessionID)
}
return res, s.processes.DeleteProcess(req.ProcessID)
}
@ -101,13 +101,13 @@ func (m *Server) List(ctx context.Context, req *pb.ListRequest) (res *pb.ListRes
}
func (m *Server) Disconnect(ctx context.Context, req *pb.DisconnectRequest) (res *pb.DisconnectResponse, err error) {
key := req.Ref
if key == "" {
return nil, errors.New("disconnect: empty key")
sessionID := req.SessionID
if sessionID == "" {
return nil, errors.New("disconnect: empty session ID")
}
m.sessionMu.Lock()
if s, ok := m.session[key]; ok {
if s, ok := m.session[sessionID]; ok {
if s.cancelBuild != nil {
s.cancelBuild()
}
@ -116,7 +116,7 @@ func (m *Server) Disconnect(ctx context.Context, req *pb.DisconnectRequest) (res
s.result.Done()
}
}
delete(m.session, key)
delete(m.session, sessionID)
m.sessionMu.Unlock()
return &pb.DisconnectResponse{}, nil
@ -137,26 +137,26 @@ func (m *Server) Close() error {
}
func (m *Server) Inspect(ctx context.Context, req *pb.InspectRequest) (*pb.InspectResponse, error) {
ref := req.Ref
if ref == "" {
return nil, errors.New("inspect: empty key")
sessionID := req.SessionID
if sessionID == "" {
return nil, errors.New("inspect: empty session ID")
}
var bo *pb.BuildOptions
m.sessionMu.Lock()
if s, ok := m.session[ref]; ok {
if s, ok := m.session[sessionID]; ok {
bo = s.buildOptions
} else {
m.sessionMu.Unlock()
return nil, errors.Errorf("inspect: unknown key %v", ref)
return nil, errors.Errorf("inspect: unknown key %v", sessionID)
}
m.sessionMu.Unlock()
return &pb.InspectResponse{Options: bo}, nil
}
func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResponse, error) {
ref := req.Ref
if ref == "" {
return nil, errors.New("build: empty key")
sessionID := req.SessionID
if sessionID == "" {
return nil, errors.New("build: empty session ID")
}
// Prepare status channel and session
@ -164,7 +164,7 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
if m.session == nil {
m.session = make(map[string]*session)
}
s, ok := m.session[ref]
s, ok := m.session[sessionID]
if ok {
if !s.buildOnGoing.CompareAndSwap(false, true) {
m.sessionMu.Unlock()
@ -183,12 +183,12 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
inR, inW := io.Pipe()
defer inR.Close()
s.inputPipe = inW
m.session[ref] = s
m.session[sessionID] = s
m.sessionMu.Unlock()
defer func() {
close(statusChan)
m.sessionMu.Lock()
s, ok := m.session[ref]
s, ok := m.session[sessionID]
if ok {
s.statusChan = nil
s.buildOnGoing.Store(false)
@ -203,25 +203,25 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
defer cancel()
resp, res, _, buildErr := m.buildFunc(ctx, req.Options, inR, pw)
m.sessionMu.Lock()
if s, ok := m.session[ref]; ok {
if s, ok := m.session[sessionID]; ok {
// NOTE: buildFunc can return *build.ResultHandle even on error (e.g. when it's implemented using (github.com/docker/buildx/controller/build).RunBuild).
if res != nil {
s.result = res
s.cancelBuild = cancel
s.buildOptions = req.Options
m.session[ref] = s
m.session[sessionID] = s
if buildErr != nil {
var buildRef string
var ref string
var ebr *desktop.ErrorWithBuildRef
if errors.As(buildErr, &ebr) {
buildRef = ebr.Ref
ref = ebr.Ref
}
buildErr = controllererrors.WrapBuild(buildErr, ref, buildRef)
buildErr = controllererrors.WrapBuild(buildErr, sessionID, ref)
}
}
} else {
m.sessionMu.Unlock()
return nil, errors.Errorf("build: unknown key %v", ref)
return nil, errors.Errorf("build: unknown session ID %v", sessionID)
}
m.sessionMu.Unlock()
@ -238,9 +238,9 @@ func (m *Server) Build(ctx context.Context, req *pb.BuildRequest) (*pb.BuildResp
}
func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer) error {
ref := req.Ref
if ref == "" {
return errors.New("status: empty key")
sessionID := req.SessionID
if sessionID == "" {
return errors.New("status: empty session ID")
}
// Wait and get status channel prepared by Build()
@ -248,12 +248,12 @@ func (m *Server) Status(req *pb.StatusRequest, stream pb.Controller_StatusServer
for {
// TODO: timeout?
m.sessionMu.Lock()
if _, ok := m.session[ref]; !ok || m.session[ref].statusChan == nil {
if _, ok := m.session[sessionID]; !ok || m.session[sessionID].statusChan == nil {
m.sessionMu.Unlock()
time.Sleep(time.Millisecond) // TODO: wait Build without busy loop and make it cancellable
continue
}
statusChan = m.session[ref].statusChan
statusChan = m.session[sessionID].statusChan
m.sessionMu.Unlock()
break
}
@ -284,9 +284,9 @@ func (m *Server) Input(stream pb.Controller_InputServer) (err error) {
if init == nil {
return errors.Errorf("unexpected message: %T; wanted init", msg.GetInit())
}
ref := init.Ref
if ref == "" {
return errors.New("input: no ref is provided")
sessionID := init.SessionID
if sessionID == "" {
return errors.New("input: no session ID is provided")
}
// Wait and get input stream pipe prepared by Build()
@ -294,12 +294,12 @@ func (m *Server) Input(stream pb.Controller_InputServer) (err error) {
for {
// TODO: timeout?
m.sessionMu.Lock()
if _, ok := m.session[ref]; !ok || m.session[ref].inputPipe == nil {
if _, ok := m.session[sessionID]; !ok || m.session[sessionID].inputPipe == nil {
m.sessionMu.Unlock()
time.Sleep(time.Millisecond) // TODO: wait Build without busy loop and make it cancellable
continue
}
inputPipeW = m.session[ref].inputPipe
inputPipeW = m.session[sessionID].inputPipe
m.sessionMu.Unlock()
break
}
@ -379,14 +379,14 @@ func (m *Server) Invoke(srv pb.Controller_InvokeServer) error {
initErrCh <- retErr
}
}()
ref := initMessage.Ref
sessionID := initMessage.SessionID
cfg := initMessage.InvokeConfig
m.sessionMu.Lock()
s, ok := m.session[ref]
s, ok := m.session[sessionID]
if !ok {
m.sessionMu.Unlock()
return errors.Errorf("invoke: unknown key %v", ref)
return errors.Errorf("invoke: unknown session ID %v", sessionID)
}
m.sessionMu.Unlock()