From f83cf82426d29df2d4552efe9d89c9e5956e14cc Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Fri, 26 Jun 2026 11:19:46 +0800 Subject: [PATCH] [SPARK-57709][CORE] Fail the stream callback and close the channel when installing the stream interceptor fails When installing the stream interceptor fails in TransportResponseHandler (the frame decoder is missing from the pipeline, or setInterceptor throws), the StreamResponse branch previously logged and only called deactivateStream(). The StreamCallback had already been removed from the queue, so it was never failed and the caller would block until the connection idle-timeout. Fail the callback, consistent with every other branch in handle(), and close the channel since its framing can no longer be relied on. This is defensive hardening: the path is effectively unreachable under normal operation, because framing is suspended while an interceptor is active and the frame decoder is not removed mid-dispatch on the channel event loop. --- .../client/TransportResponseHandler.java | 8 ++++++ .../TransportResponseHandlerSuite.java | 25 +++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index d27fa08d829bb..870be0b561cad 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -247,6 +247,14 @@ public void handle(ResponseMessage message) throws Exception { } catch (Exception e) { logger.error("Error installing stream handler.", e); deactivateStream(); + try { + callback.onFailure(resp.streamId, e); + } catch (IOException ioe) { + logger.warn("Error in stream failure handler.", ioe); + } + // Installing the interceptor failed, so incoming data on this channel can no longer + // be decoded. Close it so the broken connection is not reused from the pool. + channel.close(); } } else { try { diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java index ca4bc0a90dfb8..7726e9f9b965c 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; import io.netty.channel.local.LocalChannel; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -171,6 +172,30 @@ public void failOutstandingStreamCallbackOnException() throws Exception { verify(cb).onFailure(eq("stream-1"), isA(IOException.class)); } + @Test + public void failStreamCallbackWhenInstallingInterceptorFails() throws Exception { + // With no TransportFrameDecoder in the pipeline, the decoder lookup in the StreamResponse + // branch returns null and installing the interceptor throws. The handler must fail the + // callback (so the caller does not hang) and close the channel rather than reuse a + // connection it can no longer decode. + Channel c = mock(Channel.class); + ChannelPipeline pipeline = mock(ChannelPipeline.class); + when(c.pipeline()).thenReturn(pipeline); + when(pipeline.get(TransportFrameDecoder.HANDLER_NAME)).thenReturn(null); + TransportResponseHandler handler = new TransportResponseHandler(c); + + StreamCallback cb = mock(StreamCallback.class); + handler.addStreamCallback("stream", cb); + assertEquals(1, handler.numOutstandingRequests()); + + // byteCount > 0 so the handler takes the interceptor-installation path. + handler.handle(new StreamResponse("stream", 1234L, null)); + + verify(cb, times(1)).onFailure(eq("stream"), isA(NullPointerException.class)); + verify(c, times(1)).close(); + assertEquals(0, handler.numOutstandingRequests()); + } + @Test public void handleSuccessfulMergedBlockMeta() throws Exception { TransportResponseHandler handler = new TransportResponseHandler(new LocalChannel());