
Kafka, Confluent Platform, and Apache Flink: A Perfect Match! | Part VIII – Enriching Data with Flink

Confluent Platform and Apache Flink: Enriching Data with Flink

I'm back to wrap up our series on Confluent Platform, Kafka, and Apache Flink. We have finally reached the last chapter of this journey together.

Below is the model we will complete today:

[Image: Confluent Platform and Apache Flink]

It has been a long road to get here! Over the course of this series, we covered:

  1. Environment installation
  2. Spooldir connector
    • In the second article, we saw how to install the spooldir connector and generate test data.
  3. KSQLDB
  4. REST Proxy and Confluent CLI
    • In the fourth article, we talked about REST Proxy and the Confluent CLI, also by request :).
  5. Apache Flink and Kafka
    • In the fifth article, we got started with Flink and created a table based on a Kafka topic.
  6. Apache Flink and PostgreSQL
    • In the sixth article, we created a Flink table based on a PostgreSQL table.
  7. Replicating data from Oracle to PostgreSQL
    • In the seventh article, we set up data replication between Oracle and PostgreSQL using the Debezium Oracle CDC connector.

Before we get to the final goal, let's make a few last changes to our docker-compose.yml.

What was improved in the environment:

  • A new container for data generation: with this change, when the environment starts, the new container automatically begins creating the files that will be processed by the spooldir connector (a sketch of such a generator script appears right after the docker-compose.yml below).
  • A new PostgreSQL connector in Apache Flink: I installed an updated PostgreSQL connector in Flink, since the previous one was unstable and not compatible with the PostgreSQL version we are using.
  • A new .env file: I added new variables to make the environment more customizable and easier to update.

Setting up the test environment

.env file

Bash
# PostgreSQL
PGS_IMAGE_VERSION=postgres:13
PGS_DATABASE_HOST=localhost
PGS_DATABASE_NAME=flink_db
PGS_DATABASE_USER=flink_kafka
PGS_DATABASE_PASSWORD=flink_kafka

# PGADMIN
PGA_IMAGE_VERSION=dpage/pgadmin4
PGA_USER=flink@kafka.com
PGA_PASSWORD=flink_db

# Oracle
ORA_IMAGE_VERSION=container-registry.oracle.com/database/free:latest 
ORA_DATABASE_DSN=localhost:1521/FREEPDB1
ORA_DATABASE_SYS_PASSWORD=flink_kafka
ORA_DATABASE_USER=c##dbzuser
ORA_DATABASE_PASSWORD=flink_kafka

# CONFLUENT Kafka
# Broker
KBR_IMAGE_VERSION=confluentinc/cp-server:7.5.1
# Schema Registry
KSR_IMAGE_VERSION=confluentinc/cp-schema-registry:7.5.1 
# Connect
KCN_IMAGE_VERSION=cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
# Control Center
KCC_IMAGE_VERSION=confluentinc/cp-enterprise-control-center:7.5.1
# KSQLDB
KSD_IMAGE_VERSION=confluentinc/cp-ksqldb-server:7.5.1
# KSQLDB Client
KSC_IMAGE_VERSION=confluentinc/cp-ksqldb-cli:7.5.1
# KSQLDB Datagen
KSG_IMAGE_VERSION=confluentinc/ksqldb-examples:7.5.1
# Rest Proxy
KRP_IMAGE_VERSION=confluentinc/cp-kafka-rest:7.5.1

# Apache Flink
# Job Manager
FJM_IMAGE_VERSION=cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
# Task Manager
FTM_IMAGE_VERSION=cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
# Flink SQL Client
FSC_IMAGE_VERSION=cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17

# Generate Data
PYTHON_VERSION=python:3.9-slim
SCRIPT_NAME=tsv-spooldir-source.py
OUTPUT_DIR_HOST=/home/swillians/workarea/confluent-spooldir-test/files
OUTPUT_DIR_CONT=/app/output

docker-compose.yml

YAML
---
services:
  oracle-db:
    image: ${ORA_IMAGE_VERSION}
    container_name: oracle_23ai
    environment:
      ORACLE_PWD: ${ORA_DATABASE_SYS_PASSWORD}
      ORACLE_CHARACTERSET: "AL32UTF8"
      ENABLE_ARCHIVELOG: "true"
      ENABLE_FORCE_LOGGING: "true"
    ports:
      - "1521:1521"  
    volumes:
      - /home/swillians/workarea/oracle/23ai/data:/opt/oracle/oradata
    restart: always

  postgres:
    image: ${PGS_IMAGE_VERSION}
    container_name: postgres_container
    environment:
      POSTGRES_USER: ${PGS_DATABASE_USER}
      POSTGRES_PASSWORD: ${PGS_DATABASE_PASSWORD}
      POSTGRES_DB: ${PGS_DATABASE_NAME}
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  pgadmin:
    image: ${PGA_IMAGE_VERSION}
    container_name: pgadmin_container
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGA_USER}
      PGADMIN_DEFAULT_PASSWORD: ${PGA_PASSWORD}
    ports:
      - "5050:80"
    depends_on:
      - postgres
    volumes:
      - pgadmin_data:/var/lib/pgadmin

  broker:
    image: ${KBR_IMAGE_VERSION}
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" 
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: ${KSR_IMAGE_VERSION}
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: ${KCN_IMAGE_VERSION}
    user: root
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - workarea:/app/workarea
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        echo "Installing ZIP"
        yum install --nogpgcheck --setopt=sslverify=false zip -y
        #
        echo "Installing Connectors"
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-http-1.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-oracle-cdc-2.2.1.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-sftp-3.2.4.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/streamthoughts-kafka-connect-file-pulse-2.10.0.zip
        #
        echo "Downloading Debezium Oracle Connector..."
        wget -O debezium-connector-oracle-plugin.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/2.7.0.Final/debezium-connector-oracle-2.7.0.Final-plugin.tar.gz --no-check-certificate
        wget -O debezium-scripting.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-scripting/2.7.0.Final/debezium-scripting-2.7.0.Final.tar.gz --no-check-certificate
    
        echo "Downloading JDBC driver for Oracle ..."
        wget https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/21.6.0.0/ojdbc8-21.6.0.0.jar -P /usr/share/java/kafka/ --no-check-certificate

        echo "Downloading Groovy SDK ..."
        wget https://groovy.jfrog.io/artifactory/dist-release-local/groovy-zips/apache-groovy-sdk-4.0.17.zip -P /tmp/groovy/ --no-check-certificate

        echo "Extracting the archive...Debezium !"
        tar -xzvf debezium-connector-oracle-plugin.tar.gz -C /usr/share/java/
        tar -xzvf debezium-scripting.tar.gz -C /usr/share/java/debezium-connector-oracle/ --strip-components=1
        echo "Extracting the archive...Groovy !"
        unzip /tmp/groovy/apache-groovy-sdk-4.0.17.zip */*/groovy-4.0.17.jar */*/groovy-jsr223-4.0.17.jar */*/groovy-json-4.0.17.jar -d /tmp/groovy/
        echo "Copying Groovy archives..."
        cp /tmp/groovy/*/*/*.jar /usr/share/java/debezium-connector-oracle/

        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  control-center:
    image: ${KCC_IMAGE_VERSION}
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: ${KSD_IMAGE_VERSION}
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: ${KSC_IMAGE_VERSION}
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: ${KSG_IMAGE_VERSION}
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: ${KRP_IMAGE_VERSION}
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  flink-init:
    image: busybox
    container_name: flink-init
    command: >
      sh -c "
      echo 'Downloading Flink Kafka connector...' &&
      wget -O /opt/flink/plugins/kafka/flink-sql-connector-kafka-3.2.0-1.19.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar --no-check-certificate &&
      echo 'Connector downloaded successfully.' &&
      echo 'Downloading Flink JDBC connector...' &&
      wget -O /opt/flink/plugins/jdbc/flink-connector-jdbc-3.2.0-1.19.jar https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar --no-check-certificate &&
      echo 'Connector downloaded successfully.'
      "
    volumes:
      - flink-plugins-kafka:/opt/flink/plugins/kafka
      - flink-plugins-jdbc:/opt/flink/plugins/jdbc
    depends_on:
      - broker

  flink-jobmanager:
    image: ${FJM_IMAGE_VERSION}
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
      - 9081:9081
    command: jobmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        rest.bind-port: 9081
    volumes:
      - flink-plugins-kafka:/opt/flink/plugins/kafka
      - flink-plugins-jdbc:/opt/flink/plugins/jdbc
    depends_on:
      - flink-init

  flink-taskmanager:
    image: ${FTM_IMAGE_VERSION}
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 10
    volumes:
      - flink-plugins-kafka:/opt/flink/plugins/kafka
      - flink-plugins-jdbc:/opt/flink/plugins/jdbc

  flink-sql-client:
    image: ${FSC_IMAGE_VERSION}
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
      - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
      - ./settings/:/settings
      - flink-plugins-kafka:/opt/flink/plugins/kafka
      - flink-plugins-jdbc:/opt/flink/plugins/jdbc
      
  generate-account-data:
    image: ${PYTHON_VERSION}
    hostname: generate-account-data
    container_name: generate-account-data
    environment:
      OUTPUT_DIR_CONT: ${OUTPUT_DIR_CONT}
    command:
      - bash
      - -c
      - |
        echo "Copiando Script Python.."
        cd ${OUTPUT_DIR_CONT}
        echo "Executando Script Python.."
        python3 ${SCRIPT_NAME}        
    volumes:
      - ${OUTPUT_DIR_HOST}:${OUTPUT_DIR_CONT}      

volumes:
  workarea:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: /home/swillians/workarea
  flink-plugins-kafka:
    driver: local
  flink-plugins-jdbc:
    driver: local  
  pgdata:  
  pgadmin_data:
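
For reference, the generate-account-data service simply runs the tsv-spooldir-source.py script inside a plain Python container, writing files into the directory watched by the spooldir connector. The original script is not reproduced in this post; the sketch below is only a hypothetical illustration of what such a generator could look like, assuming the field names later used by the flink_clients_account_balance table and a header row in each TSV file:

Python
# Illustrative sketch only -- not the original tsv-spooldir-source.py.
# Writes tab-separated files with the fields later consumed through the
# flink_clients_account_balance table (first_name, last_name, account_balance).
import csv
import os
import random
import time
import uuid

# OUTPUT_DIR_CONT is injected by docker-compose (see the .env file above).
OUTPUT_DIR = os.environ.get("OUTPUT_DIR_CONT", "/app/output")

# Hypothetical sample values; the real script may use a different data source.
FIRST_NAMES = ["Ana", "Bruno", "Carla", "Diego"]
LAST_NAMES = ["Silva", "Souza", "Oliveira", "Pereira"]


def write_batch(rows_per_file: int = 10) -> str:
    """Write one TSV file with random balance movements and return its path."""
    path = os.path.join(OUTPUT_DIR, f"clients-{uuid.uuid4().hex}.tsv")
    with open(path, "w", newline="") as tsv_file:
        writer = csv.writer(tsv_file, delimiter="\t")
        writer.writerow(["first_name", "last_name", "account_balance"])  # header row
        for _ in range(rows_per_file):
            writer.writerow([
                random.choice(FIRST_NAMES),
                random.choice(LAST_NAMES),
                round(random.uniform(-5000, 5000), 2),
            ])
    return path


if __name__ == "__main__":
    while True:
        print(f"Generated {write_batch()}", flush=True)
        time.sleep(30)  # give the spooldir connector time to pick each file up

In the compose file, OUTPUT_DIR_HOST is bind-mounted into the container as OUTPUT_DIR_CONT, so the generated files land in the same directory tree the connect container sees through the workarea volume.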

These changes are enough to give us a more stable test environment. There is still plenty of room for improvement, though, so I leave it to you to keep evolving and refining it.

To bring the environment up, run sudo docker compose up -d. Once all containers are up, we will start the Flink SQL Client. Since I reorganized the directories and installed a new connector, the command to run it is:

Bash
sudo docker compose exec flink-sql-client sql-client.sh embedded --library /opt/flink/plugins/kafka --library /opt/flink/plugins/jdbc


Once inside the Flink SQL Client, run the following commands to create the tables that will be used in Flink:

Creating the tables in Apache Flink

CLIENTS_ADDRESS (PostgreSQL)

  • Contains the clients' addresses, sourced from the PostgreSQL database.
SQL
CREATE TABLE clients_address (
    first_name STRING,
    last_name STRING,
    address STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres_container:5432/flink_db',
    'table-name' = 'clients_address',
    'username' = 'flink_kafka',
    'password' = 'flink_kafka',
    'driver' = 'org.postgresql.Driver'
);

CLIENTS_ACCOUNT_DETAILS (PostgreSQL)

  • Stores the account information, with data coming from PostgreSQL. This table is replicated from Oracle using the Debezium Oracle CDC connector (source) and the JDBC connector (sink).
SQL
CREATE TABLE clients_account_details (
    first_name STRING,
    last_name STRING,
    bank_number BIGINT,
    bank_account STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres_container:5432/flink_db',
    'table-name' = 'clients_account_details',
    'username' = 'flink_kafka',
    'password' = 'flink_kafka',
    'driver' = 'org.postgresql.Driver'
);

FLINK_CLIENTS_ACCOUNT_BALANCE (Kafka)

  • Holds the balance movement information, sourced from Kafka.
SQL
CREATE TABLE flink_clients_account_balance (
  first_name STRING,
  last_name STRING,
  account_balance DECIMAL(10,2),
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka', 
    'topic' = 'tsv.spooldirfile.clients', 
    'scan.startup.mode' = 'earliest-offset', 
    'properties.group.id' = 'flink-group',
    'properties.bootstrap.servers' = 'broker:29092', 
    'value.format' = 'json'
);

After creating the tables, let's test one of them to make sure everything is working correctly:

SQL
SELECT * FROM clients_account_details;


Now we get to the main point. We will create a table called CLIENTS_ACCOUNTING_DETAILS, combining attributes from the CLIENTS_ACCOUNT_DETAILS, CLIENTS_ADDRESS, and FLINK_CLIENTS_ACCOUNT_BALANCE tables. This data will be sent to a Kafka topic.

First, let's create the topic in Kafka:

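This can be done through the Control Center UI; a rough command-line equivalent would be the following (the partition and replication-factor values here are only illustrative):

Bash
sudo docker compose exec broker kafka-topics --bootstrap-server broker:29092 --create --topic clients.accounting.details --partitions 1 --replication-factor 1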

Now let's run the code below to create the CLIENTS_ACCOUNTING_DETAILS table in Flink:

SQL
CREATE TABLE clients_accounting_details
WITH (
    'connector' = 'kafka',
    'topic' = 'clients.accounting.details',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'flink-group',
    'value.format' = 'json',
    'scan.startup.mode' = 'earliest-offset'  -- Or 'latest-offset'
) AS
SELECT
    ca.first_name,
    ca.last_name,
    ca.address,
    cad.bank_number,
    cad.bank_account,
    fab.account_balance
FROM
    clients_address AS ca
JOIN
    clients_account_details AS cad
ON
    ca.first_name = cad.first_name
AND
    ca.last_name = cad.last_name
JOIN
    flink_clients_account_balance AS fab
ON
    ca.first_name = fab.first_name
AND
    ca.last_name = fab.last_name;

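This CREATE TABLE ... AS SELECT submits a continuous insert job to the Flink cluster. If you want to confirm the job was accepted, one option is the SHOW JOBS statement available in the SQL client since Flink 1.17; you can also check it in the Flink web UI exposed on port 9081.

SQL
SHOW JOBS;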

With the table created, let's check it in Kafka:

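You can verify this in Control Center or, from the command line, peek at the first few messages with kafka-console-consumer (an illustrative example):

Bash
sudo docker compose exec broker kafka-console-consumer --bootstrap-server broker:29092 --topic clients.accounting.details --from-beginning --max-messages 5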

Look at the data flowing into our topic! 🙂

Let's check the data in Flink:

SQL
SELECT * FROM clients_accounting_details;


Notice that, starting from one source (Kafka), we enriched our data with information from Oracle and PostgreSQL, creating a new stream in Kafka with the result!

And with that we conclude this series of articles!

Thank you to everyone who followed along on this journey. I apologize for a few improvised points, but I believe I achieved my goal of introducing these technologies to those who are just getting started.

Here are a few final tips:

  • Improve the docker-compose and the environment. There is plenty of room to optimize it further, and along the way you will learn enough about Docker to easily build your own labs!
  • Try other connectors. There are well over a hundred connectors available for the Confluent Platform, plus many more for Apache Flink.
  • Build new pipelines, automate whenever possible, and let your imagination run free!

If you have questions or just want to chat about these technologies, feel free to comment or get in touch! 🙂

See you in the next articles! Best regards!

Sergio Willians

Sergio Willians is the founder of GPO (Grupo de Profissionais Oracle) and has nearly 30 years of experience with Oracle technologies, specializing in Forms/Reports development, PL/SQL, and EBS (E-Business Suite) in the Receivables, Payables, and General Ledger modules. He currently works at Scania Latin America, where he focuses on data integration with Confluent Kafka. His passion is sharing knowledge with the Oracle community, contributing to the platform's growth and excellence.
