diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index d27fa08d829bb..870be0b561cad 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -247,6 +247,14 @@ public void handle(ResponseMessage message) throws Exception { } catch (Exception e) { logger.error("Error installing stream handler.", e); deactivateStream(); + try { + callback.onFailure(resp.streamId, e); + } catch (IOException ioe) { + logger.warn("Error in stream failure handler.", ioe); + } + // Installing the interceptor failed, so incoming data on this channel can no longer + // be decoded. Close it so the broken connection is not reused from the pool. + channel.close(); } } else { try { diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java index ca4bc0a90dfb8..7726e9f9b965c 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; import io.netty.channel.local.LocalChannel; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -171,6 +172,30 @@ public void failOutstandingStreamCallbackOnException() throws Exception { verify(cb).onFailure(eq("stream-1"), isA(IOException.class)); } + @Test + public void failStreamCallbackWhenInstallingInterceptorFails() throws Exception { + // With no TransportFrameDecoder in the pipeline, the decoder lookup in the StreamResponse + // branch returns null and installing the interceptor throws. The handler must fail the + // callback (so the caller does not hang) and close the channel rather than reuse a + // connection it can no longer decode. + Channel c = mock(Channel.class); + ChannelPipeline pipeline = mock(ChannelPipeline.class); + when(c.pipeline()).thenReturn(pipeline); + when(pipeline.get(TransportFrameDecoder.HANDLER_NAME)).thenReturn(null); + TransportResponseHandler handler = new TransportResponseHandler(c); + + StreamCallback cb = mock(StreamCallback.class); + handler.addStreamCallback("stream", cb); + assertEquals(1, handler.numOutstandingRequests()); + + // byteCount > 0 so the handler takes the interceptor-installation path. + handler.handle(new StreamResponse("stream", 1234L, null)); + + verify(cb, times(1)).onFailure(eq("stream"), isA(NullPointerException.class)); + verify(c, times(1)).close(); + assertEquals(0, handler.numOutstandingRequests()); + } + @Test public void handleSuccessfulMergedBlockMeta() throws Exception { TransportResponseHandler handler = new TransportResponseHandler(new LocalChannel());