Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/PubSubClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,28 @@ bool PubSubClient::beginPublish(const char* topic, size_t plength, uint8_t qos,
}
this->_qos = qos; // save the QoS for later endPublish() operation
// check if the header and the topic (including 2 length bytes) fit into the buffer

#if MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE > 0
int total_msg_len = MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 + plength; // header + topic length + topic + payload
if (total_msg_len > this->_client->availableForWrite()) {
for (uint16_t i = 0; i < MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE; i++)
{
if (this->_client->availableForWrite() > total_msg_len)
{
break; // Enough space in the client(socket) buffer
} else
{
// Not enough space in the client(socket) buffer
// so let's empty the buffer first
_client->flush();
this->loop();

delay(1); // Give some time to the client to flush
}
}
}
#endif

if (connected() && MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 <= this->bufferSize) {
// first write the topic at the end of the maximal variable header (MQTT_MAX_HEADER_SIZE) to the buffer
size_t topicLen = writeString(topic, this->buffer, MQTT_MAX_HEADER_SIZE, this->bufferSize) - MQTT_MAX_HEADER_SIZE;
Expand Down Expand Up @@ -749,6 +771,26 @@ bool PubSubClient::subscribe(const char* topic, uint8_t qos) {
length = writeNextMsgId(buffer, length, this->bufferSize); // buffer size is checked before
length = writeString(topic, this->buffer, length, this->bufferSize);
this->buffer[length++] = qos;

#if MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE > 0
if (int(length) > this->_client->availableForWrite()) {
for (uint16_t i = 0; i < MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE; i++)
{
if (this->_client->availableForWrite() > int(length))
{
break; // Enough space in the client(socket) buffer
} else
{
// Not enough space in the client(socket) buffer
// so let's empty the buffer first
_client->flush();
this->loop();

delay(1); // Give some time to the client to flush
}
}
}
#endif
return write(MQTTSUBSCRIBE | MQTT_QOS_GET_HDR(MQTT_QOS1), this->buffer, length - MQTT_MAX_HEADER_SIZE);
}
return false;
Expand Down
12 changes: 12 additions & 0 deletions src/PubSubClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@
#undef MQTT_MAX_TRANSFER_SIZE
#endif

/**
* @brief Sets the maximum number of times the network client is checked for the amount of bytes is available to be written.
* Some hardware has a work buffer limit. Before a message to the MQTT server is sent in 'publish' or 'subscribe'
* the ehternet client is checked weather is has enough buffer to be able to send the entire message.
* Use 50 on hardware such as the Teensy 4.1 with lwip/QNEthernet lib.
* @note Defaults to undefined, which does not use the client function 'availableForWrite' at all.
*/
#ifndef MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE // just a hack that it gets shown in Doxygen
#define MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE 50
#undef MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE
#endif

/**
* @defgroup group_state state() result
* @brief These values indicate the current PubSubClient::state() of the client.
Expand Down