A look at RocketMQ's BrokerHousekeepingService

baijin 2024-08-15 17:03:46

This article takes a look at RocketMQ's BrokerHousekeepingService.

BrokerHousekeepingService

org/apache/rocketmq/namesrv/routeinfo/BrokerHousekeepingService.java

public class BrokerHousekeepingService implements ChannelEventListener {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final NamesrvController namesrvController;

    public BrokerHousekeepingService(NamesrvController namesrvController) {
        this.namesrvController = namesrvController;
    }

    @Override
    public void onChannelConnect(String remoteAddr, Channel channel) {
    }

    @Override
    public void onChannelClose(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelException(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }

    @Override
    public void onChannelIdle(String remoteAddr, Channel channel) {
        this.namesrvController.getRouteInfoManager().onChannelDestroy(remoteAddr, channel);
    }
}

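  • BrokerHousekeepingService implements the ChannelEventListener interface. onChannelConnect is a no-op; onChannelClose, onChannelException and onChannelIdle all delegate to onChannelDestroy on the namesrvController's routeInfoManager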
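The delegation shape described above can be tried out without Netty or RocketMQ on the classpath. The following is only a minimal sketch: ConnectionEvents, CleanupRecorder, HousekeepingSketch and the String channel ids are hypothetical stand-ins invented for this example, not RocketMQ types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the four ChannelEventListener callbacks.
// A String channel id replaces the Netty Channel so the sketch has no dependencies.
interface ConnectionEvents {
    void onConnect(String remoteAddr, String channelId);
    void onClose(String remoteAddr, String channelId);
    void onException(String remoteAddr, String channelId);
    void onIdle(String remoteAddr, String channelId);
}

// Hypothetical stand-in for RouteInfoManager: it only records what was cleaned up.
class CleanupRecorder {
    final List<String> destroyed = new ArrayList<>();

    void onChannelDestroy(String remoteAddr, String channelId) {
        destroyed.add(remoteAddr + "/" + channelId);
    }
}

// Same shape as BrokerHousekeepingService: connect does nothing,
// close, exception and idle all funnel into one cleanup method.
class HousekeepingSketch implements ConnectionEvents {
    private final CleanupRecorder recorder;

    HousekeepingSketch(CleanupRecorder recorder) {
        this.recorder = recorder;
    }

    @Override
    public void onConnect(String remoteAddr, String channelId) {
        // intentionally empty, as in the original
    }

    @Override
    public void onClose(String remoteAddr, String channelId) {
        recorder.onChannelDestroy(remoteAddr, channelId);
    }

    @Override
    public void onException(String remoteAddr, String channelId) {
        recorder.onChannelDestroy(remoteAddr, channelId);
    }

    @Override
    public void onIdle(String remoteAddr, String channelId) {
        recorder.onChannelDestroy(remoteAddr, channelId);
    }
}

public class HousekeepingDemo {
    public static void main(String[] args) {
        CleanupRecorder recorder = new CleanupRecorder();
        HousekeepingSketch listener = new HousekeepingSketch(recorder);

        listener.onConnect("10.0.0.9:10911", "ch-0");   // ignored
        listener.onClose("10.0.0.1:10911", "ch-1");
        listener.onException("10.0.0.2:10911", "ch-2");
        listener.onIdle("10.0.0.3:10911", "ch-3");

        // prints [10.0.0.1:10911/ch-1, 10.0.0.2:10911/ch-2, 10.0.0.3:10911/ch-3]
        System.out.println(recorder.destroyed);
    }
}
```

Funnelling the three "connection is gone or unusable" events into a single method means the cleanup logic lives in one place, which is presumably why the original class is so small.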

RouteInfoManager

org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java

public class RouteInfoManager {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    public RouteInfoManager() {
        this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
        this.brokerAddrTable = new HashMap<String, BrokerData>(128);
        this.clusterAddrTable = new HashMap<String, Set<String>>(32);
        this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
        this.filterServerTable = new HashMap<String, List<String>>(256);
    }

    public void onChannelDestroy(String remoteAddr, Channel channel) {
        String brokerAddrFound = null;
        if (channel != null) {
            try {
                try {
                    this.lock.readLock().lockInterruptibly();
                    Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                        this.brokerLiveTable.entrySet().iterator();
                    while (itBrokerLiveTable.hasNext()) {
                        Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                        if (entry.getValue().getChannel() == channel) {
                            brokerAddrFound = entry.getKey();
                            break;
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }

        if (null == brokerAddrFound) {
            brokerAddrFound = remoteAddr;
        } else {
            log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
        }

        if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
            try {
                try {
                    this.lock.writeLock().lockInterruptibly();
                    this.brokerLiveTable.remove(brokerAddrFound);
                    this.filterServerTable.remove(brokerAddrFound);
                    String brokerNameFound = null;
                    boolean removeBrokerName = false;
                    Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                        this.brokerAddrTable.entrySet().iterator();
                    while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                        BrokerData brokerData = itBrokerAddrTable.next().getValue();

                        Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<Long, String> entry = it.next();
                            Long brokerId = entry.getKey();
                            String brokerAddr = entry.getValue();
                            if (brokerAddr.equals(brokerAddrFound)) {
                                brokerNameFound = brokerData.getBrokerName();
                                it.remove();
                                log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                    brokerId, brokerAddr);
                                break;
                            }
                        }

                        if (brokerData.getBrokerAddrs().isEmpty()) {
                            removeBrokerName = true;
                            itBrokerAddrTable.remove();
                            log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                                brokerData.getBrokerName());
                        }
                    }

                    if (brokerNameFound != null && removeBrokerName) {
                        Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                        while (it.hasNext()) {
                            Entry<String, Set<String>> entry = it.next();
                            String clusterName = entry.getKey();
                            Set<String> brokerNames = entry.getValue();
                            boolean removed = brokerNames.remove(brokerNameFound);
                            if (removed) {
                                log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                    brokerNameFound, clusterName);

                                if (brokerNames.isEmpty()) {
                                    log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                        clusterName);
                                    it.remove();
                                }

                                break;
                            }
                        }
                    }

                    if (removeBrokerName) {
                        Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                            this.topicQueueTable.entrySet().iterator();
                        while (itTopicQueueTable.hasNext()) {
                            Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                            String topic = entry.getKey();
                            List<QueueData> queueDataList = entry.getValue();

                            Iterator<QueueData> itQueueData = queueDataList.iterator();
                            while (itQueueData.hasNext()) {
                                QueueData queueData = itQueueData.next();
                                if (queueData.getBrokerName().equals(brokerNameFound)) {
                                    itQueueData.remove();
                                    log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                        topic, queueData);
                                }
                            }

                            if (queueDataList.isEmpty()) {
                                itTopicQueueTable.remove();
                                log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                    topic);
                            }
                        }
                    }
                } finally {
                    this.lock.writeLock().unlock();
                }
            } catch (Exception e) {
                log.error("onChannelDestroy Exception", e);
            }
        }
    }

    //......
}

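  • RouteInfoManager holds five HashMaps: topicQueueTable, brokerAddrTable, clusterAddrTable, brokerLiveTable and filterServerTable
  • onChannelDestroy guards these maps with a ReentrantReadWriteLock: the lookup runs under the read lock and the removals run under the write lock
  • It first scans brokerLiveTable for the broker address whose BrokerLiveInfo holds the event's channel, falling back to remoteAddr if none matches. It then removes that address from brokerLiveTable, filterServerTable and brokerAddrTable; if the broker name is left with no addresses, the name is also removed from clusterAddrTable and topicQueueTable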
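The two-phase pattern from the bullets above (look up under the read lock, then mutate under the write lock, cascading the removal when a broker name has no addresses left) can be sketched in a much smaller form. This is an illustration only, not RocketMQ code: MiniRouteTable, its two maps, register, isLive, hasBrokerName and the String channel ids are all invented for the sketch, and it calls lock() where the original calls lockInterruptibly().

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified route table: two maps instead of five.
class MiniRouteTable {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // brokerAddr -> channel id (plays the role of brokerLiveTable)
    private final Map<String, String> liveTable = new HashMap<>();
    // brokerName -> broker addresses (plays the role of brokerAddrTable)
    private final Map<String, Set<String>> addrTable = new HashMap<>();

    void register(String brokerName, String brokerAddr, String channelId) {
        lock.writeLock().lock();
        try {
            liveTable.put(brokerAddr, channelId);
            addrTable.computeIfAbsent(brokerName, k -> new HashSet<>()).add(brokerAddr);
        } finally {
            lock.writeLock().unlock();
        }
    }

    void onChannelDestroy(String remoteAddr, String channelId) {
        // Phase 1: under the read lock, find the address that owns this channel.
        String found = null;
        lock.readLock().lock();
        try {
            for (Map.Entry<String, String> e : liveTable.entrySet()) {
                if (e.getValue().equals(channelId)) {
                    found = e.getKey();
                    break;
                }
            }
        } finally {
            lock.readLock().unlock();
        }

        // No channel match: fall back to the remote address, as the original does.
        if (found == null) {
            found = remoteAddr;
        }
        if (found == null || found.isEmpty()) {
            return;
        }

        // Phase 2: under the write lock, remove the address and cascade.
        lock.writeLock().lock();
        try {
            liveTable.remove(found);
            Iterator<Map.Entry<String, Set<String>>> it = addrTable.entrySet().iterator();
            while (it.hasNext()) {
                Set<String> addrs = it.next().getValue();
                if (addrs.remove(found)) {
                    if (addrs.isEmpty()) {
                        it.remove(); // last address gone, so drop the broker name too
                    }
                    break;
                }
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean isLive(String brokerAddr) {
        lock.readLock().lock();
        try {
            return liveTable.containsKey(brokerAddr);
        } finally {
            lock.readLock().unlock();
        }
    }

    boolean hasBrokerName(String brokerName) {
        lock.readLock().lock();
        try {
            return addrTable.containsKey(brokerName);
        } finally {
            lock.readLock().unlock();
        }
    }
}

public class MiniRouteTableDemo {
    public static void main(String[] args) {
        MiniRouteTable table = new MiniRouteTable();
        table.register("broker-a", "10.0.0.1:10911", "ch-1");
        table.register("broker-a", "10.0.0.2:10911", "ch-2");

        table.onChannelDestroy("10.0.0.1:10911", "ch-1");
        System.out.println(table.hasBrokerName("broker-a")); // true, one address remains

        table.onChannelDestroy("10.0.0.2:10911", "ch-2");
        System.out.println(table.hasBrokerName("broker-a")); // false, name removed with last address
    }
}
```

Because the read lock is released before the write lock is taken, another thread can change the maps between the two phases. The sketch tolerates this in the same way the original does: the removals in the second phase are harmless if the entry has already gone.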

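Summary

RocketMQ's BrokerHousekeepingService implements the ChannelEventListener interface. Apart from onChannelConnect, every callback delegates to onChannelDestroy on the namesrvController's routeInfoManager, which removes the offline broker's information from the in-memory routing maps.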
