Confluent Platform and Apache Flink: enriching data with Flink
Here I am, back to wrap up our series of articles on Confluent Platform, Kafka, and Apache Flink. We have finally reached the last chapter of this journey together.
Below is the model we will finalize today:
It has been a long road to get here! Throughout this series, we explored:
- Environment installation
- In the first article, we saw how to install the environment.
- Spooldir connector
- In the second article, we saw how to install the spooldir connector and generate the test data.
- KSQLDB
- In the third article, we added KSQLDB, by popular request :).
- REST Proxy and Confluent CLI
- In the fourth article, we covered REST Proxy and the Confluent CLI, also by request :).
- Apache Flink and Kafka
- In the fifth article, we got started with Flink and created a table based on a Kafka topic.
- Apache Flink and PostgreSQL
- In the sixth article, we created a Flink table based on a PostgreSQL table.
- Replicating data from Oracle to PostgreSQL
- In the seventh article, we set up data replication between Oracle and PostgreSQL using the Debezium Oracle CDC connector.
Before we reach the final goal, let's make a few last changes to our docker-compose.yml.
What was improved in the environment:
- A new container for data generation: with this change, as soon as the environment starts, the new container automatically begins creating the files that will be processed by the spooldir connector (see the sketch after this list).
- A new PostgreSQL connector in Apache Flink: I installed an updated PostgreSQL connector for Flink, since the previous one was unstable and not compatible with the PostgreSQL version we are using.
- A new .env file: I added new variables to make the environment more customizable and easier to update.
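To give an idea of what the data-generation container runs, here is a minimal sketch of a TSV generator for the spooldir connector. The real script is tsv-spooldir-source.py, covered in the second article; the field names, file naming, and timing below are illustrative assumptions only.

# Hypothetical sketch of a TSV generator for the spooldir connector.
# Field names, file naming, and the 5-second interval are illustrative assumptions.
import csv
import os
import random
import time
import uuid

OUTPUT_DIR = os.environ.get("OUTPUT_DIR_CONT", "/app/output")
FIRST_NAMES = ["Ana", "Bruno", "Carla", "Diego", "Elisa"]
LAST_NAMES = ["Silva", "Souza", "Oliveira", "Pereira", "Costa"]

while True:
    # Each iteration writes one new TSV file for the spooldir connector to pick up.
    file_name = os.path.join(OUTPUT_DIR, f"clients_{uuid.uuid4().hex}.tsv")
    with open(file_name, "w", newline="") as f:
        writer = csv.writer(f, delimiter="\t")
        writer.writerow(["first_name", "last_name", "account_balance"])
        for _ in range(10):
            writer.writerow([
                random.choice(FIRST_NAMES),
                random.choice(LAST_NAMES),
                round(random.uniform(100.0, 10000.0), 2),
            ])
    time.sleep(5)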
Creating the test environment
.env file
# PostgreSQL
PGS_IMAGE_VERSION=postgres:13
PGS_DATABASE_HOST=localhost
PGS_DATABASE_NAME=flink_db
PGS_DATABASE_USER=flink_kafka
PGS_DATABASE_PASSWORD=flink_kafka
# PGADMIN
PGA_IMAGE_VERSION=dpage/pgadmin4
PGA_USER=flink@kafka.com
PGA_PASSWORD=flink_db
# Oracle
ORA_IMAGE_VERSION=container-registry.oracle.com/database/free:latest
ORA_DATABASE_DSN=localhost:1521/FREEPDB1
ORA_DATABASE_SYS_PASSWORD=flink_kafka
ORA_DATABASE_USER=c##dbzuser
ORA_DATABASE_PASSWORD=flink_kafka
# CONFLUENT Kafka
# Broker
KBR_IMAGE_VERSION=confluentinc/cp-server:7.5.1
# Schema Registry
KSR_IMAGE_VERSION=confluentinc/cp-schema-registry:7.5.1
# Connect
KCN_IMAGE_VERSION=cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
# Control Center
KCC_IMAGE_VERSION=confluentinc/cp-enterprise-control-center:7.5.1
# KSQLDB
KSD_IMAGE_VERSION=confluentinc/cp-ksqldb-server:7.5.1
# KSQLDB Client
KSC_IMAGE_VERSION=confluentinc/cp-ksqldb-cli:7.5.1
# KSQLDB Datagen
KSG_IMAGE_VERSION=confluentinc/ksqldb-examples:7.5.1
# Rest Proxy
KRP_IMAGE_VERSION=confluentinc/cp-kafka-rest:7.5.1
# Apache Flink
# Job Manager
FJM_IMAGE_VERSION=cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
# Task Manager
FTM_IMAGE_VERSION=cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
# Flink SQL Client
FSC_IMAGE_VERSION=cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17
# Generate Data
PYTHON_VERSION=python:3.9-slim
SCRIPT_NAME=tsv-spooldir-source.py
OUTPUT_DIR_HOST=/home/swillians/workarea/confluent-spooldir-test/files
OUTPUT_DIR_CONT=/app/output
docker-compose.yml
---
services:
oracle-db:
image: ${ORA_IMAGE_VERSION}
container_name: oracle_23ai
environment:
ORACLE_PWD: ${ORA_DATABASE_SYS_PASSWORD}
ORACLE_CHARACTERSET: "AL32UTF8"
ENABLE_ARCHIVELOG: "true"
ENABLE_FORCE_LOGGING: "true"
ports:
- "1521:1521"
volumes:
- /home/swillians/workarea/oracle/23ai/data:/opt/oracle/oradata
restart: always
postgres:
image: ${PGS_IMAGE_VERSION}
container_name: postgres_container
environment:
POSTGRES_USER: ${PGS_DATABASE_USER}
POSTGRES_PASSWORD: ${PGS_DATABASE_PASSWORD}
POSTGRES_DB: ${PGS_DATABASE_NAME}
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
pgadmin:
image: ${PGA_IMAGE_VERSION}
container_name: pgadmin_container
environment:
PGADMIN_DEFAULT_EMAIL: ${PGA_USER}
PGADMIN_DEFAULT_PASSWORD: ${PGA_PASSWORD}
ports:
- "5050:80"
depends_on:
- postgres
volumes:
- pgadmin_data:/var/lib/pgadmin
broker:
image: ${KBR_IMAGE_VERSION}
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry:
image: ${KSR_IMAGE_VERSION}
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: ${KCN_IMAGE_VERSION}
user: root
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
volumes:
- workarea:/app/workarea
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
command:
- bash
- -c
- |
echo "Installing ZIP"
yum install --nogpgcheck --setopt=sslverify=false zip -y
#
echo "Installing Connectors"
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-http-1.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-oracle-cdc-2.2.1.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-sftp-3.2.4.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/streamthoughts-kafka-connect-file-pulse-2.10.0.zip
#
echo "Downloading Debezium Oracle Connector..."
wget -O debezium-connector-oracle-plugin.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/2.7.0.Final/debezium-connector-oracle-2.7.0.Final-plugin.tar.gz --no-check-certificate
wget -O debezium-scripting.tar.gz https://repo1.maven.org/maven2/io/debezium/debezium-scripting/2.7.0.Final/debezium-scripting-2.7.0.Final.tar.gz --no-check-certificate
echo "Downloading JDBC driver for Oracle ..."
wget https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/21.6.0.0/ojdbc8-21.6.0.0.jar -P /usr/share/java/kafka/ --no-check-certificate
echo "Downloading Groovy SDK ..."
wget https://groovy.jfrog.io/artifactory/dist-release-local/groovy-zips/apache-groovy-sdk-4.0.17.zip -P /tmp/groovy/ --no-check-certificate
echo "Extracting the archive...Debezium !"
tar -xzvf debezium-connector-oracle-plugin.tar.gz -C /usr/share/java/
tar -xzvf debezium-scripting.tar.gz -C /usr/share/java/debezium-connector-oracle/ --strip-components=1
echo "Extracting the archive...Groovy !"
unzip /tmp/groovy/apache-groovy-sdk-4.0.17.zip */*/groovy-4.0.17.jar */*/groovy-jsr223-4.0.17.jar */*/groovy-json-4.0.17.jar -d /tmp/groovy/
echo "Copying Groovy archives..."
cp /tmp/groovy/*/*/*.jar /usr/share/java/debezium-connector-oracle/
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
control-center:
image: ${KCC_IMAGE_VERSION}
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: ${KSD_IMAGE_VERSION}
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: ${KSC_IMAGE_VERSION}
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: ${KSG_IMAGE_VERSION}
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: ${KRP_IMAGE_VERSION}
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
flink-init:
image: busybox
container_name: flink-init
command: >
sh -c "
echo 'Downloading Flink Kafka connector...' &&
wget -O /opt/flink/plugins/kafka/flink-sql-connector-kafka-3.2.0-1.19.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar --no-check-certificate &&
echo 'Connector downloaded successfully.'
echo 'Downloading Flink JDBC connector...' &&
wget -O /opt/flink/plugins/jdbc/flink-connector-jdbc-3.2.0-1.19.jar https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/3.2.0-1.19/flink-connector-jdbc-3.2.0-1.19.jar --no-check-certificate &&
echo 'Connector downloaded successfully.'
"
volumes:
- flink-plugins-kafka:/opt/flink/plugins/kafka
- flink-plugins-jdbc:/opt/flink/plugins/jdbc
depends_on:
- broker
flink-jobmanager:
image: ${FJM_IMAGE_VERSION}
hostname: flink-jobmanager
container_name: flink-jobmanager
ports:
- 9081:9081
command: jobmanager
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: flink-jobmanager
rest.bind-port: 9081
volumes:
- flink-plugins-kafka:/opt/flink/plugins/kafka
- flink-plugins-jdbc:/opt/flink/plugins/jdbc
depends_on:
- flink-init
flink-taskmanager:
image: ${FTM_IMAGE_VERSION}
hostname: flink-taskmanager
container_name: flink-taskmanager
depends_on:
- flink-jobmanager
command: taskmanager
scale: 1
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 10
volumes:
- flink-plugins-kafka:/opt/flink/plugins/kafka
- flink-plugins-jdbc:/opt/flink/plugins/jdbc
flink-sql-client:
image: ${FSC_IMAGE_VERSION}
hostname: flink-sql-client
container_name: flink-sql-client
depends_on:
- flink-jobmanager
environment:
FLINK_JOBMANAGER_HOST: flink-jobmanager
volumes:
- ./settings/:/settings
- flink-plugins-kafka:/opt/flink/plugins/kafka
- flink-plugins-jdbc:/opt/flink/plugins/jdbc
generate-account-data:
image: ${PYTHON_VERSION}
hostname: generate-account-data
container_name: generate-account-data
environment:
OUTPUT_DIR_CONT: ${OUTPUT_DIR_CONT}
command:
- bash
- -c
- |
echo "Copiando Script Python.."
cd ${OUTPUT_DIR_CONT}
echo "Executando Script Python.."
python3 ${SCRIPT_NAME}
volumes:
- ${OUTPUT_DIR_HOST}:${OUTPUT_DIR_CONT}
volumes:
workarea:
driver: local
driver_opts:
type: none
o: bind
device: /home/swillians/workarea
flink-plugins-kafka:
driver: local
flink-plugins-jdbc:
driver: local
pgdata:
pgadmin_data:
These changes are enough to give us a more stable test environment. There is still plenty of room for improvement, though, so I leave you the mission of continuing to evolve and refine it.
To bring up the environment, run sudo docker compose up -d. Once all containers are up, we will start the Flink SQL Client. Since I reorganized the directories and installed a new connector, the command is now:
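Before moving on, you can confirm that all services came up correctly by listing the containers:
sudo docker compose ps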
sudo docker compose exec flink-sql-client sql-client.sh embedded --library /opt/flink/plugins/kafka --library /opt/flink/plugins/jdbc
Once inside the Flink SQL Client, run the commands below to create the tables that Flink will use:
Creating the tables in Apache Flink
CLIENTS_ADDRESS (PostgreSQL) - Contains the clients' addresses, sourced from the PostgreSQL database.
CREATE TABLE clients_address (
first_name STRING,
last_name STRING,
address STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres_container:5432/flink_db',
'table-name' = 'clients_address',
'username' = 'flink_kafka',
'password' = 'flink_kafka',
'driver' = 'org.postgresql.Driver'
);
CLIENTS_ACCOUNT_DETAILS (PostgreSQL) - Stores the account details, with data coming from PostgreSQL. This table is replicated from Oracle using the Debezium Oracle CDC connector (source) and the JDBC connector (sink).
CREATE TABLE clients_account_details (
first_name STRING,
last_name STRING,
bank_number BIGINT,
bank_account STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres_container:5432/flink_db',
'table-name' = 'clients_account_details',
'username' = 'flink_kafka',
'password' = 'flink_kafka',
'driver' = 'org.postgresql.Driver'
);
FLINK_CLIENTS_ACCOUNT_BALANCE (Kafka) - Holds the account balance movements, sourced from Kafka.
CREATE TABLE flink_clients_account_balance (
first_name STRING,
last_name STRING,
account_balance DECIMAL(10,2),
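-- 'ts' is mapped from the Kafka record timestamp; the watermark below tolerates events up to 5 seconds out of order.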
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR `ts` AS `ts` - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'tsv.spooldirfile.clients',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'flink-group',
'properties.bootstrap.servers' = 'broker:29092',
'value.format' = 'json'
);
After creating the tables, let's run a quick test against one of them to make sure everything is working:
SELECT * FROM clients_account_details;
Now we get to the main point. We will create a table called CLIENTS_ACCOUNTING_DETAILS, combining attributes from the CLIENTS_ACCOUNT_DETAILS, CLIENTS_ADDRESS, and FLINK_CLIENTS_ACCOUNT_BALANCE tables. This data will be sent to a Kafka topic.
First, let's create the topic in Kafka:
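If you prefer the command line over Control Center, a command along these lines should create it (the partition and replication values are assumptions that suit this single-broker setup):
sudo docker compose exec broker kafka-topics --bootstrap-server broker:29092 --create --topic clients.accounting.details --partitions 1 --replication-factor 1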
Now run the code below to create the CLIENTS_ACCOUNTING_DETAILS table in Flink:
CREATE TABLE clients_accounting_details
WITH (
'connector' = 'kafka',
'topic' = 'clients.accounting.details',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'flink-group',
'value.format' = 'json',
'scan.startup.mode' = 'earliest-offset' -- or 'latest-offset'
) AS
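-- Regular join: the two JDBC tables are read as bounded snapshots, while the Kafka table is an unbounded stream.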
SELECT
ca.first_name,
ca.last_name,
ca.address,
cad.bank_number,
cad.bank_account,
fab.account_balance
FROM
clients_address AS ca
JOIN
clients_account_details AS cad
ON
ca.first_name = cad.first_name
AND
ca.last_name = cad.last_name
JOIN
flink_clients_account_balance AS fab
ON
ca.first_name = fab.first_name
AND
ca.last_name = fab.last_name;
With the table created, let's check it in Kafka:
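If you would rather inspect the topic from the terminal than through Control Center, a console consumer along these lines should work:
sudo docker compose exec broker kafka-console-consumer --bootstrap-server broker:29092 --topic clients.accounting.details --from-beginning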
Look at the data flowing into our topic! 🙂
Let's check the data in Flink:
SELECT * FROM clients_accounting_details;
Notice that, starting from a single source (Kafka), we enriched our data with information from Oracle and PostgreSQL, creating a new stream in Kafka with the result!
And with that we conclude this series of articles!
Thank you to everyone who followed along on this journey. I apologize for a few improvised moments, but I believe I achieved my goal of introducing these technologies to those who are just getting started.
Here are a few final tips:
- Improve the docker-compose file and the environment. There is plenty of room to optimize it further, and along the way you will learn enough Docker to build your own labs with ease!
- Try other connectors. There are more than a hundred connectors available for the Confluent Platform, plus others for Apache Flink.
- Build new pipelines, automate whenever possible, and let your imagination run free!
If you have questions or just want to chat about these technologies, feel free to comment or reach out! 🙂
See you in the next articles! Best regards!
References