build: allow builds from stdin for multi-node builders

When building from same stream all nodes need to read
data from the same stream. In order to achive that there
is a new SyncMultiReader wrapper that sends the stream
concurrently to all readers. Readers must read at similar
speed or pauses will happen while they wait for each other.

Dockerfiles were already written to disk before sent. Now
the file written by first node is reused for others.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi 2024-08-13 11:03:04 +03:00
parent e00efeb399
commit adbcc2225e
No known key found for this signature in database
GPG Key ID: AFA9DE5F8AB7AF39
5 changed files with 266 additions and 14 deletions

View File

@ -97,7 +97,7 @@ type CallFunc struct {
type Inputs struct { type Inputs struct {
ContextPath string ContextPath string
DockerfilePath string DockerfilePath string
InStream io.Reader InStream *SyncMultiReader
ContextState *llb.State ContextState *llb.State
DockerfileInline string DockerfileInline string
NamedContexts map[string]NamedContext NamedContexts map[string]NamedContext

View File

@ -1,11 +1,12 @@
package build package build
import ( import (
"bufio" "bytes"
"context" "context"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"slices"
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
@ -260,7 +261,7 @@ func toSolveOpt(ctx context.Context, node builder.Node, multiDriver bool, opt Op
} }
so.Exports = opt.Exports so.Exports = opt.Exports
so.Session = opt.Session so.Session = slices.Clone(opt.Session)
releaseLoad, err := loadInputs(ctx, nodeDriver, opt.Inputs, pw, &so) releaseLoad, err := loadInputs(ctx, nodeDriver, opt.Inputs, pw, &so)
if err != nil { if err != nil {
@ -364,7 +365,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
var ( var (
err error err error
dockerfileReader io.Reader dockerfileReader io.ReadCloser
dockerfileDir string dockerfileDir string
dockerfileName = inp.DockerfilePath dockerfileName = inp.DockerfilePath
toRemove []string toRemove []string
@ -382,8 +383,8 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
return nil, errors.Errorf("invalid argument: can't use stdin for both build context and dockerfile") return nil, errors.Errorf("invalid argument: can't use stdin for both build context and dockerfile")
} }
buf := bufio.NewReader(inp.InStream) rc := inp.InStream.NewReadCloser()
magic, err := buf.Peek(archiveHeaderSize * 2) magic, err := inp.InStream.Peek(archiveHeaderSize * 2)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
return nil, errors.Wrap(err, "failed to peek context header from STDIN") return nil, errors.Wrap(err, "failed to peek context header from STDIN")
} }
@ -391,14 +392,14 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
if isArchive(magic) { if isArchive(magic) {
// stdin is context // stdin is context
up := uploadprovider.New() up := uploadprovider.New()
target.FrontendAttrs["context"] = up.Add(buf) target.FrontendAttrs["context"] = up.Add(rc)
target.Session = append(target.Session, up) target.Session = append(target.Session, up)
} else { } else {
if inp.DockerfilePath != "" { if inp.DockerfilePath != "" {
return nil, errors.Errorf("ambiguous Dockerfile source: both stdin and flag correspond to Dockerfiles") return nil, errors.Errorf("ambiguous Dockerfile source: both stdin and flag correspond to Dockerfiles")
} }
// stdin is dockerfile // stdin is dockerfile
dockerfileReader = buf dockerfileReader = rc
inp.ContextPath, _ = os.MkdirTemp("", "empty-dir") inp.ContextPath, _ = os.MkdirTemp("", "empty-dir")
toRemove = append(toRemove, inp.ContextPath) toRemove = append(toRemove, inp.ContextPath)
if err := setLocalMount("context", inp.ContextPath, target); err != nil { if err := setLocalMount("context", inp.ContextPath, target); err != nil {
@ -417,7 +418,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
target.SharedKey = sharedKey target.SharedKey = sharedKey
switch inp.DockerfilePath { switch inp.DockerfilePath {
case "-": case "-":
dockerfileReader = inp.InStream dockerfileReader = inp.InStream.NewReadCloser()
case "": case "":
dockerfileDir = inp.ContextPath dockerfileDir = inp.ContextPath
default: default:
@ -426,7 +427,7 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
} }
case IsRemoteURL(inp.ContextPath): case IsRemoteURL(inp.ContextPath):
if inp.DockerfilePath == "-" { if inp.DockerfilePath == "-" {
dockerfileReader = inp.InStream dockerfileReader = inp.InStream.NewReadCloser()
} else if filepath.IsAbs(inp.DockerfilePath) { } else if filepath.IsAbs(inp.DockerfilePath) {
dockerfileDir = filepath.Dir(inp.DockerfilePath) dockerfileDir = filepath.Dir(inp.DockerfilePath)
dockerfileName = filepath.Base(inp.DockerfilePath) dockerfileName = filepath.Base(inp.DockerfilePath)
@ -438,11 +439,11 @@ func loadInputs(ctx context.Context, d *driver.DriverHandle, inp Inputs, pw prog
} }
if inp.DockerfileInline != "" { if inp.DockerfileInline != "" {
dockerfileReader = strings.NewReader(inp.DockerfileInline) dockerfileReader = io.NopCloser(strings.NewReader(inp.DockerfileInline))
} }
if dockerfileReader != nil { if dockerfileReader != nil {
dockerfileDir, err = createTempDockerfile(dockerfileReader) dockerfileDir, err = createTempDockerfile(dockerfileReader, inp.InStream)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -582,7 +583,7 @@ func setLocalMount(name, dir string, so *client.SolveOpt) error {
return nil return nil
} }
func createTempDockerfile(r io.Reader) (string, error) { func createTempDockerfile(r io.Reader, multiReader *SyncMultiReader) (string, error) {
dir, err := os.MkdirTemp("", "dockerfile") dir, err := os.MkdirTemp("", "dockerfile")
if err != nil { if err != nil {
return "", err return "", err
@ -592,6 +593,16 @@ func createTempDockerfile(r io.Reader) (string, error) {
return "", err return "", err
} }
defer f.Close() defer f.Close()
if multiReader != nil {
dt, err := io.ReadAll(r)
if err != nil {
return "", err
}
multiReader.Reset(dt)
r = bytes.NewReader(dt)
}
if _, err := io.Copy(f, r); err != nil { if _, err := io.Copy(f, r); err != nil {
return "", err return "", err
} }

164
build/replicatedstream.go Normal file
View File

@ -0,0 +1,164 @@
package build
import (
"bufio"
"bytes"
"io"
"sync"
)
type SyncMultiReader struct {
source *bufio.Reader
buffer []byte
static []byte
mu sync.Mutex
cond *sync.Cond
readers []*syncReader
err error
offset int
}
type syncReader struct {
mr *SyncMultiReader
offset int
closed bool
}
func NewSyncMultiReader(source io.Reader) *SyncMultiReader {
mr := &SyncMultiReader{
source: bufio.NewReader(source),
buffer: make([]byte, 0, 32*1024),
}
mr.cond = sync.NewCond(&mr.mu)
return mr
}
func (mr *SyncMultiReader) Peek(n int) ([]byte, error) {
mr.mu.Lock()
defer mr.mu.Unlock()
if mr.static != nil {
return mr.static[min(n, len(mr.static)):], nil
}
return mr.source.Peek(n)
}
func (mr *SyncMultiReader) Reset(dt []byte) {
mr.mu.Lock()
defer mr.mu.Unlock()
mr.static = dt
}
func (mr *SyncMultiReader) NewReadCloser() io.ReadCloser {
mr.mu.Lock()
defer mr.mu.Unlock()
if mr.static != nil {
return io.NopCloser(bytes.NewReader(mr.static))
}
reader := &syncReader{
mr: mr,
}
mr.readers = append(mr.readers, reader)
return reader
}
func (sr *syncReader) Read(p []byte) (int, error) {
sr.mr.mu.Lock()
defer sr.mr.mu.Unlock()
return sr.read(p)
}
func (sr *syncReader) read(p []byte) (int, error) {
end := sr.mr.offset + len(sr.mr.buffer)
loop0:
for {
if sr.closed {
return 0, io.EOF
}
end := sr.mr.offset + len(sr.mr.buffer)
if sr.mr.err != nil && sr.offset == end {
return 0, sr.mr.err
}
start := sr.offset - sr.mr.offset
dt := sr.mr.buffer[start:]
if len(dt) > 0 {
n := copy(p, dt)
sr.offset += n
sr.mr.cond.Broadcast()
return n, nil
}
// check for readers that have not caught up
hasOpen := false
for _, r := range sr.mr.readers {
if !r.closed {
hasOpen = true
} else {
continue
}
if r.offset < end {
sr.mr.cond.Wait()
continue loop0
}
}
if !hasOpen {
return 0, io.EOF
}
break
}
last := sr.mr.offset + len(sr.mr.buffer)
// another reader has already updated the buffer
if last > end || sr.mr.err != nil {
return sr.read(p)
}
sr.mr.offset += len(sr.mr.buffer)
sr.mr.buffer = sr.mr.buffer[:cap(sr.mr.buffer)]
n, err := sr.mr.source.Read(sr.mr.buffer)
if n >= 0 {
sr.mr.buffer = sr.mr.buffer[:n]
} else {
sr.mr.buffer = sr.mr.buffer[:0]
}
sr.mr.cond.Broadcast()
if err != nil {
sr.mr.err = err
return 0, err
}
nn := copy(p, sr.mr.buffer)
sr.offset += nn
return nn, nil
}
func (sr *syncReader) Close() error {
sr.mr.mu.Lock()
defer sr.mr.mu.Unlock()
if sr.closed {
return nil
}
sr.closed = true
sr.mr.cond.Broadcast()
return nil
}

View File

@ -0,0 +1,77 @@
package build
import (
"bytes"
"crypto/rand"
"io"
mathrand "math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func generateRandomData(size int) []byte {
data := make([]byte, size)
rand.Read(data)
return data
}
func TestSyncMultiReaderParallel(t *testing.T) {
data := generateRandomData(1024 * 1024)
source := bytes.NewReader(data)
mr := NewSyncMultiReader(source)
var wg sync.WaitGroup
numReaders := 10
bufferSize := 4096 * 4
readers := make([]io.ReadCloser, numReaders)
for i := 0; i < numReaders; i++ {
readers[i] = mr.NewReadCloser()
}
for i := 0; i < numReaders; i++ {
wg.Add(1)
go func(readerId int) {
defer wg.Done()
reader := readers[readerId]
defer reader.Close()
totalRead := 0
buf := make([]byte, bufferSize)
for totalRead < len(data) {
// Simulate random read sizes
readSize := mathrand.Intn(bufferSize) //nolint:gosec
n, err := reader.Read(buf[:readSize])
if n > 0 {
assert.Equal(t, data[totalRead:totalRead+n], buf[:n], "Reader %d mismatch", readerId)
totalRead += n
}
if err == io.EOF {
assert.Equal(t, len(data), totalRead, "Reader %d EOF mismatch", readerId)
return
}
require.NoError(t, err, "Reader %d error", readerId)
if mathrand.Intn(1000) == 0 { //nolint:gosec
t.Logf("Reader %d closing", readerId)
// Simulate random close
return
}
// Simulate random timing between reads
time.Sleep(time.Millisecond * time.Duration(mathrand.Intn(5))) //nolint:gosec
}
assert.Equal(t, len(data), totalRead, "Reader %d total read mismatch", readerId)
}(i)
}
wg.Wait()
}

View File

@ -48,7 +48,7 @@ func RunBuild(ctx context.Context, dockerCli command.Cli, in controllerapi.Build
Inputs: build.Inputs{ Inputs: build.Inputs{
ContextPath: in.ContextPath, ContextPath: in.ContextPath,
DockerfilePath: in.DockerfileName, DockerfilePath: in.DockerfileName,
InStream: inStream, InStream: build.NewSyncMultiReader(inStream),
NamedContexts: contexts, NamedContexts: contexts,
}, },
Ref: in.Ref, Ref: in.Ref,