1. Types of Streams
Readable Stream : Reads data.
Writable Stream : Writes data.
Duplex Stream : Both readable and writable.
Transform Stream : A duplex stream whose output is computed from its input — it modifies data as it passes through (e.g. compression, case conversion).
2. Readable Streams
// 2. Readable Streams — read input.txt in chunks and report progress.
const fs = require('fs');

const readableStream = fs.createReadStream('input.txt');

// 'data' fires once per chunk until the file is exhausted.
readableStream.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});

// 'end' fires after the final chunk has been delivered.
readableStream.on('end', () => {
  console.log('End of stream.');
});

// Always attach an 'error' handler — an unhandled stream error crashes the process.
readableStream.on('error', (err) => {
  console.error('Stream error:', err);
});
`.pipe()` Example
// `.pipe()` example — copy input.txt to output.txt.
const fs = require('fs'); // added: snippet used `fs` without requiring it

const readableStream = fs.createReadStream('input.txt');
const writableStream = fs.createWriteStream('output.txt');

// pipe() manages backpressure automatically. Note it does NOT forward
// 'error' events — attach error handlers to both streams in real code.
readableStream.pipe(writableStream); // Pipe data from input.txt to output.txt
3. Writable Streams
// 3. Writable Streams — write two lines to output.txt.
const fs = require('fs');

const writableStream = fs.createWriteStream('output.txt');

// Fixed: the escapes were mangled to '... \n ' (spaces inside the literal),
// which would have written a stray space before and after each newline.
writableStream.write('This is the first line.\n');
writableStream.write('This is the second line.\n');
writableStream.end(); // No more data to write

// 'finish' fires after end() once all buffered data has been flushed.
writableStream.on('finish', () => {
  console.log('Finished writing data.');
});

writableStream.on('error', (err) => {
  console.error('Stream error:', err);
});
4. Duplex Streams
// 4. Duplex Streams — independent readable and writable sides in one stream.
const { Duplex } = require('stream');

const duplexStream = new Duplex({
  // Writable side: log every incoming chunk, then signal completion.
  write(chunk, encoding, callback) {
    console.log(`Writing: ${chunk}`);
    callback();
  },
  // Readable side: emit a single chunk, then end with push(null).
  read(size) {
    this.push('Data from read.');
    this.push(null); // End the readable part
  },
});

duplexStream.write('Input data');
duplexStream.on('data', (chunk) => console.log(`Read: ${chunk}`));
// 5. Transform Streams — uppercase everything typed on stdin.
const { Transform } = require('stream');

const upperCaseTransform = new Transform({
  // Each chunk is stringified, uppercased, and pushed downstream.
  transform(chunk, encoding, callback) {
    const upper = chunk.toString().toUpperCase();
    this.push(upper);
    callback();
  },
});

// stdin -> uppercase -> stdout
process.stdin.pipe(upperCaseTransform).pipe(process.stdout);
6. Piping Streams
// 6. Piping Streams — copy input.txt to output.txt with pipe().
const fs = require('fs'); // fixed: was misspelled `reqruie`, a ReferenceError at runtime

const readStream = fs.createReadStream('input.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.pipe(writeStream);
7. Flow Control (Backpressure)
Backpressure occurs when the destination (writable) stream cannot consume data as fast as the readable stream produces it. The readable stream must pause until the destination drains; otherwise the unconsumed chunks keep accumulating in memory, causing unbounded memory growth.
// 7. Flow Control (Backpressure) — manual pause/resume demonstration.
const fs = require('fs'); // added: snippet used `fs` without requiring it

const readStream = fs.createReadStream('largefile.txt');
const writeStream = fs.createWriteStream('output.txt');

readStream.on('data', (chunk) => {
  // write() returns false when the writable's internal buffer is full
  // (highWaterMark exceeded) — that is the backpressure signal.
  const shouldContinue = writeStream.write(chunk);
  if (!shouldContinue) {
    console.log('Backpressure detected. Pausing reading.');
    readStream.pause();
  }
});

// 'drain' fires once the writable's buffer has emptied; safe to resume reading.
writeStream.on('drain', () => {
  console.log('Resuming reading after drain.');
  readStream.resume();
});