mirror of
https://gitea.com/Lydanne/buildx.git
synced 2025-07-09 21:17:09 +08:00
vendor: bump k8s dependencies to v0.29.2
Signed-off-by: CrazyMax <1951866+crazy-max@users.noreply.github.com>
This commit is contained in:
14
vendor/k8s.io/client-go/tools/clientcmd/api/types.go
generated
vendored
14
vendor/k8s.io/client-go/tools/clientcmd/api/types.go
generated
vendored
@ -67,7 +67,7 @@ type Preferences struct {
|
||||
type Cluster struct {
|
||||
// LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
|
||||
// +k8s:conversion-gen=false
|
||||
LocationOfOrigin string
|
||||
LocationOfOrigin string `json:"-"`
|
||||
// Server is the address of the kubernetes cluster (https://hostname:port).
|
||||
Server string `json:"server"`
|
||||
// TLSServerName is used to check server certificate. If TLSServerName is empty, the hostname used to contact the server is used.
|
||||
@ -107,7 +107,7 @@ type Cluster struct {
|
||||
type AuthInfo struct {
|
||||
// LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
|
||||
// +k8s:conversion-gen=false
|
||||
LocationOfOrigin string
|
||||
LocationOfOrigin string `json:"-"`
|
||||
// ClientCertificate is the path to a client cert file for TLS.
|
||||
// +optional
|
||||
ClientCertificate string `json:"client-certificate,omitempty"`
|
||||
@ -159,7 +159,7 @@ type AuthInfo struct {
|
||||
type Context struct {
|
||||
// LocationOfOrigin indicates where this object came from. It is used for round tripping config post-merge, but never serialized.
|
||||
// +k8s:conversion-gen=false
|
||||
LocationOfOrigin string
|
||||
LocationOfOrigin string `json:"-"`
|
||||
// Cluster is the name of the cluster for this context
|
||||
Cluster string `json:"cluster"`
|
||||
// AuthInfo is the name of the authInfo for this context
|
||||
@ -252,7 +252,7 @@ type ExecConfig struct {
|
||||
// recommended as one of the prime benefits of exec plugins is that no secrets need
|
||||
// to be stored directly in the kubeconfig.
|
||||
// +k8s:conversion-gen=false
|
||||
Config runtime.Object
|
||||
Config runtime.Object `json:"-"`
|
||||
|
||||
// InteractiveMode determines this plugin's relationship with standard input. Valid
|
||||
// values are "Never" (this exec plugin never uses standard input), "IfAvailable" (this
|
||||
@ -264,7 +264,7 @@ type ExecConfig struct {
|
||||
// client.authentication.k8s.io/v1beta1, then this field is optional and defaults
|
||||
// to "IfAvailable" when unset. Otherwise, this field is required.
|
||||
// +optional
|
||||
InteractiveMode ExecInteractiveMode
|
||||
InteractiveMode ExecInteractiveMode `json:"interactiveMode,omitempty"`
|
||||
|
||||
// StdinUnavailable indicates whether the exec authenticator can pass standard
|
||||
// input through to this exec plugin. For example, a higher level entity might be using
|
||||
@ -272,14 +272,14 @@ type ExecConfig struct {
|
||||
// plugin to use standard input. This is kept here in order to keep all of the exec configuration
|
||||
// together, but it is never serialized.
|
||||
// +k8s:conversion-gen=false
|
||||
StdinUnavailable bool
|
||||
StdinUnavailable bool `json:"-"`
|
||||
|
||||
// StdinUnavailableMessage is an optional message to be displayed when the exec authenticator
|
||||
// cannot successfully run this exec plugin because it needs to use standard input and
|
||||
// StdinUnavailable is true. For example, a process that is already using standard input to
|
||||
// read user instructions might set this to "used by my-program to read user instructions".
|
||||
// +k8s:conversion-gen=false
|
||||
StdinUnavailableMessage string
|
||||
StdinUnavailableMessage string `json:"-"`
|
||||
}
|
||||
|
||||
var _ fmt.Stringer = new(ExecConfig)
|
||||
|
24
vendor/k8s.io/client-go/tools/clientcmd/loader.go
generated
vendored
24
vendor/k8s.io/client-go/tools/clientcmd/loader.go
generated
vendored
@ -128,6 +128,28 @@ type ClientConfigLoadingRules struct {
|
||||
// WarnIfAllMissing indicates whether the configuration files pointed by KUBECONFIG environment variable are present or not.
|
||||
// In case of missing files, it warns the user about the missing files.
|
||||
WarnIfAllMissing bool
|
||||
|
||||
// Warner is the warning log callback to use in case of missing files.
|
||||
Warner WarningHandler
|
||||
}
|
||||
|
||||
// WarningHandler allows to set the logging function to use
|
||||
type WarningHandler func(error)
|
||||
|
||||
func (handler WarningHandler) Warn(err error) {
|
||||
if handler == nil {
|
||||
klog.V(1).Info(err)
|
||||
} else {
|
||||
handler(err)
|
||||
}
|
||||
}
|
||||
|
||||
type MissingConfigError struct {
|
||||
Missing []string
|
||||
}
|
||||
|
||||
func (c MissingConfigError) Error() string {
|
||||
return fmt.Sprintf("Config not found: %s", strings.Join(c.Missing, ", "))
|
||||
}
|
||||
|
||||
// ClientConfigLoadingRules implements the ClientConfigLoader interface.
|
||||
@ -219,7 +241,7 @@ func (rules *ClientConfigLoadingRules) Load() (*clientcmdapi.Config, error) {
|
||||
}
|
||||
|
||||
if rules.WarnIfAllMissing && len(missingList) > 0 && len(kubeconfigs) == 0 {
|
||||
klog.Warningf("Config not found: %s", strings.Join(missingList, ", "))
|
||||
rules.Warner.Warn(MissingConfigError{Missing: missingList})
|
||||
}
|
||||
|
||||
// first merge all of our maps
|
||||
|
4
vendor/k8s.io/client-go/tools/clientcmd/merged_client_builder.go
generated
vendored
4
vendor/k8s.io/client-go/tools/clientcmd/merged_client_builder.go
generated
vendored
@ -49,12 +49,12 @@ type InClusterConfig interface {
|
||||
Possible() bool
|
||||
}
|
||||
|
||||
// NewNonInteractiveDeferredLoadingClientConfig creates a ConfigClientClientConfig using the passed context name
|
||||
// NewNonInteractiveDeferredLoadingClientConfig creates a ClientConfig using the passed context name
|
||||
func NewNonInteractiveDeferredLoadingClientConfig(loader ClientConfigLoader, overrides *ConfigOverrides) ClientConfig {
|
||||
return &DeferredLoadingClientConfig{loader: loader, overrides: overrides, icc: &inClusterClientConfig{overrides: overrides}}
|
||||
}
|
||||
|
||||
// NewInteractiveDeferredLoadingClientConfig creates a ConfigClientClientConfig using the passed context name and the fallback auth reader
|
||||
// NewInteractiveDeferredLoadingClientConfig creates a ClientConfig using the passed context name and the fallback auth reader
|
||||
func NewInteractiveDeferredLoadingClientConfig(loader ClientConfigLoader, overrides *ConfigOverrides, fallbackReader io.Reader) ClientConfig {
|
||||
return &DeferredLoadingClientConfig{loader: loader, overrides: overrides, icc: &inClusterClientConfig{overrides: overrides}, fallbackReader: fallbackReader}
|
||||
}
|
||||
|
65
vendor/k8s.io/client-go/tools/metrics/metrics.go
generated
vendored
65
vendor/k8s.io/client-go/tools/metrics/metrics.go
generated
vendored
@ -42,6 +42,10 @@ type LatencyMetric interface {
|
||||
Observe(ctx context.Context, verb string, u url.URL, latency time.Duration)
|
||||
}
|
||||
|
||||
type ResolverLatencyMetric interface {
|
||||
Observe(ctx context.Context, host string, latency time.Duration)
|
||||
}
|
||||
|
||||
// SizeMetric observes client response size partitioned by verb and host.
|
||||
type SizeMetric interface {
|
||||
Observe(ctx context.Context, verb string, host string, size float64)
|
||||
@ -58,6 +62,23 @@ type CallsMetric interface {
|
||||
Increment(exitCode int, callStatus string)
|
||||
}
|
||||
|
||||
// RetryMetric counts the number of retries sent to the server
|
||||
// partitioned by code, method, and host.
|
||||
type RetryMetric interface {
|
||||
IncrementRetry(ctx context.Context, code string, method string, host string)
|
||||
}
|
||||
|
||||
// TransportCacheMetric shows the number of entries in the internal transport cache
|
||||
type TransportCacheMetric interface {
|
||||
Observe(value int)
|
||||
}
|
||||
|
||||
// TransportCreateCallsMetric counts the number of times a transport is created
|
||||
// partitioned by the result of the cache: hit, miss, uncacheable
|
||||
type TransportCreateCallsMetric interface {
|
||||
Increment(result string)
|
||||
}
|
||||
|
||||
var (
|
||||
// ClientCertExpiry is the expiry time of a client certificate
|
||||
ClientCertExpiry ExpiryMetric = noopExpiry{}
|
||||
@ -65,6 +86,8 @@ var (
|
||||
ClientCertRotationAge DurationMetric = noopDuration{}
|
||||
// RequestLatency is the latency metric that rest clients will update.
|
||||
RequestLatency LatencyMetric = noopLatency{}
|
||||
// ResolverLatency is the latency metric that DNS resolver will update
|
||||
ResolverLatency ResolverLatencyMetric = noopResolverLatency{}
|
||||
// RequestSize is the request size metric that rest clients will update.
|
||||
RequestSize SizeMetric = noopSize{}
|
||||
// ResponseSize is the response size metric that rest clients will update.
|
||||
@ -76,6 +99,15 @@ var (
|
||||
// ExecPluginCalls is the number of calls made to an exec plugin, partitioned by
|
||||
// exit code and call status.
|
||||
ExecPluginCalls CallsMetric = noopCalls{}
|
||||
// RequestRetry is the retry metric that tracks the number of
|
||||
// retries sent to the server.
|
||||
RequestRetry RetryMetric = noopRetry{}
|
||||
// TransportCacheEntries is the metric that tracks the number of entries in the
|
||||
// internal transport cache.
|
||||
TransportCacheEntries TransportCacheMetric = noopTransportCache{}
|
||||
// TransportCreateCalls is the metric that counts the number of times a new transport
|
||||
// is created
|
||||
TransportCreateCalls TransportCreateCallsMetric = noopTransportCreateCalls{}
|
||||
)
|
||||
|
||||
// RegisterOpts contains all the metrics to register. Metrics may be nil.
|
||||
@ -83,11 +115,15 @@ type RegisterOpts struct {
|
||||
ClientCertExpiry ExpiryMetric
|
||||
ClientCertRotationAge DurationMetric
|
||||
RequestLatency LatencyMetric
|
||||
ResolverLatency ResolverLatencyMetric
|
||||
RequestSize SizeMetric
|
||||
ResponseSize SizeMetric
|
||||
RateLimiterLatency LatencyMetric
|
||||
RequestResult ResultMetric
|
||||
ExecPluginCalls CallsMetric
|
||||
RequestRetry RetryMetric
|
||||
TransportCacheEntries TransportCacheMetric
|
||||
TransportCreateCalls TransportCreateCallsMetric
|
||||
}
|
||||
|
||||
// Register registers metrics for the rest client to use. This can
|
||||
@ -103,6 +139,9 @@ func Register(opts RegisterOpts) {
|
||||
if opts.RequestLatency != nil {
|
||||
RequestLatency = opts.RequestLatency
|
||||
}
|
||||
if opts.ResolverLatency != nil {
|
||||
ResolverLatency = opts.ResolverLatency
|
||||
}
|
||||
if opts.RequestSize != nil {
|
||||
RequestSize = opts.RequestSize
|
||||
}
|
||||
@ -118,6 +157,15 @@ func Register(opts RegisterOpts) {
|
||||
if opts.ExecPluginCalls != nil {
|
||||
ExecPluginCalls = opts.ExecPluginCalls
|
||||
}
|
||||
if opts.RequestRetry != nil {
|
||||
RequestRetry = opts.RequestRetry
|
||||
}
|
||||
if opts.TransportCacheEntries != nil {
|
||||
TransportCacheEntries = opts.TransportCacheEntries
|
||||
}
|
||||
if opts.TransportCreateCalls != nil {
|
||||
TransportCreateCalls = opts.TransportCreateCalls
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@ -133,6 +181,11 @@ type noopLatency struct{}
|
||||
|
||||
func (noopLatency) Observe(context.Context, string, url.URL, time.Duration) {}
|
||||
|
||||
type noopResolverLatency struct{}
|
||||
|
||||
func (n noopResolverLatency) Observe(ctx context.Context, host string, latency time.Duration) {
|
||||
}
|
||||
|
||||
type noopSize struct{}
|
||||
|
||||
func (noopSize) Observe(context.Context, string, string, float64) {}
|
||||
@ -144,3 +197,15 @@ func (noopResult) Increment(context.Context, string, string, string) {}
|
||||
type noopCalls struct{}
|
||||
|
||||
func (noopCalls) Increment(int, string) {}
|
||||
|
||||
type noopRetry struct{}
|
||||
|
||||
func (noopRetry) IncrementRetry(context.Context, string, string, string) {}
|
||||
|
||||
type noopTransportCache struct{}
|
||||
|
||||
func (noopTransportCache) Observe(int) {}
|
||||
|
||||
type noopTransportCreateCalls struct{}
|
||||
|
||||
func (noopTransportCreateCalls) Increment(string) {}
|
||||
|
57
vendor/k8s.io/client-go/tools/remotecommand/fallback.go
generated
vendored
Normal file
57
vendor/k8s.io/client-go/tools/remotecommand/fallback.go
generated
vendored
Normal file
@ -0,0 +1,57 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
var _ Executor = &fallbackExecutor{}
|
||||
|
||||
type fallbackExecutor struct {
|
||||
primary Executor
|
||||
secondary Executor
|
||||
shouldFallback func(error) bool
|
||||
}
|
||||
|
||||
// NewFallbackExecutor creates an Executor that first attempts to use the
|
||||
// WebSocketExecutor, falling back to the legacy SPDYExecutor if the initial
|
||||
// websocket "StreamWithContext" call fails.
|
||||
// func NewFallbackExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
|
||||
func NewFallbackExecutor(primary, secondary Executor, shouldFallback func(error) bool) (Executor, error) {
|
||||
return &fallbackExecutor{
|
||||
primary: primary,
|
||||
secondary: secondary,
|
||||
shouldFallback: shouldFallback,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Stream is deprecated. Please use "StreamWithContext".
|
||||
func (f *fallbackExecutor) Stream(options StreamOptions) error {
|
||||
return f.StreamWithContext(context.Background(), options)
|
||||
}
|
||||
|
||||
// StreamWithContext initially attempts to call "StreamWithContext" using the
|
||||
// primary executor, falling back to calling the secondary executor if the
|
||||
// initial primary call to upgrade to a websocket connection fails.
|
||||
func (f *fallbackExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
|
||||
err := f.primary.StreamWithContext(ctx, options)
|
||||
if f.shouldFallback(err) {
|
||||
return f.secondary.StreamWithContext(ctx, options)
|
||||
}
|
||||
return err
|
||||
}
|
124
vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go
generated
vendored
124
vendor/k8s.io/client-go/tools/remotecommand/remotecommand.go
generated
vendored
@ -18,17 +18,10 @@ package remotecommand
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/apimachinery/pkg/util/remotecommand"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport/spdy"
|
||||
)
|
||||
|
||||
// StreamOptions holds information pertaining to the current streaming session:
|
||||
@ -63,120 +56,3 @@ type streamCreator interface {
|
||||
type streamProtocolHandler interface {
|
||||
stream(conn streamCreator) error
|
||||
}
|
||||
|
||||
// streamExecutor handles transporting standard shell streams over an httpstream connection.
|
||||
type streamExecutor struct {
|
||||
upgrader spdy.Upgrader
|
||||
transport http.RoundTripper
|
||||
|
||||
method string
|
||||
url *url.URL
|
||||
protocols []string
|
||||
}
|
||||
|
||||
// NewSPDYExecutor connects to the provided server and upgrades the connection to
|
||||
// multiplexed bidirectional streams.
|
||||
func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
|
||||
wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewSPDYExecutorForTransports(wrapper, upgradeRoundTripper, method, url)
|
||||
}
|
||||
|
||||
// NewSPDYExecutorForTransports connects to the provided server using the given transport,
|
||||
// upgrades the response using the given upgrader to multiplexed bidirectional streams.
|
||||
func NewSPDYExecutorForTransports(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
|
||||
return NewSPDYExecutorForProtocols(
|
||||
transport, upgrader, method, url,
|
||||
remotecommand.StreamProtocolV4Name,
|
||||
remotecommand.StreamProtocolV3Name,
|
||||
remotecommand.StreamProtocolV2Name,
|
||||
remotecommand.StreamProtocolV1Name,
|
||||
)
|
||||
}
|
||||
|
||||
// NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to
|
||||
// multiplexed bidirectional streams using only the provided protocols. Exposed for testing, most
|
||||
// callers should use NewSPDYExecutor or NewSPDYExecutorForTransports.
|
||||
func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL, protocols ...string) (Executor, error) {
|
||||
return &streamExecutor{
|
||||
upgrader: upgrader,
|
||||
transport: transport,
|
||||
method: method,
|
||||
url: url,
|
||||
protocols: protocols,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Stream opens a protocol streamer to the server and streams until a client closes
|
||||
// the connection or the server disconnects.
|
||||
func (e *streamExecutor) Stream(options StreamOptions) error {
|
||||
return e.StreamWithContext(context.Background(), options)
|
||||
}
|
||||
|
||||
// newConnectionAndStream creates a new SPDY connection and a stream protocol handler upon it.
|
||||
func (e *streamExecutor) newConnectionAndStream(ctx context.Context, options StreamOptions) (httpstream.Connection, streamProtocolHandler, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, e.method, e.url.String(), nil)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating request: %v", err)
|
||||
}
|
||||
|
||||
conn, protocol, err := spdy.Negotiate(
|
||||
e.upgrader,
|
||||
&http.Client{Transport: e.transport},
|
||||
req,
|
||||
e.protocols...,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var streamer streamProtocolHandler
|
||||
|
||||
switch protocol {
|
||||
case remotecommand.StreamProtocolV4Name:
|
||||
streamer = newStreamProtocolV4(options)
|
||||
case remotecommand.StreamProtocolV3Name:
|
||||
streamer = newStreamProtocolV3(options)
|
||||
case remotecommand.StreamProtocolV2Name:
|
||||
streamer = newStreamProtocolV2(options)
|
||||
case "":
|
||||
klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
|
||||
fallthrough
|
||||
case remotecommand.StreamProtocolV1Name:
|
||||
streamer = newStreamProtocolV1(options)
|
||||
}
|
||||
|
||||
return conn, streamer, nil
|
||||
}
|
||||
|
||||
// StreamWithContext opens a protocol streamer to the server and streams until a client closes
|
||||
// the connection or the server disconnects or the context is done.
|
||||
func (e *streamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
|
||||
conn, streamer, err := e.newConnectionAndStream(ctx, options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
panicChan := make(chan any, 1)
|
||||
errorChan := make(chan error, 1)
|
||||
go func() {
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
panicChan <- p
|
||||
}
|
||||
}()
|
||||
errorChan <- streamer.stream(conn)
|
||||
}()
|
||||
|
||||
select {
|
||||
case p := <-panicChan:
|
||||
panic(p)
|
||||
case err := <-errorChan:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
171
vendor/k8s.io/client-go/tools/remotecommand/spdy.go
generated
vendored
Normal file
171
vendor/k8s.io/client-go/tools/remotecommand/spdy.go
generated
vendored
Normal file
@ -0,0 +1,171 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/apimachinery/pkg/util/remotecommand"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport/spdy"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// spdyStreamExecutor handles transporting standard shell streams over an httpstream connection.
|
||||
type spdyStreamExecutor struct {
|
||||
upgrader spdy.Upgrader
|
||||
transport http.RoundTripper
|
||||
|
||||
method string
|
||||
url *url.URL
|
||||
protocols []string
|
||||
rejectRedirects bool // if true, receiving redirect from upstream is an error
|
||||
}
|
||||
|
||||
// NewSPDYExecutor connects to the provided server and upgrades the connection to
|
||||
// multiplexed bidirectional streams.
|
||||
func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
|
||||
wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewSPDYExecutorForTransports(wrapper, upgradeRoundTripper, method, url)
|
||||
}
|
||||
|
||||
// NewSPDYExecutorRejectRedirects returns an Executor that will upgrade the future
|
||||
// connection to a SPDY bi-directional streaming connection when calling "Stream" (deprecated)
|
||||
// or "StreamWithContext" (preferred). Additionally, if the upstream server returns a redirect
|
||||
// during the attempted upgrade in these "Stream" calls, an error is returned.
|
||||
func NewSPDYExecutorRejectRedirects(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
|
||||
executor, err := NewSPDYExecutorForTransports(transport, upgrader, method, url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
spdyExecutor := executor.(*spdyStreamExecutor)
|
||||
spdyExecutor.rejectRedirects = true
|
||||
return spdyExecutor, nil
|
||||
}
|
||||
|
||||
// NewSPDYExecutorForTransports connects to the provided server using the given transport,
|
||||
// upgrades the response using the given upgrader to multiplexed bidirectional streams.
|
||||
func NewSPDYExecutorForTransports(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
|
||||
return NewSPDYExecutorForProtocols(
|
||||
transport, upgrader, method, url,
|
||||
remotecommand.StreamProtocolV5Name,
|
||||
remotecommand.StreamProtocolV4Name,
|
||||
remotecommand.StreamProtocolV3Name,
|
||||
remotecommand.StreamProtocolV2Name,
|
||||
remotecommand.StreamProtocolV1Name,
|
||||
)
|
||||
}
|
||||
|
||||
// NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to
|
||||
// multiplexed bidirectional streams using only the provided protocols. Exposed for testing, most
|
||||
// callers should use NewSPDYExecutor or NewSPDYExecutorForTransports.
|
||||
func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL, protocols ...string) (Executor, error) {
|
||||
return &spdyStreamExecutor{
|
||||
upgrader: upgrader,
|
||||
transport: transport,
|
||||
method: method,
|
||||
url: url,
|
||||
protocols: protocols,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Stream opens a protocol streamer to the server and streams until a client closes
|
||||
// the connection or the server disconnects.
|
||||
func (e *spdyStreamExecutor) Stream(options StreamOptions) error {
|
||||
return e.StreamWithContext(context.Background(), options)
|
||||
}
|
||||
|
||||
// newConnectionAndStream creates a new SPDY connection and a stream protocol handler upon it.
|
||||
func (e *spdyStreamExecutor) newConnectionAndStream(ctx context.Context, options StreamOptions) (httpstream.Connection, streamProtocolHandler, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, e.method, e.url.String(), nil)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("error creating request: %v", err)
|
||||
}
|
||||
|
||||
client := http.Client{Transport: e.transport}
|
||||
if e.rejectRedirects {
|
||||
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
|
||||
return fmt.Errorf("redirect not allowed")
|
||||
}
|
||||
}
|
||||
conn, protocol, err := spdy.Negotiate(
|
||||
e.upgrader,
|
||||
&client,
|
||||
req,
|
||||
e.protocols...,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
var streamer streamProtocolHandler
|
||||
|
||||
switch protocol {
|
||||
case remotecommand.StreamProtocolV5Name:
|
||||
streamer = newStreamProtocolV5(options)
|
||||
case remotecommand.StreamProtocolV4Name:
|
||||
streamer = newStreamProtocolV4(options)
|
||||
case remotecommand.StreamProtocolV3Name:
|
||||
streamer = newStreamProtocolV3(options)
|
||||
case remotecommand.StreamProtocolV2Name:
|
||||
streamer = newStreamProtocolV2(options)
|
||||
case "":
|
||||
klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
|
||||
fallthrough
|
||||
case remotecommand.StreamProtocolV1Name:
|
||||
streamer = newStreamProtocolV1(options)
|
||||
}
|
||||
|
||||
return conn, streamer, nil
|
||||
}
|
||||
|
||||
// StreamWithContext opens a protocol streamer to the server and streams until a client closes
|
||||
// the connection or the server disconnects or the context is done.
|
||||
func (e *spdyStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
|
||||
conn, streamer, err := e.newConnectionAndStream(ctx, options)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
panicChan := make(chan any, 1)
|
||||
errorChan := make(chan error, 1)
|
||||
go func() {
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
panicChan <- p
|
||||
}
|
||||
}()
|
||||
errorChan <- streamer.stream(conn)
|
||||
}()
|
||||
|
||||
select {
|
||||
case p := <-panicChan:
|
||||
panic(p)
|
||||
case err := <-errorChan:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
35
vendor/k8s.io/client-go/tools/remotecommand/v5.go
generated
vendored
Normal file
35
vendor/k8s.io/client-go/tools/remotecommand/v5.go
generated
vendored
Normal file
@ -0,0 +1,35 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
// streamProtocolV5 add support for V5 of the remote command subprotocol.
|
||||
// For the streamProtocolHandler, this version is the same as V4.
|
||||
type streamProtocolV5 struct {
|
||||
*streamProtocolV4
|
||||
}
|
||||
|
||||
var _ streamProtocolHandler = &streamProtocolV5{}
|
||||
|
||||
func newStreamProtocolV5(options StreamOptions) streamProtocolHandler {
|
||||
return &streamProtocolV5{
|
||||
streamProtocolV4: newStreamProtocolV4(options).(*streamProtocolV4),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *streamProtocolV5) stream(conn streamCreator) error {
|
||||
return p.streamProtocolV4.stream(conn)
|
||||
}
|
502
vendor/k8s.io/client-go/tools/remotecommand/websocket.go
generated
vendored
Normal file
502
vendor/k8s.io/client-go/tools/remotecommand/websocket.go
generated
vendored
Normal file
@ -0,0 +1,502 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package remotecommand
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
gwebsocket "github.com/gorilla/websocket"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/httpstream"
|
||||
"k8s.io/apimachinery/pkg/util/remotecommand"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport/websocket"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// writeDeadline defines the time that a write to the websocket connection
|
||||
// must complete by, otherwise an i/o timeout occurs. The writeDeadline
|
||||
// has nothing to do with a response from the other websocket connection
|
||||
// endpoint; only that the message was successfully processed by the
|
||||
// local websocket connection. The typical write deadline within the websocket
|
||||
// library is one second.
|
||||
const writeDeadline = 2 * time.Second
|
||||
|
||||
var (
|
||||
_ Executor = &wsStreamExecutor{}
|
||||
_ streamCreator = &wsStreamCreator{}
|
||||
_ httpstream.Stream = &stream{}
|
||||
|
||||
streamType2streamID = map[string]byte{
|
||||
v1.StreamTypeStdin: remotecommand.StreamStdIn,
|
||||
v1.StreamTypeStdout: remotecommand.StreamStdOut,
|
||||
v1.StreamTypeStderr: remotecommand.StreamStdErr,
|
||||
v1.StreamTypeError: remotecommand.StreamErr,
|
||||
v1.StreamTypeResize: remotecommand.StreamResize,
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
// pingPeriod defines how often a heartbeat "ping" message is sent.
|
||||
pingPeriod = 5 * time.Second
|
||||
// pingReadDeadline defines the time waiting for a response heartbeat
|
||||
// "pong" message before a timeout error occurs for websocket reading.
|
||||
// This duration must always be greater than the "pingPeriod". By defining
|
||||
// this deadline in terms of the ping period, we are essentially saying
|
||||
// we can drop "X-1" (e.g. 3-1=2) pings before firing the timeout.
|
||||
pingReadDeadline = (pingPeriod * 3) + (1 * time.Second)
|
||||
)
|
||||
|
||||
// wsStreamExecutor handles transporting standard shell streams over an httpstream connection.
|
||||
type wsStreamExecutor struct {
|
||||
transport http.RoundTripper
|
||||
upgrader websocket.ConnectionHolder
|
||||
method string
|
||||
url string
|
||||
// requested protocols in priority order (e.g. v5.channel.k8s.io before v4.channel.k8s.io).
|
||||
protocols []string
|
||||
// selected protocol from the handshake process; could be empty string if handshake fails.
|
||||
negotiated string
|
||||
// period defines how often a "ping" heartbeat message is sent to the other endpoint.
|
||||
heartbeatPeriod time.Duration
|
||||
// deadline defines the amount of time before "pong" response must be received.
|
||||
heartbeatDeadline time.Duration
|
||||
}
|
||||
|
||||
func NewWebSocketExecutor(config *restclient.Config, method, url string) (Executor, error) {
|
||||
// Only supports V5 protocol for correct version skew functionality.
|
||||
// Previous api servers will proxy upgrade requests to legacy websocket
|
||||
// servers on container runtimes which support V1-V4. These legacy
|
||||
// websocket servers will not handle the new CLOSE signal.
|
||||
return NewWebSocketExecutorForProtocols(config, method, url, remotecommand.StreamProtocolV5Name)
|
||||
}
|
||||
|
||||
// NewWebSocketExecutorForProtocols allows to execute commands via a WebSocket connection.
|
||||
func NewWebSocketExecutorForProtocols(config *restclient.Config, method, url string, protocols ...string) (Executor, error) {
|
||||
transport, upgrader, err := websocket.RoundTripperFor(config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error creating websocket transports: %v", err)
|
||||
}
|
||||
return &wsStreamExecutor{
|
||||
transport: transport,
|
||||
upgrader: upgrader,
|
||||
method: method,
|
||||
url: url,
|
||||
protocols: protocols,
|
||||
heartbeatPeriod: pingPeriod,
|
||||
heartbeatDeadline: pingReadDeadline,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Deprecated: use StreamWithContext instead to avoid possible resource leaks.
|
||||
// See https://github.com/kubernetes/kubernetes/pull/103177 for details.
|
||||
func (e *wsStreamExecutor) Stream(options StreamOptions) error {
|
||||
return e.StreamWithContext(context.Background(), options)
|
||||
}
|
||||
|
||||
// StreamWithContext upgrades an HTTPRequest to a WebSocket connection, and starts the various
|
||||
// goroutines to implement the necessary streams over the connection. The "options" parameter
|
||||
// defines which streams are requested. Returns an error if one occurred. This method is NOT
|
||||
// safe to run concurrently with the same executor (because of the state stored in the upgrader).
|
||||
func (e *wsStreamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
|
||||
req, err := http.NewRequestWithContext(ctx, e.method, e.url, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conn, err := websocket.Negotiate(e.transport, e.upgrader, req, e.protocols...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if conn == nil {
|
||||
panic(fmt.Errorf("websocket connection is nil"))
|
||||
}
|
||||
defer conn.Close()
|
||||
e.negotiated = conn.Subprotocol()
|
||||
klog.V(4).Infof("The subprotocol is %s", e.negotiated)
|
||||
|
||||
var streamer streamProtocolHandler
|
||||
switch e.negotiated {
|
||||
case remotecommand.StreamProtocolV5Name:
|
||||
streamer = newStreamProtocolV5(options)
|
||||
case remotecommand.StreamProtocolV4Name:
|
||||
streamer = newStreamProtocolV4(options)
|
||||
case remotecommand.StreamProtocolV3Name:
|
||||
streamer = newStreamProtocolV3(options)
|
||||
case remotecommand.StreamProtocolV2Name:
|
||||
streamer = newStreamProtocolV2(options)
|
||||
case "":
|
||||
klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
|
||||
fallthrough
|
||||
case remotecommand.StreamProtocolV1Name:
|
||||
streamer = newStreamProtocolV1(options)
|
||||
}
|
||||
|
||||
panicChan := make(chan any, 1)
|
||||
errorChan := make(chan error, 1)
|
||||
go func() {
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
panicChan <- p
|
||||
}
|
||||
}()
|
||||
creator := newWSStreamCreator(conn)
|
||||
go creator.readDemuxLoop(
|
||||
e.upgrader.DataBufferSize(),
|
||||
e.heartbeatPeriod,
|
||||
e.heartbeatDeadline,
|
||||
)
|
||||
errorChan <- streamer.stream(creator)
|
||||
}()
|
||||
|
||||
select {
|
||||
case p := <-panicChan:
|
||||
panic(p)
|
||||
case err := <-errorChan:
|
||||
return err
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
type wsStreamCreator struct {
|
||||
conn *gwebsocket.Conn
|
||||
// Protects writing to websocket connection; reading is lock-free
|
||||
connWriteLock sync.Mutex
|
||||
// map of stream id to stream; multiple streams read/write the connection
|
||||
streams map[byte]*stream
|
||||
streamsMu sync.Mutex
|
||||
}
|
||||
|
||||
func newWSStreamCreator(conn *gwebsocket.Conn) *wsStreamCreator {
|
||||
return &wsStreamCreator{
|
||||
conn: conn,
|
||||
streams: map[byte]*stream{},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *wsStreamCreator) getStream(id byte) *stream {
|
||||
c.streamsMu.Lock()
|
||||
defer c.streamsMu.Unlock()
|
||||
return c.streams[id]
|
||||
}
|
||||
|
||||
func (c *wsStreamCreator) setStream(id byte, s *stream) {
|
||||
c.streamsMu.Lock()
|
||||
defer c.streamsMu.Unlock()
|
||||
c.streams[id] = s
|
||||
}
|
||||
|
||||
// CreateStream uses id from passed headers to create a stream over "c.conn" connection.
|
||||
// Returns a Stream structure or nil and an error if one occurred.
|
||||
func (c *wsStreamCreator) CreateStream(headers http.Header) (httpstream.Stream, error) {
|
||||
streamType := headers.Get(v1.StreamType)
|
||||
id, ok := streamType2streamID[streamType]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown stream type: %s", streamType)
|
||||
}
|
||||
if s := c.getStream(id); s != nil {
|
||||
return nil, fmt.Errorf("duplicate stream for type %s", streamType)
|
||||
}
|
||||
reader, writer := io.Pipe()
|
||||
s := &stream{
|
||||
headers: headers,
|
||||
readPipe: reader,
|
||||
writePipe: writer,
|
||||
conn: c.conn,
|
||||
connWriteLock: &c.connWriteLock,
|
||||
id: id,
|
||||
}
|
||||
c.setStream(id, s)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// readDemuxLoop is the lock-free reading processor for this endpoint of the websocket
|
||||
// connection. This loop reads the connection, and demultiplexes the data
|
||||
// into one of the individual stream pipes (by checking the stream id). This
|
||||
// loop can *not* be run concurrently, because there can only be one websocket
|
||||
// connection reader at a time (a read mutex would provide no benefit).
|
||||
func (c *wsStreamCreator) readDemuxLoop(bufferSize int, period time.Duration, deadline time.Duration) {
|
||||
// Initialize and start the ping/pong heartbeat.
|
||||
h := newHeartbeat(c.conn, period, deadline)
|
||||
// Set initial timeout for websocket connection reading.
|
||||
if err := c.conn.SetReadDeadline(time.Now().Add(deadline)); err != nil {
|
||||
klog.Errorf("Websocket initial setting read deadline failed %v", err)
|
||||
return
|
||||
}
|
||||
go h.start()
|
||||
// Buffer size must correspond to the same size allocated
|
||||
// for the read buffer during websocket client creation. A
|
||||
// difference can cause incomplete connection reads.
|
||||
readBuffer := make([]byte, bufferSize)
|
||||
for {
|
||||
// NextReader() only returns data messages (BinaryMessage or Text
|
||||
// Message). Even though this call will never return control frames
|
||||
// such as ping, pong, or close, this call is necessary for these
|
||||
// message types to be processed. There can only be one reader
|
||||
// at a time, so this reader loop must *not* be run concurrently;
|
||||
// there is no lock for reading. Calling "NextReader()" before the
|
||||
// current reader has been processed will close the current reader.
|
||||
// If the heartbeat read deadline times out, this "NextReader()" will
|
||||
// return an i/o error, and error handling will clean up.
|
||||
messageType, r, err := c.conn.NextReader()
|
||||
if err != nil {
|
||||
websocketErr, ok := err.(*gwebsocket.CloseError)
|
||||
if ok && websocketErr.Code == gwebsocket.CloseNormalClosure {
|
||||
err = nil // readers will get io.EOF as it's a normal closure
|
||||
} else {
|
||||
err = fmt.Errorf("next reader: %w", err)
|
||||
}
|
||||
c.closeAllStreamReaders(err)
|
||||
return
|
||||
}
|
||||
// All remote command protocols send/receive only binary data messages.
|
||||
if messageType != gwebsocket.BinaryMessage {
|
||||
c.closeAllStreamReaders(fmt.Errorf("unexpected message type: %d", messageType))
|
||||
return
|
||||
}
|
||||
// It's ok to read just a single byte because the underlying library wraps the actual
|
||||
// connection with a buffered reader anyway.
|
||||
_, err = io.ReadFull(r, readBuffer[:1])
|
||||
if err != nil {
|
||||
c.closeAllStreamReaders(fmt.Errorf("read stream id: %w", err))
|
||||
return
|
||||
}
|
||||
streamID := readBuffer[0]
|
||||
s := c.getStream(streamID)
|
||||
if s == nil {
|
||||
klog.Errorf("Unknown stream id %d, discarding message", streamID)
|
||||
continue
|
||||
}
|
||||
for {
|
||||
nr, errRead := r.Read(readBuffer)
|
||||
if nr > 0 {
|
||||
// Write the data to the stream's pipe. This can block.
|
||||
_, errWrite := s.writePipe.Write(readBuffer[:nr])
|
||||
if errWrite != nil {
|
||||
// Pipe must have been closed by the stream user.
|
||||
// Nothing to do, discard the message.
|
||||
break
|
||||
}
|
||||
}
|
||||
if errRead != nil {
|
||||
if errRead == io.EOF {
|
||||
break
|
||||
}
|
||||
c.closeAllStreamReaders(fmt.Errorf("read message: %w", err))
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// closeAllStreamReaders closes readers in all streams.
|
||||
// This unblocks all stream.Read() calls.
|
||||
func (c *wsStreamCreator) closeAllStreamReaders(err error) {
|
||||
c.streamsMu.Lock()
|
||||
defer c.streamsMu.Unlock()
|
||||
for _, s := range c.streams {
|
||||
// Closing writePipe unblocks all readPipe.Read() callers and prevents any future writes.
|
||||
_ = s.writePipe.CloseWithError(err)
|
||||
}
|
||||
}
|
||||
|
||||
type stream struct {
|
||||
headers http.Header
|
||||
readPipe *io.PipeReader
|
||||
writePipe *io.PipeWriter
|
||||
// conn is used for writing directly into the connection.
|
||||
// Is nil after Close() / Reset() to prevent future writes.
|
||||
conn *gwebsocket.Conn
|
||||
// connWriteLock protects conn against concurrent write operations. There must be a single writer and a single reader only.
|
||||
// The mutex is shared across all streams because the underlying connection is shared.
|
||||
connWriteLock *sync.Mutex
|
||||
id byte
|
||||
}
|
||||
|
||||
func (s *stream) Read(p []byte) (n int, err error) {
|
||||
return s.readPipe.Read(p)
|
||||
}
|
||||
|
||||
// Write writes directly to the underlying WebSocket connection.
|
||||
func (s *stream) Write(p []byte) (n int, err error) {
|
||||
klog.V(4).Infof("Write() on stream %d", s.id)
|
||||
defer klog.V(4).Infof("Write() done on stream %d", s.id)
|
||||
s.connWriteLock.Lock()
|
||||
defer s.connWriteLock.Unlock()
|
||||
if s.conn == nil {
|
||||
return 0, fmt.Errorf("write on closed stream %d", s.id)
|
||||
}
|
||||
err = s.conn.SetWriteDeadline(time.Now().Add(writeDeadline))
|
||||
if err != nil {
|
||||
klog.V(7).Infof("Websocket setting write deadline failed %v", err)
|
||||
return 0, err
|
||||
}
|
||||
// Message writer buffers the message data, so we don't need to do that ourselves.
|
||||
// Just write id and the data as two separate writes to avoid allocating an intermediate buffer.
|
||||
w, err := s.conn.NextWriter(gwebsocket.BinaryMessage)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer func() {
|
||||
if w != nil {
|
||||
w.Close()
|
||||
}
|
||||
}()
|
||||
_, err = w.Write([]byte{s.id})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, err = w.Write(p)
|
||||
if err != nil {
|
||||
return n, err
|
||||
}
|
||||
err = w.Close()
|
||||
w = nil
|
||||
return n, err
|
||||
}
|
||||
|
||||
// Close half-closes the stream, indicating this side is finished with the stream.
|
||||
func (s *stream) Close() error {
|
||||
klog.V(4).Infof("Close() on stream %d", s.id)
|
||||
defer klog.V(4).Infof("Close() done on stream %d", s.id)
|
||||
s.connWriteLock.Lock()
|
||||
defer s.connWriteLock.Unlock()
|
||||
if s.conn == nil {
|
||||
return fmt.Errorf("Close() on already closed stream %d", s.id)
|
||||
}
|
||||
// Communicate the CLOSE stream signal to the other websocket endpoint.
|
||||
err := s.conn.WriteMessage(gwebsocket.BinaryMessage, []byte{remotecommand.StreamClose, s.id})
|
||||
s.conn = nil
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *stream) Reset() error {
|
||||
klog.V(4).Infof("Reset() on stream %d", s.id)
|
||||
defer klog.V(4).Infof("Reset() done on stream %d", s.id)
|
||||
s.Close()
|
||||
return s.writePipe.Close()
|
||||
}
|
||||
|
||||
func (s *stream) Headers() http.Header {
|
||||
return s.headers
|
||||
}
|
||||
|
||||
func (s *stream) Identifier() uint32 {
|
||||
return uint32(s.id)
|
||||
}
|
||||
|
||||
// heartbeat encasulates data necessary for the websocket ping/pong heartbeat. This
|
||||
// heartbeat works by setting a read deadline on the websocket connection, then
|
||||
// pushing this deadline into the future for every successful heartbeat. If the
|
||||
// heartbeat "pong" fails to respond within the deadline, then the "NextReader()" call
|
||||
// inside the "readDemuxLoop" will return an i/o error prompting a connection close
|
||||
// and cleanup.
|
||||
type heartbeat struct {
|
||||
conn *gwebsocket.Conn
|
||||
// period defines how often a "ping" heartbeat message is sent to the other endpoint
|
||||
period time.Duration
|
||||
// closing the "closer" channel will clean up the heartbeat timers
|
||||
closer chan struct{}
|
||||
// optional data to send with "ping" message
|
||||
message []byte
|
||||
// optionally received data message with "pong" message, same as sent with ping
|
||||
pongMessage []byte
|
||||
}
|
||||
|
||||
// newHeartbeat creates heartbeat structure encapsulating fields necessary to
|
||||
// run the websocket connection ping/pong mechanism and sets up handlers on
|
||||
// the websocket connection.
|
||||
func newHeartbeat(conn *gwebsocket.Conn, period time.Duration, deadline time.Duration) *heartbeat {
|
||||
h := &heartbeat{
|
||||
conn: conn,
|
||||
period: period,
|
||||
closer: make(chan struct{}),
|
||||
}
|
||||
// Set up handler for receiving returned "pong" message from other endpoint
|
||||
// by pushing the read deadline into the future. The "msg" received could
|
||||
// be empty.
|
||||
h.conn.SetPongHandler(func(msg string) error {
|
||||
// Push the read deadline into the future.
|
||||
klog.V(8).Infof("Pong message received (%s)--resetting read deadline", msg)
|
||||
err := h.conn.SetReadDeadline(time.Now().Add(deadline))
|
||||
if err != nil {
|
||||
klog.Errorf("Websocket setting read deadline failed %v", err)
|
||||
return err
|
||||
}
|
||||
if len(msg) > 0 {
|
||||
h.pongMessage = []byte(msg)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
// Set up handler to cleanup timers when this endpoint receives "Close" message.
|
||||
closeHandler := h.conn.CloseHandler()
|
||||
h.conn.SetCloseHandler(func(code int, text string) error {
|
||||
close(h.closer)
|
||||
return closeHandler(code, text)
|
||||
})
|
||||
return h
|
||||
}
|
||||
|
||||
// setMessage is optional data sent with "ping" heartbeat. According to the websocket RFC
|
||||
// this data sent with "ping" message should be returned in "pong" message.
|
||||
func (h *heartbeat) setMessage(msg string) {
|
||||
h.message = []byte(msg)
|
||||
}
|
||||
|
||||
// start the heartbeat by setting up necesssary handlers and looping by sending "ping"
|
||||
// message every "period" until the "closer" channel is closed.
|
||||
func (h *heartbeat) start() {
|
||||
// Loop to continually send "ping" message through websocket connection every "period".
|
||||
t := time.NewTicker(h.period)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-h.closer:
|
||||
klog.V(8).Infof("closed channel--returning")
|
||||
return
|
||||
case <-t.C:
|
||||
// "WriteControl" does not need to be protected by a mutex. According to
|
||||
// gorilla/websockets library docs: "The Close and WriteControl methods can
|
||||
// be called concurrently with all other methods."
|
||||
if err := h.conn.WriteControl(gwebsocket.PingMessage, h.message, time.Now().Add(writeDeadline)); err == nil {
|
||||
klog.V(8).Infof("Websocket Ping succeeeded")
|
||||
} else {
|
||||
klog.Errorf("Websocket Ping failed: %v", err)
|
||||
if errors.Is(err, gwebsocket.ErrCloseSent) {
|
||||
// we continue because c.conn.CloseChan will manage closing the connection already
|
||||
continue
|
||||
} else if e, ok := err.(net.Error); ok && e.Timeout() {
|
||||
// Continue, in case this is a transient failure.
|
||||
// c.conn.CloseChan above will tell us when the connection is
|
||||
// actually closed.
|
||||
// If Temporary function hadn't been deprecated, we would have used it.
|
||||
// But most of temporary errors are timeout errors anyway.
|
||||
continue
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user