Pular para o conteúdo

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte III – KsqlDB

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte III

Olá pessoal !

Devido a pedidos, hoje eu irei incluir dois novos assuntos com exemplos. O KsqlDB e o REST Proxy. Apesar de fugir um pouco do tema, acredito que seja algo bem interessante para aprendermos. Prometo que após o artigo do Rest Proxy, partiremos para o Apache Flink diretamente ! 🙂

KsqlDB

KsqlDB é um banco de dados projetado especificamente para aplicações de processamento de fluxo em cima do Apache Kafka. Ele é voltado para a construção de aplicativos que processam fluxos de eventos em tempo real. Aqui estão alguns detalhes sobre o ksqlDB:

  • Streaming de Eventos: O ksqlDB é conhecido por ser um banco de dados de streaming de eventos. Ele permite que você processe fluxos de dados em tempo real, o que é essencial para cenários como detecção de fraudes e aplicativos baseados no padrão Event Sourcing.
  • SQL para Descrição de Tarefas: Uma característica interessante do ksqlDB é que você pode usar SQL para descrever o que deseja fazer em vez de como fazê-lo. Isso facilita a construção de aplicativos nativos do Kafka para processar fluxos de dados em tempo real.

Flink é uma escolha mais geral e escalável para profissionais altamente especializados em processamento de fluxo, enquanto o ksqlDB é mais acessível e adequado para profissionais que desejam uma abordagem mais simples e baseada em SQL. Porém, o KsqlDB ainda é amplamente usado e tem o seu espaço.

Vamos agora ao que interessa, vamos criar uma STREAM e um TABLE utilizando o KsqlDB. Vamos entender o que cada um deles significa:

  • Streams (fluxos):
    • Streams são séries ilimitadas de eventos.
    • Eles são construídos a partir de tópicos do Apache Kafka.
    • Cada evento em um stream é tratado como uma entrada contínua.
    • Streams crescem à medida que novos eventos são adicionados.
    • Eles são úteis para processamento de fluxo em tempo real, como detecção de fraudes ou análise de eventos.
  • Tables (tabelas):
    • Tables representam o estado atual de uma chave específica.
    • Assim como streams, elas também são construídas a partir de tópicos do Kafka.
    • Uma tabela só cresce quando novas chaves são adicionadas.
    • Elas são ideais para obter o estado atual de uma chave rapidamente.
    • Por exemplo, se tivermos uma tabela com informações sobre a localização de pessoas, ela nos diria onde cada pessoa está no momento.

Em resumo, streams são para eventos contínuos e tables representam o estado atual de uma chave específica. Ambos são fundamentais para o processamento de fluxo no ksqlDB !

Para começarmos, vamos fazer uma pequena modificação no arquivo .properties do conector:

Plaintext
name=tsv.spooldirfile.clients.source
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
input.path=/app/workarea/confluent-spooldir-test/files
error.path=/app/workarea/confluent-spooldir-test/files/error
finished.path=/app/workarea/confluent-spooldir-test/files/finished
input.file.pattern=^tsv-spooldir.*\.tsv$
halt.on.error=false
topic=tsv.spooldirfile.clients
schema.generation.enabled=true
csv.first.row.as.header=true
csv.separator.char=9
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

Fiz algumas modificações para definir o payload JSON. Assim não teremos problemas ao criar as STREAMs e os TABLES. Crie o novo conector utilizando os mesmos passos do artigo anterior, com esse novo arquivo.

Agora vamos ao que interessa. No Control Center, clique em KsqlDB, depois em ksqldb1:

Confluent Platform e Apache Flink

No editor, cole o seguinte código:

SQL
CREATE STREAM raw_clients_stream (
  id STRING,
  first_name STRING,
  last_name STRING,
  email STRING,
  gender STRING,
  ip_address STRING,
  last_login STRING,
  account_balance DOUBLE,
  country STRING,
  favorite_color STRING
) WITH (
  KAFKA_TOPIC='tsv.spooldirfile.clients',
  VALUE_FORMAT='JSON',
  KEY_FORMAT='JSON',  -- Usar JSON para suportar chaves compostas
  PARTITIONS=1
);

Agora clique em Run Query.

image 59

A nossa STREAM foi criada com sucesso !

Clique agora na opção Flow e depois em RAW_CLIENTS_STREAM. Agora observe os dados do tópico tsv.spooldirfile.clients fluindo pela stream que criamos !

image 60

Clique novamente no Editor, e vamos criar uma nova stream baseada na RAW_CLIENTS_STREAM que criamos acima. Para isso, use utilize o código abaixo:

SQL
CREATE STREAM client_stream AS
SELECT
  id,
  first_name,
  last_name
FROM raw_clients_stream;

image 61

Retorne em Flow e clique em CLIENT_STREAM:

image 62

Vamos abrir uma mensagem e ver o seu conteúdo:

JSON
{
  "ID": "19000",
  "FIRST_NAME": "Tom",
  "LAST_NAME": "Davis"
}

Observe que a nova STREAM que criamos, possui apenas 3 campos.

O próximo passo será criar a nossa tabela, baseada na RAW_CLIENTS_STREAM. Essa tabela guardará sempre o último saldo da conta do cliente, que terá como chave first_name e last_name.

Para isso, use o código abaixo:

SQL
CREATE TABLE clients_table WITH (KEY_FORMAT='JSON') AS
SELECT
  first_name,
  last_name,
  LATEST_BY_OFFSET(account_balance) AS latest_account_balance
FROM raw_clients_stream
GROUP BY first_name, last_name;

image 63
image 64

Agora vamos fazer um pequeno SELECT para consultar os nossos dados na tabela CLIENTS_TABLE:

SQL
SELECT 
  first_name,
  last_name,
  latest_account_balance
FROM clients_table 
EMIT CHANGES;

image 65

Vamos pegar a posição de saldo de um cliente específico:

SQL
SELECT 
  first_name,
  last_name,
  latest_account_balance
FROM clients_table 
WHERE first_name = 'Jane'
AND   last_name = 'Doe'
EMIT CHANGES;

image 66

Toda vez que o saldo para este cliente se alterar, será exibidido o registro com o novo valor.

A cláusula EMIT CHANGES no final do SELECT, faz com que a query persista e exiba os dados assim que a condição do WHERE seja cumprida.

O próximo passo agora, é você que decide ! Experimente, teste, erre, acerte, crie cenários e explore toda a capacidade do KsqlDB !

Para aprender mais sobre o KqlDB, inscreva-se neste conteúdo para iniciantes, assista o mini curso sobre STREAMS e acesse a documentação oficial !

E assim terminamos mais um capítulo de nossa saga ! Até o próximo artigo !

Referências

Sergio Willians

Sergio Willians

Sergio Willians é o fundador do GPO (Grupo de Profissionais Oracle) e possui quase 30 anos de experiência em tecnologias Oracle, sendo especialista em desenvolvimento Forms/Reports, PL/SQL e EBS (E-Business Suite) nos módulos Receivables, Payables e General Ledger. Atualmente trabalha na Scania Latin America, onde se dedica à área de integração de dados com Confluent Kafka. Sua paixão é compartilhar conhecimento com a comunidade Oracle, contribuindo para o crescimento e a excelência da plataforma.

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

plugins premium WordPress