index.js 18 KB


  1. 'use strict'
  2. const proc =
  3. typeof process === 'object' && process
  4. ? process
  5. : {
  6. stdout: null,
  7. stderr: null,
  8. }
  9. const EE = require('events')
  10. const Stream = require('stream')
  11. const stringdecoder = require('string_decoder')
  12. const SD = stringdecoder.StringDecoder
  13. const EOF = Symbol('EOF')
  14. const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
  15. const EMITTED_END = Symbol('emittedEnd')
  16. const EMITTING_END = Symbol('emittingEnd')
  17. const EMITTED_ERROR = Symbol('emittedError')
  18. const CLOSED = Symbol('closed')
  19. const READ = Symbol('read')
  20. const FLUSH = Symbol('flush')
  21. const FLUSHCHUNK = Symbol('flushChunk')
  22. const ENCODING = Symbol('encoding')
  23. const DECODER = Symbol('decoder')
  24. const FLOWING = Symbol('flowing')
  25. const PAUSED = Symbol('paused')
  26. const RESUME = Symbol('resume')
  27. const BUFFER = Symbol('buffer')
  28. const PIPES = Symbol('pipes')
  29. const BUFFERLENGTH = Symbol('bufferLength')
  30. const BUFFERPUSH = Symbol('bufferPush')
  31. const BUFFERSHIFT = Symbol('bufferShift')
  32. const OBJECTMODE = Symbol('objectMode')
  33. // internal event when stream is destroyed
  34. const DESTROYED = Symbol('destroyed')
  35. // internal event when stream has an error
  36. const ERROR = Symbol('error')
  37. const EMITDATA = Symbol('emitData')
  38. const EMITEND = Symbol('emitEnd')
  39. const EMITEND2 = Symbol('emitEnd2')
  40. const ASYNC = Symbol('async')
  41. const ABORT = Symbol('abort')
  42. const ABORTED = Symbol('aborted')
  43. const SIGNAL = Symbol('signal')
  44. const defer = fn => Promise.resolve().then(fn)
  45. // TODO remove when Node v8 support drops
  46. const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
  47. const ASYNCITERATOR =
  48. (doIter && Symbol.asyncIterator) || Symbol('asyncIterator not implemented')
  49. const ITERATOR =
  50. (doIter && Symbol.iterator) || Symbol('iterator not implemented')
  51. // events that mean 'the stream is over'
  52. // these are treated specially, and re-emitted
  53. // if they are listened for after emitting.
  54. const isEndish = ev => ev === 'end' || ev === 'finish' || ev === 'prefinish'
  55. const isArrayBuffer = b =>
  56. b instanceof ArrayBuffer ||
  57. (typeof b === 'object' &&
  58. b.constructor &&
  59. b.constructor.name === 'ArrayBuffer' &&
  60. b.byteLength >= 0)
  61. const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b)
  62. class Pipe {
  63. constructor(src, dest, opts) {
  64. this.src = src
  65. this.dest = dest
  66. this.opts = opts
  67. this.ondrain = () => src[RESUME]()
  68. dest.on('drain', this.ondrain)
  69. }
  70. unpipe() {
  71. this.dest.removeListener('drain', this.ondrain)
  72. }
  73. // istanbul ignore next - only here for the prototype
  74. proxyErrors() {}
  75. end() {
  76. this.unpipe()
  77. if (this.opts.end) this.dest.end()
  78. }
  79. }
  80. class PipeProxyErrors extends Pipe {
  81. unpipe() {
  82. this.src.removeListener('error', this.proxyErrors)
  83. super.unpipe()
  84. }
  85. constructor(src, dest, opts) {
  86. super(src, dest, opts)
  87. this.proxyErrors = er => dest.emit('error', er)
  88. src.on('error', this.proxyErrors)
  89. }
  90. }
  91. class Minipass extends Stream {
  92. constructor(options) {
  93. super()
  94. this[FLOWING] = false
  95. // whether we're explicitly paused
  96. this[PAUSED] = false
  97. this[PIPES] = []
  98. this[BUFFER] = []
  99. this[OBJECTMODE] = (options && options.objectMode) || false
  100. if (this[OBJECTMODE]) this[ENCODING] = null
  101. else this[ENCODING] = (options && options.encoding) || null
  102. if (this[ENCODING] === 'buffer') this[ENCODING] = null
  103. this[ASYNC] = (options && !!options.async) || false
  104. this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
  105. this[EOF] = false
  106. this[EMITTED_END] = false
  107. this[EMITTING_END] = false
  108. this[CLOSED] = false
  109. this[EMITTED_ERROR] = null
  110. this.writable = true
  111. this.readable = true
  112. this[BUFFERLENGTH] = 0
  113. this[DESTROYED] = false
  114. if (options && options.debugExposeBuffer === true) {
  115. Object.defineProperty(this, 'buffer', { get: () => this[BUFFER] })
  116. }
  117. if (options && options.debugExposePipes === true) {
  118. Object.defineProperty(this, 'pipes', { get: () => this[PIPES] })
  119. }
  120. this[SIGNAL] = options && options.signal
  121. this[ABORTED] = false
  122. if (this[SIGNAL]) {
  123. this[SIGNAL].addEventListener('abort', () => this[ABORT]())
  124. if (this[SIGNAL].aborted) {
  125. this[ABORT]()
  126. }
  127. }
  128. }
  129. get bufferLength() {
  130. return this[BUFFERLENGTH]
  131. }
  132. get encoding() {
  133. return this[ENCODING]
  134. }
  135. set encoding(enc) {
  136. if (this[OBJECTMODE]) throw new Error('cannot set encoding in objectMode')
  137. if (
  138. this[ENCODING] &&
  139. enc !== this[ENCODING] &&
  140. ((this[DECODER] && this[DECODER].lastNeed) || this[BUFFERLENGTH])
  141. )
  142. throw new Error('cannot change encoding')
  143. if (this[ENCODING] !== enc) {
  144. this[DECODER] = enc ? new SD(enc) : null
  145. if (this[BUFFER].length)
  146. this[BUFFER] = this[BUFFER].map(chunk => this[DECODER].write(chunk))
  147. }
  148. this[ENCODING] = enc
  149. }
  150. setEncoding(enc) {
  151. this.encoding = enc
  152. }
  153. get objectMode() {
  154. return this[OBJECTMODE]
  155. }
  156. set objectMode(om) {
  157. this[OBJECTMODE] = this[OBJECTMODE] || !!om
  158. }
  159. get ['async']() {
  160. return this[ASYNC]
  161. }
  162. set ['async'](a) {
  163. this[ASYNC] = this[ASYNC] || !!a
  164. }
  165. // drop everything and get out of the flow completely
  166. [ABORT]() {
  167. this[ABORTED] = true
  168. this.emit('abort', this[SIGNAL].reason)
  169. this.destroy(this[SIGNAL].reason)
  170. }
  171. get aborted() {
  172. return this[ABORTED]
  173. }
  174. set aborted(_) {}
  175. write(chunk, encoding, cb) {
  176. if (this[ABORTED]) return false
  177. if (this[EOF]) throw new Error('write after end')
  178. if (this[DESTROYED]) {
  179. this.emit(
  180. 'error',
  181. Object.assign(
  182. new Error('Cannot call write after a stream was destroyed'),
  183. { code: 'ERR_STREAM_DESTROYED' }
  184. )
  185. )
  186. return true
  187. }
  188. if (typeof encoding === 'function') (cb = encoding), (encoding = 'utf8')
  189. if (!encoding) encoding = 'utf8'
  190. const fn = this[ASYNC] ? defer : f => f()
  191. // convert array buffers and typed array views into buffers
  192. // at some point in the future, we may want to do the opposite!
  193. // leave strings and buffers as-is
  194. // anything else switches us into object mode
  195. if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
  196. if (isArrayBufferView(chunk))
  197. chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
  198. else if (isArrayBuffer(chunk)) chunk = Buffer.from(chunk)
  199. else if (typeof chunk !== 'string')
  200. // use the setter so we throw if we have encoding set
  201. this.objectMode = true
  202. }
  203. // handle object mode up front, since it's simpler
  204. // this yields better performance, fewer checks later.
  205. if (this[OBJECTMODE]) {
  206. /* istanbul ignore if - maybe impossible? */
  207. if (this.flowing && this[BUFFERLENGTH] !== 0) this[FLUSH](true)
  208. if (this.flowing) this.emit('data', chunk)
  209. else this[BUFFERPUSH](chunk)
  210. if (this[BUFFERLENGTH] !== 0) this.emit('readable')
  211. if (cb) fn(cb)
  212. return this.flowing
  213. }
  214. // at this point the chunk is a buffer or string
  215. // don't buffer it up or send it to the decoder
  216. if (!chunk.length) {
  217. if (this[BUFFERLENGTH] !== 0) this.emit('readable')
  218. if (cb) fn(cb)
  219. return this.flowing
  220. }
  221. // fast-path writing strings of same encoding to a stream with
  222. // an empty buffer, skipping the buffer/decoder dance
  223. if (
  224. typeof chunk === 'string' &&
  225. // unless it is a string already ready for us to use
  226. !(encoding === this[ENCODING] && !this[DECODER].lastNeed)
  227. ) {
  228. chunk = Buffer.from(chunk, encoding)
  229. }
  230. if (Buffer.isBuffer(chunk) && this[ENCODING])
  231. chunk = this[DECODER].write(chunk)
  232. // Note: flushing CAN potentially switch us into not-flowing mode
  233. if (this.flowing && this[BUFFERLENGTH] !== 0) this[FLUSH](true)
  234. if (this.flowing) this.emit('data', chunk)
  235. else this[BUFFERPUSH](chunk)
  236. if (this[BUFFERLENGTH] !== 0) this.emit('readable')
  237. if (cb) fn(cb)
  238. return this.flowing
  239. }
  240. read(n) {
  241. if (this[DESTROYED]) return null
  242. if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH]) {
  243. this[MAYBE_EMIT_END]()
  244. return null
  245. }
  246. if (this[OBJECTMODE]) n = null
  247. if (this[BUFFER].length > 1 && !this[OBJECTMODE]) {
  248. if (this.encoding) this[BUFFER] = [this[BUFFER].join('')]
  249. else this[BUFFER] = [Buffer.concat(this[BUFFER], this[BUFFERLENGTH])]
  250. }
  251. const ret = this[READ](n || null, this[BUFFER][0])
  252. this[MAYBE_EMIT_END]()
  253. return ret
  254. }
  255. [READ](n, chunk) {
  256. if (n === chunk.length || n === null) this[BUFFERSHIFT]()
  257. else {
  258. this[BUFFER][0] = chunk.slice(n)
  259. chunk = chunk.slice(0, n)
  260. this[BUFFERLENGTH] -= n
  261. }
  262. this.emit('data', chunk)
  263. if (!this[BUFFER].length && !this[EOF]) this.emit('drain')
  264. return chunk
  265. }
  266. end(chunk, encoding, cb) {
  267. if (typeof chunk === 'function') (cb = chunk), (chunk = null)
  268. if (typeof encoding === 'function') (cb = encoding), (encoding = 'utf8')
  269. if (chunk) this.write(chunk, encoding)
  270. if (cb) this.once('end', cb)
  271. this[EOF] = true
  272. this.writable = false
  273. // if we haven't written anything, then go ahead and emit,
  274. // even if we're not reading.
  275. // we'll re-emit if a new 'end' listener is added anyway.
  276. // This makes MP more suitable to write-only use cases.
  277. if (this.flowing || !this[PAUSED]) this[MAYBE_EMIT_END]()
  278. return this
  279. }
  280. // don't let the internal resume be overwritten
  281. [RESUME]() {
  282. if (this[DESTROYED]) return
  283. this[PAUSED] = false
  284. this[FLOWING] = true
  285. this.emit('resume')
  286. if (this[BUFFER].length) this[FLUSH]()
  287. else if (this[EOF]) this[MAYBE_EMIT_END]()
  288. else this.emit('drain')
  289. }
  290. resume() {
  291. return this[RESUME]()
  292. }
  293. pause() {
  294. this[FLOWING] = false
  295. this[PAUSED] = true
  296. }
  297. get destroyed() {
  298. return this[DESTROYED]
  299. }
  300. get flowing() {
  301. return this[FLOWING]
  302. }
  303. get paused() {
  304. return this[PAUSED]
  305. }
  306. [BUFFERPUSH](chunk) {
  307. if (this[OBJECTMODE]) this[BUFFERLENGTH] += 1
  308. else this[BUFFERLENGTH] += chunk.length
  309. this[BUFFER].push(chunk)
  310. }
  311. [BUFFERSHIFT]() {
  312. if (this[OBJECTMODE]) this[BUFFERLENGTH] -= 1
  313. else this[BUFFERLENGTH] -= this[BUFFER][0].length
  314. return this[BUFFER].shift()
  315. }
  316. [FLUSH](noDrain) {
  317. do {} while (this[FLUSHCHUNK](this[BUFFERSHIFT]()) && this[BUFFER].length)
  318. if (!noDrain && !this[BUFFER].length && !this[EOF]) this.emit('drain')
  319. }
  320. [FLUSHCHUNK](chunk) {
  321. this.emit('data', chunk)
  322. return this.flowing
  323. }
  324. pipe(dest, opts) {
  325. if (this[DESTROYED]) return
  326. const ended = this[EMITTED_END]
  327. opts = opts || {}
  328. if (dest === proc.stdout || dest === proc.stderr) opts.end = false
  329. else opts.end = opts.end !== false
  330. opts.proxyErrors = !!opts.proxyErrors
  331. // piping an ended stream ends immediately
  332. if (ended) {
  333. if (opts.end) dest.end()
  334. } else {
  335. this[PIPES].push(
  336. !opts.proxyErrors
  337. ? new Pipe(this, dest, opts)
  338. : new PipeProxyErrors(this, dest, opts)
  339. )
  340. if (this[ASYNC]) defer(() => this[RESUME]())
  341. else this[RESUME]()
  342. }
  343. return dest
  344. }
  345. unpipe(dest) {
  346. const p = this[PIPES].find(p => p.dest === dest)
  347. if (p) {
  348. this[PIPES].splice(this[PIPES].indexOf(p), 1)
  349. p.unpipe()
  350. }
  351. }
  352. addListener(ev, fn) {
  353. return this.on(ev, fn)
  354. }
  355. on(ev, fn) {
  356. const ret = super.on(ev, fn)
  357. if (ev === 'data' && !this[PIPES].length && !this.flowing) this[RESUME]()
  358. else if (ev === 'readable' && this[BUFFERLENGTH] !== 0)
  359. super.emit('readable')
  360. else if (isEndish(ev) && this[EMITTED_END]) {
  361. super.emit(ev)
  362. this.removeAllListeners(ev)
  363. } else if (ev === 'error' && this[EMITTED_ERROR]) {
  364. if (this[ASYNC]) defer(() => fn.call(this, this[EMITTED_ERROR]))
  365. else fn.call(this, this[EMITTED_ERROR])
  366. }
  367. return ret
  368. }
  369. get emittedEnd() {
  370. return this[EMITTED_END]
  371. }
  372. [MAYBE_EMIT_END]() {
  373. if (
  374. !this[EMITTING_END] &&
  375. !this[EMITTED_END] &&
  376. !this[DESTROYED] &&
  377. this[BUFFER].length === 0 &&
  378. this[EOF]
  379. ) {
  380. this[EMITTING_END] = true
  381. this.emit('end')
  382. this.emit('prefinish')
  383. this.emit('finish')
  384. if (this[CLOSED]) this.emit('close')
  385. this[EMITTING_END] = false
  386. }
  387. }
  388. emit(ev, data, ...extra) {
  389. // error and close are only events allowed after calling destroy()
  390. if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
  391. return
  392. else if (ev === 'data') {
  393. return !this[OBJECTMODE] && !data
  394. ? false
  395. : this[ASYNC]
  396. ? defer(() => this[EMITDATA](data))
  397. : this[EMITDATA](data)
  398. } else if (ev === 'end') {
  399. return this[EMITEND]()
  400. } else if (ev === 'close') {
  401. this[CLOSED] = true
  402. // don't emit close before 'end' and 'finish'
  403. if (!this[EMITTED_END] && !this[DESTROYED]) return
  404. const ret = super.emit('close')
  405. this.removeAllListeners('close')
  406. return ret
  407. } else if (ev === 'error') {
  408. this[EMITTED_ERROR] = data
  409. super.emit(ERROR, data)
  410. const ret =
  411. !this[SIGNAL] || this.listeners('error').length
  412. ? super.emit('error', data)
  413. : false
  414. this[MAYBE_EMIT_END]()
  415. return ret
  416. } else if (ev === 'resume') {
  417. const ret = super.emit('resume')
  418. this[MAYBE_EMIT_END]()
  419. return ret
  420. } else if (ev === 'finish' || ev === 'prefinish') {
  421. const ret = super.emit(ev)
  422. this.removeAllListeners(ev)
  423. return ret
  424. }
  425. // Some other unknown event
  426. const ret = super.emit(ev, data, ...extra)
  427. this[MAYBE_EMIT_END]()
  428. return ret
  429. }
  430. [EMITDATA](data) {
  431. for (const p of this[PIPES]) {
  432. if (p.dest.write(data) === false) this.pause()
  433. }
  434. const ret = super.emit('data', data)
  435. this[MAYBE_EMIT_END]()
  436. return ret
  437. }
  438. [EMITEND]() {
  439. if (this[EMITTED_END]) return
  440. this[EMITTED_END] = true
  441. this.readable = false
  442. if (this[ASYNC]) defer(() => this[EMITEND2]())
  443. else this[EMITEND2]()
  444. }
  445. [EMITEND2]() {
  446. if (this[DECODER]) {
  447. const data = this[DECODER].end()
  448. if (data) {
  449. for (const p of this[PIPES]) {
  450. p.dest.write(data)
  451. }
  452. super.emit('data', data)
  453. }
  454. }
  455. for (const p of this[PIPES]) {
  456. p.end()
  457. }
  458. const ret = super.emit('end')
  459. this.removeAllListeners('end')
  460. return ret
  461. }
  462. // const all = await stream.collect()
  463. collect() {
  464. const buf = []
  465. if (!this[OBJECTMODE]) buf.dataLength = 0
  466. // set the promise first, in case an error is raised
  467. // by triggering the flow here.
  468. const p = this.promise()
  469. this.on('data', c => {
  470. buf.push(c)
  471. if (!this[OBJECTMODE]) buf.dataLength += c.length
  472. })
  473. return p.then(() => buf)
  474. }
  475. // const data = await stream.concat()
  476. concat() {
  477. return this[OBJECTMODE]
  478. ? Promise.reject(new Error('cannot concat in objectMode'))
  479. : this.collect().then(buf =>
  480. this[OBJECTMODE]
  481. ? Promise.reject(new Error('cannot concat in objectMode'))
  482. : this[ENCODING]
  483. ? buf.join('')
  484. : Buffer.concat(buf, buf.dataLength)
  485. )
  486. }
  487. // stream.promise().then(() => done, er => emitted error)
  488. promise() {
  489. return new Promise((resolve, reject) => {
  490. this.on(DESTROYED, () => reject(new Error('stream destroyed')))
  491. this.on('error', er => reject(er))
  492. this.on('end', () => resolve())
  493. })
  494. }
  495. // for await (let chunk of stream)
  496. [ASYNCITERATOR]() {
  497. let stopped = false
  498. const stop = () => {
  499. this.pause()
  500. stopped = true
  501. return Promise.resolve({ done: true })
  502. }
  503. const next = () => {
  504. if (stopped) return stop()
  505. const res = this.read()
  506. if (res !== null) return Promise.resolve({ done: false, value: res })
  507. if (this[EOF]) return stop()
  508. let resolve = null
  509. let reject = null
  510. const onerr = er => {
  511. this.removeListener('data', ondata)
  512. this.removeListener('end', onend)
  513. this.removeListener(DESTROYED, ondestroy)
  514. stop()
  515. reject(er)
  516. }
  517. const ondata = value => {
  518. this.removeListener('error', onerr)
  519. this.removeListener('end', onend)
  520. this.removeListener(DESTROYED, ondestroy)
  521. this.pause()
  522. resolve({ value: value, done: !!this[EOF] })
  523. }
  524. const onend = () => {
  525. this.removeListener('error', onerr)
  526. this.removeListener('data', ondata)
  527. this.removeListener(DESTROYED, ondestroy)
  528. stop()
  529. resolve({ done: true })
  530. }
  531. const ondestroy = () => onerr(new Error('stream destroyed'))
  532. return new Promise((res, rej) => {
  533. reject = rej
  534. resolve = res
  535. this.once(DESTROYED, ondestroy)
  536. this.once('error', onerr)
  537. this.once('end', onend)
  538. this.once('data', ondata)
  539. })
  540. }
  541. return {
  542. next,
  543. throw: stop,
  544. return: stop,
  545. [ASYNCITERATOR]() {
  546. return this
  547. },
  548. }
  549. }
  550. // for (let chunk of stream)
  551. [ITERATOR]() {
  552. let stopped = false
  553. const stop = () => {
  554. this.pause()
  555. this.removeListener(ERROR, stop)
  556. this.removeListener(DESTROYED, stop)
  557. this.removeListener('end', stop)
  558. stopped = true
  559. return { done: true }
  560. }
  561. const next = () => {
  562. if (stopped) return stop()
  563. const value = this.read()
  564. return value === null ? stop() : { value }
  565. }
  566. this.once('end', stop)
  567. this.once(ERROR, stop)
  568. this.once(DESTROYED, stop)
  569. return {
  570. next,
  571. throw: stop,
  572. return: stop,
  573. [ITERATOR]() {
  574. return this
  575. },
  576. }
  577. }
  578. destroy(er) {
  579. if (this[DESTROYED]) {
  580. if (er) this.emit('error', er)
  581. else this.emit(DESTROYED)
  582. return this
  583. }
  584. this[DESTROYED] = true
  585. // throw away all buffered data, it's never coming out
  586. this[BUFFER].length = 0
  587. this[BUFFERLENGTH] = 0
  588. if (typeof this.close === 'function' && !this[CLOSED]) this.close()
  589. if (er) this.emit('error', er)
  590. // if no error to emit, still reject pending promises
  591. else this.emit(DESTROYED)
  592. return this
  593. }
  594. static isStream(s) {
  595. return (
  596. !!s &&
  597. (s instanceof Minipass ||
  598. s instanceof Stream ||
  599. (s instanceof EE &&
  600. // readable
  601. (typeof s.pipe === 'function' ||
  602. // writable
  603. (typeof s.write === 'function' && typeof s.end === 'function'))))
  604. )
  605. }
  606. }
  607. exports.Minipass = Minipass