Akihiro Suda 6b65b0c982 new driver: kubernetes
Tested with `kind` and GKE.

Note: "nodes" shown in `docker buildx ls` are unrelated to Kubernetes "nodes".
Probably buildx should come up with an alternative term.

Usage:

  $ kind create cluster
  $ export KUBECONFIG="$(kind get kubeconfig-path --name="kind")"

  $ docker buildx create --driver kubernetes --driver-opt replicas=3 --use
  $ docker buildx build -t foo --load .

`--load` loads the image into the local Docker.

Driver opts:

  - `image=IMAGE` - Sets the container image to be used for running buildkit.
  - `namespace=NS` - Sets the Kubernetes namespace. Defaults to the current namespace.
  - `replicas=N` - Sets the number of `Pod` replicas. Defaults to 1.
  - `rootless=(true|false)` - Run the container as a non-root user without `securityContext.privileged`. Defaults to false.
  - `loadbalance=(sticky|random)` - Load-balancing strategy. If set to "sticky", the pod is chosen using the hash of the context path. Defaults to "sticky"

Signed-off-by: Akihiro Suda <akihiro.suda.cz@hco.ntt.co.jp>
2019-11-21 10:30:39 +09:00

136 lines
2.7 KiB
Go

package execconn
import (
"io"
"net"
"os"
"sync"
"time"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)
func ExecConn(restClient rest.Interface, restConfig *rest.Config, namespace, pod, container string, cmd []string) (net.Conn, error) {
req := restClient.
Post().
Namespace(namespace).
Resource("pods").
Name(pod).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Container: container,
Command: cmd,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())
if err != nil {
return nil, err
}
stdinR, stdinW := io.Pipe()
stdoutR, stdoutW := io.Pipe()
kc := &kubeConn{
stdin: stdinW,
stdout: stdoutR,
localAddr: dummyAddr{network: "dummy", s: "dummy-0"},
remoteAddr: dummyAddr{network: "dummy", s: "dummy-1"},
}
go func() {
serr := exec.Stream(remotecommand.StreamOptions{
Stdin: stdinR,
Stdout: stdoutW,
Stderr: os.Stderr,
Tty: false,
})
if serr != nil {
logrus.Error(serr)
}
}()
return kc, nil
}
type kubeConn struct {
stdin io.WriteCloser
stdout io.ReadCloser
stdioClosedMu sync.Mutex // for stdinClosed and stdoutClosed
stdinClosed bool
stdoutClosed bool
localAddr net.Addr
remoteAddr net.Addr
}
func (c *kubeConn) Write(p []byte) (int, error) {
return c.stdin.Write(p)
}
func (c *kubeConn) Read(p []byte) (int, error) {
return c.stdout.Read(p)
}
func (c *kubeConn) CloseWrite() error {
err := c.stdin.Close()
c.stdioClosedMu.Lock()
c.stdinClosed = true
c.stdioClosedMu.Unlock()
return err
}
func (c *kubeConn) CloseRead() error {
err := c.stdout.Close()
c.stdioClosedMu.Lock()
c.stdoutClosed = true
c.stdioClosedMu.Unlock()
return err
}
func (c *kubeConn) Close() error {
var err error
c.stdioClosedMu.Lock()
stdinClosed := c.stdinClosed
c.stdioClosedMu.Unlock()
if !stdinClosed {
err = c.CloseWrite()
}
c.stdioClosedMu.Lock()
stdoutClosed := c.stdoutClosed
c.stdioClosedMu.Unlock()
if !stdoutClosed {
err = c.CloseRead()
}
return err
}
func (c *kubeConn) LocalAddr() net.Addr {
return c.localAddr
}
func (c *kubeConn) RemoteAddr() net.Addr {
return c.remoteAddr
}
func (c *kubeConn) SetDeadline(t time.Time) error {
return nil
}
func (c *kubeConn) SetReadDeadline(t time.Time) error {
return nil
}
func (c *kubeConn) SetWriteDeadline(t time.Time) error {
return nil
}
type dummyAddr struct {
network string
s string
}
func (d dummyAddr) Network() string {
return d.network
}
func (d dummyAddr) String() string {
return d.s
}