Using CloudQuery Within Apache Airflow
If your team's workflow is based on Apache Airflow, introducing CloudQuery syncs into it is simple!
In this blog post, we'll set up a basic Airflow DAG that syncs the whole catalog of XKCD comics into a local SQLite database using the SQLite destination integration. The DAG runs entirely locally. You don't need to pre-download CloudQuery or SQLite, or log in to anything, although you can download CloudQuery manually if you'd like.
We'll define two tasks in our DAG:
- Download CloudQuery binary: inspects the host's OS and CPU architecture and downloads the appropriate binary for the supplied version.
- Run the sync: reads our spec and runs the CloudQuery sync.
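Here is a minimal sketch of the download step. It assumes the CLI binaries are published as GitHub release assets named `cloudquery_<os>_<arch>` under a `cli-<version>` tag. Check the CloudQuery releases page for the exact naming before relying on it. `release_asset_url` and `download_cloudquery` are hypothetical helper names, not part of any CloudQuery API.

```python
import os
import platform
import stat
import urllib.request

# Assumption: release assets follow this URL pattern; verify it against the
# CloudQuery GitHub releases page.
RELEASE_URL = (
    "https://github.com/cloudquery/cloudquery/releases/download/"
    "cli-{version}/cloudquery_{os}_{arch}"
)

# Map platform.machine() values to the architecture names assumed in asset names.
ARCH_ALIASES = {"x86_64": "amd64", "amd64": "amd64", "arm64": "arm64", "aarch64": "arm64"}


def release_asset_url(version: str, system: str, machine: str) -> str:
    """Build the download URL from platform.system() and platform.machine() values."""
    os_name = system.lower()  # "Linux" -> "linux", "Darwin" -> "darwin"
    arch = ARCH_ALIASES.get(machine.lower())
    if os_name not in ("linux", "darwin") or arch is None:
        raise ValueError(f"Unsupported platform: {system}/{machine}")
    return RELEASE_URL.format(version=version, os=os_name, arch=arch)


def download_cloudquery(version: str, dest_dir: str) -> str:
    """Download the binary for the current host into dest_dir and make it executable."""
    url = release_asset_url(version, platform.system(), platform.machine())
    dest = os.path.join(dest_dir, os.path.basename(url))
    urllib.request.urlretrieve(url, dest)
    # Add the owner-executable bit so subprocess can run the file.
    os.chmod(dest, os.stat(dest).st_mode | stat.S_IXUSR)
    return dest
```

The URL builder is kept separate from the network call, so the platform detection can be checked without downloading anything.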
The DAG takes two parameters:
- The version of the CloudQuery binary. It defaults to the latest release at the time of writing: v6.4.1.
- The path to the sync's specification file. It defaults to the folder containing the DAG file.
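The defaults above can be computed when the DAG file is loaded. This is a small sketch that uses hypothetical parameter names `cloudquery_version` and `spec_file_path`. The returned dict is the kind of value you could pass as `params=` to Airflow's `DAG` constructor. Defaulting to a folder works because `cloudquery sync` accepts directories as well as individual spec files.

```python
import os

# Latest CloudQuery CLI release at the time of writing.
DEFAULT_CLOUDQUERY_VERSION = "v6.4.1"


def default_params(dag_file: str) -> dict:
    """Return default DAG params; dag_file is typically the DAG module's __file__."""
    return {
        "cloudquery_version": DEFAULT_CLOUDQUERY_VERSION,
        # The directory holding the DAG file, so spec files placed next to
        # it are picked up by `cloudquery sync`.
        "spec_file_path": os.path.dirname(os.path.abspath(dag_file)),
    }
```

Inside the DAG module, this could be used as `DAG(..., params=default_params(__file__))`.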
Let's use this basic spec, which uses XKCD as the source and SQLite as the destination:
```yaml
---
kind: source
spec:
  name: "xkcd"
  path: "cloudquery/xkcd"
  version: "v1.0.6"
  tables: ['*']
  destinations:
    - "sqlite"
  spec:
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.9.4"
  spec:
    # Make sure to use an absolute path here, because it's
    # unlikely that Airflow will run this job as your user.
    connection_string: "/your/user/path/Desktop/newdatabase.db"
```
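The destination spec's comment asks for an absolute `connection_string`, so one option is to generate the spec from Python and reject relative paths up front. This sketch uses a hypothetical `render_spec` helper that fills the spec above as a plain string template instead of using a YAML library. You would then write the result into the folder that `spec_file_path` points to.

```python
import os

# The spec shown above, with the SQLite path left as a placeholder.
SPEC_TEMPLATE = """\
kind: source
spec:
  name: "xkcd"
  path: "cloudquery/xkcd"
  version: "v1.0.6"
  tables: ['*']
  destinations:
    - "sqlite"
  spec:
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.9.4"
  spec:
    connection_string: "{db_path}"
"""


def render_spec(db_path: str) -> str:
    """Return the spec text, refusing relative database paths."""
    if not os.path.isabs(db_path):
        # Airflow workers may run as another user from another working directory.
        raise ValueError(f"connection_string must be absolute, got {db_path!r}")
    return SPEC_TEMPLATE.format(db_path=db_path)
```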
The code required to run the sync is surprisingly simple:
```python
import subprocess

from airflow.decorators import task


# Task to run the CloudQuery sync command.
# `dag` is the DAG object defined earlier in the same file.
@task(
    dag=dag,
    task_id='run_xkcd_to_sqlite_sync',
    doc_md="""### Run XKCD to SQLite Sync
This task runs the CloudQuery sync command using the provided `spec_file_path`
parameter to sync XKCD data into a SQLite database.
It uses the path of the CloudQuery binary from the previous task.
""")
def run_xkcd_to_sqlite_sync(spec_file_path: str, cloudquery_path: str):
    # Capture stdout and stderr as text so they can go into the error message.
    result = subprocess.run(
        [cloudquery_path, 'sync', spec_file_path], capture_output=True, text=True
    )
    if result.returncode != 0:
        # Both parts are f-strings, so the captured output is interpolated.
        raise Exception(
            f"CloudQuery sync failed with return code {result.returncode}."
            f" Output: {result.stdout} Errors: {result.stderr}"
        )
```
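With `capture_output=True`, CloudQuery's output only surfaces when the sync fails. A variation worth considering streams each line into the task log as the sync runs. This sketch uses the standard `subprocess.Popen` API. `run_and_stream` is a hypothetical helper, and in the task above you would call it with `[cloudquery_path, 'sync', spec_file_path]`.

```python
import logging
import subprocess

log = logging.getLogger(__name__)


def run_and_stream(cmd: list[str]) -> None:
    """Run cmd, logging each output line as it arrives; raise if it exits non-zero."""
    with subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr so errors appear in order
        text=True,
    ) as proc:
        for line in proc.stdout:
            log.info(line.rstrip())
    # Leaving the with-block waits for the process, so returncode is set here.
    if proc.returncode != 0:
        raise RuntimeError(f"{cmd[0]} failed with return code {proc.returncode}")
```

The trade-off is noisier logs on success in exchange for seeing sync progress while a long-running task is still executing.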
When you trigger the DAG in the Airflow UI, the download task succeeds first. You can then visualize the DAG and wait until the sync task completes.
And voilà! The whole XKCD catalog is in our local SQLite database!
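To double-check the result outside Airflow, you can query the database with Python's built-in `sqlite3` module. This sketch assumes the XKCD integration writes comics to a table named `xkcd_comics`. Inspect `sqlite_master` (or run `.tables` in the `sqlite3` shell) to confirm the actual table name in your database.

```python
import sqlite3
from contextlib import closing


def count_rows(db_path: str, table: str = "xkcd_comics") -> int:
    """Return the number of rows in table (the default name is an assumption)."""
    with closing(sqlite3.connect(db_path)) as conn:
        # Table names cannot be bound as parameters, so quote the identifier.
        quoted = '"' + table.replace('"', '""') + '"'
        (count,) = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()
    return count
```

For example, `count_rows("/your/user/path/Desktop/newdatabase.db")` should return the number of synced comics.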
The full example is available at https://github.com/cloudquery/airflow-dag-cloudquery
Written by Mariano Gappa
Mariano is a software engineer at CloudQuery with 15 years of experience in the industry. He specializes in performance work, and his contributions have reduced sync times and significantly improved CloudQuery's overall performance.