Apache ActiveMQ构建MQTT单消息服务器

xiaoxiao2021-02-28  71

构建消息服务器:

   服务器VMware ubuntu:

           Apache ActiveMQ

   win7客户端    :

      org.fusesource.mqtt-client  

      

   ubuntu 客户端

      paho.mqtt.c

      ./paho_c_sub topic --host (192.168.239.240) 服务器端的IP  --port  1883 --username admin --password password

 

 

win7 pub

 

package org.fusesource.mqtt; /** * Created by Administrator on 2017/6/6. */ import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; public class MQTTClient { public static void main(String[] args) throws Exception { // while (true) { //Thread.sleep(5000); sendMessage("hello"); //} } public static void sendMessage(String message) { String user = env("APOLLO_USER", "admin"); String password = env("APOLLO_PASSWORD", "admin"); String host = env("APOLLO_HOST", "localhost");//apollo服务器地址 host = "192.168.239.240"; int port = 1883;//apollo端口号 // String destination = "/topic/1/OOOOOOOOOO/aaaa";//topic String destination = "topic";//topic Buffer msg = new UTF8Buffer(message); MQTT mqtt = new MQTT();//新建MQTT try { mqtt.setHost(host, port); mqtt.setUserName(user); mqtt.setPassword(password); FutureConnection connection = mqtt.futureConnection(); connection.connect().await(); UTF8Buffer topic = new UTF8Buffer(destination); while (true) { Thread.sleep(4000); connection.publish(topic, msg, QoS.AT_LEAST_ONCE, false); } // connection.disconnect().await(); } catch (Exception e) { e.printStackTrace(); } finally { // System.exit(0); } } private static String env(String key, String defaultValue) { String rc = System.getenv(key); if (rc == null) { return defaultValue; } return rc; } }

 

win7 SUB

 

package org.fusesource.mqtt; /** * Created by Administrator on 2017/6/6. */ /** * Licensed to the Apache Software Foundation (ASF) under one or more */ import org.fusesource.hawtbuf.Buffer; import org.fusesource.hawtbuf.UTF8Buffer; import org.fusesource.mqtt.client.Callback; import org.fusesource.mqtt.client.CallbackConnection; import org.fusesource.mqtt.client.Listener; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fusesource.mqtt.client.Topic; /** * Uses an callback based interface to MQTT. Callback based interfaces are * harder to use but are slightly more efficient. */ class MQTTListener { public static void main(String[] args) throws Exception { String user = env("APOLLO_USER", "admin"); String password = env("APOLLO_PASSWORD", "admin"); //String host = env("APOLLO_HOST", "localhost"); String host = env("APOLLO_HOST", "localhost"); System.out.println("---------------:"+host.toString()); host = "192.168.239.240"; int port = Integer.parseInt(env("APOLLO_PORT", "1883")); //int port = Integer.parseInt(env("APOLLO_PORT", "61613")); // final String destination = arg(args, 0, "/topic/1/OOOOOOOOOO/aaaa"); final String destination = arg(args, 0, "topic"); MQTT mqtt = new MQTT(); mqtt.setHost(host, port); mqtt.setUserName(user); mqtt.setPassword(password); mqtt.setKeepAlive((short) 30); // mqtt.setCleanSession(false); // mqtt.setClientId("aaaa"); final CallbackConnection connection = mqtt.callbackConnection(); connection.listener(new Listener() { public void onConnected() { } public void onDisconnected() { } public void onFailure(Throwable value) { value.printStackTrace(); System.exit(-2); } public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) { String body = msg.utf8().toString(); System.out.println(body); ack.run(); } }); connection.connect(new Callback<Void>() { @Override public void onSuccess(Void value) { Topic[] topics = { new Topic(destination, QoS.AT_LEAST_ONCE) }; connection.subscribe(topics, new Callback<byte[]>() { public void onSuccess(byte[] qoses) { System.out.println("connected..."); } public void onFailure(Throwable value) { value.printStackTrace(); System.exit(-2); } }); } @Override public void onFailure(Throwable value) { value.printStackTrace(); System.exit(-2); } }); // Wait forever.. synchronized (MQTTListener.class) { while (true) { MQTTListener.class.wait(); } } } private static String env(String key, String defaultValue) { String rc = System.getenv(key); if (rc == null) { return defaultValue; } return rc; } private static String arg(String[] args, int index, String defaultValue) { if (index < args.length) { return args[index]; } else { return defaultValue; } } }

 

转载请注明原文地址: https://www.6miu.com/read-48172.html

最新回复(0)