protobuf: remove gogoproto

Removes gogo/protobuf from buildx and updates to a version of
moby/buildkit where gogo is removed.

This also changes how the proto files are generated. This is because
newer versions of protobuf are more strict about name conflicts. If two
files have the same name (even if they are relative paths) and are used
in different protoc commands, they'll conflict in the registry.

Since protobuf file generation doesn't work very well with
`paths=source_relative`, this removes the `go:generate` expression and
just relies on the dockerfile to perform the generation.

Signed-off-by: Jonathan A. Sternberg <jonathan.sternberg@docker.com>
This commit is contained in:
Jonathan A. Sternberg
2024-10-02 15:51:59 -05:00
parent 8e47387d02
commit b35a0f4718
592 changed files with 46288 additions and 110420 deletions

View File

@ -32,6 +32,7 @@ import (
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/status"
)
@ -148,9 +149,9 @@ type dataFrame struct {
streamID uint32
endStream bool
h []byte
d []byte
reader mem.Reader
// onEachWrite is called every time
// a part of d is written out.
// a part of data is written out.
onEachWrite func()
}
@ -193,7 +194,7 @@ type goAway struct {
code http2.ErrCode
debugData []byte
headsUp bool
closeConn error // if set, loopyWriter will exit, resulting in conn closure
closeConn error // if set, loopyWriter will exit with this error
}
func (*goAway) isTransportResponseFrame() bool { return false }
@ -289,18 +290,22 @@ func (l *outStreamList) dequeue() *outStream {
}
// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
//
// Information is passed as specific struct types called control frames. A
// control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state. It
// shouldn't be confused with an HTTP2 frame, although some of the control
// frames like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
ch chan struct{}
done <-chan struct{}
wakeupCh chan struct{} // Unblocks readers waiting for something to read.
done <-chan struct{} // Closed when the transport is done.
// Mutex guards all the fields below, except trfChan which can be read
// atomically without holding mu.
mu sync.Mutex
consumerWaiting bool
list *itemList
err error
consumerWaiting bool // True when readers are blocked waiting for new data.
closed bool // True when the controlbuf is finished.
list *itemList // List of queued control frames.
// transportResponseFrames counts the number of queued items that represent
// the response of an action initiated by the peer. trfChan is created
@ -308,47 +313,59 @@ type controlBuffer struct {
// closed and nilled when transportResponseFrames drops below the
// threshold. Both fields are protected by mu.
transportResponseFrames int
trfChan atomic.Value // chan struct{}
trfChan atomic.Pointer[chan struct{}]
}
func newControlBuffer(done <-chan struct{}) *controlBuffer {
return &controlBuffer{
ch: make(chan struct{}, 1),
list: &itemList{},
done: done,
wakeupCh: make(chan struct{}, 1),
list: &itemList{},
done: done,
}
}
// throttle blocks if there are too many incomingSettings/cleanupStreams in the
// controlbuf.
// throttle blocks if there are too many frames in the control buf that
// represent the response of an action initiated by the peer, like
// incomingSettings cleanupStreams etc.
func (c *controlBuffer) throttle() {
ch, _ := c.trfChan.Load().(chan struct{})
if ch != nil {
if ch := c.trfChan.Load(); ch != nil {
select {
case <-ch:
case <-(*ch):
case <-c.done:
}
}
}
// put adds an item to the controlbuf.
func (c *controlBuffer) put(it cbItem) error {
_, err := c.executeAndPut(nil, it)
return err
}
func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, error) {
var wakeUp bool
// executeAndPut runs f, and if the return value is true, adds the given item to
// the controlbuf. The item could be nil, in which case, this method simply
// executes f and does not add the item to the controlbuf.
//
// The first return value indicates whether the item was successfully added to
// the control buffer. A non-nil error, specifically ErrConnClosing, is returned
// if the control buffer is already closed.
func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return false, c.err
defer c.mu.Unlock()
if c.closed {
return false, ErrConnClosing
}
if f != nil {
if !f(it) { // f wasn't successful
c.mu.Unlock()
if !f() { // f wasn't successful
return false, nil
}
}
if it == nil {
return true, nil
}
var wakeUp bool
if c.consumerWaiting {
wakeUp = true
c.consumerWaiting = false
@ -359,98 +376,102 @@ func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, err
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are adding the frame that puts us over the threshold; create
// a throttling channel.
c.trfChan.Store(make(chan struct{}))
ch := make(chan struct{})
c.trfChan.Store(&ch)
}
}
c.mu.Unlock()
if wakeUp {
select {
case c.ch <- struct{}{}:
case c.wakeupCh <- struct{}{}:
default:
}
}
return true, nil
}
// Note argument f should never be nil.
func (c *controlBuffer) execute(f func(it any) bool, it any) (bool, error) {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return false, c.err
}
if !f(it) { // f wasn't successful
c.mu.Unlock()
return false, nil
}
c.mu.Unlock()
return true, nil
}
// get returns the next control frame from the control buffer. If block is true
// **and** there are no control frames in the control buffer, the call blocks
// until one of the conditions is met: there is a frame to return or the
// transport is closed.
func (c *controlBuffer) get(block bool) (any, error) {
for {
c.mu.Lock()
if c.err != nil {
frame, err := c.getOnceLocked()
if frame != nil || err != nil || !block {
// If we read a frame or an error, we can return to the caller. The
// call to getOnceLocked() returns a nil frame and a nil error if
// there is nothing to read, and in that case, if the caller asked
// us not to block, we can return now as well.
c.mu.Unlock()
return nil, c.err
}
if !c.list.isEmpty() {
h := c.list.dequeue().(cbItem)
if h.isTransportResponseFrame() {
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are removing the frame that put us over the
// threshold; close and clear the throttling channel.
ch := c.trfChan.Load().(chan struct{})
close(ch)
c.trfChan.Store((chan struct{})(nil))
}
c.transportResponseFrames--
}
c.mu.Unlock()
return h, nil
}
if !block {
c.mu.Unlock()
return nil, nil
return frame, err
}
c.consumerWaiting = true
c.mu.Unlock()
// Release the lock above and wait to be woken up.
select {
case <-c.ch:
case <-c.wakeupCh:
case <-c.done:
return nil, errors.New("transport closed by client")
}
}
}
// Callers must not use this method, but should instead use get().
//
// Caller must hold c.mu.
func (c *controlBuffer) getOnceLocked() (any, error) {
if c.closed {
return false, ErrConnClosing
}
if c.list.isEmpty() {
return nil, nil
}
h := c.list.dequeue().(cbItem)
if h.isTransportResponseFrame() {
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are removing the frame that put us over the
// threshold; close and clear the throttling channel.
ch := c.trfChan.Swap(nil)
close(*ch)
}
c.transportResponseFrames--
}
return h, nil
}
// finish closes the control buffer, cleaning up any streams that have queued
// header frames. Once this method returns, no more frames can be added to the
// control buffer, and attempts to do so will return ErrConnClosing.
func (c *controlBuffer) finish() {
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
defer c.mu.Unlock()
if c.closed {
return
}
c.err = ErrConnClosing
c.closed = true
// There may be headers for streams in the control buffer.
// These streams need to be cleaned out since the transport
// is still not aware of these yet.
for head := c.list.dequeueAll(); head != nil; head = head.next {
hdr, ok := head.it.(*headerFrame)
if !ok {
continue
}
if hdr.onOrphaned != nil { // It will be nil on the server-side.
hdr.onOrphaned(ErrConnClosing)
switch v := head.it.(type) {
case *headerFrame:
if v.onOrphaned != nil { // It will be nil on the server-side.
v.onOrphaned(ErrConnClosing)
}
case *dataFrame:
_ = v.reader.Close()
}
}
// In case throttle() is currently in flight, it needs to be unblocked.
// Otherwise, the transport may not close, since the transport is closed by
// the reader encountering the connection error.
ch, _ := c.trfChan.Load().(chan struct{})
ch := c.trfChan.Swap(nil)
if ch != nil {
close(ch)
close(*ch)
}
c.trfChan.Store((chan struct{})(nil))
c.mu.Unlock()
}
type side int
@ -466,7 +487,7 @@ const (
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resemebling to a round-robin scheduling over all streams. While
// thereby closely resembling a round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
@ -490,26 +511,29 @@ type loopyWriter struct {
draining bool
conn net.Conn
logger *grpclog.PrefixLogger
bufferPool mem.BufferPool
// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
}
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger) *loopyWriter {
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
var buf bytes.Buffer
l := &loopyWriter{
side: s,
cbuf: cbuf,
sendQuota: defaultWindowSize,
oiws: defaultWindowSize,
estdStreams: make(map[uint32]*outStream),
activeStreams: newOutStreamList(),
framer: fr,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
logger: logger,
side: s,
cbuf: cbuf,
sendQuota: defaultWindowSize,
oiws: defaultWindowSize,
estdStreams: make(map[uint32]*outStream),
activeStreams: newOutStreamList(),
framer: fr,
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
bdpEst: bdpEst,
conn: conn,
logger: logger,
ssGoAwayHandler: goAwayHandler,
bufferPool: bufferPool,
}
return l
}
@ -767,6 +791,11 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
// not be established yet.
delete(l.estdStreams, c.streamID)
str.deleteSelf()
for head := str.itl.dequeueAll(); head != nil; head = head.next {
if df, ok := head.it.(*dataFrame); ok {
_ = df.reader.Close()
}
}
}
if c.rst { // If RST_STREAM needs to be sent.
if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
@ -902,16 +931,18 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
// A data item is represented by a dataFrame, since it later translates into
// multiple HTTP2 data frames.
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is actual data.
// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
// maximum possible HTTP2 frame size.
// Every dataFrame has two buffers; h that keeps grpc-message header and data
// that is the actual message. As an optimization to keep wire traffic low, data
// from data is copied to h to make as big as the maximum possible HTTP2 frame
// size.
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
str.itl.dequeue() // remove the empty data item from stream
_ = dataItem.reader.Close()
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@ -926,9 +957,7 @@ func (l *loopyWriter) processData() (bool, error) {
}
return false, nil
}
var (
buf []byte
)
// Figure out the maximum size we can send
maxSize := http2MaxFrameLen
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
@ -942,43 +971,50 @@ func (l *loopyWriter) processData() (bool, error) {
}
// Compute how much of the header and data we can send within quota and max frame length
hSize := min(maxSize, len(dataItem.h))
dSize := min(maxSize-hSize, len(dataItem.d))
if hSize != 0 {
if dSize == 0 {
buf = dataItem.h
} else {
// We can add some data to grpc message header to distribute bytes more equally across frames.
// Copy on the stack to avoid generating garbage
var localBuf [http2MaxFrameLen]byte
copy(localBuf[:hSize], dataItem.h)
copy(localBuf[hSize:], dataItem.d[:dSize])
buf = localBuf[:hSize+dSize]
}
} else {
buf = dataItem.d
}
dSize := min(maxSize-hSize, dataItem.reader.Remaining())
remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
size := hSize + dSize
var buf *[]byte
if hSize != 0 && dSize == 0 {
buf = &dataItem.h
} else {
// Note: this is only necessary because the http2.Framer does not support
// partially writing a frame, so the sequence must be materialized into a buffer.
// TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
pool := l.bufferPool
if pool == nil {
// Note that this is only supposed to be nil in tests. Otherwise, stream is
// always initialized with a BufferPool.
pool = mem.DefaultBufferPool()
}
buf = pool.Get(size)
defer pool.Put(buf)
copy((*buf)[:hSize], dataItem.h)
_, _ = dataItem.reader.Read((*buf)[hSize:])
}
// Now that outgoing flow controls are checked we can replenish str's write quota
str.wq.replenish(size)
var endStream bool
// If this is the last data message on this stream and all of it can be written in this iteration.
if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
if dataItem.endStream && remainingBytes == 0 {
endStream = true
}
if dataItem.onEachWrite != nil {
dataItem.onEachWrite()
}
if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
return false, err
}
str.bytesOutStanding += size
l.sendQuota -= uint32(size)
dataItem.h = dataItem.h[hSize:]
dataItem.d = dataItem.d[dSize:]
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
if remainingBytes == 0 { // All the data from that message was written out.
_ = dataItem.reader.Close()
str.itl.dequeue()
}
if str.itl.isEmpty() {

View File

@ -24,7 +24,6 @@
package transport
import (
"bytes"
"context"
"errors"
"fmt"
@ -40,6 +39,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
@ -50,15 +50,11 @@ import (
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
// inside an http.Handler, or writes an HTTP error to w and returns an error.
// It requires that the http Server supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
if r.ProtoMajor != 2 {
msg := "gRPC requires HTTP/2"
http.Error(w, msg, http.StatusBadRequest)
return nil, errors.New(msg)
}
if r.Method != "POST" {
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler, bufferPool mem.BufferPool) (ServerTransport, error) {
if r.Method != http.MethodPost {
w.Header().Set("Allow", http.MethodPost)
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
http.Error(w, msg, http.StatusBadRequest)
http.Error(w, msg, http.StatusMethodNotAllowed)
return nil, errors.New(msg)
}
contentType := r.Header.Get("Content-Type")
@ -69,6 +65,11 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
http.Error(w, msg, http.StatusUnsupportedMediaType)
return nil, errors.New(msg)
}
if r.ProtoMajor != 2 {
msg := "gRPC requires HTTP/2"
http.Error(w, msg, http.StatusHTTPVersionNotSupported)
return nil, errors.New(msg)
}
if _, ok := w.(http.Flusher); !ok {
msg := "gRPC requires a ResponseWriter supporting http.Flusher"
http.Error(w, msg, http.StatusInternalServerError)
@ -97,6 +98,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []s
contentType: contentType,
contentSubtype: contentSubtype,
stats: stats,
bufferPool: bufferPool,
}
st.logger = prefixLoggerForServerHandlerTransport(st)
@ -170,6 +172,8 @@ type serverHandlerTransport struct {
stats []stats.Handler
logger *grpclog.PrefixLogger
bufferPool mem.BufferPool
}
func (ht *serverHandlerTransport) Close(err error) {
@ -243,6 +247,7 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
s.hdrMu.Lock()
defer s.hdrMu.Unlock()
if p := st.Proto(); p != nil && len(p.Details) > 0 {
delete(s.trailer, grpcStatusDetailsBinHeader)
stBytes, err := proto.Marshal(p)
@ -267,7 +272,6 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
}
}
}
s.hdrMu.Unlock()
})
if err == nil { // transport has not been closed
@ -329,16 +333,28 @@ func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
s.hdrMu.Unlock()
}
func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error {
// Always take a reference because otherwise there is no guarantee the data will
// be available after this function returns. This is what callers to Write
// expect.
data.Ref()
headersWritten := s.updateHeaderSent()
return ht.do(func() {
err := ht.do(func() {
defer data.Free()
if !headersWritten {
ht.writePendingHeaders(s)
}
ht.rw.Write(hdr)
ht.rw.Write(data)
for _, b := range data {
_, _ = ht.rw.Write(b.ReadOnlyData())
}
ht.rw.(http.Flusher).Flush()
})
if err != nil {
data.Free()
return err
}
return nil
}
func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
@ -405,7 +421,7 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
windowHandler: func(int) {},
}
@ -414,21 +430,19 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
go func() {
defer close(readerDone)
// TODO: minimize garbage, optimize recvBuffer code/ownership
const readSize = 8196
for buf := make([]byte, readSize); ; {
n, err := req.Body.Read(buf)
for {
buf := ht.bufferPool.Get(http2MaxFrameLen)
n, err := req.Body.Read(*buf)
if n > 0 {
s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
buf = buf[n:]
*buf = (*buf)[:n]
s.buf.put(recvMsg{buffer: mem.NewBuffer(buf, ht.bufferPool)})
} else {
ht.bufferPool.Put(buf)
}
if err != nil {
s.buf.put(recvMsg{err: mapRecvMsgError(err)})
return
}
if len(buf) == 0 {
buf = make([]byte, readSize)
}
}
}()

View File

@ -47,6 +47,7 @@ import (
isyscall "google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
@ -59,6 +60,8 @@ import (
// atomically.
var clientConnectionCounter uint64
var goAwayLoopyWriterTimeout = 5 * time.Second
var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
// http2Client implements the ClientTransport interface with HTTP2.
@ -114,11 +117,11 @@ type http2Client struct {
streamQuota int64
streamsQuotaAvailable chan struct{}
waitingStreams uint32
nextID uint32
registeredCompressors string
// Do not access controlBuf with mu held.
mu sync.Mutex // guard the following variables
nextID uint32
state transportState
activeStreams map[uint32]*Stream
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
@ -140,13 +143,11 @@ type http2Client struct {
// variable.
kpDormant bool
// Fields below are for channelz metric collection.
channelzID *channelz.Identifier
czData *channelzData
channelz *channelz.Socket
onClose func(GoAwayReason)
bufferPool *bufferPool
bufferPool mem.BufferPool
connectionID uint64
logger *grpclog.PrefixLogger
@ -231,7 +232,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
}(conn)
// The following defer and goroutine monitor the connectCtx for cancelation
// The following defer and goroutine monitor the connectCtx for cancellation
// and deadline. On context expiration, the connection is hard closed and
// this function will naturally fail as a result. Otherwise, the defer
// waits for the goroutine to exit to prevent the context from being
@ -319,6 +320,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if opts.MaxHeaderListSize != nil {
maxHeaderListSize = *opts.MaxHeaderListSize
}
t := &http2Client{
ctx: ctx,
ctxDone: ctx.Done(), // Cache Done chan.
@ -346,11 +348,25 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
streamsQuotaAvailable: make(chan struct{}, 1),
czData: new(channelzData),
keepaliveEnabled: keepaliveEnabled,
bufferPool: newBufferPool(),
bufferPool: opts.BufferPool,
onClose: onClose,
}
var czSecurity credentials.ChannelzSecurityValue
if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
czSecurity = au.GetSecurityValue()
}
t.channelz = channelz.RegisterSocket(
&channelz.Socket{
SocketType: channelz.SocketTypeNormal,
Parent: opts.ChannelzParent,
SocketMetrics: channelz.SocketMetrics{},
EphemeralMetrics: t.socketMetrics,
LocalAddr: t.localAddr,
RemoteAddr: t.remoteAddr,
SocketOptions: channelz.GetSocketOption(t.conn),
Security: czSecurity,
})
t.logger = prefixLoggerForClientTransport(t)
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
@ -381,10 +397,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
}
sh.HandleConn(t.ctx, connBegin)
}
t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
if err != nil {
return nil, err
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
go t.keepalive()
@ -399,10 +411,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
readerErrCh := make(chan error, 1)
go t.reader(readerErrCh)
defer func() {
if err == nil {
err = <-readerErrCh
}
if err != nil {
// writerDone should be closed since the loopy goroutine
// wouldn't have started in the case this function returns an error.
close(t.writerDone)
t.Close(err)
}
}()
@ -449,8 +461,12 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
if err := t.framer.writer.Flush(); err != nil {
return nil, err
}
// Block until the server preface is received successfully or an error occurs.
if err = <-readerErrCh; err != nil {
return nil, err
}
go func() {
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
if err := t.loopy.run(); !isIOError(err) {
// Immediately close the connection, as the loopy writer returns
// when there are no more active streams and we were draining (the
@ -491,7 +507,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
closeStream: func(err error) {
t.CloseStream(s, err)
},
freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@ -508,6 +523,17 @@ func (t *http2Client) getPeer() *peer.Peer {
}
}
// OutgoingGoAwayHandler writes a GOAWAY to the connection. Always returns (false, err) as we want the GoAway
// to be the last frame loopy writes to the transport.
func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.framer.fr.WriteGoAway(t.nextID-2, http2.ErrCodeNo, g.debugData); err != nil {
return false, err
}
return false, g.closeConn
}
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
aud := t.createAudience(callHdr)
ri := credentials.RequestInfo{
@ -756,8 +782,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return ErrConnClosing
}
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
t.channelz.SocketMetrics.StreamsStarted.Add(1)
t.channelz.SocketMetrics.LastLocalStreamCreatedTimestamp.Store(time.Now().UnixNano())
}
// If the keepalive goroutine has gone dormant, wake it up.
if t.kpDormant {
@ -772,7 +798,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
firstTry := true
var ch chan struct{}
transportDrainRequired := false
checkForStreamQuota := func(it any) bool {
checkForStreamQuota := func() bool {
if t.streamQuota <= 0 { // Can go negative if server decreases it.
if firstTry {
t.waitingStreams++
@ -784,23 +810,24 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
t.waitingStreams--
}
t.streamQuota--
h := it.(*headerFrame)
h.streamID = t.nextID
t.nextID += 2
// Drain client transport if nextID > MaxStreamID which signals gRPC that
// the connection is closed and a new one must be created for subsequent RPCs.
transportDrainRequired = t.nextID > MaxStreamID
s.id = h.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.mu.Lock()
if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
t.mu.Unlock()
return false // Don't create a stream if the transport is already closed.
}
hdr.streamID = t.nextID
t.nextID += 2
// Drain client transport if nextID > MaxStreamID which signals gRPC that
// the connection is closed and a new one must be created for subsequent RPCs.
transportDrainRequired = t.nextID > MaxStreamID
s.id = hdr.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
t.activeStreams[s.id] = s
t.mu.Unlock()
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
case t.streamsQuotaAvailable <- struct{}{}:
@ -810,13 +837,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
var hdrListSizeErr error
checkForHeaderListSize := func(it any) bool {
checkForHeaderListSize := func() bool {
if t.maxSendHeaderListSize == nil {
return true
}
hdrFrame := it.(*headerFrame)
var sz int64
for _, f := range hdrFrame.hf {
for _, f := range hdr.hf {
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
return false
@ -825,8 +851,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
return true
}
for {
success, err := t.controlBuf.executeAndPut(func(it any) bool {
return checkForHeaderListSize(it) && checkForStreamQuota(it)
success, err := t.controlBuf.executeAndPut(func() bool {
return checkForHeaderListSize() && checkForStreamQuota()
}, hdr)
if err != nil {
// Connection closed.
@ -928,16 +954,16 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.mu.Unlock()
if channelz.IsOn() {
if eosReceived {
atomic.AddInt64(&t.czData.streamsSucceeded, 1)
t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
} else {
atomic.AddInt64(&t.czData.streamsFailed, 1)
t.channelz.SocketMetrics.StreamsFailed.Add(1)
}
}
},
rst: rst,
rstCode: rstCode,
}
addBackStreamQuota := func(any) bool {
addBackStreamQuota := func() bool {
t.streamQuota++
if t.streamQuota > 0 && t.waitingStreams > 0 {
select {
@ -957,8 +983,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
// accessed anymore.
func (t *http2Client) Close(err error) {
t.conn.SetWriteDeadline(time.Now().Add(time.Second * 10))
t.mu.Lock()
// Make sure we only close once.
if t.state == closing {
@ -982,10 +1009,23 @@ func (t *http2Client) Close(err error) {
t.kpDormancyCond.Signal()
}
t.mu.Unlock()
t.controlBuf.finish()
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
// connection. See https://httpwg.org/specs/rfc7540.html#GOAWAY. It
// also waits for loopyWriter to be closed with a timer to avoid the
// long blocking in case the connection is blackholed, i.e. TCP is
// just stuck.
t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
timer := time.NewTimer(goAwayLoopyWriterTimeout)
defer timer.Stop()
select {
case <-t.writerDone: // success
case <-timer.C:
t.logger.Infof("Failed to write a GOAWAY frame as part of connection close after %s. Giving up and closing the transport.", goAwayLoopyWriterTimeout)
}
t.cancel()
t.conn.Close()
channelz.RemoveEntry(t.channelzID)
channelz.RemoveEntry(t.channelz.ID)
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
@ -1038,27 +1078,36 @@ func (t *http2Client) GracefulClose() {
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
func (t *http2Client) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error {
reader := data.Reader()
if opts.Last {
// If it's the last message, update stream state.
if !s.compareAndSwapState(streamActive, streamWriteDone) {
_ = reader.Close()
return errStreamDone
}
} else if s.getState() != streamActive {
_ = reader.Close()
return errStreamDone
}
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
h: hdr,
d: data,
reader: reader,
}
if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
_ = reader.Close()
return err
}
}
return t.controlBuf.put(df)
if err := t.controlBuf.put(df); err != nil {
_ = reader.Close()
return err
}
return nil
}
func (t *http2Client) getStream(f http2.Frame) *Stream {
@ -1090,7 +1139,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Client) updateFlowControl(n uint32) {
updateIWS := func(any) bool {
updateIWS := func() bool {
t.initialWindowSize = int32(n)
t.mu.Lock()
for _, s := range t.activeStreams {
@ -1163,10 +1212,13 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
buffer := t.bufferPool.get()
buffer.Reset()
buffer.Write(f.Data())
s.write(recvMsg{buffer: buffer})
pool := t.bufferPool
if pool == nil {
// Note that this is only supposed to be nil in tests. Otherwise, stream is
// always initialized with a BufferPool.
pool = mem.DefaultBufferPool()
}
s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
}
}
// The server has closed the stream without sending trailers. Record that
@ -1195,7 +1247,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
if statusCode == codes.Canceled {
if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
// Our deadline was already exceeded, and that was likely the cause
// of this cancelation. Alter the status code accordingly.
// of this cancellation. Alter the status code accordingly.
statusCode = codes.DeadlineExceeded
}
}
@ -1243,7 +1295,7 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
}
updateFuncs = append(updateFuncs, updateStreamQuota)
}
t.controlBuf.executeAndPut(func(any) bool {
t.controlBuf.executeAndPut(func() bool {
for _, f := range updateFuncs {
f()
}
@ -1280,7 +1332,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id))
return
}
// A client can receive multiple GoAways from the server (see
@ -1708,7 +1760,7 @@ func (t *http2Client) keepalive() {
// keepalive timer expired. In both cases, we need to send a ping.
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
}
t.controlBuf.put(p)
timeoutLeft = t.kp.Timeout
@ -1738,40 +1790,23 @@ func (t *http2Client) GoAway() <-chan struct{} {
return t.goAway
}
func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
s := channelz.SocketInternalMetric{
StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
LocalFlowControlWindow: int64(t.fc.getSize()),
SocketOptions: channelz.GetSocketOption(t.conn),
LocalAddr: t.localAddr,
RemoteAddr: t.remoteAddr,
// RemoteName :
func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
return &channelz.EphemeralSocketMetrics{
LocalFlowControlWindow: int64(t.fc.getSize()),
RemoteFlowControlWindow: t.getOutFlowWindow(),
}
if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
s.Security = au.GetSecurityValue()
}
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
}
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
func (t *http2Client) IncrMsgSent() {
atomic.AddInt64(&t.czData.msgSent, 1)
atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
t.channelz.SocketMetrics.MessagesSent.Add(1)
t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
}
func (t *http2Client) IncrMsgRecv() {
atomic.AddInt64(&t.czData.msgRecv, 1)
atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
t.channelz.SocketMetrics.MessagesReceived.Add(1)
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
}
func (t *http2Client) getOutFlowWindow() int64 {

View File

@ -25,6 +25,7 @@ import (
"fmt"
"io"
"math"
"math/rand"
"net"
"net/http"
"strconv"
@ -38,12 +39,12 @@ import (
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/mem"
"google.golang.org/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@ -118,9 +119,8 @@ type http2Server struct {
idle time.Time
// Fields below are for channelz metric collection.
channelzID *channelz.Identifier
czData *channelzData
bufferPool *bufferPool
channelz *channelz.Socket
bufferPool mem.BufferPool
connectionID uint64
@ -262,9 +262,24 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
czData: new(channelzData),
bufferPool: newBufferPool(),
bufferPool: config.BufferPool,
}
var czSecurity credentials.ChannelzSecurityValue
if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
czSecurity = au.GetSecurityValue()
}
t.channelz = channelz.RegisterSocket(
&channelz.Socket{
SocketType: channelz.SocketTypeNormal,
Parent: config.ChannelzParent,
SocketMetrics: channelz.SocketMetrics{},
EphemeralMetrics: t.socketMetrics,
LocalAddr: t.peer.LocalAddr,
RemoteAddr: t.peer.Addr,
SocketOptions: channelz.GetSocketOption(t.conn),
Security: czSecurity,
},
)
t.logger = prefixLoggerForServerTransport(t)
t.controlBuf = newControlBuffer(t.done)
@ -274,10 +289,6 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
updateFlowControl: t.updateFlowControl,
}
}
t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.peer.Addr, t.peer.LocalAddr))
if err != nil {
return nil, err
}
t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
t.framer.writer.Flush()
@ -320,8 +331,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.handleSettings(sf)
go func() {
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler, t.bufferPool)
err := t.loopy.run()
close(t.loopyWriterDone)
if !isIOError(err) {
@ -334,9 +344,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
// closed, would lead to a TCP RST instead of FIN, and the client
// encountering errors. For more info:
// https://github.com/grpc/grpc-go/issues/5358
timer := time.NewTimer(time.Second)
defer timer.Stop()
select {
case <-t.readerDone:
case <-time.After(time.Second):
case <-timer.C:
}
t.conn.Close()
}
@ -592,8 +604,8 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
}
t.mu.Unlock()
if channelz.IsOn() {
atomic.AddInt64(&t.czData.streamsStarted, 1)
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
t.channelz.SocketMetrics.StreamsStarted.Add(1)
t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
}
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
@ -602,10 +614,9 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
freeBuffer: t.bufferPool.put,
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
@ -658,8 +669,14 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if err := t.operateHeaders(ctx, frame, handle); err != nil {
t.Close(err)
break
// Any error processing client headers, e.g. invalid stream ID,
// is considered a protocol violation.
t.controlBuf.put(&goAway{
code: http2.ErrCodeProtocol,
debugData: []byte(err.Error()),
closeConn: err,
})
continue
}
case *http2.DataFrame:
t.handleData(frame)
@ -796,10 +813,13 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
buffer := t.bufferPool.get()
buffer.Reset()
buffer.Write(f.Data())
s.write(recvMsg{buffer: buffer})
pool := t.bufferPool
if pool == nil {
// Note that this is only supposed to be nil in tests. Otherwise, stream is
// always initialized with a BufferPool.
pool = mem.DefaultBufferPool()
}
s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
}
}
if f.StreamEnded() {
@ -842,7 +862,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
}
return nil
})
t.controlBuf.executeAndPut(func(any) bool {
t.controlBuf.executeAndPut(func() bool {
for _, f := range updateFuncs {
f()
}
@ -996,12 +1016,13 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
}
headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
hf := &headerFrame{
streamID: s.id,
hf: headerFields,
endStream: false,
onWrite: t.setResetPingStrikes,
})
}
success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
if !success {
if err != nil {
return err
@ -1071,7 +1092,9 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
onWrite: t.setResetPingStrikes,
}
success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
success, err := t.controlBuf.executeAndPut(func() bool {
return t.checkForHeaderListSize(trailingHeader)
}, nil)
if !success {
if err != nil {
return err
@ -1094,27 +1117,37 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
func (t *http2Server) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error {
reader := data.Reader()
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil {
_ = reader.Close()
return err
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
_ = reader.Close()
return t.streamContextErr(s)
}
}
df := &dataFrame{
streamID: s.id,
h: hdr,
d: data,
reader: reader,
onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
_ = reader.Close()
return t.streamContextErr(s)
}
return t.controlBuf.put(df)
if err := t.controlBuf.put(df); err != nil {
_ = reader.Close()
return err
}
return nil
}
// keepalive running in a separate goroutine does the following:
@ -1190,12 +1223,12 @@ func (t *http2Server) keepalive() {
continue
}
if outstandingPing && kpTimeoutLeft <= 0 {
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
return
}
if !outstandingPing {
if channelz.IsOn() {
atomic.AddInt64(&t.czData.kpCount, 1)
t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
}
t.controlBuf.put(p)
kpTimeoutLeft = t.kp.Timeout
@ -1235,7 +1268,7 @@ func (t *http2Server) Close(err error) {
if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
}
channelz.RemoveEntry(t.channelzID)
channelz.RemoveEntry(t.channelz.ID)
// Cancel all active streams.
for _, s := range streams {
s.cancel()
@ -1256,9 +1289,9 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
if channelz.IsOn() {
if eosReceived {
atomic.AddInt64(&t.czData.streamsSucceeded, 1)
t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
} else {
atomic.AddInt64(&t.czData.streamsFailed, 1)
t.channelz.SocketMetrics.StreamsFailed.Add(1)
}
}
}
@ -1375,38 +1408,21 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
return false, nil
}
func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
s := channelz.SocketInternalMetric{
StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
LocalFlowControlWindow: int64(t.fc.getSize()),
SocketOptions: channelz.GetSocketOption(t.conn),
LocalAddr: t.peer.LocalAddr,
RemoteAddr: t.peer.Addr,
// RemoteName :
func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
return &channelz.EphemeralSocketMetrics{
LocalFlowControlWindow: int64(t.fc.getSize()),
RemoteFlowControlWindow: t.getOutFlowWindow(),
}
if au, ok := t.peer.AuthInfo.(credentials.ChannelzSecurityInfo); ok {
s.Security = au.GetSecurityValue()
}
s.RemoteFlowControlWindow = t.getOutFlowWindow()
return &s
}
func (t *http2Server) IncrMsgSent() {
atomic.AddInt64(&t.czData.msgSent, 1)
atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
t.channelz.SocketMetrics.MessagesSent.Add(1)
t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
}
func (t *http2Server) IncrMsgRecv() {
atomic.AddInt64(&t.czData.msgRecv, 1)
atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
t.channelz.SocketMetrics.MessagesReceived.Add(1)
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
}
func (t *http2Server) getOutFlowWindow() int64 {
@ -1439,7 +1455,7 @@ func getJitter(v time.Duration) time.Duration {
}
// Generate a jitter between +/- 10% of the value.
r := int64(v / 10)
j := grpcrand.Int63n(2*r) - r
j := rand.Int63n(2*r) - r
return time.Duration(j)
}

View File

@ -317,28 +317,32 @@ func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
return w
}
func (w *bufWriter) Write(b []byte) (n int, err error) {
func (w *bufWriter) Write(b []byte) (int, error) {
if w.err != nil {
return 0, w.err
}
if w.batchSize == 0 { // Buffer has been disabled.
n, err = w.conn.Write(b)
n, err := w.conn.Write(b)
return n, toIOError(err)
}
if w.buf == nil {
b := w.pool.Get().(*[]byte)
w.buf = *b
}
written := 0
for len(b) > 0 {
nn := copy(w.buf[w.offset:], b)
b = b[nn:]
w.offset += nn
n += nn
if w.offset >= w.batchSize {
err = w.flushKeepBuffer()
copied := copy(w.buf[w.offset:], b)
b = b[copied:]
written += copied
w.offset += copied
if w.offset < w.batchSize {
continue
}
if err := w.flushKeepBuffer(); err != nil {
return written, err
}
}
return n, err
return written, nil
}
func (w *bufWriter) Flush() error {
@ -418,10 +422,9 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu
return f
}
func getWriteBufferPool(writeBufferSize int) *sync.Pool {
func getWriteBufferPool(size int) *sync.Pool {
writeBufferMutex.Lock()
defer writeBufferMutex.Unlock()
size := writeBufferSize * 2
pool, ok := writeBufferPoolMap[size]
if ok {
return pool

View File

@ -107,8 +107,14 @@ func doHTTPConnectHandshake(ctx context.Context, conn net.Conn, backendAddr stri
}
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
}
return &bufConn{Conn: conn, r: r}, nil
// The buffer could contain extra bytes from the target server, so we can't
// discard it. However, in many cases where the server waits for the client
// to send the first message (e.g. when TLS is being used), the buffer will
// be empty, so we can avoid the overhead of reading through this buffer.
if r.Buffered() != 0 {
return &bufConn{Conn: conn, r: r}, nil
}
return conn, nil
}
// proxyDial dials, connecting to a proxy first if necessary. Checks if a proxy

View File

@ -22,12 +22,12 @@
package transport
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"time"
@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
@ -46,32 +47,10 @@ import (
const logLevel = 2
type bufferPool struct {
pool sync.Pool
}
func newBufferPool() *bufferPool {
return &bufferPool{
pool: sync.Pool{
New: func() any {
return new(bytes.Buffer)
},
},
}
}
func (p *bufferPool) get() *bytes.Buffer {
return p.pool.Get().(*bytes.Buffer)
}
func (p *bufferPool) put(b *bytes.Buffer) {
p.pool.Put(b)
}
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
buffer *bytes.Buffer
buffer mem.Buffer
// nil: received some data
// io.EOF: stream is completed. data is nil.
// other non-nil error: transport failure. data is nil.
@ -101,6 +80,9 @@ func newRecvBuffer() *recvBuffer {
func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock()
if b.err != nil {
// drop the buffer on the floor. Since b.err is not nil, any subsequent reads
// will always return an error, making this buffer inaccessible.
r.buffer.Free()
b.mu.Unlock()
// An error had occurred earlier, don't accept more
// data or errors.
@ -147,45 +129,70 @@ type recvBufferReader struct {
ctx context.Context
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
recv *recvBuffer
last *bytes.Buffer // Stores the remaining data in the previous calls.
last mem.Buffer // Stores the remaining data in the previous calls.
err error
freeBuffer func(*bytes.Buffer)
}
// Read reads the next len(p) bytes from last. If last is drained, it tries to
// read additional data from recv. It blocks if there no additional data available
// in recv. If Read returns any non-nil error, it will continue to return that error.
func (r *recvBufferReader) Read(p []byte) (n int, err error) {
func (r *recvBufferReader) ReadHeader(header []byte) (n int, err error) {
if r.err != nil {
return 0, r.err
}
if r.last != nil {
// Read remaining data left in last call.
copied, _ := r.last.Read(p)
if r.last.Len() == 0 {
r.freeBuffer(r.last)
r.last = nil
}
return copied, nil
n, r.last = mem.ReadUnsafe(header, r.last)
return n, nil
}
if r.closeStream != nil {
n, r.err = r.readClient(p)
n, r.err = r.readHeaderClient(header)
} else {
n, r.err = r.read(p)
n, r.err = r.readHeader(header)
}
return n, r.err
}
func (r *recvBufferReader) read(p []byte) (n int, err error) {
// Read reads the next n bytes from last. If last is drained, it tries to read
// additional data from recv. It blocks if there no additional data available in
// recv. If Read returns any non-nil error, it will continue to return that
// error.
func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
if r.err != nil {
return nil, r.err
}
if r.last != nil {
buf = r.last
if r.last.Len() > n {
buf, r.last = mem.SplitUnsafe(buf, n)
} else {
r.last = nil
}
return buf, nil
}
if r.closeStream != nil {
buf, r.err = r.readClient(n)
} else {
buf, r.err = r.read(n)
}
return buf, r.err
}
func (r *recvBufferReader) readHeader(header []byte) (n int, err error) {
select {
case <-r.ctxDone:
return 0, ContextErr(r.ctx.Err())
case m := <-r.recv.get():
return r.readAdditional(m, p)
return r.readHeaderAdditional(m, header)
}
}
func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
func (r *recvBufferReader) read(n int) (buf mem.Buffer, err error) {
select {
case <-r.ctxDone:
return nil, ContextErr(r.ctx.Err())
case m := <-r.recv.get():
return r.readAdditional(m, n)
}
}
func (r *recvBufferReader) readHeaderClient(header []byte) (n int, err error) {
// If the context is canceled, then closes the stream with nil metadata.
// closeStream writes its error parameter to r.recv as a recvMsg.
// r.readAdditional acts on that message and returns the necessary error.
@ -206,25 +213,67 @@ func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
// faster.
r.closeStream(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
return r.readAdditional(m, p)
return r.readHeaderAdditional(m, header)
case m := <-r.recv.get():
return r.readAdditional(m, p)
return r.readHeaderAdditional(m, header)
}
}
func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
// If the context is canceled, then closes the stream with nil metadata.
// closeStream writes its error parameter to r.recv as a recvMsg.
// r.readAdditional acts on that message and returns the necessary error.
select {
case <-r.ctxDone:
// Note that this adds the ctx error to the end of recv buffer, and
// reads from the head. This will delay the error until recv buffer is
// empty, thus will delay ctx cancellation in Recv().
//
// It's done this way to fix a race between ctx cancel and trailer. The
// race was, stream.Recv() may return ctx error if ctxDone wins the
// race, but stream.Trailer() may return a non-nil md because the stream
// was not marked as done when trailer is received. This closeStream
// call will mark stream as done, thus fix the race.
//
// TODO: delaying ctx error seems like a unnecessary side effect. What
// we really want is to mark the stream as done, and return ctx error
// faster.
r.closeStream(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
return r.readAdditional(m, n)
case m := <-r.recv.get():
return r.readAdditional(m, n)
}
}
func (r *recvBufferReader) readHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
r.recv.load()
if m.err != nil {
if m.buffer != nil {
m.buffer.Free()
}
return 0, m.err
}
copied, _ := m.buffer.Read(p)
if m.buffer.Len() == 0 {
r.freeBuffer(m.buffer)
r.last = nil
} else {
r.last = m.buffer
n, r.last = mem.ReadUnsafe(header, m.buffer)
return n, nil
}
func (r *recvBufferReader) readAdditional(m recvMsg, n int) (b mem.Buffer, err error) {
r.recv.load()
if m.err != nil {
if m.buffer != nil {
m.buffer.Free()
}
return nil, m.err
}
return copied, nil
if m.buffer.Len() > n {
m.buffer, r.last = mem.SplitUnsafe(m.buffer, n)
}
return m.buffer, nil
}
type streamState uint32
@ -240,7 +289,7 @@ const (
type Stream struct {
id uint32
st ServerTransport // nil for client side Stream
ct *http2Client // nil for server side Stream
ct ClientTransport // nil for server side Stream
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
@ -250,7 +299,7 @@ type Stream struct {
recvCompress string
sendCompress string
buf *recvBuffer
trReader io.Reader
trReader *transportReader
fc *inFlow
wq *writeQuota
@ -303,7 +352,7 @@ func (s *Stream) isHeaderSent() bool {
}
// updateHeaderSent updates headerSent and returns true
// if it was alreay set. It is valid only on server-side.
// if it was already set. It is valid only on server-side.
func (s *Stream) updateHeaderSent() bool {
return atomic.SwapUint32(&s.headerSent, 1) == 1
}
@ -362,8 +411,12 @@ func (s *Stream) SendCompress() string {
// ClientAdvertisedCompressors returns the compressor names advertised by the
// client via grpc-accept-encoding header.
func (s *Stream) ClientAdvertisedCompressors() string {
return s.clientAdvertisedCompressors
func (s *Stream) ClientAdvertisedCompressors() []string {
values := strings.Split(s.clientAdvertisedCompressors, ",")
for i, v := range values {
values[i] = strings.TrimSpace(v)
}
return values
}
// Done returns a channel which is closed when it receives the final status
@ -403,7 +456,7 @@ func (s *Stream) TrailersOnly() bool {
return s.noHeaders
}
// Trailer returns the cached trailer metedata. Note that if it is not called
// Trailer returns the cached trailer metadata. Note that if it is not called
// after the entire stream is done, it could return an empty MD. Client
// side only.
// It can be safely read only after stream has ended that is either read
@ -494,36 +547,87 @@ func (s *Stream) write(m recvMsg) {
s.buf.put(m)
}
// Read reads all p bytes from the wire for this stream.
func (s *Stream) Read(p []byte) (n int, err error) {
func (s *Stream) ReadHeader(header []byte) (err error) {
// Don't request a read if there was an error earlier
if er := s.trReader.(*transportReader).er; er != nil {
return 0, er
if er := s.trReader.er; er != nil {
return er
}
s.requestRead(len(p))
return io.ReadFull(s.trReader, p)
s.requestRead(len(header))
for len(header) != 0 {
n, err := s.trReader.ReadHeader(header)
header = header[n:]
if len(header) == 0 {
err = nil
}
if err != nil {
if n > 0 && err == io.EOF {
err = io.ErrUnexpectedEOF
}
return err
}
}
return nil
}
// tranportReader reads all the data available for this Stream from the transport and
// Read reads n bytes from the wire for this stream.
func (s *Stream) Read(n int) (data mem.BufferSlice, err error) {
// Don't request a read if there was an error earlier
if er := s.trReader.er; er != nil {
return nil, er
}
s.requestRead(n)
for n != 0 {
buf, err := s.trReader.Read(n)
var bufLen int
if buf != nil {
bufLen = buf.Len()
}
n -= bufLen
if n == 0 {
err = nil
}
if err != nil {
if bufLen > 0 && err == io.EOF {
err = io.ErrUnexpectedEOF
}
data.Free()
return nil, err
}
data = append(data, buf)
}
return data, nil
}
// transportReader reads all the data available for this Stream from the transport and
// passes them into the decoder, which converts them into a gRPC message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
type transportReader struct {
reader io.Reader
reader *recvBufferReader
// The handler to control the window update procedure for both this
// particular stream and the associated transport.
windowHandler func(int)
er error
}
func (t *transportReader) Read(p []byte) (n int, err error) {
n, err = t.reader.Read(p)
func (t *transportReader) ReadHeader(header []byte) (int, error) {
n, err := t.reader.ReadHeader(header)
if err != nil {
t.er = err
return
return 0, err
}
t.windowHandler(n)
return
t.windowHandler(len(header))
return n, nil
}
func (t *transportReader) Read(n int) (mem.Buffer, error) {
buf, err := t.reader.Read(n)
if err != nil {
t.er = err
return buf, err
}
t.windowHandler(buf.Len())
return buf, nil
}
// BytesReceived indicates whether any bytes have been received on this stream.
@ -566,9 +670,10 @@ type ServerConfig struct {
WriteBufferSize int
ReadBufferSize int
SharedWriteBuffer bool
ChannelzParentID *channelz.Identifier
ChannelzParent *channelz.Server
MaxHeaderListSize *uint32
HeaderTableSize *uint32
BufferPool mem.BufferPool
}
// ConnectOptions covers all relevant options for communicating with the server.
@ -601,12 +706,14 @@ type ConnectOptions struct {
ReadBufferSize int
// SharedWriteBuffer indicates whether connections should reuse write buffer
SharedWriteBuffer bool
// ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
ChannelzParentID *channelz.Identifier
// ChannelzParent sets the addrConn id which initiated the creation of this client transport.
ChannelzParent *channelz.SubChannel
// MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
MaxHeaderListSize *uint32
// UseProxy specifies if a proxy should be used.
UseProxy bool
// The mem.BufferPool to use when reading/writing to the wire.
BufferPool mem.BufferPool
}
// NewClientTransport establishes the transport with the required ConnectOptions
@ -668,7 +775,7 @@ type ClientTransport interface {
// Write sends the data for the given stream. A nil stream indicates
// the write is to be performed on the transport as a whole.
Write(s *Stream, hdr []byte, data []byte, opts *Options) error
Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error
// NewStream creates a Stream for an RPC.
NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
@ -720,7 +827,7 @@ type ServerTransport interface {
// Write sends the data for the given stream.
// Write may not be called on all streams.
Write(s *Stream, hdr []byte, data []byte, opts *Options) error
Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error
// WriteStatus sends the status of a stream to the client. WriteStatus is
// the final call made on a stream and always occurs.
@ -793,7 +900,7 @@ var (
// connection is draining. This could be caused by goaway or balancer
// removing the address.
errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
// errStreamDone is returned from write at the client side to indiacte application
// errStreamDone is returned from write at the client side to indicate application
// layer of an error.
errStreamDone = errors.New("the stream is done")
// StatusGoAway indicates that the server sent a GOAWAY that included this
@ -815,30 +922,6 @@ const (
GoAwayTooManyPings GoAwayReason = 2
)
// channelzData is used to store channelz related data for http2Client and http2Server.
// These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
type channelzData struct {
kpCount int64
// The number of streams that have started, including already finished ones.
streamsStarted int64
// Client side: The number of streams that have ended successfully by receiving
// EoS bit set frame from server.
// Server side: The number of streams that have ended successfully by sending
// frame with EoS bit set.
streamsSucceeded int64
streamsFailed int64
// lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
// instead of time.Time since it's more costly to atomically update time.Time variable than int64
// variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
lastStreamCreatedTime int64
msgSent int64
msgRecv int64
lastMsgSentTime int64
lastMsgRecvTime int64
}
// ContextErr converts the error from context package into a status error.
func ContextErr(err error) error {
switch err {