Creating a New JavaScript Source Plugin from Scratch
What You Need to Know #
Prerequisites #
The Plan #
- The plugin should support reading a set of CSV files from a directory.
- Each file will be synced as a single table in the destination.
- The column names will be read from the first row.
- The plugin should recognize basic types - I will start with numbers and everything else will be a string.
The plugin will be configured with a sync.yml file similar to this one:

kind: source
spec:
  name: 'text-file'
  registry: 'grpc' # for local testing only, this will change once the plugin is published
  path: 'localhost:7777' # same as above
  version: 'v1.0.0'
  tables: ['*']
  destinations:
    - 'sqlite'
  spec:
    path: 'test_data/' # specify folder or a single file
    csvDelimiter: ';'
---
kind: destination
spec:
  name: sqlite
  path: cloudquery/sqlite
  registry: cloudquery
  version: 'v2.10.3'
  spec:
    connection_string: ./db.sql
Getting Started #
From the root of the plugin project, install the dependencies and start the plugin:

npm install
npm run dev # this will run the plugin in dev mode as a server listening on localhost:7777
cloudquery sync sync.yml # run this in a new terminal tab
Once the sync completes, you should see a db.sql file containing a table named Names with two records. You can open this file using DB Browser for SQLite or any other SQLite client.
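Alternatively, here is a quick check from the command line with the sqlite3 CLI (assuming it is installed; db.sql is the connection string from the destination config above):

sqlite3 db.sql "SELECT * FROM Names;"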
What's inside #
Let's look at what's inside the src folder:
- main.ts - The main wrapper for the plugin. Its responsibility is to start the plugin that the CLI will communicate with when running the sync.
- plugin.ts - The body of the plugin. The main responsibility of this module is to return an initialized instance of the plugin to serve.
- spec.ts - This module is responsible for handling and validating plugin configuration.
- tables.ts - This is the module where the main work happens and the one we'll be working with the most.
- tables.test.ts - Sample unit tests for the module above.
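Laid out as a tree (assuming the template's default layout), that is:

src/
├── main.ts
├── plugin.ts
├── spec.ts
├── tables.ts
└── tables.test.ts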
How it all works #
When a sync runs, the CloudQuery CLI calls the plugin's sync function and expects the plugin to load the tables. We will be implementing the functions that return the tables back to the plugin client. The plugin is created with the SDK's newPlugin function. The plugin module (plugin.ts) takes care of creating a newClient function that will be called by the SDK; it connects the plugin configuration passed in spec with the getTables function, where the actual implementation will happen.

const newClient: NewClientFunction = async (logger, spec, { noConnection }) => {
  pluginClient.spec = parseSpec(spec);
  pluginClient.client = { id: () => 'cq-js-sample' };
  if (noConnection) {
    pluginClient.allTables = [];
    return pluginClient;
  }
  pluginClient.allTables = await getTables();
  return pluginClient;
};
pluginClient.plugin = newPlugin('cq-js-sample', version, newClient);
getTables is a function exported from the tables module. It is an asynchronous function returning an array of tables that will be passed to the destination plugin. In this sample plugin, it only returns one table.

// tables.ts
export const getTables = async (): Promise<Table[]> => {
  const table = await getTable();
  return [table];
};
The sample getTable function defines hard-coded column names and records, plus a tableResolver, which takes care of writing the records to a stream provided by the SDK.

const columnNames = ['First Name', 'Last Name'];
const tableRecords = [
  { 'First Name': 'Jack', 'Last Name': 'Bauer' },
  { 'First Name': 'Thomas', 'Last Name': 'Kirkman' },
];
// ...
const tableResolver: TableResolver = (clientMeta, parent, stream) => {
  for (const r of tableRecords) stream.write(r);
  return Promise.resolve();
};
return createTable({ name: 'Names', columns: columnDefinitions, resolver: tableResolver });

In our plugin, we will replace the hard-coded tableRecords with data loaded from a CSV file.

Let's Code #
Plugin configuration #
Our plugin needs two configuration options:
- a path to where the files are; mandatory
- a CSV delimiter; defaults to ","

In the source spec of sync.yml, this looks like:

spec:
  path: 'test_data/'
  csvDelimiter: ';'
The configuration is handled and validated in the spec.ts file:

const spec = {
type: "object",
properties: {
concurrency: { type: "integer" },
path: { type: "string" },
csvDelimiter: {type: "string" },
},
required: ["path"],
};
const ajv = new Ajv.default();
const validate = ajv.compile(spec);
export type Spec = {
concurrency: number;
path: string;
csvDelimiter: string;
};
export const parseSpec = (spec: string): Spec => {
const parsed = JSON.parse(spec) as Partial<Spec>;
const valid = validate(parsed);
if (!valid) {
throw new Error(`Invalid spec: ${JSON.stringify(validate.errors)}`);
}
const {
concurrency = 10_000,
path = "",
csvDelimiter = ",",
} = camelcaseKeys(parsed);
! return { concurrency, path, csvDelimiter };
};
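As a quick sanity check, here is a sketch of what parseSpec does with the spec string the SDK hands over (the exact JSON payload shown is an assumption for illustration):

// Hypothetical input, for illustration only:
const example = parseSpec('{"path": "test_data/", "csvDelimiter": ";"}');
// example => { concurrency: 10000, path: 'test_data/', csvDelimiter: ';' }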
The parsed configuration will be needed in the getTables function in tables.ts, so we can add it as an argument there. We'll also add a logger and pass it from the plugin initialization.

export const getTables = async (logger: Logger, spec: Spec): Promise<Table[]> => {
  const table = await getTable();
  return [table];
};
Reading the CSV files #
We will use the fs module to get the list of files in the provided path and then load the files using csv-parse. We will add a function to get the CSV file paths and another one to parse a file. Install the csv-parse package using npm install csv-parse.

// update imports:
import type { Logger } from 'winston';
import fs from 'node:fs/promises';
import Path from 'node:path';
import { parse } from 'csv-parse';
// ...
const getCsvFiles = async (logger: Logger, path: string): Promise<string[]> => {
  const stats = await fs.stat(path);
  if (stats.isDirectory()) {
    const files = await fs.readdir(path, { withFileTypes: true });
    return files.filter((f) => f.isFile()).map((f) => Path.join(path, f.name));
  }
  logger.error('Target path is not a directory.');
  return [];
};

const parseCsvFile = async (path: string, csvDelimiter: string): Promise<string[][]> => {
  const content = await fs.readFile(path);
  return new Promise<string[][]>((resolve, reject) => {
    parse(content, { delimiter: csvDelimiter }, (error, records) => {
      if (error) {
        reject(error);
        return;
      }
      resolve(records);
    });
  });
};
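For example, given a small CSV file (and a matching delimiter), parseCsvFile resolves to an array of string rows, with the header row first:

// test_data/people.csv containing:
//   Name,Count,Age
//   Peter,3,1.5
// parseCsvFile('test_data/people.csv', ',') resolves to:
//   [['Name', 'Count', 'Age'], ['Peter', '3', '1.5']]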
Converting the raw CSV data to a table #
The getTables function needs to return an array of Table objects, so we will need to convert our raw data. We can use the SDK's createTable function, which requires us to pass a few options:
- table name
- column definitions
- table resolver - a function that will write the actual records to a provided stream

Each column definition also needs a column resolver, which copies the value for that column from the current record:
const getColumnResolver = (c: string): ColumnResolver => {
  return (meta, resource) => {
    const dataItem = resource.getItem();
    resource.setColumData(c, (dataItem as Record<string, unknown>)[c]);
    return Promise.resolve();
  };
};
We will use Utf8 as the default column type for now. The sample code already contains a getTable function that serves as an example, so let's modify it to write the data from the CSV file into the stream:

const getTable = async (rows: string[][], tableName: string): Promise<Table> => {
  if (rows.length === 0) {
    throw new Error('No rows found');
  }
  const getRecordObjectFromRow = (row: string[]) => {
    const record: Record<string, string> = {};
    for (const [index, element] of row.entries()) {
      record[columnNames[index]] = element;
    }
    return record;
  };
  const columnNames = rows[0];
  // convert all rows except column definitions to an array of Record<string, string> objects
  const tableRecords = rows.filter((_, index) => index > 0).map((r) => getRecordObjectFromRow(r));
  const columnDefinitions: Column[] = columnNames.map((c) => ({
    name: c,
    type: new Utf8(),
    description: '',
    primaryKey: false,
    notNull: false,
    incrementalKey: false,
    unique: false,
    ignoreInTests: false,
    resolver: getColumnResolver(c),
  }));
  const tableResolver: TableResolver = (clientMeta, parent, stream) => {
    for (const r of tableRecords) stream.write(r);
    return Promise.resolve();
  };
  return createTable({ name: tableName, columns: columnDefinitions, resolver: tableResolver });
};
Connecting it all together #
Now we can update the getTables function to load the CSV files and convert them to tables. This is where I will use pMap to apply the concurrency provided in the spec. It is probably not necessary in this case, but it is good practice to use it when you are doing any I/O operations.

import pMap from 'p-map';
// ...
export const getTables = async (logger: Logger, spec: Spec): Promise<Table[]> => {
  const { path, csvDelimiter, concurrency } = spec;
  const files = await getCsvFiles(logger, path);
  logger.info(`done discovering files. Found ${files.length} files`);
  const allTables = await pMap(
    files,
    async (filePath) => {
      const csvFile = await parseCsvFile(filePath, csvDelimiter);
      return getTable(csvFile, Path.basename(filePath, '.csv'));
    },
    {
      concurrency,
    },
  );
  return allTables;
};
Finally, let's go back to plugin.ts and update the call to getTables to pass the logger and the spec:

const newClient: NewClientFunction = async (logger, spec, { noConnection }) => {
  pluginClient.spec = parseSpec(spec);
  pluginClient.client = { id: () => 'cq-js-sample' };
  if (noConnection) {
    pluginClient.allTables = [];
    return pluginClient;
  }
  pluginClient.allTables = await getTables(logger, pluginClient.spec);
  return pluginClient;
};
First test #
Let's update sync.yml to add the required configuration:

kind: source
spec:
  name: 'text-file'
  registry: 'grpc'
  path: 'localhost:7777'
  version: 'v1.0.0'
  tables: ['*']
  destinations:
    - 'sqlite'
  spec:
    path: 'test_data'
    csvDelimiter: ','
# ...
Then create a test_data folder and add a CSV file with the following content:

Name,Count,Age
Peter,3,1.5
Mike,5,2.3
Jack,2,3.2
Now start the plugin and run a sync with the sync.yml in the project:

$ npm run dev
> @cloudquery/[email protected] dev
> ts-node --esm src/main.ts serve
[2023-09-28T09:57:13.324Z] info server running on port: 7777
$ cloudquery sync sync.yml
Loading spec(s) from sync.yml
Starting sync for: text-file (grpc@localhost:7777) -> [sqlite (v2.4.9)]
/ Syncing resources... (12772/-, 2141 resources/s) [11s]
Sync completed successfully. Resources: 22463, Errors: 0, Warnings: 0, Time: 14s
After the sync, the db.sql file should contain new tables with the data from the CSV files. If you imported the example above, it should be created as a table with CloudQuery columns _cq_sync_time and _cq_source_name and three columns of type TEXT with the names Name, Count, and Age.

Custom column types #
All columns are currently created as TEXT because we use Utf8 for everything. Let's add support for numbers: we will use the Float64 type for floating-point numbers and Int64 for integers. A helper will inspect each value and return the corresponding DataType objects.

import type { DataType } from '@cloudquery/plugin-sdk-javascript/arrow';
import { Utf8, Int64, Float64 } from '@cloudquery/plugin-sdk-javascript/arrow';
//...
const getColumnType = (value: string): DataType => {
  const number = Number(value);
  if (Number.isNaN(number)) return new Utf8();
  if (Number.isInteger(number)) return new Int64();
  return new Float64();
};

const getColumnTypes = (row: string[]): DataType[] => {
  return row.map((value) => getColumnType(value));
};
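With the sample CSV from the first test, these helpers map values to types like this:

// getColumnType('Peter') -> Utf8
// getColumnType('3')     -> Int64
// getColumnType('1.5')   -> Float64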
Then we update the getTable function to use getColumnTypes and to determine the proper type for the data records:

const getTable = async (
  rows: string[][],
  tableName: string,
): Promise<Table> => {
  if (rows.length === 0) {
    throw new Error("No rows found");
  }
  const columnNames = rows[0];
  const getRecordObjectFromRow = (row: string[]) => {
    const record: Record<string, string | number> = {};
    for (const [index, element] of row.entries()) {
      record[columnNames[index]] = Number.isNaN(Number(element)) ? element : Number(element);
    }
    return record;
  };
  const columnTypes = rows.length > 1 ? getColumnTypes(rows[1]) : rows[0].map(() => new Utf8());
  // convert all rows except column definitions to an array of Record<string, string | number> objects
  const tableRecords = rows.filter((_, index) => index > 0).map((r) => getRecordObjectFromRow(r));
  const columnDefinitions: Column[] = columnNames.map((c, index) => ({
    name: c,
    type: columnTypes[index],
    description: "",
    primaryKey: false,
    notNull: false,
    incrementalKey: false,
    unique: false,
    ignoreInTests: false,
    resolver: getColumnResolver(c),
  }));
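  // The rest of the function (the tableResolver and the return createTable(...) call) stays the same as in the previous version.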
Delete the db.sql file, restart the plugin, and test the sync again. This time, the db.sql file should contain the right types for your columns: TEXT, INTEGER, and REAL.

Release #
The plugin is released as a Docker image; use the npm run package:container command to build it. To sync using the published image, update the source configuration in the sync.yml file:

spec:
  name: "cq-js-sample"
  registry: "docker"
  path: "cq-js-sample:latest" # this is the name and tag of the docker image to be pulled
  version: "v1.0.0"
  tables: ["*"]
  # ...
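As a rough outline, using the commands already mentioned above (the image name and tag depend on how your project is configured):

npm run package:container   # build the plugin's Docker image
cloudquery sync sync.yml    # run the sync against the Docker-packaged plugin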
Resources #
Check out the csv-file-sync branch of the repository above to get to the final code for our CSV plugin.

Other JavaScript Source Plugin Examples #
Written by Michal Brutvan
Michal is CloudQuery's senior product manager, responsible for new features and CloudQuery's product roadmap. He has held a number of product ownership roles and, prior to that, worked as a software engineer.