new driver: kubernetes

Tested with `kind` and GKE.

Note: "nodes" shown in `docker buildx ls` are unrelated to Kubernetes "nodes".
Probably buildx should come up with an alternative term.

Usage:

  $ kind create cluster
  $ export KUBECONFIG="$(kind get kubeconfig-path --name="kind")"

  $ docker buildx create --driver kubernetes --driver-opt replicas=3 --use
  $ docker buildx build -t foo --load .

`--load` loads the built image into the local Docker daemon.

Driver opts:

  - `image=IMAGE` - Sets the container image to be used for running BuildKit.
  - `namespace=NS` - Sets the Kubernetes namespace. Defaults to the current namespace.
  - `replicas=N` - Sets the number of `Pod` replicas. Defaults to 1.
  - `rootless=(true|false)` - Runs the container as a non-root user without `securityContext.privileged`. Defaults to false.
  - `loadbalance=(sticky|random)` - Load-balancing strategy. If set to "sticky", the pod is chosen using the hash of the context path. Defaults to "sticky".
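
For instance, a hypothetical invocation combining several of these options
(the namespace and replica values are illustrative, not defaults):

  $ docker buildx create --driver kubernetes \
      --driver-opt namespace=buildkit \
      --driver-opt replicas=2 \
      --driver-opt loadbalance=random \
      --use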

Signed-off-by: Akihiro Suda <akihiro.suda.cz@hco.ntt.co.jp>
Author: Akihiro Suda
Date:   2019-10-21 15:02:37 +09:00
Parent: f5c2673878
Commit: 6b65b0c982

657 changed files with 258673 additions and 370 deletions

driver/kubernetes/driver.go

@@ -0,0 +1,157 @@
package kubernetes

import (
	"context"
	"fmt"
	"net"
	"time"

	"github.com/docker/buildx/driver"
	"github.com/docker/buildx/driver/kubernetes/execconn"
	"github.com/docker/buildx/driver/kubernetes/podchooser"
	"github.com/docker/buildx/util/progress"
	"github.com/moby/buildkit/client"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
	clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

const (
	DriverName = "kubernetes"
)

const (
	// valid values for driver-opt loadbalance
	LoadbalanceRandom = "random"
	LoadbalanceSticky = "sticky"
)

type Driver struct {
	driver.InitConfig
	factory          driver.Factory
	minReplicas      int
	deployment       *appsv1.Deployment
	clientset        *kubernetes.Clientset
	deploymentClient clientappsv1.DeploymentInterface
	podClient        clientcorev1.PodInterface
	podChooser       podchooser.PodChooser
}
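
// Bootstrap creates the buildkitd Deployment if it does not exist yet and
// then waits until minReplicas pods report ready.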
func (d *Driver) Bootstrap(ctx context.Context, l progress.Logger) error {
	return progress.Wrap("[internal] booting buildkit", l, func(sub progress.SubLogger) error {
		_, err := d.deploymentClient.Get(d.deployment.Name, metav1.GetOptions{})
		if err != nil {
			// TODO: return err if err != ErrNotFound
			_, err = d.deploymentClient.Create(d.deployment)
			if err != nil {
				return errors.Wrapf(err, "error while calling deploymentClient.Create for %q", d.deployment.Name)
			}
		}
		return sub.Wrap(
			fmt.Sprintf("waiting for %d pods to be ready", d.minReplicas),
			func() error {
				if err := d.wait(ctx); err != nil {
					return err
				}
				return nil
			})
	})
}
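
// wait polls the Deployment with a linearly growing backoff
// (100ms + 20ms*try, at most 100 tries) until at least minReplicas
// replicas are ready.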
func (d *Driver) wait(ctx context.Context) error {
	// TODO: use watch API
	var (
		err  error
		depl *appsv1.Deployment
	)
	for try := 0; try < 100; try++ {
		depl, err = d.deploymentClient.Get(d.deployment.Name, metav1.GetOptions{})
		if err == nil {
			if depl.Status.ReadyReplicas >= int32(d.minReplicas) {
				return nil
			}
			err = errors.Errorf("expected %d replicas to be ready, got %d",
				d.minReplicas, depl.Status.ReadyReplicas)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(time.Duration(100+try*20) * time.Millisecond):
		}
	}
	return err
}
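
// Info reports Inactive when the Deployment does not exist, Running when at
// least one replica is ready, and Stopped otherwise.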
func (d *Driver) Info(ctx context.Context) (*driver.Info, error) {
	depl, err := d.deploymentClient.Get(d.deployment.Name, metav1.GetOptions{})
	if err != nil {
		// TODO: return err if err != ErrNotFound
		return &driver.Info{
			Status: driver.Inactive,
		}, nil
	}
	if depl.Status.ReadyReplicas > 0 {
		return &driver.Info{
			Status: driver.Running,
		}, nil
	}
	return &driver.Info{
		Status: driver.Stopped,
	}, nil
}

func (d *Driver) Stop(ctx context.Context, force bool) error {
	// future version may scale the replicas to zero here
	return nil
}

func (d *Driver) Rm(ctx context.Context, force bool) error {
	if err := d.deploymentClient.Delete(d.deployment.Name, nil); err != nil {
		return errors.Wrapf(err, "error while calling deploymentClient.Delete for %q", d.deployment.Name)
	}
	return nil
}
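
// Client connects to buildkitd by running `buildctl dial-stdio` inside a pod
// selected by the configured PodChooser, wrapping the exec streams in a
// net.Conn.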
func (d *Driver) Client(ctx context.Context) (*client.Client, error) {
	restClient := d.clientset.CoreV1().RESTClient()
	restClientConfig, err := d.KubeClientConfig.ClientConfig()
	if err != nil {
		return nil, err
	}
	pod, err := d.podChooser.ChoosePod(ctx)
	if err != nil {
		return nil, err
	}
	logrus.Infof("Using pod %q", pod.Name)
	if len(pod.Spec.Containers) == 0 {
		return nil, errors.Errorf("pod %s does not have any container", pod.Name)
	}
	containerName := pod.Spec.Containers[0].Name
	cmd := []string{"buildctl", "dial-stdio"}
	conn, err := execconn.ExecConn(restClient, restClientConfig,
		pod.Namespace, pod.Name, containerName, cmd)
	if err != nil {
		return nil, err
	}
	return client.New(ctx, "", client.WithDialer(func(string, time.Duration) (net.Conn, error) {
		return conn, nil
	}))
}

func (d *Driver) Factory() driver.Factory {
	return d.factory
}

func (d *Driver) Features() map[driver.Feature]bool {
	return map[driver.Feature]bool{
		driver.OCIExporter:    true,
		driver.DockerExporter: d.DockerAPI != nil,
		driver.CacheExport:    true,
		driver.MultiPlatform:  true, // Untested (needs multiple Driver instances)
	}
}

@@ -0,0 +1,135 @@
package execconn

import (
	"io"
	"net"
	"os"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/remotecommand"
)
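
// ExecConn runs cmd inside the given container via the pods/exec subresource
// (the same mechanism as `kubectl exec`) and exposes the session's
// stdin/stdout streams as a net.Conn.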
func ExecConn(restClient rest.Interface, restConfig *rest.Config, namespace, pod, container string, cmd []string) (net.Conn, error) {
	req := restClient.
		Post().
		Namespace(namespace).
		Resource("pods").
		Name(pod).
		SubResource("exec").
		VersionedParams(&corev1.PodExecOptions{
			Container: container,
			Command:   cmd,
			Stdin:     true,
			Stdout:    true,
			Stderr:    true,
			TTY:       false,
		}, scheme.ParameterCodec)
	exec, err := remotecommand.NewSPDYExecutor(restConfig, "POST", req.URL())
	if err != nil {
		return nil, err
	}
	stdinR, stdinW := io.Pipe()
	stdoutR, stdoutW := io.Pipe()
	kc := &kubeConn{
		stdin:      stdinW,
		stdout:     stdoutR,
		localAddr:  dummyAddr{network: "dummy", s: "dummy-0"},
		remoteAddr: dummyAddr{network: "dummy", s: "dummy-1"},
	}
	go func() {
		serr := exec.Stream(remotecommand.StreamOptions{
			Stdin:  stdinR,
			Stdout: stdoutW,
			Stderr: os.Stderr,
			Tty:    false,
		})
		if serr != nil {
			logrus.Error(serr)
		}
	}()
	return kc, nil
}
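
// kubeConn adapts the exec session's stdin/stdout pipes to the net.Conn
// interface. Deadlines are not supported: the Set*Deadline methods are
// no-ops.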
type kubeConn struct {
	stdin         io.WriteCloser
	stdout        io.ReadCloser
	stdioClosedMu sync.Mutex // for stdinClosed and stdoutClosed
	stdinClosed   bool
	stdoutClosed  bool
	localAddr     net.Addr
	remoteAddr    net.Addr
}

func (c *kubeConn) Write(p []byte) (int, error) {
	return c.stdin.Write(p)
}

func (c *kubeConn) Read(p []byte) (int, error) {
	return c.stdout.Read(p)
}

func (c *kubeConn) CloseWrite() error {
	err := c.stdin.Close()
	c.stdioClosedMu.Lock()
	c.stdinClosed = true
	c.stdioClosedMu.Unlock()
	return err
}

func (c *kubeConn) CloseRead() error {
	err := c.stdout.Close()
	c.stdioClosedMu.Lock()
	c.stdoutClosed = true
	c.stdioClosedMu.Unlock()
	return err
}

func (c *kubeConn) Close() error {
	var err error
	c.stdioClosedMu.Lock()
	stdinClosed := c.stdinClosed
	c.stdioClosedMu.Unlock()
	if !stdinClosed {
		err = c.CloseWrite()
	}
	c.stdioClosedMu.Lock()
	stdoutClosed := c.stdoutClosed
	c.stdioClosedMu.Unlock()
	if !stdoutClosed {
		err = c.CloseRead()
	}
	return err
}

func (c *kubeConn) LocalAddr() net.Addr {
	return c.localAddr
}

func (c *kubeConn) RemoteAddr() net.Addr {
	return c.remoteAddr
}

func (c *kubeConn) SetDeadline(t time.Time) error {
	return nil
}

func (c *kubeConn) SetReadDeadline(t time.Time) error {
	return nil
}

func (c *kubeConn) SetWriteDeadline(t time.Time) error {
	return nil
}

type dummyAddr struct {
	network string
	s       string
}

func (d dummyAddr) Network() string {
	return d.network
}

func (d dummyAddr) String() string {
	return d.s
}

@@ -0,0 +1,146 @@
package kubernetes

import (
	"context"
	"strconv"
	"strings"

	"github.com/docker/buildx/driver"
	"github.com/docker/buildx/driver/bkimage"
	"github.com/docker/buildx/driver/kubernetes/manifest"
	"github.com/docker/buildx/driver/kubernetes/podchooser"
	dockerclient "github.com/docker/docker/client"
	"github.com/pkg/errors"
	"k8s.io/client-go/kubernetes"
)

const prioritySupported = 30
const priorityUnsupported = 70

func init() {
	driver.Register(&factory{})
}

type factory struct {
}

func (*factory) Name() string {
	return DriverName
}

func (*factory) Usage() string {
	return DriverName
}

func (*factory) Priority(ctx context.Context, api dockerclient.APIClient) int {
	if api == nil {
		return priorityUnsupported
	}
	return prioritySupported
}
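
// New builds the driver from the InitConfig: it derives the Deployment name
// from the buildx builder name, resolves the target namespace, applies the
// driver opts, and wires up the load-balancing pod chooser.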
func (f *factory) New(ctx context.Context, cfg driver.InitConfig) (driver.Driver, error) {
	if cfg.KubeClientConfig == nil {
		return nil, errors.Errorf("%s driver requires kubernetes API access", DriverName)
	}
	deploymentName, err := buildxNameToDeploymentName(cfg.Name)
	if err != nil {
		return nil, err
	}
	namespace, _, err := cfg.KubeClientConfig.Namespace()
	if err != nil {
		return nil, errors.Wrap(err, "cannot determine Kubernetes namespace, specify manually")
	}
	restClientConfig, err := cfg.KubeClientConfig.ClientConfig()
	if err != nil {
		return nil, err
	}
	clientset, err := kubernetes.NewForConfig(restClientConfig)
	if err != nil {
		return nil, err
	}
	d := &Driver{
		factory:    f,
		InitConfig: cfg,
		clientset:  clientset,
	}
	deploymentOpt := &manifest.DeploymentOpt{
		Name:          deploymentName,
		Image:         bkimage.DefaultImage,
		Replicas:      1,
		BuildkitFlags: cfg.BuildkitFlags,
		Rootless:      false,
	}
	loadbalance := LoadbalanceSticky
	imageOverride := ""
	for k, v := range cfg.DriverOpts {
		switch k {
		case "image":
			imageOverride = v
		case "namespace":
			namespace = v
		case "replicas":
			deploymentOpt.Replicas, err = strconv.Atoi(v)
			if err != nil {
				return nil, err
			}
case "rootless":
deploymentOpt.Rootless, err = strconv.ParseBool(v)
if err != nil {
return nil, err
}
deploymentOpt.Image = bkimage.DefaultRootlessImage
case "loadbalance":
switch v {
case LoadbalanceSticky:
case LoadbalanceRandom:
default:
return nil, errors.Errorf("invalid loadbalance %q", v)
}
loadbalance = v
default:
return nil, errors.Errorf("invalid driver option %s for driver %s", k, DriverName)
}
}
if imageOverride != "" {
deploymentOpt.Image = imageOverride
}
d.deployment, err = manifest.NewDeployment(deploymentOpt)
if err != nil {
return nil, err
}
d.minReplicas = deploymentOpt.Replicas
d.deploymentClient = clientset.AppsV1().Deployments(namespace)
d.podClient = clientset.CoreV1().Pods(namespace)
switch loadbalance {
case LoadbalanceSticky:
d.podChooser = &podchooser.StickyPodChooser{
Key: cfg.ContextPathHash,
PodClient: d.podClient,
Deployment: d.deployment,
}
case LoadbalanceRandom:
d.podChooser = &podchooser.RandomPodChooser{
PodClient: d.podClient,
Deployment: d.deployment,
}
}
return d, nil
}
func (f *factory) AllowsInstances() bool {
return true
}

// buildxNameToDeploymentName converts a buildx builder name to a Kubernetes
// Deployment name.
//
// e.g. "buildx_buildkit_loving_mendeleev0" -> "loving-mendeleev0"
func buildxNameToDeploymentName(bx string) (string, error) {
	// TODO: commands.util.go should not pass "buildx_buildkit_" prefix to drivers
	if !strings.HasPrefix(bx, "buildx_buildkit_") {
		return "", errors.Errorf("expected a string with \"buildx_buildkit_\", got %q", bx)
	}
	s := strings.TrimPrefix(bx, "buildx_buildkit_")
	s = strings.ReplaceAll(s, "_", "-")
	return s, nil
}

@@ -0,0 +1,90 @@
package manifest

import (
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type DeploymentOpt struct {
	Namespace     string
	Name          string
	Image         string
	Replicas      int
	BuildkitFlags []string
	Rootless      bool
}

const (
	containerName = "buildkitd"
)
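
// NewDeployment renders the buildkitd Deployment manifest: a privileged
// buildkitd container per replica, with a readiness probe that runs
// `buildctl debug workers` inside the container.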
func NewDeployment(opt *DeploymentOpt) (*appsv1.Deployment, error) {
	labels := map[string]string{
		"app": opt.Name,
	}
	replicas := int32(opt.Replicas)
	privileged := true
	args := opt.BuildkitFlags
	d := &appsv1.Deployment{
		TypeMeta: metav1.TypeMeta{
			APIVersion: appsv1.SchemeGroupVersion.String(),
			Kind:       "Deployment",
		},
		ObjectMeta: metav1.ObjectMeta{
			Namespace: opt.Namespace,
			Name:      opt.Name,
			Labels:    labels,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  containerName,
							Image: opt.Image,
							Args:  args,
							SecurityContext: &corev1.SecurityContext{
								Privileged: &privileged,
							},
							ReadinessProbe: &corev1.Probe{
								Handler: corev1.Handler{
									Exec: &corev1.ExecAction{
										Command: []string{"buildctl", "debug", "workers"},
									},
								},
							},
						},
					},
				},
			},
		},
	}
	if opt.Rootless {
		if err := toRootless(d); err != nil {
			return nil, err
		}
	}
	return d, nil
}
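
// toRootless reconfigures the container for rootless mode: it appends
// --oci-worker-no-process-sandbox, drops the privileged securityContext, and
// disables the AppArmor and seccomp profiles via pod annotations.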
func toRootless(d *appsv1.Deployment) error {
	d.Spec.Template.Spec.Containers[0].Args = append(
		d.Spec.Template.Spec.Containers[0].Args,
		"--oci-worker-no-process-sandbox",
	)
	d.Spec.Template.Spec.Containers[0].SecurityContext = nil
	if d.Spec.Template.ObjectMeta.Annotations == nil {
		d.Spec.Template.ObjectMeta.Annotations = make(map[string]string, 2)
	}
	d.Spec.Template.ObjectMeta.Annotations["container.apparmor.security.beta.kubernetes.io/"+containerName] = "unconfined"
	d.Spec.Template.ObjectMeta.Annotations["container.seccomp.security.alpha.kubernetes.io/"+containerName] = "unconfined"
	return nil
}

@@ -0,0 +1,97 @@
package podchooser

import (
	"context"
	"math/rand"
	"sort"
	"time"

	"github.com/pkg/errors"
	"github.com/serialx/hashring"
	"github.com/sirupsen/logrus"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

type PodChooser interface {
	ChoosePod(ctx context.Context) (*corev1.Pod, error)
}
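
// RandomPodChooser picks one of the running pods uniformly at random.
// RandSource may be set explicitly, e.g. for deterministic tests; it defaults
// to a time-seeded source.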
type RandomPodChooser struct {
	RandSource rand.Source
	PodClient  clientcorev1.PodInterface
	Deployment *appsv1.Deployment
}

func (pc *RandomPodChooser) ChoosePod(ctx context.Context) (*corev1.Pod, error) {
	pods, err := ListRunningPods(pc.PodClient, pc.Deployment)
	if err != nil {
		return nil, err
	}
	if len(pods) == 0 {
		// guard against a division by zero below when no pod is running yet
		return nil, errors.Errorf("no running pod found for deployment %q", pc.Deployment.Name)
	}
	randSource := pc.RandSource
	if randSource == nil {
		randSource = rand.NewSource(time.Now().Unix())
	}
	rnd := rand.New(randSource)
	n := rnd.Int() % len(pods)
	logrus.Debugf("RandomPodChooser.ChoosePod(): len(pods)=%d, n=%d", len(pods), n)
	return pods[n], nil
}
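
// StickyPodChooser maps Key (buildx passes the hash of the build context
// path) onto a pod through a consistent hash ring, so that repeated builds of
// the same context keep hitting the same buildkitd instance and its cache.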
type StickyPodChooser struct {
	Key        string
	PodClient  clientcorev1.PodInterface
	Deployment *appsv1.Deployment
}

func (pc *StickyPodChooser) ChoosePod(ctx context.Context) (*corev1.Pod, error) {
	pods, err := ListRunningPods(pc.PodClient, pc.Deployment)
	if err != nil {
		return nil, err
	}
	var podNames []string
	podMap := make(map[string]*corev1.Pod, len(pods))
	for _, pod := range pods {
		podNames = append(podNames, pod.Name)
		podMap[pod.Name] = pod
	}
	ring := hashring.New(podNames)
	chosen, ok := ring.GetNode(pc.Key)
	if !ok {
		// NOTREACHED
		logrus.Errorf("no pod found for key %q", pc.Key)
		rpc := &RandomPodChooser{
			PodClient:  pc.PodClient,
			Deployment: pc.Deployment,
		}
		return rpc.ChoosePod(ctx)
	}
	return podMap[chosen], nil
}
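
// ListRunningPods returns the Deployment's pods that are in phase Running,
// sorted by name so that callers see a stable ordering.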
func ListRunningPods(client clientcorev1.PodInterface, depl *appsv1.Deployment) ([]*corev1.Pod, error) {
	selector, err := metav1.LabelSelectorAsSelector(depl.Spec.Selector)
	if err != nil {
		return nil, err
	}
	listOpts := metav1.ListOptions{
		LabelSelector: selector.String(),
	}
	podList, err := client.List(listOpts)
	if err != nil {
		return nil, err
	}
	var runningPods []*corev1.Pod
	for i := range podList.Items {
		pod := &podList.Items[i]
		if pod.Status.Phase == corev1.PodRunning {
			logrus.Debugf("pod running: %q", pod.Name)
			runningPods = append(runningPods, pod)
		}
	}
	sort.Slice(runningPods, func(i, j int) bool {
		return runningPods[i].Name < runningPods[j].Name
	})
	return runningPods, nil
}