💡 This example will show how you can identify and filter out duplicates in a stream of events.
There are different ways that duplicate events can end up in your data sources, from human error to application bugs. Regardless of their origin, unclean data can have a real impact on the quality (and correctness) of your results. Suppose that your order system occasionally generates duplicate events with the same `order_id`, and that you're only interested in keeping the most recent event for downstream processing.
As a first step, you can use a combination of the `COUNT` function and the `HAVING` clause to check if (and which) orders have more than one event, and then filter these duplicates out using `ROW_NUMBER()`. In practice, deduplication is a special case of Top-N aggregation where N is 1 (`rownum = 1`) and the ordering column is either the processing or event time of the events.
The source table `orders` is backed by the built-in `datagen` connector, which continuously generates rows in memory. Because `id` is drawn from a small range (1 to 100) at 10 rows per second, the same `id` quickly shows up more than once, simulating duplicate events.
```sql
CREATE TABLE orders (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10',
  'fields.id.kind' = 'random',
  'fields.id.min' = '1',
  'fields.id.max' = '100'
);
```
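To convince yourself that this setup actually produces duplicates, you can compare the total number of rows against the number of distinct ids. This sanity check is not part of the recipe itself, just a minimal sketch: since `id` is capped at 100 values arriving at 10 rows per second, the two counts diverge within seconds.

```sql
-- Optional sanity check (not part of the recipe): the gap between
-- total_rows and distinct_ids is the number of duplicate events so far
SELECT COUNT(*)           AS total_rows,
       COUNT(DISTINCT id) AS distinct_ids
FROM orders;
```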
```sql
-- Check for duplicates in the `orders` table
SELECT id AS order_id,
       COUNT(*) AS order_cnt
FROM orders o
GROUP BY id
HAVING COUNT(*) > 1;
```
```sql
-- Use deduplication to keep only the latest record for each `order_id`
SELECT
  order_id,
  order_time
FROM (
  SELECT id AS order_id,
         order_time,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time DESC) AS rownum
  FROM orders
)
WHERE rownum = 1;
```
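Because deduplication is just Top-N with N set to 1, the same query shape generalizes with a different rank predicate. The following is a sketch (not part of the original recipe) that keeps the three most recent events per `order_id` instead of one:

```sql
-- Top-N sketch: keep the 3 most recent events for each `order_id`
-- by filtering on `rownum <= 3` instead of `rownum = 1`
SELECT
  order_id,
  order_time,
  rownum
FROM (
  SELECT id AS order_id,
         order_time,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time DESC) AS rownum
  FROM orders
)
WHERE rownum <= 3;
```

Keep in mind that the result is an updating stream: a newly arrived event can displace rows that were already emitted for that key.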