// parallel.js

  const { isArray } = require("./utils/isArray");

  // Every function below is attached to this module's `exports` object,
  // referred to as `proto` throughout the file.
  const proto = exports;
  /**
   * Run `fn` over every item of `todo` in consecutive batches of up to
   * `parallel` jobs, waiting for each batch before starting the next.
   *
   * No new job is started once `this.isCancel()` is truthy. Each batch is
   * awaited with `Promise.all`, so at most one error (the first rejection)
   * is recorded per batch.
   *
   * @param {Array} todo items to process
   * @param {Number} parallel maximum number of jobs per batch
   * @param {Function} fn job factory, called as `fn(this, item)` or, when
   *   `sourceData` is truthy, `fn(this, item, sourceData)`
   * @param {*} [sourceData] optional extra argument handed to every job
   * @returns {Promise<Array>} the errors collected from failed batches
   */
  proto._parallelNode = async function _parallelNode(todo, parallel, fn, sourceData) {
    const that = this;
    const jobErr = [];
    let jobs = [];

    // Await whatever is currently queued and record the batch's first failure.
    const flush = async () => {
      if (jobs.length === 0) {
        return;
      }
      const pending = jobs;
      jobs = [];
      try {
        await Promise.all(pending);
      } catch (err) {
        jobErr.push(err);
      }
    };

    for (let i = 0; i < todo.length; i++) {
      if (that.isCancel()) {
        break;
      }
      if (sourceData) {
        jobs.push(fn(that, todo[i], sourceData));
      } else {
        jobs.push(fn(that, todo[i]));
      }
      // Flush on a full batch or on the last item. Checking the last index
      // directly (instead of counting batches) keeps the tail batch awaited
      // even when `parallel` is zero, non-integer or not a number.
      if (jobs.length >= parallel || i === todo.length - 1) {
        /* eslint no-await-in-loop: [0] */
        await flush();
      }
    }

    // Jobs queued just before a cancel broke the loop must not be left floating.
    await flush();
    return jobErr;
  };
  /**
   * Run `jobPromise(item)` for the items of `todo`, keeping at most
   * `parallel` jobs in flight and starting the next item as soon as a
   * running one finishes.
   *
   * The returned promise never rejects through this scheduler's own paths:
   * it resolves with an array that is empty on success or holds the first
   * job error. After an error, or once `this.isCancel()` is truthy, no
   * further items are started.
   *
   * @param {Array} todo items to process
   * @param {Number} parallel maximum number of concurrently running jobs;
   *   values `<= 0` resolve immediately with an empty array
   * @param {Function} jobPromise called as `jobPromise(item)`, must return a promise
   * @returns {Promise<Array>} collected errors
   */
  proto._parallel = function _parallel(todo, parallel, jobPromise) {
    const that = this;
    return new Promise((resolve) => {
      const _jobErr = [];
      if (parallel <= 0 || !todo) {
        resolve(_jobErr);
        return;
      }

      // Guard so that a job can report completion exactly once.
      function onlyOnce(fn) {
        return function (...args) {
          if (fn === null) throw new Error('Callback was already called.');
          const callFn = fn;
          fn = null;
          callFn.apply(this, args);
        };
      }

      // Hands out `{ value, key }` pairs until the list is exhausted or the
      // operation has been cancelled, then returns null.
      function createArrayIterator(coll) {
        let i = -1;
        const len = coll.length;
        return function next() {
          return (++i < len && !that.isCancel()) ? { value: coll[i], key: i } : null;
        };
      }

      const nextElem = createArrayIterator(todo);
      let done = false;
      let running = 0;
      let looping = false;

      function iterateeCallback(err) {
        running -= 1;
        if (err) {
          done = true;
          _jobErr.push(err);
          resolve(_jobErr);
        } else if (done && running <= 0) {
          // Last in-flight job finished after the input was exhausted.
          resolve(_jobErr);
        } else if (!looping) {
          /* eslint no-use-before-define: [0] */
          if (that.isCancel()) {
            resolve(_jobErr);
          } else {
            replenish();
          }
        }
      }

      function iteratee(value, callback) {
        jobPromise(value).then((result) => {
          callback(null, result);
        }).catch((err) => {
          callback(err);
        });
      }

      function replenish() {
        looping = true;
        while (running < parallel && !done && !that.isCancel()) {
          const elem = nextElem();
          if (elem === null || _jobErr.length > 0) {
            done = true;
            if (running <= 0) {
              resolve(_jobErr);
            }
            return;
          }
          running += 1;
          iteratee(elem.value, onlyOnce(iterateeCallback));
        }
        looping = false;
        // If the loop body never ran (already cancelled, or `parallel` is not
        // a usable number) and nothing is in flight, no callback will ever
        // fire; settle here so the caller is not left waiting forever.
        if (running <= 0) {
          done = true;
          resolve(_jobErr);
        }
      }

      replenish();
    });
  };
  /**
   * Cancel the current operation; currently usable with multipartUpload.
   *
   * Sets the cancel flag, destroys every tracked multipart upload stream
   * that is not yet destroyed (passing a plain `{ name, message }` cancel
   * object), clears the stream list and, when `abort` is given, calls
   * `this.abortMultipartUpload` without waiting for its result.
   *
   * @param {Object} [abort] identifies the multipart upload to abort
   * @param {String} abort.name object key
   * @param {String} abort.uploadId upload id
   * @param {*} abort.options forwarded unchanged to abortMultipartUpload
   *   (originally documented as "timeout")
   */
  proto.cancel = function cancel(abort) {
    this.options.cancelFlag = true;

    if (isArray(this.multipartUploadStreams)) {
      this.multipartUploadStreams.forEach((stream) => {
        if (stream.destroyed === false) {
          const err = {
            name: 'cancel',
            message: 'cancel'
          };
          stream.destroy(err);
        }
      });
    }
    this.multipartUploadStreams = [];

    if (abort) {
      this.abortMultipartUpload(abort.name, abort.uploadId, abort.options);
    }
  };
  /**
   * Report the current cancel flag.
   * @returns {*} the raw value of `this.options.cancelFlag`
   */
  proto.isCancel = function isCancel() {
    return this.options.cancelFlag;
  };
  /**
   * Clear the cancel flag by setting `this.options.cancelFlag` to false.
   */
  proto.resetCancelFlag = function resetCancelFlag() {
    this.options.cancelFlag = false;
  };
  /**
   * Set the cancel flag (`this.options.cancelFlag = true`) and do nothing else.
   */
  proto._stop = function _stop() {
    this.options.cancelFlag = true;
  };
  /**
   * Build the object used to signal a cancel. A cancel is not an error, so
   * a plain object is created instead of an Error instance.
   * @returns {{status: Number, name: String}} a new cancel event
   */
  proto._makeCancelEvent = function _makeCancelEvent() {
    const cancelEvent = {
      status: 0,
      name: 'cancel'
    };
    return cancelEvent;
  };
  /**
   * Build the object used to signal an abort. An abort is not an error, so
   * a plain object is created instead of an Error instance.
   * @returns {{status: Number, name: String, message: String}} a new abort event
   */
  proto._makeAbortEvent = function _makeAbortEvent() {
    const abortEvent = {
      status: 0,
      name: 'abort',
      message: 'upload task has been abort'
    };
    return abortEvent;
  };
  155. };