ActiveMQ Message Service

xiaoxiao  2021-03-01

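1. Introduction

Apache ActiveMQ is an open-source message broker that implements the JMS API; see an encyclopedia entry for general background.

2. Download

Download a release from the Apache archive: http://archive.apache.org/dist/activemq/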

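3. Installation

a. Unpack the archive

tar -xvf apache-activemq-5.14.4-bin.tar.gz

b. Move it to /usr/local

mv apache-activemq-5.14.4 /usr/local/activemq

c. Start the broker (run from /usr/local/activemq)

bin/activemq start

d. Once started, the web console is reachable on port 8161

http://10.0.0.12:8161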
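If the page does not load, it can help to check whether the broker's ports accept connections at all. The following is a minimal sketch using only the JDK; the class and method names are made up for illustration, 10.0.0.12 is the host used in this article, and 61616 is the broker port that appears later in the Java connection URL.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Illustrative helper, not part of ActiveMQ.
class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isPortOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, unresolved, or timed out
        }
    }

    public static void main(String[] args) {
        // Host from this article; pass another host as the first argument if needed.
        String host = args.length > 0 ? args[0] : "10.0.0.12";
        System.out.println("web console (8161): " + isPortOpen(host, 8161, 1000));
        System.out.println("broker (61616):     " + isPortOpen(host, 61616, 1000));
    }
}
```

A false result usually points at a firewall rule or a broker that did not start, rather than at the ActiveMQ configuration itself.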

e. Set access accounts and passwords

Add the following inside the broker element of conf/activemq.xml to set the username and password used for sending and receiving messages:

<plugins>
  <simpleAuthenticationPlugin>
    <users>
      <authenticationUser username="manager" password="123456" groups="users,admins"/>
    </users>
  </simpleAuthenticationPlugin>
</plugins>

In conf/jetty.xml, find the bean whose id is securityConstraint and change it to the following:

<bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
  <property name="name" value="BASIC" />
  <property name="roles" value="admin" />
  <property name="authenticate" value="true" />
</bean>

Set the username and password for the web console login in conf/jetty-realm.properties:

# username: password [,role, ...]
admin: 123456, admin

After restarting the broker, opening the address from step d prompts for a username and password.
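The console login configured here is HTTP Basic authentication, so it can also be exercised from code. Below is a rough sketch with the JDK only. The class name is invented for illustration, the /admin/ path is an assumption about where the console lives, and the credentials are the ones from conf/jetty-realm.properties above.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustrative helper, not part of ActiveMQ.
class ConsoleAuth {

    /** Builds the value of the HTTP "Authorization" header for Basic authentication. */
    static String basicAuthHeader(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        try {
            // Assumed console URL: host from this article, /admin/ path is a guess.
            HttpURLConnection conn = (HttpURLConnection)
                    URI.create("http://10.0.0.12:8161/admin/").toURL().openConnection();
            conn.setConnectTimeout(2000);
            conn.setReadTimeout(2000);
            conn.setRequestProperty("Authorization", basicAuthHeader("admin", "123456"));
            // A 401 status would indicate that the credentials were rejected.
            System.out.println("HTTP status: " + conn.getResponseCode());
        } catch (IOException e) {
            System.out.println("console not reachable: " + e.getMessage());
        }
    }
}
```

Basic authentication only base64-encodes the credentials, so over plain HTTP they travel effectively in clear text.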

 

4. Persisting ActiveMQ messages to a MySQL database

In conf/activemq.xml, find the persistenceAdapter element and change it to the following:

<persistenceAdapter>
  <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>

Add the following bean under the beans element of the same file. This is an ordinary Spring XML definition of a database connection pool; the MySQL JDBC driver jar must be on the broker's classpath (for example in its lib directory).

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  <property name="url" value="jdbc:mysql://127.0.0.1/test"/>
  <property name="username" value="root"/>
  <property name="password" value="123456"/>
  <property name="maxActive" value="200"/>
  <property name="poolPreparedStatements" value="true"/>
</bean>
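One way to confirm that the broker is really writing to MySQL is to look at the tables the JDBC adapter creates. The sketch below assumes the adapter's default table names (ACTIVEMQ_MSGS, ACTIVEMQ_ACKS, ACTIVEMQ_LOCK) and reuses the URL and credentials from the bean above. The class name is invented, and the MySQL driver jar has to be on the classpath for the connection to succeed.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Illustrative check, not part of ActiveMQ.
class JdbcStoreCheck {

    // Assumed default table names of the JDBC persistence adapter.
    static final String[] TABLES = {"ACTIVEMQ_MSGS", "ACTIVEMQ_ACKS", "ACTIVEMQ_LOCK"};

    /** Builds a row-count query for one table (names come from the fixed list above). */
    static String countSql(String table) {
        return "SELECT COUNT(*) FROM " + table;
    }

    public static void main(String[] args) {
        String url = "jdbc:mysql://127.0.0.1/test"; // same database as the mysql-ds bean
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement stmt = conn.createStatement()) {
            for (String table : TABLES) {
                try (ResultSet rs = stmt.executeQuery(countSql(table))) {
                    rs.next();
                    System.out.println(table + ": " + rs.getLong(1) + " rows");
                }
            }
        } catch (SQLException e) {
            // Missing driver, wrong credentials, or tables not created yet.
            System.out.println("check failed: " + e.getMessage());
        }
    }
}
```

Only persistent messages are written to the store, so messages sent with DeliveryMode.NON_PERSISTENT should not be expected in ACTIVEMQ_MSGS.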

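5. Connecting from Java code

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;

// The class name is illustrative; the original shows only the field and the two tests.
public class ActiveMqTest {

    // Arguments: user name, password, broker URL
    protected ActiveMQConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory("manager", "123456", "tcp://10.0.0.12:61616");

    @Test
    public void testSend() throws Exception {
        Connection conn = connectionFactory.createConnection();
        try {
            conn.start();
            // Transacted session: a send takes effect only after commit()
            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue("FirstQueue");
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            for (int x = 0; x < 10; x++) {
                TextMessage textMessage = session.createTextMessage("Message sent via ActiveMQ " + x);
                producer.send(textMessage);
                session.commit();
            }
        } finally {
            conn.close(); // close even if sending fails, and let the failure fail the test
        }
    }

    @Test
    public void testReceiver() throws Exception {
        Connection conn = connectionFactory.createConnection();
        try {
            conn.start();
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue("FirstQueue");
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                // Wait up to 100 s for the next message; the long timeout is for testing convenience
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (message == null) {
                    break; // nothing arrived within the timeout
                }
                System.out.println("Received message: " + message.getText());
            }
        } finally {
            conn.close();
        }
    }
}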

a. Add the ActiveMQ settings to application.properties

spring.activemq.user=manager
spring.activemq.password=123456
spring.activemq.brokerUrl=tcp://10.0.0.12:61616
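Each of these settings has to sit on its own line in application.properties. The small sketch below parses the same text with java.util.Properties (plain JDK, not Spring Boot's own loader; the class name is invented) to show what happens when the three entries are run together on one line.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Illustrative helper; Spring Boot reads application.properties itself.
class BrokerSettings {

    /** Parses .properties text into key/value pairs. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return props;
    }

    public static void main(String[] args) {
        String perLine = String.join("\n",
                "spring.activemq.user=manager",
                "spring.activemq.password=123456",
                "spring.activemq.brokerUrl=tcp://10.0.0.12:61616");
        String oneLine = perLine.replace('\n', ' ');

        System.out.println("keys, one entry per line: " + parse(perLine).size());
        // On a single line everything after the first '=' becomes one value.
        System.out.println("keys, all on one line:    " + parse(oneLine).size());
        System.out.println("user value on one line:   "
                + parse(oneLine).getProperty("spring.activemq.user"));
    }
}
```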

  

b. Spring Boot + ActiveMQ: create a listener class

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ConsumerMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // Only text messages are handled; anything else is skipped instead of failing the cast
        if (!(message instanceof TextMessage)) {
            return;
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println(textMessage.getText());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Define a DefaultMessageListenerContainer bean:

@Bean
@Autowired
public DefaultMessageListenerContainer container(ConnectionFactory connectionFactory) {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setDestination(new ActiveMQQueue("default_queue"));
    container.setMessageListener(new ConsumerMessageListener());
    container.setConcurrentConsumers(2);
    // "2-5": at least 2 and at most 5 concurrent consumers
    container.setConcurrency("2-5");
    return container;
}
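The string passed to setConcurrency follows the format described in the Spring Javadoc: either "lower-upper", or a single number that is taken as the upper limit with a lower limit of 1. The sketch below only mirrors that documented format in plain Java to make the two values explicit; it is not Spring's code, and the class name is invented.

```java
// Illustrative model of the "lower-upper" concurrency string.
class ConcurrencyRange {
    final int min;
    final int max;

    ConcurrencyRange(int min, int max) {
        this.min = min;
        this.max = max;
    }

    /** Parses "lower-upper" (e.g. "2-5") or a plain upper limit (e.g. "5", lower limit 1). */
    static ConcurrencyRange parse(String concurrency) {
        int dash = concurrency.indexOf('-');
        if (dash != -1) {
            int lower = Integer.parseInt(concurrency.substring(0, dash).trim());
            int upper = Integer.parseInt(concurrency.substring(dash + 1).trim());
            return new ConcurrencyRange(lower, upper);
        }
        return new ConcurrencyRange(1, Integer.parseInt(concurrency.trim()));
    }

    public static void main(String[] args) {
        for (String value : new String[] {"2-5", "1", "5"}) {
            ConcurrencyRange range = parse(value);
            System.out.println("\"" + value + "\" -> min " + range.min + ", max " + range.max);
        }
    }
}
```

Read this way, setConcurrency("2-5") already covers the lower bound of 2, so the separate setConcurrentConsumers(2) call in the bean is redundant but harmless.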

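Send a message with JmsTemplate:

@Autowired
private JmsTemplate jmsTemplate;

// The method wrapper and its name are illustrative; the original shows only the statements.
// JSON is assumed to be com.alibaba.fastjson.JSON, judging by the toJSONString call.
public void sendUser(Destination destination) {
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("userid", "1");
    map.put("username", "admin");
    // Serialize the map to a JSON string and send it as a text message
    final String mapStr = JSON.toJSONString(map);
    jmsTemplate.send(destination, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(mapStr);
        }
    });
}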

 

c. Publish to a topic

ActiveMQTopic activeMQTopic = new ActiveMQTopic("topic001");
jmsTemplate.send(activeMQTopic, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
        return session.createTextMessage("Hello");
    }
});

d. Subscribe to a topic

@Bean
@Autowired
public DefaultMessageListenerContainer container2(ConnectionFactory connectionFactory) {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setDestination(new ActiveMQTopic("topic001"));
    container.setMessageListener(new ConsumerMessageListener());
    // A single consumer for the topic subscription
    container.setConcurrentConsumers(1);
    container.setConcurrency("1");
    return container;
}
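The difference between the queue used earlier and the topic used here is easiest to see in a toy model. The sketch below is an in-memory simulation in plain Java, not ActiveMQ code: the class and method names are invented, and the round-robin split is a simplification of how a real broker dispatches queue messages.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of queue vs. topic delivery; no broker involved.
class DeliveryModel {

    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    /** Queue: each message goes to exactly one consumer (round-robin in this model). */
    static List<List<String>> deliverQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    /** Topic: every active subscriber receives its own copy of each message. */
    static List<List<String>> deliverTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4");
        System.out.println("queue, 2 consumers:   " + deliverQueue(messages, 2));
        System.out.println("topic, 2 subscribers: " + deliverTopic(messages, 2));
    }
}
```

This is also a plausible reason for keeping the topic container at a concurrency of 1: with several concurrent consumers on a plain topic subscription, each one would receive its own copy of every message.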

 

When reposting, please cite the original address: https://www.6miu.com/read-3099971.html