Skip to content

[SPARK-57709][CORE] Fail the stream callback and close the channel when installing the stream interceptor fails#56802

Open
LuciferYang wants to merge 1 commit into
apache:masterfrom
LuciferYang:streamresponse-callback-leak
Open

[SPARK-57709][CORE] Fail the stream callback and close the channel when installing the stream interceptor fails#56802
LuciferYang wants to merge 1 commit into
apache:masterfrom
LuciferYang:streamresponse-callback-leak

Conversation

@LuciferYang

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

In TransportResponseHandler, when a StreamResponse arrives and installing the stream interceptor fails (the TransportFrameDecoder is missing from the channel pipeline, or setInterceptor throws), the handler now fails the stream callback and closes the channel. Previously it only logged the error and called deactivateStream(), leaving the StreamCallback, which had already been removed from the queue, without any completion or failure notification.

Why are the changes needed?

This is defensive hardening rather than a fix for a failure seen in practice. The catch block is effectively unreachable in normal operation: while an interceptor is active the frame decoder stops framing, so the setInterceptor state check cannot trip, and the decoder is not removed mid-dispatch on the single channel event loop. Even so, every other branch in handle() notifies its callback while this one did not, so if the path were ever reached the caller would block until the connection idle-timeout instead of failing cleanly. Closing the channel also keeps a connection whose framing can no longer be trusted out of the pool, where it could otherwise corrupt later requests.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Added TransportResponseHandlerSuite.failStreamCallbackWhenInstallingInterceptorFails, which handles a StreamResponse on a channel whose pipeline has no TransportFrameDecoder, so installing the interceptor throws. It verifies that the callback is failed with the install exception and that the channel is closed. The test fails on the old code (the callback receives no interaction) and passes with the fix; the full TransportResponseHandlerSuite passes.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.8)

…en installing the stream interceptor fails

When installing the stream interceptor fails in TransportResponseHandler
(the frame decoder is missing from the pipeline, or setInterceptor
throws), the StreamResponse branch previously logged and only called
deactivateStream(). The StreamCallback had already been removed from the
queue, so it was never failed and the caller would block until the
connection idle-timeout. Fail the callback, consistent with every other
branch in handle(), and close the channel since its framing can no
longer be relied on.

This is defensive hardening: the path is effectively unreachable under
normal operation, because framing is suspended while an interceptor is
active and the frame decoder is not removed mid-dispatch on the channel
event loop.

@uros-b uros-b left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @LuciferYang!

@dongjoon-hyun dongjoon-hyun left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants