In this post, you will learn how to run an open-source, high-performance extract, load, transform (ELT) framework, CloudQuery, with an orchestrator, in this case Dagster. The cool part here is that you will be running CloudQuery entirely inside Dagster itself, using Dagster's resource management and scheduling capabilities. Because everything runs locally, you won't incur any additional cloud costs. This is possible mainly because CloudQuery and its plugins are designed to run as single binaries without any third-party dependencies.
Prerequisites #
It is recommended that you have a basic understanding of both
Dagster and
CloudQuery.
Writing a simple data pipeline #
In the first step, you will write the simplest possible pipeline, consisting of two assets: one for ingesting
HackerNews data and one for creating a table that analyzes the ingested items using
DuckDB.
The full source is available
here, but let's go through it step by step:
Configuring CloudQuery #
First, let's configure an ingestion pipeline with CloudQuery that ingests data from
HackerNews into
DuckDB. The configuration looks like the following:
kind: source
spec:
  name: "hackernews"
  path: "cloudquery/hackernews"
  registry: "cloudquery"
  version: "v3.1.9"
  tables: ["*"]
  destinations:
    - "duckdb"
  spec:
    item_concurrency: 100
    start_time: "2024-02-02T00:00:00Z"
---
kind: destination
spec:
  name: duckdb
  path: cloudquery/duckdb
  registry: cloudquery
  version: "v5.6.3"
  spec:
    connection_string: ./example.db
You can run this configuration with CloudQuery locally:
cloudquery sync ./config.yml
This will start fetching all Hacker News items and comments starting from February 2, 2024. This process might take a while, so feel free to cancel the process while it’s running.
If you want to use your
MotherDuck account, you just need to change
connection_string
to
md:?motherduck_token=<token>
, as described
here.
Creating a software-defined asset in Dagster to run CloudQuery #
To run CloudQuery inside Dagster, we are going to write a
software-defined asset that executes CloudQuery with the same configuration as above.
The code is available
here.
import tempfile

from dagster import AssetExecutionContext, asset
from dagster_shell import execute_shell_command

# CQ_HN_SPEC and CQ_MIGRATE_ONLY are defined in constants.py (see below)

@asset
def ingest_hn(context: AssetExecutionContext) -> None:
    ret = None
    # Write the CloudQuery spec to a temporary file so the CLI can read it
    with tempfile.NamedTemporaryFile(mode='w+t') as temp_file:
        temp_file.write(CQ_HN_SPEC)
        temp_file.flush()
        if CQ_MIGRATE_ONLY:
            ret = execute_shell_command(f"cloudquery migrate --log-console {temp_file.name}", output_logging="STREAM", log=context.log)
        else:
            ret = execute_shell_command(f"cloudquery sync --log-console {temp_file.name}", output_logging="STREAM", log=context.log)
    if ret[1] != 0:
        raise Exception(f"cloudquery command failed with exit code {ret[1]}")
In this Dagster asset, you will use [execute_shell_command](https://docs.dagster.io/_apidocs/libraries/dagster-shell#dagster_shell.execute_shell_command),
which executes cloudquery sync
just as in the prior step, with the same configuration.
This makes it more convenient to debug, avoids any additional abstraction layer, and gets CloudQuery running inside Dagster with its output redirected to Dagster logging.
For testing purposes, this asset also checks an additional environment variable called CQ_MIGRATE_ONLY.
When it is enabled, the pipeline runs CloudQuery with the same configuration but only creates the tables, without running a full sync. This also lets downstream assets run successfully, since the schema will be available (just without any data).
The spec is defined statically as CQ_HN_SPEC in
constants.py and is parametrized using an environment variable we control,
DUCKDB_CONNECTION_STRING, which we can point to a local file during development and to a MotherDuck database in production.
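As a rough illustration, constants.py could look something like the sketch below; the defaults and the way CQ_MIGRATE_ONLY is parsed here are assumptions, not necessarily the exact code from the repository:

```python
import os

# Connection string for the DuckDB destination plugin. Locally this can be a
# file path; in production it can point to MotherDuck, e.g. "md:?motherduck_token=<token>".
DUCKDB_CONNECTION_STRING = os.getenv("DUCKDB_CONNECTION_STRING", "./example.db")

# When set, the asset runs `cloudquery migrate` instead of a full sync,
# creating the schema without ingesting any data (useful for smoke tests).
CQ_MIGRATE_ONLY = os.getenv("CQ_MIGRATE_ONLY", "") != ""

# The CloudQuery spec from the previous section, parametrized with the
# connection string above.
CQ_HN_SPEC = f"""
kind: source
spec:
  name: "hackernews"
  path: "cloudquery/hackernews"
  registry: "cloudquery"
  version: "v3.1.9"
  tables: ["*"]
  destinations:
    - "duckdb"
  spec:
    item_concurrency: 100
    start_time: "2024-02-02T00:00:00Z"
---
kind: destination
spec:
  name: duckdb
  path: cloudquery/duckdb
  registry: cloudquery
  version: "v5.6.3"
  spec:
    connection_string: {DUCKDB_CONNECTION_STRING}
"""
```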
Creating a downstream analytics asset #
Now we can create a simple view/table each time we sync new items from CloudQuery by utilizing the standard Dagster DuckDB resource and executing this simple query in a dependent
asset:
CREATE OR REPLACE TABLE hackernews_oss_items AS
SELECT
title,
score
FROM
hackernews_items
WHERE
parent = 0
AND (
title ILIKE '%open source%'
OR
title ILIKE '%open-source%'
)
ORDER BY
score DESC
LIMIT 20;
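For reference, the dependent asset could look roughly like the sketch below, using the standard dagster-duckdb resource; the asset and resource names (hackernews_oss_items, duckdb) are illustrative and may differ from the linked source:

```python
from dagster import AssetExecutionContext, asset
from dagster_duckdb import DuckDBResource

HN_OSS_ITEMS_QUERY = """
CREATE OR REPLACE TABLE hackernews_oss_items AS
SELECT title, score
FROM hackernews_items
WHERE parent = 0
  AND (title ILIKE '%open source%' OR title ILIKE '%open-source%')
ORDER BY score DESC
LIMIT 20;
"""


@asset(deps=[ingest_hn])  # only runs after the CloudQuery ingestion asset has completed
def hackernews_oss_items(context: AssetExecutionContext, duckdb: DuckDBResource) -> None:
    # Execute the analytics query against the same DuckDB database CloudQuery wrote to
    with duckdb.get_connection() as conn:
        conn.execute(HN_OSS_ITEMS_QUERY)
```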
Testing #
The cool part is that we can run simple smoke tests with a
few lines of code, and this will execute the whole pipeline 100% locally with almost identical configuration. This can save tons of money during development and CI (imagine spinning up compute just to run smoke tests for every PR 💸).
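A minimal sketch of what such a smoke test could look like, assuming CQ_MIGRATE_ONLY and DUCKDB_CONNECTION_STRING are set in the test environment before the assets are imported (the module path in the import is hypothetical):

```python
from dagster import materialize
from dagster_duckdb import DuckDBResource

# Hypothetical module path for the assets defined above.
from my_project.assets import hackernews_oss_items, ingest_hn


def test_pipeline_smoke():
    # With CQ_MIGRATE_ONLY enabled, CloudQuery only creates the schema,
    # so this materializes both assets without fetching any Hacker News data.
    result = materialize(
        [ingest_hn, hackernews_oss_items],
        resources={"duckdb": DuckDBResource(database="./test.db")},
    )
    assert result.success
```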
Running #
To run this locally, follow the
README and just execute
dagster dev
. To run this in the cloud, please check out the
Dagster Cloud documentation.
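For context, the Dagster definitions that dagster dev loads could be wired together roughly like this; the module layout in the imports is an assumption based on the snippets above:

```python
from dagster import Definitions
from dagster_duckdb import DuckDBResource

# Hypothetical module layout for the assets and constants shown earlier.
from .assets import hackernews_oss_items, ingest_hn
from .constants import DUCKDB_CONNECTION_STRING

defs = Definitions(
    assets=[ingest_hn, hackernews_oss_items],
    resources={"duckdb": DuckDBResource(database=DUCKDB_CONNECTION_STRING)},
)
```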
Resource Management #
Apart from being able to run everything locally, speeding up development, and saving costs, we can also take advantage of Dagster's resource management. For example, if you are using the ECS launcher, you can define the CPU and memory for each ingestion/CloudQuery step as
described here (Note: this is an advanced Dagster feature, so proceed with caution).
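A minimal sketch of how that could look, assuming you materialize the ingestion asset through its own job and your deployment uses Dagster's ECS run launcher (the tag values below are illustrative):

```python
from dagster import define_asset_job

# Job that materializes only the ingestion asset, with ECS task sizing overrides.
# The ecs/cpu and ecs/memory run tags are read by Dagster's ECS run launcher.
ingest_hn_job = define_asset_job(
    name="ingest_hn_job",
    selection="ingest_hn",
    tags={"ecs/cpu": "1024", "ecs/memory": "4096"},
)
```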
Summary #
This tutorial demonstrated how to run CloudQuery entirely within Dagster, leveraging its resource management and scheduling capabilities to ingest, store, and analyze data locally with DuckDB.
This setup accelerates development, reduces cloud costs, and stays flexible across databases, showcasing an efficient and scalable approach to data pipeline orchestration. The example is not limited to DuckDB: it works just as well with any other database that runs easily both locally and in production, such as PostgreSQL, ClickHouse, and
many others.
If you have questions about this post or about CloudQuery, the best way to get connected with us is on the
CloudQuery Community.
The best way to get started is by trying the
CloudQuery Platform, or by setting up
CloudQuery locally.
FAQs #
Q: Can I use databases other than DuckDB with CloudQuery?
A: Yes, the setup is flexible and can work with other databases that can run locally or in production, such as PostgreSQL, ClickHouse, and others. You just need to adjust the configuration settings to match your chosen database.
Q: How can I test my pipeline without incurring cloud costs?
A: You can run the entire pipeline locally using Dagster and CloudQuery, which allows you to test without additional cloud costs. Utilize the CQ_MIGRATE_ONLY environment variable for testing to create the necessary tables without running a full data sync.
Q: What are the benefits of using Dagster with CloudQuery?
A: Using Dagster with CloudQuery lets you use your own orchestrator (especially if you already use Dagster) and take advantage of its resource management, scheduling capabilities, and the ability to run everything locally. This integration speeds up development, reduces costs, and provides a robust framework for managing and orchestrating data pipelines.