diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java index 7effce99..c6f798e5 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObClientFuture.java @@ -116,7 +116,9 @@ public boolean isDone() { */ @Override public RemotingCommand createConnectionClosedResponse(InetSocketAddress responseHost) { - return null; + String address = responseHost != null ? responseHost.toString() : "unknown"; + return ObTablePacket.createTransportErrorPacket(TransportCodes.BOLT_SEND_FAILED, + "connection {" + address + "} closed", null); } /* diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObConnectionEventHandler.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObConnectionEventHandler.java new file mode 100644 index 00000000..1aa17b46 --- /dev/null +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObConnectionEventHandler.java @@ -0,0 +1,79 @@ +/*- + * #%L + * OBKV Table Client Framework + * %% + * Copyright (C) 2021 OceanBase + * %% + * OBKV Table Client Framework is licensed under Mulan PSL v2. + * You can use this software according to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * #L% + */ + +package com.alipay.oceanbase.rpc.bolt.transport; + +import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory; +import com.alipay.remoting.Connection; +import com.alipay.remoting.ConnectionEventHandler; +import com.alipay.remoting.ConnectionEventType; +import com.alipay.remoting.config.switches.GlobalSwitch; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; + +/** + * Notify pending RPC futures immediately when the underlying channel becomes inactive. + * The default {@link ConnectionEventHandler} only calls {@link Connection#onClose()} from + * {@code close()}, which is not always invoked when the peer disconnects abruptly. + */ +public class ObConnectionEventHandler extends ConnectionEventHandler { + + private static final Logger LOGGER = TableClientLoggerFactory + .getLogger(ObConnectionEventHandler.class); + + public ObConnectionEventHandler(GlobalSwitch globalSwitch) { + super(globalSwitch); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt == ConnectionEventType.CONNECT) { + Connection connection = ctx.channel().attr(Connection.CONNECTION).get(); + if (connection != null) { + LOGGER.info("connection [{}] established", formatConnection(connection)); + } + } + super.userEventTriggered(ctx, evt); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Connection connection = ctx.channel().attr(Connection.CONNECTION).get(); + if (connection != null) { + LOGGER.info("connection [{}] closed", formatConnection(connection)); + connection.onClose(); + } + super.channelInactive(ctx); + } + + private static String formatConnection(Connection connection) { + String localIp = connection.getLocalIP(); + String local = (localIp == null || localIp.isEmpty()) ? "unknown" + : localIp + ":" + connection.getLocalPort(); + String remote = "unknown"; + String originUrl = connection.getUrl() != null ? connection.getUrl().getOriginUrl() : null; + if (originUrl != null && !originUrl.isEmpty()) { + remote = originUrl; + } else { + String remoteIp = connection.getRemoteIP(); + if (remoteIp != null && !remoteIp.isEmpty()) { + remote = remoteIp + ":" + connection.getRemotePort(); + } + } + return local + " - " + remote; + } +} diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObPacketFactory.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObPacketFactory.java index 2cf180cc..7cdf72dd 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObPacketFactory.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObPacketFactory.java @@ -163,6 +163,9 @@ public ObTablePacket createExceptionResponse(int id, ResponseStatus status, Thro */ @Override public ObTablePacket createConnectionClosedResponse(InetSocketAddress address, String message) { - return null; + String addressStr = address != null ? address.toString() : "unknown"; + String errMsg = message != null ? message : "connection {" + addressStr + "} closed"; + return ObTablePacket.createTransportErrorPacket(TransportCodes.BOLT_SEND_FAILED, errMsg, + null); } } diff --git a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java index 756f57d8..ce2286c1 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java +++ b/src/main/java/com/alipay/oceanbase/rpc/bolt/transport/ObTableConnection.java @@ -118,7 +118,7 @@ private boolean connect() throws Exception { MONITOR.info(logMessage(null, "CONNECT", endpoint, System.currentTimeMillis() - start)); if (tries >= maxTryTimes) { - if (!obTable.isOdpMode()) { + if (!obTable.isOdpMode() && obTable.getObServerAddr() != null) { RouteTableRefresher.SuspectObServer suspectAddr = new RouteTableRefresher.SuspectObServer( obTable.getObServerAddr()); RouteTableRefresher.addIntoSuspectIPs(suspectAddr); diff --git a/src/main/java/com/alipay/oceanbase/rpc/location/model/RouteTableRefresher.java b/src/main/java/com/alipay/oceanbase/rpc/location/model/RouteTableRefresher.java index 1755ca2c..315f06be 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/location/model/RouteTableRefresher.java +++ b/src/main/java/com/alipay/oceanbase/rpc/location/model/RouteTableRefresher.java @@ -193,6 +193,9 @@ private void checkAlive(ObServerAddr addr) { } public static void addIntoSuspectIPs(SuspectObServer server) throws InterruptedException { + if (server == null || server.getAddr() == null) { + return; + } ObServerAddr addr = server.getAddr(); if (suspectServers.get(addr) != null) { // already in the list, directly return diff --git a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java index b44a7988..807f730c 100644 --- a/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java +++ b/src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java @@ -33,7 +33,7 @@ import com.alipay.oceanbase.rpc.table.api.TableBatchOps; import com.alipay.oceanbase.rpc.table.api.TableQuery; import com.alipay.oceanbase.rpc.util.TraceUtil; -import com.alipay.remoting.ConnectionEventHandler; +import com.alipay.oceanbase.rpc.bolt.transport.ObConnectionEventHandler; import com.alipay.remoting.config.switches.GlobalSwitch; import com.alipay.remoting.connection.ConnectionFactory; import com.alipay.remoting.exception.RemotingException; @@ -101,7 +101,7 @@ public void init() throws Exception { .newBuilder() .configWriteBufferWaterMark(getNettyBufferLowWatermark(), getNettyBufferHighWatermark()).build(); - connectionFactory.init(new ConnectionEventHandler(new GlobalSwitch())); // Only for monitoring connection status + connectionFactory.init(new ObConnectionEventHandler(new GlobalSwitch())); realClient = new ObTableRemoting(new ObPacketFactory(enableRerouting)); connectionPool = new ObTableConnectionPool(this, obTableConnectionPoolSize); connectionPool.init();