Kafka-Connect(Debezium) ile Change Data Capture(CDC) Projesi
Merhabalar,
Bu yazımda PostgreSQL veri tabanında gerçekleşen değişiklikleri Debezium ile gerçek zamanlı olarak yakalayacağız. Yakaladığımız değişiklikleri Kafka’ya göndereceğiz. Spark Streaming ile Kafka’dan bu değişiklikleri okuyacağız ve bir object storage’a yazacağız.
Hep beraber hem projeyi baştan sona ele alalım hem de kullandığım tool’lar nedir, ne yapar şeklinde ilerleyelim.
Öncelikle CDC nedir?
- Veri Manipülasyonu komutlarından INSERT, UPDATE ve DELETE komutları kullanılarak yapılan değişikliklerin ilk ve son hallerini yakalamaya denir.
- Veritabanları bu yakalanan kayıtlar için her işlemi/transaction kayıt altına alır.
- Bu kayıt dosyaları(transaction log) her bir veritabanında farklı isme sahiptir.
- Örneğin bu proje için kullandığım PostgreSQL veri tabanında wal(write-ahead log) diye geçer.
Bu kayıt dosyaları(transaction log) üzerinden işlem yapacağımız zaman burada devreye Kafka-Connect ve Debezium giriyor.
Kafka Nedir?
Kafka, akan verilerin(streaming data) çeşitli kaynaklardan toplanıp başka kaynaklara hızlı ve sorunsuz bir şekilde aktarılmasını sağlayan açık kaynaklı, dağıtık bir veri platformudur.
Kafka ile ilgili bilinmesi gereken temel kavramları linkteki yazıdan daha detaylı ulaşabilirsiniz. Projenin aşamalarını aktaracağım için kullandığım servislerin detaylarını bu yazıda es geçiyorum.
Örneğin, bu projede PostgreSQL’den gelen verilerin Kafka aracılığıyla Spark tarafından okunabilmesi sağlanır. Peki Kafka PostregSQL’e nasıl bağlanır? Burada imdadımıza Kafka-Connect yetişiyor :)
Kafka-Connect Nedir?
Kafka ve diğer veri sistemleri arasındaki veri akışının(streaming data) sağlanması için bir araçtır. Bağlantıların hızlı bir şekilde tanımlanmasını ve geniş ölçekteki verilerin Kafka içine ve dışına taşınmasını kolaylaştırır. Kısacası PostgreSQL’den gelen verinin Kafka-Connect ile Kafka’ya akmasını sağlar.
Peki PostgreSQL’deki değişiklikler nasıl yakalanıyor? İşte burada devreye Debezium giriyor.
Debezium Nedir?
Veri tabanındaki değişikliklerin yakalanmasını sağlayan bir Kafka-Connect kütüphanesidir. Yakalanan değişikliklerin ilk ve son hallerin yanı sıra işlemlerin gerçekleşme sırasını da korur. Dikkat edilmesi gereken en önemli noktalardan biri veri tabanıyla Kafka arasındaki bağlantıyı sağlayan Kafka-Connect’tin kendisidir. Bağlandığımız veri tabanındaki değişiklikleri yakalamayı sağlayan da Kafka-Connect kütüphanesi olan Debezium’dur.
Toparlamak gerekirse;
- PostgreSQL’e Kafka-Connect ile bağlanıp, değişiklik yaşanan verileri Debezium ile yakalayıp, Kafka ile stream ediyoruz.
Hadi şimdi projeyi uygulama zamanı!
İlk adım olarak projede yer alan bütün servislerin kurulumunu yapalım. Bunun için bize izole bir ortam sağlayacak olan Docker servisini aktif hale getirelim ve docker-compose.yaml dosyamızı oluşturalım.
1. Adım: docker-compose.yaml dosyasının oluşturulması.
- Container içinde olması gereken servisler:
- Zookeeper
- Kafka
- PostgreSQL
- Kafka-Connect
- Spark
- MinIO
Docker-compose dosyanının içeriği:
Docker dosyasının ilgili kısımlarını Debezium’un github adresindeki tutorial’lardan referans alarak oluşturdum. Aşağıdaki bağlantıdan tutorial’lara erişebilirsiniz.
Bu bir docker dosyası olduğu için aslında izole bir ortamda çalışıyor. Dolayısıyla dosyayı olduğu gibi docker çalışan herhangi bir yerde çalıştırabilirsiniz.
Docker-compose.yaml dosya içeriğini hep beraber inceleyelim.
İlk etapta Zookeeper servisi bulunuyor. Debezium ile uyumlu çalışacak şekilde ayarlanmış Zookeeper versiyonunu image olarak kullanıyoruz.
Kafka servisinde de aynı şekilde Debezium’la uyumlu Kafka’yı kullanacak. Port olarak 9092'de çalışacak. Links kısmında Zookeeper var. Yani Zookeeper’la beraber çalışacak demektir.
Environment kısmında ise Zookeeper’a hangi porttan bağlanacağımızı belirtiyoruz.
PostgreSQL veri tabanı için yine aynı şekilde Debezium’a uyumlu halini kullanıyoruz. Port kısmında sağda yani içeride 5432 portunu dinleyecek, dışarıdan ise 5433 portu ile bağlanacak. Environment kısmında ise PostgreSQL için bir kullanıcı ve şifre tanımlıyoruz. En sonda da bir database oluşturuyoruz.
En önemli kısma geldik. Kafka-Connect, yukarıda da belirttiğim gibi Kafka ile PostgreSQL arasında bir köprü görevi görüyor. Debezium da Kafka-Connect’in altında bir kütüphanedir. İmajını belirttik ve port kısmında 8083'te çalışacak. Links kısmında Kafka ve PostgreSQL’e bağlı çalışıyor çünkü her iki servis arasında köprü görevi görüyor.
Environment bölümünde;
- BOOTSTRAP_SERVERS ile Kafka’ya bağlanacağı port bilgisini veriyoruz.
- Kafka-Connect aslında çalışmak için 3 tane topic’e ihtiyaç duyuyor.
- CONFIG_STORAGE_TOPIC: ayarlarla ilgili bir topicdir.
- OFFSET_STORAGE_TOPIC: satır numarasını tutar.
- STATUS_STORAGE_TOPIC: çalışma durumunun bilgisini tutar.
Bu 3 topic olmadan Kafka-Connect çalışmıyor. Bu topic’leri burada tanımlayarak yaratmış oluyoruz.
Spark’ın farklı cluster yöneticileri vardır. Biz burada Spark’ı Standalone cluster modunda kullanıyoruz. Standalone cluster modunda spark-master, spark-worker ve spark-client container’ları bulunuyor.
- Spark-master: Yarn yerine kaynak yöneticisi olarak spark-master kullanıyoruz.
- Spark-worker: İşçi sunuculardır, hesaplamaların yapıldığı ve verilerin depolandığı sunucular olarak düşünebiliriz.
- Spark-Client: Notebook(Jupyter vs) kullanıp Spark kodlarını submit etmek için kullanacağız.
- Volumes: Spark-client dosyalarının kayıt edildiği local adrestir.
MinIO Amazon S3 uyumlu bir object storage’dır. Bu proje de kullanma amacım Spark ile düzenli hale getireceğim veriyi kaydetmek/depolamaktır.
- Environment: MinIO giriş bilgilerini bulunduruyor.
- Volumes: Spark ve MinIO’nun dosyalarının kaydolduğu yerlerdir.
Docker-compose.yaml dosyasının tamamına GitHub repomdan ulaşabilirsiniz.
2. Adım: Docker-compose dosyasının başlatılması ve kontrolü.
3. Adım: Kafka-Connect ile Kafka ve PostgreSQL arasındaki bağlantının kurulması.
Docker servislerimiz çalışıyor. Bizim Kafka ile PostgreSQL arasındaki bağlantıyı sağlamamız lazım. Kafka-Connect ile bu bağlantıyı sağlıyoruz demiştik. Kafka-Connect kütüphanesi olan Debezium’un PostgreSQL’i dinlemesini ve değişiklikleri Kafka’ya aktarılmasını sağlayacağız.
PostgreSQL bağlantısını kurmamız için gereken kod aşağıdadır. Curl sayesinde bu bağlantı isteğini gönderiyoruz.
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d @register-postgres.json
Kodu inceleyecek olursak; Kafka-Connect’in port numarası olan 8083 ile bağlantı kuracağız. En sonda ise register-postgres.json dosyasındaki ayarlara göre PostgreSQL bağlantısını başlatmış oluyoruz.
“HTTP/1.1 201 Created” çıktısı verdi dolayısıyla başarılı bağlantı kurmuş olduk.
Peki register-postgres.json dosyasının içerisinde neler var inceleyelim.
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"database.server.name": "dbserver1",
"table.include.list": "public.customers"
}}
- name: Herhangi bir isim verilebilir ben tutorial’dakinin aynısını verdim.
- config:
- connector.class: Docker container dosyalarının altındaki PostgreSQL ile Debezium bağlantısını sağlayacak jar dosyasının adı.
- database.hostname, database.port, database.user, database.password, database.dbname kısımlarında ise database bilgilerini veriyoruz. Özellikle port kısmında ise 5433 yerine 5432'yi veriyorum çünkü Debezium içerideki PostgreSQL servisinin portunu istiyor. İki servis birbiriyle çakışmasın diye zaten en başta localdeki PostgreSQL servisini durdurmuştuk.
- database.server.name: Bu bizim dinleyeceğimiz topic’in ön adı olarak da kullanılıyor.
- table.include.list: Hangi tabloyu dinlemesi gerektiğini belirttiğimiz kısımdır. Burada public şeması içerisinden customers tablosunu dinleyecek.
Bu bağlantı bilgileri sayesinde PostgreSQL’de istediğimiz bağlantıyı kurmuş olduk. Şimdi diğer bir adıma geçelim. Bu bağlantı sonrasında verilerin Kafka’ya akışını gözlemlemek için adımlarımızı uygulayalım.
4. Adım: Kafka console consumer oluşturma.
Kafka’ya akan veriyi bizler geliştiriciler olarak gözlemlemek için console-consumer oluşturuyoruz. Bu sayede ekranımıza veri anlık olarak akacaktır.
İlk olarak topiclerimizi listeleyelim. Bunun için aşağıdaki kodu kullanıyoruz.
docker-compose exec kafka /kafka/bin/kafka-topics.sh \
--bootstrap-server kafka:9092 \
--list
Kafka-Connect’in ihtiyaç duyduğu 3 topic hali hazırda oluşmuş. Şimdi verileri gözlemleyeceğimiz topic’i yaratalım.
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic dbserver1.public.customers
Topic’imiz eli kulağında verinin gelmesini bekliyor yani verinin akmasını hazır bir şekilde bekliyor.
5. Adım: Data-generator ile PostgreSQL’e veri yazma.
Data-generator, VBO’nun Data Engineering eğitiminde gerçek zamanlı veri işleme örneklerini daha gerçekçi yapmak için özel hazırlanmış bir python script’idir. Statik yapıdaki verileri çeşitli formatlarda gerçek zamanlı verilere dönüştürmeyi sağlar.
Burada hazır bir veri seti olan customers verisini bir nevi stream ediyoruz.
Aşağıdaki linkten data-generator kurulumu yapabilirsiniz.
Öncelikle data-generator ile veriyi yazacağımız için PostgreSQL’in IP Adresi gerekiyor.
docker inspect final_project_2-postgres-1 | grep "IPAddress"
Şimdi data-generatora bağlanıp veriyi hem customers tablosuna kaydedelim ve stream edelim.
Data-generator bağlanma:
Data-generator bağlandıktan sonra PostgreSQL’e veri yazma dosyasını (dataframe_to_postgresql.py) kullanıyorum. Host bilgisi olarak PostgreSQL’in IP adresini veriyoruz. Port adresini 5432 veriyoruz çünkü içerdeki portu dinleyecek. Database giriş bilgilerini veriyoruz. “-i” argümanı ile veri seti adresini belirtiyoruz. Son olarak “-t” argümanı ile de yukarıda json dosyasında belirttiğimiz customers tablosunu bu sayede yaratmış oluyoruz. Aşağıdaki kodu çalıştırdığımız andan itibaren veri akışını başlatmış oluyoruz.
python dataframe_to_postgresql.py \
-hst 172.18.0.3 \
-p 5432 \
-u postgres \
-psw postgres \
-db postgres \
-i ~/datasets/customers.csv \
-t customers
Yukarıdaki görselde verinin data-generator ile aktığını görüyoruz.
Bu esnada PostgreSQL shell’ine bağlanmak lazım. PostgreSQL’in IP adresini portunu ve gerekli giriş bilgilerini belirterek PostgreSQL shell’ine bağlanmış oluyoruz.
docker-compose exec postgres bash \
-c 'psql -p 5432 -h 172.18.0.3 -U $POSTGRES_USER postgres'
Yukarıdaki kodun çıktısında customers adında bir tablomuzun oluştuğunu görüyoruz.
Aşağıda ise Kafka console topic’ine akan verileri json formatında gözlemliyoruz. Görsel içeriğine baktığımızda;
- op: c argümanı ne demek? Yeni bir satır oluşturma yani INSERT işlemi demektir. Data-generator ile veriyi yazarken her satır tek tek Kafka topic’ine de yansıdı. Bu işlem Debezium sayesinde yakalandı.
PostgreSQL shell’inde yapmamız gereken bazı ayarlar var. Değişikliklerin yakalanabilmesi için veri tabanının bazı ayarlarını değiştireceğiz. Öncelikle;
- Tablomuzun REPLICA IDENTITY ayarını FULL yapmalıyız çünkü Debezium’un verinin önceki ve sonraki halini göstermesini istiyorum. Normalde UPDATE işleminde sadece sonraki halini gösterir. Bu ayarla hem öncesini hem de sonrasını gösterecek şekilde ayarlıyoruz.
ALTER TABLE public.customers REPLICA IDENTITY FULL;
- UPDATE ve DELETE işlemlerini göstermesi için tablonun PRIMARY KEY’inin hangi sütun olduğunu belirtmek gerekiyor. Burada customerId yapıyoruz.
ALTER TABLE public.customers ADD CONSTRAINT customers_pk PRIMARY KEY ("customerId");
Bu işlemlerden sonra data-generator işlemini durduralım bizim için yeteri kadar veri yazıldı.
Şimdi UPDATE, DELETE işlemleri yapalım.
DELETE FROM customers WHERE "customerId"=1;
Aşağıdaki görselde DELETE işleminden sonra oluşan Kafka-console consumer’dan aldığımız çıktı var.
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":true,"field":"customerId"},{"type":"string","optional":true,"field":"customerFName"},{"type":"string","optional":true,"field":"customerLName"},{"type":"string","optional":true,"field":"customerEmail"},{"type":"string","optional":true,"field":"customerPassword"},{"type":"string","optional":true,"field":"customerStreet"},{"type":"string","optional":true,"field":"customerCity"},{"type":"string","optional":true,"field":"customerState"},{"type":"int64","optional":true,"field":"customerZipcode"}],"optional":true,"name":"dbserver1.public.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":true,"field":"customerId"},{"type":"string","optional":true,"field":"customerFName"},{"type":"string","optional":true,"field":"customerLName"},{"type":"string","optional":true,"field":"customerEmail"},{"type":"string","optional":true,"field":"customerPassword"},{"type":"string","optional":true,"field":"customerStreet"},{"type":"string","optional":true,"field":"customerCity"},{"type":"string","optional":true,"field":"customerState"},{"type":"int64","optional":true,"field":"customerZipcode"}],"optional":true,"name":"dbserver1.public.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.public.customers.Envelope"},"payload":{"before":{"customerId":1,"customerFName":"Richard","customerLName":"Hernandez","customerEmail":"XXXXXXXXX","customerPassword":"XXXXXXXXX","customerStreet":"6303 Heather Plaza","customerCity":"Brownsville","customerState":"TX","customerZipcode":78521},"after":null,"source":{"version":"1.9.7.Final","connector":"postgresql","name":"dbserver1","ts_ms":1676579068595,"snapshot":"false","db":"postgres","sequence":"[\"37421552\",\"37421840\"]","schema":"public","table":"customers","txId":878,"lsn":37421840,"xmin":null},"op":"d","ts_ms":1676579069011,"transaction":null}}
Bu çıktının daha anlaşılır olması için https://jsoncrack.com/editor aracılığıyla json çıktısının okunaklı haline daha detaylı bakabiliyoruz. Aşağıda gördüğümüz üzere before ve after olmak üzere kısımlara ayrılmıştır. Verinin öncesi ve sonrasını detaylı bir şekilde görmüş oluyoruz. After kısmındaki “op”: “d” ifadesi geçiyor yani bir DELETE işleminin gerçekleştiğini belirtiyor.
Bir diğer işlem olan UPDATE işlemini inceleyelim.
UPDATE customers SET "customerFName"='test' where "customerId"=3;
Aşağıdaki görselde UPDATE işleminden sonra oluşan Kafka-console consumer’dan aldığımız çıktı var.
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int64","optional":false,"field":"customerId"},{"type":"string","optional":true,"field":"customerFName"},{"type":"string","optional":true,"field":"customerLName"},{"type":"string","optional":true,"field":"customerEmail"},{"type":"string","optional":true,"field":"customerPassword"},{"type":"string","optional":true,"field":"customerStreet"},{"type":"string","optional":true,"field":"customerCity"},{"type":"string","optional":true,"field":"customerState"},{"type":"int64","optional":true,"field":"customerZipcode"}],"optional":true,"name":"dbserver1.public.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int64","optional":false,"field":"customerId"},{"type":"string","optional":true,"field":"customerFName"},{"type":"string","optional":true,"field":"customerLName"},{"type":"string","optional":true,"field":"customerEmail"},{"type":"string","optional":true,"field":"customerPassword"},{"type":"string","optional":true,"field":"customerStreet"},{"type":"string","optional":true,"field":"customerCity"},{"type":"string","optional":true,"field":"customerState"},{"type":"int64","optional":true,"field":"customerZipcode"}],"optional":true,"name":"dbserver1.public.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"int64","optional":true,"field":"txId"},{"type":"int64","optional":true,"field":"lsn"},{"type":"int64","optional":true,"field":"xmin"}],"optional":false,"name":"io.debezium.connector.postgresql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.public.customers.Envelope"},"payload":{"before":{"customerId":3,"customerFName":"Ann","customerLName":"Smith","customerEmail":"XXXXXXXXX","customerPassword":"XXXXXXXXX","customerStreet":"3422 Blue Pioneer Bend","customerCity":"Caguas","customerState":"PR","customerZipcode":725},"after":{"customerId":3,"customerFName":"test","customerLName":"Smith","customerEmail":"XXXXXXXXX","customerPassword":"XXXXXXXXX","customerStreet":"3422 Blue Pioneer Bend","customerCity":"Caguas","customerState":"PR","customerZipcode":725},"source":{"version":"1.9.7.Final","connector":"postgresql","name":"dbserver1","ts_ms":1676580907360,"snapshot":"false","db":"postgres","sequence":"[\"37430248\",\"37438688\"]","schema":"public","table":"customers","txId":879,"lsn":37438688,"xmin":null},"op":"u","ts_ms":1676580907439,"transaction":null}}
Aşağıda gördüğümüz üzere before, after ve op olmak üzere kısımlara ayrılmıştır. Verinin öncesi ve sonrasını detaylı bir şekilde görmüş oluyoruz. “op”: “u” ifadesi geçiyor yani bir UPDATE işleminin gerçekleştiğini belirtiyor.
6. Adım: Spark Streaming ve MinIO’ya veriyi yazma.
Son bölüme geldiğimizde ise Spark Streaming sayesinde Kafka’dan değişiklikleri okuyup, json formatındaki veriyi parse edip düzenli/tabular hale getireceğiz. Bunun için öncelikle spark-client’a bağlanıp bir bucket oluşturmamız gerekiyor.
- Spark-client bağlantısı:
docker exec -it spark-client bash
Sırasıyla spark isimli bir kullanıcı oluşturuyoruz. Bu kullanıcıya okuma ve yazma yetkisi veriyoruz. Sonrasında cdc-data adında bir bucket oluşturuyoruz.
mc admin user add mlops_minio <user> <password>
mc admin policy set mlops_minio readwrite user=<user>
mc mb mlops_minio/cdc-data
http://localhost:9001 adresinden MinIO Console erişebiliyoruz ve oluşturduğumuz bucket’ı görelim.
Artık finale geldik. Veriyi karmaşık json yapısından kurtarıp okunaklı bir dataframe haline dönüştüreceğiz. Şimdi jupyter lab’a bağlanıp Spark kodlarımızı yazalım.
jupyter lab --ip 0.0.0.0 --port 8888 --allow-root
Gerekli kütüphaneleri import ediyoruz.
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession, functions as F
import warnings
warnings.filterwarnings('ignore')
MinIO giriş bilgilerini tanımlıyoruz.
accessKeyId=<user>
secretAccessKey=<password>
SparkSession oluşturuyoruz. Master olarak spark-master’ı veriyoruz çünkü Standalone cluster’da çalışırken Yarn veya local diyemeyiz. Spark Streaming uygulaması yaptığımız için config kısmında Spark ile Kafka’nın beraber çalışmasını sağlayan jar dosyasını belirtiyoruz.
spark = SparkSession.builder \
.appName("Spark Example MinIO") \
.master("spark://spark-master:7077") \
.config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1") \
.getOrCreate()
MinIO’ya yazmak için gerekli ayarları Spark’ın içerisine tanımlıyoruz. Dikkat edilmesi gereken bir diğer nokta ise Spark ile MinIO jar dosyasının Dockerfile içerisinde belirtilmesi gerekiyor.
def load_config(spark_context: SparkContext):
spark_context._jsc.hadoopConfiguration().set('fs.s3a.access.key', accessKeyId)
spark_context._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secretAccessKey)
spark_context._jsc.hadoopConfiguration().set('fs.s3a.path.style.access', 'true')
spark_context._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
spark_context._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 'http://minio:9000')
load_config(spark.sparkContext)
Veri okuma kısmında Spark Streaming ile Kafka’ya akan veriyi okuyacağız. Bunun için öncelikle Kafka’nın IP adresine ulaşmamız gerekiyor.
Bu bir streaming uygulaması olduğu için formatımız kafka olacak. Kafka servers adresinde de Kafka’nın IP adresini veriyoruz. dbserver1.public.customers topic’ini dinleyeceğiz. Offsetleri, her çalıştığından offset sıfırdan itibaren okuyor. Yani ilk satırdan itibaren okuyacak şekilde earliest diyerek ayarlıyoruz. Multiline True diyerek de json formatındaki verimizde bir satırda birden fazla kayıt olacağından dolayı bu option’ı ekledik.
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "172.18.0.7:9092") \
.option("subscribe", "dbserver1.public.customers") \
.option("startingOffsets", "earliest") \
.option("multiline","true") \
.load()
Kafka-console consumer’dan bir UPDATE işlemini json şeması olarak önce bir görelim. Buna göre veriyi bir şemaya oturtacağız.
Veri, payload ve schema olmak üzere iki ana dala ayrılıyor. Şimdi bu iki kolonu veriden yakalayacağız. Aşağıdaki görsel üzerinden bunu detaylı görebiliyoruz.
df_schema ile Kafka console’dan okuduğumuz veriyi iki ana kolon altında şemaya giydiriyoruz. Her ikisini de string türünde tanımlıyoruz. Birincisi schema, ikincisi payload’dur. Bize lazım olan da payload’dur.
df_schema = StructType([
StructField('schema', StringType(), True),
StructField('payload', StringType(), True)
])
Payload kısmını key olarak düşünebiliriz. Python’daki sözlük yapısı gibi before ve after kısımlarını ise value olarak kabul edelim. Dolayısıyla iç içe sözlüklerden oluşan veriyi sırasıyla key-value şeklinde okuyup düzenli hale getireceğiz.
Aşağıda yaptığımız işlem ise Kafka’dan gelen value kısmını string’e dönüştürüp serialize ediyoruz yani rakamlardan harflere dönüştürüyoruz. Buradaki dönüştürme işlemi Kafka’nın aslında veriyi bitler/0–1 şeklinde tutmasından kaynaklanıyor. Daha sonrasında stringe dönüştürdüğümüz verinin ismini de json olarak veriyoruz. Yukarıda tanımladığım df_schema’yı giydiriyoruz. En sonda da veri içerisinden payload kısmını seçerek ilerliyoruz.
df2 = df.selectExpr("cast (value as string) as json") \
.select(F.from_json("json", schema=df_schema) \
.alias("data")).select("data.payload")
Aşağıdaki görsel de df2'nin payload kısmının düzensiz ve şemasız hali gözükmektedir. Biz buradaki json yapısını aslında adım adım düzenliyoruz.
Verinin şemasını yazdırdığımızda sadece payload’u görüyoruz. İstediğimize bir adım daha yaklaştık.
Şimdi payload kısmını daha yakından inceleyelim ve bu seferde burası için şema oluşturalım. Bu kısmı value olarak düşünüyoruz. Burada da tekrardan iç içe sözlük yapısı karşımıza çıkıyor.
Payload içerisinde before, after, op ve ts_ms alanlarına odaklanacağız. Before verinin önceki hali, after verinin sonraki hali, op yapılan işlemin türü örneğin burada UPDATE, ts_ms ise işlemin kaçıncı saniyede yazıldığının timestamp türünden ifadesidir.
Şema kısmına gelecek olursak;
before ve after kısımları aynı tipteler yani iç içe sözlük yapısına sahipler bu yüzden tipleri MapType olacak. İçerisindeki değerleri de string olarak tanımlıyoruz. op ve ts_ms zaten string olacak.
message_schema = StructType([
StructField('before', MapType(StringType(), StringType(), True), True),
StructField('after', MapType(StringType(), StringType(), True), True),
StructField('op', StringType(), True),
StructField('ts_ms', StringType(), True)
])
Veri içerisinden bu şemaya uygun şekilde kolonlarımızı elde edelim. Before ve after’ın içerisinde birden fazla kolon olduğu için bunu bir döngü yardımıyla halledeceğiz.
Her ikisindeki kolonların isimlerini yazıyoruz. Bu sayede aşağıdaki listeler içerisindeki değerleri getirebileceğiz.
after_fields = [
"customerId", "customerFName", "customerLName", "customerEmail",
"customerPassword", "customerStreet", "customerCity", "customerState", "customerZipcode"]
before_fields = [
"customerId", "customerFName", "customerLName", "customerEmail",
"customerPassword", "customerStreet", "customerCity", "customerState", "customerZipcode"]
df2 içerisindeki payload’dan value kısmını from json ile çekeceğiz.
Yukarıda tanımladığım message_schema ile de payload kısmının value’larını şemaya giydiriyoruz.
payload.before ve payload.after için comprehension yapısını kullanarak yukarıda tanımladığımız after_fields ve before_fields içindeki kolonları getirmiş olacağız. op ve ts_ms kolonlarını da şeçerek bütün value’ları getirmiş oluyoruz.
df_final = df2 \
.withColumn("payload",F.from_json(F.col("payload"), message_schema))\
.select(*[F.col("payload.before").getItem(f) \
.alias('payload.before.'+f) for f in before_fields], \
*[F.col("payload.after").getItem(f) \
.alias('payload.after.'+f) for f in after_fields], 'payload.op','payload.ts_ms')
Aşağıda final veri setinin şeması ve ilk 20 gözlemi bulunmaktadır.
df_final’in ilk 20 gözlemine baktığımızda düzenli bir şekilde oluştuğunu görüyoruz.
Artık en son aşamaya geldik. MinIO’ya foreach batch şeklinde verimizi yazacağız. Bunun için fonksiyonumuzu tanımladık. Formatı csv, mode’nu append şeklinde ve cdc-data bucket altında customers klasörüne değişiklik meydana gelen verileri yazıyoruz.
def write_to_multiple_sinks(df, batchId):
# write to file
df.write\
.format("csv") \
.mode("append") \
.save("s3a://cdc-data/datasets/customers")
# Sink
checkpointDir = "s3a://cdc-data/checkpoint/customers"
# start streaming
streamingQuery = (df_final
.writeStream
.foreachBatch(write_to_multiple_sinks)
.option("checkpointLocation", checkpointDir)
.start())
streamingQuery.awaitTermination()
MinIO console’dan bucket’ımıza baktığımızda csv dosyamız kaydedilmiş.
Csv dosyasını açtığımızda da hem yeni kayıtların hem de değişikliklerin yani INSERT, UPDATE ve DELETE işlemlerinin yakalandığını görüyoruz.
Böylelikle Change Data Capture projesini uçtan uca beraber tamamlamış bulunmaktayız. Okuduğunuz için teşekkürler :)
Projenin tamamına GitHub adresimden erişebilirsiniz.
Kaynakça: