Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte V – Iniciando com o Apache Flink
No artigo anterior, discutimos o Rest Proxy e testamos sua funcionalidade utilizando o Confluent CLI. Mas agora, o tão esperado momento chegou! Vamos começar a utilizar o Flink. No entanto (sempre há um porém), precisaremos fazer uma alteração em nosso arquivo docker-compose.yml
.
Não fiquem bravos, essas mudanças serão benéficas para que tenhamos mais traquilidade nos próximos capítulos de nossa saga.
Abaixo o nosso novo docker-compose.yml
---
services:
broker:
image: confluentinc/cp-server:7.5.1
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry:
image: confluentinc/cp-schema-registry:7.5.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
volumes:
- workarea:/app/workarea
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
command:
- bash
- -c
- |
echo "Installing Connector"
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-http-1.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-oracle-cdc-2.2.1.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-sftp-3.2.4.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/streamthoughts-kafka-connect-file-pulse-2.10.0.zip
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
control-center:
image: confluentinc/cp-enterprise-control-center:7.5.1
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.5.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.5.1
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:7.5.1
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:7.5.1
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
flink-init:
image: busybox
container_name: flink-init
command: >
sh -c "
echo 'Downloading Flink Kafka connector...' &&
wget -O /opt/flink/plugins/kafka/flink-sql-connector-kafka-3.2.0-1.19.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar --no-check-certificate &&
echo 'Connector downloaded successfully.'
"
volumes:
- flink-plugins:/opt/flink/plugins/kafka
depends_on:
- broker
flink-jobmanager:
image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
hostname: flink-jobmanager
container_name: flink-jobmanager
ports:
- 9081:9081
command: jobmanager
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: flink-jobmanager
rest.bind-port: 9081
volumes:
- flink-plugins:/opt/flink/plugins/kafka
depends_on:
- flink-init
flink-taskmanager:
image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
hostname: flink-taskmanager
container_name: flink-taskmanager
depends_on:
- flink-jobmanager
command: taskmanager
scale: 1
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 10
volumes:
- flink-plugins:/opt/flink/plugins/kafka
flink-sql-client:
image: cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17
hostname: flink-sql-client
container_name: flink-sql-client
depends_on:
- flink-jobmanager
environment:
FLINK_JOBMANAGER_HOST: flink-jobmanager
volumes:
- ./settings/:/settings
- flink-plugins:/opt/flink/plugins/kafka
volumes:
workarea:
driver: local
driver_opts:
type: none
o: bind
device: /home/swillians/workarea
flink-plugins:
driver: local
Fiz algumas alterações no arquivo, para que fosse copiada a versão correta do conector kafka para o Flink. Não está muito elegante, inclusive, eu poderia ter utilizado um arquivo .env
para organizar os paths. Mas isso ficará para um outro momento. Prometo que até o final da série, montaremos um docker-compose.yml
bem interessante !
Levante o ambiente ! Após todos os containers subirem, vamos executar o client do Flink:
sudo docker compose exec flink-sql-client sql-client.sh embedded --library /opt/flink/plugins/kafka
O comando –library /opt/flink/plugins/kafka serve para indicar ao client onde os plugins estão armazenados.
Vamos criar a nossa primeira tabela baseada em um tópico kafka:
CREATE TABLE flink_clients_account_balance (
first_name STRING,
last_name STRING,
account_balance DECIMAL(10,2),
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR `ts` AS `ts`
) WITH (
'connector' = 'kafka',
'topic' = 'tsv.spooldirfile.clients',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'broker:29092',
'value.format' = 'json'
);
Explicando o código
Definição da Tabela e Colunas
CREATE TABLE flink_clients_account_balance
- Define a criação de uma tabela chamada flink_clients_account_balance.
Colunas da Tabela
first_name STRING,
last_name STRING,
account_balance DECIMAL(10,2),
ts TIMESTAMP(3) METADATA FROM 'timestamp',
- first_name STRING: Coluna first_name do tipo STRING.
- last_name STRING: Coluna last_name do tipo STRING.
- account_balance DECIMAL: Coluna latest_account_balance do tipo DECIMAL.
- ts TIMESTAMP(3) METADATA FROM ‘timestamp’: Coluna ts do tipo TIMESTAMP com precisão de 3 dígitos para milissegundos. Esta coluna usa metadados provenientes do campo timestamp.
Watermark
- WATERMARK FOR `ts` AS `ts` :Define uma política de geração de watermarks para a coluna ts. Watermarks são usados para gerenciar eventos de processamento em tempo e lidar com dados fora de ordem (late data).
Configurações do Conector
) WITH (
'connector' = 'kafka',
'topic' = 'tsv.spooldirfile.clients',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'broker:29092',
'value.format' = 'json');
Conector Kafka
- ‘connector’ = ‘kafka’: Especifica que a tabela vai usar o conector Kafka.
- Nome do Tópico: ‘topic’ = ‘tsv.spooldirfile.clients’: Define o nome do tópico Kafka de onde os dados serão consumidos.
Modo de Início da Leitura
- ‘scan.startup.mode’ = ‘earliest-offset’: Define que a leitura dos dados deve começar do offset mais antigo disponível no tópico.
Servidores Bootstrap do Kafka
- ‘properties.bootstrap.servers’ = ‘broker:29092’: Define a lista de servidores Kafka para conexão. Neste caso, está usando broker:29092.
Formato dos Valores:
- ‘value.format’ = ‘json’: Define que o formato dos dados no tópico Kafka é JSON.
Resumo
Este código cria uma tabela flink_clients_account_balance no Flink que consome dados de um tópico Kafka chamado tsv.spooldirfile.clients. A tabela tem colunas para first_name, last_name, account_balance, e um campo ts que contém metadados do timestamp. A política de watermarks é definida para a coluna ts para gerenciar eventos em tempo e dados fora de ordem. As configurações adicionais especificam o conector Kafka, o nome do tópico, o modo de início da leitura, os servidores bootstrap do Kafka e o formato dos valores dos dados.
Copie o código de criação de tabela e tecle ENTER para que ela seja criada.
Agora vamos executar o SELECT para ler a nossa tabela flink_clients_account_balance:
SELECT * FROM flink_clients_account_balance;
Tabela criada e já recebendo os dados do tópico do kafka ! 🙂
Fique a vontade para navegar pelas opções. As principais são:
- O: Open Row: Abre o detalhamento dos campos da linha que você escolher.
- N e P: Navegam pelas páginas com os resutados
- G e L: G vai para a página escolhida e L para a última página
- – e +: Definem o tempo de refresh dos dados
- R: Atualiza a lista com os dados
- Q: Sai da lista dos dados
Vamos dar uma olhada no nosso job através do Apache Flink Dashboard. Você pode acessá-lo através do link: http://0.0.0.0:9081/#/overview
E assim terminamos mais um capítulo !
No próximo artigo, iremos criar uma tabela no Flink baseada em uma tabela do banco de dados PostgreSQL. Ele servirá como base para trabalharmos com enriquecimento de dados.
Sergio tenho acompanhado o seus artigos sobre Kafka e tenho aprendido muito sobre a tecnologia.
Fico muito agradecido por ter acesso a esse material. Obrigado por compartilhar seu conhecimento!