Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
package org.eclipse.paho.android.service;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttPingActionListener;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttPingSender;
import org.eclipse.paho.client.mqttv3.MqttToken;
import org.eclipse.paho.client.mqttv3.internal.ClientComms;

import android.app.AlarmManager;
Expand Down Expand Up @@ -109,6 +111,11 @@ public void schedule(long delayInMilliseconds) {

/*
* This class sends PingReq packet to MQTT broker
*
* note perubahan :
* wakelock release ketika ping sudah terkirim.
* kondisi sebelumnya wakelock dapat dihold dengan
* waktu yang lama (1 ~ 2x keep alive interval) ketika jaringan kurang baik.
*/
class AlarmReceiver extends BroadcastReceiver {
private WakeLock wakelock;
Expand All @@ -126,47 +133,68 @@ public void onReceive(Context context, Intent intent) {
Log.d(TAG, "Ping " + count + " times.");

Log.d(TAG, "Check time :" + System.currentTimeMillis());
IMqttToken token = comms.checkForActivity();

// No ping has been sent.
if (token == null) {
return;
}
//listener untuk event ping, ketika ping terkirim / terjadi failure, release wakelock.
IMqttPingActionListener actionListener = new IMqttPingActionListener() {
@Override
public void onPingSent(IMqttToken token) {
//ping terkirim, release wakelock
Log.d(TAG, "Ping sent. Release lock(" + wakeLockTag + "):"
+ System.currentTimeMillis());
releaseWakeLock();
}

@Override
public void onSuccess(IMqttToken asyncActionToken) {
//dipanggil ketika mendapat respon ping kembali.
//wakelock sudah di lepas ketika ping terkirim sehingga tidak perlu melakukan release.
}

@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
Log.d(TAG, "Failure. Release lock(" + wakeLockTag + "):"
+ System.currentTimeMillis());
//Release wakelock when it is done.
releaseWakeLock();
}
};


// Assign new callback to token to execute code after PingResq
// arrives. Get another wakelock even receiver already has one,
// release it until ping response returns.

// wakelock dipindah ke sebelum checkForActivity karena yang sebelumnya ping sudah sent terlebih dahulu
// baru wakelock aquire.
if (wakelock == null) {
PowerManager pm = (PowerManager) service
.getSystemService(Service.POWER_SERVICE);
wakelock = pm.newWakeLock(PowerManager.PARTIAL_WAKE_LOCK,
wakeLockTag);
wakelock.setReferenceCounted(false);
wakelock.setReferenceCounted(false);
}
wakelock.acquire();
token.setActionCallback(new IMqttActionListener() {
Log.d(TAG, "hold wakelock(" + wakeLockTag + "):"
+ System.currentTimeMillis());

@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.d(TAG, "Success. Release lock(" + wakeLockTag + "):"
+ System.currentTimeMillis());
//Release wakelock when it is done.
if(wakelock != null && wakelock.isHeld()){
wakelock.release();
}
}
//action listener ditambahkan supaya wakelock dilepas setelah send ping.
IMqttToken token = comms.checkForActivity(actionListener);

@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
Log.d(TAG, "Failure. Release lock(" + wakeLockTag + "):"
+ System.currentTimeMillis());
//Release wakelock when it is done.
if(wakelock != null && wakelock.isHeld()){
wakelock.release();
}
}
});
// No ping has been sent.
if (token == null) {
Log.d(TAG, "no ping sent, Release lock(" + wakeLockTag + "):" +
System.currentTimeMillis());
releaseWakeLock();
return;
}
}

private void releaseWakeLock() {
if(wakelock != null && wakelock.isHeld()){
wakelock.release();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.util.Properties;
import java.util.Vector;

import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttPingActionListener;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
Expand Down Expand Up @@ -103,15 +105,19 @@ CommsReceiver getReceiver() {
void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {
final String methodName = "internalSend";
//@TRACE 200=internalSend key={0} message={1} token={2}
log.fine(CLASS_NAME, methodName, "200", new Object[]{message.getKey(), message, token});
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "200", new Object[]{message.getKey(), message, token});
}

if (token.getClient() == null ) {
// Associate the client with the token - also marks it as in use.
token.internalTok.setClient(getClient());
} else {
// Token is already in use - cannot reuse
//@TRACE 213=fail: token in use: key={0} message={1} token={2}
log.fine(CLASS_NAME, methodName, "213", new Object[]{message.getKey(), message, token});
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "213", new Object[]{message.getKey(), message, token});
}

throw new MqttException(MqttException.REASON_CODE_TOKEN_INUSE);
}
Expand Down Expand Up @@ -139,7 +145,9 @@ public void sendNoWait(MqttWireMessage message, MqttToken token) throws MqttExce
this.internalSend(message, token);
} else {
//@TRACE 208=failed: not connected
log.fine(CLASS_NAME, methodName, "208");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "208");
}
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_NOT_CONNECTED);
}
}
Expand All @@ -158,7 +166,9 @@ public void close() throws MqttException {
// Must be disconnected before close can take place
if (!isDisconnected()) {
//@TRACE 224=failed: not disconnected
log.fine(CLASS_NAME, methodName, "224");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "224");
}

if (isConnecting()) {
throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
Expand Down Expand Up @@ -197,7 +207,9 @@ public void connect(MqttConnectOptions options, MqttToken token) throws MqttExce
synchronized (conLock) {
if (isDisconnected() && !closePending) {
//@TRACE 214=state=CONNECTING
log.fine(CLASS_NAME,methodName,"214");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "214");
}

conState = CONNECTING;

Expand All @@ -221,7 +233,9 @@ public void connect(MqttConnectOptions options, MqttToken token) throws MqttExce
}
else {
// @TRACE 207=connect failed: not disconnected {0}
log.fine(CLASS_NAME,methodName,"207", new Object[] {new Byte(conState)});
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "207", new Object[]{new Byte(conState)});
}
if (isClosed() || closePending) {
throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
} else if (isConnecting()) {
Expand Down Expand Up @@ -250,7 +264,9 @@ public void connectComplete( MqttConnack cack, MqttException mex) throws MqttExc
}

// @TRACE 204=connect failed: rc={0}
log.fine(CLASS_NAME,methodName,"204", new Object[]{new Integer(rc)});
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "204", new Object[]{new Integer(rc)});
}
throw mex;
}

Expand All @@ -275,7 +291,9 @@ public void shutdownConnection(MqttToken token, MqttException reason) {
stoppingComms = true;

//@TRACE 216=state=DISCONNECTING
log.fine(CLASS_NAME,methodName,"216");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "216");
}

wasConnected = (isConnected() || isDisconnecting());
conState = DISCONNECTING;
Expand Down Expand Up @@ -336,7 +354,9 @@ public void shutdownConnection(MqttToken token, MqttException reason) {
// client to be marked as disconnected.
synchronized(conLock) {
//@TRACE 217=state=DISCONNECTED
log.fine(CLASS_NAME,methodName,"217");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "217");
}

conState = DISCONNECTED;
stoppingComms = false;
Expand Down Expand Up @@ -375,7 +395,9 @@ public void shutdownConnection(MqttToken token, MqttException reason) {
private MqttToken handleOldTokens(MqttToken token, MqttException reason) {
final String methodName = "handleOldTokens";
//@TRACE 222=>
log.fine(CLASS_NAME,methodName,"222");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "222");
}

MqttToken tokToNotifyLater = null;
try {
Expand Down Expand Up @@ -414,25 +436,35 @@ public void disconnect(MqttDisconnect disconnect, long quiesceTimeout, MqttToken
synchronized (conLock){
if (isClosed()) {
//@TRACE 223=failed: in closed state
log.fine(CLASS_NAME,methodName,"223");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "223");
}
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
} else if (isDisconnected()) {
//@TRACE 211=failed: already disconnected
log.fine(CLASS_NAME,methodName,"211");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "211");
}
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_ALREADY_DISCONNECTED);
} else if (isDisconnecting()) {
//@TRACE 219=failed: already disconnecting
log.fine(CLASS_NAME,methodName,"219");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "219");
}
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
} else if (Thread.currentThread() == callback.getThread()) {
//@TRACE 210=failed: called on callback thread
log.fine(CLASS_NAME,methodName,"210");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "210");
}
// Not allowed to call disconnect() from the callback, as it will deadlock.
throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_DISCONNECT_PROHIBITED);
}

//@TRACE 218=state=DISCONNECTING
log.fine(CLASS_NAME,methodName,"218");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "218");
}
conState = DISCONNECTING;
DisconnectBG discbg = new DisconnectBG(disconnect,quiesceTimeout,token);
discbg.start();
Expand Down Expand Up @@ -569,7 +601,9 @@ public void run() {
final String methodName = "connectBG:run";
MqttException mqttEx = null;
//@TRACE 220=>
log.fine(CLASS_NAME, methodName, "220");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "220");
}

try {
// Reset an exception on existing delivery tokens.
Expand All @@ -596,11 +630,15 @@ public void run() {
internalSend(conPacket, conToken);
} catch (MqttException ex) {
//@TRACE 212=connect failed: unexpected exception
log.fine(CLASS_NAME, methodName, "212", null, ex);
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "212", null, ex);
}
mqttEx = ex;
} catch (Exception ex) {
//@TRACE 209=connect failed: unexpected exception
log.fine(CLASS_NAME, methodName, "209", null, ex);
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "209", null, ex);
}
mqttEx = ExceptionHelper.createMqttException(ex);
}

Expand Down Expand Up @@ -632,7 +670,9 @@ void start() {
public void run() {
final String methodName = "disconnectBG:run";
//@TRACE 221=>
log.fine(CLASS_NAME, methodName, "221");
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "221");
}

// Allow current inbound and outbound work to complete
clientState.quiesce(quiesceTimeout);
Expand All @@ -657,19 +697,33 @@ public void run() {
public MqttToken checkForActivity(){
MqttToken token = null;
try{
token = clientState.checkForActivity();
token = clientState.checkForActivity(null);
}catch(MqttException e){
handleRunException(e);
}catch(Exception e){
handleRunException(e);
}
return token;
}
}

public MqttToken checkForActivity(IMqttPingActionListener actionListener) {
MqttToken token = null;
try{
token = clientState.checkForActivity(actionListener);
}catch(MqttException e){
handleRunException(e);
}catch(Exception e){
handleRunException(e);
}
return token;
}

private void handleRunException(Exception ex) {
final String methodName = "handleRunException";
//@TRACE 804=exception
log.fine(CLASS_NAME,methodName,"804",null, ex);
if(log.isLoggable(Logger.FINE)) {
log.fine(CLASS_NAME, methodName, "804", null, ex);
}
MqttException mex;
if ( !(ex instanceof MqttException)) {
mex = new MqttException(MqttException.REASON_CODE_CONNECTION_LOST, ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.eclipse.paho.client.mqttv3;

/**
* Created by ahmadulil on 12/18/15.
*
* Interface untuk callback ketika ping sent
*/
public interface IMqttPingActionListener extends IMqttActionListener {
void onPingSent(IMqttToken token);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public class MqttToken implements IMqttToken {
* MqttToken. MQTT application programs must not use the internal class.
*/
public Token internalTok = null;

/*
* callback ketika selesai kirim ping
*/
private IMqttPingActionListener pingSentCallback;

public MqttToken() {
}
Expand Down Expand Up @@ -98,4 +103,16 @@ public boolean getSessionPresent() {
public MqttWireMessage getResponse() {
return internalTok.getResponse();
}

/*
* callback untuk ping event
*/
public void setPingCallback(IMqttPingActionListener callback) {
internalTok.setActionCallback(callback);
this.pingSentCallback = callback;
}

public IMqttPingActionListener getPingSentCallback() {
return pingSentCallback;
}
}
Loading