
One change to optimize them all: a story about batching on the source side

Aleksandr Shcherbakov

At CloudQuery, we try to optimize the performance of our syncs so that all of the data is processed as fast as possible. After last year's move from our custom in-house type system to Apache Arrow, we almost missed one massive opportunity to further optimize our data syncs...

Migration to Apache Arrow #

Before Arrow, we used our own type system that only supported around 14 different data types. However, we started hitting its limitations in various use cases as we grew the number of supported data sources and destinations. Taking a step back, we decided to adopt Apache Arrow instead of building yet another format.
[Image: XKCD comic about competing formats]

Single resource per message #

Prior to our type system migration to Apache Arrow, we sent each synced resource in a separate streaming message over gRPC. Essentially, it looked like this:
[Diagram: data flows from an API into a source plugin, over gRPC to the CLI, and on to multiple destination plugins, with each gRPC message carrying a single resource.]
When the type system change to Apache Arrow was completed, we detected no significant performance regressions, so we decided it was good enough and shipped it to all of our plugins.

Turns out sending a single resource per message is actually tech debt #

Between shipping the occasional upstream fix to Apache Arrow, expanding support for nested types, and optimizing our codebase, we forgot to pay attention to the following:
"Apache Arrow defines a cross-language columnar format for flat and hierarchical data."
This means that Apache Arrow data processing is optimized for sending lots of rows at once, but we were using it to send a single row at a time.
Initially, we packed each resource that we sent over the wire into its own Apache Arrow record, which contains:
  • schema: contains field schema describing how the destination side should decode the data
  • data: packed Apache Arrow arrays. This is the place where we can store many entries, but we simply overlooked this in the initial migration.
In particular, this means that for every resource (a single row in Apache Arrow terms) we would pack in a whole schema (and unpack it on the destination side), spending tons of extra CPU cycles, RAM, and network bandwidth per resource.
Essentially, to send N resources we would send N Apache Arrow records:
[Diagram: a record structure with a schema and fields, where each field contains an array of length 1.]
When instead we could be sending a single Apache Arrow record with N rows:
[Diagram: a single record structure with a schema and fields, where each field contains an array of length N.]
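To make the difference concrete, here is a minimal sketch using the Apache Arrow Go module (the module version in the import path and the two-column schema are illustrative, not taken from an actual CloudQuery table) that packs many rows into one record behind a single schema:

```go
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v15/arrow"
	"github.com/apache/arrow/go/v15/arrow/array"
	"github.com/apache/arrow/go/v15/arrow/memory"
)

func main() {
	// One schema for the table; in the old flow this schema travelled with
	// every single-row record and was decoded again on the destination side.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
		{Name: "name", Type: arrow.BinaryTypes.String},
	}, nil)

	bldr := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer bldr.Release()

	// Append N rows into the same builder instead of emitting N records.
	for i := int64(0); i < 50; i++ {
		bldr.Field(0).(*array.Int64Builder).Append(i)
		bldr.Field(1).(*array.StringBuilder).Append(fmt.Sprintf("resource-%d", i))
	}

	// One record, one schema, 50 rows: the schema is sent (and decoded) once.
	rec := bldr.NewRecord()
	defer rec.Release()

	fmt.Println(rec.NumRows()) // 50
}
```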

Sending multiple rows in a single record #

Once we noticed the issue, we started planning how to fix it and better utilize resources when using Apache Arrow.
We even fell into the trap of collecting the wrong metrics once: instead of focusing on resource consumption savings, we saw that sync time wasn't affected and de-prioritized the issue, until we retested in a constrained environment.
To implement batching multiple resources in a single message, we had to make the following steps:
  1. Ensure that all destination plugins support receiving Apache Arrow records with multiple rows. We had initially developed the destination plugin code with a single resource per message in mind, and some places needed to be updated.
  2. Wait for some time to allow the users to migrate to the newer destination plugin versions. Otherwise, the batching of resources into a single record would be a breaking change and would require users to update the destination plugins along with source plugins right away.
  3. Roll out the change to select source plugins and test thoroughly.
  4. Roll out the change to all source plugins.
We implemented destination support along with changes to the test suite to ensure that we don't break anything in the future and then we waited.

Implementation details #

We decided to implement simple batching with a cutoff by timeout and size: for each table being synced, we collect a batch on the source side into a single Apache Arrow record and send it out only once the batch is full or the timeout has passed.
You can see the change in our Go SDK as well as a small follow-up decreasing allocations for logging.
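Conceptually, the cutoff logic boils down to "flush when the batch is full or when the timer fires". The sketch below is a simplified rendition of that idea using generic rows instead of Arrow builders; it is not the SDK's actual code:

```go
package batch

import "time"

// Batcher collects rows for a single table and flushes them either when the
// batch reaches maxRows or when the timeout ticks, whichever comes first.
type Batcher struct {
	maxRows int
	timeout time.Duration
	in      chan any
	flush   func(rows []any)
}

func New(maxRows int, timeout time.Duration, flush func(rows []any)) *Batcher {
	b := &Batcher{maxRows: maxRows, timeout: timeout, in: make(chan any), flush: flush}
	go b.run()
	return b
}

func (b *Batcher) Add(row any) { b.in <- row }
func (b *Batcher) Close()      { close(b.in) }

func (b *Batcher) run() {
	ticker := time.NewTicker(b.timeout)
	defer ticker.Stop()

	var pending []any
	emit := func() {
		if len(pending) > 0 {
			b.flush(pending)
			pending = nil
		}
	}

	for {
		select {
		case row, ok := <-b.in:
			if !ok {
				emit() // flush whatever is left on shutdown
				return
			}
			pending = append(pending, row)
			if len(pending) >= b.maxRows {
				emit() // size cutoff
			}
		case <-ticker.C:
			emit() // timeout cutoff: don't hold rows for too long
		}
	}
}
```

In the actual SDK, a batch is kept per table and flushed as a single Apache Arrow record built from the accumulated rows, as described above.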

Bugs, bugs, bugs everywhere in the destination plugins #

One of the features that the CloudQuery Go SDK provides is out-of-the-box batch writers for destinations. They batch data on the destination side so it can be written to the destination (file, database, etc.) in a single API call rather than spamming it with small writes.
Yes, we thought about batching in the destination processing. No, we didn't put 2 and 2 together at the time. Yes, it happens, and it's OK.
While updating the destination plugins to support Apache Arrow records with multiple rows, we introduced several inconsistencies in our batch writers. Almost all of them came from assuming a single row per record when deciding whether to start a new batch, and some concerned data size as well.
The final fix for the destination plugins is present in v4.45.6.
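As a rough illustration of that bug class (a sketch, not the SDK's actual batch writer), the destination-side bookkeeping has to count rows via rec.NumRows() instead of assuming one row per incoming record:

```go
package writer

import "github.com/apache/arrow/go/v15/arrow"

// batch sketches the bookkeeping of a destination-side batch writer. The bug
// class described above came from assuming each incoming record held exactly
// one row; with source-side batching, rec.NumRows() can be anything.
type batch struct {
	records []arrow.Record
	rows    int64
	maxRows int64
}

func (b *batch) add(rec arrow.Record, flush func([]arrow.Record)) {
	// Flush first if appending this record would exceed the row budget.
	if b.rows > 0 && b.rows+rec.NumRows() > b.maxRows {
		flush(b.records)
		b.records, b.rows = nil, 0
	}
	rec.Retain() // keep the record alive until the batch is written
	b.records = append(b.records, rec)
	b.rows += rec.NumRows() // not b.rows++ -- a record may carry many rows
}
```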

Upgrading sources #

First, we made a change to the PostgreSQL source plugin, introducing the rows_per_record option. It lets you manually select how many database rows are packed into a single Apache Arrow record sent over the wire. After rolling this out, we discovered the aforementioned issues with the destination plugins, which we had to address first.
After the destination code was fixed, we proceeded to test syncs with different settings and decided on a sane default of batching up to 50 rows into a record and sending records at least once every 5 seconds.
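For reference, enabling the option looks roughly like this in a source spec. Only rows_per_record is specific to this change; the connection string is a placeholder and the surrounding fields follow the usual CloudQuery source configuration layout:

```yaml
kind: source
spec:
  name: postgresql
  path: cloudquery/postgresql
  version: "v3.0.0" # first source plugin to ship source-side batching
  tables: ["*"]
  destinations: ["postgresql"]
  spec:
    connection_string: "postgres://user:pass@localhost:5432/db" # placeholder
    rows_per_record: 50 # rows packed into a single Apache Arrow record
```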
Yes, it involved testing. Lots and lots of testing. Behold the number of benchmarks we had to run to get here.
Source-side batching has been enabled by default since v4.48.0.

Rolling out resource batching to select plugins #

AWS source plugin experiments #

One of the first plugins to receive the upgrade was the AWS source plugin. We chose it because it's one of the most verbose (in terms of table count) and one of our most heavily used plugins.
We saw that not only were fewer resources (CPU cycles, RAM, and network bytes) consumed during each sync, but sync time also decreased in environments limited by CPU and network rather than by API response speed.
It wasn't obvious in an environment with only 550 thousand resources, as the sync time stayed roughly the same (likely dominated by AWS API response times):
[Graph: AWS source plugin GC metrics, v27.1.0 vs. v27.3.0, syncing 550 thousand resources: percentage of CPU time spent in GC and live heap size after GC, over time since program start in seconds.]
However, when we were able to collect the data from a large AWS organization (114 million resources), the difference in sync time and resources consumed was drastic:
[Graph: AWS source plugin GC metrics, v27.1.0 vs. v27.3.0, syncing 114 million resources: percentage of CPU time spent in GC and live heap size after GC, over time since program start in hours.]
Additionally, we compared the sync speed in different EC2 instance configurations. You can find the detailed comparison here.

Expanding the roster #

After we saw performance improvements with the AWS source plugin, we extended the testing to the Azure, GCP, PostgreSQL, and S3 source plugins.
We saw great improvements in CPU consumption:
[Bar chart: CPU time by plugin for AWS, Azure, GCP, PostgreSQL, and S3 before and after the update; each plugin shows a significant reduction in the 'after' version.]
As well as in RAM:
[Bar chart: RAM allocations by plugin for AWS, Azure, GCP, PostgreSQL, and S3 before and after the update; each plugin shows a significant reduction in the 'after' version.]

GA #

We are pleased to announce that all Golang-based CloudQuery plugins now support working with Apache Arrow records with multiple rows!
To enjoy the new, smoother experience, please use versions no older than those listed below.

Wrapping up #

Sometimes huge improvements can be achieved simply by reading the documentation and making a data-shaping change.
The change to introduce batching is a small one, but it had to be rolled out gradually in order not to break user workflows.
So, when making a change like a type system switch, don't forget to spend a couple of extra weeks reading the fabulous manuals; it will save you time in uncovering the new technology's full potential.

Try it yourself #

Try CloudQuery for free today and experience the optimized performance of our plugins firsthand. If you have any questions or want to discuss this post, connect with our engineering team on our Discord channel. We look forward to chatting with you!

Plugins supporting Apache Arrow records with multiple rows #

Source plugins
Plugin name | Version | Notes
alicloud | v5.4.1
apigee | v1.2.4
aws | v27.3.0 | Manually updated
awspricing | v4.2.2
azure | v14.1.1
azuredevops | v4.3.2
backstage | v1.2.4
bamboo-hr | v1.2.2
bigquery | v1.4.2
bitbucket | v2.0.2
clickhouse | v1.0.0
cloudflare | v9.2.2
crowddev | v1.1.4
datadog | v5.3.2
digitalocean | v6.2.2
entraid | v1.2.2
facebookmarketing | v4.4.2
fastly | v4.3.1
file | v1.3.2
file | v1.4.0 | Introduced rows_per_record option
firestore | v4.2.0 | Introduced rows_per_record option
gandi | v4.2.2
gcp | v15.1.1
github | v11.2.2
gitlab | v7.1.2
googleads | v3.5.0
googleanalytics | v4.2.2
heroku | v5.6.2
homebrew | v4.3.2
hubspot | v4.2.2
jira | v1.5.4
launchdarkly | v3.6.2
mixpanel | v3.4.2
mongodbatlas | v3.4.2
mysql | v4.2.0 | Introduced rows_per_record option
notion | v2.1.4
okta | v5.2.2
oracle | v7.0.2
oracledb | v4.4.0 | Introduced rows_per_record option
orca | v2.2.0
pagerduty | v5.2.3
plausible | v3.5.2
postgresql | v3.0.0 | Pioneer of batching on source side
render | v1.4.4
s3 | v1.3.0 | Introduced rows_per_record option
salesforce | v4.4.2
sentinelone | v1.4.4
servicenow | v1.3.2
shopify | v5.4.2
slack | v4.2.2
snowflake-config | v2.1.2
snyk | v6.3.2
statuspage | v1.0.1
stripe | v3.4.2
tailscale | v5.2.2
tempo-io | v1.2.2
tenable | v2.0.2
terraform | v4.4.2
trello | v1.3.2
vault | v2.4.2
vercel | v3.4.2
wiz | v2.0.3
Destination plugins
Plugin name | Version
azblob | v4.1.3
bigquery | v4.0.3
clickhouse | v4.1.2
duckdb | v5.9.2
elasticsearch | v3.3.2
file | v5.1.2
firehose | v2.5.2
gcs | v5.1.2
gremlin | v2.5.2
kafka | v5.0.1
meilisearch | v2.4.2
mongodb | v2.4.2
mssql | v4.6.4
mysql | v5.1.2
neo4j | v5.2.2
postgresql | v8.2.2
s3 | v7.2.3
snowflake | v4.1.2
sqlite | v2.9.2
test | v2.3.16