Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte II
Olá pessoal !
No primeiro artigo, abordamos a instalação do nosso ambiente de estudos e a implementação da Confluent Platform. Agora, avançaremos ainda mais, focando na instalação e configuração do conector spooldir, fundamental para a geração de dados em nosso tópico.
Para dar início, efetuaremos algumas modificações em nosso arquivo docker-compose.yml. Isso incluirá a criação de um volume persistente e a instalação do conector spooldir no container connect.
Volume persistente
O volume persistente servirá para dois propósitos:
- Repositório do arquivo de instalação do conector
- Diretório de origem dos arquivos CSV a serem lidos
Então vamos começar. Primeiramente crie o diretório workarea:
/home/swillians/workarea
Abra o arquivo docker-compose.yml e localize a seção onde definimos os serviços. Adicione o seguinte trecho abaixono final do arquivo:
volumes:
workarea:
driver: local
driver_opts:
type: none
o: bind
device: /home/swillians/workarea #Inclua aqui o seu diretório
O nome do volume a ser criado será workarea, e referenciará o path /home/swillians/workarea.
Instalação do conector spooldir
Na máquina host, vamos criar o diretório confluent-connectors:
/home/swillians/workarea/confluent-connectors
Faça o download do conector CLICANDO AQUI e o coloque no diretório acima.
Agora, faremos a inclusão do seguinte código no container connect:
connect:
image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
volumes:
- workarea:/app/workarea
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
command:
- bash
- -c
- |
echo "Instalando o Conector"
confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
#
/etc/confluent/docker/run &
#
sleep infinity
Agora é hora de subir os containers:
sudo docker compose up -d
[+] Running 11/0
✔ Container flink-jobmanager Running 0.0s
✔ Container flink-taskmanager Running 0.0s
✔ Container flink-sql-client Running 0.0s
✔ Container broker Running 0.0s
✔ Container schema-registry Running 0.0s
✔ Container connect Running 0.0s
✔ Container control-center Running 0.0s
✔ Container ksqldb-server Running 0.0s
✔ Container ksqldb-cli Running 0.0s
✔ Container ksql-datagen Running 0.0s
✔ Container rest-proxy Running
Com todos os containers executando, acesse agora a Confluent Platform e verifique se o conector spooldir está disponível:
Se o spooldir estiver disponível, significa que deu tudo certo ! 🙂
Você tem a liberdade de instalar quantos conectores desejar. Como ilustrado na imagem acima, tenho vários outros conectores instalados. Se quiser instalar mais de um, siga o exemplo abaixo:
confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/streamthoughts-kafka-connect-file-pulse-2.10.0.zip
Assim você pode instalar quantos conectores forem necessários !
Para facilitar, abaixo o docker-compose.yml completo, com as alterações:
---
services:
broker:
image: confluentinc/cp-server:7.5.1
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry:
image: confluentinc/cp-schema-registry:7.5.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
volumes:
- workarea:/app/workarea
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
command:
- bash
- -c
- |
echo "Instalando o Conector"
confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
#
/etc/confluent/docker/run &
#
sleep infinity
control-center:
image: confluentinc/cp-enterprise-control-center:7.5.1
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.5.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.5.1
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:7.5.1
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:7.5.1
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
flink-sql-client:
image: cnfldemos/flink-sql-client-kafka:1.19.0-scala_2.12-java17
hostname: flink-sql-client
container_name: flink-sql-client
depends_on:
- flink-jobmanager
environment:
FLINK_JOBMANAGER_HOST: flink-jobmanager
volumes:
- ./settings/:/settings
flink-jobmanager:
image: cnfldemos/flink-kafka:1.19.0-scala_2.12-java17
hostname: flink-jobmanager
container_name: flink-jobmanager
ports:
- 9081:9081
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
rest.bind-port: 9081
flink-taskmanager:
image: cnfldemos/flink-kafka:1.19.0-scala_2.12-java17
hostname: flink-taskmanager
container_name: flink-taskmanager
depends_on:
- flink-jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 10
volumes:
workarea:
driver: local
driver_opts:
type: none
o: bind
device: /home/swillians/workarea #Inclua aqui o seu diretório
Gerando arquivos para o conector
Antes de começarmos a configurar o conector, precisamos gerar uma massa de dados contínua para ser consumida por ele. Para isso, crie o seguinte diretório:
/home/swillians/workarea/confluent-spooldir-test/files
Dentro do diretório files, crie mais duas pastas. A error e a finished.
/home/swillians/workarea/confluent-spooldir-test/files/error
/home/swillians/workarea/confluent-spooldir-test/files/finished
A idéia é ter um script Python que faça a geração dos arquivos de tempos em tempos. Para saber se você possui o python instalado, digite o comando abaixo:
python3
Python 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
Se o resultado acima aparecer, o Python já está instalado. Caso não esteja, faça a instalação utilizando os comandos abaixo:
sudo apt update
sudo apt install python3
Com o Python instalado, poderemos executar o nosso script.
import random
import csv
import time
# Definição de dados fictícios para geração aleatória
first_names = ["John", "Jane", "Alice", "Bob", "Eve", "Mike", "Anna", "Tom", "Lucy", "Jack"]
last_names = ["Smith", "Doe", "Johnson", "Brown", "Williams", "Jones", "Garcia", "Miller", "Davis", "Rodriguez"]
emails = ["example.com", "test.com", "sample.com", "demo.com"]
genders = ["Male", "Female"]
countries = ["USA", "Canada", "UK", "Australia", "Germany", "France", "India", "Brazil", "China", "Japan"]
colors = ["Red", "Blue", "Green", "Yellow", "Black", "White", "Purple", "Orange", "Pink", "Brown"]
# Current ID, starting from 1
current_id = 1
# Função para gerar um IP aleatório
def generate_ip():
return ".".join(str(random.randint(0, 255)) for _ in range(4))
# Função para gerar uma data de último login aleatória
def generate_last_login():
return f"{random.randint(2020, 2023)}-{random.randint(1, 12)}-{random.randint(1, 28)}"
def create_tsv_file():
global current_id
end_id = current_id + 999 # As we are generating 1000 records
# Getting the current timestamp for the filename
timestamp = time.strftime("%Y%m%d%H%M%S")
filename = f"tsv-spooldir-source_{timestamp}.tsv"
with open(filename, "w", newline='') as tsvfile:
writer = csv.writer(tsvfile, delimiter='\t')
# Escrevendo o cabeçalho
writer.writerow(["id", "first_name", "last_name", "email", "gender", "ip_address", "last_login", "account_balance", "country", "favorite_color"])
# Gerando e escrevendo 1000 registros aleatórios
for i in range(current_id, end_id + 1):
writer.writerow([
i,
random.choice(first_names),
random.choice(last_names),
f"{random.choice(first_names).lower()}.{random.choice(last_names).lower()}@{random.choice(emails)}",
random.choice(genders),
generate_ip(),
generate_last_login(),
"{:.2f}".format(random.uniform(0, 10000)),
random.choice(countries),
random.choice(colors)
])
# Updating the current_id for the next iteration
current_id = end_id + 1
print(f"Arquivo {filename} criado com sucesso!")
while True:
create_tsv_file()
time.sleep(60) # Espera 1 minuto(s)
Chame o arquivo de tsv-spooldir-source.py e salve-o no diretório files, que criamos acima.
Para executar o script, utilize o seguinte comando:
cd /workarea/confluent-spooldir-test/files
python3 tsv-spooldir-source.py
Arquivo tsv-spooldir-source_20240512175643.tsv criado com sucesso!
Esse script gerará a cada minuto um arquivo .TSV (Tab Separated Value). Esses arquivos terão o seguinte formato:
id first_name last_name email gender ip_address last_login account_balance country favorite_color
1 Jennine Burkett jburkett0@hp.com Agender 203.96.208.246 2018-03-19T22:09:31Z 20795.26 AL #1852e1
2 Christiano Le Grys clegrys1@boston.com Male 140.21.114.158 2018-09-30T23:13:42Z 23052.0 CN #6e5afe
3 Geneva Livezley glivezley2@squarespace.com Non-binary 26.103.199.8 2018-09-21T19:10:51Z 24749.65 ML #38ce82
4 Gottfried Kiera gkiera3@washingtonpost.com Male 3.40.201.160 2017-11-13T02:18:12Z 12641.46 BR #052a99
5 Mariska Nimmo mnimmo4@accuweather.com Female 187.198.175.205 2017-02-06T17:20:56Z 7806.15 RU #a1c21d
6 Aubine Anthes aanthes5@google.com.au Female 13.41.43.20 2014-05-07T05:18:57Z 3539.99 CN #c2df7e
Pronto ! Agora já temos os arquivos que irão ser consumidos pelo nosso conector ! 🙂
Vamos por o nosso script para funcionar e começar a gerar os arquivos que precisamos, para isso, utilize o comando abaixo:
python3 tsv-spooldir-source.py
Arquivo tsv-spooldir-source_20240514133725.tsv criado com sucesso!
Configurando o conector SpoolDir
Agora chegou o tão esperado momento ! A configuração do conector SpoolDir !
Os conectores aceitam dois tipos de arquivos de configuração:
- .properties
- .json
Como nos meus artigos anteriores eu utilizei arquivos .json, dessa vez utilizarei o arquivo .properties.
Abaixo o conteúdo do nosso arquivo:
name=tsv.spooldirfile.clients.source
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
input.path=/app/workarea/confluent-spooldir-test/files
error.path=/app/workarea/confluent-spooldir-test/files/error
finished.path=/app/workarea/confluent-spooldir-test/files/finished
input.file.pattern=^tsv-spooldir.*\.tsv$
halt.on.error=false
topic=tsv.spooldirfile.clients
schema.generation.enabled=true
csv.first.row.as.header=true
csv.separator.char=9
offset.flush.interval.ms=10000
Salve o conteúdo acima como spooldirtsv.properties.
Gosto muito de detalhar as coisas nos meus artigos, mas creio que a maior parte das propriedades sejam auto-explicativas. Caso deseje obter mais detalhes sobre o conector, CLIQUE AQUI e leia a documentação oficial.
Vamos agora criar o tópico que receberá os dados de nosso conector, para isso, acesse a Confluent Platform e clique em Topics.
Clique em Add topic.
Em topic name, coloque tsv.spooldirfile.clients e depois clique em Create with Defaults.
Nosso tópico foi criado com sucesso !
Vamos configurar agora o conector, para isso, clique em Connect>>connect-default
Clique agora no botão Add conector.
Clique em Upload connector config file e escolha o arquivo spooldirtsv.properties.
Nesta página você verá todas as propriedades descritas em nosso arquivo e muitas outras disponíveis para a configuração do nosso conector.
Vá até o final da página e Clique em Next.
Se nenhum erro for encontrado no conector, você verá a seguinte página:
Agora é só clicar em Launch.
Conector executando com sucesso !
Vamos dar uma olhada no nosso conector funcionando e alimentando o nosso tópico ? Para isso, clique em Topics e depois no tópico que criamos:
Agora clique em Messages.
Parabéns ! O conector spooldir está funcionando e produzindo dados ! 🙂
No próximo artigo, começaremos finalmente a utilizar o Apache Flink ! Vejo vocês lá !
Referências