Recently we fully onboarded a new customer that strained our current architectural approach of doing all batch processing within an AWS Lambda, pushing up against the 15 minute hard limit. The fully managed aspect of AWS Lambda is very appealing to a company of our size, so I’m loath to switch to a different runner to bypass this limitation.
The process in question is written in Ruby. It looks for new files in S3, does some standard translation and augmentation, then loads the result into DuckDB. There is no official Ruby client for DuckDB, nor out-of-the-box ActiveRecord support, so our DB connectivity layer is custom. Since Ruby is primarily single-threaded and DuckDB errors on write conflicts, this database layer assumes single-threaded writes and uses class variables for state that we leverage for faster bulk writes.
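To give a sense of what that layer does, here is a rough sketch of the buffered-write pattern: class-level state accumulates rows, then flushes them in one bulk operation. Every name here is illustrative, not our actual code, and the array stands in for the real DuckDB insert.

```ruby
# Hypothetical sketch of a single-threaded bulk-write buffer.
# Class variables hold pending rows; flushing is one bulk operation.
class BulkWriter
  @@buffer = []
  @@persisted = [] # stands in for the actual DuckDB table
  FLUSH_AT = 1_000

  def self.add(row)
    @@buffer << row
    flush if @@buffer.size >= FLUSH_AT
  end

  def self.flush
    return if @@buffer.empty?
    # In production this would be a single multi-row INSERT into DuckDB
    @@persisted.concat(@@buffer)
    @@buffer.clear
  end

  def self.persisted_count
    @@persisted.size
  end
end

10.times { |i| BulkWriter.add({ id: i }) }
BulkWriter.flush
puts BulkWriter.persisted_count # => 10
```

The important property is the one that bit us later: nothing in this design is safe to call from two threads at once.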
This was fine, right up until we took on a customer that was 15x bigger than everyone else.
Small aside on Ruby
Ruby is a smaller language, so it might not be common knowledge that there are different Rubies with different features. MRI (Matz’s Ruby Interpreter) is the reference implementation and by far the most widely deployed version. It uses a Global VM Lock (GVL), but that is not inherent to or specified by the language. Another popular implementation, JRuby, which runs on the Java Virtual Machine (JVM), does not have this limitation. Beyond the JVM startup penalty, the downside of JRuby is that not all third-party libraries are guaranteed to work. We use MRI, and this article reflects that.
Back to the main story
There’s a lot of dead time in the process described above while we wait for data to come in from S3. If we need a real world speedup, the most logical option would be to do some processing while we wait for the file to download.
But wait! Isn’t Ruby single-threaded? The Ruby of record, MRI, uses a Global VM Lock (GVL) that ensures only one Ruby thread is running at a time. The phrase “Ruby thread” is doing a lot of work in that sentence. A lot of operations in a Ruby program actually take place within the C layer, which is not bound by the GVL. (Aside: I found this to be a good overview of the GVL.) The slowest action, downloading from S3, is almost entirely in that C layer.
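You can see the practical effect with a minimal sketch, using `sleep` as a stand-in for a blocking network read (both release the GVL): four IO-bound tasks run in roughly the time of one when threaded.

```ruby
require 'benchmark'

# Stand-in for an IO-bound call like an S3 download; sleep releases the GVL
def fake_download
  sleep 0.2
end

serial = Benchmark.realtime { 4.times { fake_download } }

threaded = Benchmark.realtime do
  4.times.map { Thread.new { fake_download } }.each(&:join)
end

puts format('serial: %.2fs, threaded: %.2fs', serial, threaded)
# threaded finishes in roughly a quarter of the serial time
```

CPU-bound Ruby code would show no such speedup under MRI, which is why the GVL caveat matters.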
We still need writes to be single-threaded, so we need something more than just threading out the download -> load loop. We could use a Mutex around the part of the code that writes to the database, but I shy away from manual thread control as it tends to become difficult to maintain over time. What I wanted was multiple download threads feeding a single writer thread through a queue that would block inserts, and therefore pause downloads, so that I did not exhaust filesystem space.
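For contrast, the Mutex route I decided against would look roughly like this sketch (the `sleep` and the array are stand-ins for the download and the DB write):

```ruby
# Sketch of the rejected approach: downloads run concurrently,
# but a shared Mutex serializes the database writes.
results = []
write_lock = Mutex.new

threads = (1..4).map do |i|
  Thread.new do
    sleep 0.05                 # stand-in for the S3 download (parallel IO)
    write_lock.synchronize do
      results << i             # stand-in for the single-threaded DB write
    end
  end
end
threads.each(&:join)

puts results.sort.inspect # => [1, 2, 3, 4]
```

This works, but it offers no back-pressure: nothing stops the download threads from filling the disk faster than the writer can drain it.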
The solution I settled on was Channels from concurrent-ruby edge. I like the general paradigm of orchestrating through queues, and the built-in back-pressure of a fixed number of executors prevents resource overload.
Taking from a channel is also a blocking action.
You can exploit this fact to send a finish token when all processing is done.
This feature of concurrent-ruby is a few years old, but the documentation is a little sparse with examples hard to come by. Below is an annotated, simplified example of what I put into production.
class ChannelPipeline
  # Ruby isn't typed by default, so we can use a simple string as a marker for when processing is done.
  # All we need to do is ensure that it does not look like any data coming into the system
  CHANNEL_FINISH_TOKEN = '__finish__'

  def load_metrics
    # Create the channel which we are going to use to download files from s3 and make them available
    # on the local disk.
    # You want this to be more than 1. The 5 here is somewhat arbitrary.
    # The ideal number depends on the relative speed of the processes
    s3_channel = Concurrent::Channel.new(capacity: 5)
    # Create the channel which is going to be doing the db insert.
    # In my case, this has to be single threaded
    db_channel = Concurrent::Channel.new(capacity: 1)
    # A single threaded channel will be used to communicate that the process is done, either due to
    # an error or with successful completion
    finish_channel = Concurrent::Channel.new(capacity: 1)

    # I'm going to admit that I found the naming in the library a little confusing.
    # `Concurrent::Channel.new` creates something you use for communicating, but
    # `Concurrent::Channel.go` creates a worker thread you can use for processing.
    # So I'm starting a channel, but also sending a channel as arguments.
    start_s3_channel(s3_channel, db_channel, finish_channel)
    start_db_channel(db_channel, finish_channel)

    # On the main thread, list the files in s3
    start_aws_file_list(s3_channel)

    Logger.info('Workers started, waiting on the finish token')
    success = ~finish_channel

    s3_channel.close
    db_channel.close
    finish_channel.close

    raise 'Unable to complete pipeline' unless success

    Logger.info('Pipeline complete')
  end

  def start_s3_channel(s3_channel, db_channel, finish_channel)
    # Start the worker that will perform the downloads
    Concurrent::Channel.go do
      # Loop on the channel. `~` is an alias for `take`. If there is nothing in the channel,
      # this is a blocking action
      while (s3_channel_message = ~s3_channel)
        if s3_channel_message == CHANNEL_FINISH_TOKEN
          # We received the finish token, pass it forward then break the loop
          Logger.info('Completed downloads of files from s3')
          db_channel << CHANNEL_FINISH_TOKEN
          break
        else
          # Pull the s3 object out of the message, download it, then publish a new message
          # to the db thread that it is available for loading
          s3_channel_message => { s3_object: }
          local_file = S3Helper.download(s3_object)
          db_channel << { local_file: }
        end
      end
    rescue StandardError => e
      # Ruby threads don't bubble errors up to the main thread. I want to halt on failure,
      # so catch it here then tell the main thread to stop execution.
      Logger.error(e)
      finish_channel << false
    end
  end

  def start_db_channel(db_channel, finish_channel)
    # The structure of this is very similar to the structure above. Limiting the size of db_channel
    # is what gets the throttling behavior I wanted.
    Concurrent::Channel.go do
      while (db_channel_message = ~db_channel)
        if db_channel_message == CHANNEL_FINISH_TOKEN
          finish_channel << true
          break
        else
          db_channel_message => { local_file: }
          Db::Loader.load(local_file)
        end
      end
    rescue StandardError => e
      Logger.error(e)
      finish_channel << false
    end
  end

  def start_aws_file_list(s3_channel)
    AwsUtil.list_objects(bucket:, prefix: "/") do |s3_object|
      s3_channel << { s3_object: }
    end
    # Push the finish token to let the channels know we've reached the end of the input
    s3_channel << CHANNEL_FINISH_TOKEN
  end
end
For reference, the previous code looked more like:
class ChannelPipeline
  def load_metrics
    AwsUtil.list_objects(bucket:, prefix: "/") do |s3_object|
      local_file = S3Helper.download(s3_object)
      Db::Loader.load(local_file)
    end
  end
end
The multithreaded version is more complicated and wouldn’t have been an ideal first pass. Going the complicated route first would have made debugging and enhancing the business logic harder.
But at the point where we had enough scale that we needed the performance, this change provided a 1.5x speedup over the original implementation. Since new data grows linearly, that buys us a lot of time before we have to do anything more complicated and frees up our capacity to focus on problems that directly impact customers. When you’re early in a company’s life-cycle, that flexibility can be the difference between success and failure.