先自定义一个监听器(回调)
package com.xyes.mqtt.listeners;
/**
* Created by Guo Wenliang (郭文梁) on 2017/8/31.
* Callback used to hand a received message to application code.
* Declared as a functional interface, so it can be supplied as a lambda.
*/
@FunctionalInterface
public interface OnMessageListener {
/**
* Handles one received message.
*
* @param topic topic associated with the message
* @param message message content as text
*/
void handleMessage(String topic, String message);
}
确定客户端的基本功能
package com.xyes.mqtt;
import com.xyes.mqtt.listeners.OnMessageListener;
import org.eclipse.paho.client.mqttv3.MqttException;
import java.io.Closeable;
/**
* Created by Guo Wenliang (郭文梁) on 2017/8/31.
* Client interface: initialise, publish, subscribe and unsubscribe.
* Extends {@link Closeable}, so implementations can be used in try-with-resources.
*/
public interface RemoteClient extends Closeable {
/**
* Initialises the client for the given broker and credentials.
*
* @param broker connection address of the broker
* @param user user name
* @param password password
* @throws MqttException if initialisation fails
*/
void init(String broker, String user, String password) throws MqttException;
/**
* Publishes a message.
*
* @param topic topic to publish to
* @param message message content
* @param qos delivery (quality-of-service) level
* @throws MqttException if publishing fails
*/
void publish(String topic, String message, int qos) throws MqttException;
/**
* Subscribes to a topic.
*
* @param topic topic to subscribe to
* @param listener listener that receives the messages
* @throws MqttException if subscribing fails
*/
void subscribe(String topic, OnMessageListener listener) throws MqttException;
/**
* Cancels a subscription.
*
* @param topic topic to unsubscribe from
* @throws MqttException if unsubscribing fails
*/
void unSubscribe(String topic) throws MqttException;
}
对其进行实现
package com.xyes.mqtt;
import com.xyes.mqtt.listeners.OnMessageListener;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Created by Guo Wenliang (郭文梁) on 2017/8/31.
 * Simple {@link RemoteClient} implementation on top of the Eclipse Paho {@link MqttClient}.
 *
 * <p>Message text is always encoded and decoded as UTF-8, independent of the
 * platform default charset. {@link #init} must be called before any other
 * operation; {@link #close} may be called at any time and more than once.
 */
public class BasicRemoteClient implements RemoteClient, IMqttMessageListener {
    private final String clientId;
    /**
     * Topic filter to listener. A concurrent map is used because Paho delivers
     * messages on its own callback thread while subscribe/unSubscribe run on
     * the caller's thread.
     */
    private final Map<String, OnMessageListener> subscribeInfo = new ConcurrentHashMap<>();
    /** Underlying Paho client; {@code null} before {@link #init} and after {@link #close}. */
    private MqttClient client;

    /** Creates a client that identifies itself with a random UUID. */
    public BasicRemoteClient() {
        this(UUID.randomUUID().toString());
    }

    private BasicRemoteClient(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Creates the Paho client (in-memory persistence, clean session) and connects.
     * Any listeners remembered from an earlier initialisation are forgotten.
     *
     * @param broker   connection address of the broker
     * @param user     user name, or {@code null} to connect without one
     * @param password password, or {@code null} to connect without one
     * @throws MqttException if the client cannot be created or the connection fails
     */
    @Override
    public void init(String broker, String user, String password) throws MqttException {
        subscribeInfo.clear();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        // Credentials are optional; only pass on what was actually supplied.
        if (user != null) {
            options.setUserName(user);
        }
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        client = new MqttClient(broker, clientId, new MemoryPersistence());
        client.connect(options);
    }

    /**
     * Publishes {@code message} as a UTF-8 payload.
     *
     * @param topic   topic to publish to
     * @param message message text
     * @param qos     quality-of-service level set on the outgoing message
     * @throws MqttException         if publishing fails
     * @throws IllegalStateException if the client has not been initialised or is closed
     */
    @Override
    public void publish(String topic, String message, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage(message.getBytes(StandardCharsets.UTF_8));
        mqttMessage.setQos(qos);
        requireClient().publish(topic, mqttMessage);
    }

    /**
     * Subscribes to {@code topic} and routes matching messages to {@code listener}.
     *
     * <p>A dedicated callback is registered for this filter, so the listener is
     * reached even when the filter contains wildcards and the concrete topic of
     * an arriving message therefore differs from the filter text.
     *
     * @param topic    topic filter to subscribe to; must not be {@code null}
     * @param listener receiver of the decoded messages; must not be {@code null}
     * @throws MqttException         if subscribing fails; no listener stays registered for the filter
     * @throws NullPointerException  if {@code topic} or {@code listener} is {@code null}
     * @throws IllegalStateException if the client has not been initialised or is closed
     */
    @Override
    public void subscribe(String topic, OnMessageListener listener) throws MqttException {
        Objects.requireNonNull(topic, "topic");
        Objects.requireNonNull(listener, "listener");
        MqttClient current = requireClient();
        try {
            current.subscribe(topic, (actualTopic, mqttMessage) ->
                    listener.handleMessage(actualTopic, decode(mqttMessage)));
            subscribeInfo.put(topic, listener);
        } catch (MqttException e) {
            subscribeInfo.remove(topic);
            throw e;
        }
    }

    /**
     * Cancels the subscription for {@code topic} and forgets its listener.
     *
     * @param topic topic filter to unsubscribe from; must not be {@code null}
     * @throws MqttException         if unsubscribing fails
     * @throws NullPointerException  if {@code topic} is {@code null}
     * @throws IllegalStateException if the client has not been initialised or is closed
     */
    @Override
    public void unSubscribe(String topic) throws MqttException {
        Objects.requireNonNull(topic, "topic");
        requireClient().unsubscribe(topic);
        subscribeInfo.remove(topic);
    }

    /**
     * Disconnects if still connected and releases the underlying Paho client.
     * Does nothing when the client was never initialised or is already closed.
     *
     * @throws IOException wrapping the {@link MqttException} if shutting down fails;
     *                     the client is kept so that the call can be retried
     */
    @Override
    public void close() throws IOException {
        MqttClient current = client;
        if (current == null) {
            return;
        }
        try {
            if (current.isConnected()) {
                current.disconnect();
            }
            current.close();
        } catch (MqttException e) {
            throw new IOException(e);
        }
        client = null;
        subscribeInfo.clear();
    }

    /**
     * Dispatches a message to the listener registered under exactly {@code topic};
     * messages for topics without a listener are ignored.
     *
     * <p>Kept because this class publicly implements {@link IMqttMessageListener}.
     * Subscriptions made through {@link #subscribe} are served by their own
     * per-filter callbacks and do not pass through this method.
     *
     * @param topic       topic the message arrived on
     * @param mqttMessage received message; its payload is decoded as UTF-8
     * @throws Exception whatever the listener throws
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // ConcurrentHashMap rejects null keys, so guard the lookup.
        OnMessageListener listener = (topic == null) ? null : subscribeInfo.get(topic);
        if (null != listener) {
            listener.handleMessage(topic, decode(mqttMessage));
        }
    }

    /** Returns the live client or fails with a descriptive error instead of a bare NPE. */
    private MqttClient requireClient() {
        MqttClient current = client;
        if (current == null) {
            throw new IllegalStateException("client is not initialised (call init first) or already closed");
        }
        return current;
    }

    /** Decodes a message payload as UTF-8 text. */
    private static String decode(MqttMessage mqttMessage) {
        return new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
    }
}
测试(百度物接入服务)
import com.xyes.mqtt.BasicRemoteClient
;
import com.xyes.mqtt.RemoteClient
;
import org.eclipse.paho.client.mqttv3.MqttException
;
import org.junit.
Test;
import java.io.IOException
;
/**
 * Manual integration test: connects to the broker named in {@code broker},
 * subscribes to {@code topic}, publishes one message to the same topic and
 * prints whatever arrives during the following ten seconds.
 *
 * <p>{@code user} and {@code password} are masked placeholders and must be
 * replaced with real credentials before the test can succeed.
 */
public class Main {
    String broker = "tcp://xyes.mqtt.iot.gz.baidubce.com:1883";
    String user = "**********";
    String password = "**********";
    String topic = "width_data";

    /**
     * Runs the connect / subscribe / publish / unsubscribe round trip.
     *
     * @throws MqttException        if any MQTT operation fails
     * @throws IOException          if closing the client fails
     * @throws InterruptedException if the wait for incoming messages is interrupted
     */
    @Test
    public void testConnect() throws MqttException, IOException, InterruptedException {
        // try-with-resources: the client is closed even when a step below throws.
        try (RemoteClient client = new BasicRemoteClient()) {
            client.init(broker, user, password);
            client.subscribe(topic, (t, m) -> {
                System.out.println(t + "|" + m);
            });
            client.publish(topic, "message from me", 0);
            // Give the broker time to deliver the message back to the subscription.
            Thread.sleep(10000);
            client.unSubscribe(topic);
        }
    }
}