diff --git a/src/quic/streams.cc b/src/quic/streams.cc index 7186aed89a78e9..7ed5212e863d56 100644 --- a/src/quic/streams.cc +++ b/src/quic/streams.cc @@ -1512,6 +1512,8 @@ void Stream::EndWriting() { void Stream::EntryRead(size_t amount) { // Called when the JS consumer reads data from the inbound DataQueue. // Extend the flow control window so the sender can transmit more. + if (session().is_destroyed()) return; + Session::SendPendingDataScope send_scope(&session()); session().ExtendStreamOffset(id(), amount); session().ExtendOffset(amount); } diff --git a/test/parallel/test-quic-stream-read-after-blocked.mjs b/test/parallel/test-quic-stream-read-after-blocked.mjs new file mode 100644 index 00000000000000..58551abe07212c --- /dev/null +++ b/test/parallel/test-quic-stream-read-after-blocked.mjs @@ -0,0 +1,62 @@ +// Flags: --experimental-quic --experimental-stream-iter --no-warnings + +// Test: a sender blocked on flow control is resumed once the consumer +// starts reading. This confirms that we do flush MAX_STREAM_DATA +// frames as expected when the consumer reads. + +import { hasQuic, skip, mustCall, mustCallAtLeast } from '../common/index.mjs'; +import { setTimeout as delay } from 'node:timers/promises'; +import assert from 'node:assert'; + +const { strictEqual, deepStrictEqual } = assert; + +if (!hasQuic) { + skip('QUIC is not enabled'); +} + +const { listen, connect } = await import('../common/quic.mjs'); +const { bytes } = await import('stream/iter'); + +// Larger than the default 256 KB stream flow-control window: +const size = 1024 * 1024; +const expected = new Uint8Array(size); +for (let i = 0; i < size; i++) expected[i] = i & 0xff; + +const senderBlocked = Promise.withResolvers(); +const done = Promise.withResolvers(); + +const serverEndpoint = await listen(mustCall((serverSession) => { + serverSession.onstream = mustCall(async (stream) => { + stream.onblocked = mustCallAtLeast(() => senderBlocked.resolve(stream), 1); + stream.setBody(expected); + await stream.closed; + }); +})); + +const clientSession = await connect(serverEndpoint.address); +await clientSession.opened; + +// Write a byte to open the stream: +const stream = await clientSession.createBidirectionalStream(); +await stream.writer.write(new Uint8Array([1])); + +// Wait until the sender has filled the window and blocked. +const serverStream = await senderBlocked.promise; + +// Poll until everything has been acked, so the stream is fully idle: +while (serverStream.stats.maxOffsetAcknowledged !== serverStream.stats.bytesSent) { + await delay(1); +} + +// Try to read: +const received = await bytes(stream); +strictEqual(received.byteLength, expected.byteLength); +deepStrictEqual(received, expected); + +stream.writer.endSync(); +await stream.closed; +clientSession.close(); +done.resolve(); + +await Promise.all([done.promise, clientSession.closed]); +await serverEndpoint.close();