Lessons and exercises to get familiar with the wonderful SETL framework!
Lesson
The entry point is the first thing you need to learn to code with SETL. It is the starting point to run your ETL project.
val setl0: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
This is the minimum code needed to create a Setl object, which is the entry point of every SETL app. It will create a SparkSession, which is the entry point of any Spark job. Additionally, the withDefaultConfigLoader() method is used. This means that Setl will read the default ConfigLoader located in resources/application.conf, where setl.environment must be set. The ConfigLoader will then read the corresponding configuration file <app_env>.conf in the resources folder, where <app_env> is the value set for setl.environment.
resources/application.conf:
setl.environment = <app_env>

<app_env>.conf:
setl.config.spark {
  some.config.option = "some-value"
}
The configuration file is where you can specify your SparkSession options, like when you create one in a basic Spark process. You must specify your SparkSession options under setl.config.spark.

You can specify the configuration file that the default ConfigLoader should read. In the code below, instead of reading <app_env>.conf, where <app_env> is defined in application.conf, it will read own_config_file.conf.
val setl1: Setl = Setl.builder()
  .withDefaultConfigLoader("own_config_file.conf")
  .getOrCreate()
resources/own_config_file.conf:
setl.config.spark {
  some.config.option = "some-other-value"
}
You can also set your own ConfigLoader. In the code below, Setl will load local.conf because the environment is set to local through the setAppEnv() method. If no environment is set there, it will fetch the environment from the default ConfigLoader, located in resources/application.conf.
val configLoader: ConfigLoader = ConfigLoader.builder()
  .setAppEnv("local")
  .setAppName("Setl2_AppName")
  .setProperty("setl.config.spark.master", "local[*]")
  .setProperty("setl.config.spark.custom-key", "custom-value")
  .getOrCreate()

val setl2: Setl = Setl.builder()
  .setConfigLoader(configLoader)
  .getOrCreate()
You can also set your own SparkSession to be used by Setl, with the setSparkSession() method. Please refer to the documentation or the source code of SETL.

There are also some shortcut methods for setting SparkSession configurations:
val setl3: Setl = Setl.builder()
  .withDefaultConfigLoader()
  .setSparkMaster("local[*]") // set your master URL
  .setShufflePartitions(200) // set the number of shuffle partitions
  .getOrCreate()
- The setSparkMaster() method sets the spark.master property of the SparkSession in your Setl entry point.
- The setShufflePartitions() method sets the spark.sql.shuffle.partitions property of the SparkSession in your Setl entry point.
As mentioned earlier, the options you want to define in your SparkSession must be specified under setl.config.spark in your configuration file. However, you can change this path by using the setSetlConfigPath() method:
val setl4: Setl = Setl.builder()
  .withDefaultConfigLoader("own_config_file.conf")
  .setSetlConfigPath("myApp")
  .getOrCreate()
resources/own_config_file.conf:
myApp.spark {
  some.config.option = "my-app-some-other-value"
}
Exercises
Nothing too crazy: try to build your own Setl object! Run your code and examine the logs to check the options you specified. Make sure it loads the correct configuration file.
Lesson
SETL supports two types of data accessors: Connector and SparkRepository.
- A Connector is a non-typed abstraction of the data access layer (DAL). For simplicity, you can think of it as a Spark DataFrame.
- A SparkRepository is a typed abstraction of the data access layer (DAL). For simplicity, you can think of it as a Spark Dataset. For more information, please refer to the official documentation.
SETL supports multiple data formats, such as CSV, JSON, Parquet, Excel, Cassandra, DynamoDB, JDBC and Delta.
To ingest data into the Setl entry point, you must first register the data, using the setConnector() or the setSparkRepository[T] method.
val setl: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
setl
.setConnector("testObjectRepository", deliveryId = "id")
The first argument provided is a String that refers to an item in the specified configuration file. The second argument, deliveryId, must be specified for data ingestion. We will see in section 2.3 why it is necessary. Just think of it as an ID: the only way for SETL to ingest a Connector is through its ID.

Note that deliveryId is not necessary for registration, but it is for ingestion. However, there is not much use in only registering the data. If you are a beginner with SETL, consider that setting a Connector should always come with a deliveryId.
local.conf
:
setl.config.spark {
some.config.option = "some-value"
}
testObjectRepository {
storage = "CSV"
path = "src/main/resources/test_objects.csv"
inferSchema = "true"
delimiter = ","
header = "true"
saveMode = "Overwrite"
}
As you can see, testObjectRepository
defines a configuration for data of type CSV
. This data is in a file, located in src/main/resources/test_objects.csv
. Other classic read or write options are configured.
In summary, to register a Connector, you need to:
- Specify an item in your configuration file. This item must have a storage key, which represents the type of the data. Other keys might be mandatory depending on this type.
- Register the data in your Setl object, using setConnector("<item>", deliveryId = "<id>").
val setl: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
setl
.setSparkRepository[TestObject]("testObjectRepository")
Like setConnector()
, the argument provided is a String
that refers to an item in the specified configuration file.
local.conf
:
setl.config.spark {
some.config.option = "some-value"
}
testObjectRepository {
storage = "CSV"
path = "src/main/resources/test_objects.csv"
inferSchema = "true"
delimiter = ","
header = "true"
saveMode = "Overwrite"
}
Notice that the above SparkRepository
is set with the TestObject
type. In this example, the data we want to register is a CSV file containing two columns: value1
of type String
and value2
of type Int
. That is why the TestObject
class should be:
case class TestObject(value1: String,
value2: Int)
In summary, to register a SparkRepository, you need to:
- Specify an item in your configuration file. This item must have a storage key, which represents the type of the data. Other keys might be mandatory depending on this type.
- Create a class or a case class representing the object type of your data.
- Register the data in your Setl object, using setSparkRepository[T]("<item>").
- Connector or SparkRepository? Sometimes, the data you are ingesting contains irrelevant information that you do not want to keep. For example, let's say that the CSV file you want to ingest contains 10 columns: value1, value2, value3 and 7 other columns you are not interested in. It is possible to ingest only these 3 columns with a SparkRepository, if you specify the correct object type for your data:

  case class A(value1: T1, value2: T2, value3: T3)

  setl
    .setSparkRepository[A]("itemInConfFile")

  This is not possible with a Connector. If you register this CSV file with a Connector, all 10 columns will appear.
- Annotations
  - @ColumnName: @ColumnName is an annotation used in a case class. When you want to use a different column name in your code but keep the original name when reading and writing the data, you can use this annotation.

    case class A(@ColumnName("value_one") valueOne: T1,
                 @ColumnName("value_two") valueTwo: T2)

    As you probably know, Scala uses camelCase rather than snake_case. If you register a SparkRepository of type [A] in your Setl object and read it, the columns will be named valueOne and valueTwo. The files you read will still keep the original names, i.e. value_one and value_two.
  - @CompoundKey: TODO
  - @Compress: TODO
Most of the time, you will need to register multiple data sources.
Let's start with Connector
. Note that it is perfectly possible to register multiple Connector
, as said previously. However, there will be an issue during the ingestion. Setl
has no way to differentiate one Connector
from another. You will need to set what is called a deliveryId
.
val setl1: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
// /!\ This will work for data registration here but not for data ingestion later /!\
setl1
.setConnector("testObjectRepository")
.setConnector("pokeGradesRepository")
// Please get used to set a `deliveryId` when you register one or multiple `Connector`
setl1
.setConnector("testObjectRepository", deliveryId = "testObject")
.setConnector("pokeGradesRepository", deliveryId = "grades")
Let's now look at how we can register multiple SparkRepository. If the SparkRepository you register all have different types, there will be no issue during the ingestion. Indeed, Setl is capable of differentiating the upcoming data by inferring the object type.
val setl2: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
setl2
.setSparkRepository[TestObject]("testObjectRepository")
.setSparkRepository[Grade]("pokeGradesRepository")
However, if there are multiple SparkRepository
with the same type, you must use a deliveryId
for each of them. Otherwise, there will be an error during the data ingestion. This is the same reasoning as multiple Connector
: there is no way to differentiate two SparkRepository
of the same type.
val setl3: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
// /!\ This will work for data registration here but not for data ingestion later /!\
setl3
.setSparkRepository[Grade]("pokeGradesRepository")
.setSparkRepository[Grade]("digiGradesRepository")
// Please get used to set a `deliveryId` when you register multiple `SparkRepository` of same type
setl3
.setSparkRepository[Grade]("pokeGradesRepository", deliveryId = "pokeGrades")
.setSparkRepository[Grade]("digiGradesRepository", deliveryId = "digiGrades")
Before deep diving into data ingestion, we first must learn how SETL organizes an ETL process. SETL uses Pipeline and Stage to organize workflows. A Pipeline is where the whole ETL process happens: the registered data is ingested inside a Pipeline, and all transformations and output writing are done inside it. A Pipeline is composed of multiple Stage. A Stage allows you to modularize your project, and can consist of multiple Factory. You can understand a Factory as a module of your ETL process. So in order to "see" the data ingestion, we have to create a Pipeline and add a Stage to it. As this may sound a bit theoretical, let's look at some examples.
App.scala
:
val setl4: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
setl4
.setConnector("testObjectRepository", deliveryId = "testObjectConnector")
.setSparkRepository[TestObject]("testObjectRepository", deliveryId = "testObjectRepository")
setl4
.newPipeline() // Creation of a `Pipeline`.
.addStage[IngestionFactory]() // Add a `Stage` composed of one `Factory`: `IngestionFactory`.
.run()
Before running the code, let's take a look at IngestionFactory
.
class IngestionFactory extends Factory[DataFrame] with HasSparkSession {
import spark.implicits._
override def read(): IngestionFactory.this.type = this
override def process(): IngestionFactory.this.type = this
override def write(): IngestionFactory.this.type = this
override def get(): DataFrame = spark.emptyDataFrame
}
This is a skeleton of a SETL Factory
. A SETL Factory
contains 4 main functions: read()
, process()
, write()
and get()
. These functions will be executed in this order. These 4 functions are the core of your ETL process. This is where you will write your classic Spark
code of data transformation.
You can see that IngestionFactory
is a child class of Factory[DataFrame]
. This simply means that the output of this data transformation must be a DataFrame
. IngestionFactory
also has the trait HasSparkSession
. It allows you to access the SparkSession
easily. Usually, we use it simply to import spark.implicits
.
Where is the ingestion?
class IngestionFactory extends Factory[DataFrame] with HasSparkSession {
import spark.implicits._
@Delivery(id = "testObjectConnector")
val testObjectConnector: Connector = Connector.empty
@Delivery(id = "testObjectRepository")
val testObjectRepository: SparkRepository[TestObject] = SparkRepository[TestObject]
var testObjectOne: DataFrame = spark.emptyDataFrame
var testObjectTwo: Dataset[TestObject] = spark.emptyDataset[TestObject]
override def read(): IngestionFactory.this.type = this
override def process(): IngestionFactory.this.type = this
override def write(): IngestionFactory.this.type = this
override def get(): DataFrame = spark.emptyDataFrame
}
The structure of a SETL Factory starts with the @Delivery annotation. This annotation is how SETL ingests the corresponding registered data. If you look at App.scala, where this IngestionFactory is called, the associated Setl object has registered a Connector with id testObjectConnector and a SparkRepository with id testObjectRepository.
Note that it is not mandatory to use a deliveryId in this case, because there is only one registered SparkRepository with TestObject as its object type. You can try to remove the deliveryId when registering the SparkRepository and the id in the @Delivery annotation: the code will still run (a sketch of this variant follows below). The same can be said for the Connector.
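A minimal sketch of that variant, assuming the same TestObject case class and configuration item as above (the registration is shown as a comment; everything else is unchanged):

```scala
// Registration without a deliveryId:
// setl4.setSparkRepository[TestObject]("testObjectRepository")

class IngestionFactoryNoId extends Factory[DataFrame] with HasSparkSession {

  // No id argument: SETL resolves this delivery by its type, SparkRepository[TestObject],
  // which works only because a single repository of that type is registered.
  @Delivery
  val testObjectRepository: SparkRepository[TestObject] = SparkRepository[TestObject]

  override def read(): IngestionFactoryNoId.this.type = this
  override def process(): IngestionFactoryNoId.this.type = this
  override def write(): IngestionFactoryNoId.this.type = this
  override def get(): DataFrame = spark.emptyDataFrame
}
```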
With the @Delivery
annotation, we retrieved a Connector
and SparkRepository
. The data has been correctly ingested, but these are data access layers. To process the data, we have to retrieve the DataFrame
of the Connector
and the Dataset
of the SparkRepository
. This is why we defined two var
, one of type DataFrame
and one of type Dataset[TestObject]
. We will assign values to them during the read()
function. These var
are accessible from all the 4 core functions, and you will use them for your ETL process.
To retrieve the DataFrame
of the Connector
and the Dataset
of the SparkRepository
, we can use the read()
function.
override def read(): IngestionFactory.this.type = {
testObjectOne = testObjectConnector.read()
testObjectTwo = testObjectRepository.findAll()
this
}
The read() function is typically where you do your data preprocessing. Usually, we simply assign values to our variables. This is also where you may want to do some filtering on your data.
- To retrieve the DataFrame of a Connector, use the read() method.
- To retrieve the Dataset of a SparkRepository, you can use the findAll() method or the findBy() method. The latter allows you to do filtering based on Condition (see the sketch below). More info here.
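For illustration, here is a minimal sketch of a read-time filter with findBy(), as a variant of IngestionFactory's read() above. It assumes the TestObject case class and the deliveries from this lesson, and that Condition lives in com.jcdecaux.setl.storage.Condition (check the exact API against your SETL version):

```scala
import com.jcdecaux.setl.storage.Condition

override def read(): IngestionFactory.this.type = {
  testObjectOne = testObjectConnector.read()
  // Instead of findAll(), only load the rows whose value1 column equals "a"
  testObjectTwo = testObjectRepository.findBy(Condition("value1", "=", "a"))
  this
}
```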
The registered data is then correctly ingested. It is now ready to be used during the process()
function.
In the previous IngestionFactory, we had to declare a val of type SparkRepository and a var to which we assign the corresponding Dataset in the read() function. With autoLoad = true, we can skip the first step and directly declare a Dataset: the Dataset of the SparkRepository will be assigned to it automatically.
App.scala
:
val setl5: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
setl5
.setSparkRepository[TestObject]("testObjectRepository", deliveryId = "testObjectRepository")
setl5
.newPipeline()
.addStage[AutoLoadIngestionFactory]()
.run()
AutoLoadIngestionFactory
class AutoLoadIngestionFactory extends Factory[DataFrame] with HasSparkSession {
import spark.implicits._
@Delivery(id = "testObjectRepository", autoLoad = true)
val testObject: Dataset[TestObject] = spark.emptyDataset[TestObject]
override def read(): AutoLoadIngestionFactory.this.type = {
testObject.show(false)
this
}
override def process(): AutoLoadIngestionFactory.this.type = this
override def write(): AutoLoadIngestionFactory.this.type = this
override def get(): DataFrame = spark.emptyDataFrame
}
Note that, compared to the previous Factory, there is no way to use the findBy() method to filter the data. Also, autoLoad is available for SparkRepository only, not for Connector.
If you want to pass some simple parameters (primitives, strings, arrays, etc.), you can use the setInput[T]() method. These inputs are set directly in the Pipeline; there is no registration step as with Connector or SparkRepository.
App.scala
:
val setl5: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
setl5
.newPipeline()
.setInput[Int](42)
.setInput[String]("SETL", deliveryId = "ordered")
.setInput[String]("LTES", deliveryId = "reversed")
.setInput[Array[String]](Array("S", "E", "T", "L"))
.addStage[AutoLoadIngestionFactory]()
.run()
Inputs are retrieved in the same way Connector
or SparkRepository
are retrieved: the @Delivery
annotation, and the deliveryId
if necessary.
AutoLoadIngestionFactory.scala
:
class AutoLoadIngestionFactory extends Factory[DataFrame] with HasSparkSession {
import spark.implicits._
@Delivery
val integer: Int = 0
@Delivery(id = "ordered")
val firstString: String = ""
@Delivery(id = "reversed")
val secondString: String = ""
@Delivery
val stringArray: Array[String] = Array()
override def read(): AutoLoadIngestionFactory.this.type = {
// Showing that inputs work correctly
println("integer: " + integer) // integer: 42
println("ordered: " + firstString) // ordered: SETL
println("reversed: " + secondString) // reversed: LTES
println("array: " + stringArray.mkString(".")) // array: S.E.T.L
this
}
override def process(): AutoLoadIngestionFactory.this.type = this
override def write(): AutoLoadIngestionFactory.this.type = this
override def get(): DataFrame = spark.emptyDataFrame
}
In summary, the extraction part of an ETL process translates to the following in a SETL project:
- Create a configuration item representing the data you want to ingest in your configuration file.
- Register the data in your Setl object by using the setConnector() or the setSparkRepository[T]() method. Reminder: the mandatory parameter is the name of your item in the configuration file, and you might want to add a deliveryId.
- Create a new Pipeline in your Setl object, then add a Stage with a Factory in which you want to process your data.
- Create a SETL Factory containing the 4 core functions: read(), process(), write() and get().
- Retrieve your data using the @Delivery annotation.
- Your data is ready to be processed.
A cheat sheet can be found here. A condensed example tying these steps together follows below.
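Here is a condensed sketch of these steps put together, reusing the testObjectRepository configuration item and the TestObject case class from this lesson. The import paths reflect recent versions of SETL and may need adjusting:

```scala
import com.jcdecaux.setl.Setl
import com.jcdecaux.setl.annotation.Delivery
import com.jcdecaux.setl.storage.repository.SparkRepository
import com.jcdecaux.setl.transformation.Factory
import com.jcdecaux.setl.util.HasSparkSession
import org.apache.spark.sql.{DataFrame, Dataset}

// 4. + 5. a Factory that only ingests the registered data and displays it
class ShowTestObjectFactory extends Factory[DataFrame] with HasSparkSession {
  import spark.implicits._

  @Delivery(id = "testObjectRepository", autoLoad = true)
  val testObject: Dataset[TestObject] = spark.emptyDataset[TestObject]

  override def read(): ShowTestObjectFactory.this.type = {
    testObject.show(false)
    this
  }
  override def process(): ShowTestObjectFactory.this.type = this
  override def write(): ShowTestObjectFactory.this.type = this
  override def get(): DataFrame = testObject.toDF()
}

object ExtractionApp extends App {
  // 1. the "testObjectRepository" item is defined in the loaded <app_env>.conf
  val setl: Setl = Setl.builder()
    .withDefaultConfigLoader()
    .getOrCreate()

  // 2. register the data
  setl.setSparkRepository[TestObject]("testObjectRepository", deliveryId = "testObjectRepository")

  // 3. build and run the Pipeline
  setl.newPipeline()
    .addStage[ShowTestObjectFactory]()
    .run()
}
```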
Exercises
In these exercises, we are going to practice registering and ingesting different types of storage: a CSV file, a JSON file, a Parquet file, an Excel file, a table from DynamoDB, a table from Cassandra, a table from PostgreSQL and a table from Delta.
An App.scala
is already prepared. We created a SETL
entry point and use a configuration file located at src/main/resources/exercise/extract/extract.conf
. In this file, a configuration object has been created for each storage type, but they are incomplete.
The goal here is to complete the configuration objects with the help of the documentation, register the data as Connector
in a Pipeline
and print them in a Factory
after ingestion.
To verify registration and ingestion, we prepared CheckExtractFactory
. To test your code, complete the ???
parts and uncomment the corresponding lines.
a) Ingesting a CSV file
The goal of this first exercise is to register and ingest a CSV file.
We are looking to read the CSV file located at src/main/resources/exercise/extract/paris-wi-fi-service.csv
.
- Complete the configuration object csvFile in src/main/resources/exercise/extract/extract.conf (an indicative sketch follows after this list).
- In App.scala, register a Connector with this data.
- You may create your own Factory and implement the read() function to verify if you can ingest the data. If you are not sure how yet, we already added a Factory to a Stage, which is added to the Pipeline. This CheckExtractFactory will ingest a Connector named csvFileConnector. It will read it into csvFile in the read() method, and verify the number of lines in this data. Uncomment the corresponding lines and complete the part on the @Delivery annotation before running your code to test your implementation of CSV file registration and ingestion.
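As a starting point, the completed csvFile item will look much like the lesson's CSV configuration. Treat the keys and values below as an indicative sketch rather than the expected solution:

```
csvFile {
  storage = "CSV"
  path = "src/main/resources/exercise/extract/paris-wi-fi-service.csv"
  inferSchema = "true"
  delimiter = ","   // check the actual delimiter used by the file
  header = "true"
  saveMode = "Overwrite"
}
```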
b) Ingesting a JSON file
The goal of this second exercise is to register and ingest a JSON file.
We are looking to read the JSON file located at src/main/resources/exercise/extract/paris-notable-trees.json
.
- Complete the configuration object jsonFile in src/main/resources/exercise/extract/extract.conf.
- In App.scala, register a Connector with this data.
- You may create your own Factory and implement the read() function to verify if you can ingest the data. If you are not sure how to do that yet, we already added a Factory to a Stage, which is added to the Pipeline. This CheckExtractFactory will ingest a Connector named jsonFileConnector. It will read it into jsonFile in the read() method, and verify the number of lines in this data. Uncomment the corresponding lines and complete the part on the @Delivery annotation before running your code to test your implementation of JSON file registration and ingestion.
c) Ingesting a Parquet file
The goal of this third exercise is to register and ingest a Parquet file.
We are looking to read the Parquet file located at src/main/resources/exercise/extract/paris-public-toilet.parquet
.
- Complete the configuration object parquetFile in src/main/resources/exercise/extract/extract.conf.
- In App.scala, register a Connector with this data.
- You may create your own Factory and implement the read() function to verify if you can ingest the data. If you are not sure how to do that yet, we already added a Factory to a Stage, which is added to the Pipeline. This CheckExtractFactory will ingest a Connector named parquetFileConnector. It will read it into parquetFile in the read() method, and verify the number of lines in this data. Uncomment the corresponding lines and complete the part on the @Delivery annotation before running your code to test your implementation of Parquet file registration and ingestion.
d) Ingesting an Excel file
The goal of this fourth exercise is to register and ingest an Excel file.
We are looking to read the Excel file located at src/main/resources/exercise/extract/paris-textile-containers.xlsx
.
- Complete the configuration object excelFile in src/main/resources/exercise/extract/extract.conf.
- In App.scala, register a Connector with this data.
- You may create your own Factory and implement the read() function to verify if you can ingest the data. If you are not sure how to do that yet, we already added a Factory to a Stage, which is added to the Pipeline. This CheckExtractFactory will ingest a Connector named excelFileConnector. It will read it into excelFile in the read() method, and verify the number of lines in this data. Uncomment the corresponding lines and complete the part on the @Delivery annotation before running your code to test your implementation of Excel file registration and ingestion.
e) Ingesting data from DynamoDB
The goal of this fifth exercise is to register and ingest data from a table in DynamoDB.
To work on this exercise, we need to host a local DynamoDB server. To do that, we prepared a docker-compose.yml
in the exercise-environment/
folder. Make sure you have Docker installed. In a terminal, change your directory to exercise-environment/
and execute docker-compose up
. It will create a local DynamoDB server at http://localhost:8000
. It will also create a table orders_table
in the us-east-1
region, and populate it with some data.
Make sure you launch the Docker containers before starting this exercise.
We are looking to read the orders_table
table from DynamoDB, located at the us-east-1
region.
- Complete the configuration object dynamoDBData in src/main/resources/exercise/extract/extract.conf.
- In App.scala, register a Connector with this data. We already set the endpoint to http://localhost:8000 so that the requests point to your local DynamoDB instance.
- You may create your own Factory and implement the read() function to verify if you can ingest the data. If you are not sure how to do that yet, we already added a Factory to a Stage, which is added to the Pipeline. This CheckExtractFactory will ingest a Connector named dynamoDBDataConnector. It will read it into dynamoDBData in the read() method, and verify the number of lines in this data. Uncomment the corresponding lines and complete the part on the @Delivery annotation before running your code to test your implementation of DynamoDB data registration and ingestion.
f) Ingesting data from Cassandra
The goal of this sixth exercise is to register and ingest data from a table in Cassandra.
To work on this exercise, we need to host a local Cassandra server. To do that, we prepared a docker-compose.yml
in the exercise-environment/
folder. Make sure you have Docker installed. In a terminal, change your directory to exercise-environment/
and execute docker-compose up
. It will create a local Cassandra server at http://localhost:9042
. It will also create a keyspace mykeyspace
and a table profiles
, and populate it with some data.
Make sure you launch the Docker containers before starting this exercise.
We are looking to read the profiles
table from Cassandra, located at the mykeyspace
keyspace.
- Complete the configuration object cassandraDBData in src/main/resources/exercise/extract/extract.conf.
- In App.scala, register a Connector with this data. We already set the endpoint to http://localhost:9042 so that the requests point to your local Cassandra instance.
- You may create your own Factory and implement the read() function to verify if you can ingest the data. If you are not sure how to do that yet, we already added a Factory to a Stage, which is added to the Pipeline. This CheckExtractFactory will ingest a Connector named cassandraDataConnector. It will read it into cassandraData in the read() method, and verify the number of lines in this data. Uncomment the corresponding lines and complete the part on the @Delivery annotation before running your code to test your implementation of Cassandra data registration and ingestion.
g) Ingesting data from PostgreSQL
The goal of this seventh exercise is to register and ingest data from a table in PostgreSQL.
To work on this exercise, we need to host a local PostgreSQL server. To do that, we prepared a docker-compose.yml
in the exercise-environment/
folder. Make sure you have Docker installed. In a terminal, change your directory to exercise-environment/
and execute docker-compose up
. It will create a local PostgreSQL server at http://localhost:5432
. It will also create a database postgres
and a table products
, and populate it with some data.
You will also need the PostgreSQL JDBC driver. As specified in the documentation, you must provide a JDBC driver when using the JDBC storage type. To provide the PostgreSQL JDBC driver, head to https://jdbc.postgresql.org/download.html, download the driver, and make the JDBC library jar available to the project. If you are using IntelliJ IDEA, right click on the jar and click on Add as Library.
Make sure you launch the Docker containers before starting this exercise.
We are looking to read the products
table from PostgreSQL, located at the postgres
database.
- Complete the configuration object jdbcDBData in src/main/resources/exercise/extract/extract.conf.
- In App.scala, register a Connector with this data. Remember that the endpoint should be http://localhost:5432.
- You may create your own Factory and implement the read() function to verify if you can ingest the data. If you are not sure how to do that yet, we already added a Factory to a Stage, which is added to the Pipeline. This CheckExtractFactory will ingest a Connector named jdbcDataConnector. It will read it into jdbcData in the read() method, and verify the number of lines in this data. Uncomment the corresponding lines and complete the part on the @Delivery annotation before running your code to test your implementation of PostgreSQL data registration and ingestion.
h) Ingesting a local Delta table
The goal of this eighth exercise is to register and ingest a local Delta table.
We are looking to read the Delta table located at src/main/resources/exercise/extract/delta-table
. This table contains two versions, and we will read those two versions.
- Complete the two configuration objects deltaDataVersionZero and deltaDataVersionOne in src/main/resources/exercise/extract/extract.conf.
- In App.scala, register one Connector with version zero data and one with version one data.
- You may create your own Factory and implement the read() function to verify if you can ingest the data. If you are not sure how to do that yet, we already added a Factory to a Stage, which is added to the Pipeline. This CheckExtractFactory will ingest a Connector named deltaDataVersionZeroConnector and a Connector named deltaDataVersionOneConnector. It will read them into deltaDataVersionZero and deltaDataVersionOne respectively in the read() method, and verify the number of lines in this data. Uncomment the corresponding lines and complete the part on the @Delivery annotation before running your code to test your implementation of local Delta table registration and ingestion.
To challenge yourself, try to replace the different Connector
with SparkRepository
. Make use of what you have learned in the lesson!
Lesson
Transformations in SETL are the easiest part to learn. There is nothing new if you are used to writing ETL jobs with Spark: this is where you transfer the code you would write with Spark into SETL.

After seeing what the read() function of a Factory looks like, let's have a look at the process() function, which is executed right after.
class ProcessFactory extends Factory[DataFrame] with HasSparkSession {
@Delivery(id = "testObject")
val testObjectConnector: Connector = Connector.empty
var testObject: DataFrame = spark.emptyDataFrame
var result: DataFrame = spark.emptyDataFrame
override def read(): ProcessFactory.this.type = {
testObject = testObjectConnector.read()
this
}
override def process(): ProcessFactory.this.type = {
val testObjectDate = testObject.withColumn("date", lit("2020-11-20"))
result = testObjectDate
.withColumnRenamed("value1", "name")
.withColumnRenamed("value2", "grade")
this
}
override def write(): ProcessFactory.this.type = this
override def get(): DataFrame = spark.emptyDataFrame
}
The first part of the code, the ingestion, should be familiar thanks to the @Delivery annotation and the read() function. A var result is declared, in which the result of the data transformations will be stored. It is declared at the class level so that it can be accessed later in the write() and get() functions. The data transformations themselves are what is inside the process() function, and you can surely tell what they do.

As said previously, there is nothing new to learn here: you just write your Spark functions to transform your data, and this part is unrelated to SETL.
You might not learn anything new about the data transformations themselves, but SETL helps you structure them. We will now take a look at the SETL Transformer. You already know about Factory. A Factory can contain multiple Transformer. A Transformer is a piece of highly reusable code that represents one data transformation. Let's look at how it works.
class ProcessFactoryWithTransformer extends Factory[DataFrame] with HasSparkSession {
@Delivery(id = "testObject")
val testObjectConnector: Connector = Connector.empty
var testObject: DataFrame = spark.emptyDataFrame
var result: DataFrame = spark.emptyDataFrame
override def read(): ProcessFactoryWithTransformer.this.type = {
testObject = testObjectConnector.read()
this
}
override def process(): ProcessFactoryWithTransformer.this.type = {
val testObjectDate = new DateTransformer(testObject).transform().transformed
result = new RenameTransformer(testObjectDate).transform().transformed
this
}
override def write(): ProcessFactoryWithTransformer.this.type = this
override def get(): DataFrame = spark.emptyDataFrame
}
If you compare this Factory with the ProcessFactory of the last section, it does the same job. However, the workflow is more structured. You can see that in the process() function, there are no Spark functions for data transformations. Instead, we used Transformer: the data transformations are done inside them. This makes the code highly reusable and adds a lot more structure to it. The previous ProcessFactory can be divided into two steps: the first adds a new column, and the second renames the columns.

First, we call the first Transformer, passing it our input DataFrame. The transform() method is then called, and the result is retrieved with the transformed getter. The second data transformation is done with RenameTransformer, and the result is assigned to our result variable. Let's have a look at each Transformer.
A Transformer has two core methods:
- transform(), which is where the data transformation should happen.
- transformed, which is a getter to retrieve the result.
Typically, we also declare a variable to which the result of the transformation is assigned, in this case transformedData. The transformed getter returns this variable. This is why, in ProcessFactoryWithTransformer, the transform() method is called before the transformed getter.
DateTransformer.scala
:
class DateTransformer(testObject: DataFrame) extends Transformer[DataFrame] with HasSparkSession {
private[this] var transformedData: DataFrame = spark.emptyDataFrame
override def transformed: DataFrame = transformedData
override def transform(): DateTransformer.this.type = {
transformedData = testObject
.withColumn("date", lit("2020-11-20"))
this
}
}
DateTransformer
represents the first data transformation that is done in the ProcessFactory
in the previous section: adding a new column.
RenameTransformer
:
class RenameTransformer(testObjectDate: DataFrame) extends Transformer[DataFrame] with HasSparkSession {
private[this] var transformedData: DataFrame = spark.emptyDataFrame
override def transformed: DataFrame = transformedData
override def transform(): RenameTransformer.this.type = {
transformedData = testObjectDate
.withColumnRenamed("value1", "name")
.withColumnRenamed("value2", "grade")
this
}
}
RenameTransformer
represents the second data transformation that is done in the ProcessFactory
in the previous section: renaming the columns.
The classic data transformations happen in the process() function of your Factory. This is how you write your data transformations in SETL, given that you already did what is needed in the Extract part. You have two options:
- Write all the data transformations with Spark functions in the process() function of your Factory. Remember to store the result in a class-level variable so that it can be used in the next functions of the Factory.
- Organize your workflow with Transformer. This is best for code reusability, readability, understanding and structure. To use a Transformer, remember that you need to pass it the DataFrame or Dataset you want to transform, and possibly some extra parameters, through its constructor. You need to implement the transform() function, which is where the core Spark functions should be called, and the transformed getter to retrieve the result. A generic skeleton follows below.
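As a reference, here is a generic skeleton mirroring DateTransformer and RenameTransformer above. The MyTransformer name and the extra suffix parameter are illustrative, and the import paths reflect recent SETL versions and may need adjusting:

```scala
import com.jcdecaux.setl.transformation.Transformer
import com.jcdecaux.setl.util.HasSparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

// The data to transform and any extra parameter are passed through the constructor
class MyTransformer(input: DataFrame, suffix: String) extends Transformer[DataFrame] with HasSparkSession {

  // class-level variable holding the result of this single transformation step
  private[this] var transformedData: DataFrame = spark.emptyDataFrame

  // getter used by the Factory once transform() has been called
  override def transformed: DataFrame = transformedData

  override def transform(): MyTransformer.this.type = {
    // the core Spark code of this transformation step
    transformedData = input.withColumn("tag", lit(suffix))
    this
  }
}

// Inside a Factory's process() function:
// result = new MyTransformer(testObject, "demo").transform().transformed
```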
Exercises
In this exercise, we are going to practice how to structure a SETL project for transformation processes.

An App.scala is already prepared. We created a SETL entry point and use a configuration file located at src/main/resources/exercise/transform/transform.conf. In this file, configuration objects are already created. We will be working with pokeGrades.csv and digiGrades.csv, both located in src/main/resources/. We are looking to compute the mean score of each "poke" and then of each "digi".
- We are going to extract the data, pokeGrades.csv and digiGrades.csv, from src/main/resources/. In App.scala, register these two as SparkRepository.
- The next step is to complete the Factory. Head over to MeanGradeFactory to complete the part about data ingestion.
- You should know that a Factory has 4 mandatory core functions. Leave the write() and get() functions as they are. Use the read() function if necessary. Keep in mind what autoLoad is.
- In the process() function, we are going to compute the mean grade for the pokeGrades and digiGrades data. To do that, we are going to create a Transformer, named MeanGradeTransformer. This Transformer takes a parameter of type Dataset[Grade] and outputs an object of type DataFrame. There should be two columns: one column name and one column grade for the mean grade.
- In the process() function, we can now call the Transformer on each dataset, apply the transformations and store the results in variables.
- Lastly, we can merge the two results and verify the final DataFrame by printing it.
Follow the instructions in the code to complete this exercise. If you'd like to challenge yourself, try to write a complete Pipeline by yourself, without the help of the prepared code files. For example, you can try to find the top-3 scores of each "poke" and each "digi".
Lesson
The Load processes with SETL correspond to two key ideas: writing the output, or passing the output. Passing the output means handing the result of one Factory to another Factory, which then uses it as an input.

In order to write data, you need to register a Connector or a SparkRepository. As you probably already know, if you want to write a DataFrame, register a Connector; if you want to write a Dataset, register a SparkRepository. Do not forget that you must create a configuration item in the configuration file. That is where you specify the path of your output.
App.scala
:
val setl0: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
setl0
.setConnector("testObjectRepository", deliveryId = "testObject")
.setConnector("testObjectWriteRepository", deliveryId = "testObjectWrite")
setl0
.newPipeline()
.setInput[String]("2020-11-23", deliveryId = "date")
.addStage[WriteFactory]()
.run()
local.conf
:
testObjectRepository {
storage = "CSV"
path = "src/main/resources/test_objects.csv"
inferSchema = "true"
delimiter = ","
header = "true"
saveMode = "Overwrite"
}
testObjectWriteRepository {
storage = "EXCEL"
path = "src/main/resources/test_objects_write.xlsx"
useHeader = "true"
saveMode = "Overwrite"
}
WriteFactory.scala
:
class WriteFactory extends Factory[DataFrame] with HasSparkSession {
@Delivery(id = "date")
val date: String = ""
@Delivery(id = "testObject")
val testObjectConnector: Connector = Connector.empty
@Delivery(id = "testObjectWrite")
val testObjectWriteConnector: Connector = Connector.empty
var testObject: DataFrame = spark.emptyDataFrame
var result: DataFrame = spark.emptyDataFrame
override def read(): WriteFactory.this.type = {
testObject = testObjectConnector.read()
this
}
override def process(): WriteFactory.this.type = {
result = testObject
.withColumn("date", lit(date))
this
}
override def write(): WriteFactory.this.type = {
testObjectWriteConnector.write(result.coalesce(1))
this
}
override def get(): DataFrame = spark.emptyDataFrame
}
Note that among the Deliveries, there is one with the ID testObjectWrite. It has been registered earlier; we retrieve it not to read data, but as a way to write our output.
The write() function is the third function executed in a Factory, after read() and process(). The idea is to call the write() method of a Connector or a SparkRepository and pass the resulting DataFrame or Dataset as argument. SETL will automatically read the configuration item (storage type, path and options) and write the result there.

The advantage of using SETL for the Load process is that everything is driven by the configuration item. If you ever want to change the data storage, you only need to modify the value of the corresponding key; the same goes for the path or any other option, as the sketch below illustrates.
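For example, switching the output of WriteFactory from Excel to Parquet could be done entirely in the configuration file, without touching the Scala code. The keys below are indicative:

```
testObjectWriteRepository {
  storage = "PARQUET"
  path = "src/main/resources/test_objects_write.parquet"
  saveMode = "Overwrite"
}
```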
In summary, to write an output in SETL, you need to:
- Create a configuration item in your configuration file.
- Register the corresponding Connector or SparkRepository.
- Ingest it in your Factory with the @Delivery annotation.
- Use it in the write() function to write your output.
As SETL is organized around Factory, it is possible to pass the result of one Factory to another. The result of a Factory can be of any type, but it is generally a DataFrame or a Dataset.

We are now going to ingest data and make some transformations in FirstFactory, then use the result in SecondFactory. You can see in the Pipeline that FirstFactory comes before SecondFactory.
App.scala
:
val setl1: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
setl1.setConnector("testObjectRepository", deliveryId = "testObject")
setl1
.newPipeline()
.setInput[String]("2020-12-18", deliveryId = "date")
.addStage[FirstFactory]()
.addStage[SecondFactory]()
.run()
FirstFactory.scala
:
class FirstFactory extends Factory[DataFrame] with HasSparkSession {
@Delivery(id = "date")
val date: String = ""
@Delivery(id = "testObject")
val testObjectConnector: Connector = Connector.empty
var testObject: DataFrame = spark.emptyDataFrame
var result: DataFrame = spark.emptyDataFrame
override def read(): FirstFactory.this.type = {
testObject = testObjectConnector.read()
this
}
override def process(): FirstFactory.this.type = {
result = testObject
.withColumn("date", lit(date))
this
}
override def write(): FirstFactory.this.type = this
override def get(): DataFrame = result
}
This FirstFactory is similar to the previous WriteFactory. Instead of writing the result, we pass it on through the get() function. The get() function is the fourth function executed in a Factory, after read(), process() and write(). In the above example, the output is simply returned.

Remember that the type of the output is defined at the start of the Factory, when specifying the parent class. In this case, the output is a DataFrame. This output is then injected into the Pipeline as a Deliverable, which the other Factory can ingest.
SecondFactory.scala
:
class SecondFactory extends Factory[DataFrame] with HasSparkSession {
import spark.implicits._
@Delivery(producer = classOf[FirstFactory])
val firstFactoryResult: DataFrame = spark.emptyDataFrame
var secondResult: DataFrame = spark.emptyDataFrame
override def read(): SecondFactory.this.type = this
override def process(): SecondFactory.this.type = {
secondResult = firstFactoryResult
.withColumn("secondDate", $"date")
secondResult.show(false)
this
}
override def write(): SecondFactory.this.type = this
override def get(): DataFrame = secondResult
}
In this SecondFactory, we want to retrieve the output produced by FirstFactory. Notice that we used the producer argument in the @Delivery annotation. This is how a SETL Pipeline retrieves the output of a Factory: the result of a Factory is injected into the Pipeline as a Deliverable, which can be ingested with the @Delivery annotation.
In the previous Pipeline
, we retrieved the result of FirstFactory
to use it in SecondFactory
. The result of FirstFactory
was a DataFrame
, and we needed to retrieve it in SecondFactory
by using the producer
argument in the @Delivery
annotation. In the following Pipeline
, we are going to produce a Dataset
from FirstFactoryBis
and use it in SecondFactoryBis
.
App.scala
:
val setl2: Setl = Setl.builder()
.withDefaultConfigLoader()
.getOrCreate()
setl2
.setConnector("testObjectRepository", deliveryId = "testObject")
setl2
.newPipeline()
.setInput[String]("2020-12-18", deliveryId = "date")
.addStage[FirstFactoryBis]()
.addStage[SecondFactoryBis]()
.run()
FirstFactoryBis.scala
:
class FirstFactoryBis extends Factory[Dataset[TestObject]] with HasSparkSession {
import spark.implicits._
@Delivery(id = "testObject")
val testObjectConnector: Connector = Connector.empty
var testObject: DataFrame = spark.emptyDataFrame
var result: Dataset[TestObject] = spark.emptyDataset[TestObject]
override def read(): FirstFactoryBis.this.type = {
testObject = testObjectConnector.read()
this
}
override def process(): FirstFactoryBis.this.type = {
result = testObject
.withColumn("value1", concat($"value1", lit("42")))
.as[TestObject]
this
}
override def write(): FirstFactoryBis.this.type = this
override def get(): Dataset[TestObject] = result
}
Notice that FirstFactoryBis is a child class of Factory[Dataset[TestObject]], meaning that its output must be a Dataset[TestObject]. result is a variable of type Dataset[TestObject], and the get() function returns it. This Dataset is injected into the Pipeline.
SecondFactoryBis.scala
:
class SecondFactoryBis extends Factory[DataFrame] with HasSparkSession {
import spark.implicits._
@Delivery(id = "date")
val date: String = ""
@Delivery
val firstFactoryBisResult: Dataset[TestObject] = spark.emptyDataset
var secondResult: DataFrame = spark.emptyDataFrame
override def read(): SecondFactoryBis.this.type = this
override def process(): SecondFactoryBis.this.type = {
secondResult = firstFactoryBisResult
.withColumn("secondDate", lit("date"))
secondResult.show(false)
this
}
override def write(): SecondFactoryBis.this.type = this
override def get(): DataFrame = secondResult
}
The result of FirstFactoryBis is a Dataset[TestObject], and we used the @Delivery annotation to retrieve it. Compared to SecondFactory, we did not need to use the producer argument in the @Delivery annotation. This is because the Pipeline can infer the delivery from its type: the only Dataset[TestObject] it found is produced by FirstFactoryBis, so there is no need to specify it. This is the same mechanism that explains why a Connector needs a deliveryId to be retrieved, while a SparkRepository[T] does not if it is the only one of type T that is registered.
In summary, to use the output of a Factory in another one:
- Check the type of the output.
- Make sure that the Stage of the first Factory comes before the Stage of the second Factory.
- In the second Factory, declare a Delivery of type T, where T is the type of the output of the first Factory.
- Retrieve the output of the first Factory by using the @Delivery annotation. If it is a DataFrame, also use the producer argument.
Note: Although it is possible to retrieve the output of a Factory
in another one, most of the time, we would prefer to save the output of the first Factory
in a Connector
or SparkRepository
, and re-use the same Connector
or SparkRepository
in the second Factory
to retrieve the output.
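A sketch of that more common pattern, assuming a hypothetical intermediateRepository item in the configuration file and reusing the Factory skeletons from this lesson (import paths reflect recent SETL versions and may need adjusting):

```scala
import com.jcdecaux.setl.annotation.Delivery
import com.jcdecaux.setl.storage.connector.Connector
import com.jcdecaux.setl.transformation.Factory
import com.jcdecaux.setl.util.HasSparkSession
import org.apache.spark.sql.DataFrame

// In App.scala, both factories receive the same Connector registered as "intermediate":
// setl1
//   .setConnector("testObjectRepository", deliveryId = "testObject")
//   .setConnector("intermediateRepository", deliveryId = "intermediate")

// The first Factory persists its result through the intermediate Connector in write()
class FirstFactoryPersisted extends Factory[DataFrame] with HasSparkSession {
  @Delivery(id = "testObject")
  val testObjectConnector: Connector = Connector.empty
  @Delivery(id = "intermediate")
  val intermediateConnector: Connector = Connector.empty

  var result: DataFrame = spark.emptyDataFrame

  override def read(): FirstFactoryPersisted.this.type = { result = testObjectConnector.read(); this }
  override def process(): FirstFactoryPersisted.this.type = this
  override def write(): FirstFactoryPersisted.this.type = { intermediateConnector.write(result.coalesce(1)); this }
  override def get(): DataFrame = result
}

// The second Factory re-reads the same Connector instead of receiving a Deliverable
class SecondFactoryPersisted extends Factory[DataFrame] with HasSparkSession {
  @Delivery(id = "intermediate")
  val intermediateConnector: Connector = Connector.empty

  var input: DataFrame = spark.emptyDataFrame

  override def read(): SecondFactoryPersisted.this.type = { input = intermediateConnector.read(); this }
  override def process(): SecondFactoryPersisted.this.type = this
  override def write(): SecondFactoryPersisted.this.type = this
  override def get(): DataFrame = input
}
```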
Exercises
In this exercise, we are going to practice the Load processes with SETL, that is, how to pass the result of a Factory to another Factory, and how to write the result of a Factory.

An App.scala is already prepared. We created a SETL entry point and use a configuration file located at src/main/resources/exercise/load/load.conf. In this file, configuration objects are already created. We will be working with pokeGrades.csv and digiGrades.csv, both located in src/main/resources/. We are going to find out how many exams there are per year. To do that, a first Factory will "compute" all the dates from the data and pass this result to a second Factory. This second Factory ingests the result, extracts the year of each date and counts the number of exams per year.
- We are going to extract the data, pokeGrades.csv and digiGrades.csv, from src/main/resources/. In App.scala, register these two as SparkRepository. Also register a Connector to which your output will be written. Remember that a configuration file has been provided.
- The next step is to complete the two Factory. Head over to GetExamsDateFactory first.
- Complete the part on the data ingestion by setting the Delivery. In GetExamsDateFactory, the goal is to get the different exam dates of pokeGrades.csv and digiGrades.csv. Use the read() function if necessary. In the process() function, concatenate both the "poke" and the "digi" data. Then, only keep the date column, as it is the only relevant column in this exercise. Leave the write() function as is, and complete the get() function by returning the result of your process.
- Now, head over to ExamStatsFactory. This Factory will ingest the result of GetExamsDateFactory. As usual, the first step is to add the Delivery. Remember that to write an output, you also have to add a Connector or SparkRepository for the output, as it defines the storage type and the path. Also remember about producer. Go back to the lesson if you forgot about it.
- Use the read() function if necessary. In the process() function, we are looking to compute the number of exams per year. Our input data is a DataFrame with a single date column. Replace the date column by extracting the year only: it is the first 4 characters of the date column. Then, count the number of date values. Use the groupBy(), agg() and count() functions. In our data, each year is duplicated 10 times: for each exam, there are always 10 "poke" or 10 "digi". As a consequence, we need to divide the count by 10.
- Use the output Delivery you declared to save the result in the write() function. Complete the get() function to return the result, even though it is not used. If you run the code, you should get the number of exams per year, located in src/main/resources/examsStats/.
Follow the instructions in the code to complete this exercise. If you'd like to challenge yourself, try to write a complete Pipeline by yourself, without the help of the prepared code files. For example, you can try to save the list of "poke" and "digi" to a file.

Note that these exercises are simply meant to practice SETL, and their structure may very well not be optimized for a production workflow. They are just simple illustrations of what you can usually do with the framework.
Lesson
The difference between development environments lies in the location of the files/data we want to read and the location of the files/data we want to write.

In order to see how SETL switches between a local and a production environment, we are going to set two Connector: one for local and one for prod.
App.scala
:
val setl: Setl = Setl.builder()
.withDefaultConfigLoader("storage.conf")
.setSparkMaster("local[*]")
.getOrCreate()
setl
.setConnector("pokeGradesRepository", deliveryId = "pokeGradesRepository")
.setConnector("pokeGradesRepositoryProd", deliveryId = "pokeGradesRepositoryProd")
setl
.newPipeline()
.addStage[ProductionFactory]()
.run()
storage.conf
:
setl.config.spark {
spark.hadoop.fs.s3a.access.key = "dummyaccess" // Used to connect to AWS S3 prod environment
spark.hadoop.fs.s3a.secret.key = "dummysecret" // Used to connect to AWS S3 prod environment
spark.driver.bindAddress = "127.0.0.1"
}
pokeGradesRepository {
storage = "CSV"
path = "src/main/resources/pokeGrades.csv"
inferSchema = "true"
delimiter = ","
header = "true"
saveMode = "Overwrite"
}
pokeGradesRepositoryProd {
storage = "CSV"
path = "s3a://setl-examples/pokeGrades.csv"
inferSchema = "true"
delimiter = ","
header = "true"
saveMode = "Overwrite"
}
The difference between these two repositories is the path. The first object uses a local path, and the second uses an AWS S3 path, treated here as the production environment. They point to exactly the same file: when ingesting them in a Factory, we retrieve the same DataFrame. Thus, in SETL, it is possible to switch your development environment without touching the code. You just need to adjust the path of your configuration objects.

Most of the time, you will have a lot of configuration objects, for both input and output. Changing the path for all of these objects is not efficient. Instead of having two configuration objects (pokeGradesRepository and pokeGradesRepositoryProd) like in the last section, you can declare a single configuration object and make it reusable.
local.conf
:
setl.config.spark {
some.config.option = "some-value"
}
root {
path = "src/main/resources"
}
include "smartConf.conf" // /!\ important
prod.conf
setl.config.spark {
spark.hadoop.fs.s3a.endpoint = "http://localhost:9090"
spark.hadoop.fs.s3a.access.key = "dummyaccess"
spark.hadoop.fs.s3a.secret.key = "dummysecret"
spark.hadoop.fs.s3a.path.style.access = "true"
spark.driver.bindAddress = "127.0.0.1"
}
root {
path = "s3a://setl-examples"
}
include "smartConf.conf" // /!\ important
smartConf.conf
:
smartPokeGradesRepository {
storage = "CSV"
path = ${root.path}"/pokeGrades.csv"
inferSchema = "true"
delimiter = ","
header = "true"
saveMode = "Overwrite"
}
If you look at smartConf.conf, notice the path key: it uses the root.path key. smartConf.conf is included in both local.conf and prod.conf, which are the configuration files to be loaded. In local.conf, root.path is set to a local path, and in prod.conf, it is set to a prod path, which is an S3 path in this example. Let's now see how to switch development environments.
Note that in the Setl
object below, we used the withDefaultConfigLoader()
method. This means that application.conf
will be loaded, and it retrieves the app.environment
. app.environment
is a VM option. By default, it is set to local
in the pom.xml
file. Depending on the app.environment
, it will load the corresponding configuration file, i.e <app.environment>.conf
.
App.scala
:
val smartSetl: Setl = Setl.builder()
.withDefaultConfigLoader()
.setSparkMaster("local[*]")
.getOrCreate()
smartSetl.setConnector("smartPokeGradesRepository", deliveryId = "smartPokeGradesRepository")
println(smartSetl.getConnector[Connector]("smartPokeGradesRepository").asInstanceOf[FileConnector].options.getPath)
Now, to see how easy it is to switch development environments with SETL, change the VM option -Dapp.environment by setting it to local or prod. If you run App.scala, you will see that the path changes according to the environment:
- src/main/resources/pokeGrades.csv if -Dapp.environment=local
- s3a://setl-examples/pokeGrades.csv if -Dapp.environment=prod
In summary, you can change your development environment by changing the path of your configuration objects. However, this can be tedious, especially if you have a lot of input/output storage objects. By writing a general configuration file, you simply need to adjust the VM option to switch your development environment and get the corresponding paths of your data.

Remember that SETL aims at simplifying the Extract and Load processes so that a Data Scientist can focus on their core job: data transformations. On top of that, it gives structure and allows more modularization of your code!