The objective is to quickly set up a data pipeline that moves data from ksqlDB/PostgreSQL to PostgreSQL.
We use the Debezium PostgreSQL connector to read the PostgreSQL change log; Debezium puts the change events in Kafka.
We use the JDBC sink connector to read the events from the Kafka topics (including the topics backing ksqlDB streams and tables) and write them to PostgreSQL.
We use docker-compose to set up the following:
- KSQLDB-Server: The source DB.
- KSQLDB-CLI
- PostgreSQL: The destination DB (see the note on logical decoding after this list).
- Kafka Connect (Debezium and JDBC Connector): Debezium for reading PostgreSQL logs and the JDBC sink connector for pushing the changes to PostgreSQL.
- Kafka
- Schema Registry
- Zookeeper
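One detail worth noting: Debezium captures changes through PostgreSQL's logical decoding, which requires wal_level=logical. The debezium/postgres image ships with this preconfigured, so the PostgreSQL service in docker-compose.yml could look roughly like the sketch below (the image tag is an illustrative assumption; the service name and credentials match the connector configs later in this README):

postgres:
  image: debezium/postgres:13
  ports:
    - "5432:5432"
  environment:
    - POSTGRES_USER=postgres
    - POSTGRES_PASSWORD=postgres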
To get a local copy up and running, follow these simple steps.
git clone https://github.com/akbari4yaseen/ksqldb-postgres-kafka-connect.git
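Then change into the project directory:

cd ksqldb-postgres-kafka-connect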
Build the image
To build Kafka Connect, we add the JDBC connector and the PostgreSQL JDBC driver to our Kafka Connect image.
cd debezium-jdbc
docker-compose build
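The Dockerfile in debezium-jdbc does this layering. As a rough sketch of what such a build looks like (the base image tag, artifact versions, and download URLs are illustrative assumptions, not necessarily the repository's exact file):

FROM debezium/connect:1.4
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

ARG POSTGRES_JDBC_VERSION=42.2.8
ARG KAFKA_CONNECT_JDBC_VERSION=5.3.1

# Fetch the PostgreSQL JDBC driver into Kafka's libs directory
RUN cd /kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_JDBC_VERSION.jar

# Fetch the Confluent JDBC connector into its own plugin directory
RUN mkdir $KAFKA_CONNECT_JDBC_DIR && cd $KAFKA_CONNECT_JDBC_DIR && \
    curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_CONNECT_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_CONNECT_JDBC_VERSION.jar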
At the root of the project folder run:
docker-compose up -d
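You can confirm that all of the services came up:

docker-compose ps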
PostgreSQL
Create the destination database:
CREATE DATABASE kafka_test;
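If you do not have psql installed on the host, you can run the statement inside the container instead (assuming the Compose service and the database superuser are both named postgres, matching the connector configs below):

docker-compose exec postgres psql -U postgres -c "CREATE DATABASE kafka_test;"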
Debezium Connector
Execute the following curl command to set up the Debezium connector for reading the change logs from PostgreSQL:
curl http://localhost:8083/connectors -i -X POST -H 'Content-Type: application/json' -d '{"name": "postgres-source",
"config": {"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"tasks.max":"1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "kafka_test",
"database.server.name": "dbserver1",
"database.whitelist": "kafka_test",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.kafka_test",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
}
}'
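You can verify that the connector was registered and its task is running via the Kafka Connect REST API:

curl http://localhost:8083/connectors/postgres-source/status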
JDBC Connector
Execute the following curl command to set up the JDBC sink connector for writing the events from the ksqlDB-backed lead_actors topic into the kafka_test database in PostgreSQL.
We have kept "auto.create": "true" so that it automatically creates the tables in PostgreSQL.
curl http://localhost:8083/connectors -i -X POST -H 'Content-Type: application/json' -d '{"name": "lead-sink",
"config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"10",
"topics": "lead_actors",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connection.url": "jdbc:postgresql://postgres:5432/kafka_test?user=postgres&password=postgres",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.fields": "title",
"pk.mode": "record_key"
}
}'
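As before, verify that the sink connector is running:

curl http://localhost:8083/connectors/lead-sink/status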
You can edit the connector configurations by modifying postgres-source.json and postgres-sink.json.
Create a "Table" or "Stream" in KSQL and see it get reflected in the PostgreSQL kafka_test database.
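For example, open the ksqlDB CLI and create a stream backed by the lead_actors topic that the sink connector subscribes to (the service names and columns below are illustrative assumptions, using ksqlDB 0.10+ syntax; the key column must match the sink's pk.fields, i.e. title):

docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088

-- inside the ksqlDB CLI
CREATE STREAM lead_actors (title VARCHAR KEY, name VARCHAR)
  WITH (KAFKA_TOPIC='lead_actors', VALUE_FORMAT='AVRO', PARTITIONS=1);
INSERT INTO lead_actors (title, name) VALUES ('Inception', 'Leonardo DiCaprio');

A few seconds later the row should show up in PostgreSQL:

docker-compose exec postgres psql -U postgres -d kafka_test -c 'SELECT * FROM lead_actors;'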
👤 Yaseen Akbari
- GitHub: @akbari4yaseen
Contributions, issues, and feature requests are welcome!
Feel free to check the issues page.
Give a ⭐️ if you like this project!
Create "Table" and "Streams" in KSQL, see them getting reflected in PostgreSQL kafKa_test.