Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/quic/streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,8 @@ void Stream::EndWriting() {
void Stream::EntryRead(size_t amount) {
// Called when the JS consumer reads data from the inbound DataQueue.
// Extend the flow control window so the sender can transmit more.
if (session().is_destroyed()) return;
Session::SendPendingDataScope send_scope(&session());
session().ExtendStreamOffset(id(), amount);
session().ExtendOffset(amount);
}
Expand Down
62 changes: 62 additions & 0 deletions test/parallel/test-quic-stream-read-after-blocked.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Flags: --experimental-quic --experimental-stream-iter --no-warnings

// Test: a sender blocked on flow control is resumed once the consumer
// starts reading. This confirms that we do flush MAX_STREAM_DATA
// frames as expected when the consumer reads.

import { hasQuic, skip, mustCall, mustCallAtLeast } from '../common/index.mjs';
import { setTimeout as delay } from 'node:timers/promises';
import assert from 'node:assert';

const { strictEqual, deepStrictEqual } = assert;

if (!hasQuic) {
skip('QUIC is not enabled');
}

const { listen, connect } = await import('../common/quic.mjs');
const { bytes } = await import('stream/iter');

// Larger than the default 256 KB stream flow-control window:
const size = 1024 * 1024;
const expected = new Uint8Array(size);
for (let i = 0; i < size; i++) expected[i] = i & 0xff;

const senderBlocked = Promise.withResolvers();
const done = Promise.withResolvers();

const serverEndpoint = await listen(mustCall((serverSession) => {
serverSession.onstream = mustCall(async (stream) => {
stream.onblocked = mustCallAtLeast(() => senderBlocked.resolve(stream), 1);
stream.setBody(expected);
await stream.closed;
});
}));

const clientSession = await connect(serverEndpoint.address);
await clientSession.opened;

// Write a byte to open the stream:
const stream = await clientSession.createBidirectionalStream();
await stream.writer.write(new Uint8Array([1]));

// Wait until the sender has filled the window and blocked.
const serverStream = await senderBlocked.promise;

// Poll until everything has been acked, so the stream is fully idle:
while (serverStream.stats.maxOffsetAcknowledged !== serverStream.stats.bytesSent) {
await delay(1);
}

// Try to read:
const received = await bytes(stream);
strictEqual(received.byteLength, expected.byteLength);
deepStrictEqual(received, expected);

stream.writer.endSync();
await stream.closed;
clientSession.close();
done.resolve();

await Promise.all([done.promise, clientSession.closed]);
await serverEndpoint.close();
Loading