Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,9 @@ public boolean isDone() {
*/
@Override
public RemotingCommand createConnectionClosedResponse(InetSocketAddress responseHost) {
return null;
String address = responseHost != null ? responseHost.toString() : "unknown";
return ObTablePacket.createTransportErrorPacket(TransportCodes.BOLT_SEND_FAILED,
"connection {" + address + "} closed", null);
}

/*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*-
* #%L
* OBKV Table Client Framework
* %%
* Copyright (C) 2021 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.bolt.transport;

import com.alipay.oceanbase.rpc.util.TableClientLoggerFactory;
import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventHandler;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.config.switches.GlobalSwitch;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;

/**
* Notify pending RPC futures immediately when the underlying channel becomes inactive.
* The default {@link ConnectionEventHandler} only calls {@link Connection#onClose()} from
* {@code close()}, which is not always invoked when the peer disconnects abruptly.
*/
public class ObConnectionEventHandler extends ConnectionEventHandler {

private static final Logger LOGGER = TableClientLoggerFactory
.getLogger(ObConnectionEventHandler.class);

public ObConnectionEventHandler(GlobalSwitch globalSwitch) {
super(globalSwitch);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt == ConnectionEventType.CONNECT) {
Connection connection = ctx.channel().attr(Connection.CONNECTION).get();
if (connection != null) {
LOGGER.info("connection [{}] established", formatConnection(connection));
}
}
super.userEventTriggered(ctx, evt);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Connection connection = ctx.channel().attr(Connection.CONNECTION).get();
if (connection != null) {
LOGGER.info("connection [{}] closed", formatConnection(connection));
connection.onClose();
}
super.channelInactive(ctx);
}

private static String formatConnection(Connection connection) {
String localIp = connection.getLocalIP();
String local = (localIp == null || localIp.isEmpty()) ? "unknown"
: localIp + ":" + connection.getLocalPort();
String remote = "unknown";
String originUrl = connection.getUrl() != null ? connection.getUrl().getOriginUrl() : null;
if (originUrl != null && !originUrl.isEmpty()) {
remote = originUrl;
} else {
String remoteIp = connection.getRemoteIP();
if (remoteIp != null && !remoteIp.isEmpty()) {
remote = remoteIp + ":" + connection.getRemotePort();
}
}
return local + " - " + remote;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ public ObTablePacket createExceptionResponse(int id, ResponseStatus status, Thro
*/
@Override
public ObTablePacket createConnectionClosedResponse(InetSocketAddress address, String message) {
return null;
String addressStr = address != null ? address.toString() : "unknown";
String errMsg = message != null ? message : "connection {" + addressStr + "} closed";
return ObTablePacket.createTransportErrorPacket(TransportCodes.BOLT_SEND_FAILED, errMsg,
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private boolean connect() throws Exception {
MONITOR.info(logMessage(null, "CONNECT", endpoint, System.currentTimeMillis() - start));

if (tries >= maxTryTimes) {
if (!obTable.isOdpMode()) {
if (!obTable.isOdpMode() && obTable.getObServerAddr() != null) {
RouteTableRefresher.SuspectObServer suspectAddr = new RouteTableRefresher.SuspectObServer(
obTable.getObServerAddr());
RouteTableRefresher.addIntoSuspectIPs(suspectAddr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ private void checkAlive(ObServerAddr addr) {
}

public static void addIntoSuspectIPs(SuspectObServer server) throws InterruptedException {
if (server == null || server.getAddr() == null) {
return;
}
ObServerAddr addr = server.getAddr();
if (suspectServers.get(addr) != null) {
// already in the list, directly return
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/alipay/oceanbase/rpc/table/ObTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import com.alipay.oceanbase.rpc.table.api.TableBatchOps;
import com.alipay.oceanbase.rpc.table.api.TableQuery;
import com.alipay.oceanbase.rpc.util.TraceUtil;
import com.alipay.remoting.ConnectionEventHandler;
import com.alipay.oceanbase.rpc.bolt.transport.ObConnectionEventHandler;
import com.alipay.remoting.config.switches.GlobalSwitch;
import com.alipay.remoting.connection.ConnectionFactory;
import com.alipay.remoting.exception.RemotingException;
Expand Down Expand Up @@ -101,7 +101,7 @@ public void init() throws Exception {
.newBuilder()
.configWriteBufferWaterMark(getNettyBufferLowWatermark(),
getNettyBufferHighWatermark()).build();
connectionFactory.init(new ConnectionEventHandler(new GlobalSwitch())); // Only for monitoring connection status
connectionFactory.init(new ObConnectionEventHandler(new GlobalSwitch()));
realClient = new ObTableRemoting(new ObPacketFactory(enableRerouting));
connectionPool = new ObTableConnectionPool(this, obTableConnectionPoolSize);
connectionPool.init();
Expand Down
Loading