1. Types of Streams

  • Readable Stream: Reads data.
  • Writable Stream: Writes data.
  • Duplex Stream: Both readable and writable.
  • Transform Stream: Modifies data while reading and writing.

2. Readable Streams

const fs = require('fs');
const readableStream = fs.createReadStream('input.txt');
 
readableStream.on('data', (chunk) => {
    console.log(`Received ${chunk.length} bytes of data.`);
});
 
readableStream.on('end', () => {
    console.log('End of stream.');
});
 
readableStream.on('error', (err) => {
    console.error('Stream error:', err);
});
`.pipe()` Example
const readableStream = fs.createReadStream('input.txt');
const writableStream = fs.createWriteStream('output.txt');
 
readableStream.pipe(writableStream);  // Pipe data from input.txt to output.txt

3. Writable Streams

const fs = require('fs');
const writableStream = fs.createWriteStream('output.txt');
 
writableStream.write('This is the first line.\n');
writableStream.write('This is the second line.\n');
 
writableStream.end();  // No more data to write
 
writableStream.on('finish', () => {
    console.log('Finished writing data.');
});
 
writableStream.on('error', (err) => {
    console.error('Stream error:', err);
});

4. Duplex Streams

const { Duplex } = require('stream');
 
const duplexStream = new Duplex({
    write(chunk, encoding, callback) {
        console.log(`Writing: ${chunk}`);
        callback();
    },
    read(size) {
        this.push('Data from read.');
        this.push(null);  // End the readable part
    }
});
 
duplexStream.write('Input data');
duplexStream.on('data', (chunk) => console.log(`Read: ${chunk}`));

5. Transform Streams

const { Transform } = require('stream');
 
const upperCaseTransform = new Transform({
    transform(chunk, encoding, callback) {
        this.push(chunk.toString().toUpperCase());
        callback();
    }
});
 
process.stdin.pipe(upperCaseTransform).pipe(process.stdout);

6. Piping Streams

const fs = reqruie('fs');
 
const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');
 
readStream.pipe(writeStream);

7. Flow Control (Backpressure)

  • Backpressure means, when output stream target is not able to consume the stream, the read stream have to pause, so that there’s no memory leak. Otherwise it will keep queuing in the memory.
const readStream = fs.createReadStream('largefile.txt');
const writeStream = fs.createWriteStream('output.txt');
 
readStream.on('data', (chunk) => {
    const shouldContinue = writeStream.write(chunk);
    if (!shouldContinue) {
        console.log('Backpressure detected. Pausing reading.');
        readStream.pause();
    }
});
 
writeStream.on('drain', () => {
    console.log('Resuming reading after drain.');
    readStream.resume();
});