循序渐进ActiveMQ(6)----使用zookeeper实现activemq的主从环境搭建

xiaoxiao2021-02-28  43

使用ZooKeeper实现的Master-Slave实现方式,是对ActiveMQ进行高可用的一种有效的解决方案。

原理:

1 使用ZooKeeper(集群)注册所有的ActiveMQ Broker。

2 只有其中的一个Broker可以对外提供服务(也就是Master节点),其他的Broker处于待机状态,被视为Slave。

3 。如果Master因故障而不能提供服务,则利用ZooKeeper的内部选举机制会从Slave中选举出一个Broker充当Master节

点,继续对外提供服务。

官方文档:http://activemq.apache.org/replicated-leveldb-store.html

部署方案

(1)首先我们下载apache-activemq-5.11.1-bin.tar.gz上传到我们的机器上准备部署。

(2)Zookeeper方案

主机IP消息端口通信端口节点目录/root/下192.168.98.95(hostname:node1)21812888:3888zookeeper-3.4.9192.168.98.96(hostname:node2)21812888:3888zookeeper-3.4.9192.168.98.97(hostname:node3)21812888:3888zookeeper-3.4.9

(3)ActiveMQ方案

主机IP消息端口集群通信端口控制台端口节点目录/root/下192.168.98.95(hostname:node1)51511626218161activemq-cluster/node1/192.168.98.96(hostname:node2)51512626228162activemq-cluster/node2/192.168.98.97(hostname:node3)51513626238163activemq-cluster/node3/

1 首先搭建zookeeper环境,

2 继续搭建activemq环境

2.1 在192.168.98.95节点下,创建/root/activemq-cluster文件夹,解压apache-activemq-5.11.1-bin.tar.gz文件,然后对解压好的文件改名,操作如下:

    1 命令:mkdir activemq-cluster

    2 命令:tar -zxvf apache-activemq-5.11.1-bin.tar.gz -C /root/activemq-cluster

    3 命令:cd  /root/activemq-cluster

    4 命令:mv apache-activemq-5.11.1 node1

如此操作,再次反复解压apache-activemq-5.11.1-bin.tar.gz文件到192.168.98.96和192.168.98.97的/root/activemq-cluster/下,分别对应建立node2和node3文件夹。

我们现在已经解压好了三台机器的三个mq节点也就是node1、node2、node3,下面我们要做的事情就是更改每个节点不同的配置和端口。

2.2 修改配置

2.2.1 修改控制台端口(默认为8161),在mq安装路径下的conf/jetty.xml进 行修改即可。(三个节点都要修改,并且端口都不同)

# cd /root/activemq-cluster/node1/conf/# vim /root/activemq-cluster/node1/conf/jetty.xml

node2和node3分别修改为8162和8163

2.2.2 集群配置文件修改:我们在mq安装路径下的conf/activemq.xml进行修   改其中的持久化适配器,修改其中的bind、zkAddress、hostname、zkPath。然后也需要修改mq的brokerName,并且每个节点名称都必须相同。

    #vim /root/activemq-cluster/node1/conf/activemq.xml

第一处修改:brokerName=“jeff-activemq-cluster”(三个节点都需要修改)

第二处修改:先注释掉适配器中的kahadb

第三处修改:添加新的leveldb配置如下(三个节点都需要修改):

Node1:

<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://192.168.98.95:62621" zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181" hostname="node1" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>

Node2:

<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://192.168.98.96:62622" zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181" hostname="node2" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>Node3:

<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://192.168.98.97:62623" zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181" hostname="node3" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>第四处修改:(修改通信的端口,避免冲突)              # vim /root/activemq-cluster/node1/conf/activemq.xml

修改这个文件的通信端口号,三个节点都需要修改(51511,51512,51513)

Ok,到此为止,我们的activemq集群环境已经搭建完毕!

3  测试启动activemq集群第一步:启动zookeeper集群,命令:zkServer.sh start第二步:启动mq集群:顺序启动mq:命令如下:/root/activemq-cluster/node1/bin/activemq start(关闭stop)/root/activemq-cluster/node2/bin/activemq start(关闭stop)/root/activemq-cluster/node3/bin/activemq start(关闭stop)第三步:查看日志信息:tail -f /root/activemq-cluster/node1/data/activemq.logtail -f /root/activemq-cluster/node2/data/activemq.logtail -f /root/activemq-cluster/node3/data/activemq.log如果不报错,我们的主从配置启动成功,可以使用控制台查看!

第四步:在代码中对集群的brokerUrl配置进行修改即可:

failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false

查看mq主节点

既然zookeeper+activemq主从环境已经搭建完毕,那么究竟哪个机器activemq是主节点?哪个机器activemq是从节点?

我们打开zookeeper:

可以看到在zk的根节点下有个activemq/leveldb-stores节点,它下边有三个子节点,表示我们的三个activemq主机注册的节点,而主节点就是含有"eleced":"0000000003"这个属性的节点,而另外的子节点没有这个属性值:

主节点是node1,192.168.98.95,然后我们打开控制台:

http://192.168.98.96:8162/admin/queues.jsp

http://192.168.98.97:8163/admin/queues.jsp

发现并没有界面服务可以提供

打开:http://192.168.98.95:8161/admin/queues.jsp,界面正常访问:

应用测试

我们连接到activemq的主备集群名称为first的队列去发送50万条消息,每隔1s发送一次,然后启动消费者接收。

Sender.java

package jeff.mq.master; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author jeffSheng * 2018年7月3日 */ public class Sender { public static void main(String[] args) throws Exception { final Sender s = new Sender(); s.sender(); } public void sender() throws Exception{ /** * 第一步: * 建立ConnectionFactory工厂对象,需要填入用户名、密码、及要连接的地址,均 * 使用默认即可,默认端口“tcp://localhost:61616” */ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false"); /** * 第二步: * 通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法 * 开启连接,Connection连接默认是关闭的。 */ Connection connection = connectionFactory.createConnection(); connection.start(); /** * 第三步: * 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务, * 参数配置2为签收模式,一般我们设置自动签收。 */ //我们这里不开启事务 // Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //开启事务 Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); /** * 第四步: * 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象, * 在ptp模式中,Destination被称作Queue即队列;在Pub/Sub模式中Destination被称作Topic即主题 * 在程序众包给可以使用多个Queue和Topic */ Destination destination = session.createQueue("first"); /** * 第五步: * 我们需要通过Session对象常见消息的发送和接收对象(生产者和消费者) * MessageProcuder/MessageConsumer */ MessageProducer messageProducer = session.createProducer(null); /** * 第六步: * 我们可以使用MessageProducer的setDeliverMode方法为其设置持久化特性和非持久化特性(DeliverMode) */ //如果设置为持久话方式,我们需要指定具体持久话策略,比如jdbc持久化到数据库 // messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); /** * 第七步: * 最后我们使用JMS规范的TextMessage形式创建数据(通过Session对象),并用MessageProducer的send方法发送数据, * 同理客户端使用receive方法进行接收数据。最后不要忘记关闭Connection */ for (int i = 0; i < 500000; i++) { TextMessage textMessage = session.createTextMessage(); int n = (int)(Math.random()*10); textMessage.setText("我是消息,Id:"+n); /** * 参数解释: * 第一个参数:目的地 * 第二个参数:消息 * 第三个参数:是否持久化 * 第四个参数:优先级0~9,0-4是普通消息,5-9是加急消息 * 第五个参数:存活时间,这里我们设置的是2分钟 */ messageProducer.send(destination,textMessage,DeliveryMode.NON_PERSISTENT,0,1000*60); System.out.println("发送消息:"+textMessage.getText()); Thread.sleep(1000); } //关闭方法会递归向下关闭会话等连接 if(connection!=null){ connection.close(); } } }

receiver.java

package jeff.mq.master; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author jeffSheng * 2018年7月3日 */ public class Receiver { public static void main(String[] args) throws Exception { /** * 第一步: * 建立ConnectionFactory工厂对象,需要填入用户名、密码、及要连接的地址,均 * 使用默认即可,默认端口“tcp://localhost:61616” */ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false"); /** * 第二步: * 通过ConnectionFactory工厂对象我们创建一个Connection连接,并且调用Connection的start方法 * 开启连接,Connection连接默认是关闭的。 */ Connection connection = connectionFactory.createConnection(); connection.start(); /** * 第三步: * 通过Connection对象创建Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务, * 参数配置2为签收模式,一般我们设置自动签收。 */ //我们这里不开启事务 Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); /** * 第四步: * 通过Session创建Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象, * 在ptp模式中,Destination被称作Queue即队列;在Pub/Sub模式中Destination被称作Topic即主题 * 在程序众包给可以使用多个Queue和Topic */ Destination destination = session.createQueue("first"); /** * 第五步: * 我们需要通过Session对象常见消息的发送和接收对象(生产者和消费者) * MessageProcuder/MessageConsumer */ MessageConsumer messageConsumer = session.createConsumer(destination); while(true){ TextMessage msg = (TextMessage)messageConsumer.receive(); if(msg==null)break; System.out.println("收到内容: "+msg.getText()); } //关闭方法会递归向下关闭会话等连接 if(connection!=null){ connection.close(); } } }

当node1宕机后,node2和node3中有一个会成为maser,而node1恢复后会进行再次选举!

留个疑问:

      在测试的过程中发现生产者发送的数据无法入队!

---忙活了半个晚上加一个早上终于找到原因了,是linux的时间跟我windows的系统时间不一致,linux的时间快了,导致我windows消息设置的存活时间很快就过去了,发送的消息全部过期,我说怎么在topics的activemq.advisor.expire.queue.two中不停增长,而queue界面毛都没有!

调整了linux的时间,重启zk+mq,终于给我出来了!

linux校对时间方法:

 yum install ntp

 ntpdate cn.pool.ntp.org

失败转移

现在zk中的主activemq服务器是node1:

我们先启动服务让生产者和消费者正常运行,然后我们停掉node1:

可以看到eclipse控制台消费者打印的消息停滞了一小会儿,停在id=50

接着继续打印了!

我们看下现在的zk节点:主节点变成了node2,而node1已经不存在了

我们现在重启node1,

zk多了一个00000000059节点就是node1,但是主节点仍然是node2!activemq的选主逻辑应该是zk监听顺序临时节点的原理。

成功进行了failover失败转移!

负载均衡配置

很简单:

集群1链接集群2: <networkConnectors> <networkConnector uri="static:(tcp://192.168.1.112:51514,tcp://192.168.1.112:51515,tcp://192.168.1.112:51516)" duplex="false"/> </networkConnectors>就是在集群1中连接集群2的uri连接.

Jeff.Star 认证博客专家 Java [努力支撑经历,经历支撑能力.][思路决定出路,细节决定成败.][聚焦,分享,转化,参与.][数据在流动,技术在流动,我们也要流动.]微信:TiensC
转载请注明原文地址: https://www.6miu.com/read-2627984.html

最新回复(0)