pyspark-partitionby.py
# -*- coding: utf-8 -*-
"""
author SparkByExamples.com
"""
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
    .appName('SparkByExamples.com') \
    .getOrCreate()
# Read the zipcodes CSV file into a DataFrame
df = spark.read.option("header", True) \
    .csv("C:/apps/sparkbyexamples/src/pyspark-examples/resources/simple-zipcodes.csv")
df.show()

# Number of partitions in the underlying RDD
print(df.rdd.getNumPartitions())
# Write to disk, creating one sub-directory per distinct value of "state"
df.write.option("header", True) \
    .partitionBy("state") \
    .mode("overwrite") \
    .csv("c:/tmp/zipcodes-state")
# Partition by multiple columns: state first, then city within each state
df.write.option("header", True) \
    .partitionBy("state", "city") \
    .mode("overwrite") \
    .csv("c:/tmp/zipcodes-state-city")
# Control the number of files written per partition directory by
# repartitioning in memory before the partitioned write
df = df.repartition(2)
print(df.rdd.getNumPartitions())

df.write.option("header", True) \
    .partitionBy("state") \
    .mode("overwrite") \
    .csv("c:/tmp/zipcodes-state-more")
# Read the partitioned data back; the "state" partition column is
# inferred from the directory names (state=XX)
dfPartition = spark.read.option("header", True) \
    .csv("c:/tmp/zipcodes-state")
dfPartition.printSchema()
# Read a single partition directory. Only the state+city output contains
# city= sub-directories, so read from zipcodes-state-city here.
# The partition columns (state, city) are encoded in the path and therefore
# do not appear in the resulting schema.
dfSinglePart = spark.read.option("header", True) \
    .csv("c:/tmp/zipcodes-state-city/state=AL/city=SPRINGVILLE")
dfSinglePart.printSchema()
dfSinglePart.show()
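
# A minimal sketch, not part of the original example: if you want the
# partition columns back in the schema when reading a sub-directory,
# point Spark at the partition root with the "basePath" option.
# The DataFrame name below is illustrative only.
dfWithPartCols = spark.read.option("header", True) \
    .option("basePath", "c:/tmp/zipcodes-state-city") \
    .csv("c:/tmp/zipcodes-state-city/state=AL/city=SPRINGVILLE")
dfWithPartCols.printSchema()  # schema now includes state and city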
# Register the partitioned data as a temp view and filter with SQL.
# The predicate on the partition column "state" lets Spark scan only the
# state=AL directory (partition pruning); "city" is filtered from row data.
dfZipcodes = spark.read.option("header", True) \
    .csv("c:/tmp/zipcodes-state")
dfZipcodes.createOrReplaceTempView("ZIPCODE")
spark.sql("select * from ZIPCODE where state='AL' and city = 'SPRINGVILLE'") \
    .show()
# Limit the number of rows written to each file within a partition directory
df.write.option("header", True) \
    .option("maxRecordsPerFile", 2) \
    .partitionBy("state") \
    .mode("overwrite") \
    .csv("c:/tmp/zipcodes-state-maxrecords")