Streams in Node.js.

A powerful way to handle a lot of data

Published on
Nov 20, 2023

Read time
8 min read

Introduction

For newer web developers, concepts like memory management and data streaming can be intimidating. They’re not usually a part of web development tutorials and yet they are critical for some of the most popular websites on the internet: streaming sites like YouTube, Netflix, Spotify — and many more.

As my career has progressed, I’ve found myself needing to work with data at a much larger scale, and streams have proved an invaluable tool for processing large amounts of data in cloud infrastructure, where keeping memory usage low helps keep costs low!

Memory management is not usually a part of learning materials aimed at developers of garbage-collected programming languages, like JavaScript. Most of the time, this is useful. We can spend more time writing application logic and trust the underlying engine to do the rest. But when working at scale, with limited resources, or on the cloud, we may need to think differently.

To illustrate the power of streams, this article compares several ways to copy a file using Node.js — first synchronous, then asynchronous, and finally via streams. We’ll also explore using streams to generate a large file without a large memory footprint, including how to mitigate a common gotcha that could completely undermine our memory gains. But first, here’s a recap of what memory actually is.

What is computer memory?

In computing, memory is temporary storage space that the computer uses to execute tasks and run programs. Unlike disk space, which is used for long-term storage of files, memory is optimised to temporarily hold and manipulate data in real-time.

There are several types of computer memory, but we’ll be talking about the primary one — RAM (Random Access Memory). If your computer has more RAM, you can use more applications at once and run more intensive applications, like video-editing software or certain video games.

In the world of Node.js, memory is the amount of data our Node.js programs are storing (by assigning data to variables) or manipulating. If we’re using cloud infrastructure (i.e. remote servers), we may wish to keep our memory usage low to avoid higher costs. But memory limits can also act as a guard rail: oftentimes, exceeding the limit is a sign of developer error or poorly optimised code, issues we would want to catch in a staging environment before pushing our code to production.
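
To get a feel for the numbers involved, Node.js exposes process.memoryUsage(), which reports how much memory the current process is using, in bytes. Here’s a minimal sketch that converts two of its values to megabytes:

// rss ("resident set size") is the total memory allocated to the process;
// heapUsed is the portion of the V8 heap currently in use
const { rss, heapUsed } = process.memoryUsage();

console.log(`RSS: ${(rss / 1024 / 1024).toFixed(2)} MB`);
console.log(`Heap used: ${(heapUsed / 1024 / 1024).toFixed(2)} MB`);

We’ll use a similar helper based on rss later in this article to measure the memory footprint of each approach.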

What is streaming?

Streaming refers to the process of continuously delivering and receiving data in smaller chunks. On video websites, for example, streaming allows users to start watching videos without having to download the entire video file, allowing for near-real-time playback. Other examples include audio streaming, online games, and webinars.
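
To make the idea of “smaller chunks” concrete, here’s a minimal sketch that reads a (hypothetical) large file chunk by chunk. Node.js readable streams are async iterables, so we can consume them with a for await...of loop:

import fs from "node:fs";

// Each iteration receives one chunk (a Buffer by default), not the whole file
for await (const chunk of fs.createReadStream("myLargeFile.txt")) {
  console.log(`Received ${chunk.length} bytes`);
}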

In servers, streaming can be a useful tool to read or write large documents, to process large datasets, and to transfer data to and from cloud storage. In this article, we’ll discuss approaches to all of the above. But first, let’s look at the simplest methods for reading and writing files in Node.js.

Copying Files in Node.js

We’ll begin by exploring how to copy a file, as it’s a relatively simple task that requires both a read step and a write step. In all these examples, we’ll import functionality from Node’s filesystem library, fs.

Synchronous

The simplest way to copy a file is using synchronous methods, like so:

import fs from "node:fs";

function copyFile() {
  const data = fs.readFileSync("myFile.txt");
  fs.writeFileSync("myCopiedFile.txt", data);
}

copyFile();

The problem with this approach is that it blocks the main thread, and therefore, it’s suitable only for limited use cases.

For example, it’s common to load SSL certificates synchronously during server startup, since these files are small and startup can afford to block. Or we could use synchronous methods in scripts that run on a developer’s machine, where blocking the main thread or using extra memory is usually not a concern.
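
As an illustration of the first case, here’s a sketch of loading certificates synchronously at server startup (the file paths are hypothetical); since the server can’t accept requests until these files are loaded, blocking here is acceptable:

import fs from "node:fs";
import https from "node:https";

// Blocking reads are fine here: the server can't start without these files anyway
const options = {
  key: fs.readFileSync("certs/privkey.pem"),
  cert: fs.readFileSync("certs/fullchain.pem"),
};

https
  .createServer(options, (req, res) => res.end("Hello over HTTPS!"))
  .listen(443);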

Asynchronous

Asynchronous operations, by contrast, do not block the main thread. That means that the work to read and write files can be done in the background.

import fs from "node:fs/promises";

async function copyFile() {
  const data = await fs.readFile("myFile.txt");
  await fs.writeFile("myCopiedFile.txt", data);
}

copyFile();

This is applicable to a much wider range of use cases, but it still isn’t well suited to high volumes of data. Both readFile and writeFile hold the entire file’s contents in memory, so as files grow, so does our memory usage, putting strain on (or potentially crashing) our cloud infrastructure.

Streaming

We can mitigate many of these risks via streaming. In the example below, we create a read stream and a write stream, and use the pipe method to transfer the data from one stream to the other. (Piping means to provide the output of one stream as the input to another stream.)

import fs from "node:fs";

function copyFile() {
  const readStream = fs.createReadStream("myFile.txt");
  const writeStream = fs.createWriteStream("myCopiedFile.txt");

  // pipe() sends data from the read stream to the write stream and, by default,
  // ends the write stream for us once the read stream has finished
  readStream.pipe(writeStream);

  writeStream.on("finish", () => {
    console.log("Copy complete!");
  });
}

copyFile();

We can make this example even simpler using the stream library, whose pipeline function allows us to connect a number of streams in an easy-to-read format:

import fs from "node:fs";
import stream from "node:stream/promises";

async function copyFile() {
  const readStream = fs.createReadStream("myFile.txt");
  const writeStream = fs.createWriteStream("myCopiedFile.txt");

  await stream.pipeline(readStream, writeStream);
}

copyFile();

Hopefully, these examples demonstrate that streaming data can be almost as easy as reading and writing files in their entirety.

But there is a common gotcha that could entirely undo the benefits of streaming if it’s not handled correctly, which we’ll encounter in the next example.

Generating a Large CSV File

Now let’s look at a simplified version of a real-life problem I encountered in my work: generating a large CSV file.

A function that had been working for several months suddenly stopped running. It turned out that the data it was processing had grown significantly, and it now required more memory than our cloud infrastructure allowed.

First, let’s see a non-streaming version of this problem. To log the memory usage in the examples below, I’ll use this function:

// Logs the process's resident set size (its total allocated memory) in megabytes
function logMemory() {
  console.log(
    Math.round((process.memoryUsage().rss / 1024 / 1024) * 100) / 100 + " MB"
  );
}

In the code below, we’ll generate a CSV with 10 million rows, performing a basic multiplication each time, and then write the file at the end.

import fs from "node:fs/promises";

async function writeCsvInMemory() {
  const rows = [];
  rows.push("Number,Squared\n");

  for (let i = 0; i < 10_000_000; i++) {
    rows.push(`${i},${i * i}\n`);
  }

  logMemory();

  await fs.writeFile("myFile2.csv", rows.join(""));
}

await writeCsvInMemory();

The memory used in this example (on my machine, a MacBook Pro M1) is a whopping 1.38GB. Far too high for most servers in the cloud!

Can we solve the problem with streaming?

import fs from "node:fs";

async function writeCsvWithStream() {
  const writeStream = fs.createWriteStream("myFile.csv");
  writeStream.write("Number,Squared\n");

  for (let i = 0; i < 10_000_000; i++) {
    writeStream.write(`${i},${i * i}\n`);
  }

  logMemory();

  writeStream.end();
}

await writeCsvWithStream();

It may seem surprising that this version uses up even more memory — this time 1.89GB!

The problem is a phenomenon called backpressure, or rather, our failure to handle it.

Handling Backpressure

Backpressure occurs when data is produced faster than it can be consumed. If a read stream or data source (in our case, our JavaScript logic) sends data faster than our write stream can process it, a backlog of data builds up in memory, which, as in the example above, can even end up using more memory than the non-streaming approach!

We can get around this by pausing our writes when the stream’s internal buffer is full, which is signalled by the write method returning false instead of true. Then, once the write stream has “drained” (in other words, it has caught up with the backlog), we can continue writing data again.

Backpressure may occur several times before we end our streams, so we can add guardrails to our code to be able to cope with this scenario. Here’s one way of handling this:

import fs from "node:fs";

async function writeCsvWithStream() {
  const writeStream = fs
    .createWriteStream("myFile.csv")
    .on("error", (err) => console.error("Error writing to stream:", err))
    .on("drain", writeData);

  writeStream.write("Number,Squared\n");

  let current = 0;

  function writeData() {
    const max = 10_000_000;

    while (current < max) {
      const result = writeStream.write(`${current},${current * current}\n`);
      current++;

      if (!result) {
        break;
      }

      if (current >= max) {
        logMemory();
        writeStream.end();
        break;
      }
    }
  }

  writeData();
}

writeCsvWithStream();

In the code above, we track our progress through the CSV using the current variable. The writeData function is called once at the beginning, then again whenever the write stream drains. It stops executing when backpressure occurs or when current reaches the intended number of rows.

There is a tradeoff here, since the process takes longer to complete. But the memory cost is much lower, consistently around 75MB throughout the execution of the function — a fraction of the memory used in the examples above!
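
As an aside, the same backpressure-aware pattern can be written with async/await, using the once helper from node:events to wait for the drain event. This is just an alternative sketch of the idea above, not a drop-in replacement:

import fs from "node:fs";
import { once } from "node:events";

async function writeCsvWithStream() {
  const writeStream = fs.createWriteStream("myFile.csv");
  writeStream.write("Number,Squared\n");

  for (let i = 0; i < 10_000_000; i++) {
    // If write() returns false, wait for the stream to drain before continuing
    if (!writeStream.write(`${i},${i * i}\n`)) {
      await once(writeStream, "drain");
    }
  }

  writeStream.end();
}

await writeCsvWithStream();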

Other Examples

Copying a File with Backpressure Handling

We can apply a similar approach to our file copying example. This is easier than our looping example above, as fs read streams come with handy pause and resume methods:

import fs from "node:fs";

function copyFile() {
  const readStream = fs.createReadStream("myFile.csv");
  const writeStream = fs.createWriteStream("myCopiedFile.csv");

  readStream.on("data", function (chunk) {
    const result = writeStream.write(chunk);

    // If the write stream's internal buffer is full, pause the read stream
    if (!result) {
      readStream.pause();
    }
  });

  // Once the write stream has drained, resume reading
  writeStream.on("drain", () => {
    readStream.resume();
  });

  readStream.on("end", function () {
    logMemory();
    writeStream.end();
  });
}

copyFile();

If we copy the large CSV generated in the previous example, the memory usage stays low, again at around 75MB on my machine. (It’s worth noting that pipe and pipeline handle backpressure for us automatically; handling it manually, as above, is mainly useful for understanding what’s happening under the hood, or when our data doesn’t come from a readable stream in the first place.)

Streaming Files to AWS S3

Another common use case for streaming is uploading files to the cloud. Taking S3 as an example, we can upload a file directly via streaming without having to write anything to the disk. The @aws-sdk/lib-storage library handles a lot of the complexity for us. We don’t need to know the size of our overall file or our streams, and lots of different stream types are supported.

import fs from "node:fs";
import { S3 } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";

// `config` is assumed to be defined elsewhere, containing our AWS credentials,
// region, and the name of the target bucket
async function uploadFile() {
  const readStream = fs.createReadStream("myFile.txt");
  const client = new S3(config);

  const upload = new Upload({
    client,
    params: {
      Bucket: config.bucketName,
      Key: "myFile.txt",
      Body: readStream,
    },
  });

  upload.on("httpUploadProgress", (progress) => {
    logMemory();
    console.log(`Upload progress: ${progress.loaded} / ${progress.total}`);
  });

  await upload.done();
}

uploadFile();

Under the hood, this process is performing a multipart file upload. It doesn’t send every single data chunk from the stream, but accumulates a certain number of chunks before then sending off that “part” of the file and freeing up the used memory.
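
If you want more control over how much is buffered, the Upload class also accepts partSize and queueSize options, which control how large each part is and how many parts are uploaded concurrently; roughly speaking, the upload holds around partSize × queueSize bytes in memory at a time. Here’s a small sketch, reusing the client and config names from the example above:

const upload = new Upload({
  client,
  params: {
    Bucket: config.bucketName,
    Key: "myFile.txt",
    Body: fs.createReadStream("myFile.txt"),
  },
  partSize: 5 * 1024 * 1024, // 5MB per part (the minimum part size S3 allows)
  queueSize: 4, // upload up to 4 parts concurrently
});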

We don’t have to account for backpressure in this example — but if we were using a write stream to upload the file, we would need to make sure it drains properly if we want to keep memory usage low.

Reading a Large CSV and Filtering by Row

Finally, a lot of this article has focused on either write streams or using both read and write streams together. But there are also many situations when using a read stream wisely can lead to greater efficiency or lower memory usage.

Recently, I worked on a function that loaded a large CSV into memory and then filtered a number of the rows. Initially, the code read the entire CSV into memory, storing the tens of thousands of rows in an array, before then filtering them.

Instead, with streaming, we can avoid memory spikes by filtering as the chunks of data come in. In the example below, I perform a check on each data chunk as it comes in. (I’m also using the csv-parser library to turn my stream of CSV rows into JSON objects.)

import fs from "node:fs";
import csv from "csv-parser";

function readCsv() {
  const keys = new Set();
  const rows = [];

  return fs
    .createReadStream("myLargeFile.csv")
    .pipe(csv())
    .on("data", (data) => {
      const { count, year } = data;
      const key = `${count}-${year}`;

      // Only keep the first row we see for each count-year combination
      if (!keys.has(key)) {
        keys.add(key);
        rows.push(data);
      }
    })
    .on("end", () => {
      // processRows is assumed to be defined elsewhere
      processRows(rows);
    });
}

readCsv();

I hope you found this article a useful introduction to working with streams in Node.js. I have found these patterns useful in my work, but there’s also a lot more you can do with streams!

© 2024 Bret Cameron