123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- 'use strict';
- Object.defineProperty(exports, "__esModule", {
- value: true
- });
- exports.default = queue;
- var _onlyOnce = require('./onlyOnce.js');
- var _onlyOnce2 = _interopRequireDefault(_onlyOnce);
- var _setImmediate = require('./setImmediate.js');
- var _setImmediate2 = _interopRequireDefault(_setImmediate);
- var _DoublyLinkedList = require('./DoublyLinkedList.js');
- var _DoublyLinkedList2 = _interopRequireDefault(_DoublyLinkedList);
- var _wrapAsync = require('./wrapAsync.js');
- var _wrapAsync2 = _interopRequireDefault(_wrapAsync);
- function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
- function queue(worker, concurrency, payload) {
- if (concurrency == null) {
- concurrency = 1;
- } else if (concurrency === 0) {
- throw new RangeError('Concurrency must not be zero');
- }
- var _worker = (0, _wrapAsync2.default)(worker);
- var numRunning = 0;
- var workersList = [];
- const events = {
- error: [],
- drain: [],
- saturated: [],
- unsaturated: [],
- empty: []
- };
- function on(event, handler) {
- events[event].push(handler);
- }
- function once(event, handler) {
- const handleAndRemove = (...args) => {
- off(event, handleAndRemove);
- handler(...args);
- };
- events[event].push(handleAndRemove);
- }
- function off(event, handler) {
- if (!event) return Object.keys(events).forEach(ev => events[ev] = []);
- if (!handler) return events[event] = [];
- events[event] = events[event].filter(ev => ev !== handler);
- }
- function trigger(event, ...args) {
- events[event].forEach(handler => handler(...args));
- }
- var processingScheduled = false;
- function _insert(data, insertAtFront, rejectOnError, callback) {
- if (callback != null && typeof callback !== 'function') {
- throw new Error('task callback must be a function');
- }
- q.started = true;
- var res, rej;
- function promiseCallback(err, ...args) {
- // we don't care about the error, let the global error handler
- // deal with it
- if (err) return rejectOnError ? rej(err) : res();
- if (args.length <= 1) return res(args[0]);
- res(args);
- }
- var item = q._createTaskItem(data, rejectOnError ? promiseCallback : callback || promiseCallback);
- if (insertAtFront) {
- q._tasks.unshift(item);
- } else {
- q._tasks.push(item);
- }
- if (!processingScheduled) {
- processingScheduled = true;
- (0, _setImmediate2.default)(() => {
- processingScheduled = false;
- q.process();
- });
- }
- if (rejectOnError || !callback) {
- return new Promise((resolve, reject) => {
- res = resolve;
- rej = reject;
- });
- }
- }
- function _createCB(tasks) {
- return function (err, ...args) {
- numRunning -= 1;
- for (var i = 0, l = tasks.length; i < l; i++) {
- var task = tasks[i];
- var index = workersList.indexOf(task);
- if (index === 0) {
- workersList.shift();
- } else if (index > 0) {
- workersList.splice(index, 1);
- }
- task.callback(err, ...args);
- if (err != null) {
- trigger('error', err, task.data);
- }
- }
- if (numRunning <= q.concurrency - q.buffer) {
- trigger('unsaturated');
- }
- if (q.idle()) {
- trigger('drain');
- }
- q.process();
- };
- }
- function _maybeDrain(data) {
- if (data.length === 0 && q.idle()) {
- // call drain immediately if there are no tasks
- (0, _setImmediate2.default)(() => trigger('drain'));
- return true;
- }
- return false;
- }
- const eventMethod = name => handler => {
- if (!handler) {
- return new Promise((resolve, reject) => {
- once(name, (err, data) => {
- if (err) return reject(err);
- resolve(data);
- });
- });
- }
- off(name);
- on(name, handler);
- };
- var isProcessing = false;
- var q = {
- _tasks: new _DoublyLinkedList2.default(),
- _createTaskItem(data, callback) {
- return {
- data,
- callback
- };
- },
- *[Symbol.iterator]() {
- yield* q._tasks[Symbol.iterator]();
- },
- concurrency,
- payload,
- buffer: concurrency / 4,
- started: false,
- paused: false,
- push(data, callback) {
- if (Array.isArray(data)) {
- if (_maybeDrain(data)) return;
- return data.map(datum => _insert(datum, false, false, callback));
- }
- return _insert(data, false, false, callback);
- },
- pushAsync(data, callback) {
- if (Array.isArray(data)) {
- if (_maybeDrain(data)) return;
- return data.map(datum => _insert(datum, false, true, callback));
- }
- return _insert(data, false, true, callback);
- },
- kill() {
- off();
- q._tasks.empty();
- },
- unshift(data, callback) {
- if (Array.isArray(data)) {
- if (_maybeDrain(data)) return;
- return data.map(datum => _insert(datum, true, false, callback));
- }
- return _insert(data, true, false, callback);
- },
- unshiftAsync(data, callback) {
- if (Array.isArray(data)) {
- if (_maybeDrain(data)) return;
- return data.map(datum => _insert(datum, true, true, callback));
- }
- return _insert(data, true, true, callback);
- },
- remove(testFn) {
- q._tasks.remove(testFn);
- },
- process() {
- // Avoid trying to start too many processing operations. This can occur
- // when callbacks resolve synchronously (#1267).
- if (isProcessing) {
- return;
- }
- isProcessing = true;
- while (!q.paused && numRunning < q.concurrency && q._tasks.length) {
- var tasks = [],
- data = [];
- var l = q._tasks.length;
- if (q.payload) l = Math.min(l, q.payload);
- for (var i = 0; i < l; i++) {
- var node = q._tasks.shift();
- tasks.push(node);
- workersList.push(node);
- data.push(node.data);
- }
- numRunning += 1;
- if (q._tasks.length === 0) {
- trigger('empty');
- }
- if (numRunning === q.concurrency) {
- trigger('saturated');
- }
- var cb = (0, _onlyOnce2.default)(_createCB(tasks));
- _worker(data, cb);
- }
- isProcessing = false;
- },
- length() {
- return q._tasks.length;
- },
- running() {
- return numRunning;
- },
- workersList() {
- return workersList;
- },
- idle() {
- return q._tasks.length + numRunning === 0;
- },
- pause() {
- q.paused = true;
- },
- resume() {
- if (q.paused === false) {
- return;
- }
- q.paused = false;
- (0, _setImmediate2.default)(q.process);
- }
- };
- // define these as fixed properties, so people get useful errors when updating
- Object.defineProperties(q, {
- saturated: {
- writable: false,
- value: eventMethod('saturated')
- },
- unsaturated: {
- writable: false,
- value: eventMethod('unsaturated')
- },
- empty: {
- writable: false,
- value: eventMethod('empty')
- },
- drain: {
- writable: false,
- value: eventMethod('drain')
- },
- error: {
- writable: false,
- value: eventMethod('error')
- }
- });
- return q;
- }
- module.exports = exports['default'];
|