mirror of
https://gitea.com/Lydanne/buildx.git
synced 2025-05-19 18:07:45 +08:00

Removes gogo/protobuf from buildx and updates to a version of moby/buildkit where gogo is removed. This also changes how the proto files are generated. This is because newer versions of protobuf are more strict about name conflicts. If two files have the same name (even if they are relative paths) and are used in different protoc commands, they'll conflict in the registry. Since protobuf file generation doesn't work very well with `paths=source_relative`, this removes the `go:generate` expression and just relies on the dockerfile to perform the generation. Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
420 lines
14 KiB
Go
420 lines
14 KiB
Go
/*
|
|
*
|
|
* Copyright 2022 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
// Package gracefulswitch implements a graceful switch load balancer.
|
|
package gracefulswitch
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"google.golang.org/grpc/balancer"
|
|
"google.golang.org/grpc/balancer/base"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/resolver"
|
|
)
|
|
|
|
var errBalancerClosed = errors.New("gracefulSwitchBalancer is closed")
|
|
var _ balancer.Balancer = (*Balancer)(nil)
|
|
|
|
// NewBalancer returns a graceful switch Balancer.
|
|
func NewBalancer(cc balancer.ClientConn, opts balancer.BuildOptions) *Balancer {
|
|
return &Balancer{
|
|
cc: cc,
|
|
bOpts: opts,
|
|
}
|
|
}
|
|
|
|
// Balancer is a utility to gracefully switch from one balancer to
|
|
// a new balancer. It implements the balancer.Balancer interface.
|
|
type Balancer struct {
|
|
bOpts balancer.BuildOptions
|
|
cc balancer.ClientConn
|
|
|
|
// mu protects the following fields and all fields within balancerCurrent
|
|
// and balancerPending. mu does not need to be held when calling into the
|
|
// child balancers, as all calls into these children happen only as a direct
|
|
// result of a call into the gracefulSwitchBalancer, which are also
|
|
// guaranteed to be synchronous. There is one exception: an UpdateState call
|
|
// from a child balancer when current and pending are populated can lead to
|
|
// calling Close() on the current. To prevent that racing with an
|
|
// UpdateSubConnState from the channel, we hold currentMu during Close and
|
|
// UpdateSubConnState calls.
|
|
mu sync.Mutex
|
|
balancerCurrent *balancerWrapper
|
|
balancerPending *balancerWrapper
|
|
closed bool // set to true when this balancer is closed
|
|
|
|
// currentMu must be locked before mu. This mutex guards against this
|
|
// sequence of events: UpdateSubConnState() called, finds the
|
|
// balancerCurrent, gives up lock, updateState comes in, causes Close() on
|
|
// balancerCurrent before the UpdateSubConnState is called on the
|
|
// balancerCurrent.
|
|
currentMu sync.Mutex
|
|
}
|
|
|
|
// swap swaps out the current lb with the pending lb and updates the ClientConn.
|
|
// The caller must hold gsb.mu.
|
|
func (gsb *Balancer) swap() {
|
|
gsb.cc.UpdateState(gsb.balancerPending.lastState)
|
|
cur := gsb.balancerCurrent
|
|
gsb.balancerCurrent = gsb.balancerPending
|
|
gsb.balancerPending = nil
|
|
go func() {
|
|
gsb.currentMu.Lock()
|
|
defer gsb.currentMu.Unlock()
|
|
cur.Close()
|
|
}()
|
|
}
|
|
|
|
// Helper function that checks if the balancer passed in is current or pending.
|
|
// The caller must hold gsb.mu.
|
|
func (gsb *Balancer) balancerCurrentOrPending(bw *balancerWrapper) bool {
|
|
return bw == gsb.balancerCurrent || bw == gsb.balancerPending
|
|
}
|
|
|
|
// SwitchTo initializes the graceful switch process, which completes based on
|
|
// connectivity state changes on the current/pending balancer. Thus, the switch
|
|
// process is not complete when this method returns. This method must be called
|
|
// synchronously alongside the rest of the balancer.Balancer methods this
|
|
// Graceful Switch Balancer implements.
|
|
//
|
|
// Deprecated: use ParseConfig and pass a parsed config to UpdateClientConnState
|
|
// to cause the Balancer to automatically change to the new child when necessary.
|
|
func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
|
|
_, err := gsb.switchTo(builder)
|
|
return err
|
|
}
|
|
|
|
func (gsb *Balancer) switchTo(builder balancer.Builder) (*balancerWrapper, error) {
|
|
gsb.mu.Lock()
|
|
if gsb.closed {
|
|
gsb.mu.Unlock()
|
|
return nil, errBalancerClosed
|
|
}
|
|
bw := &balancerWrapper{
|
|
builder: builder,
|
|
gsb: gsb,
|
|
lastState: balancer.State{
|
|
ConnectivityState: connectivity.Connecting,
|
|
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
|
|
},
|
|
subconns: make(map[balancer.SubConn]bool),
|
|
}
|
|
balToClose := gsb.balancerPending // nil if there is no pending balancer
|
|
if gsb.balancerCurrent == nil {
|
|
gsb.balancerCurrent = bw
|
|
} else {
|
|
gsb.balancerPending = bw
|
|
}
|
|
gsb.mu.Unlock()
|
|
balToClose.Close()
|
|
// This function takes a builder instead of a balancer because builder.Build
|
|
// can call back inline, and this utility needs to handle the callbacks.
|
|
newBalancer := builder.Build(bw, gsb.bOpts)
|
|
if newBalancer == nil {
|
|
// This is illegal and should never happen; we clear the balancerWrapper
|
|
// we were constructing if it happens to avoid a potential panic.
|
|
gsb.mu.Lock()
|
|
if gsb.balancerPending != nil {
|
|
gsb.balancerPending = nil
|
|
} else {
|
|
gsb.balancerCurrent = nil
|
|
}
|
|
gsb.mu.Unlock()
|
|
return nil, balancer.ErrBadResolverState
|
|
}
|
|
|
|
// This write doesn't need to take gsb.mu because this field never gets read
|
|
// or written to on any calls from the current or pending. Calls from grpc
|
|
// to this balancer are guaranteed to be called synchronously, so this
|
|
// bw.Balancer field will never be forwarded to until this SwitchTo()
|
|
// function returns.
|
|
bw.Balancer = newBalancer
|
|
return bw, nil
|
|
}
|
|
|
|
// Returns nil if the graceful switch balancer is closed.
|
|
func (gsb *Balancer) latestBalancer() *balancerWrapper {
|
|
gsb.mu.Lock()
|
|
defer gsb.mu.Unlock()
|
|
if gsb.balancerPending != nil {
|
|
return gsb.balancerPending
|
|
}
|
|
return gsb.balancerCurrent
|
|
}
|
|
|
|
// UpdateClientConnState forwards the update to the latest balancer created.
|
|
//
|
|
// If the state's BalancerConfig is the config returned by a call to
|
|
// gracefulswitch.ParseConfig, then this function will automatically SwitchTo
|
|
// the balancer indicated by the config before forwarding its config to it, if
|
|
// necessary.
|
|
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
|
|
// The resolver data is only relevant to the most recent LB Policy.
|
|
balToUpdate := gsb.latestBalancer()
|
|
gsbCfg, ok := state.BalancerConfig.(*lbConfig)
|
|
if ok {
|
|
// Switch to the child in the config unless it is already active.
|
|
if balToUpdate == nil || gsbCfg.childBuilder.Name() != balToUpdate.builder.Name() {
|
|
var err error
|
|
balToUpdate, err = gsb.switchTo(gsbCfg.childBuilder)
|
|
if err != nil {
|
|
return fmt.Errorf("could not switch to new child balancer: %w", err)
|
|
}
|
|
}
|
|
// Unwrap the child balancer's config.
|
|
state.BalancerConfig = gsbCfg.childConfig
|
|
}
|
|
|
|
if balToUpdate == nil {
|
|
return errBalancerClosed
|
|
}
|
|
|
|
// Perform this call without gsb.mu to prevent deadlocks if the child calls
|
|
// back into the channel. The latest balancer can never be closed during a
|
|
// call from the channel, even without gsb.mu held.
|
|
return balToUpdate.UpdateClientConnState(state)
|
|
}
|
|
|
|
// ResolverError forwards the error to the latest balancer created.
|
|
func (gsb *Balancer) ResolverError(err error) {
|
|
// The resolver data is only relevant to the most recent LB Policy.
|
|
balToUpdate := gsb.latestBalancer()
|
|
if balToUpdate == nil {
|
|
gsb.cc.UpdateState(balancer.State{
|
|
ConnectivityState: connectivity.TransientFailure,
|
|
Picker: base.NewErrPicker(err),
|
|
})
|
|
return
|
|
}
|
|
// Perform this call without gsb.mu to prevent deadlocks if the child calls
|
|
// back into the channel. The latest balancer can never be closed during a
|
|
// call from the channel, even without gsb.mu held.
|
|
balToUpdate.ResolverError(err)
|
|
}
|
|
|
|
// ExitIdle forwards the call to the latest balancer created.
|
|
//
|
|
// If the latest balancer does not support ExitIdle, the subConns are
|
|
// re-connected to manually.
|
|
func (gsb *Balancer) ExitIdle() {
|
|
balToUpdate := gsb.latestBalancer()
|
|
if balToUpdate == nil {
|
|
return
|
|
}
|
|
// There is no need to protect this read with a mutex, as the write to the
|
|
// Balancer field happens in SwitchTo, which completes before this can be
|
|
// called.
|
|
if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
|
|
ei.ExitIdle()
|
|
return
|
|
}
|
|
gsb.mu.Lock()
|
|
defer gsb.mu.Unlock()
|
|
for sc := range balToUpdate.subconns {
|
|
sc.Connect()
|
|
}
|
|
}
|
|
|
|
// updateSubConnState forwards the update to the appropriate child.
|
|
func (gsb *Balancer) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState, cb func(balancer.SubConnState)) {
|
|
gsb.currentMu.Lock()
|
|
defer gsb.currentMu.Unlock()
|
|
gsb.mu.Lock()
|
|
// Forward update to the appropriate child. Even if there is a pending
|
|
// balancer, the current balancer should continue to get SubConn updates to
|
|
// maintain the proper state while the pending is still connecting.
|
|
var balToUpdate *balancerWrapper
|
|
if gsb.balancerCurrent != nil && gsb.balancerCurrent.subconns[sc] {
|
|
balToUpdate = gsb.balancerCurrent
|
|
} else if gsb.balancerPending != nil && gsb.balancerPending.subconns[sc] {
|
|
balToUpdate = gsb.balancerPending
|
|
}
|
|
if balToUpdate == nil {
|
|
// SubConn belonged to a stale lb policy that has not yet fully closed,
|
|
// or the balancer was already closed.
|
|
gsb.mu.Unlock()
|
|
return
|
|
}
|
|
if state.ConnectivityState == connectivity.Shutdown {
|
|
delete(balToUpdate.subconns, sc)
|
|
}
|
|
gsb.mu.Unlock()
|
|
if cb != nil {
|
|
cb(state)
|
|
} else {
|
|
balToUpdate.UpdateSubConnState(sc, state)
|
|
}
|
|
}
|
|
|
|
// UpdateSubConnState forwards the update to the appropriate child.
|
|
func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
|
|
gsb.updateSubConnState(sc, state, nil)
|
|
}
|
|
|
|
// Close closes any active child balancers.
|
|
func (gsb *Balancer) Close() {
|
|
gsb.mu.Lock()
|
|
gsb.closed = true
|
|
currentBalancerToClose := gsb.balancerCurrent
|
|
gsb.balancerCurrent = nil
|
|
pendingBalancerToClose := gsb.balancerPending
|
|
gsb.balancerPending = nil
|
|
gsb.mu.Unlock()
|
|
|
|
currentBalancerToClose.Close()
|
|
pendingBalancerToClose.Close()
|
|
}
|
|
|
|
// balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
|
|
// methods to help cleanup SubConns created by the wrapped balancer.
|
|
//
|
|
// It implements the balancer.ClientConn interface and is passed down in that
|
|
// capacity to the wrapped balancer. It maintains a set of subConns created by
|
|
// the wrapped balancer and calls from the latter to create/update/shutdown
|
|
// SubConns update this set before being forwarded to the parent ClientConn.
|
|
// State updates from the wrapped balancer can result in invocation of the
|
|
// graceful switch logic.
|
|
type balancerWrapper struct {
|
|
balancer.Balancer
|
|
gsb *Balancer
|
|
builder balancer.Builder
|
|
|
|
lastState balancer.State
|
|
subconns map[balancer.SubConn]bool // subconns created by this balancer
|
|
}
|
|
|
|
// Close closes the underlying LB policy and shuts down the subconns it
|
|
// created. bw must not be referenced via balancerCurrent or balancerPending in
|
|
// gsb when called. gsb.mu must not be held. Does not panic with a nil
|
|
// receiver.
|
|
func (bw *balancerWrapper) Close() {
|
|
// before Close is called.
|
|
if bw == nil {
|
|
return
|
|
}
|
|
// There is no need to protect this read with a mutex, as Close() is
|
|
// impossible to be called concurrently with the write in SwitchTo(). The
|
|
// callsites of Close() for this balancer in Graceful Switch Balancer will
|
|
// never be called until SwitchTo() returns.
|
|
bw.Balancer.Close()
|
|
bw.gsb.mu.Lock()
|
|
for sc := range bw.subconns {
|
|
sc.Shutdown()
|
|
}
|
|
bw.gsb.mu.Unlock()
|
|
}
|
|
|
|
func (bw *balancerWrapper) UpdateState(state balancer.State) {
|
|
// Hold the mutex for this entire call to ensure it cannot occur
|
|
// concurrently with other updateState() calls. This causes updates to
|
|
// lastState and calls to cc.UpdateState to happen atomically.
|
|
bw.gsb.mu.Lock()
|
|
defer bw.gsb.mu.Unlock()
|
|
bw.lastState = state
|
|
|
|
if !bw.gsb.balancerCurrentOrPending(bw) {
|
|
return
|
|
}
|
|
|
|
if bw == bw.gsb.balancerCurrent {
|
|
// In the case that the current balancer exits READY, and there is a pending
|
|
// balancer, you can forward the pending balancer's cached State up to
|
|
// ClientConn and swap the pending into the current. This is because there
|
|
// is no reason to gracefully switch from and keep using the old policy as
|
|
// the ClientConn is not connected to any backends.
|
|
if state.ConnectivityState != connectivity.Ready && bw.gsb.balancerPending != nil {
|
|
bw.gsb.swap()
|
|
return
|
|
}
|
|
// Even if there is a pending balancer waiting to be gracefully switched to,
|
|
// continue to forward current balancer updates to the Client Conn. Ignoring
|
|
// state + picker from the current would cause undefined behavior/cause the
|
|
// system to behave incorrectly from the current LB policies perspective.
|
|
// Also, the current LB is still being used by grpc to choose SubConns per
|
|
// RPC, and thus should use the most updated form of the current balancer.
|
|
bw.gsb.cc.UpdateState(state)
|
|
return
|
|
}
|
|
// This method is now dealing with a state update from the pending balancer.
|
|
// If the current balancer is currently in a state other than READY, the new
|
|
// policy can be swapped into place immediately. This is because there is no
|
|
// reason to gracefully switch from and keep using the old policy as the
|
|
// ClientConn is not connected to any backends.
|
|
if state.ConnectivityState != connectivity.Connecting || bw.gsb.balancerCurrent.lastState.ConnectivityState != connectivity.Ready {
|
|
bw.gsb.swap()
|
|
}
|
|
}
|
|
|
|
func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
|
bw.gsb.mu.Lock()
|
|
if !bw.gsb.balancerCurrentOrPending(bw) {
|
|
bw.gsb.mu.Unlock()
|
|
return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
|
|
}
|
|
bw.gsb.mu.Unlock()
|
|
|
|
var sc balancer.SubConn
|
|
oldListener := opts.StateListener
|
|
opts.StateListener = func(state balancer.SubConnState) { bw.gsb.updateSubConnState(sc, state, oldListener) }
|
|
sc, err := bw.gsb.cc.NewSubConn(addrs, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
bw.gsb.mu.Lock()
|
|
if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
|
|
sc.Shutdown()
|
|
bw.gsb.mu.Unlock()
|
|
return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
|
|
}
|
|
bw.subconns[sc] = true
|
|
bw.gsb.mu.Unlock()
|
|
return sc, nil
|
|
}
|
|
|
|
func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
|
|
// Ignore ResolveNow requests from anything other than the most recent
|
|
// balancer, because older balancers were already removed from the config.
|
|
if bw != bw.gsb.latestBalancer() {
|
|
return
|
|
}
|
|
bw.gsb.cc.ResolveNow(opts)
|
|
}
|
|
|
|
func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
|
|
// Note: existing third party balancers may call this, so it must remain
|
|
// until RemoveSubConn is fully removed.
|
|
sc.Shutdown()
|
|
}
|
|
|
|
func (bw *balancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
|
|
bw.gsb.mu.Lock()
|
|
if !bw.gsb.balancerCurrentOrPending(bw) {
|
|
bw.gsb.mu.Unlock()
|
|
return
|
|
}
|
|
bw.gsb.mu.Unlock()
|
|
bw.gsb.cc.UpdateAddresses(sc, addrs)
|
|
}
|
|
|
|
func (bw *balancerWrapper) Target() string {
|
|
return bw.gsb.cc.Target()
|
|
}
|