Bulletproof Data Integration: How To Build The Best Incremental Data Pipeline From Oracle To Snowflake
Posted on 16 Jun, 2020. Last updated 18 Jan, 2021
In this post, I’d like to talk about the essential elements of incremental data pipelines and how they can migrate Oracle data to Snowflake reliably, in a maintenance-free way.
If you’re an experienced data engineer, you might be thinking this is all obvious stuff and that pipelines are easy. If this is you, I have a quick summary of the stand-out, hot topics that I’m going to cover below:
How to minimise future maintenance by creating incremental extracts that are stateless and run the same for initial loads as they do for ongoing deltas
How simple manifest files allow you to increase the speed of your extracts and avoid data loss
Why we shouldn’t extract source data newer than the start time of the oldest open database transaction
In my previous two articles, I’ve shown the complexity and limitations of using Pentaho to migrate Oracle data to Snowflake incrementally. So this is also an opportunity to shine a light on my old Pentaho pipeline and its fundamental flaws, especially if the goal is to make something maintenance-free and permanently cheap to run.
After that, I offer a simple alternative that reduces all of the complexity of an incremental pipeline down to a single command.
But What About Other Patterns?
Now, you might be wondering about the other common data integration patterns that we need, like those below.
When I started writing this article, I wanted to cover them here for completeness. But that would be too much for one post, so I’ll come back to them in future.
Please do let me know in the comments below which of these you find most challenging in your experience while working with Snowflake:
Snapshots (replacing or appending records to the target)
Synchronising objects (make the target look the same as the source)
Slowly changing dimensions
Before we get going, I must say this blog discusses data flow steps at length, so grab a cuppa now, or check out now if the above doesn’t resonate with you.
So let’s do this.
The Ideal Incremental Extract
Here’s a simplified diagram of the incremental extract process.
Now let’s dive into the detail around each step. Some of this is going to sound obvious, but things get more tricky in step-5. Hang in there!
Step 1 - Check The Target
First of all, we need to connect to the target Snowflake database to find out what data is already available there, so we can go back to the source and fetch the records needed to update the target.
This is easy if the target is a SQL database, as we can query it. But if the target is actually an S3 bucket instead of Snowflake then we need to rely on the naming convention of objects. So a date-time or sequence needs to be found by listing the bucket contents, sorting it and using a regular expression to extract the maximum date-time/sequence required.
Either way, there are a couple of scenarios here:
It might be that the target is empty, so we need to fetch everything from the beginning of time, or whatever sequence the primary key is based on.
It might be that we just need stuff that’s changed since we last looked.
We must choose a date-time or sequence number to get going. If there isn’t any data in the target yet, we choose a default “zero” value that works with the source system.
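For an S3 target, the list-sort-regex lookup could be sketched as below. It assumes a hypothetical key layout where each data file’s name starts with the newest date-time it contains as YYYYMMDDHHMMSS (e.g. orders/20200616120000_0001.csv.gz), and that the keys have already been listed, for example with boto3’s list_objects_v2 paginator. With a Snowflake target, the equivalent is a SELECT MAX(...) on the change-tracking column.

```python
import re
from datetime import datetime

# Hypothetical naming convention: <table>/<YYYYMMDDHHMMSS>_<part>.csv.gz
KEY_PATTERN = re.compile(r"/(\d{14})_\d+\.csv\.gz$")

# Hypothetical default "zero" value for an empty target; pick one that
# predates every row in the source system.
ZERO = datetime(1970, 1, 1)


def high_water_mark(keys, default=ZERO):
    """Return the newest date-time embedded in the object keys, or default."""
    stamps = []
    for key in keys:
        match = KEY_PATTERN.search(key)
        if match:  # keys that don't follow the convention are ignored
            stamps.append(datetime.strptime(match.group(1), "%Y%m%d%H%M%S"))
    # max() needs a default, otherwise an empty target would raise.
    return max(stamps, default=default)
```

Nothing here is stored outside the target itself, which is the point: the bucket contents are the only state the pipeline needs.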
The aim is to rely on the target data to keep all of the state required to operate the pipeline. This is preferred over supplemental metadata like another database table or flat file, as that creates a maintenance overhead when processes inevitably fail in future.
Teams spend 80% of their time maintaining systems instead of adding new features to them so we need to work hard not to introduce any maintenance vectors, even if we think they’re small. They quickly add up, preventing a team from scaling over time and that increases run costs.
Step 2 - Fetch The Source Data
Next, we go to the source to fetch all data more recent than our date-time or sequence found in step-1 (this may be the default “zero” value).
If we’re using date-times to track changes, we need to ensure we don’t select any records newer than the start time of the oldest open transaction, since open transactions may hold uncommitted “old” data.
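If we don’t handle this, we’ll end up with data in the target that’s newer than the “old” records that are yet to be committed to the source. The next time our pipeline fires up, it will go around the loop (step-1) to figure out where it left off and will never be able to fetch the “old” records from the source. For example, if a transaction starts at 10:00, writes rows stamped 10:01 and commits at 10:05, an extract at 10:03 can’t see those rows, yet the next run resumes after the newest timestamp it loaded (say 10:02) and skips them. Put simply, we’ll end up with missing data in the target!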
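Here’s a minimal sketch of that cap, assuming the extract user can read V$TRANSACTION (GV$TRANSACTION on RAC) and that the change-tracking column is set from the database clock at DML time. The orders table and last_modified column are hypothetical. The window is half-open, so rows exactly at the lower bound are re-read, which is harmless when step-6 upserts by key.

```python
from datetime import datetime
from typing import Optional

# Assumption: SELECT access on V$TRANSACTION. START_DATE is in the database
# clock, so "now" should also come from the database (SELECT SYSDATE FROM dual).
OLDEST_OPEN_TXN_SQL = "SELECT MIN(start_date) FROM v$transaction"

# Hypothetical table and column. Strict "<" on the upper bound: a transaction
# that started at the cap may still commit rows stamped at or after it.
DELTA_SQL = (
    "SELECT * FROM orders "
    "WHERE last_modified >= :lower_bound AND last_modified < :upper_bound"
)


def safe_upper_bound(db_now: datetime,
                     oldest_open_txn_start: Optional[datetime]) -> datetime:
    """Cap the extract at the start of the oldest open transaction.

    oldest_open_txn_start is None when MIN() finds no open transactions.
    """
    if oldest_open_txn_start is None:
        return db_now
    return min(db_now, oldest_open_txn_start)
```

This is deliberately conservative: any open transaction holds the cap back, even one that never touches the table being extracted.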
In the past, I’ve run into resistance from developers around this concept, but it’s essential if we want to build a bulletproof, maintenance-free system. The goal is to be able to run our pipelines at any time, without the need for human intervention to decide what data is or isn’t missing after a failure of any kind. That intervention slows our business down, adds to costs and is entirely avoidable.
Step 3 - Extract Data In Chunks
So now we know the range of records we need to extract from the source to get the target up-to-date. There’s another problem, though. We don’t know the density of data in the source so we can’t just select all of it in one go and assume it’s going to be okay.
The more records there are, the longer it will take and — in the case of Oracle — we want to avoid generating ORA-1555 “snapshot too old” errors, which are produced if there’s a lot of change made to the source data while we’re reading it.
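The simplest form of chunking is a fixed width, sketched below for a date-time change-tracking column (the column name in the docstring is hypothetical). Because the windows are consecutive and half-open, each chunk can be extracted, and retried, as its own short query. Shorter reads reduce, though don’t eliminate, the exposure to ORA-1555.

```python
from datetime import datetime, timedelta


def chunk_ranges(lower, upper, step):
    """Yield consecutive half-open windows [start, end) covering [lower, upper).

    Each window feeds a query such as
    "WHERE last_modified >= :start AND last_modified < :end",
    so no row falls into two chunks or between them.
    """
    if step <= timedelta(0):
        raise ValueError("step must be a positive timedelta")
    start = lower
    while start < upper:
        end = min(start + step, upper)  # the last chunk may be shorter
        yield start, end
        start = end
```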
A stretch goal would be to make the pipeline automatically determine the chunk size based on data density, rate-of-change of the records and system configuration parameters like “undo” tablespace size.
This may be pushing boundaries, as database owners who have privileges to understand the database infrastructure are often separated from the application developers. To resolve this, we’ll need some joined up thinking. The more joined up we can be here, the less maintenance we’ll need to do to keep data in sync in future.
If you have any experience of how achievable this is, it’d be great to know your thoughts in the comments below.
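Only the data-density part of that stretch goal is easy to sketch. The function below assumes we’ve sampled a row count over a recent window (for example with a COUNT(*) on the change-tracking column) and scales the window to hit a target row count. Rate of change and undo tablespace size are not modelled, and the clamping limits are arbitrary placeholders.

```python
from datetime import timedelta


def chunk_width(sample_rows, sample_window, target_rows,
                min_width=timedelta(minutes=1), max_width=timedelta(days=7)):
    """Estimate a chunk width that yields roughly target_rows rows per chunk.

    Assumes rows are spread evenly across the sampled window, which real data
    rarely is, so the estimate is clamped to [min_width, max_width].
    """
    if sample_rows <= 0:
        return max_width  # no recent rows: take big steps
    width = sample_window * (target_rows / sample_rows)
    return max(min_width, min(width, max_width))
```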
Step 3.1 - Write CSV Files With Optimal Size
The CSV file format is a good start. As we write the files, we can rotate them into smaller parts depending on how big each chunk from step-3 is and how big we want each file to be.
It may also be that we choose to rotate the CSV files based on number of rows per file instead.
The CSV files need to be compressed as well and gzip is a good format. It provides a high compression ratio while still being fast to load into Snowflake. Here’s a discussion on the performance of loading various file formats.
Splitting files like this helps Snowflake ingest data in a performant way, since it can load multiple files in parallel. The last time I looked, the recommendation was 10-100 MB (compressed) per file.
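A standard-library sketch of rotation by row count (the alternative mentioned above) might look like this. The file naming scheme is hypothetical, and no header row is written. Rotating on size instead would need the compressed byte count, which gzip only reports reliably after flushing, so row count is the easier knob to start with.

```python
import csv
import gzip
import os


class RotatingGzipCsvWriter:
    """Write rows to numbered .csv.gz parts, starting a new part every max_rows."""

    def __init__(self, directory, prefix, max_rows):
        self.directory = directory
        self.prefix = prefix
        self.max_rows = max_rows
        self.paths = []  # every part written so far, in order
        self._file = None
        self._writer = None
        self._rows_in_part = 0

    def _close_part(self):
        if self._file is not None:
            self._file.close()  # writes the gzip trailer
            self._file = None

    def _open_next_part(self):
        self._close_part()
        name = f"{self.prefix}_{len(self.paths) + 1:04d}.csv.gz"
        path = os.path.join(self.directory, name)
        # Text mode with newline="" is what the csv module expects.
        self._file = gzip.open(path, "wt", newline="", encoding="utf-8")
        self._writer = csv.writer(self._file)
        self._rows_in_part = 0
        self.paths.append(path)

    def writerow(self, row):
        if self._file is None or self._rows_in_part >= self.max_rows:
            self._open_next_part()
        self._writer.writerow(row)
        self._rows_in_part += 1

    def close(self):
        """Close the current part and return all part paths."""
        self._close_part()
        return self.paths
```

Note that the number of parts per chunk depends on the data, which is exactly why a chunk can’t be trusted as complete just by counting files.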
I need to clarify the facts around why sizes matter so much, and I wonder if it’s similar to the implications around HDFS block sizes.
If files are too small, space is wasted because blocks are under-filled. If files are too large, some blocks are filled entirely but the remainder spills over into another block that is only partially used. Either way, space is wasted somewhere.
I don’t think this is the case with S3 though. If anyone has some good info on this, it would be great to hear about it.
We may also choose to parallelise extracts where CSV files are generated out-of-order. But this has serious implications and we will need to take extra care to ensure a set of files is complete. That’s the hot topic in step-5.
Step 4 - Copy The Files To S3 or a Snowflake Stage
If we’re planning to keep our options as open as possible for future data processing, it would be wise to start building a data lake as part of the migration away from Oracle. After all, we may want to use more analytics services than just Snowflake in future.
If this is true, we would need to start collecting our data files in an S3 bucket (or equivalent cloud storage; it’s not all about AWS) configured in Snowflake as an external stage.
If, on the other hand, we don’t want the overhead of managing bucket contents then we can copy the data files directly to Snowflake internal stages instead.
As the CSV records are produced, we have a couple of choices. The obvious one is to write files to local storage, copy each one to the target cloud storage service when it is complete and delete the local copy as we go. This keeps as much local free space as possible.
Unfortunately with this approach, engineers still need to consider the amount of local storage available to hold the extracts temporarily. This is something we’d like to avoid, as any kind of thinking requires expertise and ultimately slows a team down. It just costs more 👎
Where AWS S3 is used, an improvement would be to use the multi-part upload API to minimise the amount of local storage required and essentially stream data straight to S3. I suppose there are equivalent concepts available in Azure and GCP too, but I’m not going to investigate them right now.
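The multi-part approach could be sketched like this. It assumes a boto3 S3 client: create_multipart_upload, upload_part, complete_multipart_upload and abort_multipart_upload are real boto3 client calls, while the S3MultipartStream wrapper itself is hypothetical. Only about one part is buffered in memory at a time, and every part except the last must be at least 5 MiB.

```python
class S3MultipartStream:
    """File-like writer that streams bytes to S3 as a multipart upload.

    client is assumed to be boto3.client("s3").
    """

    MIN_PART_SIZE = 5 * 1024 * 1024  # S3 minimum for all but the last part

    def __init__(self, client, bucket, key, part_size=MIN_PART_SIZE):
        if part_size < self.MIN_PART_SIZE:
            raise ValueError("parts (except the last) must be >= 5 MiB")
        self.client = client
        self.bucket = bucket
        self.key = key
        self.part_size = part_size
        self.buffer = bytearray()
        self.parts = []
        response = client.create_multipart_upload(Bucket=bucket, Key=key)
        self.upload_id = response["UploadId"]

    def _upload_part(self, data):
        part_number = len(self.parts) + 1
        response = self.client.upload_part(
            Bucket=self.bucket, Key=self.key, UploadId=self.upload_id,
            PartNumber=part_number, Body=bytes(data))
        self.parts.append({"ETag": response["ETag"], "PartNumber": part_number})

    def write(self, data):
        self.buffer.extend(data)
        # Ship full-size parts as soon as they are available.
        while len(self.buffer) >= self.part_size:
            self._upload_part(self.buffer[:self.part_size])
            del self.buffer[:self.part_size]
        return len(data)

    def flush(self):
        pass  # parts are only sent once a full part is buffered

    def abort(self):
        # Uploaded parts are billed until the upload is aborted.
        self.client.abort_multipart_upload(
            Bucket=self.bucket, Key=self.key, UploadId=self.upload_id)

    def close(self):
        """Upload the remainder and complete; the object only appears now."""
        try:
            if self.buffer:
                self._upload_part(self.buffer)
                self.buffer.clear()
            # S3 should reject completing an upload that has no parts.
            self.client.complete_multipart_upload(
                Bucket=self.bucket, Key=self.key, UploadId=self.upload_id,
                MultipartUpload={"Parts": self.parts})
        except Exception:
            self.abort()
            raise
```

Because it exposes write(), it should be able to sit underneath gzip.GzipFile(fileobj=stream, mode="wb") so rows are compressed and streamed without a full local copy. A useful side effect is that S3 only makes the object visible once the upload completes, so a crash never leaves a half-written file in the bucket, although an incomplete set of files still can.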
Step 5 - Write A Manifest File - Bulletproofing The Feed
This one is controversial. Do we even need to care? Well, there are a few variables that determine whether it’s an essential element or just an optional part of a bulletproof pipeline.
So what is a manifest file? It’s a file that acts as a receipt or flag to show that a chunk of data extracted in step-3 was written to multiple files, completely, during step-4.
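A manifest could be as simple as the JSON written below. The layout (lower and upper bounds as ISO-8601 strings, file names, sizes, MD5 checksums and a row count) is a hypothetical example rather than a standard format. What matters is that it is written only after every data file in the chunk is complete. The temp-file-and-rename step guards against a half-written manifest on local disk; on S3, a single PUT of the manifest is already all-or-nothing.

```python
import hashlib
import json
import os


def write_manifest(path, lower, upper, data_files, row_count):
    """Write a manifest for one extracted chunk, once all data files exist."""
    entries = []
    for data_file in sorted(data_files):
        md5 = hashlib.md5()
        with open(data_file, "rb") as f:
            for block in iter(lambda: f.read(1 << 20), b""):
                md5.update(block)  # stream the file; no full read into memory
        entries.append({
            "file": os.path.basename(data_file),
            "bytes": os.path.getsize(data_file),
            "md5": md5.hexdigest(),
        })
    manifest = {
        "lower": lower,
        "upper": upper,
        "row_count": row_count,
        "files": entries,
    }
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2)
    os.replace(tmp_path, path)  # atomic on POSIX within one filesystem
    return manifest
```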
Before we decide whether we need a manifest, let’s define two types of pipeline:
Synchronous — where all pipeline elements run in parallel and the output of one element is the input of another at run-time. For those who know Pentaho Data Integration, this is a “transform”.
Asynchronous — where the outputs of each element are persisted somewhere for subsequent elements to fetch and use asynchronously. For those who know Airflow, I’m thinking of DAG “tasks” that run in series or parallel.
If we have any of the following situations, we’re going to need one:
CSV files generated out-of-order, as you would find if you parallelise step-3 above for maximum performance
An unknown quantity of CSV files per chunk generated by step-3
The simple reason why we need a manifest is that if a failure leaves behind a partial CSV file or an incomplete set of files, it breaks either step-1 or step-6.
By way of some examples, I’ve come up with four scenarios where a manifest is essential:
If S3 is our target system, we’re relying on the object naming convention to get a good result in step-1 so we can go back to the source and fetch the latest data. It’s important that we flag that a set of object names can be trusted as complete, else we’ll end up with missing data in the target.
To maximise the speed of extracts in step-3 we can parallelise our SQL queries. If we do this, it means CSV files can be produced out-of-order. Remember, because the CSV files are split based on size or row count, we don’t know how many of them are required for a complete set. So the manifest file essentially flags that a set of object names can be trusted to represent a known range of records AND that all records in the range are present.
If Snowflake is the target, instead of cloud storage, we must only load data for a range of records once all of its files have become available. Otherwise we risk loading data for a recent date or sequence number while failing to load data for an earlier date within the same chunk. If that happens, the logic of step-1 will move past the gap and we will end up missing data in the target.
Any analysis done directly against the S3 bucket contents must be aware that partial data may exist due to a prior failure of an extract job. Without a manifest to flag what can be trusted, the external analysis has no way of knowing what is complete or what is not.
Wow, that was a lot of detail! Not much further to go now before we can wrap this up…
Put simply, any objects in the bucket that are not named in a manifest file can be considered junk straight away.
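Reading the manifests back, a pipeline can derive both the step-1 resume point and the list of junk objects, as sketched below. It assumes a hypothetical manifest layout with lower/upper bounds stored as ISO-8601 strings in one fixed format (so string order equals time order) plus a list of file names. The resume point only advances across a gap-free run of manifested ranges, so if parallel chunks finished out of order before a failure, the missing range is extracted again rather than skipped.

```python
def contiguous_high_water(manifests, start):
    """Return where step-1 should resume.

    Walks manifested ranges in order from start (the "zero" value) and stops
    at the first gap, i.e. an earlier chunk that never got a manifest.
    Manifest layout (hypothetical): {"lower": ..., "upper": ..., "files": [...]}.
    """
    high_water = start
    for manifest in sorted(manifests, key=lambda m: m["lower"]):
        if manifest["lower"] > high_water:
            break  # gap: everything after it is re-extracted next run
        high_water = max(high_water, manifest["upper"])
    return high_water


def junk_files(manifests, names):
    """Return data-file names that no manifest vouches for."""
    trusted = {entry["file"] for m in manifests for entry in m["files"]}
    return sorted(set(names) - trusted)
```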
Ultimately, by using a manifest file, we’re trying to reduce the risk that we end up with partial data in the target.
We don’t want to have to run ANY reconciliation jobs that fill in data gaps later. They are expensive both in terms of developer time and compute cost where Snowflake is concerned. They should be avoided at all costs if we want to scale the number of data pipelines over time and avoid accumulating tech debt.
Having said all this, what’s the counter argument? Are manifests a waste of time and when can we avoid writing them?
If all of the following statements are true, then we can skip manifests:
We don’t care about keeping data in cloud storage for future analysis (Snowflake will be the ultimate target for the data, not a data lake)
We are happy to extract and load data into Snowflake in order, not in parallel
The number of CSV files generated by step-3 for a chunk of data is deterministic
If you got this far and you’re still with me, thanks very much for reading through step-5! It’s a big topic and has plagued my mind on many occasions so I’m glad I finally committed it to words. This is a problem created because we’re dealing with flat files again. When data just stays inside a good RDBMS that’s ACID-compliant instead, these problems just disappear!
Step 6 - Load The Correct Data
We’re getting close to the end result we want — data in Snowflake!
If we want to be maintenance free, we know we can’t use the “load history” in Snowflake to perform a blanket COPY INTO operation, since there may be junk in the staging area due to a pipeline failure. It might be unlikely, but it could happen.
The blanket COPY INTO would automatically search for files that it thinks are new and ingest them, but it’s not right for us.
It’s far better for our pipeline to tell Snowflake exactly what files to load, and in reality this shouldn’t be hard since the pipeline is the thing that wrote them in the first place!
So we read the manifest files that are newer than the latest Snowflake contents and load the correct CSV data files into our target table.
We have to be smart about this, as there may be changed records in the CSV files as well as new ones. We can’t just insert the data and forget about it. We will need to join the staged records to the existing ones on a known key and update the target where there’s a match. For the records that don’t match by our key, we can just insert them.
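Telling Snowflake exactly what to load could look like the statements generated below, with the file names taken from trusted manifests. COPY INTO’s FILES option and MERGE are standard Snowflake SQL; the stage, table and column names are hypothetical, the CSV files are assumed to have no header row and to match the staging table’s column order, and the QUALIFY clause (keep only the newest version of each key) assumes a change-tracking column to order by. The statements would be run in order, for example through the Snowflake Python connector.

```python
def load_statements(stage, staging_table, target_table, files, key, columns,
                    order_by):
    """Build SQL that loads exactly the named files, then upserts them.

    Identifiers are interpolated, not bound, so they must come from trusted
    config. COPY INTO's FILES option accepts at most 1,000 names.
    """
    if not files:
        return []
    if len(files) > 1000:
        raise ValueError("split the file list into batches of at most 1,000")
    # TRUNCATE (not DELETE) also clears the staging table's load metadata.
    truncate_sql = f"TRUNCATE TABLE {staging_table}"
    file_list = ", ".join("'" + f.replace("'", "''") + "'" for f in files)
    copy_sql = (
        f"COPY INTO {staging_table} FROM @{stage} "
        f"FILES = ({file_list}) "
        "FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP)"
    )
    updates = ", ".join(f"t.{c} = s.{c}" for c in columns if c != key)
    col_list = ", ".join(columns)
    values = ", ".join(f"s.{c}" for c in columns)
    merge_sql = (
        f"MERGE INTO {target_table} t USING ("
        f"SELECT * FROM {staging_table} "
        f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {key} ORDER BY {order_by} DESC) = 1"
        f") s ON t.{key} = s.{key} "
        f"WHEN MATCHED THEN UPDATE SET {updates} "
        f"WHEN NOT MATCHED THEN INSERT ({col_list}) VALUES ({values})"
    )
    return [truncate_sql, copy_sql, merge_sql]
```

Truncating rather than deleting matters here: Snowflake’s TRUNCATE TABLE also deletes the table’s load metadata, so a retry after a failed MERGE can load the same files again instead of COPY silently skipping them as already loaded.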
The great thing about Snowflake is that it lets you reference the contents of staged CSV files in SQL queries, and the performance is great!
If we don’t need a manifest because we’re loading data in order in a synchronous pipeline, and we’ve accepted the performance hit, then we just post one file at a time to Snowflake.
Either way, our target remains consistent without any data gaps.
And that just about sums up what a bulletproof incremental data pipeline needs to do.
What About Deleted Records?
But this doesn’t take care of records that are deleted in the source.
They’re not always a problem in data warehouses as records are often just accumulated, but what’s the best way to handle them in case we need to?
There are three options. We can:
Use transaction log mining
Add light-weight triggers to log deleted keys
Soft-delete records instead, if the application is in your control
You might be thinking it would be easier if we just used a log mining solution, but the decision to go down that road shouldn’t be taken lightly.
I’ve written another post on the pros and cons of log mining vs the use of triggers. It includes some performance stats to show the true impact of option-2.
Long story short, if you prefer low-cost, agile solutions then you should avoid log mining and use option-2 if you can.
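For option-2, the trigger can be very small. The sketch below generates Oracle DDL from Python so the same helper could be applied across many tables; the log table, trigger name, NUMBER key type and deleted_at column are all hypothetical. The log table can then be extracted with the same incremental pattern on deleted_at and applied as deletes in Snowflake, and the same open-transaction cap from step-2 applies to it.

```python
def delete_log_ddl(table, key, log_table):
    """Return Oracle DDL for a hypothetical delete-logging table and trigger."""
    create_log = (
        f"CREATE TABLE {log_table} ("
        f"{key} NUMBER NOT NULL, "
        "deleted_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL)"
    )
    # Row-level trigger that records only the deleted key; the timestamp
    # comes from the column default. PL/SQL needs the trailing "END;".
    create_trigger = (
        f"CREATE OR REPLACE TRIGGER {table}_del_trg "
        f"AFTER DELETE ON {table} FOR EACH ROW "
        f"BEGIN INSERT INTO {log_table} ({key}) VALUES (:OLD.{key}); END;"
    )
    return [create_log, create_trigger]
```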
What’s Missing From The Pentaho Pipeline?
Now let’s check how well the Pentaho pipeline from my previous article stands up against the steps above:
The Pentaho feed doesn’t account for the open transactions called out in step-2, so we risk missing data in the target if there are long-running sessions.
The chunk size (see step-3) is hard-coded into the pipeline rather than being configurable per table/view.
It doesn’t write manifest files or handle any of the related failure scenarios in step-5. If the pipeline fails, manual intervention is required to remove junk files from S3 so the feed can restart at step-1 without leaving missing data in the target.
While the Pentaho code that I published is great to get going quickly, I wouldn’t recommend using it in production unless you’re prepared to enhance it or nurse it along.
You could develop monitoring and automate corrective actions around each of the shortcomings instead, but unfortunately this is another time sink that has to be paid down.
I heard this phrase once and I think it’s so true: “open-source might be free, but only if you don’t value your time.”
Pentaho allows us to parallelise SQL queries to get a performance increase with just a few clicks in the UI. So the SQL input step used to extract records could easily be enhanced to work with a dynamic “number of copies to start” populated by a variable. Once the pipeline is enhanced to generate manifest files, this would be an obvious quick win to improve the solution. It would also mean the code to generate SQL statements becomes more complex, but this would probably be a price worth paying.
The Pentaho feed doesn’t automate the last hop in the journey to Snowflake — loading the CSV files. That said, it could easily be enhanced to take care of it. Again, this would obviously add to the 20+ existing jobs/transforms that we will have to maintain so we’re accruing more tech debt as soon as we start!
A Better Alternative
The question is, would you be interested to have a pre-canned pipeline that fixes all of these shortcomings?
If you do, I’d like to introduce you to a solution called Halfpipe.
In its current form, it’s a command-line tool called hp that simplifies ALL of this complexity down to a SINGLE action like this:
hp cp delta \
The source database can be Oracle and the target can be any S3 bucket or a Snowflake database.
Credentials for the source and target can be added to the tool beforehand where the secrets are all encrypted using current industry best practices.
There are other patterns available too, which I’d like to cover in another set of articles. Each of them is essentially a SINGLE command to wrap up all of the complexity:
Synchronising tables (making the target look the same as the source)
Starting a real-time event stream using Oracle change notifications (has gotchas of its own, but is useful in specific cases)
Copying table metadata (auto convert and apply DDL from source to target)
Running a micro-service or pipeline from a config file (JSON/YAML)
More on the roadmap
The internal architecture it uses is the synchronous component model mentioned in step-5 so it’s possible to wire up your own transforms in JSON or YAML using a bunch of built-in components.
It’s a single binary written in Go that I wanted to make simple for any engineer to use, both locally and in production.
It ultimately takes the mundane pain out of data integration and flat files so engineers can get on with opening the door to new business value.
I haven’t yet decided what my next article will be about, but there’s a lot more to explain around how Halfpipe works and the other patterns that are pre-canned and ready to go now. There’s also the roadmap for new features.
Let me know what you’d like to hear more about in the comments below. And thanks again if you managed to read through this lengthy discussion!