Pular para o conteúdo

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte V – Iniciando com o Apache Flink

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte V – Iniciando com o Apache Flink

No artigo anterior, discutimos o Rest Proxy e testamos sua funcionalidade utilizando o Confluent CLI. Mas agora, o tão esperado momento chegou! Vamos começar a utilizar o Flink. No entanto (sempre há um porém), precisaremos fazer uma alteração em nosso arquivo docker-compose.yml.

Não fiquem bravos, essas mudanças serão benéficas para que tenhamos mais traquilidade nos próximos capítulos de nossa saga.

Abaixo o nosso novo docker-compose.yml

YAML
---
services:

  broker:
    image: confluentinc/cp-server:7.5.1
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" 
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - workarea:/app/workarea
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-http-1.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-oracle-cdc-2.2.1.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-sftp-3.2.4.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/streamthoughts-kafka-connect-file-pulse-2.10.0.zip
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.5.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.5.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.5.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.5.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.5.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  flink-init:
    image: busybox
    container_name: flink-init
    command: >
      sh -c "
      echo 'Downloading Flink Kafka connector...' &&
      wget -O /opt/flink/plugins/kafka/flink-sql-connector-kafka-3.2.0-1.19.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar --no-check-certificate &&
      echo 'Connector downloaded successfully.'
      "
    volumes:
      - flink-plugins:/opt/flink/plugins/kafka
    depends_on:
      - broker

  flink-jobmanager:
    image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
      - 9081:9081
    command: jobmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        rest.bind-port: 9081
    volumes:
      - flink-plugins:/opt/flink/plugins/kafka
    depends_on:
      - flink-init

  flink-taskmanager:
    image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 10
    volumes:
      - flink-plugins:/opt/flink/plugins/kafka

  flink-sql-client:
    image: cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
      - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
      - ./settings/:/settings
      - flink-plugins:/opt/flink/plugins/kafka

volumes:
  workarea:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: /home/swillians/workarea
  flink-plugins:
    driver: local

Fiz algumas alterações no arquivo, para que fosse copiada a versão correta do conector kafka para o Flink. Não está muito elegante, inclusive, eu poderia ter utilizado um arquivo .env para organizar os paths. Mas isso ficará para um outro momento. Prometo que até o final da série, montaremos um docker-compose.yml bem interessante !

Levante o ambiente ! Após todos os containers subirem, vamos executar o client do Flink:

Bash
sudo docker compose exec flink-sql-client sql-client.sh embedded --library /opt/flink/plugins/kafka

O comando –library /opt/flink/plugins/kafka serve para indicar ao client onde os plugins estão armazenados.

image 15

Vamos criar a nossa primeira tabela baseada em um tópico kafka:

SQL
CREATE TABLE flink_clients_account_balance (
  first_name STRING,
  last_name STRING,
  account_balance DECIMAL(10,2),
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  WATERMARK FOR `ts` AS `ts`
) WITH (
    'connector' = 'kafka', 
    'topic' = 'tsv.spooldirfile.clients', 
    'scan.startup.mode' = 'earliest-offset', 
    'properties.bootstrap.servers' = 'broker:29092', 
    'value.format' = 'json'
);

Explicando o código

Definição da Tabela e Colunas
SQL
CREATE TABLE flink_clients_account_balance

  • Define a criação de uma tabela chamada flink_clients_account_balance.
Colunas da Tabela
SQL
first_name STRING,
last_name STRING,
account_balance DECIMAL(10,2),
ts TIMESTAMP(3) METADATA FROM 'timestamp',

  • first_name STRING: Coluna first_name do tipo STRING.
  • last_name STRING: Coluna last_name do tipo STRING.
  • account_balance DECIMAL: Coluna latest_account_balance do tipo DECIMAL.
  • ts TIMESTAMP(3) METADATA FROM ‘timestamp’: Coluna ts do tipo TIMESTAMP com precisão de 3 dígitos para milissegundos. Esta coluna usa metadados provenientes do campo timestamp.
Watermark
  • WATERMARK FOR `ts` AS `ts` :Define uma política de geração de watermarks para a coluna ts. Watermarks são usados para gerenciar eventos de processamento em tempo e lidar com dados fora de ordem (late data).

Configurações do Conector

SQL
) WITH (
'connector' = 'kafka',
'topic' = 'tsv.spooldirfile.clients',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'broker:29092',
'value.format' = 'json');

Conector Kafka
  • ‘connector’ = ‘kafka’: Especifica que a tabela vai usar o conector Kafka.
  • Nome do Tópico: ‘topic’ = ‘tsv.spooldirfile.clients’: Define o nome do tópico Kafka de onde os dados serão consumidos.
Modo de Início da Leitura
  • ‘scan.startup.mode’ = ‘earliest-offset’: Define que a leitura dos dados deve começar do offset mais antigo disponível no tópico.
Servidores Bootstrap do Kafka
  • ‘properties.bootstrap.servers’ = ‘broker:29092’: Define a lista de servidores Kafka para conexão. Neste caso, está usando broker:29092.
Formato dos Valores:
  • ‘value.format’ = ‘json’: Define que o formato dos dados no tópico Kafka é JSON.

Resumo

Este código cria uma tabela flink_clients_account_balance no Flink que consome dados de um tópico Kafka chamado tsv.spooldirfile.clients. A tabela tem colunas para first_name, last_name, account_balance, e um campo ts que contém metadados do timestamp. A política de watermarks é definida para a coluna ts para gerenciar eventos em tempo e dados fora de ordem. As configurações adicionais especificam o conector Kafka, o nome do tópico, o modo de início da leitura, os servidores bootstrap do Kafka e o formato dos valores dos dados.

Copie o código de criação de tabela e tecle ENTER para que ela seja criada.

image 1

Agora vamos executar o SELECT para ler a nossa tabela flink_clients_account_balance:

SQL
SELECT * FROM flink_clients_account_balance;

image 2
image 3

Tabela criada e já recebendo os dados do tópico do kafka ! 🙂

Fique a vontade para navegar pelas opções. As principais são:

  • O: Open Row: Abre o detalhamento dos campos da linha que você escolher.
image 4
  • N e P: Navegam pelas páginas com os resutados
  • G e L: G vai para a página escolhida e L para a última página
  • – e +: Definem o tempo de refresh dos dados
  • R: Atualiza a lista com os dados
  • Q: Sai da lista dos dados

Vamos dar uma olhada no nosso job através do Apache Flink Dashboard. Você pode acessá-lo através do link: http://0.0.0.0:9081/#/overview

image 5
image 6
image 7

E assim terminamos mais um capítulo !

No próximo artigo, iremos criar uma tabela no Flink baseada em uma tabela do banco de dados PostgreSQL. Ele servirá como base para trabalharmos com enriquecimento de dados.

Sergio Willians

Sergio Willians

Sergio Willians é o fundador do GPO (Grupo de Profissionais Oracle) e possui quase 30 anos de experiência em tecnologias Oracle, sendo especialista em desenvolvimento Forms/Reports, PL/SQL e EBS (E-Business Suite) nos módulos Receivables, Payables e General Ledger. Atualmente trabalha na Scania Latin America, onde se dedica à área de integração de dados com Confluent Kafka. Sua paixão é compartilhar conhecimento com a comunidade Oracle, contribuindo para o crescimento e a excelência da plataforma.

Comentário(s) da Comunidade

  1. Avatar de Mendes

    Sergio tenho acompanhado o seus artigos sobre Kafka e tenho aprendido muito sobre a tecnologia.
    Fico muito agradecido por ter acesso a esse material. Obrigado por compartilhar seu conhecimento!

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

plugins premium WordPress