JavaScript NSQ client WIP.
$ npm install nsq.js
The DEBUG environment variable can be used to enable traces within the module. For example, to enable all nsq debug() calls except for the framer:
$ DEBUG=nsq*,-nsq:framer node test
nsq:reader connect nsqd 0.0.0.0:4150 events/ingestion [5] +0ms
nsq:connection connect: 0.0.0.0:4150 V2 +0ms
nsq:connection command: IDENTIFY null +2ms
nsq:connection command: SUB ["events","ingestion"] +1ms
nsq:connection command: RDY [5] +0ms
nsq:connection connect: undefined:4150 V2 +0ms
nsq:connection command: IDENTIFY null +1ms
nsq:connection command: PUB ["events"] +0ms
nsq:reconnect reset backoff +0ms
nsq:reconnect reset backoff +1ms
nsq:connection response OK +3ms
nsq:connection response OK +0ms
nsq:connection response OK +0ms
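To make the `nsq*,-nsq:framer` pattern above concrete, here is a minimal sketch of how such a pattern list can select namespaces. It is a simplified approximation of wildcard and exclusion matching, and `isEnabled` is a hypothetical helper written for illustration; it is not part of nsq.js.

```javascript
// Simplified approximation of how a DEBUG pattern list selects namespaces.
// isEnabled() is a hypothetical helper for illustration, not part of nsq.js.
function isEnabled(name, patterns) {
  var enabled = false;
  var parts = patterns.split(/[\s,]+/).filter(Boolean);
  for (var i = 0; i < parts.length; i++) {
    var negated = parts[i][0] === '-';
    var glob = negated ? parts[i].slice(1) : parts[i];
    // Escape regex metacharacters, then turn "*" into a wildcard.
    var source = glob.replace(/[.+?^${}()|[\]\\]/g, '\\$&').replace(/\*/g, '.*');
    var matches = new RegExp('^' + source + '$').test(name);
    // An exclusion always wins over an inclusion.
    if (matches && negated) return false;
    if (matches) enabled = true;
  }
  return enabled;
}

console.log(isEnabled('nsq:reader', 'nsq*,-nsq:framer')); // true
console.log(isEnabled('nsq:framer', 'nsq*,-nsq:framer')); // false
```

With this pattern, every namespace starting with `nsq` is traced, while `nsq:framer` is filtered out, which matches the output shown above.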
The NSQD documentation recommends applying backoff when requeueing, which implies that the consumer is faulty. IMO this is a weird default and the opposite of what we need, so it's not applied in this client.
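If you do want backoff for a particular consumer, it can be layered on in your own message handler. The following is a minimal sketch that derives a requeue delay from `msg.attempts`; `backoffDelay` and `requeueWithBackoff` are hypothetical helpers, and the 1s base and 60s cap are illustrative values rather than anything nsq.js defines.

```javascript
// Hypothetical helper: exponential delay in milliseconds from an attempt count.
// The 1s base and 60s cap are illustrative values, not nsq.js defaults.
function backoffDelay(attempts, baseMs, maxMs) {
  baseMs = baseMs || 1000;
  maxMs = maxMs || 60000;
  var n = Math.max(1, attempts);
  return Math.min(maxMs, baseMs * Math.pow(2, n - 1));
}

// Requeue a failed message with a delay that grows with msg.attempts.
// Returns the delay that was used, which is handy for logging.
function requeueWithBackoff(msg) {
  var delay = backoffDelay(msg.attempts);
  msg.requeue(delay);
  return delay;
}
```

In a `message` handler, `requeueWithBackoff(msg)` would be called in place of a fixed `msg.requeue(2000)`.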
var nsq = require('nsq.js');

// subscribe
var reader = nsq.reader({
  nsqd: [':4150'],
  maxInFlight: 1,
  maxAttempts: 5,
  topic: 'events',
  channel: 'ingestion'
});

reader.on('error', function(err){
  console.log(err.stack);
});

reader.on('message', function(msg){
  var body = msg.body.toString();
  console.log('%s attempts=%s', body, msg.attempts);
  msg.requeue(2000);
});

reader.on('discard', function(msg){
  var body = msg.body.toString();
  console.log('giving up on %s', body);
  msg.finish();
});

// publish
var writer = nsq.writer(':4150');

writer.on('ready', function() {
  writer.publish('events', 'foo');
  writer.publish('events', 'bar');
  writer.publish('events', 'baz');
});
Create a reader:
- id: connection identifier (see client_id in the spec)
- topic: topic name
- channel: channel name
- nsqd: array of nsqd addresses
- nsqlookupd: array of nsqlookupd addresses
- maxAttempts: max attempts before discarding [Infinity]
- maxConnectionAttempts: max reconnection attempts [Infinity]
- maxInFlight: max messages distributed across connections [10]
- msgTimeout: session-specific msg timeout
- pollInterval: nsqlookupd poll interval [10000]
- ready: when false auto-RDY maintenance will be disabled
- trace: trace function
Events:
- message (msg): incoming message
- discard (msg): discarded message
- error response (err): response from nsq
- error (err)
Close the reader's connection(s) and fire the optional [fn] when completed.
Create a writer. By default a connection attempt to 0.0.0.0:4150 will be made unless one of the following options is provided:
- port: port number
- host: host name
- nsqd: array of nsqd addresses
- nsqlookupd: array of nsqlookupd addresses
Events:
- error response (err): response from nsq
- error (err)
Publish the given message to topic, where message may be a string, buffer, or object. An array of messages may be passed, in which case an MPUB (multi-publish) is performed.
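As an illustration of what accepting a string, buffer, or object can mean in practice, here is a minimal sketch of normalising each message to a buffer before it is sent. `toBody` and `toBodies` are hypothetical helpers, and serialising objects as JSON is an assumption made here so that a consumer can parse the body as JSON again; the client's own encoding may differ.

```javascript
// Hypothetical helper showing one way to normalise the accepted message types.
// Serialising objects as JSON is an assumption of this sketch.
function toBody(message) {
  if (Buffer.isBuffer(message)) return message;
  if (typeof message === 'string') return Buffer.from(message);
  return Buffer.from(JSON.stringify(message));
}

// An array maps to one body per message, as a multi-publish would need.
function toBodies(messages) {
  return messages.map(toBody);
}

console.log(toBody('foo').toString());
console.log(toBody({ user: 'tobi' }).toString());
console.log(toBodies(['foo', { user: 'tobi' }]).length);
```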
Close the writer's connection(s) and fire the optional [fn] when completed.
A single message.
Mark message as complete.
Re-queue the message immediately, or with the
given delay in milliseconds, or a string such
as "5s", "10m" etc.
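To show how the accepted delay forms relate to each other, here is a minimal sketch that normalises a delay to milliseconds. `toMilliseconds` is a hypothetical helper that only understands the "ms", "s", "m" and "h" suffixes; the parser used by the client itself may accept more formats.

```javascript
// Hypothetical helper: normalise a requeue delay to milliseconds.
// Only the "ms", "s", "m" and "h" suffixes are handled in this sketch.
var UNITS = { ms: 1, s: 1000, m: 60000, h: 3600000 };

function toMilliseconds(delay) {
  // No delay given: requeue immediately.
  if (delay === undefined || delay === null) return 0;
  // Numbers are already milliseconds.
  if (typeof delay === 'number') return delay;
  var match = /^(\d+(?:\.\d+)?)\s*(ms|s|m|h)$/.exec(String(delay).trim());
  if (!match) throw new TypeError('unrecognised delay: ' + delay);
  return Math.round(parseFloat(match[1]) * UNITS[match[2]]);
}

console.log(toMilliseconds('5s'));  // 5000
console.log(toMilliseconds('10m')); // 600000
console.log(toMilliseconds(2000));  // 2000
```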
Reset the message's timeout, increasing the length of time before NSQD considers it timed out.
Return parsed JSON object.
The following jstrace probes are available:
- connection:ready: ready count sent
- connection:message: message received
- message:finish: finished a message
- message:requeue: requeued a message
- message:touch: touched a message
$ nsqd --lookupd-tcp-address=0.0.0.0:4160 &
$ nsqadmin --lookupd-http-address=0.0.0.0:4161 &
$ nsqlookupd &
$ make test
MIT