Types

Stream Description
Readable Data emitter
Writable Data receiver
Transform Emitter and receiver
Duplex Emitter and receiver (independent)

See: Stream (nodejs.org)

Streams

const Readable = require('stream').Readable
const Writable = require('stream').Writable
const Transform = require('stream').Transform

Piping

clock()              // Readable stream
  .pipe(xformer())   // Transform stream
  .pipe(renderer())  // Writable stream

Methods

stream.push(/*...*/)         // Emit a chunk
stream.emit('error', error)  // Raise an error
stream.push(null)            // Close a stream

Events

const st = source()
st.on('data', (data) => { console.log('<-', data) })
st.on('error', (err) => { console.log('!', err.message) })
st.on('close', () => { console.log('** bye') })
st.on('finish', () => { console.log('** bye') })

Assuming source() is a readable stream.

Flowing mode

// Toggle flowing mode
st.resume()
st.pause()
// Automatically turns on flowing mode
st.on('data', /*...*/)

Stream types

Readable

function clock () {
  const stream = new Readable({
    objectMode: true,
    read() {}
  })

  setInterval(() => {
    stream.push({ time: new Date() })
  }, 1000)

  return stream
}

// Implement read() if you
// need on-demand reading.

Readable streams are generators of data. Write data using stream.push().

Transform

function xformer () {
  let count = 0

  return new Transform({
    objectMode: true,
    transform: (data, _, done) => {
      done(null, { ...data, index: count++ })
    }
  })
}

Pass the updated chunk to done(null, chunk).

Writable

function renderer () {
  return new Writable({
    objectMode: true,
    write: (data, _, done) => {
      console.log('<-', data)
      done()
    }
  })
}

All together now

clock()              // Readable stream
  .pipe(xformer())   // Transform stream
  .pipe(renderer())  // Writable stream

Also see