对 MQTT 客户端 Paho 的简单封装

xiaoxiao  2021-02-28  126

先自定义一个监听器(回调)

package com.xyes.mqtt.listeners;

/**
 * Created by 郭文梁 on 2017/8/31.
 * Callback that receives a topic together with a message, both as strings.
 */
@FunctionalInterface
public interface OnMessageListener {
    /**
     * Handles a single message.
     *
     * @param topic   the topic associated with the message
     * @param message the message content as text
     */
    void handleMessage(String topic, String message);
}

确定客户端的基本功能

package com.xyes.mqtt;

import com.xyes.mqtt.listeners.OnMessageListener;
import org.eclipse.paho.client.mqttv3.MqttException;

import java.io.Closeable;

/**
 * Created by 郭文梁 on 2017/8/31.
 * Client interface.
 */
public interface RemoteClient extends Closeable {
    /**
     * Initialises the client.
     *
     * @param broker   connection address
     * @param user     user name
     * @param password password
     * @throws MqttException on failure
     */
    void init(String broker, String user, String password) throws MqttException;

    /**
     * Publishes a message.
     *
     * @param topic   topic
     * @param message message
     * @param qos     delivery (quality-of-service) level
     * @throws MqttException on failure
     */
    void publish(String topic, String message, int qos) throws MqttException;

    /**
     * Subscribes to messages.
     *
     * @param topic    topic
     * @param listener listener
     * @throws MqttException on failure
     */
    void subscribe(String topic, OnMessageListener listener) throws MqttException;

    /**
     * Cancels a subscription.
     *
     * @param topic topic
     * @throws MqttException on failure
     */
    void unSubscribe(String topic) throws MqttException;
}

对其进行实现

package com.xyes.mqtt; import com.xyes.mqtt.listeners.OnMessageListener; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * Created by 郭文梁 on 2017/8/31. * MQTT客户端的简单实现 */ public class BasicRemoteClient implements RemoteClient, IMqttMessageListener { private String clientId; private MqttClient client; private Map<String, OnMessageListener> subscribeInfo; public BasicRemoteClient() { this(UUID.randomUUID().toString()); } private BasicRemoteClient(String clientId) { this.clientId = clientId; } @Override public void init(String broker, String user, String password) throws MqttException { subscribeInfo = new HashMap<>(); MemoryPersistence persistence = new MemoryPersistence(); client = new MqttClient(broker, clientId, persistence); MqttConnectOptions options = new MqttConnectOptions(); options.setCleanSession(true); options.setUserName(user); options.setPassword(password.toCharArray()); client.connect(options); } @Override public void publish(String topic, String message, int qos) throws MqttException { MqttMessage mqttMessage = new MqttMessage(message.getBytes()); mqttMessage.setQos(qos); client.publish(topic, mqttMessage); } @Override public void subscribe(String topic, OnMessageListener listener) throws MqttException { try { client.subscribe(topic, this); subscribeInfo.put(topic, listener); } catch (MqttException e) { subscribeInfo.remove(topic); throw e; } } @Override public void unSubscribe(String topic) throws MqttException { client.unsubscribe(topic); subscribeInfo.remove(topic); } @Override public void close() throws IOException { try { client.disconnect(); } catch (MqttException e) { throw new IOException(e); } } @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { //获取监听器 OnMessageListener listener = subscribeInfo.get(topic); if (null != listener) { 
listener.handleMessage(topic, mqttMessage.toString()); } } }

测试(百度物接入服务)

import com.xyes.mqtt.BasicRemoteClient; import com.xyes.mqtt.RemoteClient; import org.eclipse.paho.client.mqttv3.MqttException; import org.junit.Test; import java.io.IOException; public class Main { String broker = "tcp://xyes.mqtt.iot.gz.baidubce.com:1883"; String user = "**********"; String password = "**********"; String topic = "width_data"; @Test public void testConnect() throws MqttException, IOException, InterruptedException { RemoteClient client = new BasicRemoteClient(); client.init(broker, user, password); client.subscribe(topic, (t, m) -> { System.out.println(t + "|" + m); }); client.publish(topic, "message from me", 0); Thread.sleep(10000); client.unSubscribe(topic); client.close(); } }
转载请注明原文地址: https://www.6miu.com/read-42757.html

最新回复(0)