Revisiting batch applications: a modern perspective with NodeJS Streams

Processing big chunks of data in a batch “flavour” is still a common requirement. In some cases it is important to process this data as fast as possible, while in others it is not.

The latter case is what Jobrapido had to face. The business requirement (assigning a classification to some input data) had no “time constraints”, so the technical choice was to leverage NodeJS native streams in order to process the entire batch of data without “stealing” too many resources from other kinds of workflow that really do need fast access to those resources.

NodeJS was chosen because of its natural aptitude for performing really well when a lot of asynchronous processing is involved. Thanks to the native Stream API architecture, implementing “rate limiting” logic is straightforward.

So, let’s dive into the Stream… 

Stream Theory 

What is a stream? We can imagine a stream as a conveyor belt inside an assembly line, on which raw materials flow from a source to a destination. During their journey the raw materials are processed by a pipeline that transforms them into the final product. A pipeline is a sequence of steps, and each step applies some processing to the materials.

NodeJS defines four kinds of stream: readable, writable, duplex and its specialization, transform.

  • Readable: this stream provides data. It is a unidirectional stream, i.e. data can only be read but cannot be written. 
  • Writable: you can put data on this stream, which is unidirectional, i.e. you can only write data on it. 
  • Duplex: this stream can consume and produce data. It is a bidirectional stream because you can write and read on it. 
  • Transform: it is a bidirectional stream like its parent, the duplex stream. The main difference between duplex and transform is that the latter only outputs data as a “reaction” to some incoming data whereas with a duplex stream the two events are independent. 

All of these types are exposed in the NodeJS native API as interfaces that can be implemented to work with streams (https://nodejs.org/api/stream.html).

NodeJS provides a method (called pipe) to combine streams, taking their nature into consideration. A readable stream acts as a source of data, which can be piped into one or more transform streams; finally, the pipeline is closed with a writable stream that consumes the information.

import { Readable, Transform, TransformCallback } from "stream";

// A Transform stream that raises every incoming number to a fixed exponent
// and pushes the result downstream as a string.
class Pow extends Transform {
  public static withExponent(exponent: number) {
    return new Pow(exponent);
  }

  private constructor(private readonly exponent: number) {
    // objectMode lets the stream carry plain JavaScript values instead of Buffers/strings.
    super({ objectMode: true });
  }

  public _transform(inputNumber: number, _: string, callback: TransformCallback) {
    const result = Math.pow(inputNumber, this.exponent);
    this.push(`${result}\n`);
    // Signal that this chunk has been fully processed.
    callback();
  }
}

// Readable source -> Pow transform -> stdout writable.
Readable.from([1, 2, 3, 4, 5])
  .pipe(Pow.withExponent(2))
  .pipe(process.stdout);
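
Running this prints 1, 4, 9, 16 and 25, one per line: each number is read from the source, squared by the transform and written to standard output as soon as it is ready.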

The default implementation of the different kinds of streams described above relies on an internal buffer to deal with incoming/outgoing data flow. 

The internal buffer is not directly exposed to the outer world and the standard implementation deals with it for you. 

Streams, thanks to this implementation, are time and space efficient.

Let’s imagine having to transfer a large file from the local file system to a TCP connection. Streams can read the file “chunk by chunk” (avoiding putting the entire file in memory) and can start writing on the TCP connection without waiting for the entire file to be read from the file system. 
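
A minimal sketch of that scenario could look like this (the host, port and file path are made up for the example):

import { createReadStream } from "fs";
import { createConnection } from "net";

// Hypothetical destination and file path, just for illustration.
const socket = createConnection({ host: "example.com", port: 9000 });

socket.on("connect", () => {
  // The file is read chunk by chunk (64 KiB by default) and every chunk is
  // written to the socket as soon as it is available, so the whole file is
  // never loaded into memory.
  createReadStream("/path/to/large-file.bin").pipe(socket);
});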

Back Pressure 

The buffer itself cannot handle all the possible scenarios. For example, a specific step in the stream pipeline could be slower than the previous one. NodeJS handles this case by applying a technique known as back pressure.

Back pressure keeps the amount of buffered data at an acceptable level, so that a faster source does not produce more data than the slower receiver can handle. By doing this, NodeJS guarantees that memory is not overloaded by buffered data.

When you use the pipe() method to chain streams together, back pressure is already implemented and the developer does not have to deal with it.
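
To appreciate what pipe() does for you, here is a rough sketch of handling back pressure by hand (file paths are only illustrative): write() returns false when the destination buffer is above its highWaterMark, so the source has to pause until the 'drain' event fires.

import { createReadStream, createWriteStream } from "fs";

// Hypothetical files, only for illustration.
const source = createReadStream("/path/to/input.bin");
const destination = createWriteStream("/path/to/output.bin");

source.on("data", (chunk) => {
  // write() returns false when the writable's internal buffer is full:
  // stop reading until the buffer has been drained.
  const canContinue = destination.write(chunk);
  if (!canContinue) {
    source.pause();
    destination.once("drain", () => source.resume());
  }
});

source.on("end", () => destination.end());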

Back to our problem 

Now that the theory is clear, let’s dive into a real problem. Imagine you have a file with a list of people’s names and you want to know (with a confidence score) what a person’s gender is most likely to be.

To retrieve this information we used the API exposed by genderize.io. This resource has to be used carefully, because it enforces daily limits, and our project has to take them into account.
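
For reference, a single lookup against the API might look roughly like the sketch below (the response shape in the comment follows the public documentation; the sample project wraps this kind of call inside a stream step):

import { get } from "https";

// Rough sketch of a single genderize.io lookup. The API answers with JSON
// such as { "name": "peter", "gender": "male", "probability": 0.99, "count": ... }.
function fetchGender(name: string): Promise<{ gender: string | null; probability: number }> {
  return new Promise((resolve, reject) => {
    get(`https://api.genderize.io/?name=${encodeURIComponent(name)}`, (response) => {
      let body = "";
      response.on("data", (chunk) => (body += chunk));
      response.on("end", () => {
        try {
          const { gender, probability } = JSON.parse(body);
          resolve({ gender, probability });
        } catch (error) {
          reject(error);
        }
      });
    }).on("error", reject);
  });
}

// Example usage: fetchGender("peter").then(console.log);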

You can find the project on GitHub, so you can play with it: https://github.com/jobrapido/nodejs-streams-sample . Follow the README instructions to install and run it. 

The project is written in TypeScript and is tested on NodeJS v8+.

Anatomy of the project 

The project starts from the `main.ts` file that initializes our application, injects dependencies (how the injection is performed is not a topic for this article) and starts the pipeline. 

The pipeline structure is defined inside `gender-assigner-pipeline.ts`.

Below is a schema describing it: 

Let’s now analyse the single steps that compose our pipeline (a sketch of how they might be wired together follows the list):

  • fsInput (type: Readable): the starting point of our pipeline: the data comes from the input file (‘data/input/input.csv’).
  • csvParser (type: Transform): an external library that parses the input CSV. In this example the CSV is trivial, but in a real-case scenario it could be a complex data structure.
  • inputDecoder (type: Transform): transforms each CSV record into a structured object: in our case, an object of the ‘Person’ class.
  • bufferTransformStream (type: Transform): collects the incoming rows into fixed-size chunks. From this step onward the base unit of the pipeline becomes an array of ‘Person’.
  • assignGenderTransformStream (type: Transform): represents the core logic of our project. It calls the Genderize.io API and parses the response. The return type is an array of ‘PersonWithGender’.
  • stringifier (type: Transform): applies a conversion from a ‘PersonWithGender’ object to a CSV string.
  • metricStream (type: Transform): does not add any functionality to the pipeline; it is only useful for collecting and logging some metrics about the pipeline behaviour.
  • fsOutput (type: Writable): the final step of our pipeline. It writes the retrieved information to the output file (‘data/output/output.csv’).
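
The wiring itself might look roughly like the sketch below; the step names mirror the list above, but the actual code in `gender-assigner-pipeline.ts` may differ (for instance, the steps are injected rather than created in place).

import { createReadStream, createWriteStream } from "fs";

// Illustrative wiring only: the steps are assumed to already be stream instances.
export function buildGenderAssignerPipeline(steps: {
  csvParser: NodeJS.ReadWriteStream;
  inputDecoder: NodeJS.ReadWriteStream;
  bufferTransformStream: NodeJS.ReadWriteStream;
  assignGenderTransformStream: NodeJS.ReadWriteStream;
  stringifier: NodeJS.ReadWriteStream;
  metricStream: NodeJS.ReadWriteStream;
}) {
  return createReadStream("data/input/input.csv")       // fsInput
    .pipe(steps.csvParser)
    .pipe(steps.inputDecoder)
    .pipe(steps.bufferTransformStream)
    .pipe(steps.assignGenderTransformStream)
    .pipe(steps.stringifier)
    .pipe(steps.metricStream)
    .pipe(createWriteStream("data/output/output.csv")); // fsOutput
}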

How to deal with performance 

A lot of the described steps are really simple, but the bufferTransformStream step deserves a closer look.

At first glance it appears to be a trivial step, with nothing related to the business logic of our application. On the contrary, it represents a key point of our pipeline.

By aggregating multiple messages into an array (which becomes the payload of a single message from this point on) we increase the final throughput, because the subsequent step (assignGenderTransformStream) can now issue multiple calls to the external HTTP API for every single invocation of the step.
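
A minimal sketch of such a buffering step could look like this (the class name is illustrative; the implementation in the sample project may differ):

import { Transform, TransformCallback } from "stream";

// Groups incoming objects into fixed-size arrays before passing them on.
class BufferTransformStream<T> extends Transform {
  private buffer: T[] = [];

  constructor(private readonly bufferSize: number) {
    super({ objectMode: true });
  }

  public _transform(item: T, _: string, callback: TransformCallback) {
    this.buffer.push(item);
    if (this.buffer.length >= this.bufferSize) {
      // Emit the full chunk downstream and start collecting a new one.
      this.push(this.buffer);
      this.buffer = [];
    }
    callback();
  }

  public _flush(callback: TransformCallback) {
    // Emit whatever is left when the upstream source ends.
    if (this.buffer.length > 0) {
      this.push(this.buffer);
    }
    callback();
  }
}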

For a further improvement, not applicable to all cases, we could modify the assignGenderTransformStream to call the API in a batch flavour (https://genderize.io/#batch-usage), so that only one HTTP call is actually performed for each chunk. We leave this optimization to you, so you have the chance to get your hands on some code.

Keep performance under control 

As the saying goes, “Speed is nothing without control”. If you remember from the preface of the article, the issue we had to face at Jobrapido was processing a huge amount of data without “stealing” resources from real-time services, and this is where rate limiting kicks in.

In the sample case, we decided to apply rate limiting to the Genderize.io HTTP call step, which is the most demanding (in terms of resources) in our example. Rate limiting is achieved through an external library that lets you wrap a custom function and limit the number of invocations per second.

This fits perfectly into our asynchronous pipeline: only when the function actually invokes its callback does the information continue its journey through the rest of the pipeline.
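
The shape of such a step might resemble the hand-rolled sketch below (the real project relies on an external library, and `fetchGender` is a hypothetical helper). The key point is that callback() is only invoked once the rate-limited work has completed, so back pressure naturally slows the rest of the pipeline down to the allowed rate.

import { Transform, TransformCallback } from "stream";

// Illustrative rate-limited step: it processes each chunk of people with at
// most `maxRequestsPerSecond` API calls per second, in a deliberately naive way.
class RateLimitedGenderStream extends Transform {
  constructor(
    private readonly maxRequestsPerSecond: number,
    private readonly fetchGender: (name: string) => Promise<{ gender: string | null }>
  ) {
    super({ objectMode: true });
  }

  public async _transform(people: Array<{ name: string }>, _: string, callback: TransformCallback) {
    try {
      const results = [];
      for (const person of people) {
        const started = Date.now();
        results.push({ ...person, ...(await this.fetchGender(person.name)) });
        // Make sure each call takes at least 1000 / maxRequestsPerSecond milliseconds.
        const minimumDuration = 1000 / this.maxRequestsPerSecond;
        const elapsed = Date.now() - started;
        if (elapsed < minimumDuration) {
          await new Promise((resolve) => setTimeout(resolve, minimumDuration - elapsed));
        }
      }
      // Only now does the chunk continue its journey down the pipeline.
      callback(null, results);
    } catch (error) {
      callback(error as Error);
    }
  }
}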

You can play around with this feature by changing the values of `BUFFER_SIZE` and `MAX_REQUESTS_PER_SECONDS` in the environment. The table below shows the throughput we measured for a few combinations:

Max requests per second    Buffer size    Throughput (elements/second)
1                          1              1,022
1                          10             1,017
10                         10             5,135
10                         100            8,957
30                         100            19,652
100                        1000           22,762

Conclusions 

Here ends our brief dive into the Stream world, but for you it is only the beginning. You can continue by reading the NodeJS Stream documentation to gain a more precise knowledge of the actual implementation.

In the example, you can find some unit tests that help you understand how to implement a real working project based on your business needs and transform your theoretical knowledge into something useful for you or your company. 

In this article we wanted to describe a case where rate limiting is the key feature, but the same architecture can be used (with appropriate modifications) to build a high-throughput application, so don’t stop experimenting with NodeJS and its Streams!

Howard Scordio - Software Engineer @ Jobrapido
Stefano Ranzini - Software Engineer @ Jobrapido