diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b65f9c55c..664b43dca 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -72,6 +72,7 @@ jobs: - { name: dry-run, script: integration/dry_run/run.sh } - { name: copy-data, script: integration/copy_data/run.sh } - { name: load-balancer, script: integration/load_balancer/run.sh } + - { name: prepared-statements=full, script: integration/prepared_statements_full/run.sh } # plugins/run.sh builds 4 plugin crates with cargo, so it needs # the workspace target/ cache; the other entries just run the # cached pgdog binary. diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md new file mode 100644 index 000000000..077f98ca6 --- /dev/null +++ b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -0,0 +1,300 @@ +# ProtocolOutOfSync — Known Root Causes + +`Error::ProtocolOutOfSync` fires when `ProtocolState`'s expected-response queue diverges from what +the backend actually sends. The catch site is `server.rs:394`; the connection is permanently marked +`State::Error` and discarded from the pool. + +**Queue mechanics** (`pgdog/src/backend/protocol/state.rs`). `handle()` pushes one `ExecutionItem` +per anticipated response before forwarding any client message. As server bytes arrive, `forward()` +calls `action(code)` which pops the queue front and checks the match. Two conditions raise +`ProtocolOutOfSync`: + +- **Empty queue** (`state.rs:168`) — a tracked message arrives but nothing was expected. +- **Ignore mismatch** (`state.rs:188–191`) — queue front is an `Ignore` slot but the server sent a + different code. + +--- + +## ✅ Issue 0 — `extended` flag never resets; `SET`-batch error poisons connection + +**Severity:** High — triggered by normal operation when `query_parser = "on"` and any parameterised +query has run on the connection. 
+ +**Commit:** `54c9d3a942bdac55873f17d2af74ef1dfedb3f4a` + +**Location:** `pgdog/src/backend/protocol/state.rs`, `action()`, Error and ReadyForQuery arms. + +### Error + +When `query_parser = "on"`, pgdog caches every `SET` issued by a client so it can replay them on +whichever backend connection services the next request. The replay calls `execute_batch`, which +queues one `Code(RFQ)` per command — two cached `SET`s produce `[Code(RFQ), Code(RFQ)]`. + +Two independent defects caused `ProtocolOutOfSync` when a cached `SET` produced a server Error: + +**Defect 1 — `extended` one-way latch.** `ProtocolState.extended` was set the moment the first +`Parse`/`Bind` message passed through the connection, and never cleared. The Error arm +unconditionally set `out_of_sync = true` whenever `extended == true`. This permanently poisoned any +connection that had ever run an extended-protocol query — even for entirely unrelated subsequent +errors, such as a failed `SET` replay that has nothing to do with the original parameterised query. + +**Defect 2 — destructive queue clear on batch error.** The Error arm used `pop_back + clear + +push_back(RFQ)`, designed for a single-command queue. With two `Code(RFQ)` slots that sequence +collapses both into one: the first `ReadyForQuery` from the backend consumed the only remaining RFQ +slot; the second `SET`'s `CommandComplete` arrived against an empty queue → `ProtocolOutOfSync`. +The client never saw the actual SET error. + +Both defects are independent. Defect 2 fires on any two-command batch regardless of `extended`; +Defect 1 additionally poisons the connection via `out_of_sync = true`. + +All four conditions must hold simultaneously for the bug to surface: + +1. `query_parser = "on"` — pgdog tracks and replays `SET` commands via `execute_batch`. +2. 
Two or more `SET` commands in the cached session state, at least one with an invalid value — + produces two `Code(RFQ)` slots and causes PostgreSQL to return an Error on the first replayed `SET`. +3. A prior extended-protocol query on the same connection — sets `extended = true`, arming the + `out_of_sync = true` branch on any subsequent error (Defect 1). +4. Any subsequent query after the failed `execute_batch` — hits a connection whose `out_of_sync` is + `true`, causing `PG::ConnectionBad` at the client. + +### Fix + +Two changes in `state.rs` (commit `54c9d3a9`): + +**Defect 2:** A `!self.extended` fast-path in the Error arm returns `Action::Forward` immediately +when the queue front is `Code(RFQ)`, leaving all remaining slots untouched. For the extended path, +a `rfq_pos` scan drains only items before the first `Code(RFQ)` boundary — never consuming slots +belonging to subsequent queued commands. + +**Defect 1:** After every `ReadyForQuery`, `extended` is recomputed from the remaining queue items +(`self.queue.iter().any(ExecutionItem::extended)`). When the pipeline drains, `extended` resets to +`false`, breaking the one-way latch. The `ExecutionItem::extended()` helper in `state.rs` enables +this scan. + +### Tests + +Covered by a state-machine unit test (`test_pipelined_simple_query_error_keeps_next_query_response` +in `state.rs`) and two server integration tests in `server.rs` that require a live PostgreSQL +connection. An end-to-end reproduction lives in `integration/ruby/pg_spec.rb`. + +--- + +## ✅ Issue 1 — Failed `PREPARE` orphans the EXECUTE ReadyForQuery + +**Severity:** High — triggered by normal server behaviour; no client misbehaviour required. + +**Location:** `pgdog/src/backend/prepared_statements.rs`, `ProtocolMessage::Prepare` arm; +`pgdog/src/backend/protocol/state.rs`, Error handler. 
+ +### Error + +When pgdog rewrites a simple-query `EXECUTE stmt(args)` it injects a synthetic `PREPARE` ahead of +it, producing two messages that are enqueued independently. After both `handle()` calls the queue +is: + +``` +[Ignore(ExecutionCompleted), Ignore(ReadyForQuery), Code(ReadyForQuery)] + ↑────────── handle(Prepare) ──────────↑ ↑── handle(Query) ──↑ +``` + +If the injected `PREPARE` fails on the server: + +| Step | Server sends | Error handler action | Queue after | +|---|---|---|---| +| 1 | `Error` for PREPARE | simple-query path: drain `Ignore(C)` and `Ignore(Z)`; stop at `Code(RFQ)` | `[Code(RFQ)]` | +| 2 | `ReadyForQuery` for PREPARE | `pop_front` → `Code(RFQ)` consumed | **empty** | +| 3 | `Error` for EXECUTE (statement absent) | simple-query path: queue empty; loop is a no-op | **empty** | +| 4 | `ReadyForQuery` for EXECUTE | `pop_front` on empty → **ProtocolOutOfSync** | — | + +The simple-query Error handler pops items from the front of the queue until it reaches a +`Code(ReadyForQuery)`, which it treats as the boundary of the current logical request. When the +injected PREPARE fails, that drain consumes both `Ignore` slots and leaves `Code(ReadyForQuery)` as +the sole item. The subsequent `Z` for the PREPARE then consumes *that* slot, emptying the queue +entirely. The EXECUTE sub-request's own `Error` arrives against an empty queue — the loop is a +no-op — and its `ReadyForQuery` fires `ProtocolOutOfSync`. + +Under high concurrency this becomes near-deterministic: the pool fast-path (`Guard::drop` → +`checkin` → `put`) hands a connection directly to a waiting client with no healthcheck and no +opportunity to drain the kernel socket buffer. The next query on that client consumes the stale +EXECUTE `Error + ReadyForQuery`, producing `ProtocolOutOfSync`. + +### Fix + +Error handler in `state.rs`, `ExecutionCode::Error` arm (see inline comments for full detail). 
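The drain-and-rebuild step can be sketched with a toy model. The `Item` variants and the `on_error` helper below are illustrative assumptions, not pgdog's actual types; the real logic lives in the `ExecutionCode::Error` arm in `state.rs`.

```rust
use std::collections::VecDeque;

// Illustrative stand-ins for pgdog's ExecutionItem slots (not the real API).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Item {
    CodeRfq,    // Code(ReadyForQuery): the client's own RFQ boundary
    IgnoreRfq,  // Ignore(ReadyForQuery): an injected sub-request boundary
    IgnoreDone, // Ignore(ExecutionCompleted)
    IgnoreErr,  // Ignore(Error)
}

// On a server Error: drain everything ahead of the client's RFQ boundary,
// then re-seed one [IgnoreRfq, IgnoreErr] pair per injected sub-request
// boundary that was drained, so each pending backend response still has
// a matching slot to consume.
fn on_error(queue: &mut VecDeque<Item>) {
    let rfq_pos = match queue.iter().position(|i| *i == Item::CodeRfq) {
        Some(pos) => pos,
        None => {
            // No client boundary left: nothing beyond this group to preserve.
            queue.clear();
            return;
        }
    };
    let drained: Vec<Item> = queue.drain(..rfq_pos).collect();
    let pairs = drained.iter().filter(|i| **i == Item::IgnoreRfq).count();
    for _ in 0..pairs {
        queue.push_front(Item::IgnoreErr);
        queue.push_front(Item::IgnoreRfq);
    }
}

fn main() {
    // Queue state from the table above, just before the PREPARE error fires.
    let mut queue: VecDeque<Item> =
        [Item::IgnoreDone, Item::IgnoreRfq, Item::CodeRfq].into();
    on_error(&mut queue);
    // One injected boundary was drained, so one pair is re-seeded: the
    // PREPARE's Z, the EXECUTE's Error, and the EXECUTE's Z all find a slot.
    assert_eq!(
        queue,
        VecDeque::from([Item::IgnoreRfq, Item::IgnoreErr, Item::CodeRfq])
    );
}
```

Run against the failing-PREPARE queue from the table above, the rebuilt queue is `[Ignore(RFQ), Ignore(Error), Code(RFQ)]`, which is exactly the response skeleton the backend will still emit.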
+ +On error, find the first `Code(ReadyForQuery)` in the queue (the client's RFQ boundary), drain +everything before it, count the `Ignore(RFQ)` slots in the drained portion, and prepend one +`[Ignore(RFQ), Ignore(Error)]` pair per slot. This reconstructs the correct response skeleton for +every injected sub-request that was ahead of the failing one, so each pending `ReadyForQuery` from +the backend still has a matching slot to consume. + +A separate fast-path at the top of the arm handles the case where the queue front is already +`Ignore(Error)` — subsequent errors from the same injected sub-request — by popping and returning +`Action::Ignore` directly, rather than running the full drain logic again. + +### Tests + +Two integration tests in `integration/prepared_statements_full/protocol_out_of_sync_spec.rb` cover +the session-pooled and transaction-pooled cases. A unit test (`test_injected_prepare_error_full_lifecycle` +in `state.rs`) guards the state-machine invariant directly. + +--- + +## ✅ Issue 2 — Double `action()` call in `forward()` for server CopyDone + +**Severity:** Medium — requires the client to omit a trailing `Sync`. + +**Location:** `pgdog/src/backend/prepared_statements.rs`, `forward()`. + +### Error + +`forward()` called `state.action(code)` unconditionally near the top of the function to advance the +state machine, then called it a second time inside the `'c'` (CopyDone) match arm. For the common +case this was harmless: a `Code(ReadyForQuery)` is always in the queue, and `action('Z')` pushes +that slot back rather than consuming it, making the double call idempotent. + +The unsafe path triggers when a client sends `Parse + Bind + Execute + Flush` without a trailing +`Sync`. In that case `handle()` builds `[Code(ParseComplete), Code(BindComplete), +Code(ExecutionCompleted)]` with no `Code(ReadyForQuery)` backstop. 
When the server responds with +CopyDone: + +``` +First action('c'): pops Code(ExecutionCompleted) — consumed +Second action('c'): empty queue → ProtocolOutOfSync +``` + +No libpq-based driver ever sends `Execute` without `Sync`, so this path was not reachable through +normal client behaviour. + +### Fix + +Removed the redundant `self.state.action(code)?` from the `'c'` arm in `forward()`. The call at +the top of the function already advances the state machine for CopyDone; the arm body is now empty. + +### Tests + +Two state-machine unit tests in `state.rs` pin the safe (RFQ backstop present) and unsafe (no +backstop) paths. Two server-level tests in `server.rs` cover the `Parse + Bind + Execute + Flush` +sequence with and without a trailing `Sync`. + +--- + +## ✅ Issue 4 — `extended` flag set at enqueue time, not at processing time + +**Severity:** Low in production — not triggerable through current code paths. Dangerous if triggered: +a write query executed on the backend may never be confirmed to the client. + +**Location:** `pgdog/src/backend/protocol/state.rs`, `add()` / `add_ignore()` and the +`ReadyForQuery` recalculation in `action()`. + +### Error + +Two related bugs in how `extended` was maintained: + +**Bug A — `add()` sets the flag too early.** `add()` and `add_ignore()` updated `extended` the +moment any `ParseComplete` or `BindComplete` was enqueued: + +```rust +self.extended = self.extended || code.extended(); +``` + +If a simple query occupied the head of the queue and an extended query was enqueued behind it, +`extended` flipped to `true` before any message had been processed. When the simple query at the +head then errored, the Error handler saw `extended == true` and took the extended path — setting +`out_of_sync = true` and clearing the queue — destroying the extended query's pending entries even +though those entries had not yet been touched. 
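A toy reproduction of the premature latch; `ProtoState` and `ExecCode` are illustrative stand-ins for pgdog's real types, kept only detailed enough to show the enqueue-time flag update:

```rust
use std::collections::VecDeque;

// Minimal model of the expected-response queue (names are assumptions).
#[derive(Debug, Clone, Copy, PartialEq)]
enum ExecCode {
    CommandComplete,
    ReadyForQuery,
    ParseComplete,
    BindComplete,
}

impl ExecCode {
    fn extended(self) -> bool {
        matches!(self, ExecCode::ParseComplete | ExecCode::BindComplete)
    }
}

#[derive(Default)]
struct ProtoState {
    queue: VecDeque<ExecCode>,
    extended: bool,
}

impl ProtoState {
    // Bug A: the flag is updated at enqueue time, before any message
    // belonging to the new sub-request has been processed.
    fn add(&mut self, code: ExecCode) {
        self.extended = self.extended || code.extended();
        self.queue.push_back(code);
    }
}

fn main() {
    let mut state = ProtoState::default();

    // A simple query occupies the head of the queue...
    state.add(ExecCode::CommandComplete);
    state.add(ExecCode::ReadyForQuery);
    assert!(!state.extended);

    // ...then an extended query is enqueued behind it. The flag flips
    // immediately, so an error on the *simple* query still at the head
    // would now incorrectly take the extended path.
    state.add(ExecCode::ParseComplete);
    state.add(ExecCode::BindComplete);
    assert!(state.extended);
}
```

The flag describes the queue *tail* while the error handler uses it to interpret the queue *head* — the mismatch Bug A exploits.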
+ +**Bug B — `ReadyForQuery` recalculation scans the entire remaining queue.** After consuming a +`ReadyForQuery`, the flag was recomputed as: + +```rust +self.extended = self.queue.iter().any(ExecutionItem::extended); +``` + +This scanned all remaining entries, not just those belonging to the next sub-request. If two simple +queries preceded an extended query, after the first simple query's `ReadyForQuery` was consumed the +scan found the extended items two hops away and set `extended = true`. The second simple query's +error then incorrectly took the extended path. + +Neither bug surfaces in production because `ClientRequest::spliced()` never places a simple query +and an extended query in the same `ProtocolState` queue — each sub-request gets its own fresh state. +However, if a write query were somehow affected, the backend would execute the statement while pgdog +hits `ProtocolOutOfSync` and discards the connection, leaving the client with no confirmation — +silent data inconsistency. + +### Fix + +The `extended` field was removed entirely from `ProtocolState`. Both bugs stemmed from maintaining +it as a cached flag that drifted out of phase with the queue head. Removing it eliminates both +defects at the root. + +`out_of_sync` is now set unconditionally for all errors (simple or extended) and cleared +unconditionally on every `ReadyForQuery`. The distinction is irrelevant: after any error the +connection must wait for the peer's `ReadyForQuery` before being reused, and that `ReadyForQuery` +always arrives as the next terminal message in the same sub-request: + +``` +Error → out_of_sync = true +ReadyForQuery → out_of_sync = false (connection clean; entries beyond this boundary intact) +``` + +### Tests + +Three unit tests in `state.rs` cover Bug A, Bug B, and the combined end-to-end case at the `server.rs` +level. 
The integration test `extended query succeeds after preceding simple query error` in +`integration/ruby/protocol_out_of_sync_spec.rb` serves as a regression guard via the normal pool path. + +--- + +## ✅ Issue 5 — Error in first pipelined request clears subsequent requests' queue entries + +**Severity:** Low — not reproducible through production code paths; included as a guard against +future refactoring. + +**Location:** `pgdog/src/backend/protocol/state.rs`, Error handler (`action()`, extended branch). + +### Error + +When a client pipelines multiple extended-query sequences — each terminated by its own `Sync` — and +all of them share one `ProtocolState` queue, an error in the first sequence causes `queue.clear()` +on the entire queue. This destroys the pending entries for all subsequent sequences, causing +`ProtocolOutOfSync` when their backend responses arrive. + +Suppose three sequences share one queue, abbreviated as entries `[1,2,C,Z]` per sequence (Parse, +Bind, CommandComplete, ReadyForQuery): + +``` +[1,2,C,Z, 1,2,C,Z, 1,2,C,Z] + ^─seq1─^ ^─seq2─^ ^─seq3─^ +``` + +After seq1 consumes ParseComplete (`1`) and BindComplete (`2`), the queue is `[C,Z,1,2,C,Z,1,2,C,Z]`. +Seq1 then errors: + +| Step | Action | Queue after | +|---|---|---| +| Error arm fires | `pop_back()` → seq3's `Z`; `clear()` removes `[C,Z,1,2,C,Z,1,2,C]`; `push_back(Z)` | `[Z]` | +| seq1 ReadyForQuery arrives | pops `Z` normally | **empty** | +| seq2 ParseComplete arrives | `pop_front()` on empty queue | **ProtocolOutOfSync** | + +This does not affect production because `ClientRequest::spliced()` splits every multi-Execute +pipeline into sub-requests at `Execute` boundaries, placing each `Sync` in its own standalone +sub-request that is sent and fully drained before the next one is enqueued. The `ProtocolState` +queue therefore only ever holds the entries for a single sync group at a time. + +This is a load-bearing invariant. 
If `spliced()` is ever changed to batch multiple sync groups into +one send, this bug will surface immediately. + +### Fix + +The Error arm drains only the failing sync group's entries — from the current queue head up to and +including its own `ReadyForQuery` boundary — using `drain(..rfq_pos)`, leaving everything beyond +intact. A fallback `queue.clear()` applies only when no `Code(ReadyForQuery)` exists in the queue +at all (nothing beyond the current group to preserve). + +The old `pop_back + clear + push_back(RFQ)` pattern was removed. + +### Tests + +Two unit tests in `state.rs` and one server-level test in `server.rs` cover the scenario where +seq1 errors and seq2/seq3 must complete normally. diff --git a/integration/prepared_statements_full/Gemfile b/integration/prepared_statements_full/Gemfile new file mode 100644 index 000000000..6f147c82f --- /dev/null +++ b/integration/prepared_statements_full/Gemfile @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +source 'https://rubygems.org' +gem 'pg' +gem 'rails' +gem 'rspec', '~> 3.4' +gem 'rubocop' +gem 'toxiproxy' diff --git a/integration/prepared_statements_full/Gemfile.lock b/integration/prepared_statements_full/Gemfile.lock new file mode 100644 index 000000000..a9e17f4d2 --- /dev/null +++ b/integration/prepared_statements_full/Gemfile.lock @@ -0,0 +1,276 @@ +GEM + remote: https://rubygems.org/ + specs: + action_text-trix (2.1.18) + railties + actioncable (8.1.3) + actionpack (= 8.1.3) + activesupport (= 8.1.3) + nio4r (~> 2.0) + websocket-driver (>= 0.6.1) + zeitwerk (~> 2.6) + actionmailbox (8.1.3) + actionpack (= 8.1.3) + activejob (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + mail (>= 2.8.0) + actionmailer (8.1.3) + actionpack (= 8.1.3) + actionview (= 8.1.3) + activejob (= 8.1.3) + activesupport (= 8.1.3) + mail (>= 2.8.0) + rails-dom-testing (~> 2.2) + actionpack (8.1.3) + actionview (= 8.1.3) + activesupport (= 8.1.3) + nokogiri (>= 1.8.5) + rack (>= 2.2.4) + 
rack-session (>= 1.0.1) + rack-test (>= 0.6.3) + rails-dom-testing (~> 2.2) + rails-html-sanitizer (~> 1.6) + useragent (~> 0.16) + actiontext (8.1.3) + action_text-trix (~> 2.1.15) + actionpack (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + globalid (>= 0.6.0) + nokogiri (>= 1.8.5) + actionview (8.1.3) + activesupport (= 8.1.3) + builder (~> 3.1) + erubi (~> 1.11) + rails-dom-testing (~> 2.2) + rails-html-sanitizer (~> 1.6) + activejob (8.1.3) + activesupport (= 8.1.3) + globalid (>= 0.3.6) + activemodel (8.1.3) + activesupport (= 8.1.3) + activerecord (8.1.3) + activemodel (= 8.1.3) + activesupport (= 8.1.3) + timeout (>= 0.4.0) + activestorage (8.1.3) + actionpack (= 8.1.3) + activejob (= 8.1.3) + activerecord (= 8.1.3) + activesupport (= 8.1.3) + marcel (~> 1.0) + activesupport (8.1.3) + base64 + bigdecimal + concurrent-ruby (~> 1.0, >= 1.3.1) + connection_pool (>= 2.2.5) + drb + i18n (>= 1.6, < 2) + json + logger (>= 1.4.2) + minitest (>= 5.1) + securerandom (>= 0.3) + tzinfo (~> 2.0, >= 2.0.5) + uri (>= 0.13.1) + ast (2.4.3) + base64 (0.3.0) + bigdecimal (4.1.1) + builder (3.3.0) + concurrent-ruby (1.3.6) + connection_pool (3.0.2) + crass (1.0.6) + date (3.5.1) + diff-lcs (1.6.2) + drb (2.2.3) + erb (6.0.2) + erubi (1.13.1) + globalid (1.3.0) + activesupport (>= 6.1) + i18n (1.14.8) + concurrent-ruby (~> 1.0) + io-console (0.8.2) + irb (1.17.0) + pp (>= 0.6.0) + prism (>= 1.3.0) + rdoc (>= 4.0.0) + reline (>= 0.4.2) + json (2.19.3) + language_server-protocol (3.17.0.5) + lint_roller (1.1.0) + logger (1.7.0) + loofah (2.25.1) + crass (~> 1.0.2) + nokogiri (>= 1.12.0) + mail (2.9.0) + logger + mini_mime (>= 0.1.1) + net-imap + net-pop + net-smtp + marcel (1.1.0) + mini_mime (1.1.5) + minitest (6.0.3) + drb (~> 2.0) + prism (~> 1.5) + net-imap (0.6.3) + date + net-protocol + net-pop (0.1.2) + net-protocol + net-protocol (0.2.2) + timeout + net-smtp (0.5.1) + net-protocol + nio4r (2.7.5) + nokogiri 
(1.19.2-aarch64-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-aarch64-linux-musl) + racc (~> 1.4) + nokogiri (1.19.2-arm-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-arm-linux-musl) + racc (~> 1.4) + nokogiri (1.19.2-arm64-darwin) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-darwin) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-linux-musl) + racc (~> 1.4) + parallel (1.28.0) + parser (3.3.11.1) + ast (~> 2.4.1) + racc + pg (1.6.3) + pg (1.6.3-aarch64-linux) + pg (1.6.3-aarch64-linux-musl) + pg (1.6.3-arm64-darwin) + pg (1.6.3-x86_64-darwin) + pg (1.6.3-x86_64-linux) + pg (1.6.3-x86_64-linux-musl) + pp (0.6.3) + prettyprint + prettyprint (0.2.0) + prism (1.9.0) + psych (5.3.1) + date + stringio + racc (1.8.1) + rack (3.2.6) + rack-session (2.1.1) + base64 (>= 0.1.0) + rack (>= 3.0.0) + rack-test (2.2.0) + rack (>= 1.3) + rackup (2.3.1) + rack (>= 3) + rails (8.1.3) + actioncable (= 8.1.3) + actionmailbox (= 8.1.3) + actionmailer (= 8.1.3) + actionpack (= 8.1.3) + actiontext (= 8.1.3) + actionview (= 8.1.3) + activejob (= 8.1.3) + activemodel (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + bundler (>= 1.15.0) + railties (= 8.1.3) + rails-dom-testing (2.3.0) + activesupport (>= 5.0.0) + minitest + nokogiri (>= 1.6) + rails-html-sanitizer (1.7.0) + loofah (~> 2.25) + nokogiri (>= 1.15.7, != 1.16.7, != 1.16.6, != 1.16.5, != 1.16.4, != 1.16.3, != 1.16.2, != 1.16.1, != 1.16.0.rc1, != 1.16.0) + railties (8.1.3) + actionpack (= 8.1.3) + activesupport (= 8.1.3) + irb (~> 1.13) + rackup (>= 1.0.0) + rake (>= 12.2) + thor (~> 1.0, >= 1.2.2) + tsort (>= 0.2) + zeitwerk (~> 2.6) + rainbow (3.1.1) + rake (13.3.1) + rdoc (7.2.0) + erb + psych (>= 4.0.0) + tsort + regexp_parser (2.12.0) + reline (0.6.3) + io-console (~> 0.5) + rspec (3.13.2) + rspec-core (~> 3.13.0) + rspec-expectations (~> 3.13.0) + rspec-mocks (~> 3.13.0) + rspec-core (3.13.6) + rspec-support (~> 3.13.0) + rspec-expectations 
(3.13.5) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-mocks (3.13.8) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-support (3.13.7) + rubocop (1.86.0) + json (~> 2.3) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.1.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 2.9.3, < 3.0) + rubocop-ast (>= 1.49.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 4.0) + rubocop-ast (1.49.1) + parser (>= 3.3.7.2) + prism (~> 1.7) + ruby-progressbar (1.13.0) + securerandom (0.4.1) + stringio (3.2.0) + thor (1.5.0) + timeout (0.6.1) + toxiproxy (2.0.2) + tsort (0.2.0) + tzinfo (2.0.6) + concurrent-ruby (~> 1.0) + unicode-display_width (3.2.0) + unicode-emoji (~> 4.1) + unicode-emoji (4.2.0) + uri (1.1.1) + useragent (0.16.11) + websocket-driver (0.8.0) + base64 + websocket-extensions (>= 0.1.0) + websocket-extensions (0.1.5) + zeitwerk (2.7.5) + +PLATFORMS + aarch64-linux + aarch64-linux-gnu + aarch64-linux-musl + arm-linux-gnu + arm-linux-musl + arm64-darwin + x86_64-darwin + x86_64-linux-gnu + x86_64-linux-musl + +DEPENDENCIES + pg + rails + rspec (~> 3.4) + rubocop + toxiproxy + +BUNDLED WITH + 2.7.2 diff --git a/integration/prepared_statements_full/dev.sh b/integration/prepared_statements_full/dev.sh new file mode 100755 index 000000000..f36274590 --- /dev/null +++ b/integration/prepared_statements_full/dev.sh @@ -0,0 +1,12 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +pushd ${SCRIPT_DIR} + +export GEM_HOME=~/.gem +mkdir -p ${GEM_HOME} +bundle install +bundle exec rspec *_spec.rb + +popd diff --git a/integration/prepared_statements_full/protocol_out_of_sync_spec.rb b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb new file mode 100644 index 000000000..11e3a889a --- /dev/null +++ b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb @@ -0,0 +1,56 @@ +# 
frozen_string_literal: true + +require_relative 'rspec_helper' + +# Triggers the failed-prepare / orphaned-EXECUTE scenario. +# +# When pgdog rewrites a simple-query EXECUTE it injects [Prepare, Query]. +# If the injected PREPARE fails, the outer EXECUTE sub-request (Error + +# ReadyForQuery) must be consumed internally — nothing stale left on the wire. +def trigger_prepare_inject_failure(conn, statement_name:) + # PREPARE fails — pgdog caches the statement name despite the error. + expect { conn.exec "PREPARE #{statement_name} AS SELECT 1 FROM __pgdog_nonexistent_table__" } + .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) + + # EXECUTE triggers [Prepare, Query] injection; the re-injected PREPARE fails + # again. pgdog must drain the orphaned EXECUTE E+Z internally and surface + # only the application-visible error to the caller. + expect { conn.exec "EXECUTE #{statement_name}" } + .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) +end + +describe 'protocol out of sync regressions' do + after do + ensure_done + end + + # Session mode: a failed prepare-inject must not leave stale bytes on the + # wire. The connection stays usable for the next query. + it 'connection remains usable after failed prepare-inject in session mode' do + conn = connect_pgdog(user: 'pgdog_session') + begin + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_session') + + result = conn.exec 'SELECT 1 AS alive' + expect(result.first['alive']).to eq('1') + ensure + conn.close unless conn.finished? + end + end + + # Transaction mode (pool_size=1): the backend connection is returned to the + # pool after each query. With a single backend any stale bytes not drained + # internally are visible to the very next borrower. The connection must be + # clean so the next query succeeds. 
+ it 'connection remains usable after failed prepare-inject in transaction mode' do + conn = connect_pgdog(user: 'pgdog_tx_single') + begin + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_tx') + + result = conn.exec 'SELECT 1 AS alive' + expect(result.first['alive']).to eq('1') + ensure + conn.close unless conn.finished? + end + end +end diff --git a/integration/prepared_statements_full/rspec_helper.rb b/integration/prepared_statements_full/rspec_helper.rb new file mode 100644 index 000000000..782cd1146 --- /dev/null +++ b/integration/prepared_statements_full/rspec_helper.rb @@ -0,0 +1,106 @@ +# frozen_string_literal: true + +require 'active_record' +require 'rspec' +require 'pg' +require 'toxiproxy' + +def admin + PG.connect('postgres://admin:pgdog@127.0.0.1:6432/admin') +end + +def admin_exec(sql) + conn = admin + conn.exec sql +ensure + conn&.close +end + +def failover + PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/failover') +end + +def admin_stats(database, column = nil) + conn = admin + stats = conn.exec 'SHOW STATS' + conn.close + stats = stats.select { |item| item['database'] == database } + return stats.map { |item| item[column].to_i } unless column.nil? 
+ + stats +end + +def ensure_done + deadline = Time.now + 2 + pools = [] + clients = [] + servers = [] + pg_clients = [] + current_client_id = nil + + loop do + conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') + begin + pools = conn.exec 'SHOW POOLS' + current_client_id = conn.backend_pid + clients = conn.exec 'SHOW CLIENTS' + servers = conn.exec 'SHOW SERVERS' + ensure + conn.close + end + + pg_conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') + begin + pg_clients = pg_conn.exec 'SELECT state FROM pg_stat_activity'\ + " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ + " AND backend_type = 'client backend'"\ + " AND query NOT LIKE '%pg_stat_activity%'" + ensure + pg_conn.close + end + + pools_ready = pools.all? do |pool| + pool['sv_active'] == '0' && pool['cl_waiting'] == '0' && pool['out_of_sync'] == '0' + end + clients_ready = clients.all? do |client| + client['id'].to_i == current_client_id || client['state'] == 'idle' + end + servers_ready = servers + .select { |server| server['application_name'] != 'PgDog Pub/Sub Listener' } + .all? { |server| server['state'] == 'idle' } + pg_clients_ready = pg_clients.all? 
{ |client| client['state'] == 'idle' } + + break if pools_ready && clients_ready && servers_ready && pg_clients_ready + break if Time.now >= deadline + + sleep 0.05 + end + + pools.each do |pool| + expect(pool['sv_active']).to eq('0') + expect(pool['cl_waiting']).to eq('0') + expect(pool['out_of_sync']).to eq('0') + end + + clients.each do |client| + next if client['id'].to_i == current_client_id + expect(client['state']).to eq('idle') + end + + servers + .select do |server| + server['application_name'] != 'PgDog Pub/Sub Listener' + end + .each do |server| + expect(server['state']).to eq('idle') + end + + pg_clients.each do |client| + expect(client['state']).to eq('idle') + end +end + + +def connect_pgdog(user: 'pgdog') + PG.connect(dbname: 'pgdog', user:, password: 'pgdog', port: 6432, host: '127.0.0.1') +end \ No newline at end of file diff --git a/integration/prepared_statements_full/run.sh b/integration/prepared_statements_full/run.sh new file mode 100755 index 000000000..3fec7612c --- /dev/null +++ b/integration/prepared_statements_full/run.sh @@ -0,0 +1,15 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source ${SCRIPT_DIR}/../common.sh + +# Native gem extensions (psych, pg) need yaml + libpq headers. 
+bash ${SCRIPT_DIR}/../ci/apt.sh ruby-dev libyaml-dev libpq-dev build-essential +command -v bundle >/dev/null || sudo gem install bundler --no-document + +run_pgdog $SCRIPT_DIR +wait_for_pgdog + +bash ${SCRIPT_DIR}/dev.sh + +stop_pgdog diff --git a/integration/prepared_statements_full/users.toml b/integration/prepared_statements_full/users.toml index 9a8205f04..a97596a57 100644 --- a/integration/prepared_statements_full/users.toml +++ b/integration/prepared_statements_full/users.toml @@ -2,3 +2,19 @@ database = "pgdog" name = "pgdog" password = "pgdog" + + +[[users]] +name = "pgdog_session" +database = "pgdog" +password = "pgdog" +server_user = "pgdog" +pooler_mode = "session" + +[[users]] +name = "pgdog_tx_single" +database = "pgdog" +password = "pgdog" +server_user = "pgdog" +pooler_mode = "transaction" +pool_size = 1 \ No newline at end of file diff --git a/integration/ruby/lb_spec.rb b/integration/ruby/lb_spec.rb index 9d11dcd7b..061d614eb 100644 --- a/integration/ruby/lb_spec.rb +++ b/integration/ruby/lb_spec.rb @@ -13,7 +13,7 @@ it 'distributes traffic evenly' do conn = failover # Reset stats and bans - admin.exec 'RECONNECT' + admin_exec 'RECONNECT' before = admin_stats('failover') 250.times do diff --git a/integration/ruby/protocol_out_of_sync_spec.rb b/integration/ruby/protocol_out_of_sync_spec.rb new file mode 100644 index 000000000..fda5f853a --- /dev/null +++ b/integration/ruby/protocol_out_of_sync_spec.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +require_relative 'rspec_helper' + +def connect(dbname = 'pgdog', user = 'pgdog') + PG.connect(dbname: dbname, user: user, password: 'pgdog', port: 6432, host: '127.0.0.1', application_name: '') +end + +describe 'protocol out of sync' do + after do + ensure_done + end + + # A simple query that errors must not prevent a subsequent extended query + # from executing. Both are sent as separate requests; pgdog must process + # each independently. 
+ it 'extended query succeeds after preceding simple query error' do + conn = connect + + # Simple query that errors. + expect { conn.exec 'SELECT 1/0' }.to raise_error(PG::Error, /division by zero/) + + # Extended query must succeed despite the preceding error. + res = conn.exec_params 'SELECT $1::integer AS val', [42] + expect(res[0]['val']).to eq('42') + + conn.close + end + + # In pipeline mode, a failed first query must not prevent subsequent queries + # from executing. Seq2 and seq3 must return rows even when seq1 errors. + it 'extended query pipeline: error in seq1 does not drop seq2 and seq3' do + conn = connect + + conn.enter_pipeline_mode + + # Seq1: will fail — division by zero + conn.send_query_params 'SELECT 1/0', [] + conn.pipeline_sync + # Seq2: must succeed + conn.send_query_params 'SELECT $1::integer AS val', [2] + conn.pipeline_sync + # Seq3: must succeed + conn.send_query_params 'SELECT $1::integer AS val', [3] + conn.pipeline_sync + + # Seq1: fails — consume the error result, then the sync boundary. + begin + conn.get_result + rescue PG::Error => e + expect(e.message).to include('division by zero') + end + conn.get_result # nil — end of command + conn.get_result # PipelineSync — sync boundary + + # Seq2: must return a real row — if aborted instead, the request was dropped. + r2 = conn.get_result + expect(r2.result_status).to eq(PG::PGRES_TUPLES_OK) + expect(r2[0]['val']).to eq('2') + conn.get_result # end of command + conn.get_result # PipelineSync + + # Seq3: must return a real row. 
+ r3 = conn.get_result + expect(r3.result_status).to eq(PG::PGRES_TUPLES_OK) + expect(r3[0]['val']).to eq('3') + conn.get_result # end of command + conn.get_result # PipelineSync + + conn.exit_pipeline_mode + conn.close + end +end diff --git a/integration/ruby/rspec_helper.rb b/integration/ruby/rspec_helper.rb index 8a7cf2990..98082f812 100644 --- a/integration/ruby/rspec_helper.rb +++ b/integration/ruby/rspec_helper.rb @@ -36,6 +36,13 @@ def admin PG.connect('postgres://admin:pgdog@127.0.0.1:6432/admin') end +def admin_exec(sql) + conn = admin + conn.exec sql +ensure + conn&.close +end + def failover PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/failover') end @@ -51,21 +58,63 @@ def admin_stats(database, column = nil) end def ensure_done - conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') - pools = conn.exec 'SHOW POOLS' + deadline = Time.now + 2 + pools = [] + clients = [] + servers = [] + pg_clients = [] + current_client_id = nil + + loop do + conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') + begin + pools = conn.exec 'SHOW POOLS' + current_client_id = conn.backend_pid + clients = conn.exec 'SHOW CLIENTS' + servers = conn.exec 'SHOW SERVERS' + ensure + conn.close + end + + pg_conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') + begin + pg_clients = pg_conn.exec 'SELECT state FROM pg_stat_activity'\ + " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ + " AND backend_type = 'client backend'"\ + " AND query NOT LIKE '%pg_stat_activity%'" + ensure + pg_conn.close + end + + pools_ready = pools.all? do |pool| + pool['sv_active'] == '0' && pool['cl_waiting'] == '0' && pool['out_of_sync'] == '0' + end + clients_ready = clients.all? 
do |client| + client['id'].to_i == current_client_id || client['state'] == 'idle' + end + servers_ready = servers + .select { |server| server['application_name'] != 'PgDog Pub/Sub Listener' } + .all? { |server| server['state'] == 'idle' } + pg_clients_ready = pg_clients.all? { |client| client['state'] == 'idle' } + + break if pools_ready && clients_ready && servers_ready && pg_clients_ready + break if Time.now >= deadline + + sleep 0.05 + end + pools.each do |pool| expect(pool['sv_active']).to eq('0') expect(pool['cl_waiting']).to eq('0') expect(pool['out_of_sync']).to eq('0') end - current_client_id = conn.backend_pid - clients = conn.exec 'SHOW CLIENTS' + clients.each do |client| next if client['id'].to_i == current_client_id expect(client['state']).to eq('idle') end - servers = conn.exec 'SHOW SERVERS' + servers .select do |server| server['application_name'] != 'PgDog Pub/Sub Listener' @@ -74,12 +123,7 @@ def ensure_done expect(server['state']).to eq('idle') end - conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') - clients = conn.exec 'SELECT state FROM pg_stat_activity'\ - " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ - " AND backend_type = 'client backend'"\ - " AND query NOT LIKE '%pg_stat_activity%'" - clients.each do |client| + pg_clients.each do |client| expect(client['state']).to eq('idle') end -end +end \ No newline at end of file diff --git a/pgdog/src/backend/prepared_statements.rs b/pgdog/src/backend/prepared_statements.rs index feaf0311f..77b4369f0 100644 --- a/pgdog/src/backend/prepared_statements.rs +++ b/pgdog/src/backend/prepared_statements.rs @@ -311,9 +311,7 @@ impl PreparedStatements { } // Backend told us the copy is done. 
- 'c' => {
- self.state.action(code)?;
- }
+ 'c' => {}

 _ => (),
 }
diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs
index a93b89a99..e47f47206 100644
--- a/pgdog/src/backend/protocol/state.rs
+++ b/pgdog/src/backend/protocol/state.rs
@@ -1,3 +1,5 @@
+use tracing::error;
+
 use crate::{
 net::{Message, Protocol},
 stats::memory::MemoryUsage,
@@ -32,12 +34,6 @@ impl MemoryUsage for ExecutionCode {
 }
 }

-impl ExecutionCode {
- fn extended(&self) -> bool {
- matches!(self, Self::ParseComplete | Self::BindComplete)
- }
-}
-
 impl From<char> for ExecutionCode {
 fn from(value: char) -> Self {
 match value {
@@ -67,29 +63,17 @@ impl MemoryUsage for ExecutionItem {
 }
 }

-impl ExecutionItem {
- fn extended(&self) -> bool {
- match self {
- Self::Code(code) | Self::Ignore(code) => code.extended(),
- }
- }
-}
-
 #[derive(Debug, Clone, Default)]
 pub struct ProtocolState {
 queue: VecDeque<ExecutionItem>,
 simulated: VecDeque<Message>,
- extended: bool,
 out_of_sync: bool,
 }

 impl MemoryUsage for ProtocolState {
 #[inline]
 fn memory_usage(&self) -> usize {
- self.queue.memory_usage()
- + self.simulated.memory_usage()
- + self.extended.memory_usage()
- + self.out_of_sync.memory_usage()
+ self.queue.memory_usage() + self.simulated.memory_usage() + self.out_of_sync.memory_usage()
 }
 }
@@ -102,7 +86,6 @@ impl ProtocolState {
 ///
 pub(crate) fn add_ignore(&mut self, code: impl Into<ExecutionCode>) {
 let code = code.into();
- self.extended = self.extended || code.extended();
 self.queue.push_back(ExecutionItem::Ignore(code));
 }
@@ -110,7 +93,6 @@
 /// to be returned by the server.
 pub(crate) fn add(&mut self, code: impl Into<ExecutionCode>) {
 let code = code.into();
- self.extended = self.extended || code.extended();
 self.queue.push_back(ExecutionItem::Code(code))
 }
@@ -154,29 +136,60 @@
 match code {
 ExecutionCode::Untracked => return Ok(Action::Forward),
 ExecutionCode::Error => {
- if !self.extended {
- // A simple-query error only aborts the current simple query.
- // Keep any later pipelined simple query RFQs queued.
- while !self.queue.is_empty()
- && self.queue.front()
- != Some(&ExecutionItem::Code(ExecutionCode::ReadyForQuery))
- {
- self.queue.pop_front();
- }
- return Ok(Action::Forward);
+ if matches!(
+ self.queue.front(),
+ Some(ExecutionItem::Ignore(ExecutionCode::Error))
+ ) {
+ // We ignore errors only for the pgdog-injected sub-request.
+ // In that case the first error has already been processed and
+ // sent to the client; for the remaining expected errors
+ // we've added ignores for Error and RFQ.
+ // The error is ignored but still logged by the [backend::server] module.
+ self.queue.pop_front();
+ return Ok(Action::Ignore);
 }
- // Remove everything from the execution queue.
- // The connection is out of sync until client re-syncs it.
- if self.extended {
- self.out_of_sync = true;
- }
- let last = self.queue.pop_back();
- self.queue.clear();
- if let Some(ExecutionItem::Code(ExecutionCode::ReadyForQuery)) = last {
+ // This is the first (and client-visible) error in the chain. It is forwarded
+ // so the client receives exactly one Error+RFQ for their request.
+
+ // Mark the state out-of-sync so the connection is not reused until the client re-syncs.
+ // For a simple query this happens immediately after receiving the RFQ.
+ self.out_of_sync = true;
+
+ // Find the first RFQ slot, which separates the pgdog-injected
+ // sub-request from the remaining queries.
+ let Some(rfq_pos) = self
+ .queue
+ .iter()
+ .position(|i| matches!(i, ExecutionItem::Code(ExecutionCode::ReadyForQuery)))
+ else {
+ self.queue.clear();
+ return Ok(Action::Forward);
+ };
+
+ // broken_queue: the pgdog-injected sub-request, whose queries
+ // will not execute properly anyway, since an error already occurred.
+ let broken_queue = self.queue.drain(..rfq_pos);
+
+ // Count how many queries are expected to finish in the pgdog-injected sub-request.
+ // The current use case is only the Prepare + Execute messages from [backend::server],
+ // and if the PREPARE fails the EXECUTE will fail as well.
+ // WARN: this is not the most reliable solution if the injected set of
+ // queries grows, but it works for now.
+ let count_ignores = broken_queue
+ .filter(|i| matches!(i, ExecutionItem::Ignore(ExecutionCode::ReadyForQuery)))
+ .count();
+
+ // For every query we expect to run, add ignores for one Error and one RFQ.
+ // For PREPARE that is a single iteration, producing the queue
+ // [Ignore(RFQ), Ignore(Error), Code(RFQ)].
+ for _ in 0..count_ignores {
+ self.queue
+ .push_front(ExecutionItem::Ignore(ExecutionCode::Error));
+ self.queue
- .push_back(ExecutionItem::Code(ExecutionCode::ReadyForQuery));
+ .push_front(ExecutionItem::Ignore(ExecutionCode::ReadyForQuery));
+ }
+
 return Ok(Action::Forward);
 }
@@ -185,8 +198,11 @@
 }
 _ => (),
 };
- let in_queue = self.queue.pop_front().ok_or(Error::ProtocolOutOfSync)?;
- let action = match in_queue {
+ let in_queue = self.queue.pop_front().ok_or_else(|| {
+ error!("Unexpected action {code:?}: queue is empty");
+ Error::ProtocolOutOfSync
+ })?;
+ match in_queue {
 // The queue is waiting for the server to send ReadyForQuery,
 // but it sent something else. That means the execution pipeline
 // isn't done. We are not tracking every single message, so this is expected.
@@ -205,16 +221,12 @@
 if code == in_queue {
 Ok(Action::Ignore)
 } else {
+ error!(?self, "Unexpected action {code:?}: expected: {in_queue:?}");
+
 Err(Error::ProtocolOutOfSync)
 }
 }
- }?;
-
- if code == ExecutionCode::ReadyForQuery {
- self.extended = self.queue.iter().any(ExecutionItem::extended);
 }
-
- Ok(action)
 }

 pub(crate) fn in_copy_mode(&self) -> bool {
@@ -234,11 +246,6 @@
 &self.queue
 }

- #[cfg(test)]
- pub(crate) fn queue_mut(&mut self) -> &mut VecDeque<ExecutionItem> {
- &mut self.queue
- }
-
 pub(crate) fn done(&self) -> bool {
 self.is_empty() && !self.out_of_sync
 }
@@ -252,7 +259,7 @@
 !self.out_of_sync
 }

- /// Check if the protocol is out of sync due to an error in extended protocol.
+ /// Check if the protocol is out of sync due to an error.
pub(crate) fn out_of_sync(&self) -> bool { self.out_of_sync } @@ -360,7 +367,6 @@ mod test { assert_eq!(state.action('C').unwrap(), Action::Forward); assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); - assert!(!state.extended); } #[test] @@ -507,14 +513,65 @@ mod test { #[test] fn test_simple_query_error_no_out_of_sync() { let mut state = ProtocolState::default(); - // Simple query error should NOT set out_of_sync + // Simple query error sets out_of_sync temporarily; it is cleared by the next RFQ. state.add('C'); // CommandComplete (expected but won't arrive) state.add('Z'); // ReadyForQuery - assert!(!state.extended); assert_eq!(state.action('E').unwrap(), Action::Forward); - assert!(!state.out_of_sync); // Simple query doesn't set out_of_sync + assert!(state.out_of_sync); // set on error, cleared on RFQ + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(!state.out_of_sync); + } + + // A simple-query error sets out_of_sync temporarily; the following RFQ clears it. + // Extended-query entries queued after the simple query are unaffected. 
+ #[test] + fn test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync() { + let mut state = ProtocolState::default(); + state.add('C'); // CommandComplete (simple query) + state.add('Z'); // ReadyForQuery (simple query) + state.add('1'); // ParseComplete (extended query) + state.add('2'); // BindComplete (extended query) + state.add('C'); // CommandComplete (extended query) + state.add('Z'); // ReadyForQuery (extended query) + + assert_eq!(state.action('E').unwrap(), Action::Forward); // error forwarded + assert!(state.out_of_sync); // set on error + assert_eq!(state.action('Z').unwrap(), Action::Forward); // simple query RFQ + assert!(!state.out_of_sync); // cleared by RFQ; extended query intact + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(state.is_empty()); + } + + // out_of_sync is cleared by each sub-request's own RFQ, so a simple-query error + // between two RFQ boundaries does not bleed into subsequent extended-query entries. 
+ #[test] + fn test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync() { + let mut state = ProtocolState::default(); + state.add('C'); // CommandComplete (simple query 1) + state.add('Z'); // ReadyForQuery (simple query 1) + state.add('C'); // CommandComplete (simple query 2) + state.add('Z'); // ReadyForQuery (simple query 2) + state.add('1'); // ParseComplete (extended query) + state.add('2'); // BindComplete (extended query) + state.add('C'); // CommandComplete (extended query) + state.add('Z'); // ReadyForQuery (extended query) + + assert_eq!(state.action('C').unwrap(), Action::Forward); // simple query 1 OK assert_eq!(state.action('Z').unwrap(), Action::Forward); + + assert_eq!(state.action('E').unwrap(), Action::Forward); // simple query 2 errors + assert!(state.out_of_sync); // set on error + assert_eq!(state.action('Z').unwrap(), Action::Forward); // simple query 2 RFQ + assert!(!state.out_of_sync); // cleared; extended query intact + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(state.is_empty()); } #[test] @@ -821,7 +878,6 @@ mod test { state.add('Z'); // ReadyForQuery assert_eq!(state.action('1').unwrap(), Action::Forward); - assert!(state.extended); // Now marked as extended assert_eq!(state.action('2').unwrap(), Action::Forward); assert_eq!(state.action('C').unwrap(), Action::Forward); assert_eq!(state.action('Z').unwrap(), Action::Forward); @@ -890,4 +946,203 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); } + + // Double action('c') for server CopyDone + + // Safe path: Code(ReadyForQuery) backstop makes the double action('c') call idempotent. 
+ #[test] + fn test_copydone_double_action_safe_with_rfq_backstop() { + let mut state = ProtocolState::default(); + // 1. Queue: CopyOut slot + RFQ backstop (from Sync). + state.add('G'); // CopyOut + state.add('Z'); // ReadyForQuery backstop + + // 2. First action('c'): pops CopyOut; RFQ backstop untouched. + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert_eq!(state.len(), 1); + + // 3. Second action('c'): sees RFQ at front; pushes it back (idempotent). + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert_eq!(state.len(), 1); // RFQ still present for the server's ReadyForQuery + } + + // Documents raw state-machine behavior: calling action('c') twice with no RFQ backstop + // causes ProtocolOutOfSync. forward() was the only caller that did this; the second call + // has been removed from the 'c' arm in prepared_statements.rs, making this path unreachable + // through normal protocol flow. The test is kept to pin the underlying invariant. + #[test] + fn test_copydone_double_action_oos_without_rfq_backstop() { + let mut state = ProtocolState::default(); + // Queue: Execute + Flush (no Sync) — no RFQ backstop. + state.add('C'); // ExecutionCompleted + + // First action('c'): pops ExecutionCompleted; queue empty. + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert!(state.is_empty()); + + // Second action('c') directly: empty queue → ProtocolOutOfSync. + // This is the raw state machine. forward() no longer makes this second call. + assert!(state.action('c').is_err()); + } + + // Happy path: injected ParseComplete arrives in order — silently ignored, rest forwarded. 
+ #[test] + fn test_injected_parse_happy_path() { + let mut state = ProtocolState::default(); + state.add_ignore('1'); // ParseComplete — injected, swallowed + state.add('2'); // BindComplete + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + assert_eq!(state.action('1').unwrap(), Action::Ignore); // swallowed + assert_eq!(state.action('2').unwrap(), Action::Forward); // forwarded + assert_eq!(state.action('C').unwrap(), Action::Forward); // forwarded + assert_eq!(state.action('Z').unwrap(), Action::Forward); // forwarded + assert!(state.is_empty()); + } + + // Replicates the full lifecycle of an injected PREPARE that errors: + // + // Client sends: PREPARE foo AS ... (simple-query style) + // EXECUTE (via Query) + // + // pgdog injects ahead of the client's Query: + // add_ignore('C') — CommandComplete from PREPARE + // add_ignore('Z') — RFQ from PREPARE + // Then the client's Query adds: + // add('Z') — the client-visible RFQ + // + // Queue before first error: [Ignore(C), Ignore(Z), Code(Z)] + // + // Server responds to PREPARE with an error: + // 'E' → error branch fires: drain [Ignore(C), Ignore(Z)], count 1 Ignore(RFQ), + // push_front loop produces [Ignore(RFQ), Ignore(Error), Code(Z)]. + // Action::Forward — client receives this error. 
+ // 'Z' → matches Ignore(RFQ) → Action::Ignore (PREPARE's RFQ suppressed) + // + // Server responds to EXECUTE (which fails because PREPARE never succeeded): + // 'E' → fast-path: front is Ignore(Error) → pop → Action::Ignore (suppressed) + // 'Z' → Code(Z) → Action::Forward — client receives the closing RFQ + #[test] + fn test_injected_prepare_error_full_lifecycle() { + let mut state = ProtocolState::default(); + + // --- setup: replicate what prepared_statements.rs does --- + // ProtocolMessage::Prepare injects: + state.add_ignore('C'); // Ignore(CommandComplete) — PREPARE response + state.add_ignore('Z'); // Ignore(RFQ) — PREPARE response + // ProtocolMessage::Query (client EXECUTE) adds: + state.add('Z'); // Code(RFQ) — client-visible + + // --- server sends Error for PREPARE --- + // Error branch: drains [Ignore(C), Ignore(Z)], finds 1 Ignore(Z), + // rebuilds queue as [Ignore(RFQ), Ignore(Error), Code(Z)]. + assert_eq!(state.action('E').unwrap(), Action::Forward); + + // --- server sends RFQ for PREPARE (now suppressed) --- + assert_eq!(state.action('Z').unwrap(), Action::Ignore); + + // --- server sends Error for EXECUTE (prepare never succeeded) --- + // Fast-path: Ignore(Error) is at front → pop and ignore. + assert_eq!(state.action('E').unwrap(), Action::Ignore); + + // --- server sends RFQ for EXECUTE --- + // Code(Z) is at front → forwarded to client. + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(state.is_empty()); + } + + // ========================================= + // Pipeline multi-sync tests + // ========================================= + + // In pipeline mode, each request runs in isolation; a failure in one must not affect the others. + #[test] + fn test_pipeline_multi_sync_error_in_seq1_does_not_affect_seq2_seq3() { + // Setup: seq1 expects ParseComplete, BindComplete, CommandComplete, ReadyForQuery. 
+ let mut seq1 = ProtocolState::default();
+ seq1.add('1'); // ParseComplete
+ seq1.add('2'); // BindComplete
+ seq1.add('C'); // CommandComplete (won't arrive — execute errors)
+ seq1.add('Z'); // ReadyForQuery
+
+ // Seq1: fails — seq2 and seq3 must still respond.
+ assert_eq!(seq1.action('1').unwrap(), Action::Forward); // ParseComplete
+ assert_eq!(seq1.action('2').unwrap(), Action::Forward); // BindComplete
+ assert_eq!(seq1.action('E').unwrap(), Action::Forward); // ErrorResponse
+ assert!(seq1.out_of_sync);
+ assert_eq!(seq1.len(), 1);
+ assert_eq!(seq1.action('Z').unwrap(), Action::Forward); // ReadyForQuery
+ assert!(!seq1.out_of_sync);
+ assert!(seq1.is_empty());
+
+ // Seq2: independent state; seq1's error must not affect it.
+ let mut seq2 = ProtocolState::default();
+ seq2.add('1');
+ seq2.add('2');
+ seq2.add('C');
+ seq2.add('Z');
+
+ assert_eq!(seq2.action('1').unwrap(), Action::Forward); // ParseComplete
+ assert_eq!(seq2.action('2').unwrap(), Action::Forward); // BindComplete
+ assert_eq!(seq2.action('C').unwrap(), Action::Forward); // CommandComplete
+ assert_eq!(seq2.action('Z').unwrap(), Action::Forward); // ReadyForQuery
+ assert!(seq2.is_empty());
+
+ // Seq3: independent state; seq1's error must not affect it.
+ let mut seq3 = ProtocolState::default();
+ seq3.add('1');
+ seq3.add('2');
+ seq3.add('C');
+ seq3.add('Z');
+
+ assert_eq!(seq3.action('1').unwrap(), Action::Forward); // ParseComplete
+ assert_eq!(seq3.action('2').unwrap(), Action::Forward); // BindComplete
+ assert_eq!(seq3.action('C').unwrap(), Action::Forward); // CommandComplete
+ assert_eq!(seq3.action('Z').unwrap(), Action::Forward); // ReadyForQuery
+ assert!(seq3.is_empty());
+ }
+
+ // In pipeline mode, a failed request must not prevent subsequent pipelined
+ // requests from being processed. Seq1 errors; seq2 and seq3 must still
+ // receive their responses.
+ //
+ // Before this fix: when seq1 errored, seq2 and seq3 received errors instead
+ // of their expected responses.
+ #[test] + fn test_pipeline_single_queue_error_only_clears_failing_sync_group() { + let mut state = ProtocolState::default(); + // Setup: all three sync groups loaded into a single shared queue. + state.add('1'); + state.add('2'); + state.add('C'); + state.add('Z'); // seq1 + state.add('1'); + state.add('2'); + state.add('C'); + state.add('Z'); // seq2 + state.add('1'); + state.add('2'); + state.add('C'); + state.add('Z'); // seq3 + + // Seq1: fails — seq2 and seq3 must still be reachable. + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('E').unwrap(), Action::Forward); // error — must not clear seq2/seq3 + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + + // Seq2: must process normally. + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + + // Seq3: must process normally. 
+ assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete
+ assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete
+ assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete
+ assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery
+ assert!(state.is_empty());
+ }
 }
diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs
index 02372cd18..3302bed86 100644
--- a/pgdog/src/backend/server.rs
+++ b/pgdog/src/backend/server.rs
@@ -472,6 +472,13 @@ impl Server {
 Ok(forward) => {
 if forward {
 break message;
+ } else if message.code() == 'E' {
+ // We got an error that will not be forwarded to the client,
+ // but it is still useful for tracing.
+ error!(
+ "Ignore error from stream: {:?}",
+ ErrorResponse::from_bytes(message.payload())
+ );
 }
 }
 Err(err) => {
@@ -1172,7 +1179,6 @@ impl Drop for Server {
 }
 }

-// Used for testing.
 #[cfg(test)]
 pub mod test {
 use bytes::{BufMut, BytesMut};
@@ -1433,6 +1439,44 @@ pub mod test {
 assert!(server.done());
 }

+ // A simple query that errors must not prevent a subsequent extended query
+ // from executing when both are sent in the same batch.
+ //
+ // Before this fix: queuing the extended items set extended=true before the
+ // simple query was processed, so the simple query error incorrectly cleared
+ // the extended query's pending entries.
+ #[tokio::test]
+ async fn test_simple_query_error_before_extended_query_in_same_batch() {
+ let mut server = test_server().await;
+
+ // Setup: simple query that errors, immediately followed by an extended query.
+ server
+ .send(
+ &vec![
+ Query::new("SELECT 1/0").into(),
+ Parse::new_anonymous("SELECT 1").into(),
+ Bind::default().into(),
+ Execute::new().into(),
+ Sync.into(),
+ ]
+ .into(),
+ )
+ .await
+ .unwrap();
+
+ // Simple query: errors, then ReadyForQuery.
+ assert_eq!(server.read().await.unwrap().code(), 'E'); // ErrorResponse + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + + // Extended query must still return its results. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'D'); // DataRow + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!(server.done()); + } + #[tokio::test] async fn test_execute_batch_simple_query_error_then_success() { let mut server = test_server().await; @@ -1599,7 +1643,7 @@ pub mod test { let (new, name) = global.write().insert(&parse); assert!(new); let parse = parse.rename(&name); - assert_eq!(parse.name(), "__pgdog_1"); + assert!(parse.name().starts_with("__pgdog_")); let mut server = test_server().await; @@ -1608,7 +1652,7 @@ pub mod test { .send( &vec![ ProtocolMessage::from(Bind::new_params( - "__pgdog_1", + &name, &[Parameter { len: 1, data: "1".as_bytes().into(), @@ -2042,14 +2086,17 @@ pub mod test { let mut prep = PreparedStatements::new(); let mut parse = Parse::named("test", "SELECT 1::bigint"); + prep.insert_anyway(&mut parse); - assert_eq!(parse.name(), "__pgdog_1"); + + let name = parse.name().to_owned(); + assert!(name.starts_with("__pgdog_")); server .send( &vec![ProtocolMessage::from(Query::new(format!( "PREPARE {} AS {}", - parse.name(), + name, parse.query() )))] .into(), @@ -2062,10 +2109,10 @@ pub mod test { } assert!(server.sync_prepared()); server.sync_prepared_statements().await.unwrap(); - assert!(server.prepared_statements.contains("__pgdog_1")); + assert!(server.prepared_statements.contains(&name)); - let describe = Describe::new_statement("__pgdog_1"); - let bind = Bind::new_statement("__pgdog_1"); + let describe = Describe::new_statement(&name); + let bind = Bind::new_statement(&name); let 
execute = Execute::new();
 server
 .send(
@@ -2085,7 +2132,7 @@
 assert_eq!(c, msg.code());
 }

- let parse = Parse::named("__pgdog_1", "SELECT 2::bigint");
+ let parse = Parse::named(&name, "SELECT 2::bigint");
 let describe = describe.clone();

 server
@@ -2099,7 +2146,7 @@
 }

 server
- .send(&vec![ProtocolMessage::from(Query::new("EXECUTE __pgdog_1"))].into())
+ .send(&vec![ProtocolMessage::from(Query::new(format!("EXECUTE {name}")))].into())
 .await
 .unwrap();
 for c in ['T', 'D', 'C', 'Z'] {
@@ -2802,38 +2849,6 @@ pub mod test {
 );
 }

- #[tokio::test]
- async fn test_protocol_out_of_sync_sets_error_state() {
- let mut server = test_server().await;
-
- server
- .send(&vec![Query::new("SELECT 1").into()].into())
- .await
- .unwrap();
-
- for c in ['T', 'D'] {
- let msg = server.read().await.unwrap();
- assert_eq!(msg.code(), c);
- }
-
- // simulate an unlikely, but existent out-of-sync state
- server
- .prepared_statements_mut()
- .state_mut()
- .queue_mut()
- .clear();
-
- let res = server.read().await;
- assert!(
- matches!(res, Err(Error::ProtocolOutOfSync)),
- "protocol should be out of sync"
- );
- assert!(
- server.stats().get_state() == State::Error,
- "state should be Error after detecting desync"
- )
- }
-
 #[tokio::test]
 async fn test_reset_clears_client_params() {
 let mut server = test_server().await;
@@ -3259,6 +3274,63 @@ pub mod test {
 assert!(!server.needs_drain());
 }

+ // In pipeline mode, a failed request must not prevent subsequent pipelined
+ // requests from being processed. All three sequences are sent in one batch;
+ // seq1 fails, seq2 and seq3 must still return their rows.
+ //
+ // Before this fix: the error in seq1 caused seq2 and seq3 to receive
+ // ProtocolOutOfSync instead of their expected responses.
+ #[tokio::test]
+ async fn test_pipelined_multiple_syncs_first_fails() {
+ let mut server = test_server().await;
+
+ // Three pipelined sequences sent in one batch.
+ server
+ .send(
+ &vec![
+ // Seq1 — will fail.
+ Parse::new_anonymous("SELECT 1/0").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + // Seq2 — must succeed. + Parse::new_anonymous("SELECT 2").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + // Seq3 — must succeed. + Parse::new_anonymous("SELECT 3").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + ] + .into(), + ) + .await + .unwrap(); + + // Seq1: fails — seq2 and seq3 must still respond. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), 'E'); // ErrorResponse + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + + // Seq2: must return data despite seq1 erroring. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'D'); // DataRow + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + + // Seq3: must return data. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'D'); // DataRow + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!(server.done()); + assert!(!server.has_more_messages()); + } + #[tokio::test] async fn test_empty_query_extended() { let mut server = test_server().await; @@ -4068,4 +4140,106 @@ pub mod test { // but saturation must not panic). } } + + // Failed injected PREPARE leaves EXECUTE ReadyForQuery unmatched — Error handler empties the queue. + #[tokio::test] + async fn test_prepare_execute_inject_failure_orphans_execute_rfq() { + let mut server = test_server().await; + + // 1. 
Send [Prepare, Query] as the rewriter injects for EXECUTE. + server + .send( + &vec![ + ProtocolMessage::Prepare { + name: "__pgdog_prepare_inject_test".to_string(), + statement: "SELECT 1 FROM __pgdog_nonexistent_table__".to_string(), + }, + ProtocolMessage::Query(Query::new("EXECUTE __pgdog_prepare_inject_test()")), + ] + .into(), + ) + .await + .unwrap(); + + // 2. PREPARE 'E' forwarded; 'Z' consumes re-added Code(RFQ) — queue empty. + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); // 'E' PREPARE error + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); // 'Z' RFQ — queue now empty + } + + // Extended Execute + Flush (no Sync): single action('c') now succeeds. + // CopyDone is forwarded to client; the trailing CommandComplete then hits an empty + // queue (no RFQ backstop, no Sync) and raises ProtocolOutOfSync. + // This is distinct from the former double-action bug, which fired on CopyDone itself. + #[tokio::test] + async fn test_copydone_single_action_without_sync() { + let mut server = test_server().await; + + // 1. Parse + Bind + Execute + Flush (not Sync); no RFQ backstop in queue. + server + .send( + &vec![ + ProtocolMessage::Parse(Parse::new_anonymous("COPY (VALUES (1),(2)) TO STDOUT")), + ProtocolMessage::Bind(Bind::new_params("", &[])), + ProtocolMessage::Execute(Execute::new()), + // Flush (not Sync): prompts PostgreSQL to send buffered responses. + // handle() maps this to Other, adding nothing to the queue. + Flush.into(), + ] + .into(), + ) + .await + .unwrap(); + + // 2. ParseComplete, BindComplete, CopyOutResponse, CopyData x2 arrive normally. 
+ assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 + + // 3. CopyDone — fixed: single action() pops ExecutionCompleted; no second call. + assert_eq!(server.read().await.unwrap().code(), 'c'); // CopyDone forwarded + + // 4. CommandComplete hits empty queue (no RFQ backstop without Sync). + assert!( + matches!(server.read().await.unwrap_err(), Error::ProtocolOutOfSync), + "expected ProtocolOutOfSync on CommandComplete with empty queue" + ); + } + + // Safe path: Sync adds Code(RFQ) backstop — double action('c') is idempotent. + #[tokio::test] + async fn test_copydone_double_action_safe_with_sync() { + let mut server = test_server().await; + + // 1. Parse + Bind + Execute + Sync; RFQ backstop added to queue. + server + .send( + &vec![ + ProtocolMessage::Parse(Parse::new_anonymous("COPY (VALUES (1),(2)) TO STDOUT")), + ProtocolMessage::Bind(Bind::new_params("", &[])), + ProtocolMessage::Execute(Execute::new()), + ProtocolMessage::Sync(Sync), + ] + .into(), + ) + .await + .unwrap(); + + // 2. Full response sequence — CopyDone is safe with RFQ backstop. 
+ assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 + assert_eq!(server.read().await.unwrap().code(), 'c'); // CopyDone -- safe with RFQ backstop + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!( + server.done(), + "server must be done after full response sequence" + ); + } }
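The Error-arm queue rewrite above (drain the injected prefix up to the first `Code(RFQ)`, then push one `Ignore(Error)`/`Ignore(RFQ)` pair per drained `Ignore(RFQ)`) is easier to follow outside the diff. Below is a minimal, self-contained sketch of that algorithm; `Item`, `Act`, and `on_error` are illustrative stand-ins, not pgdog's actual `ExecutionItem`/`Action` types, and codes are plain chars ('E' = Error, 'Z' = ReadyForQuery, 'C' = CommandComplete).

```rust
use std::collections::VecDeque;

// Illustrative stand-ins for pgdog's ExecutionItem / Action (names are assumptions).
#[derive(Debug, Clone, PartialEq)]
enum Item {
    Code(char),   // response the client must see
    Ignore(char), // response pgdog swallows (injected sub-request)
}

#[derive(Debug, PartialEq)]
enum Act {
    Forward,
    Ignore,
}

// Mirrors the Error arm: either swallow a pre-registered error, or forward the
// first error and convert the injected prefix into Ignore(RFQ) + Ignore(Error) pairs.
fn on_error(queue: &mut VecDeque<Item>) -> Act {
    // Fast path: an error we already decided to suppress.
    if queue.front() == Some(&Item::Ignore('E')) {
        queue.pop_front();
        return Act::Ignore;
    }
    // First client-visible error: find the RFQ that closes the injected sub-request.
    let Some(rfq_pos) = queue.iter().position(|i| *i == Item::Code('Z')) else {
        queue.clear();
        return Act::Forward;
    };
    // Each Ignore('Z') in the drained prefix marks one injected command that will
    // now fail, so expect one suppressed Error + one suppressed RFQ per command.
    let ignores = queue
        .drain(..rfq_pos)
        .filter(|i| *i == Item::Ignore('Z'))
        .count();
    for _ in 0..ignores {
        queue.push_front(Item::Ignore('E'));
        queue.push_front(Item::Ignore('Z'));
    }
    Act::Forward
}

fn main() {
    // Injected PREPARE + client EXECUTE, as in the lifecycle test:
    // queue starts as [Ignore(C), Ignore(Z), Code(Z)].
    let mut q: VecDeque<Item> =
        [Item::Ignore('C'), Item::Ignore('Z'), Item::Code('Z')].into();

    assert_eq!(on_error(&mut q), Act::Forward); // PREPARE error → client sees it
    assert_eq!(q.pop_front(), Some(Item::Ignore('Z'))); // PREPARE's RFQ swallowed
    assert_eq!(on_error(&mut q), Act::Ignore); // EXECUTE error swallowed
    assert_eq!(q.pop_front(), Some(Item::Code('Z'))); // closing RFQ forwarded
    assert!(q.is_empty());
}
```

This reproduces the sequence exercised by `test_injected_prepare_error_full_lifecycle`: exactly one Error+RFQ reaches the client, while the injected sub-request's follow-up Error and RFQ are suppressed.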