Bump moby/buildkit
Signed-off-by: ulyssessouza <ulyssessouza@gmail.com>
vendor/google.golang.org/grpc/clientconn.go (generated, vendored): 357 changed lines
@@ -38,13 +38,13 @@ import (
    "google.golang.org/grpc/grpclog"
    "google.golang.org/grpc/internal/backoff"
    "google.golang.org/grpc/internal/channelz"
    "google.golang.org/grpc/internal/envconfig"
    "google.golang.org/grpc/internal/grpcsync"
    "google.golang.org/grpc/internal/transport"
    "google.golang.org/grpc/keepalive"
    "google.golang.org/grpc/resolver"
    _ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
    _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
    "google.golang.org/grpc/serviceconfig"
    "google.golang.org/grpc/status"
)
@@ -137,6 +137,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
        opt.apply(&cc.dopts)
    }

    chainUnaryClientInterceptors(cc)
    chainStreamClientInterceptors(cc)

    defer func() {
        if err != nil {
            cc.Close()
@@ -290,6 +293,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
        CredsBundle:      cc.dopts.copts.CredsBundle,
        Dialer:           cc.dopts.copts.Dialer,
        ChannelzParentID: cc.channelzID,
        Target:           cc.parsedTarget,
    }

    // Build the resolver.
@@ -327,6 +331,68 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    return cc, nil
}

// chainUnaryClientInterceptors chains all unary client interceptors into one.
func chainUnaryClientInterceptors(cc *ClientConn) {
    interceptors := cc.dopts.chainUnaryInts
    // Prepend dopts.unaryInt to the chaining interceptors if it exists, since unaryInt will
    // be executed before any other chained interceptors.
    if cc.dopts.unaryInt != nil {
        interceptors = append([]UnaryClientInterceptor{cc.dopts.unaryInt}, interceptors...)
    }
    var chainedInt UnaryClientInterceptor
    if len(interceptors) == 0 {
        chainedInt = nil
    } else if len(interceptors) == 1 {
        chainedInt = interceptors[0]
    } else {
        chainedInt = func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error {
            return interceptors[0](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, 0, invoker), opts...)
        }
    }
    cc.dopts.unaryInt = chainedInt
}

// getChainUnaryInvoker recursively generates the chained unary invoker.
func getChainUnaryInvoker(interceptors []UnaryClientInterceptor, curr int, finalInvoker UnaryInvoker) UnaryInvoker {
    if curr == len(interceptors)-1 {
        return finalInvoker
    }
    return func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
        return interceptors[curr+1](ctx, method, req, reply, cc, getChainUnaryInvoker(interceptors, curr+1, finalInvoker), opts...)
    }
}

// chainStreamClientInterceptors chains all stream client interceptors into one.
func chainStreamClientInterceptors(cc *ClientConn) {
    interceptors := cc.dopts.chainStreamInts
    // Prepend dopts.streamInt to the chaining interceptors if it exists, since streamInt will
    // be executed before any other chained interceptors.
    if cc.dopts.streamInt != nil {
        interceptors = append([]StreamClientInterceptor{cc.dopts.streamInt}, interceptors...)
    }
    var chainedInt StreamClientInterceptor
    if len(interceptors) == 0 {
        chainedInt = nil
    } else if len(interceptors) == 1 {
        chainedInt = interceptors[0]
    } else {
        chainedInt = func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error) {
            return interceptors[0](ctx, desc, cc, method, getChainStreamer(interceptors, 0, streamer), opts...)
        }
    }
    cc.dopts.streamInt = chainedInt
}

// getChainStreamer recursively generates the chained client stream constructor.
func getChainStreamer(interceptors []StreamClientInterceptor, curr int, finalStreamer Streamer) Streamer {
    if curr == len(interceptors)-1 {
        return finalStreamer
    }
    return func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
        return interceptors[curr+1](ctx, desc, cc, method, getChainStreamer(interceptors, curr+1, finalStreamer), opts...)
    }
}

// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
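Aside: the chaining functions added in this hunk compose the interceptor slice recursively, so interceptors[0] runs outermost. A minimal, self-contained sketch of the same pattern, using simplified stand-in types (Interceptor and Invoker are hypothetical names, not grpc-go API):

package main

import "fmt"

// Invoker is a stand-in for the final RPC call.
type Invoker func(method string) error

// Interceptor wraps an Invoker, mirroring the shape of UnaryClientInterceptor.
type Interceptor func(method string, next Invoker) error

// chain composes interceptors so that interceptors[0] runs first, like
// getChainUnaryInvoker above: each level closes over curr and wraps the
// recursion over the rest of the slice.
func chain(interceptors []Interceptor, final Invoker) Invoker {
    var build func(curr int) Invoker
    build = func(curr int) Invoker {
        if curr == len(interceptors) {
            return final
        }
        return func(method string) error {
            return interceptors[curr](method, build(curr+1))
        }
    }
    return build(0)
}

func main() {
    logging := func(method string, next Invoker) error {
        fmt.Println("before", method)
        err := next(method)
        fmt.Println("after", method)
        return err
    }
    call := chain([]Interceptor{logging, logging}, func(method string) error {
        fmt.Println("invoke", method)
        return nil
    })
    _ = call("/grpc.health.v1.Health/Watch")
}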
@@ -466,24 +532,6 @@ func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
    }
}

// gRPC should resort to default service config when:
// * resolver service config is disabled
// * or, resolver does not return a service config or returns an invalid one.
func (cc *ClientConn) fallbackToDefaultServiceConfig(sc string) bool {
    if cc.dopts.disableServiceConfig {
        return true
    }
    // The logic below is temporary, will be removed once we change the resolver.State ServiceConfig field type.
    // Right now, we assume that empty service config string means resolver does not return a config.
    if sc == "" {
        return true
    }
    // TODO: the logic below is temporary. Once we finish the logic to validate service config
    // in resolver, we will replace the logic below.
    _, err := parseServiceConfig(sc)
    return err != nil
}

func (cc *ClientConn) updateResolverState(s resolver.State) error {
    cc.mu.Lock()
    defer cc.mu.Unlock()
@@ -494,54 +542,47 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
        return nil
    }

    if cc.fallbackToDefaultServiceConfig(s.ServiceConfig) {
    if cc.dopts.disableServiceConfig || s.ServiceConfig == nil {
        if cc.dopts.defaultServiceConfig != nil && cc.sc == nil {
            cc.applyServiceConfig(cc.dopts.defaultServiceConfig)
        }
    } else {
        // TODO: the parsing logic below will be moved inside resolver.
        sc, err := parseServiceConfig(s.ServiceConfig)
        if err != nil {
            return err
        }
        if cc.sc == nil || cc.sc.rawJSONString != s.ServiceConfig {
            cc.applyServiceConfig(sc)
        }
    }

    // update the service config that will be sent to balancer.
    if cc.sc != nil {
        s.ServiceConfig = cc.sc.rawJSONString
    } else if sc, ok := s.ServiceConfig.(*ServiceConfig); ok {
        cc.applyServiceConfig(sc)
    }

    var balCfg serviceconfig.LoadBalancingConfig
    if cc.dopts.balancerBuilder == nil {
        // Only look at balancer types and switch balancer if balancer dial
        // option is not set.
        var isGRPCLB bool
        for _, a := range s.Addresses {
            if a.Type == resolver.GRPCLB {
                isGRPCLB = true
                break
            }
        }
        var newBalancerName string
        // TODO: use new loadBalancerConfig field with appropriate priority.
        if isGRPCLB {
            newBalancerName = grpclbName
        } else if cc.sc != nil && cc.sc.LB != nil {
            newBalancerName = *cc.sc.LB
        if cc.sc != nil && cc.sc.lbConfig != nil {
            newBalancerName = cc.sc.lbConfig.name
            balCfg = cc.sc.lbConfig.cfg
        } else {
            newBalancerName = PickFirstBalancerName
            var isGRPCLB bool
            for _, a := range s.Addresses {
                if a.Type == resolver.GRPCLB {
                    isGRPCLB = true
                    break
                }
            }
            if isGRPCLB {
                newBalancerName = grpclbName
            } else if cc.sc != nil && cc.sc.LB != nil {
                newBalancerName = *cc.sc.LB
            } else {
                newBalancerName = PickFirstBalancerName
            }
        }
        cc.switchBalancer(newBalancerName)
    } else if cc.balancerWrapper == nil {
        // Balancer dial option was set, and this is the first time handling
        // resolved addresses. Build a balancer with dopts.balancerBuilder.
        cc.curBalancerName = cc.dopts.balancerBuilder.Name()
        cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
    }

    cc.balancerWrapper.updateResolverState(s)
    cc.firstResolveEvent.Fire()
    cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
    return nil
}
@@ -554,7 +595,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State) error {
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
    if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
    if strings.EqualFold(cc.curBalancerName, name) {
        return
    }
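Aside: this hunk swaps a double-ToLower comparison for strings.EqualFold, which compares case-insensitively without allocating two lowered copies. A quick illustration:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Same result, but EqualFold avoids building two throwaway strings.
    fmt.Println(strings.ToLower("pick_first") == strings.ToLower("PICK_FIRST")) // true
    fmt.Println(strings.EqualFold("pick_first", "PICK_FIRST"))                  // true
}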
@@ -693,6 +734,8 @@ func (ac *addrConn) connect() error {
        ac.mu.Unlock()
        return nil
    }
    // Update connectivity state within the lock to prevent subsequent or
    // concurrent calls from resetting the transport more than once.
    ac.updateConnectivityState(connectivity.Connecting)
    ac.mu.Unlock()
@@ -703,7 +746,16 @@ func (ac *addrConn) connect() error {

// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
//
// It checks whether the current connected address of ac is in the new addrs list.
// If ac is Connecting, it returns false. The caller should tear down the ac and
// create a new one. Note that the backoff will be reset when this happens.
//
// If ac is TransientFailure, it updates ac.addrs and returns true. The updated
// addresses will be picked up by retry in the next iteration after backoff.
//
// If ac is Shutdown or Idle, it updates ac.addrs and returns true.
//
// If ac is Ready, it checks whether the current connected address of ac is in the
// new addrs list.
//   - If true, it updates ac.addrs and returns true. The ac will keep using
//     the existing connection.
//   - If false, it does nothing and returns false.
@@ -711,17 +763,18 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
    ac.mu.Lock()
    defer ac.mu.Unlock()
    grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
    if ac.state == connectivity.Shutdown {
    if ac.state == connectivity.Shutdown ||
        ac.state == connectivity.TransientFailure ||
        ac.state == connectivity.Idle {
        ac.addrs = addrs
        return true
    }

    // Unless we're busy reconnecting already, let's reconnect from the top of
    // the list.
    if ac.state != connectivity.Ready {
    if ac.state == connectivity.Connecting {
        return false
    }

    // ac.state is Ready, try to find the connected address.
    var curAddrFound bool
    for _, a := range addrs {
        if reflect.DeepEqual(ac.curAddr, a) {
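Aside: the tryUpdateAddrs doc comment above boils down to a small decision table over the connection state. A sketch of just that decision, using the public connectivity and resolver packages (canUpdateAddrs is a hypothetical helper for illustration, not part of the diff):

package main

import (
    "fmt"
    "reflect"

    "google.golang.org/grpc/connectivity"
    "google.golang.org/grpc/resolver"
)

// canUpdateAddrs mirrors the decision described above: Shutdown,
// TransientFailure, and Idle always accept the new list; Connecting
// forces a teardown; Ready accepts only if the currently connected
// address is still present in the new list.
func canUpdateAddrs(state connectivity.State, cur resolver.Address, addrs []resolver.Address) bool {
    switch state {
    case connectivity.Shutdown, connectivity.TransientFailure, connectivity.Idle:
        return true
    case connectivity.Connecting:
        return false
    }
    // State is Ready: keep the connection only if cur is in the new list.
    for _, a := range addrs {
        if reflect.DeepEqual(cur, a) {
            return true
        }
    }
    return false
}

func main() {
    cur := resolver.Address{Addr: "10.0.0.1:443"}
    addrs := []resolver.Address{{Addr: "10.0.0.2:443"}}
    fmt.Println(canUpdateAddrs(connectivity.Ready, cur, addrs)) // false: tear down and redial
}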
@@ -970,6 +1023,9 @@ func (ac *addrConn) resetTransport() {
        // The spec doesn't mention what should be done for multiple addresses.
        // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
        connectDeadline := time.Now().Add(dialDuration)

        ac.updateConnectivityState(connectivity.Connecting)
        ac.transport = nil
        ac.mu.Unlock()

        newTr, addr, reconnect, err := ac.tryAllAddrs(addrs, connectDeadline)
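Aside: connectDeadline is derived from the backoff policy in the connection-backoff document linked above. A standalone sketch of that proposed algorithm with the doc's default parameters (the vendored implementation lives in internal/backoff; this is only an illustration):

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// backoff returns the wait before retry number `retries`, following the
// proposed algorithm in the connection-backoff doc: exponential growth
// by a multiplier, capped at a maximum, with +/-20% random jitter.
func backoff(retries int) time.Duration {
    const (
        baseDelay  = 1.0 * float64(time.Second)
        multiplier = 1.6
        jitter     = 0.2
        maxDelay   = 120.0 * float64(time.Second)
    )
    cur := baseDelay
    for i := 0; i < retries; i++ {
        cur *= multiplier
        if cur > maxDelay {
            cur = maxDelay
            break
        }
    }
    cur += cur * jitter * (rand.Float64()*2 - 1)
    return time.Duration(cur)
}

func main() {
    for i := 0; i < 5; i++ {
        fmt.Println("retry", i, "wait", backoff(i))
    }
}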
@@ -1004,55 +1060,32 @@ func (ac *addrConn) resetTransport() {
        ac.mu.Lock()
        if ac.state == connectivity.Shutdown {
            newTr.Close()
            ac.mu.Unlock()
            newTr.Close()
            return
        }
        ac.curAddr = addr
        ac.transport = newTr
        ac.backoffIdx = 0

        healthCheckConfig := ac.cc.healthCheckConfig()
        // LB channel health checking is only enabled when all the four requirements below are met:
        // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
        // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
        // 3. a service config with non-empty healthCheckConfig field is provided,
        // 4. the current load balancer allows it.
        hctx, hcancel := context.WithCancel(ac.ctx)
        healthcheckManagingState := false
        if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
            if ac.cc.dopts.healthCheckFunc == nil {
                // TODO: add a link to the health check doc in the error message.
                grpclog.Error("the client side LB channel health check function has not been set.")
            } else {
                // TODO(deklerk) refactor to just return transport
                go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
                healthcheckManagingState = true
            }
        }
        if !healthcheckManagingState {
            ac.updateConnectivityState(connectivity.Ready)
        }
        ac.startHealthCheck(hctx)
        ac.mu.Unlock()

        // Block until the created transport is down. And when this happens,
        // we restart from the top of the addr list.
        <-reconnect.Done()
        hcancel()

        // Need to reconnect after a READY, the addrConn enters
        // TRANSIENT_FAILURE.
        // restart connecting - the top of the loop will set state to
        // CONNECTING. This is against the current connectivity semantics doc,
        // however it allows for graceful behavior for RPCs not yet dispatched
        // - unfortunate timing would otherwise lead to the RPC failing even
        // though the TRANSIENT_FAILURE state (called for by the doc) would be
        // instantaneous.
        //
        // This will set addrConn to TRANSIENT_FAILURE for a very short period
        // of time, and turns CONNECTING. It seems reasonable to skip this, but
        // READY-CONNECTING is not a valid transition.
        ac.mu.Lock()
        if ac.state == connectivity.Shutdown {
            ac.mu.Unlock()
            return
        }
        ac.updateConnectivityState(connectivity.TransientFailure)
        ac.mu.Unlock()
        // Ideally we should transition to Idle here and block until there is
        // RPC activity that leads to the balancer requesting a reconnect of
        // the associated SubConn.
    }
}
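Aside: reconnect behaves as a one-shot event here: the transport callbacks call Fire, and resetTransport blocks on Done. A minimal stand-in showing that contract (the vendored code uses internal/grpcsync.Event; this sketch only illustrates the shape):

package main

import (
    "fmt"
    "sync"
)

// Event fires at most once; Done returns a channel that is closed after
// the first Fire, so any number of goroutines can wait on it.
type Event struct {
    once sync.Once
    c    chan struct{}
}

func NewEvent() *Event { return &Event{c: make(chan struct{})} }

func (e *Event) Fire()                 { e.once.Do(func() { close(e.c) }) }
func (e *Event) Done() <-chan struct{} { return e.c }

func main() {
    ev := NewEvent()
    go ev.Fire()
    <-ev.Done() // blocks until Fire, like <-reconnect.Done() above
    fmt.Println("transport down, restart from the top of the addr list")
}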
@@ -1066,8 +1099,6 @@ func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) (transport.ClientTransport, resolver.Address, *grpcsync.Event, error) {
            ac.mu.Unlock()
            return nil, resolver.Address{}, nil, errConnClosing
        }
        ac.updateConnectivityState(connectivity.Connecting)
        ac.transport = nil

        ac.cc.mu.RLock()
        ac.dopts.copts.KeepaliveParams = ac.cc.mkp
@@ -1111,14 +1142,35 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
        Authority: ac.cc.authority,
    }

    once := sync.Once{}
    onGoAway := func(r transport.GoAwayReason) {
        ac.mu.Lock()
        ac.adjustParams(r)
        once.Do(func() {
            if ac.state == connectivity.Ready {
                // Prevent this SubConn from being used for new RPCs by setting its
                // state to Connecting.
                //
                // TODO: this should be Idle when grpc-go properly supports it.
                ac.updateConnectivityState(connectivity.Connecting)
            }
        })
        ac.mu.Unlock()
        reconnect.Fire()
    }

    onClose := func() {
        ac.mu.Lock()
        once.Do(func() {
            if ac.state == connectivity.Ready {
                // Prevent this SubConn from being used for new RPCs by setting its
                // state to Connecting.
                //
                // TODO: this should be Idle when grpc-go properly supports it.
                ac.updateConnectivityState(connectivity.Connecting)
            }
        })
        ac.mu.Unlock()
        close(onCloseCalled)
        reconnect.Fire()
    }
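Aside: onGoAway and onClose both funnel the Ready-to-Connecting transition through a single sync.Once, so whichever callback runs first performs it and the other becomes a no-op. The pattern in isolation:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once
    transition := func(source string) {
        once.Do(func() {
            // Runs for exactly one of the callbacks, mirroring the
            // guarded state change in onGoAway/onClose above.
            fmt.Println("Ready -> Connecting, triggered by", source)
        })
    }

    var wg sync.WaitGroup
    for _, src := range []string{"onGoAway", "onClose"} {
        wg.Add(1)
        go func(s string) { defer wg.Done(); transition(s) }(src)
    }
    wg.Wait()
}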
@@ -1140,60 +1192,99 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) (transport.ClientTransport, *grpcsync.Event, error) {
        return nil, nil, err
    }

    if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
        select {
        case <-time.After(connectDeadline.Sub(time.Now())):
            // We didn't get the preface in time.
            newTr.Close()
            grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
            return nil, nil, errors.New("timed out waiting for server handshake")
        case <-prefaceReceived:
            // We got the preface - huzzah! things are good.
        case <-onCloseCalled:
            // The transport has already closed - noop.
            return nil, nil, errors.New("connection closed")
            // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
        }
    select {
    case <-time.After(connectDeadline.Sub(time.Now())):
        // We didn't get the preface in time.
        newTr.Close()
        grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
        return nil, nil, errors.New("timed out waiting for server handshake")
    case <-prefaceReceived:
        // We got the preface - huzzah! things are good.
    case <-onCloseCalled:
        // The transport has already closed - noop.
        return nil, nil, errors.New("connection closed")
        // TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
    }
    return newTr, reconnect, nil
}

func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
    // Set up the health check helper functions
    newStream := func() (interface{}, error) {
        return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
// startHealthCheck starts the health checking stream (RPC) to watch the health
// stats of this connection if health checking is requested and configured.
//
// LB channel health checking is enabled when all requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption
// 2. internal.HealthCheckFunc is set by importing the grpc/healthcheck package
// 3. a service config with non-empty healthCheckConfig field is provided
// 4. the load balancer requests it
//
// It sets addrConn to READY if the health checking stream is not started.
//
// Caller must hold ac.mu.
func (ac *addrConn) startHealthCheck(ctx context.Context) {
    var healthcheckManagingState bool
    defer func() {
        if !healthcheckManagingState {
            ac.updateConnectivityState(connectivity.Ready)
        }
    }()

    if ac.cc.dopts.disableHealthCheck {
        return
    }
    firstReady := true
    reportHealth := func(ok bool) {
    healthCheckConfig := ac.cc.healthCheckConfig()
    if healthCheckConfig == nil {
        return
    }
    if !ac.scopts.HealthCheckEnabled {
        return
    }
    healthCheckFunc := ac.cc.dopts.healthCheckFunc
    if healthCheckFunc == nil {
        // The health package is not imported to set health check function.
        //
        // TODO: add a link to the health check doc in the error message.
        grpclog.Error("Health check is requested but health check function is not set.")
        return
    }

    healthcheckManagingState = true

    // Set up the health check helper functions.
    currentTr := ac.transport
    newStream := func(method string) (interface{}, error) {
        ac.mu.Lock()
        if ac.transport != currentTr {
            ac.mu.Unlock()
            return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
        }
        ac.mu.Unlock()
        return newNonRetryClientStream(ctx, &StreamDesc{ServerStreams: true}, method, currentTr, ac)
    }
    setConnectivityState := func(s connectivity.State) {
        ac.mu.Lock()
        defer ac.mu.Unlock()
        if ac.transport != newTr {
        if ac.transport != currentTr {
            return
        }
        if ok {
            if firstReady {
                firstReady = false
                ac.curAddr = addr
            }
            ac.updateConnectivityState(connectivity.Ready)
        } else {
            ac.updateConnectivityState(connectivity.TransientFailure)
        }
        ac.updateConnectivityState(s)
    }
    err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
    if err != nil {
        if status.Code(err) == codes.Unimplemented {
            if channelz.IsOn() {
                channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
                    Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
                    Severity: channelz.CtError,
                })
    // Start the health checking stream.
    go func() {
        err := ac.cc.dopts.healthCheckFunc(ctx, newStream, setConnectivityState, healthCheckConfig.ServiceName)
        if err != nil {
            if status.Code(err) == codes.Unimplemented {
                if channelz.IsOn() {
                    channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
                        Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
                        Severity: channelz.CtError,
                    })
                }
                grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
            } else {
                grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
            }
        }
        grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
        } else {
            grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
        }
    }()
}

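Aside: the rewritten startHealthCheck hands the configured health-check function two callbacks: newStream to open the watch RPC on the current transport, and setConnectivityState to report results. A sketch of a function satisfying that shape, inferred from the call site above (fakeHealthCheck is a stand-in, not the real function installed via internal.HealthCheckFunc):

package main

import (
    "context"
    "fmt"

    "google.golang.org/grpc/connectivity"
)

// fakeHealthCheck matches the callback shape used above: open a watch
// stream via newStream, then push connectivity states through set as
// results arrive. It is an illustration, not grpc's implementation.
func fakeHealthCheck(
    ctx context.Context,
    newStream func(method string) (interface{}, error),
    set func(connectivity.State),
    serviceName string,
) error {
    if _, err := newStream("/grpc.health.v1.Health/Watch"); err != nil {
        return err
    }
    // A real implementation loops on the stream's Recv; this sketch
    // reports one healthy result and exits when ctx is done.
    set(connectivity.Ready)
    <-ctx.Done()
    return ctx.Err()
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    cancel() // pre-cancelled so the sketch returns immediately
    err := fakeHealthCheck(ctx,
        func(method string) (interface{}, error) {
            fmt.Println("open", method)
            return struct{}{}, nil
        },
        func(s connectivity.State) { fmt.Println("state:", s) },
        "some.Service")
    fmt.Println("health check ended:", err)
}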
func (ac *addrConn) resetConnectBackoff() {
|