Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
Fix: Set 60 seconds for default mqtt keepalive option (#413)
Add: Support of multimeasure for MQTT and AMQP transport (#462)
Add: virtual host configuration added to AMQP transport (config.amqp.vhost and IOTA_AMQP_VHOST env var)
4 changes: 4 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ config.amqp = {
* Port where the AMQP broker is listening.
*/
port: 5672,
/**
* Virtual host in the AMQP Broker to which IoT Agent will connect (optional).
*/
// vhost: "custom-virtual-host",
/**
* user name that identifies the IOTA against the AMQP broker (optional).
*/
Expand Down
6 changes: 4 additions & 2 deletions docs/installationguide.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ These are the currently available MQTT configuration options:
- **retain**: retain flag (default is false).
- **retries**: Number of MQTT connection error retries (default is 5).
- **retryTime**: Time between MQTT connection retries (default is 5 seconds).
- **keepalive**: Time to keep connection open between client and MQTT broker (default is 60 seconds). If you experience
disconnnection problems using 0 (as the one described in
- **keepalive**: Time to keep connection open between client and MQTT broker (default is 60 seconds). If you
experience disconnnection problems using 0 (as the one described in
[this case](https://github.com/telefonicaid/iotagent-json/issues/455)) a value greater than 0 is recommended.
- **avoidLeadingSlash** this flag sets whether the agent publishes commands to topics starting with slash (default in
order versions) or without the slash. See
Expand All @@ -146,6 +146,7 @@ IoT Agent. The following attributes are accepted:

- **host**: Host where the AMQP Broker is located.
- **port**: Port where the AMQP Broker is listening
- **vhost**: virtual host in the AMQP Broker to which IoT Agent will connect (optional).
- **username**: username that identifies the IOTA against the AMQP broker (optional).
- **password**: password to be used if the username is provided (optional).
- **exchange**: Exchange in the AMQP broker
Expand Down Expand Up @@ -199,6 +200,7 @@ The ones relating specific JSON bindings are described in the following table.
| IOTA_MQTT_AVOID_LEADING_SLASH | mqtt.avoidLeadingSlash |
| IOTA_AMQP_HOST | amqp.host |
| IOTA_AMQP_PORT | amqp.port |
| IOTA_AMQP_VHOST | amqp.vhost |
| IOTA_AMQP_USERNAME | amqp.username |
| IOTA_AMQP_PASSWORD | amqp.password |
| IOTA_AMQP_EXCHANGE | amqp.exchange |
Expand Down
66 changes: 33 additions & 33 deletions lib/bindings/AMQPBinding.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ function start(callback) {
uri += ':' + config.getConfig().amqp.port;
}
}
if (config.getConfig().amqp.vhost && config.getConfig().amqp.vhost !== '/') {
uri += '/' + config.getConfig().amqp.vhost;
}
} else {
return config.getLogger().error(context, 'Error AMQP is not configured');
}
Expand All @@ -139,46 +142,43 @@ function start(callback) {
return;
}
isConnecting = true;
amqp.connect(
uri,
function(err, conn) {
isConnecting = false;
// try again
if (err) {
config.getLogger().error(context, err.message);
if (numRetried <= retries) {
numRetried++;
return setTimeout(createConnection, retryTime * 1000, callback);
amqp.connect(uri, function (err, conn) {
isConnecting = false;
// try again
if (err) {
config.getLogger().error(context, err.message);
if (numRetried <= retries) {
numRetried++;
return setTimeout(createConnection, retryTime * 1000, callback);
}
} else {
conn.on('error', function (err) {
if (err.message !== 'Connection closing') {
config.getLogger().error(context, err.message);
}
} else {
conn.on('error', function(err) {
if (err.message !== 'Connection closing') {
config.getLogger().error(context, err.message);
});
conn.on('close', function () {
// If amqpConn is null, the connection has been closed on purpose
if (amqpConn) {
config.getLogger().error(context, 'reconnecting');
if (numRetried <= retries) {
numRetried++;
return setTimeout(createConnection, retryTime * 1000);
}
});
conn.on('close', function() {
// If amqpConn is null, the connection has been closed on purpose
if (amqpConn) {
config.getLogger().error(context, 'reconnecting');
if (numRetried <= retries) {
numRetried++;
return setTimeout(createConnection, retryTime * 1000);
}
}
});
config.getLogger().info(context, 'connected');
amqpConn = conn;
if (callback) {
callback();
}
});
config.getLogger().info(context, 'connected');
amqpConn = conn;
if (callback) {
callback();
}
}
);
});
}

function createChannel(callback) {
config.getLogger().debug(context, 'channel creating');
amqpConn.createChannel(function(err, ch) {
amqpConn.createChannel(function (err, ch) {
if (err) {
config.getLogger().error(context, err.message);
}
Expand All @@ -199,7 +199,7 @@ function start(callback) {

function assertQueue(callback) {
config.getLogger().debug(context, 'asserting queues');
amqpChannel.assertQueue(queue, { exclusive: false }, function() {
amqpChannel.assertQueue(queue, { exclusive: false }, function () {
amqpChannel.assertQueue(queue + '_commands', { exclusive: false }, callback);
});
}
Expand All @@ -216,7 +216,7 @@ function start(callback) {
callback();
}

async.waterfall([createConnection, createChannel, assertExchange, assertQueue, createListener], function(error) {
async.waterfall([createConnection, createChannel, assertExchange, assertQueue, createListener], function (error) {
if (error) {
config.getLogger().debug('AMQP error %j', error);
}
Expand Down
6 changes: 6 additions & 0 deletions lib/configService.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ function processEnvironmentVariables() {
'IOTA_MQTT_AVOID_LEADING_SLASH',
'IOTA_AMQP_HOST',
'IOTA_AMQP_PORT',
'IOTA_AMQP_VHOST',
'IOTA_AMQP_USERNAME',
'IOTA_AMQP_PASSWORD',
'IOTA_AMQP_EXCHANGE',
Expand Down Expand Up @@ -102,6 +103,7 @@ function processEnvironmentVariables() {
const amqpVariables = [
'IOTA_AMQP_HOST',
'IOTA_AMQP_PORT',
'IOTA_AMQP_VHOST',
'IOTA_AMQP_USERNAME',
'IOTA_AMQP_PASSWORD',
'IOTA_AMQP_EXCHANGE',
Expand Down Expand Up @@ -213,6 +215,10 @@ function processEnvironmentVariables() {
config.amqp.port = process.env.IOTA_AMQP_PORT;
}

if (process.env.IOTA_AMQP_VHOST) {
config.amqp.vhost = process.env.IOTA_AMQP_VHOST;
}

if (process.env.IOTA_AMQP_USERNAME) {
config.amqp.username = process.env.IOTA_AMQP_USERNAME;
}
Expand Down
29 changes: 16 additions & 13 deletions test/unit/startup-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ const fs = require('fs');
// mqtt = require('mqtt'),
const sinon = require('sinon');

describe('Startup tests', function() {
describe('When the MQTT transport is started with environment variables', function() {
beforeEach(function() {
describe('Startup tests', function () {
describe('When the MQTT transport is started with environment variables', function () {
beforeEach(function () {
sinon.stub(fs, 'statSync');
process.env.IOTA_MQTT_HOST = '127.0.0.1';
process.env.IOTA_MQTT_PORT = '1883';
Expand All @@ -47,7 +47,7 @@ describe('Startup tests', function() {
process.env.IOTA_MQTT_KEEPALIVE = '0';
});

afterEach(function() {
afterEach(function () {
fs.statSync.restore();
delete process.env.IOTA_MQTT_PROTOCOL;
delete process.env.IOTA_MQTT_HOST;
Expand All @@ -65,7 +65,7 @@ describe('Startup tests', function() {
delete process.env.IOTA_MQTT_KEEPALIVE;
});

it('should load the MQTT environment variables in the internal configuration', function(done) {
it('should load the MQTT environment variables in the internal configuration', function (done) {
config.setConfig(iotAgentConfig);
config.getConfig().mqtt.host.should.equal('127.0.0.1');
config.getConfig().mqtt.port.should.equal('1883');
Expand All @@ -84,10 +84,11 @@ describe('Startup tests', function() {
});
});

describe('When the AMQP transport is started with environment variables', function() {
beforeEach(function() {
describe('When the AMQP transport is started with environment variables', function () {
beforeEach(function () {
process.env.IOTA_AMQP_HOST = 'localhost';
process.env.IOTA_AMQP_PORT = '9090';
process.env.IOTA_AMQP_VHOST = 'custom_vhost';
process.env.IOTA_AMQP_USERNAME = 'useramqp';
process.env.IOTA_AMQP_PASSWORD = 'passamqp';
process.env.IOTA_AMQP_EXCHANGE = 'xxx';
Expand All @@ -97,9 +98,10 @@ describe('Startup tests', function() {
process.env.IOTA_AMQP_RETRY_TIME = '5';
});

afterEach(function() {
afterEach(function () {
delete process.env.IOTA_AMQP_HOST;
delete process.env.IOTA_AMQP_PORT;
delete process.env.IOTA_AMQP_VHOST;
delete process.env.IOTA_AMQP_USERNAME;
delete process.env.IOTA_AMQP_PASSWORD;
delete process.env.IOTA_AMQP_EXCHANGE;
Expand All @@ -109,10 +111,11 @@ describe('Startup tests', function() {
delete process.env.IOTA_AMQP_RETRY_TIME;
});

it('should load the AMQP environment variables in the internal configuration', function(done) {
it('should load the AMQP environment variables in the internal configuration', function (done) {
config.setConfig(iotAgentConfig);
config.getConfig().amqp.host.should.equal('localhost');
config.getConfig().amqp.port.should.equal('9090');
config.getConfig().amqp.vhost.should.equal('custom_vhost');
config.getConfig().amqp.username.should.equal('useramqp');
config.getConfig().amqp.password.should.equal('passamqp');
config.getConfig().amqp.exchange.should.equal('xxx');
Expand All @@ -124,8 +127,8 @@ describe('Startup tests', function() {
});
});

describe('When the HTTP transport is started with environment variables', function() {
beforeEach(function() {
describe('When the HTTP transport is started with environment variables', function () {
beforeEach(function () {
sinon.stub(fs, 'statSync');
process.env.IOTA_HTTP_HOST = 'localhost';
process.env.IOTA_HTTP_PORT = '2222';
Expand All @@ -134,7 +137,7 @@ describe('Startup tests', function() {
process.env.IOTA_HTTP_CERT = '/http/bbb/cert.pem';
});

afterEach(function() {
afterEach(function () {
fs.statSync.restore();
delete process.env.IOTA_HTTP_HOST;
delete process.env.IOTA_HTTP_PORT;
Expand All @@ -143,7 +146,7 @@ describe('Startup tests', function() {
delete process.env.IOTA_HTTP_CERT;
});

it('should load the HTTP environment variables in the internal configuration', function(done) {
it('should load the HTTP environment variables in the internal configuration', function (done) {
config.setConfig(iotAgentConfig);
config.getConfig().http.host.should.equal('localhost');
config.getConfig().http.port.should.equal('2222');
Expand Down