diff --git a/lib/DBSQLOperation.ts b/lib/DBSQLOperation.ts
index 1ec1922c..3a8d40dd 100644
--- a/lib/DBSQLOperation.ts
+++ b/lib/DBSQLOperation.ts
@@ -65,6 +65,20 @@ export default class DBSQLOperation implements IOperation {
 
   private sessionId?: string;
 
+  // Serialises all result consumption on THIS operation. `fetchAll` holds it
+  // across its entire drain loop; `fetchChunk` / `hasMoreRows` hold it per
+  // call. The result pipeline behind every backend is a single stateful cursor
+  // (the kernel path in particular threads shared, non-atomic prefetch state
+  // through `KernelResultsProvider` → `ArrowResultConverter` → `ResultSlicer`),
+  // so concurrent consumers on one operation must be serialised or they corrupt
+  // that cursor and silently drop rows. Holding the lock across the WHOLE
+  // `fetchAll` drain (not per chunk) is what makes two concurrent `fetchAll()`
+  // calls behave like the Thrift backend: the first drains the complete result
+  // set, the second observes an exhausted cursor and returns `[]` — rather than
+  // splitting the rows between them. Uncontended on the normal single-consumer
+  // path (the chain is an already-resolved promise).
+  private fetchChain: Promise<void> = Promise.resolve();
+
   constructor(options: DBSQLOperationConstructorOptions) {
     this.context = options.context;
     this.backend = options.backend;
@@ -115,21 +129,30 @@ export default class DBSQLOperation implements IOperation {
    * const result = await queryOperation.fetchAll();
    */
   public async fetchAll(options?: FetchOptions): Promise<Array<object>> {
-    const data: Array<Array<object>> = [];
-
-    const fetchChunkOptions = {
-      ...options,
-      disableBuffering: true,
-    };
-
-    do {
-      // eslint-disable-next-line no-await-in-loop
-      const chunk = await this.fetchChunk(fetchChunkOptions);
-      data.push(chunk);
-    } while (await this.hasMoreRows()); // eslint-disable-line no-await-in-loop
-    this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.id}`);
-
-    return data.flat();
+    // Hold the fetch lock across the ENTIRE drain (see `fetchChain`): a
+    // concurrent fetchAll()/fetchChunk() on the same operation queues behind
+    // this loop instead of interleaving with it. The loop calls the
+    // *Internal (non-locking) primitives to avoid self-deadlock on the lock we
+    // already hold. Error telemetry wraps the whole drain.
+    return this.runFetchExclusive(() =>
+      this.withErrorTelemetry(async () => {
+        const data: Array<Array<object>> = [];
+
+        const fetchChunkOptions = {
+          ...options,
+          disableBuffering: true,
+        };
+
+        do {
+          // eslint-disable-next-line no-await-in-loop
+          const chunk = await this.fetchChunkInternal(fetchChunkOptions);
+          data.push(chunk);
+        } while (await this.hasMoreRowsInternal()); // eslint-disable-line no-await-in-loop
+        this.context.getLogger().log(LogLevel.debug, `Fetched all data from operation with id: ${this.id}`);
+
+        return data.flat();
+      }),
+    );
   }
 
   /**
@@ -142,7 +165,7 @@ export default class DBSQLOperation implements IOperation {
    * const result = await queryOperation.fetchChunk({maxRows: 1000});
    */
   public async fetchChunk(options?: FetchOptions): Promise<Array<object>> {
-    return this.withErrorTelemetry(() => this.fetchChunkInternal(options));
+    return this.runFetchExclusive(() => this.withErrorTelemetry(() => this.fetchChunkInternal(options)));
   }
 
   private async fetchChunkInternal(options?: FetchOptions): Promise<Array<object>> {
@@ -241,21 +264,26 @@ export default class DBSQLOperation implements IOperation {
   }
 
   public async hasMoreRows(): Promise<boolean> {
-    return this.withErrorTelemetry(async () => {
-      // If operation is closed or cancelled - we should not try to get data from it
-      if (this.closed || this.cancelled) {
-        return false;
-      }
+    return this.runFetchExclusive(() => this.withErrorTelemetry(() => this.hasMoreRowsInternal()));
+  }
 
-      // Wait for operation to finish before checking for more rows
-      // This ensures metadata can be fetched successfully
-      if (this.backend.hasResultSet()) {
-        await this.waitUntilReadyThroughBackend();
-      }
+  // Non-locking body of `hasMoreRows`. Called directly by `fetchAll`'s drain
+  // loop (which already holds `fetchChain`) and by the public `hasMoreRows`
+  // wrapper (which acquires it). Must never acquire the lock itself.
+  private async hasMoreRowsInternal(): Promise<boolean> {
+    // If operation is closed or cancelled - we should not try to get data from it
+    if (this.closed || this.cancelled) {
+      return false;
+    }
 
-      // If we fetched all the data from server - check if there's anything buffered in result handler
-      return this.backend.hasMore();
-    });
+    // Wait for operation to finish before checking for more rows
+    // This ensures metadata can be fetched successfully
+    if (this.backend.hasResultSet()) {
+      await this.waitUntilReadyThroughBackend();
+    }
+
+    // If we fetched all the data from server - check if there's anything buffered in result handler
+    return this.backend.hasMore();
   }
 
   public async getSchema(options?: GetSchemaOptions): Promise<TTableSchema | null> {
@@ -338,6 +366,21 @@ export default class DBSQLOperation implements IOperation {
     }
   }
 
+  // Run `fn` with exclusive access to this operation's result cursor by
+  // chaining it onto `fetchChain`. The next caller waits for this one to settle
+  // (success OR failure) before starting, so the single stateful fetch pipeline
+  // is only ever driven by one in-flight consumer. A rejection is delivered to
+  // THIS caller but not propagated to the next waiter (the chain swallows it),
+  // so one failed fetch never poisons subsequent fetches.
+  private runFetchExclusive<T>(fn: () => Promise<T>): Promise<T> {
+    const run = this.fetchChain.then(fn, fn);
+    this.fetchChain = run.then(
+      () => undefined,
+      () => undefined,
+    );
+    return run;
+  }
+
   private async failIfClosed(): Promise<void> {
     if (this.closed) {
       throw new OperationStateError(OperationStateErrorCode.Closed);
diff --git a/tests/unit/DBSQLOperation.test.ts b/tests/unit/DBSQLOperation.test.ts
index f9195105..1fbde86a 100644
--- a/tests/unit/DBSQLOperation.test.ts
+++ b/tests/unit/DBSQLOperation.test.ts
@@ -1008,20 +1008,26 @@ describe('DBSQLOperation', () => {
       const originalData = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
       const tempData = [...originalData];
 
-      const fetchChunkStub = sinon.stub(operation, 'fetchChunk').callsFake(async (): Promise<Array<any>> => {
-        return tempData.splice(0, 3);
-      });
-      const hasMoreRowsStub = sinon.stub(operation, 'hasMoreRows').callsFake(async () => {
+      // Warning: this check is implementation-specific.
+      // `fetchAll` holds the per-operation fetch lock across the entire drain, so
+      // it calls the *non-locking* internal primitives (`fetchChunkInternal` /
+      // `hasMoreRowsInternal`) rather than the public `fetchChunk` / `hasMoreRows`
+      // (which re-acquire the same lock and would self-deadlock). Stub the
+      // internals that the drain loop actually invokes.
+      const fetchChunkStub = sinon
+        .stub(operation as any, 'fetchChunkInternal')
+        .callsFake(async (): Promise<Array<any>> => {
+          return tempData.splice(0, 3);
+        });
+      const hasMoreRowsStub = sinon.stub(operation as any, 'hasMoreRowsInternal').callsFake(async () => {
         return tempData.length > 0;
       });
 
       const fetchedData = await operation.fetchAll();
 
-      // Warning: this check is implementation-specific
-      // `fetchAll` should wait for operation to complete. In current implementation
-      // it does so by calling `fetchChunk` at least once, which internally does
-      // all the job. But since here we stub `fetchChunk` it won't really wait,
-      // therefore here we ensure it was called at least once
+      // `fetchAll` should wait for the operation to complete; in the current
+      // implementation it does so by draining via `fetchChunkInternal` at least
+      // once, which internally does all the work.
       expect(fetchChunkStub.callCount).to.be.gte(1);
 
       expect(fetchChunkStub.called).to.be.true;