Replicando dados com Debezium Oracle CDC
Olá pessoal, como vão ?
No artigo anterior foi explicado como integrar o Flink com o PostgreSQL. No capítulo de hoje, eu vou ensinar como instalar o conector Debezium Oracle CDC. Sei que tinha falado que já começaríamos o nosso trabalho de enriquecimento de dados com o Flink, mas devido a pedidos, resolvi incluir esse tema. Garanto que o conhecimento que você irá adquirir aqui, irá ser de grande valia !
O que instalaremos em nosso ambiente:
- Banco de dados Oracle 23ai
- cx_Oracle para Python
- Oracle Instant Client
- Debezium Oracle CDC para KAfka
Nós iremos utilizar o conector CDC para replicar os dados de uma tabela no Oracle (clients_account_details) e a replicaremos no nosso banco de dados PostgreSQL. Esses dados também servirão para podermos enriquecer o nosso tópico do kafka através do Flink que mostrarei no próximo artigo.
Para isso, faremos algumas alterações em nosso arquivo .env
(que será unificado) e também no docker-compose.yml
:
.env
Foi unificado os dois arquivos .env
(o utilizado pelo docker-compose.yml
e o utilizado pelos scripts Python).
# PostgreSQL
PGS_DATABASE_HOST=localhost
PGS_DATABASE_NAME=flink_db
PGS_DATABASE_USER=flink_kafka
PGS_DATABASE_PASSWORD=flink_kafka
# PGADMIN
PGA_USER=flink@kafka.com
PGA_PASSWORD=flink_db
# Oracle
ORA_DATABASE_DSN=localhost:1521/FREEPDB1
ORA_DATABASE_SYS_PASSWORD=flink_kafka
ORA_DATABASE_USER=c##dbzuser
ORA_DATABASE_PASSWORD=dbz
docker-compose.yml
Foi incluso a configuração do container com o banco de dados Oracle 23ai e o conector Debezium Oracle CDC.
---
services:
oracle-db:
image: container-registry.oracle.com/database/free:latest
container_name: oracle_23ai
environment:
ORACLE_PWD: ${ORA_DATABASE_SYS_PASSWORD}
ORACLE_CHARACTERSET: "AL32UTF8"
ENABLE_ARCHIVELOG: "true"
ENABLE_FORCE_LOGGING: "true"
ports:
- "1521:1521"
volumes:
- /home/swillians/workarea/oracle/23ai/data:/opt/oracle/oradata
restart: always
postgres:
image: postgres:13
container_name: postgres_container
environment:
POSTGRES_USER: ${PGS_DATABASE_USER}
POSTGRES_PASSWORD: ${PGS_DATABASE_PASSWORD}
POSTGRES_DB: ${PGS_DATABASE_NAME}
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
pgadmin:
image: dpage/pgadmin4
container_name: pgadmin_container
environment:
PGADMIN_DEFAULT_EMAIL: ${PGA_USER}
PGADMIN_DEFAULT_PASSWORD: ${PGA_PASSWORD}
ports:
- "5050:80"
depends_on:
- postgres
volumes:
- pgadmin_data:/var/lib/pgadmin
broker:
image: confluentinc/cp-server:7.5.1
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry:
image: confluentinc/cp-schema-registry:7.5.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
user: root
#entrypoint: /bin/sh -c "yum install --nogpgcheck --setopt=sslverify=false zip -y && exec /bin/bash"
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
volumes:
- workarea:/app/workarea
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
command:
- bash
- -c
- |
echo "Installing ZIP"
yum install --nogpgcheck --setopt=sslverify=false zip -y
#
echo "Installing Connectors"
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-http-1.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-oracle-cdc-2.2.1.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-sftp-3.2.4.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/streamthoughts-kafka-connect-file-pulse-2.10.0.zip
#
echo "Downloading Debezium Oracle Connector..."
wget -O debezium-connector-oracle-plugin.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/2.7.0.Final/debezium-connector-oracle-2.7.0.Final-plugin.tar.gz --no-check-certificate
wget -O debezium-scripting.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-scripting/2.7.0.Final/debezium-scripting-2.7.0.Final.tar.gz --no-check-certificate
echo "Downloading JDBC driver for Oracle ..."
wget https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/21.6.0.0/ojdbc8-21.6.0.0.jar -P /usr/share/java/kafka/ --no-check-certificate
echo "Downloading Groovy SDK ..."
wget https://groovy.jfrog.io/artifactory/dist-release-local/groovy-zips/apache-groovy-sdk-4.0.17.zip -P /tmp/groovy/ --no-check-certificate
echo "Extracting the archive...Debezium !"
tar -xzvf debezium-connector-oracle-plugin.tar.gz -C /usr/share/java/
tar -xzvf debezium-scripting.tar.gz -C /usr/share/java/debezium-connector-oracle/ --strip-components=1
echo "Extracting the archive...Groovy !"
unzip /tmp/groovy/apache-groovy-sdk-4.0.17.zip */*/groovy-4.0.17.jar */*/groovy-jsr 223-4.0.17.jar */*/groovy-json-4.0.17.jar -d /tmp/groovy/
echo "Copying Groovy archives..."
cp /tmp/groovy/*/*/*.jar /usr/share/java/debezium-connector-oracle/
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
control-center:
image: confluentinc/cp-enterprise-control-center:7.5.1
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.5.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.5.1
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:7.5.1
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:7.5.1
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
flink-init:
image: busybox
container_name: flink-init
command: >
sh -c "
echo 'Downloading Flink Kafka connector...' &&
wget -O /opt/flink/plugins/kafka/flink-sql-connector-kafka-3.2.0-1.19.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar --no-check-certificate &&
echo 'Connector downloaded successfully.'
echo 'Downloading Flink JDBC connector...' &&
wget -O /opt/flink/plugins/kafka/postgresql-42.7.3.jar https://jdbc.postgresql.org/download/postgresql-42.7.3.jar --no-check-certificate &&
echo 'Connector downloaded successfully.'
"
volumes:
- flink-plugins:/opt/flink/plugins/kafka
depends_on:
- broker
flink-jobmanager:
image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
hostname: flink-jobmanager
container_name: flink-jobmanager
ports:
- 9081:9081
command: jobmanager
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: flink-jobmanager
rest.bind-port: 9081
volumes:
- flink-plugins:/opt/flink/plugins/kafka
depends_on:
- flink-init
flink-taskmanager:
image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
hostname: flink-taskmanager
container_name: flink-taskmanager
depends_on:
- flink-jobmanager
command: taskmanager
scale: 1
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 10
volumes:
- flink-plugins:/opt/flink/plugins/kafka
flink-sql-client:
image: cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17
hostname: flink-sql-client
container_name: flink-sql-client
depends_on:
- flink-jobmanager
environment:
FLINK_JOBMANAGER_HOST: flink-jobmanager
volumes:
- ./settings/:/settings
- flink-plugins:/opt/flink/plugins/kafka
volumes:
workarea:
driver: local
driver_opts:
type: none
o: bind
device: /home/swillians/workarea
flink-plugins:
driver: local
pgdata:
pgadmin_data:
Antes de executar o docker-compose.yml
, precisamos criar o diretório onde ficarão armazenados os datafiles do Oracle. Para isso, execute os comandos abaixo:
sudo mkdir -p /home/swillians/workarea/oracle/23ai/data
sudo chown 54321:54321 /home/swillians/workarea/oracle/23ai/data
sudo chmod 775 /home/swillians/workarea/oracle/23ai/data
Esses diretórios são os que eu usarei, mas você pode alterar para qualquer caminho que desejar. Só não esqueça de também alterar no docker-compose.yml
.
Agora é só executar e levantar o ambiente:
sudo docker compose up -d
Quando o ambiente estiver up, vamos configurar o banco de dados Oracle para que o conector Debezium Oracle CDC possa funcionar. Você pode fazer isso através de algum client (eu uso o SQL Developer para VSCODE) ou acessar o container e executar os comandos via SQLPLUS. Vou mostrar como fazer isso através da segunda opção:
sudo docker exec -it oracle_23ai sh
Agora vamos conectar no SQL Plus:
sqlplus sys as sysdba
Coloque suas credencias e você se conectará no CDB do banco de dados:
Vamos começar a aplicar os scripts para criação do usuário do debezium
# Cria a tablespace no CDB
CREATE TABLESPACE logminer_tbs_kafka DATAFILE '/opt/oracle/oradata/FREE/logminer_tbs_kafka.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE 5G;
# Implementa o Supplental log no CDB
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
# Mudando para o PDB
ALTER SESSION SET CONTAINER=FREEPDB1;
# Cria a tablespace no PDB
CREATE TABLESPACE logminer_tbs_kafka DATAFILE '/opt/oracle/oradata/FREE/FREEPDB1/logminer_tbs_kafka.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE 5G;
# Implementa o Supplental log no PDB
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
# Mudando para o CDB
ALTER SESSION SET CONTAINER=CDB$ROOT;
# Cria o usuário para o conector Debezium e aplica os GRANTS necessário
CREATE USER c##dbzuser IDENTIFIED BY dbz
DEFAULT TABLESPACE logminer_tbs
QUOTA UNLIMITED ON logminer_tbs
CONTAINER=ALL;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$MYSTAT TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$STATNAME TO c##dbzuser CONTAINER=ALL;
Pronto ! O usuário para o conector Debezium foi criado com sucesso no banco de dados !
Instalando o Oracle Instant Client
Como vamos executar um script para criar a nossa tabela e gerar os dados no banco de dados Oracle, precisaremos instalar o Oracle Instant Client.
Faça o download do instant client através do link: https://download.oracle.com/otn_software/linux/instantclient/2350000/instantclient-basic-linux.x64-23.5.0.24.07.zip
Agora descompactemos o arquivo:
sudo mkdir -p /opt/oracle
sudo unzip instantclient-basic-linux.x64-23.5.0.24.07.zip -d /opt/oracle
Criemos o link simbólico:
cd /opt/oracle/instantclient_23_5/
sudo ln -s libclntsh.so.23.1 libclntsh.so
sudo ln -s libocci.so.23.1 libocci.so
echo 'export LD_LIBRARY_PATH=/opt/oracle/instantclient_23_5:$LD_LIBRARY_PATH' >> ~/.bashrc
source ~/.bashrc
Caso apareça o erro:
Traceback (most recent call last):
File "/home/swillians/workarea/confluent-spooldir-test/files/populate_table_oracle.py", line 10, in <module>
connection = cx_Oracle.connect(username, password, dsn)
cx_Oracle.DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "libaio.so.1: cannot open shared object file: No such file or directory". See https://cx-oracle.readthedocs.io/en/latest/user_guide/installation.html for help
Executem o comando abaixo:
sudo apt-get update
sudo apt-get install libaio1
Agora vamos instalar o pacote cx_Oracle no Python:
python -m pip install cx_Oracle --upgrade
Com o instant client e o pacote cx_Oracle instalados, vamos executar o nosso script:
import cx_Oracle
import random
from dotenv import load_dotenv
import os
# Carregar variáveis de ambiente do arquivo .env
from pathlib import Path
dotenv_path = Path('/home/swillians/confluent/confluent-platform-flink-docker/.env') #meu diretório onde o .env se encontra
load_dotenv(dotenv_path=dotenv_path)
# Configurações de conexão com o banco de dados
DB_DSN = os.getenv("ORA_DATABASE_DSN")
DB_USER = os.getenv("ORA_DATABASE_USER")
DB_PASS = os.getenv("ORA_DATABASE_PASSWORD")
# Criação da conexão
connection = cx_Oracle.connect(DB_USER, DB_PASS, DB_DSN)
cursor = connection.cursor()
# Comando SQL para criar a tabela
create_table_sql = """
CREATE TABLE clients_account_details (
first_name VARCHAR2(100),
last_name VARCHAR2(100),
bank_number NUMBER(10),
bank_account VARCHAR2(10),
CONSTRAINT client_pk PRIMARY KEY (first_name, last_name)
)
"""
# Tentativa de criação da tabela e da chave primária
try:
cursor.execute(create_table_sql)
print("Tabela 'clients_account_details' e chave primária 'client_pk' criadas com sucesso.")
except cx_Oracle.DatabaseError as e:
error, = e.args
if error.code == 955: # ORA-00955: name is already used by an existing object
print("Tabela 'clients_account_details' já existe. Pulando criação.")
else:
print(f"Erro ao criar a tabela: {e}")
raise
# Dados para serem inseridos na tabela
first_names = ["John", "Jane", "Alice", "Bob", "Eve", "Mike", "Anna", "Tom", "Lucy", "Jack"]
last_names = ["Smith", "Doe", "Johnson", "Brown", "Williams", "Jones", "Garcia", "Miller", "Davis", "Rodriguez"]
bank_numbers = list(range(1, 11))
# Função para gerar uma conta bancária aleatória no formato '058790-11'
def generate_bank_account():
return f"{random.randint(100000, 999999)}-{random.randint(10, 99)}"
# Gerar uma combinação única de bank_number e bank_account para cada combinação de first_name e last_name
for first_name in first_names:
for last_name in last_names:
bank_number = random.choice(bank_numbers)
bank_account = generate_bank_account()
insert_sql = """
INSERT INTO clients_account_details (first_name, last_name, bank_number, bank_account)
VALUES (:1, :2, :3, :4)
"""
try:
cursor.execute(insert_sql, (first_name, last_name, bank_number, bank_account))
except cx_Oracle.IntegrityError:
print(f"Registro duplicado para {first_name} {last_name}. Pulando inserção.")
except cx_Oracle.DatabaseError as e:
print(f"Erro ao inserir dados: {e}")
raise
# Confirmação da transação
connection.commit()
# Fechamento do cursor e da conexão
cursor.close()
connection.close()
print("Dados inseridos com sucesso na tabela 'clients_account_details'.")
Para isso, executemos o seguinte comando:
python3 populate_table_oracle.py
Vamos conferir no banco de dados se tudo ocorreu como deveria. Acessemos o SQLPLUS como mostrado mais acima nesse artigo e executemos o seguintes comandos SQL:
SELECT * FROM c##dbzuser.clients_account_details;
SELECT count(*) FROM c##dbzuser.clients_account_details;
Com o banco de dados configurado, com o usuário criado, com a tabela criada e os dados populados, podemos agora configurar o nosso conector Debezium Oracle CDC.
Configurando o conector Debezium Oracle CDC Source
Vamos acessar a Confluent Platform:
http://localhost:9021
Vamos agora salvar o arquivo json de configuração do conector Debezium. Salve como connector-debezium-oracle-cdc.json
{
"name": "clients.account.details",
"config": {
"auto.create.topics.enable": "true",
"schema.history.internal.kafka.topic": "history_schema",
"database.history.kafka.bootstrap.servers": "broker:29092",
"schema.history.internal.kafka.bootstrap.servers": "broker:29092",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"tasks.max": "1",
"errors.retry.timeout": "2000",
"errors.retry.delay.max.ms": "60000",
"errors.tolerance": "none",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topic.prefix": "table",
"database.hostname": "172.18.0.2",
"database.port": "1521",
"database.user": "c##dbzuser",
"database.password": "dbz",
"database.dbname": "FREE",
"database.pdb.name": "FREEPDB1",
"database.query.timeout.ms": "900000",
"snapshot.mode": "initial",
"table.include.list": "c##dbzuser.clients_account_details"
}
}
Vamos analisar cada configuração em detalhes:
1. name
: "clients.account.details"
- Descrição: Nome do conector. É uma identificação única para o conector que será usada dentro do Apache Kafka Connect.
- Função: Identifica a instância específica do conector que está capturando mudanças na tabela
clients_account_details
.
2. auto.create.topics.enable
: "true"
- Descrição: Indica se os tópicos do Kafka devem ser criados automaticamente.
- Função: Se ativado (
true
), os tópicos necessários no Kafka serão criados automaticamente pelo Kafka Connect quando o conector começar a funcionar.
3. schema.history.internal.kafka.topic
: "history_schema"
- Descrição: Nome do tópico no Kafka onde o histórico de esquemas (schema history) é armazenado.
- Função: Armazena mudanças de esquema de banco de dados para garantir que o conector saiba interpretar as alterações nos dados ao longo do tempo.
4. database.history.kafka.bootstrap.servers
e schema.history.internal.kafka.bootstrap.servers
: "broker:29092"
- Descrição: Endereços dos servidores Kafka (brokers) que serão usados para armazenar eventos de histórico e de dados.
- Função: Define quais servidores Kafka serão utilizados para a comunicação e armazenamento dos eventos capturados.
5. key.converter
e value.converter
: "io.confluent.connect.avro.AvroConverter"
- Descrição: Converters para a chave (
key
) e o valor (value
) dos eventos publicados no Kafka. - Função: Especifica que o formato dos dados será Avro, um formato compacto de serialização de dados. Avro é popular em sistemas de streaming de dados devido à sua eficiência e integração com o Kafka.
6. key.converter.schema.registry.url
e value.converter.schema.registry.url
: "http://schema-registry:8081"
- Descrição: URLs do Schema Registry para chaves e valores.
- Função: O Schema Registry armazena versões dos esquemas Avro. O conector usa essas URLs para registrar e buscar os esquemas dos dados sendo publicados no Kafka.
7. connector.class
: "io.debezium.connector.oracle.OracleConnector"
- Descrição: Classe do conector específico que está sendo utilizado.
- Função: Especifica que este conector é para um banco de dados Oracle. Essa configuração determina como o conector se comporta e quais funcionalidades estão disponíveis.
8. tasks.max
: "1"
- Descrição: Número máximo de tarefas (tasks) que o conector pode executar.
- Função: Define o número de threads paralelas que o conector pode usar para processar dados. Nesse caso, apenas uma tarefa será usada.
9. errors.retry.timeout
: "2000"
- Descrição: Tempo máximo (em milissegundos) para que uma operação seja reexecutada em caso de erro.
- Função: Se um erro ocorre, o conector tentará novamente por até 2000 milissegundos antes de desistir.
10. errors.retry.delay.max.ms
: "60000"
- Descrição: Tempo máximo (em milissegundos) de atraso entre tentativas de recuperação de erros.
- Função: Especifica que, entre tentativas de recuperação, o conector pode esperar até 60.000 milissegundos (1 minuto) para tentar novamente.
11. errors.tolerance
: "none"
- Descrição: Define o nível de tolerância a erros.
- Função: Quando definido como
"none"
, o conector falhará imediatamente em caso de qualquer erro. Isso é importante para garantir que os dados não sejam perdidos ou corrompidos.
12. errors.log.enable
: "true"
- Descrição: Habilita o registro de erros no log.
- Função: Garante que os erros encontrados pelo conector sejam registrados para facilitar a depuração e o monitoramento.
13. errors.log.include.messages
: "true"
- Descrição: Inclui mensagens de erro detalhadas no log.
- Função: Fornece informações detalhadas sobre os erros, o que pode ajudar a entender o problema mais rapidamente.
14. topic.prefix
: "table"
- Descrição: Prefixo usado para os nomes dos tópicos do Kafka onde as mudanças serão publicadas.
- Função: Todos os tópicos criados por este conector terão esse prefixo, seguido pelo nome da tabela monitorada. Exemplo:
table.clients_account_details
.
15. database.hostname
: "172.18.0.2"
- Descrição: Endereço IP ou hostname do servidor Oracle.
- Função: Especifica onde o banco de dados Oracle está localizado.
16. database.port
: "1521"
- Descrição: Porta do banco de dados Oracle.
- Função: A porta padrão do Oracle Database é
1521
, mas isso pode ser alterado dependendo da configuração específica do servidor.
17. database.user
: "c##dbzuser"
- Descrição: Nome de usuário usado para conectar ao banco de dados Oracle.
- Função: O usuário deve ter permissões adequadas para ler logs de redo e monitorar as tabelas especificadas.
18. database.password
: "dbz"
- Descrição: Senha correspondente ao usuário do banco de dados.
- Função: Necessária para autenticar a conexão com o banco de dados Oracle.
19. database.dbname
: "FREE"
- Descrição: Nome do banco de dados Oracle.
- Função: Identifica qual banco de dados será monitorado pelo conector.
20. database.pdb.name
: "FREEPDB1"
- Descrição: Nome do Pluggable Database (PDB) dentro do Oracle.
- Função: Especifica o PDB que o conector irá monitorar. Isso é relevante em ambientes onde o Oracle usa arquitetura multitenant (com PDBs).
21. database.query.timeout.ms
: "900000"
- Descrição: Tempo limite para consultas ao banco de dados (em milissegundos).
- Função: Especifica que consultas que levam mais de 900.000 milissegundos (15 minutos) devem ser interrompidas.
22. snapshot.mode
: "initial"
- Descrição: Modo de captura instantânea dos dados.
- Função:
initial
indica que uma captura instantânea completa dos dados será realizada na primeira vez que o conector for iniciado. Isso é necessário para garantir que o conector comece a capturar mudanças a partir de um estado consistente.
23. table.include.list
: "c##dbzuser.clients_account_details"
- Descrição: Lista de tabelas que o conector deve monitorar.
- Função: Especifica que apenas a tabela
clients_account_details
será monitorada para capturar mudanças.
Resumo
Esta configuração detalha como o conector Debezium se conecta ao banco de dados Oracle, captura as mudanças na tabela clients_account_details
, e publica essas mudanças em tópicos do Kafka usando o formato Avro. Se precisar de mais detalhes, acesse AQUI a documentação oficial do conector Debezium.
Vamos agora subir o conector na Confluent Platform:
Cliquemos em Upload connector config File
Clique em Next
Agora clique em Launch
Vamos verificar os dados em nosso tópico:
Conector Source funcionando ! Agora é hora de configurarmos o conector JDB Sink para replicar os dados no PostgreeSQL.
Salve o arquivo como postgres-sink.json
{
"name": "oracle.client.sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "cad.clients_account_details",
"connection.url": "jdbc:postgresql://172.18.0.3:5432/flink_db",
"connection.user": "flink_kafka",
"connection.password": "flink_kafka",
"insert.mode": "upsert",
"table.name.format": "clients_account_details",
"pk.mode": "record_key",
"pk.fields": "FIRST_NAME,LAST_NAME",
"batch.size": 1,
"auto.create": true,
"auto.evolve": true,
"delete.enabled": true,
"errors.log.enable": true,
"errors.log.include.messages": true,
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": false
}
}
Analisemos com detalhes cada parâmetro:
1. name
: "oracle.client.sink"
- Descrição: Nome do conector.
- Função: Identifica essa instância do conector dentro do Apache Kafka Connect.
2. connector.class
: "io.confluent.connect.jdbc.JdbcSinkConnector"
- Descrição: Classe do conector JDBC Sink.
- Função: Especifica que este conector será utilizado para gravar dados em um banco de dados via JDBC.
3. tasks.max
: "1"
- Descrição: Número máximo de tarefas (tasks) que o conector pode executar.
- Função: Define o número de threads paralelas que o conector pode usar. Nesse caso, apenas uma tarefa será usada.
4. topics
: "cad.clients_account_details"
- Descrição: Nome do tópico Kafka que o conector irá consumir.
- Função: Especifica o tópico Kafka do qual os dados serão lidos e enviados para o banco de dados PostgreSQL.
5. connection.url
: "jdbc:postgresql://172.18.0.3:5432/flink_db"
- Descrição: URL de conexão JDBC ao banco de dados PostgreSQL.
- Função: Define onde o banco de dados PostgreSQL está localizado e como conectar-se a ele.
6. connection.user
: "flink_kafka"
- Descrição: Nome de usuário para se conectar ao banco de dados.
- Função: Autentica a conexão com o banco de dados PostgreSQL.
7. connection.password
: "flink_kafka"
- Descrição: Senha para o usuário do banco de dados.
- Função: Autentica a conexão com o banco de dados PostgreSQL.
8. insert.mode
: "upsert"
- Descrição: Modo de inserção de dados no banco.
- Função: Define que os dados serão inseridos ou atualizados (
upsert
). Se uma linha com a mesma chave primária já existir, ela será atualizada; caso contrário, será inserida.
9. table.name.format
: "clients_account_details"
- Descrição: Formato do nome da tabela no banco de dados.
- Função: Especifica a tabela no PostgreSQL onde os dados do tópico Kafka serão inseridos.
10. pk.mode
: "record_key"
- Descrição: Define como as chaves primárias serão geradas.
- Função: Indica que as chaves primárias serão extraídas da chave do registro Kafka.
11. pk.fields
: "FIRST_NAME,LAST_NAME"
- Descrição: Campos que compõem a chave primária.
- Função: Especifica que os campos
FIRST_NAME
eLAST_NAME
serão usados como chave primária na tabela PostgreSQL.
12. batch.size
: 1
- Descrição: Tamanho do lote de registros a serem enviados em uma única operação de banco de dados.
- Função: Define que cada registro será enviado individualmente ao banco de dados (não será feito em lotes). Isso é útil para garantir a menor latência possível, mas pode ter impacto na performance.
13. auto.create
: true
- Descrição: Habilita a criação automática de tabelas.
- Função: Se a tabela
clients_account_details
não existir no banco de dados PostgreSQL, ela será criada automaticamente.
14. auto.evolve
: true
- Descrição: Habilita a evolução automática do esquema.
- Função: Se o esquema da tabela mudar (por exemplo, novos campos forem adicionados), o conector ajustará a tabela automaticamente para acomodar essas mudanças.
15. delete.enabled
: true
- Descrição: Habilita a exclusão de registros.
- Função: Permite que o conector processe mensagens de exclusão (delete) e as reflita no banco de dados, removendo os registros correspondentes.
16. errors.log.enable
: true
- Descrição: Habilita o registro de erros no log.
- Função: Garante que os erros encontrados pelo conector sejam registrados para facilitar a depuração e o monitoramento.
17. errors.log.include.messages
: true
- Descrição: Inclui mensagens de erro detalhadas no log.
- Função: Fornece informações detalhadas sobre os erros, o que pode ajudar a entender o problema mais rapidamente.
18. key.converter
e value.converter
: "io.confluent.connect.avro.AvroConverter"
- Descrição: Converters para a chave (
key
) e o valor (value
) dos eventos. - Função: Define que os dados no Kafka estão em formato Avro e especifica o uso do AvroConverter para converter esses dados ao gravá-los no banco de dados.
19. key.converter.schema.registry.url
e value.converter.schema.registry.url
: "http://schema-registry:8081"
- Descrição: URLs do Schema Registry.
- Função: O Schema Registry é usado para gerenciar os esquemas Avro dos dados que estão sendo consumidos do Kafka.
20. transforms
: "unwrap"
- Descrição: Lista de transformações a serem aplicadas aos dados.
- Função: Aplica a transformação especificada para modificar os registros antes que sejam enviados ao banco de dados.
21. transforms.unwrap.type
: "io.debezium.transforms.ExtractNewRecordState"
- Descrição: Especifica o tipo de transformação
unwrap
. - Função: A transformação
ExtractNewRecordState
é usada para extrair o estado dos novos registros, ou seja, remove o envelope Debezium e deixa apenas o conteúdo do registro, simplificando a estrutura do dado antes de gravá-lo no banco.
22. transforms.unwrap.drop.tombstones
: false
- Descrição: Define se registros “tombstone” devem ser descartados.
- Função: Registros “tombstone” são marcadores de exclusão. Quando
false
, esses registros não serão descartados, permitindo que operações de exclusão sejam processadas corretamente.
Resumo
Este conector JDBC Sink é configurado para ler dados do tópico Kafka, processá-los (remover o envelope Debezium e aplicar a transformação unwrap
) e gravá-los em uma tabela PostgreSQL chamada clients_account_details
. O modo de inserção é upsert
, o que significa que os registros serão atualizados se já existirem ou inseridos se forem novos. O conector também é configurado para criar e evoluir automaticamente a tabela no PostgreSQL, e para lidar com exclusões de registros.
Vamos subir agora o conector JDBC Sink na Confluent Platform:
Vamos conferir no postgres se a tabela e os dados foram reeplicados:
Tabela e dados replicados com sucesso !
No próximo artigo vamos juntar as tabelas flink_clients_account_balance
, clients_addres
e clients_account_details
no Flink e depois vxxamos salvar esses dados em um novo tópico do Kafka.
Até lá !
Nota de agradecimento: Gostaria de agradecer ao meu camarada Robson Rozati pelas dicas e ajuda para fazer o conector Debezium funcionar !
ATENÇÃO: O banco de dados Oracle 23ai NÃO é homologado para o conector Debezium Oracle CDC. Porém, não encontrei nenhum problema ou mal funcionamento em meus testes. A escolha se deu pelo fato de que é uma versão FREE do Oracle e possui todas as funcionalidades que precisamos.
Referências
Sergio parabéns pelo artigo!
Eu tenho uma dúvida: Por que usar o Oracle Debezium CDC ao invés do Oracle CDC? (eu vi seus outros artigos).
Olá Junior !
Um dos grandes motivos é o custo. O Debezium Oracle CDC é gratuito, enquanto o conector Oracle CDC é bem custoso (A Confluent o classifica como Premium).