如何写一个MQTT连接的github android客户端端

android4.0以上实现Mqtt客户端 - CSDN博客
android4.0以上实现Mqtt客户端
由于wmqtt.jar库在android4.0以上实现有问题会报MqttException Null异常,原因是该库只支持4.0以下版本。无奈只有寻找其他解决方案,最后选择的是Paho库中的client版本,org.eclipse.paho.client.mqttv3.jar。利用该库可以在android4.0以上正常连接Mqtt的服务器,博主用的android5.1进行实验的。
博主利用mqtt实现的主要功能是android端要与特定的智能硬件进行通信,而且是双向通信,android要给智能硬件发控制信息,智能硬件要给android返回状态信息;关于订阅与发布的id问题是其中的关键,android端要获取智能硬件的id,作为android发布信息的主题,在绑定完智能硬件之后,智能硬件发布以智能硬件的id+后缀作为智能硬件的发布信息id,而android端订阅智能硬件的id+后缀的主题。(启动的时候默认订阅服务器主题,服务器发送目前已经绑定了的智能硬件的主题,拉取主题列表。主题列表中有智能硬件在线、离线状态,更新客户端状态。外部设置进行其他智能硬件的主题订阅,在接口中添加主题订阅操作,在接口中添加自定义主题发布操作。)
android端为了每个活动都可以更方便得对确定主题的发布,与订阅主题的接收。在android建立一个基础活动类MqttBaseActivity,而其他需要进行mqtt操作的活动都继承该活动,减少活动中非主要业务代码冗余,提高代码可维护性。
要实现上述设想,需要进行3方面的工作:
1.MqttService类实现;
2.MqttBaseActivity类实现;
3.主活动调用示例;
MqttService提供给外界的接口主要包括内容订阅,自定义主题消息发送,全局广播消息接收。
package com.splxtech.powermanagor.
import java.util.L
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttConnectO
import org.eclipse.paho.client.mqttv3.MqttDefaultFileP
import org.eclipse.paho.client.mqttv3.MqttDeliveryT
import org.eclipse.paho.client.mqttv3.MqttE
import org.eclipse.paho.client.mqttv3.MqttM
import org.eclipse.paho.client.mqttv3.MqttPersistenceE
import org.eclipse.paho.client.mqttv3.MqttT
import org.eclipse.paho.client.mqttv3.internal.MemoryP
import android.app.AlarmM
import android.app.PendingI
import android.app.S
import android.content.BroadcastR
import android.content.C
import android.content.I
import android.content.IntentF
import android.net.ConnectivityM
import android.net.NetworkI
import android.os.H
import android.os.HandlerT
import android.os.IB
import android.provider.Settings.S
import android.support.v4.content.LocalBroadcastM
import android.util.L
import com.splxtech.powermanagor.IMqttS
public class MqttService extends Service implements MqttCallback
public static final String DEBUG_TAG = "MqttService";
private static final String MQTT_THREAD_NAME = "MqttService[" + DEBUG_TAG + "]";
private static final String MQTT_BROKER = "";
private static final int MQTT_PORT = 1883;
public static final int MQTT_QOS_0 = 0;
public static final int MQTT_QOS_1 = 1;
public static final int MQTT_QOS_2 = 2;
private static final int MQTT_KEEP_ALIVE = 240000;
private static final String MQTT_KEEP_ALIVE_TOPIC_FORAMT = "/users/%s/keepalive";
private static final byte[]
MQTT_KEEP_ALIVE_MESSAGE = { 0 };
private static final int MQTT_KEEP_ALIVE_QOS = MQTT_QOS_0;
private static final boolean MQTT_CLEAN_SESSION = true;
private static final String MQTT_URL_FORMAT = "tcp://%s:%d";
private static final String ACTION_START
= DEBUG_TAG + ".START";
private static final String ACTION_STOP
= DEBUG_TAG + ".STOP";
private static final String ACTION_KEEPALIVE= DEBUG_TAG + ".KEEPALIVE";
private static final String ACTION_RECONNECT= DEBUG_TAG + ".RECONNECT";
private static final String DEVICE_ID_FORMAT = "andr_%s";
private boolean mStarted = false;
private String mDeviceId;
private Handler mConnH
private MqttDefaultFilePersistence mDataS
private MemoryPersistence mMemS
private MqttConnectOptions mO
private MqttTopic mKeepAliveT
private MqttClient mC
private AlarmManager mAlarmM
private ConnectivityManager mConnectivityM
private LocalBroadcastManager localBroadcastM
public static final String MQTT_RECE_MESSAGE_ACTION = "com.splxtech.powermanagor.engine.mqttservice.recemessage";
* Start MQTT Client
public static void actionStart(Context ctx) {
Intent i = new Intent(ctx,MqttService.class);
i.setAction(ACTION_START);
ctx.startService(i);
* Stop MQTT Client
public static void actionStop(Context ctx) {
Intent i = new Intent(ctx,MqttService.class);
i.setAction(ACTION_STOP);
ctx.startService(i);
* Send a KeepAlive Message
public static void actionKeepalive(Context ctx) {
Intent i = new Intent(ctx,MqttService.class);
i.setAction(ACTION_KEEPALIVE);
ctx.startService(i);
* Initalizes the DeviceId and most instance variables
* Including the Connection Handler, Datastore, Alarm Manager
* and ConnectivityManager.
public void onCreate() {
super.onCreate();
mDeviceId = String.format(DEVICE_ID_FORMAT,
Secure.getString(getContentResolver(), Secure.ANDROID_ID));
HandlerThread thread = new HandlerThread(MQTT_THREAD_NAME);
thread.start();
mConnHandler = new Handler(thread.getLooper());
mDataStore = new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath());
} catch(MqttPersistenceException e) {
e.printStackTrace();
mDataStore = null;
mMemStore = new MemoryPersistence();
mOpts = new MqttConnectOptions();
mOpts.setCleanSession(MQTT_CLEAN_SESSION);
mAlarmManager = (AlarmManager) getSystemService(ALARM_SERVICE);
mConnectivityManager = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
localBroadcastManager = LocalBroadcastManager.getInstance(this);
* Service onStartCommand
* Handles the action passed via the Intent
* START_REDELIVER_INTENT
public int onStartCommand(Intent intent, int flags, int startId) {
super.onStartCommand(intent, flags, startId);
String action = intent.getAction();
Log.i(DEBUG_TAG,"Received action of " + action);
if(action == null) {
Log.i(DEBUG_TAG,"Starting service with no action\n Probably from a crash");
if(action.equals(ACTION_START)) {
Log.i(DEBUG_TAG,"Received ACTION_START");
} else if(action.equals(ACTION_STOP)) {
} else if(action.equals(ACTION_KEEPALIVE)) {
keepAlive();
} else if(action.equals(ACTION_RECONNECT)) {
if(isNetworkAvailable()) {
reconnectIfNecessary();
return START_REDELIVER_INTENT;
* Attempts connect to the Mqtt Broker
* and listen for Connectivity changes
* via ConnectivityManager.CONNECTVITIY_ACTION BroadcastReceiver
private synchronized void start() {
if(mStarted) {
Log.i(DEBUG_TAG,"Attempt to start while already started");
if(hasScheduledKeepAlives()) {
stopKeepAlives();
connect();
registerReceiver(mConnectivityReceiver,new IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION));
* Attempts to stop the Mqtt client
* as well as halting all keep alive messages queued
* in the alarm manager
private synchronized void stop() {
if(!mStarted) {
Log.i(DEBUG_TAG,"Attemtpign to stop connection that isn't running");
if(mClient != null) {
mConnHandler.post(new Runnable() {
public void run() {
mClient.disconnect();
} catch(MqttException ex) {
ex.printStackTrace();
mClient = null;
mStarted = false;
stopKeepAlives();
unregisterReceiver(mConnectivityReceiver);
* Connects to the broker with the appropriate datastore
private synchronized void connect() {
String url = String.format(Locale.US, MQTT_URL_FORMAT, MQTT_BROKER, MQTT_PORT);
Log.i(DEBUG_TAG,"Connecting with URL: " + url);
if(mDataStore != null) {
Log.i(DEBUG_TAG,"Connecting with DataStore");
mClient = new MqttClient(url,mDeviceId,mDataStore);
Log.i(DEBUG_TAG,"Connecting with MemStore");
mClient = new MqttClient(url,mDeviceId,mMemStore);
} catch(MqttException e) {
e.printStackTrace();
mConnHandler.post(new Runnable() {
public void run() {
mClient.connect(mOpts);
mClient.subscribe("hello", 0);
mClient.setCallback(MqttService.this);
mStarted = true;
Log.i(DEBUG_TAG,"Successfully connected and subscribed starting keep alives");
startKeepAlives();
} catch(MqttException e) {
e.printStackTrace();
* Schedules keep alives via a PendingIntent
* in the Alarm Manager
private void startKeepAlives() {
Intent i = new Intent();
i.setClass(this, MqttService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
mAlarmManager.setRepeating(AlarmManager.RTC_WAKEUP,
System.currentTimeMillis() + MQTT_KEEP_ALIVE,
MQTT_KEEP_ALIVE, pi);
* Cancels the Pending Intent
* in the alarm manager
private void stopKeepAlives() {
Intent i = new Intent();
i.setClass(this, MqttService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
mAlarmManager.cancel(pi);
* Publishes a KeepALive to the topic
* in the broker
private synchronized void keepAlive() {
if(isConnected()) {
sendKeepAlive();
} catch(MqttConnectivityException ex) {
ex.printStackTrace();
reconnectIfNecessary();
} catch(MqttPersistenceException ex) {
ex.printStackTrace();
} catch(MqttException ex) {
ex.printStackTrace();
* Checkes the current connectivity
* and reconnects if it is required.
private synchronized void reconnectIfNecessary() {
if(mStarted && mClient == null) {
connect();
* Query's the NetworkInfo via ConnectivityManager
* to return the current connected state
* boolean true if we are connected false otherwise
private boolean isNetworkAvailable() {
NetworkInfo info = mConnectivityManager.getActiveNetworkInfo();
return (info == null) ? false : info.isConnected();
* Verifies the client State with our local connected state
* true if its a match we are connected false if we aren't connected
private boolean isConnected() {
if(mStarted && mClient != null && !mClient.isConnected()) {
Log.i(DEBUG_TAG,"Mismatch between what we think is connected and what is connected");
if(mClient != null) {
return (mStarted && mClient.isConnected()) ? true : false;
return false;
* Receiver that listens for connectivity chanes
* via ConnectivityManager
private final BroadcastReceiver mConnectivityReceiver = new BroadcastReceiver() {
public void onReceive(Context context, Intent intent) {
Log.i(DEBUG_TAG,"Connectivity Changed...");
* Sends a Keep Alive message to the specified topic
* MqttDeliveryToken specified token you can choose to wait for completion
private synchronized MqttDeliveryToken sendKeepAlive()
throws MqttConnectivityException, MqttPersistenceException, MqttException {
if(!isConnected())
throw new MqttConnectivityException();
if(mKeepAliveTopic == null) {
mKeepAliveTopic = mClient.getTopic(
String.format(Locale.US, MQTT_KEEP_ALIVE_TOPIC_FORAMT,mDeviceId));
Log.i(DEBUG_TAG,"Sending Keepalive to " + MQTT_BROKER);
MqttMessage message = new MqttMessage(MQTT_KEEP_ALIVE_MESSAGE);
message.setQos(MQTT_KEEP_ALIVE_QOS);
return mKeepAliveTopic.publish(message);
* Query's the AlarmManager to check if there is
* a keep alive currently scheduled
* true if there is currently one scheduled false otherwise
private synchronized boolean hasScheduledKeepAlives() {
Intent i = new Intent();
i.setClass(this, MqttService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getBroadcast(this, 0, i, PendingIntent.FLAG_NO_CREATE);
return (pi != null) ? true : false;
public IBinder onBind(Intent arg0) {
return iMqttS
* Connectivity Lost from broker
public void connectionLost(Throwable arg0) {
stopKeepAlives();
mClient = null;
if(isNetworkAvailable()) {
reconnectIfNecessary();
* Publish Message Completion
public void deliveryComplete(MqttDeliveryToken arg0) {
* Received Message from broker
public void messageArrived(MqttTopic topic, MqttMessage message)
throws Exception {
Intent intent = new Intent(MQTT_RECE_MESSAGE_ACTION);
intent.putExtra("Topic",topic.getName());
intent.putExtra("Message",message.getPayload());
localBroadcastManager.sendBroadcast(intent);
Log.i(DEBUG_TAG,"
Topic:\t" + topic.getName() +
Message:\t" + new String(message.getPayload()) +
QoS:\t" + message.getQos());
* MqttConnectivityException Exception class
private class MqttConnectivityException extends Exception {
private static final long serialVersionUID = -9469420L;
private IMqttService.Stub iMqttService = new IMqttService.Stub(){
public boolean mqttSubscribe(String topic,int mqttQOS)
if(isConnected()) {
mClient.subscribe(topic, mqttQOS);
return true;
} catch (MqttException e) {
e.printStackTrace();
return false;
return false;
public boolean mqttPubMessage(String topic,String Message,int mqttQOS)
if(isConnected())
MqttTopic mqttTopic = mClient.getTopic(topic);
MqttMessage message = new MqttMessage(Message.getBytes());
message.setQos(mqttQOS);
mqttTopic.publish(message);
return true;
catch (MqttException e)
e.printStackTrace();
return false;
return false;
2.MqttBaseActivity.class,里面包含iMqtt_Service接口绑定,广播接收器消息注册,调用基本思路是如果继承子类中有receiver的实现,则会调用接口绑定与广播消息接收器注册,如果没有receiver实现则认为在该activity中不需要实现mqtt的订阅、发布、接收任务。下面是具体实现代码:
package com.splxtech.powermanagor.B
import android.content.BroadcastR
import ponentN
import android.content.C
import android.content.I
import android.content.IntentF
import android.content.ServiceC
import android.os.IB
import android.support.v4.content.LocalBroadcastM
import com.splxtech.powermanagor.IMqttS
import com.splxtech.powermanagor.engine.MqttS
import com.splxtech.splxapplib.activity.BaseA
* Created by li300 on
public abstract class MqttBaseActivity extends BaseActivity {
public MessageMqttReciver mR
private IntentFilter mIntentF
private Intent mServiceI
private LocalBroadcastManager localBroadcastM
public IMqttService iMqttS
private boolean
private ServiceConnection conn = new ServiceConnection() {
public void onServiceConnected(ComponentName componentName, IBinder iBinder) {
iMqttService = IMqttService.Stub.asInterface(iBinder);
public void onServiceDisconnected(ComponentName componentName) {
iMqttService = null;
public void onStart()
flag = false;
if(mReciver!=null)
flag = true;
initMqtt();
localBroadcastManager.registerReceiver(mReciver,mIntentFilter);
bindService(mServiceIntent,conn,BIND_ABOVE_CLIENT);
super.onStart();
public void onDestroy()
if(flag==true)
unbindService(conn);
localBroadcastManager.unregisterReceiver(mReciver);
super.onDestroy();
public void initMqtt()
localBroadcastManager = LocalBroadcastManager.getInstance(this);
mServiceIntent = new Intent(this, MqttService.class);
mIntentFilter = new IntentFilter();
mIntentFilter.addAction(MqttService.MQTT_RECE_MESSAGE_ACTION);
public abstract class MessageMqttReciver extends BroadcastReceiver
public abstract void onReceive(Context context, Intent intent);
本文已收录于以下专栏:
相关文章推荐
首先创建一个MqttClient对象用于连接到远程的MQTT服务器,第一个参数为地址,第二个参数为客户端名称,第三个参数为clientid保存形式。
然后可以...
继上篇的MQTT学习,此篇主要是实现一个安卓客户端,利用activemq服务器,实现安卓客户端之间的推送首先需要说明客户端的结构客户端的组成如下:
登录界面,如下图所示
注册界面,如图
推送界面,如图...
这篇文章主要讲解如何使用ESP8266(基于NONS_SDK_v2.0)作为MQTT客户端,并连上在本地电脑搭建的MQTT服务器。
他的最新文章
讲师:吴岸城
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)博客分类:
本文介绍在中实现推送方式的基础知识及相关解决方案。
1.推送方式基础知识:
当我们开发需要和服务器交互的应用程序时,基本上都需要获取服务器端的数据,比如《地震应急通》就需要及时获取服务器上最新的地震信息。要获取服务器 上不定时更新的信息一般来说有两种方法,第一种是客户端使用Pull(拉)的方式,隔一段时间就去服务器上获取信息,看是否有更新的信息出现。第二种就是 服务器使用Push(推送)的方式,当服务器端有新信息了,则把最新的信息Push到客户端上。
虽然Pull和Push两种方式都能实现获取服务器端更新信息的功能,但是明显来说Push is better than pull。因为Pull方式更费客户端的网络流量,更主要的是费电量。
在开发Android和iPhone应用程序时,我们往往需要从服务器不定的向手机客户端即时推送各种通知消息,iPhone上已经有了比较简单的和完美的推送通知解决方案,我会在以后详细介绍IPhone中的解决方案,可是Android平台上实现起来却相对比较麻烦,最近利用几天的时间对Android的推送通知服务进行初步的研究。 在Android手机平台上,Google提供了C2DM(Cloudto Device Messaging)服务,起初我就是准备采用这个服务来实现自己手机上的推送功能。
Android Cloud to Device Messaging (C2DM)是一个用来帮助开发者从服务器向Android应用程序发送数据的服务。该服务提供了一个简单的、轻量级的机制,允许服务器可以通知移动应用程序直接与服务器进行通信,以便于从服务器获取应用程序更新和用户数据。C2DM服务负责处理诸如消息排队等事务并向运行于目标设备上的应用程序分发这些消息。关于C2DM具体使用过程,我会以后的博文中再详细介绍,这里大家先了解下大致方案情况。
C2DM操作过程图:
但是经过一番研究发现,这个服务存在很大的问题:
1)C2DM内置于Android的2.2系统上,无法兼容老的1.6到2.1系统;
2)C2DM需要依赖于Google官方提供的C2DM服务器,由于国内的网络环境,这个服务经常不可用,如果想要很好的使用,我们的App Server必须也在国外,这个恐怕不是每个开发者都能够实现的; 有了上述两个使用上的制约,导致我最终放弃了这个方案,不过我想利用另外一篇文章来详细的介绍C2DM的框架以及客户端和App Server的相应设置方法,可以作为学习与参考之用。 即然C2DM无法满足我们的要求,那么我们就需要自己来实现Android手机客户端与App Server之间的通信协议,保证在App Server想向指定的Android设备发送消息时,Android设备能够及时的收到。
2. 几种常见的解决方案
1)轮询(Pull):应用程序应当阶段性的与服务器进行连接并查询是否有新的消息到达,你必须自己实现与服务器之间的通信,例如消息排队等。而且你还要考虑轮询的频率,如果太慢可能导致某些消息的延迟,如果太快,则会大量消耗网络带宽和电池。
2)SMS(Push):在Android平台上,你可以通过拦截SMS消息并且解析消息内容来了解服务器的意图。这是一个不错的想法,我就见过采 用这个方案的应用程序。这个方案的好处是,可以实现完全的实时操作。但是问题是这个方案的成本相对比较高,你很难找到免费的短消息发送网关,关于这个方案的实现。
3)持久连接(Push):这个方案可以解决由轮询带来的性能问题,但是还是会消耗手机的电池。Apple的推送服务之所以工作的很好,是因为每一 台手机仅仅保持一个与服务器之间的连接,事实上C2DM也是这么工作的。不过这个方案也存在不足,就是我们很难在手机上实现一个可靠的服务。
Android操作系统允许在低内存情况下杀死系统服务,所以你的通知服务很可能被操作系统Kill掉了。 前两个方案存在明显的不足,第三个方案也有不足,不过我们可以通过良好的设计来弥补,以便于让该方案可以有效的工作。毕竟,我们要知道GMail,GTalk以及GoogleVoice都可以实现实时更新的。
3. MQTT协议实现Android推送
采用MQTT协议实现Android推送 MQTT是一个轻量级的消息发布/订阅协议,它是实现基于手机客户端的消息推送服务器的理想解决方案。 wmqtt.jar 是IBM提供的MQTT协议的实现。我们可以从下载该项目的实例代码,并且可以找到一个采用PHP书写的。
架构如下所示:
wmqtt.jar 是IBM提供的MQTT协议的实现。我们可以从如下站点它。你可以将该jar包加入你自己的Android应用程序中。
4.RSMB实现推送:
Really Small Message Broker (RSMB) ,他是一个简单的,同样由IBM提供。缺省打开1883端口,应用程序当中,它负责接收来自服务器的消息并将其转发给指定的移动设备。
SAM是一个针对MQTT写的。我们可以从这个下载它.
send_mqtt.php是一个通过POST接收消息并且通过SAM将消息发送给RSMB的PHP脚本。
Really Small Message Broker (RSMB) ,他是一个简单的MQTT代理,同样由IBM提供。缺省打开1883端口,应用程序当中,它负责接收来自服务器的消息并将其转发给指定的移动设备。
5. XMPP协议实现Android推送
这是我在项目中采用的方案。事实上Google官方的C2DM服务器底层也是采用XMPP协议进行的封装。 XMPP(可扩展通讯和表示协议)是基于可扩展标记语言(XML)的协议,它用于即时消息(IM)以及在线探测。这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息。关于XMPP协议我在上篇博文中已经介绍,大家可以参考:
pn是一个基于XMPP协议的java开源Android push notification实现,我会在以后的博文中详细介绍androidpn。它包含了完整的客户端和服务器端。经过源代码研究我发现,该服务器端基本是在另外一个开源工程openfire基础上修改实现的,不过比较郁闷的是androidpn的文档是由韩语写的,所以整个研究过程基本都是读源码。
实现意图如下图所示:
androidpn 客户端需要用到一个基于java的开源XMPP协议包asmack,这个包同样也是基于openfire下的另外一个开源项目smack,不过我们不需要 自己编译,可以直接把androidpn客户端里面的asmack.jar拿来使用。客户端利用asmack中提供的XMPPConnection类与服 务器建立持久连接,并通过该连接进行用户注册和登录认证,同样也是通过这条连接,接收服务器发送的通知。
androidpn服务器端也是java语言实现的,基于openfire开源工程,不过它的Web部分采用的是spring框架,这一点与 openfire是不同的。Androidpn服务器包含两个部分,一个是侦听在5222端口上的XMPP服务,负责与客户端的 XMPPConnection类进行通信,作用是用户注册和身份认证,并发送推送通知消息。另外一部分是Web服务器,采用一个轻量级的HTTP服务器, 负责接收用户的Web请求。服务器架构如下:
最上层包含四个组成部分,分别是SessionManager,Auth Manager,PresenceManager以及Notification Manager。SessionManager负责管理客户端与服务器之间的会话,Auth Manager负责客户端用户认证管理,Presence Manager负责管理客户端用户的登录状态,NotificationManager负责实现服务器向客户端推送消息功能。
这个解决方案的最大优势就是简单,我们不需要象C2DM那样依赖操作系统版本,也不会担心某一天Google服务器不可用。利用XMPP协议我们还可以进一步的对协议进行扩展,实现更为完善的功能。 采用这个方案,我们目前只能发送文字消息,不过对于推送来说一般足够了,因为我们不能指望通过推送得到所有的数据,一般情况下,利用推送只是告诉手机端服务器发生了某些改变,当客户端收到通知以后,应该主动到服务器获取最新的数据,这样才是推送服务的完整实现。
至于详细使用过程,我们会在下个博文中再给大家介绍。
转自:安卓巴士&Android开发资料共享区&&
wanggongze
浏览: 26345 次
来自: 苏州
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'

我要回帖

更多关于 android telnet客户端 的文章

 

随机推荐