Tutorials
Pulling Stock Market Data with CloudQuery
In my recent presentation at State of Open Con, I wanted to push the envelope and go beyond what we usually do with CloudQuery. So I set my sights on a subject that is about as far away from the normal use cases of CloudQuery as possible - the delayed market data from the London Stock Exchange.
This is that talk in blog format.
This data is fascinating, both because it contains individual transactions that were executed on the London Stock Exchange, and because it is data that they are legally required to provide but don’t want to - so they make it as awkward to retrieve as they can.
As regular readers of this blog will know, CloudQuery is a really powerful and flexible ELT Framework. If you’re a new reader, however, you might be thinking, “Did he just misspell ETL?” - and for that question I’d point you to some of our other content on ETL vs ELT.
Architecture #
Let’s start off with the High-Level Architecture of CloudQuery.
At the highest level, a CloudQuery Source Plugin is a gRPC service that collects data and provides it in an Apache Arrow-based tabular format.
The CloudQuery CLI (or CloudQuery Cloud, if you prefer to reduce your DevOps workload) uses this service and the tables it provides to collect data, which it then writes out via destination plugins - gRPC services that convert the tabular data into whatever storage format is needed.
This means that you can write a Source plugin once, and it will automatically work with any destination plugin without the need for any destination-specific code.
In theory, these plugins can be written from scratch to conform to the interface definition, but we strongly suggest plugin developers use the CloudQuery SDK as this abstracts away a lot of the complexity.
SDK Architecture #
So, let’s look at the architecture of a plugin that uses the Python SDK.
In the bottom right block, you’ll see that the SDK takes care of the gRPC interface, which, when called, initializes the Plugin Class and triggers the Scheduler to spawn the processing threads for each of the table resolvers.
In the upper left block, you’ll see the Table Resolvers that are responsible for iterating over the relevant generator from the API Client Class. These generators pull data from the target API, yielding one dictionary object per row. The Table Resolvers then perform any validation as well as any name and type munging needed for each Table row, before yielding the response back to the SDK.
The SDK then handles the conversion from the native dictionary object into the Apache Arrow structure defined in the Table Class, which it streams to the Orchestrator as quickly as it’s available.
This architecture enables plugins to process immense datasets as quickly as possible and without using significant amounts of RAM.
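To make that flow concrete, here's a minimal, simplified sketch of the contract described above. It is not the SDK's actual interface - just an illustration of an API client generator yielding one dictionary per row, and a resolver cleaning each row before handing it back for Arrow conversion:

```python
from typing import Any, Dict, Generator

# Illustration only: a stand-in for an API client generator that yields one
# dictionary per row pulled from some upstream API.
def api_rows() -> Generator[Dict[str, Any], None, None]:
    yield {"id": "1", "price": "1.23"}
    yield {"id": "2", "price": "4.56"}

# Illustration only: a stand-in for a table resolver that validates and
# type-munges each row before yielding it onward.
def resolve(rows) -> Generator[Dict[str, Any], None, None]:
    for row in rows:
        yield {"id": int(row["id"]), "price": float(row["price"])}

for cleaned in resolve(api_rows()):
    print(cleaned)
```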
The Data #
Now that we have a high-level understanding of how plugins work, let’s take a look at the data we’re going to use for this project. To keep things simple, we’re only going to look at one of these endpoints for now, but you can find the others implemented in the linked project repository.
So, the first thing we want to do is download a CSV file for the XLON Post Trade data. These files contain every single transaction that occurred on the London Stock Exchange in a given one-minute-long trading window; as such, there is one file for every minute the market is open.
This will give us an idea of the specific formatting, the available columns, and suitable data types. In an ideal world, we’d be able to get this information from a standards document or API spec, but LSE doesn’t seem to want to provide one for the free-delayed data, so here we are. Another downside of this free data is that it only covers the most recent trading day.
Complaints aside, let's take a look at the data. Here's an example extract:
```
distributionTime;sourceVenue;instrumentId;transactionIdentificationCode;mifidPrice;mifidQuantity;tradingDateAndTime;instrumentIdentificationCodeType;instrumentIdentificationCode;priceNotation;priceCurrency;notionalAmount;notionalCurrency;venueOfExecution;publicationDateAndTime;transactionToBeCleared;measurementUnit;quantityInMeasurementUnit;type;mifidFlags
2024-01-30T16:40:05.520572Z;1;72057594038083673;928426642916541;1.23;100000;2024-01-30T16:40:05.519559Z;;VGG4392T1075;;GBX;;;AIMX;2024-01-30T16:40:05.520572Z;0;;;;,,,ALGO,,,
2024-01-30T16:40:05.521181Z;1;72057594038083673;928426642916543;1.23;302;2024-01-30T16:40:05.519559Z;;VGG4392T1075;;GBX;;;AIMX;2024-01-30T16:40:05.521181Z;0;;;;,,,ALGO,,,
2024-01-30T16:40:05.521260Z;1;72057594038083673;928426642916544;1.23;19855;2024-01-30T16:40:05.519559Z;;VGG4392T1075;;GBX;;;AIMX;2024-01-30T16:40:05.521260Z;0;;;;,,,ALGO,,,
```
Here, we can see that the data is `;`-separated and uses 5 data types:

- ISO8601 datetime (with microsecond/μs accuracy)
- unsigned int
- float - technically, MiFID II / MiFIR specifies that prices should be in fixed-point decimal (with 18/13 precision), but again, this free data doesn't appear to comply with such standards
- string
- comma-separated flags
There’s also a bunch of null fields that we can probably ignore.
If we parse this, transpose it to make it a little more legible, and annotate it with the data types, it looks like this:
Field Names | Datatypes | Row 1 | Row 2 | Row 3 |
---|---|---|---|---|
distributionTime | ISO8601 Datetime | 2024-01-30T16:40:05.520572Z | 2024-01-30T16:40:05.521181Z | 2024-01-30T16:40:05.521260Z |
sourceVenue | Unsigned Int | 1 | 1 | 1 |
instrumentId | Unsigned Int | 72057594038083673 | 72057594038083673 | 72057594038083673 |
transactionIdentificationCode | Unsigned Int | 928426642916541 | 928426642916543 | 928426642916544 |
mifidPrice | Float | 1.23 | 1.23 | 1.23 |
mifidQuantity | Unsigned Int | 100000 | 302 | 19855 |
tradingDateAndTime | ISO8601 Datetime | 2024-01-30T16:40:05.519559Z | 2024-01-30T16:40:05.519559Z | 2024-01-30T16:40:05.519559Z |
instrumentIdentificationCodeType | N/A - Null | - | - | - |
instrumentIdentificationCode | String (ISIN Format) | VGG4392T1075 | VGG4392T1075 | VGG4392T1075 |
priceNotation | N/A - Null | - | - | - |
priceCurrency | String (ISO4217 Format) | GBX | GBX | GBX |
notionalAmount | N/A - Null | - | - | - |
notionalCurrency | N/A - Null | - | - | - |
venueOfExecution | String | AIMX | AIMX | AIMX |
publicationDateAndTime | ISO8601 Datetime | 2024-01-30T16:40:05.520572Z | 2024-01-30T16:40:05.521181Z | 2024-01-30T16:40:05.521260Z |
transactionToBeCleared | Unsigned Int | 0 | 0 | 0 |
measurementUnit | N/A - Null | - | - | - |
quantityInMeasurementUnit | N/A - Null | - | - | - |
type | N/A - Null | - | - | - |
mifidFlags | CSV | ,,,ALGO,,, | ,,,ALGO,,, | ,,,ALGO,,, |
From the data, it appears the `transactionToBeCleared` field is always 0, the `sourceVenue` field is always 1, and none of the fields marked here as `N/A - Null` ever get populated. Dropping these fields leaves us with the following list of fields to collect:
Field Names | Datatypes |
---|---|
distributionTime | ISO8601 Datetime |
instrumentId | Unsigned Int |
transactionIdentificationCode | Unsigned Int |
mifidPrice | Float |
mifidQuantity | Unsigned Int |
tradingDateAndTime | ISO8601 Datetime |
instrumentIdentificationCode | String (ISIN Format) |
priceCurrency | String |
venueOfExecution | String |
publicationDateAndTime | ISO8601 Datetime |
mifidFlags | CSV |
The Table class #
So, looking at the SDK then, how does this translate into a Table class?
In the SDK, the Table class defines the structure of the Table row, which is used to both generate any migrations for the database and to define the Apache Arrow messages that carry the table row data.
If you start with a fresh copy of the Python Plugin Template, you'll find a directory called `tables`, and in there, a module called `items.py`.

In `items.py` you'll find an example child of the `Table` class, which has an `__init__` definition containing a `name`, a human-readable `title`, and a list of `columns`.

If we use the data types above, clean up the field names to something that makes a little more sense, populate the `title` and `name` fields, and rename the class to fit, we end up with something that looks like the following `Table` class:

```python
class XLONPostDelayed(Table):
    def __init__(self) -> None:
        super().__init__(
            name="xlon_post_delayed",
            title="LSE Post-trade Delayed",
            is_incremental=True,
            columns=[
                Column("distribution_timestamp", pa.timestamp("us", ZoneInfo("UTC"))),
                Column("trading_timestamp", pa.timestamp("us", ZoneInfo("UTC"))),
                Column("transaction_id", pa.uint64(), primary_key=True, unique=True),
                Column("instrument_id", pa.uint64()),
                Column("isin_instrument_code", pa.string()),
                Column("currency", pa.string()),
                Column("price", pa.float64()),
                Column("quantity", pa.uint64()),
            ],
        )

    @property
    def resolver(self):
        return XLONPostDelayedResolver(table=self)
```
You’ll notice here that I’ve reordered the fields a bit to group similar data together. This is an aesthetic decision that has no practical benefits beyond looking nicer.
It might be a good idea to convert the price back to a Fixed-point Decimal, as this is intended to improve calculation performance and ensure any rounding-based precision errors are in a known range independent of the software and hardware platforms used, but I’ll leave this up to you.
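If you do want to go that route, here's a hedged sketch of the change, assuming the 18/13 precision mentioned earlier: swap the `price` column's Arrow type for a fixed-point decimal and parse the value with Python's `decimal.Decimal` instead of `float`:

```python
from decimal import Decimal

import pyarrow as pa

# Hypothetical alternative column type: a fixed-point decimal with 18 digits of
# precision, 13 of them after the decimal point (the MiFID II figure quoted above).
price_type = pa.decimal128(18, 13)

# In the resolver, the CSV string could then be parsed without a float round-trip:
price = Decimal("1.2345678901234")
print(pa.array([price], type=price_type))
```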
The API Client #
So, we have our table defined; now we need to implement the API client.
Since the London Stock Exchange seems to have decided to make it unreasonably difficult to get a copy of the free data, this one will be, unfortunately, interesting. As with so much of the free data available in the world, we will have to resort to a little web scraping.
Login #
Essentially, we need to scrape the login form to get the CSRF token so that we can login, get a session cookie, and start pulling data.
In an ideal world, they would implement an API key option, but as is the case with so much of the kind of data we want to collect, that’s not available in this case.
So, looking at the login page, we have a simple HTML form, so we'll want to capture the hidden `_csrf` field.

To do this, we'll use `requests` and `beautifulsoup`. The process here is fairly straightforward: create a `Session`; use the `Session` to pull the login page; use `beautifulsoup` to parse the login page and retrieve the `_csrf` value; post our login credentials and the CSRF token to the login page; and raise an exception if we get an error response.

Again, in the Python Plugin Template we have a framework to build this into. In the directory called `example` you'll find a module called `client.py` that has a class called `ExampleClient`. If we rename the class to `LSEGClient`, set up our login fields as arguments, and write the `__login` method, it should look something like this:

```python
from urllib.parse import urljoin

from bs4 import BeautifulSoup
from requests import Session


class LSEGClient:
    def __init__(
        self, username: str, password: str, base_url="https://dmd.lseg.com/dmd/"
    ):
        self._base_url = base_url
        self._username = username
        self._password = password
        self._session = self.__login()

    def __login(self):
        session = Session()
        # Pull the login page and extract the hidden CSRF token from the form
        login_page = session.get(urljoin(self._base_url, "login.html"))
        login_soup = BeautifulSoup(login_page.content, "html.parser")
        csrf_token = login_soup.form.find("input", type="hidden")["value"]
        # Post the credentials along with the CSRF token
        result = session.post(
            urljoin(self._base_url, "login.html"),
            data={
                "username": self._username,
                "password": self._password,
                "_csrf": csrf_token,
            },
        )
        result.raise_for_status()
        return session
```
Files and Generators #
Now that we have our logged-in session, we can write the code to pull the data.
Looking at the path for one of these files reveals a relatively simple pattern. Other than the directory path, it comes down to a table identifier followed by a DateTime.
If we look through these DateTimes, we'll see that they are at minute intervals for today's date, starting at market-open (08:00) and ending at market-close (16:30).

Using this knowledge, we can avoid additional web scraping and build an iterator like this one:
```python
# Another LSEGClient method: it walks the known URL pattern minute by minute,
# downloading and parsing each CSV file in turn.
def xlon_iterator(self) -> Generator[Dict[str, Any], None, None]:
    dir_path = urljoin(self._base_url, "download/posttrade/LSE/FCA/")
    end = datetime.combine(datetime.now().date(), time(16, 30))
    cursor = datetime.combine(end.date(), time(8, 0))
    if cursor.isoweekday() > 5:
        return  # Saturday or Sunday - the market is closed, nothing to yield
    while cursor < datetime.now() and cursor <= end:
        response = self._session.get(
            urljoin(
                dir_path,
                f"XLON-post-{cursor.year}-{cursor.month:02}-{cursor.day:02}T{cursor.hour:02}_{cursor.minute:02}.csv",
            )
        )
        response.raise_for_status()
        csv_file = response.content.decode()
        reader = DictReader(csv_file.splitlines(), delimiter=";")
        for row in reader:
            yield row
        cursor += timedelta(minutes=1)
```
The first thing it does is set up the cursor that's going to tick through the known link URLs for the current day, returning early if we already know the market will be closed (i.e., it's Saturday or Sunday). Then it loops over each minute from market-open to market-close, pulling the CSV file, which it then decodes and parses, yielding each row to the resolver.
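Before wiring this into the SDK, it can be handy to poke at the client directly. Here's a quick hedged sketch (the credentials are placeholders for whatever you registered with LSEG's delayed market data service):

```python
# Placeholder credentials - substitute your own LSEG delayed-data login.
client = LSEGClient(username="you@example.com", password="correct-horse-battery-staple")

# Peek at the first few trades of the day to confirm login and parsing work.
for i, row in enumerate(client.xlon_iterator()):
    print(row["tradingDateAndTime"], row["instrumentIdentificationCode"], row["mifidPrice"])
    if i >= 2:
        break
```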
The Resolver #
Now, at this point, we have our API Client and we have our `Table` class, but we haven't reconciled the difference between our `Table` class's column names and the CSV files' column names, which would lead to some problems if we didn't correct it. This is where the resolver comes in.

The resolver iterates over the response from the API Client, cleaning, renaming, and validating the data before yielding it to the SDK.
By default, the resolver yields exactly what it receives, so if the names are aligned, and you don’t want to check the values are sane, you can skip this step. But since we chose slightly different names, and we don’t trust the quality of our data (given the number of null fields), we have to do something about it.
There are two ways we can tackle this:
- Change our `Table` class's column names to match exactly the names that are in the API response - which is fine if those make sense and don't change in the future
- Align the names using the resolver, which we might need to do in the future anyway if the API response format changes
Since we should check for data conformity anyway, let’s do the latter.
To do this, the first thing we need to do is remap the names and ensure that our CSV data types are what we expect. Then, we’ll do some sanity checks that skip any potential nonsense rows.
```python
class XLONPostDelayedResolver(TableResolver):
    def __init__(self, table) -> None:
        super().__init__(table=table)

    def resolve(
        self, client: Client, parent_resource: Resource
    ) -> Generator[Any, None, None]:
        for item_response in client.client.xlon_iterator():
            # Remap the CSV column names to our Table column names and coerce the types.
            # Note: datetime.fromisoformat only accepts the trailing "Z" on Python 3.11+.
            cleaned_row = {
                "distribution_timestamp": datetime.fromisoformat(item_response.get("distributionTime")),
                "trading_timestamp": datetime.fromisoformat(item_response.get("tradingDateAndTime")),
                "transaction_id": int(item_response.get("transactionIdentificationCode")),
                "instrument_id": int(item_response.get("instrumentId")),
                "isin_instrument_code": item_response.get("instrumentIdentificationCode"),
                "currency": item_response.get("priceCurrency"),
                "price": float(item_response.get("mifidPrice")),
                "quantity": int(item_response.get("mifidQuantity")),
            }
            if cleaned_row["trading_timestamp"] > datetime.now(tz=ZoneInfo("UTC")):
                continue  # trade has an impossible datetime (it happened in the future)
            # DictReader yields empty strings (not None) for missing fields, so treat falsy as missing
            if not cleaned_row["isin_instrument_code"]:
                continue  # trade must have an instrument code
            if not cleaned_row["currency"]:
                continue  # trade must have a currency
            if cleaned_row["quantity"] < 0:
                continue  # trade must have a positive quantity
            yield cleaned_row
```
The Config Spec & Plugin Definition #
With that all sorted, we just need a way to provide the credentials to the API Client. To do this, we need to adjust our CloudQuery plugin's config `Spec` and `Plugin` interface.

In the Python Plugin Template, you'll find a directory called `client`, in which you'll find a module called `client.py`, in which you'll find the `Spec` - a dataclass that represents the fields we expect from our CloudQuery config file.

In this case, we'll replace `access_token` with `username` and `password`, and we'll adjust the `base_url` field's default value. You can also add some validation to ensure that the `username` and `password` fields are populated.

This should look something like:
```python
from dataclasses import dataclass, field

# DEFAULT_CONCURRENCY and DEFAULT_QUEUE_SIZE are constants provided by the template


@dataclass
class Spec:
    username: str
    password: str
    base_url: str = field(default="https://dmd.lseg.com/dmd/")
    concurrency: int = field(default=DEFAULT_CONCURRENCY)
    queue_size: int = field(default=DEFAULT_QUEUE_SIZE)

    def validate(self):
        if self.username is None:
            raise Exception("username must be provided")
        if self.password is None:
            raise Exception("password must be provided")
```
Finally, we need to tell the SDK how to represent our plugin to CloudQuery.
In the Python Plugin Template, you’ll find a module called
plugin.py
here, you’ll find a few constants (PLUGIN_NAME
, PLUGIN_VERSION
, and TEAM_NAME
) and a class called ExampleClient
. You’ll need to update these values appropriately to suit your plugins.Additionally, in the
get_tables
function of this class, you’ll find a list called all_tables
. This is where you list your instances of the Table
class - in this case, that’s just tables.XLONPostDelayed()
, but if you implement more than one table, you must also add these here.Note: If you’re working with tables that have relations (i.e., a table is the parent of one or more child tables), you only need to list the parent tables here. But this is out of the scope of this tutorial, so expect a blog post dedicated to relations in the near future.
```python
PLUGIN_NAME = "lseg"
PLUGIN_VERSION = "0.0.1"
TEAM_NAME = "cloudquery"
PLUGIN_KIND = "source"
```
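To tie that together, here's a hedged sketch of the table registration - only the `all_tables` list is shown, since the surrounding `get_tables` method signature comes straight from the template:

```python
# Fragment: this list lives inside the plugin class's get_tables method,
# as generated by the Python Plugin Template.
all_tables = [
    tables.XLONPostDelayed(),
    # further Table instances go here as more endpoints are implemented
]
```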
Running CloudQuery #
At this point, this single table version of our plugin is complete. We’ve implemented everything we need to pull the XLON Post data. So, how do we do that?
In the Python Plugin Template, you’ll find a file called
TestConfig.yaml
.We need to change the
name
of the Source plugin from comics
to lseg
(matching the name defined in instance of the Plugin
class). Then, we need to add the username
and password
fields, and since the base_url
field has a default value, we can comment that out or even delete it.Now that’s done, and assuming you have CloudQuery installed, you just need to run
cloudquery sync TestConfig.yaml.
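Here's a rough, hedged sketch of how `TestConfig.yaml` might end up looking. The `path`, `registry`, and `version` values are placeholders (keep whatever the template generated for your local build), the destination block reflects the SQLite setup the template ships with, and the `${...}` syntax is CloudQuery's environment-variable expansion, which keeps credentials out of the file:

```yaml
kind: source
spec:
  name: "lseg"  # must match PLUGIN_NAME in plugin.py
  path: "path-from-the-template"         # placeholder
  registry: "registry-from-the-template" # placeholder
  version: "v0.0.1"                      # placeholder
  tables: ["*"]
  destinations: ["sqlite"]
  spec:
    username: "${LSEG_USERNAME}"
    password: "${LSEG_PASSWORD}"
    # base_url: "https://dmd.lseg.com/dmd/"  # the default is fine, so this can stay commented out
---
kind: destination
spec:
  name: "sqlite"
  path: "cloudquery/sqlite"
  registry: "cloudquery"
  version: "vX.Y.Z"  # placeholder - use the latest from hub.CloudQuery.io
  spec:
    connection_string: "./lse_trades.db"
```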
Now, this is quite a lot of data that it's going to pull, and it takes around 30 minutes to sync a full day's worth of trading table data from the London Stock Exchange - on a good connection. But if you started it after 16:45 London/UK time (taking daylight saving into account), then when it's done, you'll have a complete copy of all the published raw trading data for that day.
What’s even more remarkable is that while this setup loads the data into an SQLite DB, changing it to use any destination plugin, you can find on hub.CloudQuery.io is as simple as updating the
destination
section of the TestConfig.yml file - no further work needed.For the completed code from this post, check out the
one_table
branchReady to get started with CloudQuery? You can try out CloudQuery locally with our quick start guide or explore the CloudQuery Platform (currently in beta) for a more scalable solution.
Want help getting started? Join the CloudQuery community to connect with other users and experts, or message our team directly here if you have any questions.
Written by Tim Armstrong
Tim is a developer advocate and software engineer with experience in creating content and tutorials for programming, application security, networking and devops.