diff --git a/doc/api/net.md b/doc/api/net.md index 9ee3c1497397da..bfc625893cf801 100644 --- a/doc/api/net.md +++ b/doc/api/net.md @@ -447,6 +447,35 @@ changes: Calls [`server.close()`][] and returns a promise that fulfills when the server has closed. +### `server[Symbol.asyncIterator]()` + + + +> Stability: 1 - Experimental + +* Returns: {AsyncIterator} An async iterator that yields each incoming + [`net.Socket`][]. + +Returns an async iterator over the server's incoming connections, allowing them +to be consumed with `for await...of` as an alternative to the [`'connection'`][] +event. Iteration ends when the server emits [`'close'`][], and rejects if the +server emits [`'error'`][]. + +```mjs +import { createServer } from 'node:net'; + +const server = createServer().listen(8124); +for await (const socket of server) { + socket.end('hello world!'); +} +``` + +Connections are buffered while the loop body is busy; the server does not stop +accepting them, so a consumer that is slower than the connection rate can buffer +without bound. Use [`server.maxConnections`][] to bound concurrency. + ### `server.getConnections(callback)` + +> Stability: 1 - Experimental + +The `net/promises` API provides a set of `net` functions that return `Promise` +objects rather than relying on events. The API is accessible via +`require('node:net').promises` or `require('node:net/promises')`. + +### `netPromises.connect(options)` + +### `netPromises.connect(path)` + +### `netPromises.connect(port[, host])` + + + +* `options` {Object} Accepts the same arguments as [`net.connect()`][]. May + include a `signal` {AbortSignal} that can be used to abort an in-progress + connection attempt. +* Returns: {Promise} Fulfills with a connected [`net.Socket`][]. + +A promise-based alternative to [`net.connect()`][]. The returned promise is +fulfilled with the socket once its [`'connect'`][] event fires, and is rejected +if the connection fails or the `signal` is aborted. When the promise rejects, +the underlying socket is destroyed. + +This API is named for the action it performs and awaits — connecting — to +parallel [`netPromises.listen()`][]. It is not named `createConnection()`, +because that name belongs to the socket-factory taxonomy of the callback API, +which has no counterpart here. + +```mjs +import { connect } from 'node:net/promises'; + +const socket = await connect({ port: 8124 }); +socket.write('hello world!'); +socket.end(); +``` + +### `netPromises.listen([options])` + + + +* `options` {Object} Accepts the same options as [`net.createServer()`][] and + [`server.listen()`][], plus: + * `connectionListener` {Function} Automatically set as a listener for the + [`'connection'`][] event. + * `signal` {AbortSignal} An `AbortSignal` that may be used to abort the + server. Aborting before the server is listening rejects the returned + promise with an `AbortError`; aborting at any later point closes the + server, matching the `signal` option of [`server.listen()`][]. +* Returns: {Promise} Fulfills with a listening [`net.Server`][]. + +Creates a [`net.Server`][] and begins listening. The returned promise is +fulfilled with the server once its [`'listening'`][] event fires, and is +rejected if the server fails to bind or the `signal` is aborted before it is +listening. When the promise rejects, the server is closed. + +The resolved server is async iterable, so incoming connections can be consumed +with `for await...of` (see `server[Symbol.asyncIterator]()`). + +```mjs +import { listen } from 'node:net/promises'; + +const server = await listen({ port: 8124 }); +for await (const socket of server) { + socket.end('hello world!'); +} +``` + [IPC]: #ipc-support [Identifying paths for IPC connections]: #identifying-paths-for-ipc-connections [RFC 8305]: https://www.rfc-editor.org/rfc/rfc8305.txt @@ -2114,6 +2221,7 @@ net.isIPv6('fhqwhgads'); // returns false [`net.createServer()`]: #netcreateserveroptions-connectionlistener [`net.getDefaultAutoSelectFamily()`]: #netgetdefaultautoselectfamily [`net.getDefaultAutoSelectFamilyAttemptTimeout()`]: #netgetdefaultautoselectfamilyattempttimeout +[`netPromises.listen()`]: #netpromiseslistenoptions [`new net.Socket(options)`]: #new-netsocketoptions [`readable.setEncoding()`]: stream.md#readablesetencodingencoding [`server.close()`]: #serverclosecallback diff --git a/lib/internal/net/promises.js b/lib/internal/net/promises.js new file mode 100644 index 00000000000000..d573ebdf7493a3 --- /dev/null +++ b/lib/internal/net/promises.js @@ -0,0 +1,86 @@ +'use strict'; + +const { once } = require('events'); +const { + validateAbortSignal, + validateObject, +} = require('internal/validators'); +const { AbortError } = require('internal/errors'); +const { kEmptyObject } = require('internal/util'); + +// Lazily loaded to avoid a require cycle with the `net` module, which exposes +// this namespace through its `promises` getter. +let net; +function lazyNet() { + net ??= require('net'); + return net; +} + +// Resolves with a connected `net.Socket` once the `'connect'` event fires, and +// rejects if the connection fails or the optional `signal` is aborted. +async function connect(...args) { + const lazy = lazyNet(); + const options = lazy._normalizeArgs(args)[0]; + const { signal } = options; + if (signal !== undefined) { + validateAbortSignal(signal, 'options.signal'); + if (signal.aborted) { + throw new AbortError(undefined, { cause: signal.reason }); + } + } + + // Strip the signal so the socket does not also install its own abort + // handling; rejecting and destroying below fully tears the socket down. + const socket = lazy.connect({ ...options, signal: undefined }); + + try { + await once(socket, 'connect', signal !== undefined ? { signal } : kEmptyObject); + } catch (err) { + socket.destroy(); + throw err; + } + return socket; +} + +// Creates a server and resolves with it once it is listening, rejecting if it +// fails to bind or the optional `signal` is aborted. +async function listen(options = kEmptyObject) { + validateObject(options, 'options'); + const { signal, connectionListener } = options; + if (signal !== undefined) { + validateAbortSignal(signal, 'options.signal'); + if (signal.aborted) { + throw new AbortError(undefined, { cause: signal.reason }); + } + } + + const lazy = lazyNet(); + const server = lazy.createServer(options, connectionListener); + + try { + // Pass `signal` through to listen() so net installs its own + // close-on-abort handler: the signal aborts the server for its entire + // lifetime, not just the pending listen. + server.listen(options); + await once(server, 'listening', signal !== undefined ? { signal } : kEmptyObject); + } catch (err) { + // On abort, net's signal handler already closes the server, so closing + // again would be redundant; on other failures (e.g. a bind error) there + // is no such handler, so close it here. + if (!signal?.aborted) { + server.close(); + } + throw err; + } + return server; +} + +module.exports = { + connect, + listen, + get isIP() { return lazyNet().isIP; }, + get isIPv4() { return lazyNet().isIPv4; }, + get isIPv6() { return lazyNet().isIPv6; }, + get BlockList() { return lazyNet().BlockList; }, + get SocketAddress() { return lazyNet().SocketAddress; }, +}; diff --git a/lib/net.js b/lib/net.js index ee4bc9943e4d52..cfdef137af3c42 100644 --- a/lib/net.js +++ b/lib/net.js @@ -37,6 +37,7 @@ const { ObjectSetPrototypeOf, Symbol, SymbolAsyncDispose, + SymbolAsyncIterator, SymbolDispose, } = primordials; @@ -146,6 +147,8 @@ let cluster; let dns; let BlockList; let SocketAddress; +let netPromises; +let kFirstEventParam; let autoSelectFamilyDefault = getOptionValue('--network-family-autoselection'); let autoSelectFamilyAttemptTimeoutDefault = getOptionValue('--network-family-autoselection-attempt-timeout'); @@ -2486,6 +2489,14 @@ Server.prototype[SymbolAsyncDispose] = async function() { await FunctionPrototypeCall(promisify(this.close), this); }; +Server.prototype[SymbolAsyncIterator] = function() { + kFirstEventParam ??= require('internal/events/symbols').kFirstEventParam; + return EventEmitter.on(this, 'connection', { + close: ['close'], + [kFirstEventParam]: true, + }); +}; + Server.prototype._emitCloseIfDrained = function() { debug('SERVER _emitCloseIfDrained'); @@ -2576,6 +2587,10 @@ module.exports = { connect, createConnection: connect, createServer, + get promises() { + netPromises ??= require('internal/net/promises'); + return netPromises; + }, isIP: isIP, isIPv4: isIPv4, isIPv6: isIPv6, diff --git a/lib/net/promises.js b/lib/net/promises.js new file mode 100644 index 00000000000000..b87773474a08a3 --- /dev/null +++ b/lib/net/promises.js @@ -0,0 +1,3 @@ +'use strict'; + +module.exports = require('internal/net/promises'); diff --git a/test/parallel/test-net-promises-connect.js b/test/parallel/test-net-promises-connect.js new file mode 100644 index 00000000000000..89556690195557 --- /dev/null +++ b/test/parallel/test-net-promises-connect.js @@ -0,0 +1,62 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const net = require('net'); +const { once } = require('events'); +const { connect } = require('net/promises'); + +(async () => { + // Resolves with a connected socket and round-trips data. + { + const server = net.createServer((socket) => { + socket.end('hello'); + }).listen(0); + await once(server, 'listening'); + const socket = await connect({ port: server.address().port }); + assert.strictEqual(socket.connecting, false); + const chunks = []; + for await (const chunk of socket) { + chunks.push(chunk); + } + assert.strictEqual(Buffer.concat(chunks).toString(), 'hello'); + server.close(); + } + + // net.promises is the same object as require('net/promises'). + assert.strictEqual(net.promises, require('net/promises')); + + // Rejects when the connection is refused. + { + const server = net.createServer().listen(0); + await once(server, 'listening'); + const { port } = server.address(); + server.close(); + await once(server, 'close'); + await assert.rejects(connect({ port }), { code: 'ECONNREFUSED' }); + } + + // A pre-aborted signal rejects with an AbortError. + { + await assert.rejects( + connect({ port: 0, signal: AbortSignal.abort() }), + { name: 'AbortError' }); + } + + // Aborting while connecting rejects with an AbortError. + { + const server = net.createServer().listen(0); + await once(server, 'listening'); + const controller = new AbortController(); + const promise = connect({ port: server.address().port, signal: controller.signal }); + controller.abort(); + await assert.rejects(promise, { name: 'AbortError' }); + server.close(); + } + + // An invalid signal throws. + { + await assert.rejects( + connect({ port: 0, signal: 'INVALID_SIGNAL' }), + { code: 'ERR_INVALID_ARG_TYPE' }); + } +})().then(common.mustCall()); diff --git a/test/parallel/test-net-promises-listen.js b/test/parallel/test-net-promises-listen.js new file mode 100644 index 00000000000000..7e2d616ef02e5d --- /dev/null +++ b/test/parallel/test-net-promises-listen.js @@ -0,0 +1,71 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const net = require('net'); +const { once } = require('events'); +const { listen } = require('net/promises'); + +(async () => { + // Resolves with a listening server. + { + const server = await listen({ port: 0 }); + assert.strictEqual(server.listening, true); + assert.strictEqual(typeof server.address().port, 'number'); + server.close(); + } + + // The signal aborts the server for its entire lifetime, not just the + // pending listen: aborting after it is listening closes the server. + { + const controller = new AbortController(); + const server = await listen({ port: 0, signal: controller.signal }); + assert.strictEqual(server.listening, true); + const closed = once(server, 'close'); + controller.abort(); + await closed; + assert.strictEqual(server.listening, false); + } + + // The connectionListener option receives incoming connections. + { + const server = await listen({ + port: 0, + connectionListener: common.mustCall((socket) => { + socket.end(); + server.close(); + }), + }); + const client = net.connect(server.address().port); + client.resume(); + } + + // A pre-aborted signal rejects with an AbortError. + { + await assert.rejects( + listen({ port: 0, signal: AbortSignal.abort() }), + { name: 'AbortError' }); + } + + // Aborting while binding rejects with an AbortError and closes the server. + { + const controller = new AbortController(); + const promise = listen({ port: 0, signal: controller.signal }); + controller.abort(); + await assert.rejects(promise, { name: 'AbortError' }); + } + + // An invalid signal throws. + { + await assert.rejects( + listen({ port: 0, signal: 'INVALID_SIGNAL' }), + { code: 'ERR_INVALID_ARG_TYPE' }); + } + + // Rejects when the address is already in use. + { + const first = await listen({ port: 0 }); + const { port } = first.address(); + await assert.rejects(listen({ port }), { code: 'EADDRINUSE' }); + first.close(); + } +})().then(common.mustCall()); diff --git a/test/parallel/test-net-server-async-iterator.js b/test/parallel/test-net-server-async-iterator.js new file mode 100644 index 00000000000000..d55428dad68b24 --- /dev/null +++ b/test/parallel/test-net-server-async-iterator.js @@ -0,0 +1,50 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const net = require('net'); +const { once } = require('events'); + +(async () => { + // `for await...of` yields each incoming connection as a net.Socket. + { + const server = net.createServer().listen(0); + await once(server, 'listening'); + const { port } = server.address(); + + const total = 3; + const clients = []; + for (let i = 0; i < total; i++) { + clients.push(net.connect(port)); + } + + let count = 0; + for await (const socket of server) { + assert.ok(socket instanceof net.Socket); + socket.end(); + if (++count === total) break; + } + assert.strictEqual(count, total); + + // Breaking out of the loop stops iterating but does not close the server. + assert.strictEqual(server.listening, true); + + server.close(); + for (const client of clients) { + client.destroy(); + } + } + + // Iteration ends cleanly when the server closes. + { + const server = net.createServer().listen(0); + await once(server, 'listening'); + process.nextTick(() => server.close()); + + let count = 0; + for await (const socket of server) { + count++; + socket.end(); + } + assert.strictEqual(count, 0); + } +})().then(common.mustCall()); diff --git a/test/parallel/test-repl-tab-complete-import.js b/test/parallel/test-repl-tab-complete-import.js index ed8a6c2de5efc5..7a29861ed0baa6 100644 --- a/test/parallel/test-repl-tab-complete-import.js +++ b/test/parallel/test-repl-tab-complete-import.js @@ -52,11 +52,12 @@ replServer.complete("import\t( 'n", common.mustSucceed((data) => { assert.notStrictEqual(lastIndex, -1); }); assert.strictEqual(completions[lastIndex + 1], ''); - // There is only one Node.js module that starts with n: + // The Node.js modules that start with n: assert.strictEqual(completions[lastIndex + 2], 'net'); - assert.strictEqual(completions[lastIndex + 3], ''); + assert.strictEqual(completions[lastIndex + 3], 'net/promises'); + assert.strictEqual(completions[lastIndex + 4], ''); // It's possible to pick up non-core modules too - for (const completion of completions.slice(lastIndex + 4)) { + for (const completion of completions.slice(lastIndex + 5)) { assert.match(completion, /^n/); } })); diff --git a/test/parallel/test-repl-tab-complete-require.js b/test/parallel/test-repl-tab-complete-require.js index 47c03d8d5990a6..f7c3f5c1960ff0 100644 --- a/test/parallel/test-repl-tab-complete-require.js +++ b/test/parallel/test-repl-tab-complete-require.js @@ -67,11 +67,12 @@ const { startNewREPLServer } = require('../common/repl'); assert.notStrictEqual(lastIndex, -1); } assert.strictEqual(data[0][lastIndex + 1], ''); - // There is only one Node.js module that starts with n: + // The Node.js modules that start with n: assert.strictEqual(data[0][lastIndex + 2], 'net'); - assert.strictEqual(data[0][lastIndex + 3], ''); + assert.strictEqual(data[0][lastIndex + 3], 'net/promises'); + assert.strictEqual(data[0][lastIndex + 4], ''); // It's possible to pick up non-core modules too - for (const completion of data[0].slice(lastIndex + 4)) { + for (const completion of data[0].slice(lastIndex + 5)) { assert.match(completion, /^n/); } })