Description
Context
- OS and version used: a custom Linux system built with Yocto (armv7l, Linux kernel 6.6.66, glibc 2.39)
- Java runtime used: 1.8.0_372-internal
- SDK version used: currently `compile group: 'com.microsoft.azure.sdk.iot', name: 'iot-device-client', version: '2.2.0'` (but I read the current code base and verified that the problem still exists there)
Note that the problem was observed on a single-core system.
Description of the issue
The problem appeared after we changed the kernel, glibc, and many other components that could affect, for example, thread context switches and thread creation time.
While using the DeviceClient functions getTwinAsync and updateReportedPropertiesAsync, we noticed that the onResponseReceived callback is sometimes not called.
We are using MQTT for transport.
After checking many things in our own code base, I started reading the SDK implementation and found a potential race condition whose outcome depends on where thread context switches happen.
- The class `IotHubTransport` contains the function `onMessageReceived`.
- This function is called from `MqttIotHubConnection`, but that is not relevant here.
- First, the message is added to the receive queue via `this.addToReceivedMessagesQueue(message);`.
- This adds the message to the received-messages queue and releases a semaphore, so that `IotHubReceiveTask` can acquire the semaphore and run its "logic" in parallel on another thread.
- The function then continues and later calls the onResponseReceived callback.
 
But some conditions must be fulfilled for onResponseReceived to be called. For example, correlationCallbacks.get(correlationId) must not be null, so correlationCallbacks needs to contain a (non-null) entry for the correlationId.
Now let's look at the "logic" that is executed asynchronously.
- `IotHubReceiveTask` calls the transport's `handleMessage` function.
- This is in turn implemented in `IotHubTransport`.
- It calls the `acknowledgeReceivedMessage` function.
- After doing its work, it spawns another thread that removes the `correlationId` from `correlationCallbacks`.
There is no barrier or other synchronization, so as soon as the semaphore is released the IotHubReceiveTask can take over. It can call handleMessage and thus acknowledgeReceivedMessage, which spawns a new thread; the scheduler can then switch to that thread, which removes the correlation ID. Only after that does execution switch back to the thread running onMessageReceived, which had added the message to the receive queue and released the semaphore. That thread now looks up the correlation ID, finds that it is no longer in the map, and skips the onResponseReceived callback.
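The interleaving described above can be modeled outside the SDK. The following is a minimal standalone sketch, not SDK code: the class name `CorrelationRaceSketch`, the helper `joinQuietly`, the `Runnable` callback type, and the ID `"request-1"` are all hypothetical stand-ins, and the `join` calls exist only to force the unlucky schedule that a single-core system may produce on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Standalone model of the race; none of these names are the SDK's real classes.
class CorrelationRaceSketch {

    /** Returns true if the response callback ran, false if it was silently dropped. */
    static boolean runUnluckySchedule() {
        // Stand-in for correlationCallbacks (value type simplified to Runnable).
        Map<String, Runnable> correlationCallbacks = new ConcurrentHashMap<>();
        // Stand-in for the semaphore that wakes the receive task.
        Semaphore receiveSemaphore = new Semaphore(0);
        AtomicBoolean callbackInvoked = new AtomicBoolean(false);
        String correlationId = "request-1"; // hypothetical ID
        correlationCallbacks.put(correlationId, () -> callbackInvoked.set(true));

        // Models IotHubReceiveTask -> handleMessage -> acknowledgeReceivedMessage,
        // which spawns a thread that removes the correlation ID from the map.
        Thread receiveTask = new Thread(() -> {
            receiveSemaphore.acquireUninterruptibly();
            Thread remover = new Thread(() -> correlationCallbacks.remove(correlationId));
            remover.start();
            joinQuietly(remover); // only here to make the schedule deterministic
        });
        receiveTask.start();

        // Models onMessageReceived: release the semaphore first ...
        receiveSemaphore.release();
        // ... then lose the CPU until the receive task and the remover have finished.
        joinQuietly(receiveTask);

        // ... and only now look up the callback, which is already gone.
        Runnable callback = correlationCallbacks.get(correlationId);
        if (callback != null) {
            callback.run();
        }
        return callbackInvoked.get();
    }

    static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback invoked: " + runUnluckySchedule());
    }
}
```

With this forced schedule the lookup always happens after the removal, so the callback is never invoked. In the real SDK the schedule is not forced, which would explain why the callback is only dropped sometimes.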
A simple fix could be to fetch the CorrelationCallbackContext object from the map before calling addToReceivedMessagesQueue and releasing the semaphore, so that it can be used later regardless of whether the entry has already been removed or will be removed later.
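The effect of reordering the lookup can be illustrated with a small standalone model. This is a sketch under the same assumptions as the description, not SDK code: `EarlyLookupSketch`, `joinQuietly`, the `Runnable` callback type, and the ID `"request-1"` are hypothetical, and the `join` calls only force the schedule in which the removal wins.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Standalone model of the proposed ordering; none of these names are the SDK's real classes.
class EarlyLookupSketch {

    /** Returns true if the response callback ran even though the map entry was removed first. */
    static boolean runUnluckySchedule() {
        // Stand-in for correlationCallbacks (value type simplified to Runnable).
        Map<String, Runnable> correlationCallbacks = new ConcurrentHashMap<>();
        Semaphore receiveSemaphore = new Semaphore(0);
        AtomicBoolean callbackInvoked = new AtomicBoolean(false);
        String correlationId = "request-1"; // hypothetical ID
        correlationCallbacks.put(correlationId, () -> callbackInvoked.set(true));

        // Models the receive task, whose acknowledgement path removes the map entry.
        Thread receiveTask = new Thread(() -> {
            receiveSemaphore.acquireUninterruptibly();
            Thread remover = new Thread(() -> correlationCallbacks.remove(correlationId));
            remover.start();
            joinQuietly(remover); // only here to make the schedule deterministic
        });
        receiveTask.start();

        // Proposed ordering: capture the callback BEFORE releasing the semaphore.
        Runnable callback = correlationCallbacks.get(correlationId);

        receiveSemaphore.release();
        joinQuietly(receiveTask); // the entry has been removed from the map by now

        // The captured reference is still usable although the map entry is gone.
        if (callback != null) {
            callback.run();
        }
        return callbackInvoked.get();
    }

    static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("callback invoked: " + runUnluckySchedule());
    }
}
```

Because the reference is read while the entry is still guaranteed to be present, the later removal no longer affects whether the callback runs.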
Simple fix that does not touch any other code (most of the changes are just caused by the changed indentation level):
diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java
index 85f172d2d..d94616fa0 100644
--- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java
+++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java
@@ -309,6 +309,16 @@ public class IotHubTransport implements IotHubListener
     @Override
     public void onMessageReceived(IotHubTransportMessage message, TransportException e)
     {
+        CorrelationCallbackContext callbackContext = null;
+        if (message != null)
+        {
+            final String correlationId = message.getCorrelationId();
+            if (!correlationId.isEmpty())
+            {
+                callbackContext = correlationCallbacks.get(correlationId);
+            }
+        }
+
         if (message != null && e != null)
         {
             log.error("Exception encountered while receiving a message from service {}", message, e);
@@ -325,35 +335,30 @@ public class IotHubTransport implements IotHubListener
 
         try
         {
-            if (message != null)
+            if (callbackContext != null)
             {
-                String correlationId = message.getCorrelationId();
-                if (!correlationId.isEmpty())
+                if (callbackContext.getCallback() != null)
                 {
-                    CorrelationCallbackContext callbackContext = correlationCallbacks.get(correlationId);
-                    if (callbackContext != null && callbackContext.getCallback() != null)
+                    IotHubClientException clientException = null;
+                    if (e != null)
                     {
-                        IotHubClientException clientException = null;
-                        if (e != null)
-                        {
-                            // This case indicates that the transport layer failed to construct a valid message out of
-                            // a message delivered by the service
-                            clientException = e.toIotHubClientException();
-                        }
-                        else
+                        // This case indicates that the transport layer failed to construct a valid message out of
+                        // a message delivered by the service
+                        clientException = e.toIotHubClientException();
+                    }
+                    else
+                    {
+                        // This case indicates that the transport layer constructed a valid message out of a message
+                        // delivered by the service, but that message may contain an unsuccessful status code in cases
+                        // such as if an operation was rejected because it was badly formatted.
+                        IotHubStatusCode statusCode = IotHubStatusCode.getIotHubStatusCode(Integer.parseInt(message.getStatus()));
+                        if (!IotHubStatusCode.isSuccessful(statusCode))
                         {
-                            // This case indicates that the transport layer constructed a valid message out of a message
-                            // delivered by the service, but that message may contain an unsuccessful status code in cases
-                            // such as if an operation was rejected because it was badly formatted.
-                            IotHubStatusCode statusCode = IotHubStatusCode.getIotHubStatusCode(Integer.parseInt(message.getStatus()));
-                            if (!IotHubStatusCode.isSuccessful(statusCode))
-                            {
-                                clientException = new IotHubClientException(statusCode, "Received an unsuccessful operation error code from the service: " + statusCode);
-                            }
+                            clientException = new IotHubClientException(statusCode, "Received an unsuccessful operation error code from the service: " + statusCode);
                         }
-
-                        callbackContext.getCallback().onResponseReceived(message, callbackContext.getUserContext(), clientException);
                     }
+
+                    callbackContext.getCallback().onResponseReceived(message, callbackContext.getUserContext(), clientException);
                 }
             }
         }
Code sample exhibiting the issue
See the description above; this is a software design problem.
Console log of the issue
See the description above; this is a software design problem.