Pular para o conteúdo

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte IV – REST Proxy

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte IV

Olá pessoal !

Aqui estou de volta para continuarmos a nossa saga ! Depois do último artigo sobre KsqlDB, hoje iremos abordar um assunto interessante, que é o REST Proxy !

O REST Proxy na Confluent Platform para Kafka é um componente que fornece uma interface HTTP RESTful para interagir com o Apache Kafka. Isso permite que aplicações possam produzir e consumir mensagens no Kafka sem necessidade de utilizar diretamente os clientes nativos do Kafka (que são baseados em Java), facilitando a integração com uma variedade maior de linguagens e frameworks.

Funcionalidades do REST Proxy

  1. Produção de Mensagens:
    • Permite que clientes enviem mensagens para tópicos do Kafka utilizando requisições HTTP POST.
    • Suporta envio de mensagens individuais ou em lote.
  2. Consumo de Mensagens:
    • Clientes podem consumir mensagens de tópicos do Kafka através de requisições HTTP GET.
    • Suporta a criação de consumidores com commits automáticos ou manuais de offsets.
  3. Administração:
    • Permite operações administrativas básicas, como listar tópicos e verificar metadados dos tópicos.

Vantagens do REST Proxy

  • Facilidade de Integração: Qualquer linguagem de programação que suporte HTTP pode interagir com o Kafka através do REST Proxy, eliminando a necessidade de bibliotecas específicas do Kafka.
  • Simples de Usar: Utilizando requisições HTTP, as operações com Kafka tornam-se mais simples e intuitivas.
  • Escalabilidade: Pode ser escalado independentemente do cluster Kafka, atendendo diferentes necessidades de carga e performance.

Antes de começarmos a colocar a mão na massa e testar o REST Proxy na prática, vamos elevar o nível do estudo e instalar o Confluent CLI. Um dos testes que faremos, é justamente consumir os dados através dele.

Mas o que seria o Confluent CLI ?

O Confluent CLI (Command Line Interface) é uma ferramenta de linha de comando fornecida pela Confluent para gerenciar e interagir com a plataforma Confluent, que inclui o Apache Kafka e outros componentes associados. Ele oferece uma maneira fácil e eficiente de executar várias operações administrativas, de gerenciamento e de desenvolvimento relacionadas ao Kafka e aos serviços da Confluent.

Agora que você já sabe o que é, vamos começar a instalação.

Adicione o repositório da Confluent e importe a chave GPG. Para isso, execute os comandos abaixo:

Bash
sudo apt-get update
sudo apt-get install -y wget gnupg software-properties-common
wget -qO - https://packages.confluent.io/deb/7.4/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/7.4 stable main"

Instale o Confluent CLI.

Bash
sudo apt-get update
sudo apt-get install -y confluent-cli

Bash
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following packages were automatically installed and are no longer required:
  chromium-codecs-ffmpeg-extra docker-scan-plugin gstreamer1.0-vaapi i965-va-driver intel-media-va-driver libaacs0 libaom3 libass9 libavcodec58 libavformat58 libavutil56 libbdplus0 libblas3 libbluray2
  libbs2b0 libchromaprint1 libcodec2-1.0 libdav1d5 libflashrom1 libflite1 libftdi1-2 libgme0 libgsm1 libgstreamer-plugins-bad1.0-0 libigdgmm12 liblilv-0-0 libllvm13 libmfx1 libmysofa1 libnorm1 libopenmpt0
  libpgm-5.3-0 libpostproc55 librabbitmq4 librubberband2 libserd-0-0 libshine3 libsnappy1v5 libsord-0-0 libsratom-0-0 libsrt1.4-gnutls libssh-gcrypt-4 libswresample3 libswscale5 libudfread0 libva-drm2
  libva-wayland2 libva-x11-2 libva2 libvdpau1 libvidstab1.1 libwpe-1.0-1 libwpebackend-fdo-1.0-1 libx265-199 libxvidcore4 libzimg2 libzmq5 libzvbi-common libzvbi0 mesa-va-drivers mesa-vdpau-drivers
  ocl-icd-libopencl1 pocketsphinx-en-us va-driver-all vdpau-driver-all
Use 'sudo apt autoremove' to remove them.
The following NEW packages will be installed:
  confluent-cli
0 upgraded, 1 newly installed, 0 to remove and 6 not upgraded.
Need to get 64,1 MB of archives.
After this operation, 289 MB of additional disk space will be used.
Get:1 https://packages.confluent.io/deb/7.4 stable/main amd64 confluent-cli all 7.4.5-1 [64,1 MB]
Fetched 64,1 MB in 6s (11,3 MB/s)         
Selecting previously unselected package confluent-cli.
(Reading database ... 205665 files and directories currently installed.)
Preparing to unpack .../confluent-cli_7.4.5-1_all.deb ...
Unpacking confluent-cli (7.4.5-1) ...
Setting up confluent-cli (7.4.5-1) ...

Vamos verificar se a instalação está ok:

Bash
confluent --version

confluent version v2.32.1

Instalação efetuada com sucesso !

Vamos agora “conectar’ o Confluent CLI com a nossa Confluent Platform. Para isso, precisamos setar algumas variáveis de ambiente:

Bash
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export SCHEMA_REGISTRY_URL=http://localhost:8081
export CONFLUENT_REST_URL=http://localhost:8082

Vamos utilizar o Confluent CLI agora para listar os tópicos:

Bash
confluent kafka topic list

No session token found, please enter user credentials. To avoid being prompted, run "confluent login".
Username: 
Password: 
                                              Name                                               
-------------------------------------------------------------------------------------------------
  CLIENTS_TABLE                                                                                  
  CLIENT_STREAM                                                                                  
  _confluent-command                                                                             
  _confluent-controlcenter-7-5-1-1-AlertHistoryStore-changelog                                   
  _confluent-controlcenter-7-5-1-1-AlertHistoryStore-repartition                                 
  _confluent-controlcenter-7-5-1-1-Group-ONE_MINUTE-changelog                                    
  _confluent-controlcenter-7-5-1-1-Group-ONE_MINUTE-repartition                                  
  _confluent-controlcenter-7-5-1-1-Group-THREE_HOURS-changelog                                   
  _confluent-controlcenter-7-5-1-1-Group-THREE_HOURS-repartition                                 
  _confluent-controlcenter-7-5-1-1-KSTREAM-OUTEROTHER-0000000106-store-changelog                 
  _confluent-controlcenter-7-5-1-1-KSTREAM-OUTEROTHER-0000000106-store-repartition               
  _confluent-controlcenter-7-5-1-1-KSTREAM-OUTERTHIS-0000000105-store-changelog                  
  _confluent-controlcenter-7-5-1-1-KSTREAM-OUTERTHIS-0000000105-store-repartition                
  _confluent-controlcenter-7-5-1-1-MetricsAggregateStore-changelog                               
  _confluent-controlcenter-7-5-1-1-MetricsAggregateStore-repartition                             
  _confluent-controlcenter-7-5-1-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-changelog       
  _confluent-controlcenter-7-5-1-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-repartition     
  _confluent-controlcenter-7-5-1-1-MonitoringMessageAggregatorWindows-THREE_HOURS-changelog      
  _confluent-controlcenter-7-5-1-1-MonitoringMessageAggregatorWindows-THREE_HOURS-repartition    
  _confluent-controlcenter-7-5-1-1-MonitoringStream-ONE_MINUTE-changelog                         
  _confluent-controlcenter-7-5-1-1-MonitoringStream-ONE_MINUTE-repartition                       
  _confluent-controlcenter-7-5-1-1-MonitoringStream-THREE_HOURS-changelog                        
  _confluent-controlcenter-7-5-1-1-MonitoringStream-THREE_HOURS-repartition                      
  _confluent-controlcenter-7-5-1-1-MonitoringTriggerStore-changelog                              
  _confluent-controlcenter-7-5-1-1-MonitoringTriggerStore-repartition                            
  _confluent-controlcenter-7-5-1-1-MonitoringVerifierStore-changelog                             
  _confluent-controlcenter-7-5-1-1-MonitoringVerifierStore-repartition                           
  _confluent-controlcenter-7-5-1-1-TriggerActionsStore-changelog                                 
  _confluent-controlcenter-7-5-1-1-TriggerActionsStore-repartition                               
  _confluent-controlcenter-7-5-1-1-TriggerEventsStore-changelog                                  
  _confluent-controlcenter-7-5-1-1-TriggerEventsStore-repartition                                
  _confluent-controlcenter-7-5-1-1-actual-group-consumption-rekey                                
  _confluent-controlcenter-7-5-1-1-aggregate-topic-partition-store-changelog                     
  _confluent-controlcenter-7-5-1-1-aggregate-topic-partition-store-repartition                   
  _confluent-controlcenter-7-5-1-1-aggregatedTopicPartitionTableWindows-ONE_MINUTE-changelog     
  _confluent-controlcenter-7-5-1-1-aggregatedTopicPartitionTableWindows-ONE_MINUTE-repartition   
  _confluent-controlcenter-7-5-1-1-aggregatedTopicPartitionTableWindows-THREE_HOURS-changelog    
  _confluent-controlcenter-7-5-1-1-aggregatedTopicPartitionTableWindows-THREE_HOURS-repartition  
  _confluent-controlcenter-7-5-1-1-cluster-rekey                                                 
  _confluent-controlcenter-7-5-1-1-expected-group-consumption-rekey                              
  _confluent-controlcenter-7-5-1-1-group-aggregate-store-ONE_MINUTE-changelog                    
  _confluent-controlcenter-7-5-1-1-group-aggregate-store-ONE_MINUTE-repartition                  
  _confluent-controlcenter-7-5-1-1-group-aggregate-store-THREE_HOURS-changelog                   
  _confluent-controlcenter-7-5-1-1-group-aggregate-store-THREE_HOURS-repartition                 
  _confluent-controlcenter-7-5-1-1-group-stream-extension-rekey                                  
  _confluent-controlcenter-7-5-1-1-metrics-trigger-measurement-rekey                             
  _confluent-controlcenter-7-5-1-1-monitoring-aggregate-rekey-store-changelog                    
  _confluent-controlcenter-7-5-1-1-monitoring-aggregate-rekey-store-repartition                  
  _confluent-controlcenter-7-5-1-1-monitoring-message-rekey-store                                
  _confluent-controlcenter-7-5-1-1-monitoring-trigger-event-rekey                                
  _confluent-ksql-default__command_topic                                                         
  _confluent-ksql-default_query_CTAS_CLIENTS_TABLE_21-Aggregate-Aggregate-Materialize-changelog  
  _confluent-ksql-default_query_CTAS_CLIENTS_TABLE_21-Aggregate-GroupBy-repartition              
  _confluent-metrics                                                                             
  _confluent-monitoring                                                                          
  _schemas                                                                                       
  default_ksql_processing_log                                                                    
  docker-connect-configs                                                                         
  docker-connect-offsets                                                                         
  docker-connect-status                                                                          
  tsv.spooldirfile.clients

Agora vamos executar o comando para consumir os dados de nosso tópico:

Bash
sudo confluent kafka topic consume tsv.spooldirfile.clients --bootstrap localhost:9092 --protocol PLAINTEXT --from-beginning

Starting Kafka Consumer. Use Ctrl-C to exit.
{"id":"3115","first_name":"Jack","last_name":"Johnson","email":"jane.williams@example.com","gender":"Female","ip_address":"49.67.44.7","last_login":"2022-6-11","account_balance":"1779.32","country":"India","favorite_color":"Green"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="115" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3116","first_name":"Alice","last_name":"Johnson","email":"jane.doe@test.com","gender":"Female","ip_address":"237.181.93.167","last_login":"2020-11-14","account_balance":"6077.84","country":"UK","favorite_color":"Purple"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="116" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3117","first_name":"Jack","last_name":"Smith","email":"jack.williams@sample.com","gender":"Female","ip_address":"213.111.238.64","last_login":"2022-3-11","account_balance":"4160.24","country":"Australia","favorite_color":"Black"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="117" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3118","first_name":"Alice","last_name":"Smith","email":"jack.davis@demo.com","gender":"Female","ip_address":"156.13.10.92","last_login":"2022-12-17","account_balance":"1399.15","country":"Japan","favorite_color":"Yellow"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="118" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3119","first_name":"Jack","last_name":"Smith","email":"alice.jones@test.com","gender":"Male","ip_address":"89.223.6.115","last_login":"2021-11-10","account_balance":"3660.54","country":"France","favorite_color":"Orange"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="119" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3120","first_name":"Mike","last_name":"Johnson","email":"mike.miller@test.com","gender":"Female","ip_address":"18.40.23.49","last_login":"2020-11-17","account_balance":"2176.57","country":"Japan","favorite_color":"Black"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="120" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3121","first_name":"Jack","last_name":"Doe","email":"john.rodriguez@example.com","gender":"Female","ip_address":"230.246.109.60","last_login":"2023-12-7","account_balance":"5600.74","country":"Australia","favorite_color":"Brown"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="121" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3122","first_name":"Anna","last_name":"Johnson","email":"jane.miller@test.com","gender":"Female","ip_address":"10.77.73.63","last_login":"2022-9-24","account_balance":"6388.37","country":"Brazil","favorite_color":"Purple"}
% Headers: [file.name="tsv-spooldir-source_20240524143524.tsv" file.name.without.extension="tsv-spooldir-source_20240524143524" file.path="/app/workarea/confluent-spooldir-test/files/tsv-sp"(32 more bytes) file.parent.dir.name="files" file.length="90131" file.offset="122" file.last.modified="2024-05-24T17:35:24.405Z" file.relative.path="tsv-spooldir-source_20240524143524.tsv"]
{"id":"3123","first_name":"John","last_name":"Miller","email":"eve.williams@sample.com","gender":"Male","ip_address":"23.13.192.154","last_login":"2023-10-28","account_balance":"9808.56","country":"China","favorite_color":"Purple"}
Stopping Consumer.

Olha os dados aí ! Teste efetuado com sucesso !

Explicando o comando
  • confluent kafka topic consume: Este é o comando principal. Ele diz ao Confluent Kafka que você deseja consumir mensagens de um tópico específico.
  • tsv.spooldirfile.clients: Este é o nome do tópico Kafka do qual você quer consumir mensagens.
  • --bootstrap localhost:9092: Este parâmetro especifica o endereço e a porta do servidor Kafka que você está usando. Neste caso, o endereço é “localhost” e a porta é “9092”. Isso indica onde o consumidor deve se conectar para consumir as mensagens do tópico especificado.
  • --protocol PLAINTEXT: Este parâmetro especifica o protocolo de comunicação com o servidor Kafka. “PLAINTEXT” significa que a comunicação não é criptografada. Existem outras opções de protocolo, como SSL, que fornecem comunicação segura.
  • --from-beginning: Este parâmetro indica que você quer começar a consumir mensagens desde o início do tópico. Ou seja, você deseja ler todas as mensagens disponíveis no tópico, incluindo aquelas que foram publicadas antes do momento em que você iniciou o consumo. Se você não incluir esse parâmetro, o consumo começará a partir do ponto atual no tópico.

Acesse o site oficial do Confluent CLI na Confluent e explore todas as funcionalidades dessa ferramenta ! Garanto que ficará impressionado com o que é possível fazer !

Consumindo um tópico via Python

Para consumir mensagens de um tópico Kafka via REST Proxy, você pode usar a biblioteca confluent_kafka do Python para fazer solicitações HTTP ao endpoint do REST Proxy.

Passo 1: Instalar a biblioteca confluent_kafka

Se você ainda não tem a biblioteca confluent_kafka instalada, você pode instalá-la usando pip:

Bash
pip install confluent_kafka

Defaulting to user installation because normal site-packages is not writeable
Collecting confluent_kafka
  Downloading confluent_kafka-2.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.0 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 4.0/4.0 MB 16.0 MB/s eta 0:00:00
Installing collected packages: confluent_kafka
Successfully installed confluent_kafka-2.4.0

Passo 2: Exemplo de Código em Python

Salve o arquivo abaixo como rest_proxy_python_test.py

Python
from confluent_kafka import Consumer, KafkaException, KafkaError

# Configurações do consumidor
conf = {
    'bootstrap.servers': 'localhost:9092',  # Endereço do Kafka
    'group.id': 'my_consumer_group',        # ID do grupo de consumidores
    'auto.offset.reset': 'earliest',        # Posição inicial para leitura de mensagens se não houver offset inicial
}

# Cria uma instância do consumidor
consumer = Consumer(conf)

# Nome do tópico a ser consumido
topic_name = 'tsv.spooldirfile.clients'

# Callback para atribuição de partições
def on_assign(consumer, partitions):
    print('Assigned partitions:', partitions)
    consumer.assign(partitions)

# Assina o tópico
consumer.subscribe([topic_name], on_assign=on_assign)

try:
    while True:
        # Lê a próxima mensagem do tópico
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Fim da partição
                print(f"End of partition reached {msg.partition()} [{msg.offset()}]")
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            # Mensagem válida
            print(f"Consumed message: {msg.value().decode('utf-8')}")

except KeyboardInterrupt:
    print("Consumo interrompido pelo usuário")

finally:
    # Fecha o consumidor
    consumer.close()

Explicação do Código

Python
from confluent_kafka import Consumer, KafkaException, KafkaError, TopicPartition

# Configurações do consumidor
conf = {
    'bootstrap.servers': 'localhost:9092',  # Endereço do Kafka
    'group.id': 'my_consumer_group',        # ID do grupo de consumidores
    'auto.offset.reset': 'earliest',        # Posição inicial para leitura de mensagens se não houver offset inicial
}

  • bootstrap.servers: Define o endereço dos brokers Kafka que o consumidor vai usar para se conectar ao cluster. No caso, ele está se conectando ao Kafka rodando localmente na porta 9092.
  • group.id: Define o ID do grupo de consumidores. Consumidores com o mesmo group.id colaboram para consumir tópicos de forma balanceada.
  • auto.offset.reset: Configura o comportamento do consumidor quando ele não encontrar um offset inicial (ou seja, quando o grupo de consumidores não tiver offsets salvos). O valor 'earliest' significa que ele começará a ler desde o início do log.
Criação do Consumidor e Nome do Tópico
Python
# Cria uma instância do consumidor
consumer = Consumer(conf)

# Nome do tópico a ser consumido
topic_name = 'tsv.spooldirfile.clients'

  • Consumer(conf): Cria uma instância do consumidor com as configurações fornecidas.
  • topic_name: Define o nome do tópico que será consumido.
Callback para Atribuição de Partições
Python
# Callback para atribuição de partições
def on_assign(consumer, partitions):
    print('Assigned partitions:', partitions)
    consumer.assign(partitions)
    for partition in partitions:
        partition.offset = KafkaError._PARTITION_EOF
    consumer.assign(partitions)

  • on_assign: Função callback que é chamada sempre que o consumidor recebe a atribuição de partições.
    • consumer.assign(partitions): Atribui explicitamente as partições ao consumidor.
    • partition.offset = KafkaError._PARTITION_EOF: Define o offset para o final do log (usado aqui incorretamente, geralmente é para indicar que a leitura deve continuar do final).
Inscrição no Tópico
Python
# Assina o tópico
consumer.subscribe([topic_name], on_assign=on_assign)

  • consumer.subscribe([topic_name], on_assign=on_assign): Inscreve o consumidor no tópico especificado e define on_assign como a função callback para a atribuição de partições.
Loop de Consumo de Mensagens
Python
try:
    while True:
        # Lê a próxima mensagem do tópico
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Fim da partição
                print(f"End of partition reached {msg.partition()} [{msg.offset()}]")
            elif msg.error():
                raise KafkaException(msg.error())
        else:
            # Mensagem válida
            print(f"Consumed message: {msg.value().decode('utf-8')}")

except KeyboardInterrupt:
    print("Consumo interrompido pelo usuário")

finally:
    # Fecha o consumidor
    consumer.close()

  • while True: Cria um loop infinito para continuar consumindo mensagens.
  • msg = consumer.poll(timeout=1.0): Consulta o Kafka para buscar uma mensagem. O timeout define o tempo máximo de espera por uma mensagem.
    • if msg is None: Se não houver mensagem, continua o loop.
    • if msg.error(): Verifica se a mensagem tem um erro associado.
      • if msg.error().code() == KafkaError._PARTITION_EOF: Verifica se o erro indica o fim da partição.
        • print(f"End of partition reached {msg.partition()} [{msg.offset()}]"): Imprime uma mensagem indicando que o fim da partição foi alcançado.
      • elif msg.error(): Para outros erros, lança uma exceção KafkaException.
    • else: Se a mensagem é válida:
      • print(f"Consumed message: {msg.value().decode('utf-8')}"): Imprime o conteúdo da mensagem consumida.
Tratamento de Interrupção e Fechamento do Consumidor
  • except KeyboardInterrupt: Captura a interrupção do teclado (Ctrl+C) para encerrar o loop de forma graciosa.
    • print("Consumo interrompido pelo usuário"): Imprime uma mensagem indicando que o consumo foi interrompido pelo usuário.
  • finally: Garante que o consumidor seja fechado corretamente quando o loop é interrompido.
    • consumer.close(): Fecha o consumidor, liberando recursos e garantindo que o estado do consumidor seja salvo.
Resumo
  • Configurações de consumidor são feitas corretamente para garantir que o consumidor comece do início (earliest).
  • Atribuição de partições é feita usando o callback on_assign.
  • Loop principal consome mensagens continuamente, lidando com erros e imprimindo mensagens consumidas.
  • Tratamento de interrupção e fechamento adequado do consumidor.

Agora que o código já está mais do que explicado, vamos executá-lo:

Python
python3 rest_proxy_python_test.py

Consumed message: {"id":"14042","first_name":"Lucy","last_name":"Brown","email":"john.smith@sample.com","gender":"Female","ip_address":"148.78.124.124","last_login":"2020-4-14","account_balance":"849.62","country":"Germany","favorite_color":"White"}
Consumed message: {"id":"14043","first_name":"Tom","last_name":"Smith","email":"jack.doe@sample.com","gender":"Male","ip_address":"234.235.204.26","last_login":"2022-12-15","account_balance":"6128.25","country":"UK","favorite_color":"Orange"}
Consumed message: {"id":"14044","first_name":"John","last_name":"Jones","email":"anna.doe@test.com","gender":"Female","ip_address":"100.118.162.34","last_login":"2020-1-17","account_balance":"3300.60","country":"China","favorite_color":"Yellow"}
Consumed message: {"id":"14045","first_name":"Lucy","last_name":"Miller","email":"lucy.rodriguez@demo.com","gender"^C
Consumo interrompido pelo usuário

Está aí o nosso código funcionando e consumindo o tópico ! 🙂

E assim terminamos mais um artigo. No próximo começaremos a nos divertir com o Apache Flink ! Nos vemos lá !

Referências

Sergio Willians

Sergio Willians

Sergio Willians é o fundador do GPO (Grupo de Profissionais Oracle) e possui quase 30 anos de experiência em tecnologias Oracle, sendo especialista em desenvolvimento Forms/Reports, PL/SQL e EBS (E-Business Suite) nos módulos Receivables, Payables e General Ledger. Atualmente trabalha na Scania Latin America, onde se dedica à área de integração de dados com Confluent Kafka. Sua paixão é compartilhar conhecimento com a comunidade Oracle, contribuindo para o crescimento e a excelência da plataforma.

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

plugins premium WordPress