process_kafka_eco.py — forked from ecorrea1010/bigdata_task3 (55 lines, 1.59 KB)
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StructType, DoubleType, IntegerType
# Initialize the Spark session, pulling in the Kafka connector package
# (spark-sql-kafka) so the "kafka" streaming source is available.
builder = SparkSession.builder
builder = builder.appName("ProcessKafkaTask3")
builder = builder.config(
    "spark.jars.packages",
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.3",
)
spark = builder.getOrCreate()
# Schema of the JSON messages expected on the Kafka topic:
# an integer ID plus OHLC prices and volume as doubles.
_fields = (
    ("ID", IntegerType()),
    ("Price", DoubleType()),
    ("OpenPrice", DoubleType()),
    ("HighPrice", DoubleType()),
    ("LowPrice", DoubleType()),
    ("ClosePrice", DoubleType()),
    ("Volume", DoubleType()),
)
schema = StructType()
for _name, _dtype in _fields:
    schema = schema.add(_name, _dtype)
# Subscribe to the Kafka topic as an unbounded streaming source.
dfKafka = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic-task3")
    .load()
)
# Kafka delivers the message value as bytes: cast it to a string,
# parse the JSON against the declared schema, then flatten the struct
# into top-level columns.
_as_json = dfKafka.selectExpr("CAST(value as STRING) as json")
_parsed = _as_json.select(F.from_json(F.col("json"), schema).alias("data"))
kafkaSchema = _parsed.select("data.*")
kafkaSchema.printSchema()
# Stamp each row with processing time and tolerate 1 minute of lateness.
kafkaSchemaWithTimestamp = kafkaSchema.withColumn("Timestamp", F.current_timestamp())
kafkaSchemaWithWatermark = kafkaSchemaWithTimestamp.withWatermark("Timestamp", "1 minute")
# Rolling average of Price over 10-second processing-time windows.
_window = F.window("Timestamp", "10 seconds")
average_prices = (
    kafkaSchemaWithWatermark
    .groupBy(_window)
    .agg(F.avg("Price").alias("average_price"))
)
# Keep only high-volume rows (Volume above 100000).
filtered_data = kafkaSchemaWithWatermark.where(F.col("Volume") > 100000)
# Start BOTH streaming queries before blocking. The original code called
# .awaitTermination() on the first query immediately, which blocks forever
# for a healthy stream — so the filtered_data query was never started.
avg_query = average_prices.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
filter_query = filtered_data.writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()
# Block the driver until any active query stops or fails.
spark.streams.awaitAnyTermination()