Skip to content

Commit e3732dc

Browse files
authored
Cherry-pick #8528 to v1.73.x (#8553)
Original PR: #8528 Related issue: #8517 RELEASE NOTES: * server: Fix bug preventing streams from being cancelled or timed out when blocked on flow control.
1 parent bd2d1b0 commit e3732dc

File tree

2 files changed

+77
-4
lines changed

2 files changed

+77
-4
lines changed

internal/transport/http2_server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,10 +1341,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
13411341
// called to interrupt the potential blocking on other goroutines.
13421342
s.cancel()
13431343

1344-
oldState := s.swapState(streamDone)
1345-
if oldState == streamDone {
1346-
return
1347-
}
1344+
// We can't return early even if the stream's state is "done" as the state
1345+
// might have been set by the `finishStream` method. Deleting the stream via
1346+
// `finishStream` can get blocked on flow control.
1347+
s.swapState(streamDone)
13481348
t.deleteStream(s, eosReceived)
13491349

13501350
t.controlBuf.put(&cleanupStream{

test/transport_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,3 +229,76 @@ func (s) TestRSTDuringMessageRead(t *testing.T) {
229229
t.Fatalf("client.EmptyCall() returned %v; want status with code %v", err, codes.Canceled)
230230
}
231231
}
232+
233+
// Test verifies that a client-side cancellation correctly frees up resources on
234+
// the server. The test setup is designed to simulate a scenario where a server
235+
// is blocked from sending a large message due to a full client-side flow
236+
// control window. The client-side cancellation of this blocked RPC then frees
237+
// up the max concurrent streams quota on the server, allowing a new RPC to be
238+
// created successfully.
239+
func (s) TestCancelWhileServerWaitingForFlowControl(t *testing.T) {
240+
serverDoneCh := make(chan struct{}, 2)
241+
const flowControlWindowSize = 65535
242+
ss := &stubserver.StubServer{
243+
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
244+
// Send a large message to exhaust the client's flow control window.
245+
stream.Send(&testpb.StreamingOutputCallResponse{
246+
Payload: &testpb.Payload{
247+
Body: make([]byte, flowControlWindowSize+1),
248+
},
249+
})
250+
serverDoneCh <- struct{}{}
251+
return nil
252+
},
253+
}
254+
255+
// Create a server that allows only 1 stream at a time.
256+
ss = stubserver.StartTestService(t, ss, grpc.MaxConcurrentStreams(1))
257+
defer ss.Stop()
258+
// Use a static flow control window.
259+
if err := ss.StartClient(grpc.WithInitialWindowSize(flowControlWindowSize)); err != nil {
260+
t.Fatalf("Error while start test service client: %v", err)
261+
}
262+
client := ss.Client
263+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
264+
defer cancel()
265+
266+
streamCtx, streamCancel := context.WithCancel(ctx)
267+
defer streamCancel()
268+
269+
if _, err := client.StreamingOutputCall(streamCtx, &testpb.StreamingOutputCallRequest{}); err != nil {
270+
t.Fatalf("Failed to create server streaming RPC: %v", err)
271+
}
272+
273+
// Wait for the server handler to return. This should cause the trailers to
274+
// be buffered on the server, waiting for flow control quota to first send
275+
// the data frame.
276+
select {
277+
case <-ctx.Done():
278+
t.Fatal("Context timed out waiting for server handler to return.")
279+
case <-serverDoneCh:
280+
}
281+
282+
// Attempt to create a stream. It should fail since the previous stream is
283+
// still blocked.
284+
shortCtx, shortCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
285+
defer shortCancel()
286+
_, err := client.StreamingOutputCall(shortCtx, &testpb.StreamingOutputCallRequest{})
287+
if status.Code(err) != codes.DeadlineExceeded {
288+
t.Fatalf("Server stream creation returned error with unexpected status code: %v, want code: %v", err, codes.DeadlineExceeded)
289+
}
290+
291+
// Cancel the RPC, this should free up concurrent stream quota on the
292+
// server.
293+
streamCancel()
294+
295+
// Attempt to create another stream.
296+
stream, err := client.StreamingOutputCall(ctx, &testpb.StreamingOutputCallRequest{})
297+
if err != nil {
298+
t.Fatalf("Failed to create server streaming RPC: %v", err)
299+
}
300+
_, err = stream.Recv()
301+
if err != nil {
302+
t.Fatalf("Failed to read from the stream: %v", err)
303+
}
304+
}

0 commit comments

Comments
 (0)