'use strict';
var async = require('async'),
Datastore = require('nedb'),
_ = require('lodash');
module.exports = function (Connection) {
Connection.addInstanceMethods(
/** @lends Connection.prototype */
{
/**
* Start long-polling for events.
* @see {@link https://developers.box.com/docs/#events-long-polling}
* @fires Connection#"polling.error"
* @fires Connection#"polling.end"
*/
startLongPolling: function () {
var self = this;
async.waterfall([
function (next) {
if (!self.events) {
self.events = new Datastore();
self.events.ensureIndex({
fieldName: 'event_id',
unique: true
});
self.events.ensureIndex({
fieldName: 'created_at',
}, next);
} else {
next();
}
},
function (next) {
if (!self.keepPolling) {
self.keepPolling = true;
async.whilst(function () {
return self.keepPolling;
}, _.bind(self._emit, self), _.noop);
next(null, true);
} else {
next(null, false);
}
},
function (doLongPoll, next) {
if (doLongPoll) {
async.whilst(function () {
return self.keepPolling;
}, _.bind(self._longPoll, self), function (err) {
if (err) {
/**
* Fires when an error occurs during long-polling.
* @event Connection#"polling.error"
* @type {Error}
* @see {@link Connection#startLongPolling}
*/
self.emit('polling.error', err);
} else if (!self.keepPolling) {
/**
* Fires when a running long-polling process ends.
* @event Connection#"polling.end"
* @see {@link Connection#stopLongPolling}
*/
self.emit('polling.end');
}
});
}
next();
}
], _.noop);
},
/**
* Stop a running long-polling process.
* @see {@link https://developers.box.com/docs/#events-long-polling}
*/
stopLongPolling: function () {
this.keepPolling = false;
},
/**
* Do not call this method directly.
* @summary The internal long-polling sequence.
* @private
* @param {optionalErrorCallback} done - Called after finishing one round of long polling.
* @fires Connection#"polling.ready"
*/
_longPoll: function (done) {
var self = this;
self.log.debug('NSP: %s', self.nsp);
async.waterfall([
function (next) {
if (!self.nsp) {
self._request('https://api.box.com/2.0/events', 'GET', next, {
stream_position: 'now'
});
} else {
next(null, null);
}
},
function (body, next) {
if (body) {
self.nsp = body.next_stream_position;
/**
* Fires when a stream position has been fixed. All events from here onwards will be captured.
* @event Connection#"polling.ready"
*/
self.emit('polling.ready');
}
self._request('https://api.box.com/2.0/events', 'OPTIONS', next);
},
function (body, next) {
self._request(body.entries[0].url, 'GET', next, {
stream_position: self.nsp
}, null, null, null, null, {
timeout: 600000,
num_retries: -1
});
},
function (body, next) {
self.log.debug(body);
if (!_.isEmpty(body) && (body.message === 'new_change')) {
self._request('https://api.box.com/2.0/events', 'GET', next, {
stream_position: self.nsp
});
} else {
self.log.debug('Refreshing long-poll...');
next(null, null);
}
},
function (body, next) {
if (body) {
self.nsp = body.next_stream_position;
_.each(body.entries, _.bind(self._trackEvent, self));
}
next();
}
], done);
},
/**
* Do not call this method directly.
* @summary The internal tracker for remote events.
* @private
* @param {Object} event - The event to push.
*/
_trackEvent: function (event) {
var self = this;
event.monologued = false;
self.events.insert(event, function (err) {
if (!err) {
_.delay(function () {
self.events.remove({
event_id: event.event_id
});
}, 60000);
}
});
},
/**
* Do not call this method directly.
* @summary The internal event emitter.
* @private
* @param {function} done - The callback.
* @fires Connection#"polling.event.EVENT"
*/
_emit: function (done) {
var self = this;
async.waterfall([
function (next) {
_.delay(next, 15000);
},
function (next) {
self.events.find({
monologued: false
}).sort({
created_at: 1
}).exec(next);
},
function (items, next) {
_.each(items, function (item) {
/**
* Events captured from an event stream during long-polling.
* The event name is of the form polling.event.EVENT, where EVENT is the lowercased,
* (_ -> .)-transformed version of the source event type. This transformation helps
* leverage {@link external:Monologue|Monologue} subscription filters.
* @typedef {string} PollingEvent
* @see {@link https://developers.box.com/docs/#events}
* @example
* When an event of type {@linkcode ITEM_CREATE} is read from the event stream,
* an event {@linkcode polling.event.item.create} is emitted.
*/
/**
* Fires when an event is read from the event stream during long-polling.
* @event Connection#"polling.event.EVENT"
* @type {PollingEvent}
*/
self.emit('polling.event.' + item.event_type.toLowerCase().replace('_', '.'),
_.omit(item, 'monologued', '_id'));
});
self.events.update({
event_id: {
$in: _.pluck(items, 'event_id')
}
}, {
$set: {
monologued: true
}
}, {
multi: true
}, next);
}
], done);
}
});
};