Dealing with Late-Arriving Data in Data Vault
There is always some late-arriving data. How do we deal with it in Data Vault?
There is always some late-arriving data. A piece of equipment was disconnected for a few hours and then came back online. Someone forgot to enter a purchase order into the system and only found out a month later. A maintenance work order was not marked closed for a few days. It's just a normal part of business.
Late-arriving data handling is messy because there's no boundary on how late it can be. It could be a few seconds, a few minutes, a few hours, a few days, or even a few weeks. Because we don't know when it will come, or even whether there will be any late-arriving data at all, people tend to deliberately set a limit, say 7 days, and just periodically reprocess that whole window, just in case there is late-arriving data. That sucks.
I talked about the late-arriving data problem previously and how to deal with it by maintaining two timelines in Data Vault. Now's the time to fill the gap, talking about some practical implementation details.
At a high level, there are two stages of processing required after receiving late-arriving data:
Correctly saving the late-arriving data
Properly reprocessing its derived data
Let's see each stage in detail.
Saving Late-Arriving Raw Data
With Data Vault, saving late-arriving "raw" data is no different from saving normally-arriving data. We maintain two timelines by also keeping the original business event time, so arriving late does not cause problems. Late-arriving data comes and sits right next to normally-arriving data nicely.
So this is pretty straightforward: it is taken care of by the normal ingestion pipeline, and no special handling is required.
We've adopted an incremental loading pattern for Data Vault with our custom dbt materialization. The pattern requires a monotonically increasing timestamp as the basis for incremental processing: each batch first looks up the largest timestamp in the target table (essentially the point where the last run finished) and uses it as the lower bound when querying the new data to process in the current batch. This way, no special bookkeeping is needed, which keeps things very simple.
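Conceptually, each batch boils down to something like the following sketch (simplified SQL with placeholder table names, not the actual generated query):

-- High-water-mark incremental pattern (simplified sketch):
-- read the largest timestamp already in the target,
-- then use it as the lower bound when reading the source.
SELECT *
FROM source_table
WHERE load_datetime > (
    SELECT max(load_datetime) FROM target_table
)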
To use it, simply set the materialization's timestamp_field config. By default, the system-maintained load_datetime is used: the materialization first queries max(load_datetime) from the target table and then uses it as the lower bound when querying the source table. Being system-maintained, load_datetime is guaranteed to always be monotonically increasing.
{{ config(
materialized = "rc_incremental",
timestamp_field = "load_datetime"
) }}
The materialization's config also allows the data range to be specified explicitly via start_timestamp and/or end_timestamp (both optional). If start_timestamp is given, it is used directly instead of being queried from the target table. The combination of these three configs is useful for one-off data-loading jobs like backloading or an incremental initial bulk load.
{{ config(
materialized = "rc_incremental",
timestamp_field = "load_datetime",
start_timestamp = "...",
end_timestamp = "..."
) }}
So how is it implemented? In the dbt model, add a replacement stub for the filtering conditions when querying the source table:
{% if model.config.materialized == 'rc_incremental' %}
AND __TIMESTAMP_INCREMENTAL_FILTER__
{% endif %}
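In context, a model using the stub might look roughly like this sketch (the source model name is made up for illustration):

{{ config(
    materialized = "rc_incremental",
    timestamp_field = "load_datetime"
) }}

SELECT *
FROM {{ ref('stg_equipment_events') }}
WHERE 1 = 1
{% if model.config.materialized == 'rc_incremental' %}
  AND __TIMESTAMP_INCREMENTAL_FILTER__
{% endif %}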
The materialization macro replaces the stub with the filtering conditions queried from the target table (and/or start_timestamp/end_timestamp):
{%- set timerange_filter -%}
{{ dbtvault.build_timestamp_filter(target_relation, timestamp_field, target_timestamp_field) }}
{%- endset -%}
{%- set sql = sql | replace("__TIMESTAMP_INCREMENTAL_FILTER__", timerange_filter) -%}
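After rendering, the stub ends up as a plain range predicate on the timestamp column, roughly like this (an illustrative sketch with made-up timestamps, not the macro's exact output):

-- lower bound only, taken from the target table's max(load_datetime)
AND load_datetime > '2023-05-01 10:00:00'
-- or a bounded range when start_timestamp / end_timestamp are configured
AND load_datetime > '2023-05-01 00:00:00'
AND load_datetime <= '2023-05-02 00:00:00'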
Refreshing Derived Data
Now that we have late-arriving "raw" data saved into the Data Vault, we still need to refresh any derived data based on it. Otherwise, the derived data would become inconsistent.
There are many kinds of derived data. There could be business vault tables generated by applying business rules (transformations) to the raw vault. There could also be aggregation data generated from the raw vault for analytics or reporting purposes.
For transformed data, dbt's model dependency with the incremental strategy mentioned in the previous section can take care of it quite nicely. This is no different from handling the normally-arriving data. The same approach also works just fine for non-aggregation facts.
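For illustration, a derived (business vault) model can simply ref() its raw vault source and use the same materialization and stub; late-arriving rows carry a fresh load_datetime, so the next run picks them up automatically. The model and column names below are made up:

{{ config(
    materialized = "rc_incremental",
    timestamp_field = "load_datetime"
) }}

-- Business-rule transformation on top of a raw vault satellite (illustrative names).
SELECT
    equipment_hashkey,
    load_datetime,
    effective_datetime,
    upper(trim(status_code)) AS status_code
FROM {{ ref('sat_equipment_status') }}
WHERE 1 = 1
{% if model.config.materialized == 'rc_incremental' %}
  AND __TIMESTAMP_INCREMENTAL_FILTER__
{% endif %}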
For entities (hub/satellite combos) and relations (link/satellite combos), we need to refresh their PIT tables to include the late-arriving data. To avoid regenerating complete PIT tables every time, the PIT refresh job identifies the affected data range: it first uses load_datetime to find the new data for the current batch, and then, from that new data set, it identifies the set of effective_datetime values (the business time) that need to be refreshed. Our dimensions are dynamic views on top of PIT tables, so as long as the PIT tables are refreshed, the dimensions automatically reflect the latest, correct data.
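As a sketch, identifying the affected business times can be as simple as the query below (table names are illustrative, and it assumes the PIT job tracks the last processed load_datetime; the actual refresh job also performs the rebuild itself):

-- New satellite rows since the PIT job's last high-water mark:
-- late-arriving rows have a recent load_datetime but an old effective_datetime.
-- The distinct business times among them are the PIT snapshots to rebuild.
SELECT DISTINCT effective_datetime
FROM sat_equipment_status
WHERE load_datetime > (
    SELECT max(load_datetime) FROM pit_equipment
)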
For aggregation facts, we use a different approach.
Not all aggregation methods can be updated incrementally the way sum or count can, and deletion markers also need to be handled properly. A more general drop-first-then-recreate approach is more suitable: when late-arriving data is received, any aggregation result it logically belongs to is purged first, and then the normal aggregation processing regenerates that result.
Our custom dbt materialization has another config for specifying which target-table column(s) to use as the replacement key(s). For example, a daily aggregation fact table contains a "date" column, which should be set as the replacement key. When some of yesterday's data arrives late, the materialization first drops yesterday's aggregation results and then regenerates the aggregation from the latest data (which now includes the late-arriving rows).
{{ config(
materialized = "rc_incremental",
timestamp_field = "load_datetime",
replace_by = "date"
) }}
Usually, we use a hashed composite key like hash([date, device_id]) for the replacement key. This way, we can have finer control of the exact data set instead of having to always reprocess everything.
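Under the hood, the replace-by-key behaviour amounts to a delete-then-recreate, roughly like the plain-SQL sketch below (table and column names are illustrative; replace_key stands for the precomputed hashed composite key, and new_batch for the rows picked up by the current incremental run):

-- 1. Purge every aggregation result the new batch touches ...
DELETE FROM agg_daily_device_usage
WHERE replace_key IN (SELECT DISTINCT replace_key FROM new_batch);

-- 2. ... then re-aggregate those keys over the full, now-complete source data.
INSERT INTO agg_daily_device_usage (replace_key, date, device_id, total_usage)
SELECT replace_key, date, device_id, sum(usage)
FROM source_events
WHERE replace_key IN (SELECT DISTINCT replace_key FROM new_batch)
GROUP BY replace_key, date, device_id;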
Note that this approach relies on the system-maintained load_datetime to determine the data range to be processed in each batch. This timestamp needs to be centrally generated, either assigned by an orchestrator when scheduling jobs or generated automatically by the database server.
Conclusion
Traditionally, late-arriving data handling was messy. Mostly, people ended up with daily jobs reprocessing a rolling n-day window, which was not only slow but also expensive. In Data Vault, accepting and saving late-arriving "raw" data is no different from handling normally-arriving data: late rows come in and get saved correctly alongside normal, on-time data. With the same automated, repeatable, and modular pattern, refreshing the derived data also becomes easy. No more daily jobs reprocessing the same data over and over again. Reprocessing is launched only when there's late-arriving data, and only for the relevant part. This is fast and cheap, and it makes a solid foundation for generating a star schema from Data Vault.