Skip to content

Commit ef4e1a9

Browse files
committed
Re-use slice buffer reader for a stream
1 parent 443caad commit ef4e1a9

File tree

5 files changed

+63
-40
lines changed

5 files changed

+63
-40
lines changed

internal/transport/controlbuf.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,11 @@ type earlyAbortStream struct {
146146
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
147147

148148
type dataFrame struct {
149-
streamID uint32
150-
endStream bool
151-
h []byte
152-
reader mem.Reader
149+
streamID uint32
150+
endStream bool
151+
h []byte
152+
data mem.BufferSlice
153+
processing bool
153154
// onEachWrite is called every time
154155
// a part of data is written out.
155156
onEachWrite func()
@@ -234,6 +235,7 @@ type outStream struct {
234235
itl *itemList
235236
bytesOutStanding int
236237
wq *writeQuota
238+
reader mem.Reader
237239

238240
next *outStream
239241
prev *outStream
@@ -461,7 +463,9 @@ func (c *controlBuffer) finish() {
461463
v.onOrphaned(ErrConnClosing)
462464
}
463465
case *dataFrame:
464-
_ = v.reader.Close()
466+
if !v.processing {
467+
v.data.Free()
468+
}
465469
}
466470
}
467471

@@ -650,10 +654,11 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
650654

651655
func (l *loopyWriter) registerStreamHandler(h *registerStream) {
652656
str := &outStream{
653-
id: h.streamID,
654-
state: empty,
655-
itl: &itemList{},
656-
wq: h.wq,
657+
id: h.streamID,
658+
state: empty,
659+
itl: &itemList{},
660+
wq: h.wq,
661+
reader: mem.BufferSlice{}.Reader(),
657662
}
658663
l.estdStreams[h.streamID] = str
659664
}
@@ -685,10 +690,11 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
685690
}
686691
// Case 2: Client wants to originate stream.
687692
str := &outStream{
688-
id: h.streamID,
689-
state: empty,
690-
itl: &itemList{},
691-
wq: h.wq,
693+
id: h.streamID,
694+
state: empty,
695+
itl: &itemList{},
696+
wq: h.wq,
697+
reader: mem.BufferSlice{}.Reader(),
692698
}
693699
return l.originateStream(str, h)
694700
}
@@ -790,10 +796,13 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
790796
// a RST_STREAM before stream initialization thus the stream might
791797
// not be established yet.
792798
delete(l.estdStreams, c.streamID)
799+
str.reader.Close()
793800
str.deleteSelf()
794801
for head := str.itl.dequeueAll(); head != nil; head = head.next {
795802
if df, ok := head.it.(*dataFrame); ok {
796-
_ = df.reader.Close()
803+
if !df.processing {
804+
df.data.Free()
805+
}
797806
}
798807
}
799808
}
@@ -928,21 +937,27 @@ func (l *loopyWriter) processData() (bool, error) {
928937
if str == nil {
929938
return true, nil
930939
}
940+
reader := str.reader
931941
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
942+
if !dataItem.processing {
943+
dataItem.processing = true
944+
str.reader.Reset(dataItem.data)
945+
dataItem.data.Free()
946+
}
932947
// A data item is represented by a dataFrame, since it later translates into
933948
// multiple HTTP2 data frames.
934949
// Every dataFrame has two buffers; h that keeps grpc-message header and data
935950
// that is the actual message. As an optimization to keep wire traffic low, data
936951
// from data is copied to h to make as big as the maximum possible HTTP2 frame
937952
// size.
938953

939-
if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
954+
if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
940955
// Client sends out empty data frame with endStream = true
941956
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
942957
return false, err
943958
}
944959
str.itl.dequeue() // remove the empty data item from stream
945-
_ = dataItem.reader.Close()
960+
_ = reader.Close()
946961
if str.itl.isEmpty() {
947962
str.state = empty
948963
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -971,8 +986,8 @@ func (l *loopyWriter) processData() (bool, error) {
971986
}
972987
// Compute how much of the header and data we can send within quota and max frame length
973988
hSize := min(maxSize, len(dataItem.h))
974-
dSize := min(maxSize-hSize, dataItem.reader.Remaining())
975-
remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
989+
dSize := min(maxSize-hSize, reader.Remaining())
990+
remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
976991
size := hSize + dSize
977992

978993
var buf *[]byte
@@ -993,7 +1008,7 @@ func (l *loopyWriter) processData() (bool, error) {
9931008
defer pool.Put(buf)
9941009

9951010
copy((*buf)[:hSize], dataItem.h)
996-
_, _ = dataItem.reader.Read((*buf)[hSize:])
1011+
_, _ = reader.Read((*buf)[hSize:])
9971012
}
9981013

9991014
// Now that outgoing flow controls are checked we can replenish str's write quota
@@ -1014,7 +1029,7 @@ func (l *loopyWriter) processData() (bool, error) {
10141029
dataItem.h = dataItem.h[hSize:]
10151030

10161031
if remainingBytes == 0 { // All the data from that message was written out.
1017-
_ = dataItem.reader.Close()
1032+
_ = reader.Close()
10181033
str.itl.dequeue()
10191034
}
10201035
if str.itl.isEmpty() {

internal/transport/http2_client.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,32 +1091,29 @@ func (t *http2Client) GracefulClose() {
10911091
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
10921092
// should proceed only if Write returns nil.
10931093
func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
1094-
reader := data.Reader()
1095-
10961094
if opts.Last {
10971095
// If it's the last message, update stream state.
10981096
if !s.compareAndSwapState(streamActive, streamWriteDone) {
1099-
_ = reader.Close()
11001097
return errStreamDone
11011098
}
11021099
} else if s.getState() != streamActive {
1103-
_ = reader.Close()
11041100
return errStreamDone
11051101
}
11061102
df := &dataFrame{
11071103
streamID: s.id,
11081104
endStream: opts.Last,
11091105
h: hdr,
1110-
reader: reader,
1106+
data: data,
11111107
}
1112-
if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
1113-
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
1114-
_ = reader.Close()
1108+
dataLen := data.Len()
1109+
if hdr != nil || dataLen != 0 { // If it's not an empty data frame, check quota.
1110+
if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
11151111
return err
11161112
}
11171113
}
1114+
data.Ref()
11181115
if err := t.controlBuf.put(df); err != nil {
1119-
_ = reader.Close()
1116+
data.Free()
11201117
return err
11211118
}
11221119
t.incrMsgSent()

internal/transport/http2_server.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ type http2Server struct {
131131
maxStreamMu sync.Mutex
132132
maxStreamID uint32 // max stream ID ever seen
133133

134-
logger *grpclog.PrefixLogger
134+
logger *grpclog.PrefixLogger
135+
sliceReader mem.Reader
135136
}
136137

137138
// NewServerTransport creates a http2 transport with conn and configuration
@@ -1132,33 +1133,30 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
11321133
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
11331134
// is returns if it fails (e.g., framing error, transport error).
11341135
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
1135-
reader := data.Reader()
1136-
11371136
if !s.isHeaderSent() { // Headers haven't been written yet.
11381137
if err := t.writeHeader(s, nil); err != nil {
1139-
_ = reader.Close()
11401138
return err
11411139
}
11421140
} else {
11431141
// Writing headers checks for this condition.
11441142
if s.getState() == streamDone {
1145-
_ = reader.Close()
11461143
return t.streamContextErr(s)
11471144
}
11481145
}
11491146

11501147
df := &dataFrame{
11511148
streamID: s.id,
11521149
h: hdr,
1153-
reader: reader,
1150+
data: data,
11541151
onEachWrite: t.setResetPingStrikes,
11551152
}
1156-
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
1157-
_ = reader.Close()
1153+
dataLen := data.Len()
1154+
if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
11581155
return t.streamContextErr(s)
11591156
}
1157+
data.Ref()
11601158
if err := t.controlBuf.put(df); err != nil {
1161-
_ = reader.Close()
1159+
data.Free()
11621160
return err
11631161
}
11641162
t.incrMsgSent()

internal/transport/transport_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,11 @@ func (h *testStreamHandler) handleStreamMisbehave(t *testing.T, s *ServerStream)
203203
}
204204
}
205205
data := newBufferSlice(p)
206+
data.Ref()
206207
conn.controlBuf.put(&dataFrame{
207208
streamID: s.id,
208209
h: nil,
209-
reader: data.Reader(),
210+
data: data,
210211
onEachWrite: func() {},
211212
})
212213
sent += len(p)
@@ -1092,11 +1093,12 @@ func (s) TestServerContextCanceledOnClosedConnection(t *testing.T) {
10921093
t.Fatalf("Failed to open stream: %v", err)
10931094
}
10941095
d := newBufferSlice(make([]byte, http2MaxFrameLen))
1096+
d.Ref()
10951097
ct.controlBuf.put(&dataFrame{
10961098
streamID: s.id,
10971099
endStream: false,
10981100
h: nil,
1099-
reader: d.Reader(),
1101+
data: d,
11001102
onEachWrite: func() {},
11011103
})
11021104
// Loop until the server side stream is created.

mem/buffer_slice.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ type Reader interface {
137137
Close() error
138138
// Remaining returns the number of unread bytes remaining in the slice.
139139
Remaining() int
140+
// Reset frees the currently held buffer slice and starts reading from the
141+
// provided slice. This allows reusing the reader object.
142+
Reset(s BufferSlice)
140143
}
141144

142145
type sliceReader struct {
@@ -150,6 +153,14 @@ func (r *sliceReader) Remaining() int {
150153
return r.len
151154
}
152155

156+
func (r *sliceReader) Reset(s BufferSlice) {
157+
r.Close()
158+
s.Ref()
159+
r.data = s
160+
r.len = s.Len()
161+
r.bufferIdx = 0
162+
}
163+
153164
func (r *sliceReader) Close() error {
154165
r.data.Free()
155166
r.data = nil

0 commit comments

Comments
 (0)