Using CloudQuery Within Apache Airflow
The DAG has two tasks:
- Download CloudQuery binary: inspects the host's OS and architecture and downloads the matching binary for the supplied version.
- Run the Sync: reads the spec file and runs the CloudQuery sync.

It takes two parameters:
- The version of the CloudQuery binary. It defaults to the latest release at the time of writing: v6.4.1.
- The path to the sync's specification file. It defaults to the folder containing the DAG file.
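
As a rough illustration of the download step, the sketch below shows one way to turn the host's OS and architecture into a download URL. The function name `cloudquery_binary_url`, the URL template, and the asset naming scheme are assumptions made for this example and are not taken from the DAG itself; check them against the CloudQuery releases page before using them.

```python
import platform

# Assumption: CLI release assets follow this GitHub URL pattern.
# Check the CloudQuery releases page before relying on it.
RELEASE_URL_TEMPLATE = (
    "https://github.com/cloudquery/cloudquery/releases/download/"
    "cli-{version}/cloudquery_{os_name}_{arch}{suffix}"
)

# Map the names Python reports to the names assumed in asset file names.
ARCH_ALIASES = {
    "x86_64": "amd64",
    "amd64": "amd64",
    "aarch64": "arm64",
    "arm64": "arm64",
}
SUPPORTED_OS = ("linux", "darwin", "windows")


def cloudquery_binary_url(version="v6.4.1", system=None, machine=None):
    """Build the download URL for a version and host platform.

    `system` and `machine` default to the values detected on this host.
    """
    os_name = (system or platform.system()).lower()
    raw_arch = (machine or platform.machine()).lower()
    if os_name not in SUPPORTED_OS:
        raise ValueError(f"Unsupported OS: {os_name}")
    if raw_arch not in ARCH_ALIASES:
        raise ValueError(f"Unsupported architecture: {raw_arch}")
    # Assumed: only Windows assets carry a file extension.
    suffix = ".exe" if os_name == "windows" else ""
    return RELEASE_URL_TEMPLATE.format(
        version=version,
        os_name=os_name,
        arch=ARCH_ALIASES[raw_arch],
        suffix=suffix,
    )
```

Passing `system` and `machine` explicitly makes the mapping easy to check without running on every platform; calling `cloudquery_binary_url()` with no arguments uses the current host.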
---
kind: source
spec:
  name: "xkcd"
  path: "cloudquery/xkcd"
  version: "v1.0.6"
  tables: ['*']
  destinations:
    - "sqlite"
  spec:
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.9.4"
  spec:
    # Make sure to use an absolute path here, because it's
    # unlikely that Airflow will run this job as your user.
    connection_string: "/your/user/path/Desktop/newdatabase.db"
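
Because the spec's comment asks for an absolute database path, one option is to generate the destination block from Python so the path is always resolved first. This is only a sketch: the helper name `render_destination_spec` and the idea of templating the spec are assumptions for illustration, not part of the original setup.

```python
from pathlib import Path

# Destination block copied from the spec above, with the database
# path left as a placeholder to be filled in.
DESTINATION_TEMPLATE = """\
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: "v2.9.4"
  spec:
    connection_string: "{db_path}"
"""


def render_destination_spec(db_path):
    """Return the destination spec with `db_path` made absolute."""
    # resolve() anchors relative paths at the current working directory,
    # so the result no longer depends on which user runs the job.
    absolute = Path(db_path).expanduser().resolve()
    return DESTINATION_TEMPLATE.format(db_path=absolute.as_posix())
```

The rendered text can then be written next to the source block in the spec file that is passed to the sync.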
import subprocess

from airflow.decorators import task

# `dag` is the DAG object defined elsewhere in the DAG file.

# Task to run the CloudQuery sync command
@task(
    dag=dag,
    task_id='run_xkcd_to_sqlite_sync',
    doc_md="""### Run XKCD to SQLite Sync
    This task runs the CloudQuery sync command using the provided `spec_file_path`
    parameter to sync XKCD data into a SQLite database.
    It uses the path of the CloudQuery binary from the previous task.
    """)
def run_xkcd_to_sqlite_sync(spec_file_path: str, cloudquery_path: str):
    result = subprocess.run(
        [cloudquery_path, 'sync', spec_file_path], capture_output=True
    )
    if result.returncode != 0:
        # Both parts must be f-strings, or the output is never interpolated.
        raise Exception(
            f"CloudQuery sync failed with return code {result.returncode}."
            f" Output: {result.stdout.decode()}"
        )
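
The task above reports only the captured stdout when the sync fails. In case the CLI writes its diagnostics to stderr instead, a small helper can surface both streams. This is a minimal sketch; `run_command` is a hypothetical name and is not part of the original DAG.

```python
import subprocess


def run_command(args):
    """Run a command and return its stdout, raising on a non-zero exit."""
    # text=True decodes both streams to str, so no .decode() is needed.
    result = subprocess.run(args, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(
            f"Command failed with return code {result.returncode}.\n"
            f"stdout: {result.stdout}\n"
            f"stderr: {result.stderr}"
        )
    return result.stdout
```

Inside the task, the call would look like `run_command([cloudquery_path, "sync", spec_file_path])`, which keeps the Airflow task log informative whichever stream the error lands in.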
Written by Mariano Gappa
Mariano is a software engineer working at CloudQuery with 15 years of experience in the industry. His speciality is in improving performance and his work has reduced sync times and significantly improved CloudQuery's performance.