ActiveMQ 入门helloworld

xiaoxiao2021-02-28  85

1.下载安装ActiveMQ

官网下载地址:http://activemq.apache.org/download.html

ActiveMQ 提供了Windows 和Linux、Unix 等几个版本,我选择了Windows 版本下进行开发。

下载完压缩包后,直接解压:

 

目录:

         bin存放的是脚本文件

         conf存放的是基本配置文件

        data存放的是日志文件

        docs存放的是说明文档

        examples存放的是简单的实例

        lib存放的是activemq所需jar包

        webapps用于存放项目的目录

 

2.启动ActiveMQ

可以在命令行中用命令启动,我这里就直接鼠标点击运行脚本启动了

ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。

admin:http://127.0.0.1:8161/admin/

我们在浏览器打开链接之后输入账号密码(默认账号密码都是admin),类似于访问tomcat。

 

登陆成功后页面显示:

ActiveMQ启动就完成了,关闭直接点击关闭脚本或用命令管就可以了。

 

3.不多bb,直接上helloworld代码

创建一个maven项目,最好是用jdk1.8,我用jdk1.7会报错,pom.xml依赖如下:

    <dependencies>       <dependency>           <groupId>org.apache.activemq</groupId>           <artifactId>activemq-all</artifactId>           <version>5.15.0</version>       </dependency>     </dependencies>

3.1创建生产者类:

package com.iflytek; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /**  *  * @author cjf  * 生产者  */ public class Producter {     //ActiveMq 的默认用户名     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;     //ActiveMq 的默认登录密码     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;     //ActiveMQ 的链接地址     private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;     AtomicInteger count = new AtomicInteger(0);     //链接工厂     ConnectionFactory connectionFactory;     //链接对象     Connection connection;     //事务管理     Session session;     ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();     public void init(){         try {             //创建一个链接工厂             connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);             //从工厂中创建一个链接             connection  = connectionFactory.createConnection();             //开启链接             connection.start();             //创建一个事务(这里通过参数可以设置事务的级别)             session = connection.createSession(true,Session.SESSION_TRANSACTED);         } catch (JMSException e) {             e.printStackTrace();         }     }     public void sendMessage(String disname){         try {             //创建一个消息队列             Queue queue = session.createQueue(disname);             //消息生产者             MessageProducer messageProducer = null;             if(threadLocal.get()!=null){                 messageProducer = threadLocal.get();             }else{                 messageProducer = session.createProducer(queue);                 threadLocal.set(messageProducer);             }            while(true){                 Thread.sleep(1000);                 int num = count.getAndIncrement();                 //创建一条消息                 TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+                         "productor:我是大帅哥,我现在正在生产东西!,count:"+num);                 System.out.println(Thread.currentThread().getName()+                         "productor:我是大帅哥,我现在正在生产东西!,count:"+num);                 //发送消息                 messageProducer.send(msg);                 //提交事务                 session.commit();             }         } catch (JMSException e) {             e.printStackTrace();         } catch (InterruptedException e) {             e.printStackTrace();         }     } }

 

3.2,创建消费者类:

package com.iflytek; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /**  *  * @author cjf  * 消费者  */ public class Comsumer {     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;     private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;     ConnectionFactory connectionFactory;     Connection connection;     Session session;     ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();     AtomicInteger count = new AtomicInteger();     public void init(){         try {             connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);             connection  = connectionFactory.createConnection();             connection.start();             session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);         } catch (JMSException e) {             e.printStackTrace();         }     }     public void getMessage(String disname){         try {             Queue queue = session.createQueue(disname);             MessageConsumer consumer = null;             if(threadLocal.get()!=null){                 consumer = threadLocal.get();             }else{                 consumer = session.createConsumer(queue);                 threadLocal.set(consumer);             }             while(true){                 Thread.sleep(1000);                 TextMessage msg = (TextMessage) consumer.receive();                 if(msg!=null) {                     msg.acknowledge();                     System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());                 }else {                     break;                 }             }         } catch (JMSException e) {             e.printStackTrace();         } catch (InterruptedException e) {             e.printStackTrace();         }     } }

 

4.运行ActiveMQ项目,进行测试

4.1,生产者生产消息:

package com.iflytek; /**  *  * @author cjf  * 测试生产者生产消息  */ public class TestProducter {     public static void main(String[] args){         Producter producter = new Producter();         producter.init();         TestProducter testMq = new TestProducter();         try {             Thread.sleep(1000);         } catch (InterruptedException e) {             e.printStackTrace();         }         //Thread 1         new Thread(testMq.new ProductorMq(producter)).start();         //Thread 2         new Thread(testMq.new ProductorMq(producter)).start();         //Thread 3         new Thread(testMq.new ProductorMq(producter)).start();         //Thread 4         new Thread(testMq.new ProductorMq(producter)).start();         //Thread 5         new Thread(testMq.new ProductorMq(producter)).start();     }     private class ProductorMq implements Runnable{         Producter producter;         public ProductorMq(Producter producter){             this.producter = producter;         }         @Override         public void run() {             while(true){                 try {                     producter.sendMessage("Jaycekon-MQ");                     Thread.sleep(10000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         }     } }

运行结果:

4.2,测试消费者消费消息:

package com.iflytek; /**  *  * @author cjf  * activeMQ测试消费者消费消息  */ public class TestConsumer {     public static void main(String[] args){         Comsumer comsumer = new Comsumer();         comsumer.init();         TestConsumer testConsumer = new TestConsumer();         new Thread(testConsumer.new ConsumerMq(comsumer)).start();         new Thread(testConsumer.new ConsumerMq(comsumer)).start();         new Thread(testConsumer.new ConsumerMq(comsumer)).start();         new Thread(testConsumer.new ConsumerMq(comsumer)).start();     }     private class ConsumerMq implements Runnable{         Comsumer comsumer;         public ConsumerMq(Comsumer comsumer){             this.comsumer = comsumer;         }         @Override         public void run() {             while(true){                 try {                     comsumer.getMessage("Jaycekon-MQ");                     Thread.sleep(10000);                 } catch (InterruptedException e) {                     e.printStackTrace();                 }             }         }     } }

运行结果:

 

 

5.ActiveMQ的helloworld就写完了,更多MQ其他的知识自己再好好学,我也是个菜鸡刚看了2天这东西,求关注啊,可以交流一下。

 

转载请注明原文地址: https://www.6miu.com/read-47845.html

最新回复(0)