Engineering
One change to optimize them all: a story about batching on the source side
At CloudQuery, we try to optimize the performance of our syncs so that all of the data is processed as fast as possible. After last year's move from our custom in-house type system to Apache Arrow, we almost missed a massive opportunity to further optimize our data syncs.
Migration to Apache Arrow #
Here's a quick recap on our adoption of Apache Arrow at CloudQuery:
Before Arrow, we used our own type system that only supported around 14 different data types. However, we started hitting limitations in various use cases as we grew the number of supported data sources and destinations. Taking a step back, we decided to adopt Apache Arrow instead of building yet another format.
Single resource per message #
Before migrating our type system to Apache Arrow, we sent each synced resource in a separate streaming message over gRPC. Essentially, it looked like this:
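The exact plugin protocol isn't reproduced in this post, but the shape of the old flow can be sketched roughly as below. This is a minimal illustration, not the actual SDK code: it assumes a toy schema with a single string column, and sendMessage is a hypothetical stand-in for the gRPC stream.

```go
package sketch

import (
	"bytes"

	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/ipc"
	"github.com/apache/arrow/go/v13/arrow/memory"
)

// A toy schema with a single string column, used throughout these sketches.
var singleStringSchema = arrow.NewSchema(
	[]arrow.Field{{Name: "name", Type: arrow.BinaryTypes.String}},
	nil,
)

// sendMessage is a hypothetical stand-in for sending one gRPC streaming message.
func sendMessage(payload []byte) {}

// syncResourcesOneByOne shows the pre-batching shape of a sync:
// every resource becomes its own single-row Arrow record.
func syncResourcesOneByOne(resources []string) error {
	for _, res := range resources {
		b := array.NewRecordBuilder(memory.DefaultAllocator, singleStringSchema)
		b.Field(0).(*array.StringBuilder).Append(res) // exactly one row
		rec := b.NewRecord()

		// Every message carries a full IPC payload: the schema plus one row.
		var buf bytes.Buffer
		w := ipc.NewWriter(&buf, ipc.WithSchema(singleStringSchema))
		if err := w.Write(rec); err != nil {
			return err
		}
		if err := w.Close(); err != nil {
			return err
		}
		sendMessage(buf.Bytes())

		rec.Release()
		b.Release()
	}
	return nil
}
```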
When the type system change to Apache Arrow was completed, we detected no significant performance regressions, so we decided it was good enough and shipped it to all of our plugins.
Turns out having a single resource per message sent is actually tech debt #
As we were busy with the occasional upstream fix to Apache Arrow, expanding support for nested types, and optimizing our codebase, we forgot to pay attention to the following:
Apache Arrow defines a cross language columnar format for flat and hierarchical data
This means Apache Arrow data processing is optimized for handling many rows at once, but we were using it to send a single row at a time.
Initially, we packed each resource that we sent over the wire into its own Apache Arrow record, which contains:
- schema: the field schema describing how the destination side should decode the data
- data: the packed Apache Arrow arrays. This is where many entries can be stored, but we simply overlooked that in the initial migration.
In particular, it means that for every resource (that is, every row in Apache Arrow terms) we packed in a whole schema (and unpacked it on the destination side), spending tons of extra CPU cycles, RAM, and network bandwidth on every resource.
Essentially, to send N resources we would send N Apache Arrow records, when instead we could be sending a single Apache Arrow record with N rows.
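For contrast, here is a minimal sketch of the batched shape, continuing the hypothetical example above (same toy schema and sendMessage stand-in): all resources go into one record, so the schema and IPC framing are encoded and decoded once rather than once per resource.

```go
// syncResourcesInOneRecord packs all resources into a single Arrow record,
// so the schema travels once instead of once per resource.
func syncResourcesInOneRecord(resources []string) error {
	b := array.NewRecordBuilder(memory.DefaultAllocator, singleStringSchema)
	defer b.Release()

	for _, res := range resources {
		b.Field(0).(*array.StringBuilder).Append(res) // N rows in one record
	}
	rec := b.NewRecord()
	defer rec.Release()

	var buf bytes.Buffer
	w := ipc.NewWriter(&buf, ipc.WithSchema(singleStringSchema))
	if err := w.Write(rec); err != nil {
		return err
	}
	if err := w.Close(); err != nil {
		return err
	}
	sendMessage(buf.Bytes()) // a single message with rec.NumRows() == len(resources)
	return nil
}
```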
Sending multiple rows in a single record #
Once we noticed the issue, we started planning how to fix it and better utilize resources when working with Apache Arrow.
We even fell into the trap of collecting the wrong metrics once: instead of focusing on resource consumption savings, we saw that the sync time wasn't affected and de-prioritized the issue until we retested in a constrained environment.
To batch multiple resources in a single message, we had to take the following steps:
- Ensure that all destination plugins support receiving Apache Arrow records with multiple rows. Initially, we developed the destination plugin code with a single resource per received message in mind, and some places needed to be updated.
- Wait for some time to allow users to migrate to the newer destination plugin versions. Otherwise, batching resources into a single record would be a breaking change, requiring users to update destination plugins along with source plugins right away.
- Roll out the change to some select source plugins and thoroughly test.
- Roll out the change to all source plugins.
We implemented destination support, along with changes to the test suite to ensure we don't break anything in the future, and then we waited.
Implementation details #
We decided to implement simple batching with a cutoff by timeout and size: for each table being synced, we collect resources on the source side into a single Apache Arrow record and send it out only once the batch is full or the timeout has passed.
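The real change lives in our Go SDK (see the next paragraph); purely as an illustration of the idea, a per-table batcher with size and timeout cutoffs could be sketched like this, reusing the toy single-string-column schema from earlier. recordBatcher and its methods are hypothetical names, not SDK APIs.

```go
package sketch

import (
	"sync"
	"time"

	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/memory"
)

// recordBatcher collects rows for one table into an Arrow record and flushes
// when the batch is full or the timeout since the first row has elapsed.
type recordBatcher struct {
	mu      sync.Mutex
	builder *array.RecordBuilder
	rows    int
	maxRows int
	timeout time.Duration
	timer   *time.Timer
	send    func(arrow.Record) // e.g. serialize and write to the gRPC stream
}

func newRecordBatcher(schema *arrow.Schema, maxRows int, timeout time.Duration, send func(arrow.Record)) *recordBatcher {
	return &recordBatcher{
		builder: array.NewRecordBuilder(memory.DefaultAllocator, schema),
		maxRows: maxRows,
		timeout: timeout,
		send:    send,
	}
}

// add appends one resource (a single string column in this sketch) to the batch.
func (rb *recordBatcher) add(value string) {
	rb.mu.Lock()
	defer rb.mu.Unlock()

	rb.builder.Field(0).(*array.StringBuilder).Append(value)
	rb.rows++

	if rb.rows == 1 {
		// The timeout countdown starts when the batch gets its first row.
		rb.timer = time.AfterFunc(rb.timeout, rb.flush)
	}
	if rb.rows >= rb.maxRows {
		rb.flushLocked()
	}
}

// flush sends whatever is currently batched; also called at the end of a sync.
func (rb *recordBatcher) flush() {
	rb.mu.Lock()
	defer rb.mu.Unlock()
	rb.flushLocked()
}

func (rb *recordBatcher) flushLocked() {
	if rb.rows == 0 {
		return
	}
	if rb.timer != nil {
		rb.timer.Stop()
	}
	rec := rb.builder.NewRecord() // resets the builder for the next batch
	rb.send(rec)
	rec.Release()
	rb.rows = 0
}
```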
You can see the change in our Go SDK as well as a small follow-up decreasing allocations for logging.
Bugs, bugs, bugs everywhere in the destination plugins #
One of the features that the CloudQuery Go SDK provides is out-of-the-box batch writers for destinations. They accumulate data on the destination side and write it to the destination (file, database, etc.) in a single API call rather than spamming the destination with small writes.
Yes, we thought about batching in the destination processing. No, we didn't put 2 and 2 together at the time. Yes, it happens, and it's OK.
While updating the destination plugins to support Apache Arrow records with multiple rows, we introduced several inconsistencies in our batch writers. Almost all of them came from assuming a single row per record when deciding whether to start a new batch, and some concerned data size as well.
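To illustrate the class of bug (with a hypothetical batchWriter, not the actual SDK batch writer): the row counter has to grow by the number of rows in each incoming record rather than by one, otherwise a row-based batch limit is only reached after that many records.

```go
package sketch

import "github.com/apache/arrow/go/v13/arrow"

// batchWriter is a hypothetical destination-side writer that accumulates
// Arrow records and writes them out to the destination in one call.
type batchWriter struct {
	batch     []arrow.Record
	batchRows int64
	maxRows   int64
}

func (w *batchWriter) add(rec arrow.Record) {
	rec.Retain()
	w.batch = append(w.batch, rec)

	// The bug: `w.batchRows++` assumed exactly one row per record.
	// The fix: account for every row in the incoming record.
	w.batchRows += rec.NumRows()

	if w.batchRows >= w.maxRows {
		w.flush()
	}
}

func (w *batchWriter) flush() {
	// Write w.batch to the destination in a single call, then release the records.
	for _, rec := range w.batch {
		rec.Release()
	}
	w.batch = w.batch[:0]
	w.batchRows = 0
}
```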
The final fix for the destination plugins is present in v4.45.6.
Upgrading sources #
First we made a change to the PostgreSQL source plugin, introducing the rows_per_record option. It lets you manually select how many database rows are packed into a single Apache Arrow record to be sent over the wire. After rolling this out, we discovered the aforementioned issues with the destination plugins that we had to address first.
Once the destination code was fixed, we proceeded to test syncs with different settings and decided on a sane default of batching up to 50 rows into a record and sending records at least once every 5 seconds.
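In terms of the earlier sketch, those defaults would correspond to wiring the hypothetical batcher up with 50 rows per record and a 5-second timeout, roughly like this:

```go
// syncTable wires the hypothetical batcher from the earlier sketch up with the
// defaults described above: up to 50 rows per record, flushed at least once
// every 5 seconds, plus a final flush for the trailing partial batch.
func syncTable(schema *arrow.Schema, resources []string, send func(arrow.Record)) {
	batcher := newRecordBatcher(schema, 50, 5*time.Second, send)
	for _, res := range resources {
		batcher.add(res)
	}
	batcher.flush()
}
```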
Yes, it involved testing. Lots and lots of testing. Behold the number of benchmarks we had to run to get here.
We enabled batching on the source side by default since v4.48.0.
Rolling out resource batching to select plugins #
AWS source plugin experiments #
One of the first plugins to receive the upgrade was the AWS source plugin. We chose it because it's one of the most verbose (in terms of the number of tables) and one of our most heavily used plugins.
Not only were fewer resources (CPU cycles, RAM, and network bytes) consumed during each sync, but the sync time also decreased in environments limited by CPU and network rather than by API response speed.
It wasn't obvious in an environment with only 550 thousand resources, as the sync time stayed roughly the same (likely due to AWS API response times):
However, when we were able to collect the data from a large AWS organization (114 million resources), the difference in sync time and resources consumed was drastic:
Additionally, we compared the sync speed in different EC2 instance configurations. You can find the detailed comparison here.
Expanding the roster #
After we saw performance improvements with the AWS source plugin, we extended testing to the following list of plugins:
We saw great improvements both in CPU consumption:
As well as RAM:
GA #
We are pleased to announce that all Golang-based CloudQuery plugins now support working with Apache Arrow records with multiple rows!
To enjoy the new, smoother experience, please use versions no older than the ones listed below.
Wrapping up #
Sometimes huge improvements can be achieved just by reading the documentation and making a data-shaping change.
The change to introduce batching is a small one, but it had to be rolled out gradually in order not to break user workflows.
So, when making changes like switching type systems, don't forget to spend a couple of extra weeks reading the fabulous manuals: it can save you time in uncovering the new technology's full potential.
Try it yourself #
Ready to get started with CloudQuery? You can try out CloudQuery locally with our quick start guide or explore the CloudQuery Platform (currently in beta) for a more scalable solution.
Want help getting started? Join the CloudQuery community to connect with other users and experts, or message our team directly here if you have any questions.
Plugins supporting Apache Arrow records with multiple rows #
Source plugins
Plugin name | Version | Notes |
---|---|---|
alicloud | v5.4.1 | |
apigee | v1.2.4 | |
aws | v27.3.0 | Manually updated |
awspricing | v4.2.2 | |
azure | v14.1.1 | |
azuredevops | v4.3.2 | |
backstage | v1.2.4 | |
bamboo-hr | v1.2.2 | |
bigquery | v1.4.2 | |
bitbucket | v2.0.2 | |
clickhouse | v1.0.0 | |
cloudflare | v9.2.2 | |
crowddev | v1.1.4 | |
datadog | v5.3.2 | |
digitalocean | v6.2.2 | |
entraid | v1.2.2 | |
facebookmarketing | v4.4.2 | |
fastly | v4.3.1 | |
file | v1.3.2 | |
file | v1.4.0 | Introduced rows_per_record option |
firestore | v4.2.0 | Introduced rows_per_record option |
gandi | v4.2.2 | |
gcp | v15.1.1 | |
github | v11.2.2 | |
gitlab | v7.1.2 | |
googleads | v3.5.0 | |
googleanalytics | v4.2.2 | |
heroku | v5.6.2 | |
homebrew | v4.3.2 | |
hubspot | v4.2.2 | |
jira | v1.5.4 | |
launchdarkly | v3.6.2 | |
mixpanel | v3.4.2 | |
mongodbatlas | v3.4.2 | |
mysql | v4.2.0 | Introduced rows_per_record option |
notion | v2.1.4 | |
okta | v5.2.2 | |
oracle | v7.0.2 | |
oracledb | v4.4.0 | Introduced rows_per_record option |
orca | v2.2.0 | |
pagerduty | v5.2.3 | |
plausible | v3.5.2 | |
postgresql | v3.0.0 | Pioneer of batching on source side |
render | v1.4.4 | |
s3 | v1.3.0 | Introduced rows_per_record option |
salesforce | v4.4.2 | |
sentinelone | v1.4.4 | |
servicenow | v1.3.2 | |
shopify | v5.4.2 | |
slack | v4.2.2 | |
snowflake-config | v2.1.2 | |
snyk | v6.3.2 | |
statuspage | v1.0.1 | |
stripe | v3.4.2 | |
tailscale | v5.2.2 | |
tempo-io | v1.2.2 | |
tenable | v2.0.2 | |
terraform | v4.4.2 | |
trello | v1.3.2 | |
vault | v2.4.2 | |
vercel | v3.4.2 | |
wiz | v2.0.3 |
Destination plugins
Plugin name | Version |
---|---|
azblob | v4.1.3 |
bigquery | v4.0.3 |
clickhouse | v4.1.2 |
duckdb | v5.9.2 |
elasticsearch | v3.3.2 |
file | v5.1.2 |
firehose | v2.5.2 |
gcs | v5.1.2 |
gremlin | v2.5.2 |
kafka | v5.0.1 |
meilisearch | v2.4.2 |
mongodb | v2.4.2 |
mssql | v4.6.4 |
mysql | v5.1.2 |
neo4j | v5.2.2 |
postgresql | v8.2.2 |
s3 | v7.2.3 |
snowflake | v4.1.2 |
sqlite | v2.9.2 |
test | v2.3.16 |
Written by Aleksandr Shcherbakov
Alex is a senior software engineer at CloudQuery who specialises in application development and cloud data infrastructure.