Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte IV
Olá pessoal !
Aqui estou de volta para continuarmos a nossa saga ! Depois do último artigo sobre KsqlDB, hoje iremos abordar um assunto interessante, que é o REST Proxy !
O REST Proxy na Confluent Platform para Kafka é um componente que fornece uma interface HTTP RESTful para interagir com o Apache Kafka. Isso permite que aplicações possam produzir e consumir mensagens no Kafka sem necessidade de utilizar diretamente os clientes nativos do Kafka (que são baseados em Java), facilitando a integração com uma variedade maior de linguagens e frameworks.
Funcionalidades do REST Proxy
- Produção de Mensagens:
- Permite que clientes enviem mensagens para tópicos do Kafka utilizando requisições HTTP POST.
- Suporta envio de mensagens individuais ou em lote.
- Consumo de Mensagens:
- Clientes podem consumir mensagens de tópicos do Kafka através de requisições HTTP GET.
- Suporta a criação de consumidores com commits automáticos ou manuais de offsets.
- Administração:
- Permite operações administrativas básicas, como listar tópicos e verificar metadados dos tópicos.
Vantagens do REST Proxy
- Facilidade de Integração: Qualquer linguagem de programação que suporte HTTP pode interagir com o Kafka através do REST Proxy, eliminando a necessidade de bibliotecas específicas do Kafka.
- Simples de Usar: Utilizando requisições HTTP, as operações com Kafka tornam-se mais simples e intuitivas.
- Escalabilidade: Pode ser escalado independentemente do cluster Kafka, atendendo diferentes necessidades de carga e performance.
Antes de começarmos a colocar a mão na massa e testar o REST Proxy na prática, vamos elevar o nível do estudo e instalar o Confluent CLI. Um dos testes que faremos, é justamente consumir os dados através dele.
Mas o que seria o Confluent CLI ?
O Confluent CLI (Command Line Interface) é uma ferramenta de linha de comando fornecida pela Confluent para gerenciar e interagir com a plataforma Confluent, que inclui o Apache Kafka e outros componentes associados. Ele oferece uma maneira fácil e eficiente de executar várias operações administrativas, de gerenciamento e de desenvolvimento relacionadas ao Kafka e aos serviços da Confluent.
Agora que você já sabe o que é, vamos começar a instalação.
Adicione o repositório da Confluent e importe a chave GPG. Para isso, execute os comandos abaixo:
sudo apt-get update
sudo apt-get install -y wget gnupg software-properties-common
wget -qO - https://packages.confluent.io/deb/7.4/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/7.4 stable main"
Instale o Confluent CLI.
sudo apt-get update
sudo apt-get install -y confluent-cli
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following packages were automatically installed and are no longer required:
chromium-codecs-ffmpeg-extra docker-scan-plugin gstreamer1.0-vaapi i965-va-driver intel-media-va-driver libaacs0 libaom3 libass9 libavcodec58 libavformat58 libavutil56 libbdplus0 libblas3 libbluray2
libbs2b0 libchromaprint1 libcodec2-1.0 libdav1d5 libflashrom1 libflite1 libftdi1-2 libgme0 libgsm1 libgstreamer-plugins-bad1.0-0 libigdgmm12 liblilv-0-0 libllvm13 libmfx1 libmysofa1 libnorm1 libopenmpt0
libpgm-5.3-0 libpostproc55 librabbitmq4 librubberband2 libserd-0-0 libshine3 libsnappy1v5 libsord-0-0 libsratom-0-0 libsrt1.4-gnutls libssh-gcrypt-4 libswresample3 libswscale5 libudfread0 libva-drm2
libva-wayland2 libva-x11-2 libva2 libvdpau1 libvidstab1.1 libwpe-1.0-1 libwpebackend-fdo-1.0-1 libx265-199 libxvidcore4 libzimg2 libzmq5 libzvbi-common libzvbi0 mesa-va-drivers mesa-vdpau-drivers
ocl-icd-libopencl1 pocketsphinx-en-us va-driver-all vdpau-driver-all
Use 'sudo apt autoremove' to remove them.
The following NEW packages will be installed:
confluent-cli
0 upgraded, 1 newly installed, 0 to remove and 6 not upgraded.
Need to get 64,1 MB of archives.
After this operation, 289 MB of additional disk space will be used.
Get:1 https://packages.confluent.io/deb/7.4 stable/main amd64 confluent-cli all 7.4.5-1 [64,1 MB]
Fetched 64,1 MB in 6s (11,3 MB/s)
Selecting previously unselected package confluent-cli.
(Reading database ... 205665 files and directories currently installed.)
Preparing to unpack .../confluent-cli_7.4.5-1_all.deb ...
Unpacking confluent-cli (7.4.5-1) ...
Setting up confluent-cli (7.4.5-1) ...
Vamos verificar se a instalação está ok:
confluent --version
confluent version v2.32.1
Instalação efetuada com sucesso !
Vamos agora “conectar’ o Confluent CLI com a nossa Confluent Platform. Para isso, precisamos setar algumas variáveis de ambiente:
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export SCHEMA_REGISTRY_URL=http://localhost:8081
export CONFLUENT_REST_URL=http://localhost:8082
Vamos utilizar o Confluent CLI agora para listar os tópicos:
confluent kafka topic list
No session token found, please enter user credentials. To avoid being prompted, run "confluent login".
Username:
Password:
Name
-------------------------------------------------------------------------------------------------
CLIENTS_TABLE
CLIENT_STREAM
_confluent-command
_confluent-controlcenter-7-5-1-1-AlertHistoryStore-changelog
_confluent-controlcenter-7-5-1-1-AlertHistoryStore-repartition
_confluent-controlcenter-7-5-1-1-Group-ONE_MINUTE-changelog
_confluent-controlcenter-7-5-1-1-Group-ONE_MINUTE-repartition
_confluent-controlcenter-7-5-1-1-Group-THREE_HOURS-changelog
_confluent-controlcenter-7-5-1-1-Group-THREE_HOURS-repartition
_confluent-controlcenter-7-5-1-1-KSTREAM-OUTEROTHER-0000000106-store-changelog
_confluent-controlcenter-7-5-1-1-KSTREAM-OUTEROTHER-0000000106-store-repartition
_confluent-controlcenter-7-5-1-1-KSTREAM-OUTERTHIS-0000000105-store-changelog
_confluent-controlcenter-7-5-1-1-KSTREAM-OUTERTHIS-0000000105-store-repartition
_confluent-controlcenter-7-5-1-1-MetricsAggregateStore-changelog
_confluent-controlcenter-7-5-1-1-MetricsAggregateStore-repartition
_confluent-controlcenter-7-5-1-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-changelog
_confluent-controlcenter-7-5-1-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-repartition
_confluent-controlcenter-7-5-1-1-MonitoringMessageAggregatorWindows-THREE_HOURS-changelog
_confluent-controlcenter-7-5-1-1-MonitoringMessageAggregatorWindows-THREE_HOURS-repartition
_confluent-controlcenter-7-5-1-1-MonitoringStream-ONE_MINUTE-changelog
_confluent-controlcenter-7-5-1-1-MonitoringStream-ONE_MINUTE-repartition
_confluent-controlcenter-7-5-1-1-MonitoringStream-THREE_HOURS-changelog
_confluent-controlcenter-7-5-1-1-MonitoringStream-THREE_HOURS-repartition
_confluent-controlcenter-7-5-1-1-MonitoringTriggerStore-changelog
_confluent-controlcenter-7-5-1-1-MonitoringTriggerStore-repartition
_confluent-controlcenter-7-5-1-1-MonitoringVerifierStore-changelog
_confluent-controlcenter-7-5-1-1-MonitoringVerifierStore-repartition
_confluent-controlcenter-7-5-1-1-TriggerActionsStore-changelog
_confluent-controlcenter-7-5-1-1-TriggerActionsStore-repartition
_confluent-controlcenter-7-5-1-1-TriggerEventsStore-changelog
_confluent-controlcenter-7-5-1-1-TriggerEventsStore-repartition
_confluent-controlcenter-7-5-1-1-actual-group-consumption-rekey
_confluent-controlcenter-7-5-1-1-aggregate-topic-partition-store-changelog
_confluent-controlcenter-7-5-1-1-aggregate-topic-partition-store-repartition
_confluent-controlcenter-7-5-1-1-aggregatedTopicPartitionTableWindows-ONE_MINUTE-changelog
_confluent-controlcenter-7-5-1-1-aggregatedTopicPartitionTableWindows-ONE_MINUTE-repartition
_confluent-controlcenter-7-5-1-1-aggregatedTopicPartitionTableWindows-THREE_HOURS-changelog
_confluent-controlcenter-7-5-1-1-aggregatedTopicPartitionTableWindows-THREE_HOURS-repartition
_confluent-controlcenter-7-5-1-1-cluster-rekey
_confluent-controlcenter-7-5-1-1-expected-group-consumption-rekey
_confluent-controlcenter-7-5-1-1-group-aggregate-store-ONE_MINUTE-changelog
_confluent-controlcenter-7-5-1-1-group-aggregate-store-ONE_MINUTE-repartition
_confluent-controlcenter-7-5-1-1-group-aggregate-store-THREE_HOURS-changelog
_confluent-controlcenter-7-5-1-1-group-aggregate-store-THREE_HOURS-repartition
_confluent-controlcenter-7-5-1-1-group-stream-extension-rekey
_confluent-controlcenter-7-5-1-1-metrics-trigger-measurement-rekey
_confluent-controlcenter-7-5-1-1-monitoring-aggregate-rekey-store-changelog
_confluent-controlcenter-7-5-1-1-monitoring-aggregate-rekey-store-repartition
_confluent-controlcenter-7-5-1-1-monitoring-message-rekey-store
_confluent-controlcenter-7-5-1-1-monitoring-trigger-event-rekey
_confluent-ksql-default__command_topic
_confluent-ksql-default_query_CTAS_CLIENTS_TABLE_21-Aggregate-Aggregate-Materialize-changelog
_confluent-ksql-default_query_CTAS_CLIENTS_TABLE_21-Aggregate-GroupBy-repartition
_confluent-metrics
_confluent-monitoring
_schemas
default_ksql_processing_log
docker-connect-configs
docker-connect-offsets
docker-connect-status
tsv.spooldirfile.clients
Agora vamos executar o comando para consumir os dados de nosso tópico:
sudo confluent kafka topic consume tsv.spooldirfile.clients --bootstrap localhost:9092 --protocol PLAINTEXT --from-beginning
Starting Kafka Consumer. Use Ctrl-C to exit.
{"id":"3115","first_name":"Jack","last_name":"Johnson","email":"jane.williams@example.com","gender":"Female","ip_address":"49.67.44.7","last_login":"2022-6-11","account_balance":"1779.32","country":"India","favorite_color":"Green"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="115" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3116","first_name":"Alice","last_name":"Johnson","email":"jane.doe@test.com","gender":"Female","ip_address":"237.181.93.167","last_login":"2020-11-14","account_balance":"6077.84","country":"UK","favorite_color":"Purple"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="116" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3117","first_name":"Jack","last_name":"Smith","email":"jack.williams@sample.com","gender":"Female","ip_address":"213.111.238.64","last_login":"2022-3-11","account_balance":"4160.24","country":"Australia","favorite_color":"Black"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="117" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3118","first_name":"Alice","last_name":"Smith","email":"jack.davis@demo.com","gender":"Female","ip_address":"156.13.10.92","last_login":"2022-12-17","account_balance":"1399.15","country":"Japan","favorite_color":"Yellow"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="118" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3119","first_name":"Jack","last_name":"Smith","email":"alice.jones@test.com","gender":"Male","ip_address":"89.223.6.115","last_login":"2021-11-10","account_balance":"3660.54","country":"France","favorite_color":"Orange"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="119" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3120","first_name":"Mike","last_name":"Johnson","email":"mike.miller@test.com","gender":"Female","ip_address":"18.40.23.49","last_login":"2020-11-17","account_balance":"2176.57","country":"Japan","favorite_color":"Black"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="120" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3121","first_name":"Jack","last_name":"Doe","email":"john.rodriguez@example.com","gender":"Female","ip_address":"230.246.109.60","last_login":"2023-12-7","account_balance":"5600.74","country":"Australia","favorite_color":"Brown"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="121" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3122","first_name":"Anna","last_name":"Johnson","email":"jane.miller@test.com","gender":"Female","ip_address":"10.77.73.63","last_login":"2022-9-24","account_balance":"6388.37","country":"Brazil","favorite_color":"Purple"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="122" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3123","first_name":"John","last_name":"Miller","email":"eve.williams@sample.com","gender":"Male","ip_address":"23.13.192.154","last_login":"2023-10-28","account_balance":"9808.56","country":"China","favorite_color":"Purple"}
Stopping Consumer.
Olha os dados aí ! Teste efetuado com sucesso !
Explicando o comando
confluent kafka topic consume
: Este é o comando principal. Ele diz ao Confluent Kafka que você deseja consumir mensagens de um tópico específico.tsv.spooldirfile.clients
: Este é o nome do tópico Kafka do qual você quer consumir mensagens.--bootstrap localhost:9092
: Este parâmetro especifica o endereço e a porta do servidor Kafka que você está usando. Neste caso, o endereço é “localhost” e a porta é “9092”. Isso indica onde o consumidor deve se conectar para consumir as mensagens do tópico especificado.--protocol PLAINTEXT
: Este parâmetro especifica o protocolo de comunicação com o servidor Kafka. “PLAINTEXT” significa que a comunicação não é criptografada. Existem outras opções de protocolo, como SSL, que fornecem comunicação segura.--from-beginning
: Este parâmetro indica que você quer começar a consumir mensagens desde o início do tópico. Ou seja, você deseja ler todas as mensagens disponíveis no tópico, incluindo aquelas que foram publicadas antes do momento em que você iniciou o consumo. Se você não incluir esse parâmetro, o consumo começará a partir do ponto atual no tópico.
Acesse o site oficial do Confluent CLI na Confluent e explore todas as funcionalidades dessa ferramenta ! Garanto que ficará impressionado com o que é possível fazer !
Consumindo um tópico via Python
Para consumir mensagens de um tópico Kafka via REST Proxy, você pode usar a biblioteca confluent_kafka
do Python para fazer solicitações HTTP ao endpoint do REST Proxy.
Passo 1: Instalar a biblioteca confluent_kafka
confluent_kafka
Se você ainda não tem a biblioteca
instalada, você pode instalá-la usando confluent_kafka
pip
:
pip install confluent_kafka
Defaulting to user installation because normal site-packages is not writeable
Collecting confluent_kafka
Downloading confluent_kafka-2.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.0 MB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 4.0/4.0 MB 16.0 MB/s eta 0:00:00
Installing collected packages: confluent_kafka
Successfully installed confluent_kafka-2.4.0
Passo 2: Exemplo de Código em Python
Salve o arquivo abaixo como rest_proxy_python_test.py
from confluent_kafka import Consumer, KafkaException, KafkaError
# Configurações do consumidor
conf = {
'bootstrap.servers': 'localhost:9092', # Endereço do Kafka
'group.id': 'my_consumer_group', # ID do grupo de consumidores
'auto.offset.reset': 'earliest', # Posição inicial para leitura de mensagens se não houver offset inicial
}
# Cria uma instância do consumidor
consumer = Consumer(conf)
# Nome do tópico a ser consumido
topic_name = 'tsv.spooldirfile.clients'
# Callback para atribuição de partições
def on_assign(consumer, partitions):
print('Assigned partitions:', partitions)
consumer.assign(partitions)
# Assina o tópico
consumer.subscribe([topic_name], on_assign=on_assign)
try:
while True:
# Lê a próxima mensagem do tópico
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# Fim da partição
print(f"End of partition reached {msg.partition()} [{msg.offset()}]")
elif msg.error():
raise KafkaException(msg.error())
else:
# Mensagem válida
print(f"Consumed message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
print("Consumo interrompido pelo usuário")
finally:
# Fecha o consumidor
consumer.close()
Explicação do Código
from confluent_kafka import Consumer, KafkaException, KafkaError, TopicPartition
# Configurações do consumidor
conf = {
'bootstrap.servers': 'localhost:9092', # Endereço do Kafka
'group.id': 'my_consumer_group', # ID do grupo de consumidores
'auto.offset.reset': 'earliest', # Posição inicial para leitura de mensagens se não houver offset inicial
}
bootstrap.servers
: Define o endereço dos brokers Kafka que o consumidor vai usar para se conectar ao cluster. No caso, ele está se conectando ao Kafka rodando localmente na porta 9092.group.id
: Define o ID do grupo de consumidores. Consumidores com o mesmogroup.id
colaboram para consumir tópicos de forma balanceada.auto.offset.reset
: Configura o comportamento do consumidor quando ele não encontrar um offset inicial (ou seja, quando o grupo de consumidores não tiver offsets salvos). O valor'earliest'
significa que ele começará a ler desde o início do log.
Criação do Consumidor e Nome do Tópico
# Cria uma instância do consumidor
consumer = Consumer(conf)
# Nome do tópico a ser consumido
topic_name = 'tsv.spooldirfile.clients'
Consumer(conf)
: Cria uma instância do consumidor com as configurações fornecidas.topic_name
: Define o nome do tópico que será consumido.
Callback para Atribuição de Partições
# Callback para atribuição de partições
def on_assign(consumer, partitions):
print('Assigned partitions:', partitions)
consumer.assign(partitions)
for partition in partitions:
partition.offset = KafkaError._PARTITION_EOF
consumer.assign(partitions)
on_assign
: Função callback que é chamada sempre que o consumidor recebe a atribuição de partições.consumer.assign(partitions)
: Atribui explicitamente as partições ao consumidor.partition.offset = KafkaError._PARTITION_EOF
: Define o offset para o final do log (usado aqui incorretamente, geralmente é para indicar que a leitura deve continuar do final).
Inscrição no Tópico
# Assina o tópico
consumer.subscribe([topic_name], on_assign=on_assign)
consumer.subscribe([topic_name], on_assign=on_assign)
: Inscreve o consumidor no tópico especificado e defineon_assign
como a função callback para a atribuição de partições.
Loop de Consumo de Mensagens
try:
while True:
# Lê a próxima mensagem do tópico
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# Fim da partição
print(f"End of partition reached {msg.partition()} [{msg.offset()}]")
elif msg.error():
raise KafkaException(msg.error())
else:
# Mensagem válida
print(f"Consumed message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
print("Consumo interrompido pelo usuário")
finally:
# Fecha o consumidor
consumer.close()
while True
: Cria um loop infinito para continuar consumindo mensagens.msg = consumer.poll(timeout=1.0)
: Consulta o Kafka para buscar uma mensagem. Otimeout
define o tempo máximo de espera por uma mensagem.if msg is None
: Se não houver mensagem, continua o loop.if msg.error()
: Verifica se a mensagem tem um erro associado.if msg.error().code() == KafkaError._PARTITION_EOF
: Verifica se o erro indica o fim da partição.print(f"End of partition reached {msg.partition()} [{msg.offset()}]")
: Imprime uma mensagem indicando que o fim da partição foi alcançado.
elif msg.error()
: Para outros erros, lança uma exceçãoKafkaException
.
else
: Se a mensagem é válida:print(f"Consumed message: {msg.value().decode('utf-8')}")
: Imprime o conteúdo da mensagem consumida.
Tratamento de Interrupção e Fechamento do Consumidor
except KeyboardInterrupt
: Captura a interrupção do teclado (Ctrl+C) para encerrar o loop de forma graciosa.print("Consumo interrompido pelo usuário")
: Imprime uma mensagem indicando que o consumo foi interrompido pelo usuário.
finally
: Garante que o consumidor seja fechado corretamente quando o loop é interrompido.consumer.close()
: Fecha o consumidor, liberando recursos e garantindo que o estado do consumidor seja salvo.
Resumo
- Configurações de consumidor são feitas corretamente para garantir que o consumidor comece do início (
earliest
). - Atribuição de partições é feita usando o callback
on_assign
. - Loop principal consome mensagens continuamente, lidando com erros e imprimindo mensagens consumidas.
- Tratamento de interrupção e fechamento adequado do consumidor.
Agora que o código já está mais do que explicado, vamos executá-lo:
python3 rest_proxy_python_test.py
Consumed message: {"id":"14042","first_name":"Lucy","last_name":"Brown","email":"john.smith@sample.com","gender":"Female","ip_address":"148.78.124.124","last_login":"2020-4-14","account_balance":"849.62","country":"Germany","favorite_color":"White"}
Consumed message: {"id":"14043","first_name":"Tom","last_name":"Smith","email":"jack.doe@sample.com","gender":"Male","ip_address":"234.235.204.26","last_login":"2022-12-15","account_balance":"6128.25","country":"UK","favorite_color":"Orange"}
Consumed message: {"id":"14044","first_name":"John","last_name":"Jones","email":"anna.doe@test.com","gender":"Female","ip_address":"100.118.162.34","last_login":"2020-1-17","account_balance":"3300.60","country":"China","favorite_color":"Yellow"}
Consumed message: {"id":"14045","first_name":"Lucy","last_name":"Miller","email":"lucy.rodriguez@demo.com","gender"^C
Consumo interrompido pelo usuário
Está aí o nosso código funcionando e consumindo o tópico ! 🙂
E assim terminamos mais um artigo. No próximo começaremos a nos divertir com o Apache Flink ! Nos vemos lá !
Referências