Parallelising ETL workflows with the Jongleur gem


Our assignment

Our company has just gone on a huge recruitment spree and has just hired 100,000 new employees. The HR department has sent us a spreadsheet with the employee details, asking us to save them in the company staff database. After having a look at the data, we notice a few actions we need to take before inserting the details into our database and we come up with the following algorithm:

  • Read data from the spreadsheet

  • Break employees' full name into first name and last name fields

  • Assign each employee a unique company number

  • Insert each employee into the database, provided their first and last names are each less than 255 characters long

A single-process ETL pipeline

We then proceed to write a crude but effective ETL pipeline:

require 'csv'
require 'pg'
require 'ostruct'
require 'securerandom'

@users = []
@conn = PG.connect( dbname: 'testdb' )
DATA_FILE = "data/users.csv"

def extract(data_file)
  CSV.foreach(data_file) do |row|
    user = OpenStruct.new
    user.name = row[0]
    user.email = row[1]
    @users << user
  end
end

def transform
  @users.map! do |usr|
    usr.first_name = usr.name.split[0]
    usr.last_name = usr.name.split[1]
    usr.staff_no = SecureRandom.uuid
    usr
  end
end

def load(usr)
  if usr.first_name.length < 255 && usr.last_name.length < 255
    @conn.exec_params(
      "INSERT INTO STAFF (FIRST_NAME, LAST_NAME, EMAIL, STAFF_NO) VALUES ($1, $2, $3, $4)",
      [usr.first_name, usr.last_name, usr.email, usr.staff_no]
    )
  else
    puts "Validation failed for #{usr.first_name} #{usr.last_name}"
  end
end

extract(DATA_FILE)
transform
@users.each {|usr| load(usr)}

Our extract method reads the data from file and stores it in memory. The transform method then splits the full name field into first and last name and assigns a UUID to the employee. Finally, our load method ensures the name fields don't exceed the expected length, before inserting the data into the Staff table.
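One caveat with splitting on whitespace and taking elements 0 and 1: a three-part name loses its last part, and a single-word name yields a nil last name, which would make the length check in load raise. Below is a minimal sketch of a more defensive split. The helper name split_name is hypothetical, and the rule "everything after the first word is the last name" is an assumption about HR's data, not something the original script specifies.

```ruby
# Hypothetical helper, not part of the original script: split a full name
# into first and last name without dropping middle parts or returning nil.
def split_name(full_name)
  # A limit of 2 keeps everything after the first whitespace run together.
  first, last = full_name.to_s.strip.split(/\s+/, 2)
  [first.to_s, last.to_s]
end

p split_name("Ada Lovelace")      # two-part name
p split_name("Mary Jane Smith")   # middle name stays with the last name
p split_name("Cher")              # single name: last name is an empty string
```

With strings guaranteed on both sides, the length validation in load can run without a nil check.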

We test our script and it works fine. However, all this data crunching takes a long time and, being the perfectionists that we are, we would like to make it faster.

The need for speed

Enter Jongleur, a Ruby gem that allows us to create tasks that are executed as separate OS processes and that can be run with a certain precedence.

We'll use Jongleur to split our ETL pipeline into two separate and parallel pipelines, each dealing with one half of the input data. We want our task graph to look like this:

[Figure: 2x ETL task graph]

Each Extract, Transform and Load task will do exactly what we described in our initial algorithm above, but with only half of the spreadsheet records. The Cleanup task will kick in last, to sweep away any leftover data after our pipeline has been run.

Representing the above graph in Jongleur is a simple matter of creating a Ruby Hash:

etl_graph = {
  Extract1: [:Transform1],
  Transform1: [:Load1],
  Extract2: [:Transform2],
  Transform2: [:Load2],
  Load1: [:Cleanup],
  Load2: [:Cleanup]
}

We can then just tell Jongleur to use our task graph:

Jongleur::API.add_task_graph etl_graph
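To see how this Hash encodes precedence, we can invert it in plain Ruby so that each task maps to the tasks it has to wait for. This is only an illustrative sketch: predecessors_of is a hypothetical helper, not a Jongleur method, and it says nothing about how Jongleur works internally.

```ruby
# The task graph from the article: each key lists the tasks that depend on it.
etl_graph = {
  Extract1: [:Transform1],
  Transform1: [:Load1],
  Extract2: [:Transform2],
  Transform2: [:Load2],
  Load1: [:Cleanup],
  Load2: [:Cleanup]
}

# Illustrative helper (not a Jongleur method): invert the graph so that
# each task maps to the tasks it has to wait for.
def predecessors_of(graph)
  result = Hash.new { |hash, task| hash[task] = [] }
  graph.each do |task, successors|
    result[task] # touch the key so tasks without predecessors appear too
    successors.each { |succ| result[succ] << task }
  end
  result
end

preds = predecessors_of(etl_graph)
initial_tasks = preds.select { |_task, waits_for| waits_for.empty? }.keys

p initial_tasks     # tasks that can start immediately
p preds[:Cleanup]   # tasks that Cleanup has to wait for
```

The two Extract tasks have no predecessors, which is why they are the ones that can start in parallel.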

Design considerations

One thing we need to consider before creating our parallel tasks is how to share data between tasks that are run as separate processes. Shared memory could be used but has some pitfalls, which is why we'll be using Redis, the wonderful in-memory data store. Redis is (mostly) single-threaded, which means concurrent requests are executed sequentially and safely (and blazingly quickly). Furthermore, we'll take advantage of one of Jongleur's great features, whereby each task can access the process ids (pids) of its predecessors. This means that we can use a task's pid as a key for its data. So our Extract1 task can save data to Redis under its own pid, and when the Transform1 task starts it will know exactly which data was created by Extract1, its predecessor!

Common attributes

Every Jongleur task class must inherit from the base class Jongleur::WorkerTask. This means that we can use this class to define data that we want to be accessible from all WorkerTask instances. In our case we want every Extract, Transform, Load and Cleanup task to be able to access our Redis data store. To achieve that, we simply define the Redis connection as a class variable in the Jongleur::WorkerTask hierarchy:

require 'redis'

# Reopen the base class so that every task subclass can reach @@redis
class Jongleur::WorkerTask
  @@redis = Redis.new(host: "localhost", port: 6379, db: 15)
end
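This works because Ruby class variables are shared down the inheritance chain. The sketch below shows the mechanism without needing the gem or a Redis server: BaseTask, ExtractTask and LoadTask are hypothetical stand-ins, and a plain Hash plays the role of the Redis connection.

```ruby
# Stand-in for Jongleur::WorkerTask so the sketch runs without the gem.
class BaseTask
  # A plain Hash plays the role of the Redis connection here.
  @@store = { connected: true }
end

class ExtractTask < BaseTask
  def store
    @@store
  end
end

class LoadTask < BaseTask
  def store
    @@store
  end
end

# Both subclasses resolve @@store to the object defined on the base class.
puts ExtractTask.new.store.equal?(LoadTask.new.store)
```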

Extracting

Both our Extract1 and Extract2 tasks will need to have some common functionality, namely to parse records from the csv file and save them to Redis:

require 'csv'
require 'json'

class Extract < Jongleur::WorkerTask

  DATA_FILE = "data/users.csv"

  def process_row(row, rowno)
    user = {}
    user[:num] = rowno
    user[:name] = row[0]
    user[:email] = row[1]

    # Store the record as JSON in a Redis hash keyed by this task's pid
    @@redis.hset(Process.pid.to_s, "user:#{rowno}", user.to_json)
  end
end
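One detail worth keeping in mind for the later tasks: the record is written with symbol keys but, after a trip through JSON, it comes back with string keys. The sketch below illustrates this with a Hash of Hashes standing in for Redis (the fake_redis variable and the sample user are assumptions made for illustration only).

```ruby
require 'json'

# A Hash of Hashes stands in for Redis: outer key = pid, inner key = field.
fake_redis = Hash.new { |hash, key| hash[key] = {} }

user = { num: 1, name: "Ada Lovelace", email: "ada@example.com" }
pid_key = Process.pid.to_s

# Same shape as hset(pid, field, value): the value is stored as a JSON string.
fake_redis[pid_key]["user:#{user[:num]}"] = user.to_json

# Reading back: JSON.parse returns string keys, not symbols.
restored = JSON.parse(fake_redis[pid_key]["user:1"])
p restored
p restored['name']   # present
p restored[:name]    # nil, the symbol key no longer exists
```

This is why the downstream tasks read fields with string keys such as 'name' and 'num'.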

Now the only divergence between Extract1 and Extract2 comes when reading the input data from the csv file. Extract1 will only read and store the first 50,000 records, while Extract2 will read and store the remaining 50,000 records.

class Extract1 < Extract
  def execute
    CSV.foreach(DATA_FILE).with_index(1) do |row, rowno|
      break if rowno > 50000 # stop after the first 50,000 rows
      process_row(row, rowno)
    end
  end
end

class Extract2 < Extract
  def execute
    CSV.foreach(DATA_FILE).with_index(1) do |row, rowno|
      next if rowno <= 50000 # skip the rows handled by Extract1
      process_row(row, rowno)
    end
  end
end
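Boundary conditions in a split like this are easy to get off by one, so it can help to try the same pattern on a handful of rows first. In the sketch below, six in-memory rows and a split point of 3 are illustrative assumptions; the comparisons are written so that the row at the split point belongs to the first half.

```ruby
require 'csv'

# Six in-memory rows stand in for the real file; the split point is 3.
sample_csv = (1..6).map { |i| "User #{i},user#{i}@example.com" }.join("\n")
split_at = 3

first_half = []
second_half = []

CSV.parse(sample_csv).each.with_index(1) do |_row, rowno|
  break if rowno > split_at      # first extractor stops after row 3
  first_half << rowno
end

CSV.parse(sample_csv).each.with_index(1) do |_row, rowno|
  next if rowno <= split_at      # second extractor skips rows 1..3
  second_half << rowno
end

p first_half    # row numbers handled by the first extractor
p second_half   # row numbers handled by the second extractor
```

Every row should appear in exactly one of the two lists, with no gap and no overlap at the split point.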

Transforming

Each Transform task will need to use its Extract predecessor's pid in order to load the extracted records it needs to transform. It will then proceed to split each record's name, assign it a UUID and save it back to Redis:

class Transform < Jongleur::WorkerTask
  @desc = 'Transforming'
  def execute
    loader_id = Jongleur::Implementation.get_process_id(predecessors.first)
    users = @@redis.hgetall(loader_id)
    users.keys.each do |usr_key|
      transformed_user = {}
      data = JSON.parse users[usr_key]
      transformed_user[:first_name] = data['name'].split[0]
      transformed_user[:last_name] = data['name'].split[1]
      transformed_user[:staff_no] = SecureRandom.uuid
      transformed_user[:num] = data['num']
      transformed_user[:email] = data['email']
      @@redis.hset(Process.pid.to_s, "user:#{data['num']}", transformed_user.to_json)
    end
  end
end

class Transform1 < Transform; end
class Transform2 < Transform; end

Loading

Load1 and Load2 will pick up the records stored by their respective predecessor Transform tasks, will validate the name length and will insert the records in the Staff table of our HR's Postgres database:

class Load < Jongleur::WorkerTask
  @desc = 'Loading'

  def execute
    loader_id = Jongleur::Implementation.get_process_id(predecessors.first)
    users = @@redis.hgetall(loader_id)
    conn = PG.connect( dbname: 'testdb' )
    users.keys.each do |usr_key|
      data = JSON.parse users[usr_key]
      if( data['first_name'].length < 255 &&  data['last_name'].length < 255 )
        conn.exec_params(
          "INSERT INTO STAFF (FIRST_NAME, LAST_NAME, EMAIL, STAFF_NO) VALUES ($1, $2, $3, $4)",
          [ data['first_name'], data['last_name'], data['email'], data['staff_no'] ]
        )
      else
        puts "Validation failed for #{data['first_name']} #{data['last_name']}"
      end
    end

  end
end

class Load1 < Load; end
class Load2 < Load; end

Cleanup

All our Cleanup task has to do is erase the Redis data created by the previously run tasks:

class Cleanup < Jongleur::WorkerTask
  def execute
    @@redis.flushdb
    puts "...Cleanup"
  end
end

Running our pipelines

We now have

  1. Defined our Task Graph and configured Jongleur with it

  2. Implemented an #execute method for each of the Tasks in our Task Graph

We are now ready to invoke Jongleur and run our parallel pipelines:

Jongleur::API.run do |on|
  on.completed do |task_matrix|
    puts "Jongleur run is complete \n"
    puts task_matrix
    puts "oh-oh" if Jongleur::API.failed_tasks(task_matrix).length > 0
  end
end

The output is:

Starting workflow...
starting task Extract1
starting task Extract2
finished task: Extract1, process: 22722, exit_status: 0, success: true
starting task Transform1
finished task: Extract2, process: 22723, exit_status: 0, success: true
starting task Transform2
finished task: Transform1, process: 22726, exit_status: 0, success: true
starting task Load1
finished task: Transform2, process: 22727, exit_status: 0, success: true
starting task Load2
finished task: Load1, process: 22729, exit_status: 0, success: true
finished task: Load2, process: 22732, exit_status: 0, success: true
starting task Cleanup
...Cleanup
finished task: Cleanup, process: 22745, exit_status: 0, success: true
Workflow finished
Jongleur run is complete
#<struct Jongleur::Task name=:Extract1, pid=22722, running=false, exit_status=0, finish_time=1546800512.6342049, success_status=true>
#<struct Jongleur::Task name=:Transform1, pid=22726, running=false, exit_status=0, finish_time=1546800518.355283, success_status=true>
#<struct Jongleur::Task name=:Extract2, pid=22723, running=false, exit_status=0, finish_time=1546800512.967079, success_status=true>
#<struct Jongleur::Task name=:Transform2, pid=22727, running=false, exit_status=0, finish_time=1546800519.273847, success_status=true>
#<struct Jongleur::Task name=:Load1, pid=22729, running=false, exit_status=0, finish_time=1546800533.147315, success_status=true>
#<struct Jongleur::Task name=:Load2, pid=22732, running=false, exit_status=0, finish_time=1546800534.1630201, success_status=true>
#<struct Jongleur::Task name=:Cleanup, pid=22745, running=false, exit_status=0, finish_time=1546800534.899818, success_status=true>

Jongleur gives us a thorough account of each Task's details and success status. Notice how both initial Tasks (Extract1, Extract2) start at the same time and each new Task starts only once its predecessor on the Task Graph has finished. If a Task failed, for whatever reason, Jongleur wouldn't execute its dependent tasks but it would continue running any other tasks it could. The status of all Tasks would still be visible in the Task Matrix structure yielded by the completed callback.
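To make the failure behaviour concrete, here is a small sketch that walks the task graph and lists the tasks that would be skipped if one task failed. The helper skipped_after_failure is hypothetical and not part of Jongleur; it assumes that a task is skipped whenever any of its predecessors failed or was skipped, which is our reading of the behaviour described above.

```ruby
require 'set'

etl_graph = {
  Extract1: [:Transform1],
  Transform1: [:Load1],
  Extract2: [:Transform2],
  Transform2: [:Load2],
  Load1: [:Cleanup],
  Load2: [:Cleanup]
}

# Illustrative helper (not part of Jongleur): collect every task that sits
# downstream of a failed task, i.e. the ones that would not be run.
def skipped_after_failure(graph, failed_task)
  skipped = Set.new
  queue = graph.fetch(failed_task, []).dup
  until queue.empty?
    task = queue.shift
    next unless skipped.add?(task) # add? returns nil if already recorded
    queue.concat(graph.fetch(task, []))
  end
  skipped.to_a
end

all_tasks = (etl_graph.keys + etl_graph.values.flatten).uniq
skipped = skipped_after_failure(etl_graph, :Transform1)
still_runnable = all_tasks - [:Transform1] - skipped

p skipped          # tasks downstream of the failure
p still_runnable   # tasks unaffected by the failure
```

Under that assumption, a failure in Transform1 would leave the whole second pipeline free to run, while Load1 and Cleanup would be skipped.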

Performance

Running the simple single-process ETL pipeline at the beginning of this article produced the following timings:

9.55s user 1.40s system 32% cpu 33.494 total

Running the parallel Jongleur pipelines produced:

19.99s user 5.23s system 89% cpu 27.210 total

User time is higher with Jongleur (I'd guess due to using Redis) and so is kernel time, presumably due to forking and managing several processes, but total elapsed time drops from 33.5s to 27.2s, which is roughly 19% less with two pipelines. Also consider that in this example we only used two parallel pipelines, i.e. we split the data load into 2 parts. It is a simple matter to split it into 4, 8 or even 16 parallel pipelines (depending on our CPU cores), using the same logic and principles demonstrated here and simply adding more Tasks, which should yield further performance gains.
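As a rough sketch of what scaling out could look like, the task graph and the row ranges can be generated for any number of pipelines instead of being written by hand. Both helpers below, build_etl_graph and row_range, are hypothetical and not part of Jongleur; they only follow the naming pattern used in this article.

```ruby
# Illustrative helper (not part of Jongleur): build a task graph with
# `pipelines` parallel Extract -> Transform -> Load chains that all
# feed a single Cleanup task.
def build_etl_graph(pipelines)
  (1..pipelines).each_with_object({}) do |n, graph|
    graph[:"Extract#{n}"] = [:"Transform#{n}"]
    graph[:"Transform#{n}"] = [:"Load#{n}"]
    graph[:"Load#{n}"] = [:Cleanup]
  end
end

# Row range (1-based, inclusive) that pipeline `n` of `pipelines` would handle.
def row_range(n, pipelines, total_rows)
  chunk = (total_rows.to_f / pipelines).ceil
  first = (n - 1) * chunk + 1
  last = [n * chunk, total_rows].min
  first..last
end

graph = build_etl_graph(4)
p graph.keys.length          # number of tasks that have successors
p row_range(1, 4, 100_000)   # rows for the first of four pipelines
p row_range(4, 4, 100_000)   # rows for the last of four pipelines
```

Each generated task name would still need a matching class (for example Extract3 < Extract), and each Extract class would need to read only its own row range.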

Jongleur advantages

  1. Performance, through parallelisation.

  2. Implicit precedence handling. Jongleur will start running a task only once its predecessor tasks have successfully finished running. If a task fails, its dependent tasks will not be run.

  3. Code modularisation. If a task fails, for whatever reason, it will be marked as such by Jongleur and its dependent tasks will not be run. However, that won't stop any other tasks from being run, which means that partial failure does not necessitate complete failure, in use cases where this is desirable.

The Complete Code

The complete code, including the data file for this demo, is freely available.

The Future

Feel free to suggest improvements and new features for Jongleur or to talk to me about trying it out for your own Use Cases :)