mirror of
				https://gitea.com/Lydanne/buildx.git
				synced 2025-11-04 01:53:42 +08:00 
			
		
		
		
	When building from same stream all nodes need to read data from the same stream. In order to achive that there is a new SyncMultiReader wrapper that sends the stream concurrently to all readers. Readers must read at similar speed or pauses will happen while they wait for each other. Dockerfiles were already written to disk before sent. Now the file written by first node is reused for others. Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
		
			
				
	
	
		
			78 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			78 lines
		
	
	
		
			1.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package build
 | 
						|
 | 
						|
import (
 | 
						|
	"bytes"
 | 
						|
	"crypto/rand"
 | 
						|
	"io"
 | 
						|
	mathrand "math/rand"
 | 
						|
	"sync"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/stretchr/testify/assert"
 | 
						|
	"github.com/stretchr/testify/require"
 | 
						|
)
 | 
						|
 | 
						|
func generateRandomData(size int) []byte {
 | 
						|
	data := make([]byte, size)
 | 
						|
	rand.Read(data)
 | 
						|
	return data
 | 
						|
}
 | 
						|
func TestSyncMultiReaderParallel(t *testing.T) {
 | 
						|
	data := generateRandomData(1024 * 1024)
 | 
						|
	source := bytes.NewReader(data)
 | 
						|
	mr := NewSyncMultiReader(source)
 | 
						|
 | 
						|
	var wg sync.WaitGroup
 | 
						|
	numReaders := 10
 | 
						|
	bufferSize := 4096 * 4
 | 
						|
 | 
						|
	readers := make([]io.ReadCloser, numReaders)
 | 
						|
 | 
						|
	for i := 0; i < numReaders; i++ {
 | 
						|
		readers[i] = mr.NewReadCloser()
 | 
						|
	}
 | 
						|
 | 
						|
	for i := 0; i < numReaders; i++ {
 | 
						|
		wg.Add(1)
 | 
						|
		go func(readerId int) {
 | 
						|
			defer wg.Done()
 | 
						|
			reader := readers[readerId]
 | 
						|
			defer reader.Close()
 | 
						|
 | 
						|
			totalRead := 0
 | 
						|
			buf := make([]byte, bufferSize)
 | 
						|
			for totalRead < len(data) {
 | 
						|
				// Simulate random read sizes
 | 
						|
				readSize := mathrand.Intn(bufferSize) //nolint:gosec
 | 
						|
				n, err := reader.Read(buf[:readSize])
 | 
						|
 | 
						|
				if n > 0 {
 | 
						|
					assert.Equal(t, data[totalRead:totalRead+n], buf[:n], "Reader %d mismatch", readerId)
 | 
						|
					totalRead += n
 | 
						|
				}
 | 
						|
 | 
						|
				if err == io.EOF {
 | 
						|
					assert.Equal(t, len(data), totalRead, "Reader %d EOF mismatch", readerId)
 | 
						|
					return
 | 
						|
				}
 | 
						|
 | 
						|
				require.NoError(t, err, "Reader %d error", readerId)
 | 
						|
 | 
						|
				if mathrand.Intn(1000) == 0 { //nolint:gosec
 | 
						|
					t.Logf("Reader %d closing", readerId)
 | 
						|
					// Simulate random close
 | 
						|
					return
 | 
						|
				}
 | 
						|
 | 
						|
				// Simulate random timing between reads
 | 
						|
				time.Sleep(time.Millisecond * time.Duration(mathrand.Intn(5))) //nolint:gosec
 | 
						|
			}
 | 
						|
 | 
						|
			assert.Equal(t, len(data), totalRead, "Reader %d total read mismatch", readerId)
 | 
						|
		}(i)
 | 
						|
	}
 | 
						|
 | 
						|
	wg.Wait()
 | 
						|
}
 |