💡 In this recipe, you will learn how to use Statement Sets to run multiple INSERT INTO statements in a single, optimized Flink Job.
Many product requirements involve outputting the results of a streaming application to two or more sinks, such as Apache Kafka for real-time use cases, or a Filesystem for offline ones. In other cases, two queries are not identical but share expensive intermediate operations.
When working with server logs, the support team would like to see the number of status codes per browser every 5 minutes to have real-time insight into a web page's status. Additionally, they would like the same information on an hourly basis, made available as partitioned Apache Parquet files, so they can perform historical analysis.
We could quickly write two Flink SQL queries to solve both of these requirements, but that would not be efficient: the queries would duplicate a lot of work, like reading the source logs Kafka topic and cleansing the data.
Ververica Platform includes a feature called STATEMENT SETs, which allows you to multiplex multiple INSERT INTO statements into a single query that is holistically optimized by Apache Flink and deployed as a single application.
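Before diving into the full example, here is the bare syntax in a minimal, self-contained form. This sketch uses the built-in datagen and blackhole connectors purely for illustration; none of these tables are part of the recipe:

CREATE TEMPORARY TABLE src (n INT) WITH ('connector' = 'datagen');
CREATE TEMPORARY TABLE sink_a (n INT) WITH ('connector' = 'blackhole');
CREATE TEMPORARY TABLE sink_b (n INT) WITH ('connector' = 'blackhole');

-- Everything between BEGIN STATEMENT SET; and END; is submitted
-- together and optimized as one Flink job.
BEGIN STATEMENT SET;
INSERT INTO sink_a SELECT n FROM src;
INSERT INTO sink_b SELECT n * 2 FROM src;
END;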
CREATE TEMPORARY TABLE server_logs (
client_ip STRING,
client_identity STRING,
userid STRING,
user_agent STRING,
log_time TIMESTAMP(3),
request_line STRING,
status_code STRING,
size INT,
WATERMARK FOR log_time AS log_time - INTERVAL '30' SECONDS
) WITH (
'connector' = 'faker',
'fields.client_ip.expression' = '#{Internet.publicIpV4Address}',
'fields.client_identity.expression' = '-',
'fields.userid.expression' = '-',
'fields.user_agent.expression' = '#{Internet.userAgentAny}',
'fields.log_time.expression' = '#{date.past ''15'',''5'',''SECONDS''}',
'fields.request_line.expression' = '#{regexify ''(GET|POST|PUT|PATCH){1}''} #{regexify ''(/search\.html|/login\.html|/prod\.html|cart\.html|/order\.html){1}''} #{regexify ''(HTTP/1\.1|HTTP/2|/HTTP/1\.0){1}''}',
'fields.status_code.expression' = '#{regexify ''(200|201|204|400|401|403|301){1}''}',
'fields.size.expression' = '#{number.numberBetween ''100'',''10000000''}'
);
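If you want to eyeball the randomly generated data before wiring up the sinks, you can run a simple continuous query against the source (assuming an interactive SQL client session):

SELECT user_agent, status_code, log_time FROM server_logs;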
CREATE TEMPORARY TABLE realtime_aggregations (
`browser` STRING,
`status_code` STRING,
`end_time` TIMESTAMP(3),
`requests` BIGINT NOT NULL
) WITH (
'connector' = 'kafka',
'topic' = 'browser-status-codes',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'browser-counts',
'format' = 'avro'
);
CREATE TEMPORARY TABLE offline_datawarehouse (
`browser` STRING,
`status_code` STRING,
`dt` STRING,
`hour` STRING,
`requests` BIGINT NOT NULL
) PARTITIONED BY (`dt`, `hour`) WITH (
'connector' = 'filesystem',
'path' = 's3://my-bucket/browser-into',
'sink.partition-commit.trigger' = 'partition-time',
'format' = 'parquet'
);
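Depending on your Flink version and setup, a 'partition-time' commit trigger is usually combined with a partition time extractor and a commit policy. Below is a sketch of the extra options you might add to the WITH clause above; the timestamp pattern is an assumption based on this table's dt/hour partitioning:

-- Hypothetical additions to the offline_datawarehouse WITH clause:
'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00',
'sink.partition-commit.delay' = '1 h',
'sink.partition-commit.policy.kind' = 'success-file'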
-- This is a shared view that will be used by both INSERT INTO statements
CREATE TEMPORARY VIEW browsers AS
SELECT
REGEXP_EXTRACT(user_agent,'[^\/]+') AS browser,
status_code,
log_time
FROM server_logs;
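As a quick illustration: the regular expression grabs everything before the first slash of the user agent, i.e. the product token. The user agent string below is made up for demonstration:

-- Returns 'Mozilla' for this (made-up) user agent string:
SELECT REGEXP_EXTRACT('Mozilla/5.0 (Windows NT 10.0; Win64; x64)', '[^\/]+') AS browser;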
BEGIN STATEMENT SET;
INSERT INTO realtime_aggregations
SELECT
browser,
status_code,
TUMBLE_ROWTIME(log_time, INTERVAL '5' MINUTE) AS end_time,
COUNT(*) AS requests
FROM browsers
GROUP BY
browser,
status_code,
TUMBLE(log_time, INTERVAL '5' MINUTE);
INSERT INTO offline_datawarehouse
SELECT
browser,
status_code,
DATE_FORMAT(TUMBLE_ROWTIME(log_time, INTERVAL '1' HOUR), 'yyyy-MM-dd') AS `dt`,
DATE_FORMAT(TUMBLE_ROWTIME(log_time, INTERVAL '1' HOUR), 'HH') AS `hour`,
COUNT(*) AS requests
FROM browsers
GROUP BY
browser,
status_code,
TUMBLE(log_time, INTERVAL '1' HOUR);
END;
Looking at the deployed Job Graph, we can see that Flink SQL performs the shared computation only once, achieving the most cost- and resource-efficient execution of our queries!
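If you want to inspect the optimized plan yourself, recent Flink versions can also explain a statement set as a whole. A sketch, assuming your Flink version supports EXPLAIN STATEMENT SET:

EXPLAIN STATEMENT SET
BEGIN
INSERT INTO realtime_aggregations
SELECT
  browser,
  status_code,
  TUMBLE_ROWTIME(log_time, INTERVAL '5' MINUTE) AS end_time,
  COUNT(*) AS requests
FROM browsers
GROUP BY
  browser,
  status_code,
  TUMBLE(log_time, INTERVAL '5' MINUTE);
INSERT INTO offline_datawarehouse
SELECT
  browser,
  status_code,
  DATE_FORMAT(TUMBLE_ROWTIME(log_time, INTERVAL '1' HOUR), 'yyyy-MM-dd') AS `dt`,
  DATE_FORMAT(TUMBLE_ROWTIME(log_time, INTERVAL '1' HOUR), 'HH') AS `hour`,
  COUNT(*) AS requests
FROM browsers
GROUP BY
  browser,
  status_code,
  TUMBLE(log_time, INTERVAL '1' HOUR);
END;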