Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
Fix: Set 60 seconds for default mqtt keepalive option (#413)
Add: Support of multimeasure for MQTT and AMQP transport (#462)
Add: virtual host configuration added to AMQP transport (config.amqp.vhost and IOTA_AMQP_VHOST env var)
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,39 @@ All the tests are designed to test end-to-end scenarios, and there are some requ

- MQTT v5 broker (like mosquitto v1.6.7 server running)
- MongoDB v3.x server running
- AMQP 0-9-1 server with `foo/bar` vHost created (lile RabbitMQ v3 server running)
- You can set up Mosquitto to run the test as follows:
1. Create a file with the following content. You can name it as `rabbit-config.json`: <br/><br/>
```json
{
"vhosts": [
{
"name": "/"
},
{
"name": "foo/bar"
}
],
"permissions": [
{
"user": "guest",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
},
{
"user": "guest",
"vhost": "foo/bar",
"configure": ".*",
"write": ".*",
"read": ".*"
}
]
}
```
2. Run the container with
`docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 -v /foo/bar/iotagent-json/docs/rabbit-config.json:/etc/rabbitmq/definition.json -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_management load_definitions '/etc/rabbitmq/definition.json'" rabbitmq:3`

---

Expand Down
4 changes: 4 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ config.amqp = {
* Port where the AMQP broker is listening.
*/
port: 5672,
/**
* Virtual host in the AMQP Broker to which IoT Agent will connect (optional).
*/
// vhost: "custom-virtual-host",
/**
* user name that identifies the IOTA against the AMQP broker (optional).
*/
Expand Down
6 changes: 4 additions & 2 deletions docs/installationguide.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ These are the currently available MQTT configuration options:
- **retain**: retain flag (default is false).
- **retries**: Number of MQTT connection error retries (default is 5).
- **retryTime**: Time between MQTT connection retries (default is 5 seconds).
- **keepalive**: Time to keep connection open between client and MQTT broker (default is 60 seconds). If you experience
disconnnection problems using 0 (as the one described in
- **keepalive**: Time to keep connection open between client and MQTT broker (default is 60 seconds). If you
experience disconnnection problems using 0 (as the one described in
[this case](https://github.com/telefonicaid/iotagent-json/issues/455)) a value greater than 0 is recommended.
- **avoidLeadingSlash** this flag sets whether the agent publishes commands to topics starting with slash (default in
order versions) or without the slash. See
Expand All @@ -146,6 +146,7 @@ IoT Agent. The following attributes are accepted:

- **host**: Host where the AMQP Broker is located.
- **port**: Port where the AMQP Broker is listening
- **vhost**: virtual host in the AMQP Broker to which IoT Agent will connect (optional).
- **username**: username that identifies the IOTA against the AMQP broker (optional).
- **password**: password to be used if the username is provided (optional).
- **exchange**: Exchange in the AMQP broker
Expand Down Expand Up @@ -199,6 +200,7 @@ The ones relating specific JSON bindings are described in the following table.
| IOTA_MQTT_AVOID_LEADING_SLASH | mqtt.avoidLeadingSlash |
| IOTA_AMQP_HOST | amqp.host |
| IOTA_AMQP_PORT | amqp.port |
| IOTA_AMQP_VHOST | amqp.vhost |
| IOTA_AMQP_USERNAME | amqp.username |
| IOTA_AMQP_PASSWORD | amqp.password |
| IOTA_AMQP_EXCHANGE | amqp.exchange |
Expand Down
66 changes: 33 additions & 33 deletions lib/bindings/AMQPBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ function start(callback) {
uri += ':' + config.getConfig().amqp.port;
}
}
if (config.getConfig().amqp.vhost && config.getConfig().amqp.vhost !== '/') {
uri += '/' + config.getConfig().amqp.vhost;
}
} else {
return config.getLogger().error(context, 'Error AMQP is not configured');
}
Expand All @@ -139,46 +142,43 @@ function start(callback) {
return;
}
isConnecting = true;
amqp.connect(
uri,
function(err, conn) {
isConnecting = false;
// try again
if (err) {
config.getLogger().error(context, err.message);
if (numRetried <= retries) {
numRetried++;
return setTimeout(createConnection, retryTime * 1000, callback);
amqp.connect(uri, function (err, conn) {
isConnecting = false;
// try again
if (err) {
config.getLogger().error(context, err.message);
if (numRetried <= retries) {
numRetried++;
return setTimeout(createConnection, retryTime * 1000, callback);
}
} else {
conn.on('error', function (err) {
if (err.message !== 'Connection closing') {
config.getLogger().error(context, err.message);
}
} else {
conn.on('error', function(err) {
if (err.message !== 'Connection closing') {
config.getLogger().error(context, err.message);
});
conn.on('close', function () {
// If amqpConn is null, the connection has been closed on purpose
if (amqpConn) {
config.getLogger().error(context, 'reconnecting');
if (numRetried <= retries) {
numRetried++;
return setTimeout(createConnection, retryTime * 1000);
}
});
conn.on('close', function() {
// If amqpConn is null, the connection has been closed on purpose
if (amqpConn) {
config.getLogger().error(context, 'reconnecting');
if (numRetried <= retries) {
numRetried++;
return setTimeout(createConnection, retryTime * 1000);
}
}
});
config.getLogger().info(context, 'connected');
amqpConn = conn;
if (callback) {
callback();
}
});
config.getLogger().info(context, 'connected');
amqpConn = conn;
if (callback) {
callback();
}
}
);
});
}

function createChannel(callback) {
config.getLogger().debug(context, 'channel creating');
amqpConn.createChannel(function(err, ch) {
amqpConn.createChannel(function (err, ch) {
if (err) {
config.getLogger().error(context, err.message);
}
Expand All @@ -199,7 +199,7 @@ function start(callback) {

function assertQueue(callback) {
config.getLogger().debug(context, 'asserting queues');
amqpChannel.assertQueue(queue, { exclusive: false }, function() {
amqpChannel.assertQueue(queue, { exclusive: false }, function () {
amqpChannel.assertQueue(queue + '_commands', { exclusive: false }, callback);
});
}
Expand All @@ -216,7 +216,7 @@ function start(callback) {
callback();
}

async.waterfall([createConnection, createChannel, assertExchange, assertQueue, createListener], function(error) {
async.waterfall([createConnection, createChannel, assertExchange, assertQueue, createListener], function (error) {
if (error) {
config.getLogger().debug('AMQP error %j', error);
}
Expand Down
6 changes: 6 additions & 0 deletions lib/configService.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ function processEnvironmentVariables() {
'IOTA_MQTT_AVOID_LEADING_SLASH',
'IOTA_AMQP_HOST',
'IOTA_AMQP_PORT',
'IOTA_AMQP_VHOST',
'IOTA_AMQP_USERNAME',
'IOTA_AMQP_PASSWORD',
'IOTA_AMQP_EXCHANGE',
Expand Down Expand Up @@ -102,6 +103,7 @@ function processEnvironmentVariables() {
const amqpVariables = [
'IOTA_AMQP_HOST',
'IOTA_AMQP_PORT',
'IOTA_AMQP_VHOST',
'IOTA_AMQP_USERNAME',
'IOTA_AMQP_PASSWORD',
'IOTA_AMQP_EXCHANGE',
Expand Down Expand Up @@ -213,6 +215,10 @@ function processEnvironmentVariables() {
config.amqp.port = process.env.IOTA_AMQP_PORT;
}

if (process.env.IOTA_AMQP_VHOST) {
config.amqp.vhost = process.env.IOTA_AMQP_VHOST;
}

if (process.env.IOTA_AMQP_USERNAME) {
config.amqp.username = process.env.IOTA_AMQP_USERNAME;
}
Expand Down
9 changes: 9 additions & 0 deletions test/config-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ config.amqp = {
options: { durable: true }
};

config.amqpVhost = {
host: 'localhost',
port: 5672,
vhost: 'foo/bar',
exchange: 'amq.topic',
queue: 'iota_queue',
options: { durable: true }
};

config.iota = {
logLevel: 'FATAL',
contextBroker: {
Expand Down
Loading