Pular para o conteúdo

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte VII – Replicando dados com Debezium Oracle CDC

Replicando dados com Debezium Oracle CDC

Olá pessoal, como vão ?

No artigo anterior foi explicado como integrar o Flink com o PostgreSQL. No capítulo de hoje, eu vou ensinar como instalar o conector Debezium Oracle CDC. Sei que tinha falado que já começaríamos o nosso trabalho de enriquecimento de dados com o Flink, mas devido a pedidos, resolvi incluir esse tema. Garanto que o conhecimento que você irá adquirir aqui, irá ser de grande valia !

O que instalaremos em nosso ambiente:

  • Banco de dados Oracle 23ai
  • cx_Oracle para Python
  • Oracle Instant Client
  • Debezium Oracle CDC para KAfka

Nós iremos utilizar o conector CDC para replicar os dados de uma tabela no Oracle (clients_account_details) e a replicaremos no nosso banco de dados PostgreSQL. Esses dados também servirão para podermos enriquecer o nosso tópico do kafka através do Flink que mostrarei no próximo artigo.

Para isso, faremos algumas alterações em nosso arquivo .env (que será unificado) e também no docker-compose.yml:

.env

Foi unificado os dois arquivos .env (o utilizado pelo docker-compose.yml e o utilizado pelos scripts Python).

Bash
# PostgreSQL
PGS_DATABASE_HOST=localhost
PGS_DATABASE_NAME=flink_db
PGS_DATABASE_USER=flink_kafka
PGS_DATABASE_PASSWORD=flink_kafka

# PGADMIN
PGA_USER=flink@kafka.com
PGA_PASSWORD=flink_db

# Oracle
ORA_DATABASE_DSN=localhost:1521/FREEPDB1
ORA_DATABASE_SYS_PASSWORD=flink_kafka
ORA_DATABASE_USER=c##dbzuser
ORA_DATABASE_PASSWORD=dbz

docker-compose.yml

Foi incluso a configuração do container com o banco de dados Oracle 23ai e o conector Debezium Oracle CDC.

YAML
---
services:
  oracle-db:
    image: container-registry.oracle.com/database/free:latest
    container_name: oracle_23ai
    environment:
      ORACLE_PWD: ${ORA_DATABASE_SYS_PASSWORD}
      ORACLE_CHARACTERSET: "AL32UTF8"
      ENABLE_ARCHIVELOG: "true"
      ENABLE_FORCE_LOGGING: "true"
    ports:
      - "1521:1521"  
    volumes:
      - /home/swillians/workarea/oracle/23ai/data:/opt/oracle/oradata
    restart: always

  postgres:
    image: postgres:13
    container_name: postgres_container
    environment:
      POSTGRES_USER: ${PGS_DATABASE_USER}
      POSTGRES_PASSWORD: ${PGS_DATABASE_PASSWORD}
      POSTGRES_DB: ${PGS_DATABASE_NAME}
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_container
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGA_USER}
      PGADMIN_DEFAULT_PASSWORD: ${PGA_PASSWORD}
    ports:
      - "5050:80"
    depends_on:
      - postgres
    volumes:
      - pgadmin_data:/var/lib/pgadmin

  broker:
    image: confluentinc/cp-server:7.5.1
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" 
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
    user: root
    #entrypoint: /bin/sh -c "yum install --nogpgcheck --setopt=sslverify=false zip -y && exec /bin/bash"
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - workarea:/app/workarea
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        echo "Installing ZIP"
        yum install --nogpgcheck --setopt=sslverify=false zip -y
        #
        echo "Installing Connectors"
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-http-1.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-oracle-cdc-2.2.1.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-sftp-3.2.4.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/streamthoughts-kafka-connect-file-pulse-2.10.0.zip
        #
        echo "Downloading Debezium Oracle Connector..."
        wget -O debezium-connector-oracle-plugin.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/2.7.0.Final/debezium-connector-oracle-2.7.0.Final-plugin.tar.gz --no-check-certificate
        wget -O debezium-scripting.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-scripting/2.7.0.Final/debezium-scripting-2.7.0.Final.tar.gz --no-check-certificate
    
        echo "Downloading JDBC driver for Oracle ..."
        wget https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/21.6.0.0/ojdbc8-21.6.0.0.jar -P /usr/share/java/kafka/ --no-check-certificate

        echo "Downloading Groovy SDK ..."
        wget https://groovy.jfrog.io/artifactory/dist-release-local/groovy-zips/apache-groovy-sdk-4.0.17.zip -P /tmp/groovy/ --no-check-certificate

        echo "Extracting the archive...Debezium !"
        tar -xzvf debezium-connector-oracle-plugin.tar.gz -C /usr/share/java/
        tar -xzvf debezium-scripting.tar.gz -C /usr/share/java/debezium-connector-oracle/ --strip-components=1
        echo "Extracting the archive...Groovy !"
        unzip /tmp/groovy/apache-groovy-sdk-4.0.17.zip */*/groovy-4.0.17.jar */*/groovy-jsr     223-4.0.17.jar */*/groovy-json-4.0.17.jar -d /tmp/groovy/
        echo "Copying Groovy archives..."
        cp /tmp/groovy/*/*/*.jar /usr/share/java/debezium-connector-oracle/

        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.5.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.5.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.5.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.5.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.5.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  flink-init:
    image: busybox
    container_name: flink-init
    command: >
      sh -c "
      echo 'Downloading Flink Kafka connector...' &&
      wget -O /opt/flink/plugins/kafka/flink-sql-connector-kafka-3.2.0-1.19.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar --no-check-certificate &&
      echo 'Connector downloaded successfully.'
      echo 'Downloading Flink JDBC connector...' &&
      wget -O /opt/flink/plugins/kafka/postgresql-42.7.3.jar https://jdbc.postgresql.org/download/postgresql-42.7.3.jar --no-check-certificate &&
      echo 'Connector downloaded successfully.'
      "
    volumes:
      - flink-plugins:/opt/flink/plugins/kafka
    depends_on:
      - broker

  flink-jobmanager:
    image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
      - 9081:9081
    command: jobmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        rest.bind-port: 9081
    volumes:
      - flink-plugins:/opt/flink/plugins/kafka
    depends_on:
      - flink-init

  flink-taskmanager:
    image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 10
    volumes:
      - flink-plugins:/opt/flink/plugins/kafka

  flink-sql-client:
    image: cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
      - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
      - ./settings/:/settings
      - flink-plugins:/opt/flink/plugins/kafka

volumes:
  workarea:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: /home/swillians/workarea
  flink-plugins:
    driver: local
  pgdata:  
  pgadmin_data:

Antes de executar o docker-compose.yml, precisamos criar o diretório onde ficarão armazenados os datafiles do Oracle. Para isso, execute os comandos abaixo:

Bash
sudo mkdir -p /home/swillians/workarea/oracle/23ai/data
sudo chown 54321:54321 /home/swillians/workarea/oracle/23ai/data
sudo chmod 775 /home/swillians/workarea/oracle/23ai/data

Esses diretórios são os que eu usarei, mas você pode alterar para qualquer caminho que desejar. Só não esqueça de também alterar no docker-compose.yml.

Agora é só executar e levantar o ambiente:

Bash
sudo docker compose up -d

Quando o ambiente estiver up, vamos configurar o banco de dados Oracle para que o conector Debezium Oracle CDC possa funcionar. Você pode fazer isso através de algum client (eu uso o SQL Developer para VSCODE) ou acessar o container e executar os comandos via SQLPLUS. Vou mostrar como fazer isso através da segunda opção:

Bash
sudo docker exec -it oracle_23ai sh

image 58

Agora vamos conectar no SQL Plus:

Bash
sqlplus sys as sysdba

Coloque suas credencias e você se conectará no CDB do banco de dados:

image 59

Vamos começar a aplicar os scripts para criação do usuário do debezium

SQL
# Cria a tablespace no CDB
CREATE TABLESPACE logminer_tbs_kafka DATAFILE '/opt/oracle/oradata/FREE/logminer_tbs_kafka.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE 5G;

# Implementa o Supplental log no CDB
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

# Mudando para o PDB
ALTER SESSION SET CONTAINER=FREEPDB1;

# Cria a tablespace no PDB
CREATE TABLESPACE logminer_tbs_kafka DATAFILE '/opt/oracle/oradata/FREE/FREEPDB1/logminer_tbs_kafka.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE 5G;

# Implementa o Supplental log no PDB
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

# Mudando para o CDB
ALTER SESSION SET CONTAINER=CDB$ROOT;

# Cria o usuário para o conector Debezium e aplica os GRANTS necessário
CREATE USER c##dbzuser IDENTIFIED BY dbz
    DEFAULT TABLESPACE logminer_tbs
    QUOTA UNLIMITED ON logminer_tbs
    CONTAINER=ALL;
    
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
  
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;

GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;

GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$TRANSACTION TO c##dbzuser CONTAINER=ALL;

GRANT SELECT ON V_$MYSTAT TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$STATNAME TO c##dbzuser CONTAINER=ALL;

Pronto ! O usuário para o conector Debezium foi criado com sucesso no banco de dados !

Instalando o Oracle Instant Client

Como vamos executar um script para criar a nossa tabela e gerar os dados no banco de dados Oracle, precisaremos instalar o Oracle Instant Client.

Faça o download do instant client através do link: https://download.oracle.com/otn_software/linux/instantclient/2350000/instantclient-basic-linux.x64-23.5.0.24.07.zip

Agora descompactemos o arquivo:

Bash
sudo mkdir -p /opt/oracle

sudo unzip instantclient-basic-linux.x64-23.5.0.24.07.zip -d /opt/oracle

Criemos o link simbólico:

Bash
cd /opt/oracle/instantclient_23_5/

sudo ln -s libclntsh.so.23.1 libclntsh.so
sudo ln -s libocci.so.23.1 libocci.so

echo 'export LD_LIBRARY_PATH=/opt/oracle/instantclient_23_5:$LD_LIBRARY_PATH' >> ~/.bashrc
source ~/.bashrc

Caso apareça o erro:

Bash
Traceback (most recent call last):
  File "/home/swillians/workarea/confluent-spooldir-test/files/populate_table_oracle.py", line 10, in <module>
    connection = cx_Oracle.connect(username, password, dsn)
cx_Oracle.DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "libaio.so.1: cannot open shared object file: No such file or directory". See https://cx-oracle.readthedocs.io/en/latest/user_guide/installation.html for help

Executem o comando abaixo:

Bash
sudo apt-get update
sudo apt-get install libaio1

Agora vamos instalar o pacote cx_Oracle no Python:

Bash
python -m pip install cx_Oracle --upgrade

Com o instant client e o pacote cx_Oracle instalados, vamos executar o nosso script:

Python
import cx_Oracle
import random
from dotenv import load_dotenv
import os

# Carregar variáveis de ambiente do arquivo .env
from pathlib import Path
dotenv_path = Path('/home/swillians/confluent/confluent-platform-flink-docker/.env') #meu diretório onde o .env se encontra
load_dotenv(dotenv_path=dotenv_path)

# Configurações de conexão com o banco de dados
DB_DSN  = os.getenv("ORA_DATABASE_DSN")
DB_USER = os.getenv("ORA_DATABASE_USER")
DB_PASS = os.getenv("ORA_DATABASE_PASSWORD")

# Criação da conexão
connection = cx_Oracle.connect(DB_USER, DB_PASS, DB_DSN)
cursor = connection.cursor()

# Comando SQL para criar a tabela
create_table_sql = """
CREATE TABLE clients_account_details (
    first_name VARCHAR2(100),
    last_name VARCHAR2(100),
    bank_number NUMBER(10),
    bank_account VARCHAR2(10),
    CONSTRAINT client_pk PRIMARY KEY (first_name, last_name)
)
"""

# Tentativa de criação da tabela e da chave primária
try:
    cursor.execute(create_table_sql)
    print("Tabela 'clients_account_details' e chave primária 'client_pk' criadas com sucesso.")
except cx_Oracle.DatabaseError as e:
    error, = e.args
    if error.code == 955:  # ORA-00955: name is already used by an existing object
        print("Tabela 'clients_account_details' já existe. Pulando criação.")
    else:
        print(f"Erro ao criar a tabela: {e}")
        raise

# Dados para serem inseridos na tabela
first_names = ["John", "Jane", "Alice", "Bob", "Eve", "Mike", "Anna", "Tom", "Lucy", "Jack"]
last_names = ["Smith", "Doe", "Johnson", "Brown", "Williams", "Jones", "Garcia", "Miller", "Davis", "Rodriguez"]
bank_numbers = list(range(1, 11))

# Função para gerar uma conta bancária aleatória no formato '058790-11'
def generate_bank_account():
    return f"{random.randint(100000, 999999)}-{random.randint(10, 99)}"

# Gerar uma combinação única de bank_number e bank_account para cada combinação de first_name e last_name
for first_name in first_names:
    for last_name in last_names:
        bank_number = random.choice(bank_numbers)
        bank_account = generate_bank_account()
        insert_sql = """
        INSERT INTO clients_account_details (first_name, last_name, bank_number, bank_account)
        VALUES (:1, :2, :3, :4)
        """
        try:
            cursor.execute(insert_sql, (first_name, last_name, bank_number, bank_account))
        except cx_Oracle.IntegrityError:
            print(f"Registro duplicado para {first_name} {last_name}. Pulando inserção.")
        except cx_Oracle.DatabaseError as e:
            print(f"Erro ao inserir dados: {e}")
            raise

# Confirmação da transação
connection.commit()

# Fechamento do cursor e da conexão
cursor.close()
connection.close()

print("Dados inseridos com sucesso na tabela 'clients_account_details'.")

Para isso, executemos o seguinte comando:

Bash
python3 populate_table_oracle.py

image 60

Vamos conferir no banco de dados se tudo ocorreu como deveria. Acessemos o SQLPLUS como mostrado mais acima nesse artigo e executemos o seguintes comandos SQL:

SQL
SELECT * FROM c##dbzuser.clients_account_details;

SELECT count(*) FROM c##dbzuser.clients_account_details;

image 62

Com o banco de dados configurado, com o usuário criado, com a tabela criada e os dados populados, podemos agora configurar o nosso conector Debezium Oracle CDC.

Configurando o conector Debezium Oracle CDC Source

Vamos acessar a Confluent Platform:

HTTP
http://localhost:9021

Vamos agora salvar o arquivo json de configuração do conector Debezium. Salve como connector-debezium-oracle-cdc.json

JSON
{
  "name": "clients.account.details",
  "config": {
    "auto.create.topics.enable": "true",
    "schema.history.internal.kafka.topic": "history_schema",
    "database.history.kafka.bootstrap.servers": "broker:29092",
    "schema.history.internal.kafka.bootstrap.servers": "broker:29092",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "errors.retry.timeout": "2000",
    "errors.retry.delay.max.ms": "60000",
    "errors.tolerance": "none",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topic.prefix": "table",
    "database.hostname": "172.18.0.2",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "FREE",
    "database.pdb.name": "FREEPDB1",
    "database.query.timeout.ms": "900000",
    "snapshot.mode": "initial",
    "table.include.list": "c##dbzuser.clients_account_details"
  }
}

Vamos analisar cada configuração em detalhes:

1. name: "clients.account.details"

  • Descrição: Nome do conector. É uma identificação única para o conector que será usada dentro do Apache Kafka Connect.
  • Função: Identifica a instância específica do conector que está capturando mudanças na tabela clients_account_details.

2. auto.create.topics.enable: "true"

  • Descrição: Indica se os tópicos do Kafka devem ser criados automaticamente.
  • Função: Se ativado (true), os tópicos necessários no Kafka serão criados automaticamente pelo Kafka Connect quando o conector começar a funcionar.

3. schema.history.internal.kafka.topic: "history_schema"

  • Descrição: Nome do tópico no Kafka onde o histórico de esquemas (schema history) é armazenado.
  • Função: Armazena mudanças de esquema de banco de dados para garantir que o conector saiba interpretar as alterações nos dados ao longo do tempo.

4. database.history.kafka.bootstrap.servers e schema.history.internal.kafka.bootstrap.servers: "broker:29092"

  • Descrição: Endereços dos servidores Kafka (brokers) que serão usados para armazenar eventos de histórico e de dados.
  • Função: Define quais servidores Kafka serão utilizados para a comunicação e armazenamento dos eventos capturados.

5. key.converter e value.converter: "io.confluent.connect.avro.AvroConverter"

  • Descrição: Converters para a chave (key) e o valor (value) dos eventos publicados no Kafka.
  • Função: Especifica que o formato dos dados será Avro, um formato compacto de serialização de dados. Avro é popular em sistemas de streaming de dados devido à sua eficiência e integração com o Kafka.

6. key.converter.schema.registry.url e value.converter.schema.registry.url: "http://schema-registry:8081"

  • Descrição: URLs do Schema Registry para chaves e valores.
  • Função: O Schema Registry armazena versões dos esquemas Avro. O conector usa essas URLs para registrar e buscar os esquemas dos dados sendo publicados no Kafka.

7. connector.class: "io.debezium.connector.oracle.OracleConnector"

  • Descrição: Classe do conector específico que está sendo utilizado.
  • Função: Especifica que este conector é para um banco de dados Oracle. Essa configuração determina como o conector se comporta e quais funcionalidades estão disponíveis.

8. tasks.max: "1"

  • Descrição: Número máximo de tarefas (tasks) que o conector pode executar.
  • Função: Define o número de threads paralelas que o conector pode usar para processar dados. Nesse caso, apenas uma tarefa será usada.

9. errors.retry.timeout: "2000"

  • Descrição: Tempo máximo (em milissegundos) para que uma operação seja reexecutada em caso de erro.
  • Função: Se um erro ocorre, o conector tentará novamente por até 2000 milissegundos antes de desistir.

10. errors.retry.delay.max.ms: "60000"

  • Descrição: Tempo máximo (em milissegundos) de atraso entre tentativas de recuperação de erros.
  • Função: Especifica que, entre tentativas de recuperação, o conector pode esperar até 60.000 milissegundos (1 minuto) para tentar novamente.

11. errors.tolerance: "none"

  • Descrição: Define o nível de tolerância a erros.
  • Função: Quando definido como "none", o conector falhará imediatamente em caso de qualquer erro. Isso é importante para garantir que os dados não sejam perdidos ou corrompidos.

12. errors.log.enable: "true"

  • Descrição: Habilita o registro de erros no log.
  • Função: Garante que os erros encontrados pelo conector sejam registrados para facilitar a depuração e o monitoramento.

13. errors.log.include.messages: "true"

  • Descrição: Inclui mensagens de erro detalhadas no log.
  • Função: Fornece informações detalhadas sobre os erros, o que pode ajudar a entender o problema mais rapidamente.

14. topic.prefix: "table"

  • Descrição: Prefixo usado para os nomes dos tópicos do Kafka onde as mudanças serão publicadas.
  • Função: Todos os tópicos criados por este conector terão esse prefixo, seguido pelo nome da tabela monitorada. Exemplo: table.clients_account_details.

15. database.hostname: "172.18.0.2"

  • Descrição: Endereço IP ou hostname do servidor Oracle.
  • Função: Especifica onde o banco de dados Oracle está localizado.

16. database.port: "1521"

  • Descrição: Porta do banco de dados Oracle.
  • Função: A porta padrão do Oracle Database é 1521, mas isso pode ser alterado dependendo da configuração específica do servidor.

17. database.user: "c##dbzuser"

  • Descrição: Nome de usuário usado para conectar ao banco de dados Oracle.
  • Função: O usuário deve ter permissões adequadas para ler logs de redo e monitorar as tabelas especificadas.

18. database.password: "dbz"

  • Descrição: Senha correspondente ao usuário do banco de dados.
  • Função: Necessária para autenticar a conexão com o banco de dados Oracle.

19. database.dbname: "FREE"

  • Descrição: Nome do banco de dados Oracle.
  • Função: Identifica qual banco de dados será monitorado pelo conector.

20. database.pdb.name: "FREEPDB1"

  • Descrição: Nome do Pluggable Database (PDB) dentro do Oracle.
  • Função: Especifica o PDB que o conector irá monitorar. Isso é relevante em ambientes onde o Oracle usa arquitetura multitenant (com PDBs).

21. database.query.timeout.ms: "900000"

  • Descrição: Tempo limite para consultas ao banco de dados (em milissegundos).
  • Função: Especifica que consultas que levam mais de 900.000 milissegundos (15 minutos) devem ser interrompidas.

22. snapshot.mode: "initial"

  • Descrição: Modo de captura instantânea dos dados.
  • Função: initial indica que uma captura instantânea completa dos dados será realizada na primeira vez que o conector for iniciado. Isso é necessário para garantir que o conector comece a capturar mudanças a partir de um estado consistente.

23. table.include.list: "c##dbzuser.clients_account_details"

  • Descrição: Lista de tabelas que o conector deve monitorar.
  • Função: Especifica que apenas a tabela clients_account_details será monitorada para capturar mudanças.

Resumo

Esta configuração detalha como o conector Debezium se conecta ao banco de dados Oracle, captura as mudanças na tabela clients_account_details, e publica essas mudanças em tópicos do Kafka usando o formato Avro. Se precisar de mais detalhes, acesse AQUI a documentação oficial do conector Debezium.

Vamos agora subir o conector na Confluent Platform:

image 63

Cliquemos em Upload connector config File

image 64

Clique em Next

image 65

Agora clique em Launch

image 66

Vamos verificar os dados em nosso tópico:

image 67
image 68

Conector Source funcionando ! Agora é hora de configurarmos o conector JDB Sink para replicar os dados no PostgreeSQL.

Salve o arquivo como postgres-sink.json

JSON
{
  "name": "oracle.client.sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "cad.clients_account_details",
    "connection.url": "jdbc:postgresql://172.18.0.3:5432/flink_db",
    "connection.user": "flink_kafka",
    "connection.password": "flink_kafka",
    "insert.mode": "upsert",
    "table.name.format": "clients_account_details",
    "pk.mode": "record_key",
    "pk.fields": "FIRST_NAME,LAST_NAME",
    "batch.size": 1,
    "auto.create": true,
    "auto.evolve": true,
    "delete.enabled": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": false
  }
}

Analisemos com detalhes cada parâmetro:

1. name: "oracle.client.sink"

  • Descrição: Nome do conector.
  • Função: Identifica essa instância do conector dentro do Apache Kafka Connect.

2. connector.class: "io.confluent.connect.jdbc.JdbcSinkConnector"

  • Descrição: Classe do conector JDBC Sink.
  • Função: Especifica que este conector será utilizado para gravar dados em um banco de dados via JDBC.

3. tasks.max: "1"

  • Descrição: Número máximo de tarefas (tasks) que o conector pode executar.
  • Função: Define o número de threads paralelas que o conector pode usar. Nesse caso, apenas uma tarefa será usada.

4. topics: "cad.clients_account_details"

  • Descrição: Nome do tópico Kafka que o conector irá consumir.
  • Função: Especifica o tópico Kafka do qual os dados serão lidos e enviados para o banco de dados PostgreSQL.

5. connection.url: "jdbc:postgresql://172.18.0.3:5432/flink_db"

  • Descrição: URL de conexão JDBC ao banco de dados PostgreSQL.
  • Função: Define onde o banco de dados PostgreSQL está localizado e como conectar-se a ele.

6. connection.user: "flink_kafka"

  • Descrição: Nome de usuário para se conectar ao banco de dados.
  • Função: Autentica a conexão com o banco de dados PostgreSQL.

7. connection.password: "flink_kafka"

  • Descrição: Senha para o usuário do banco de dados.
  • Função: Autentica a conexão com o banco de dados PostgreSQL.

8. insert.mode: "upsert"

  • Descrição: Modo de inserção de dados no banco.
  • Função: Define que os dados serão inseridos ou atualizados (upsert). Se uma linha com a mesma chave primária já existir, ela será atualizada; caso contrário, será inserida.

9. table.name.format: "clients_account_details"

  • Descrição: Formato do nome da tabela no banco de dados.
  • Função: Especifica a tabela no PostgreSQL onde os dados do tópico Kafka serão inseridos.

10. pk.mode: "record_key"

  • Descrição: Define como as chaves primárias serão geradas.
  • Função: Indica que as chaves primárias serão extraídas da chave do registro Kafka.

11. pk.fields: "FIRST_NAME,LAST_NAME"

  • Descrição: Campos que compõem a chave primária.
  • Função: Especifica que os campos FIRST_NAME e LAST_NAME serão usados como chave primária na tabela PostgreSQL.

12. batch.size: 1

  • Descrição: Tamanho do lote de registros a serem enviados em uma única operação de banco de dados.
  • Função: Define que cada registro será enviado individualmente ao banco de dados (não será feito em lotes). Isso é útil para garantir a menor latência possível, mas pode ter impacto na performance.

13. auto.create: true

  • Descrição: Habilita a criação automática de tabelas.
  • Função: Se a tabela clients_account_details não existir no banco de dados PostgreSQL, ela será criada automaticamente.

14. auto.evolve: true

  • Descrição: Habilita a evolução automática do esquema.
  • Função: Se o esquema da tabela mudar (por exemplo, novos campos forem adicionados), o conector ajustará a tabela automaticamente para acomodar essas mudanças.

15. delete.enabled: true

  • Descrição: Habilita a exclusão de registros.
  • Função: Permite que o conector processe mensagens de exclusão (delete) e as reflita no banco de dados, removendo os registros correspondentes.

16. errors.log.enable: true

  • Descrição: Habilita o registro de erros no log.
  • Função: Garante que os erros encontrados pelo conector sejam registrados para facilitar a depuração e o monitoramento.

17. errors.log.include.messages: true

  • Descrição: Inclui mensagens de erro detalhadas no log.
  • Função: Fornece informações detalhadas sobre os erros, o que pode ajudar a entender o problema mais rapidamente.

18. key.converter e value.converter: "io.confluent.connect.avro.AvroConverter"

  • Descrição: Converters para a chave (key) e o valor (value) dos eventos.
  • Função: Define que os dados no Kafka estão em formato Avro e especifica o uso do AvroConverter para converter esses dados ao gravá-los no banco de dados.

19. key.converter.schema.registry.url e value.converter.schema.registry.url: "http://schema-registry:8081"

  • Descrição: URLs do Schema Registry.
  • Função: O Schema Registry é usado para gerenciar os esquemas Avro dos dados que estão sendo consumidos do Kafka.

20. transforms: "unwrap"

  • Descrição: Lista de transformações a serem aplicadas aos dados.
  • Função: Aplica a transformação especificada para modificar os registros antes que sejam enviados ao banco de dados.

21. transforms.unwrap.type: "io.debezium.transforms.ExtractNewRecordState"

  • Descrição: Especifica o tipo de transformação unwrap.
  • Função: A transformação ExtractNewRecordState é usada para extrair o estado dos novos registros, ou seja, remove o envelope Debezium e deixa apenas o conteúdo do registro, simplificando a estrutura do dado antes de gravá-lo no banco.

22. transforms.unwrap.drop.tombstones: false

  • Descrição: Define se registros “tombstone” devem ser descartados.
  • Função: Registros “tombstone” são marcadores de exclusão. Quando false, esses registros não serão descartados, permitindo que operações de exclusão sejam processadas corretamente.

Resumo

Este conector JDBC Sink é configurado para ler dados do tópico Kafka, processá-los (remover o envelope Debezium e aplicar a transformação unwrap) e gravá-los em uma tabela PostgreSQL chamada clients_account_details. O modo de inserção é upsert, o que significa que os registros serão atualizados se já existirem ou inseridos se forem novos. O conector também é configurado para criar e evoluir automaticamente a tabela no PostgreSQL, e para lidar com exclusões de registros.

Vamos subir agora o conector JDBC Sink na Confluent Platform:

image 70

Vamos conferir no postgres se a tabela e os dados foram reeplicados:

image 71

Tabela e dados replicados com sucesso !

No próximo artigo vamos juntar as tabelas flink_clients_account_balance, clients_addres e clients_account_details no Flink e depois vxxamos salvar esses dados em um novo tópico do Kafka.

Até lá !

Nota de agradecimento: Gostaria de agradecer ao meu camarada Robson Rozati pelas dicas e ajuda para fazer o conector Debezium funcionar !

ATENÇÃO: O banco de dados Oracle 23ai NÃO é homologado para o conector Debezium Oracle CDC. Porém, não encontrei nenhum problema ou mal funcionamento em meus testes. A escolha se deu pelo fato de que é uma versão FREE do Oracle e possui todas as funcionalidades que precisamos.

Referências

Quão útil foi este post ?

Clique em uma estrela para classificar o post

nota média 5 / 5. Contagem de votos: 10

Sem votos ! Seja o primeiro a classificar !

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

plugins premium WordPress