Pular para o conteúdo

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte II – Preparação

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte II

Olá pessoal !

No primeiro artigo, abordamos a instalação do nosso ambiente de estudos e a implementação da Confluent Platform. Agora, avançaremos ainda mais, focando na instalação e configuração do conector spooldir, fundamental para a geração de dados em nosso tópico.

Para dar início, efetuaremos algumas modificações em nosso arquivo docker-compose.yml. Isso incluirá a criação de um volume persistente e a instalação do conector spooldir no container connect.

Volume persistente

O volume persistente servirá para dois propósitos:

  • Repositório do arquivo de instalação do conector
  • Diretório de origem dos arquivos CSV a serem lidos

Então vamos começar. Primeiramente crie o diretório workarea:

ShellScript
/home/swillians/workarea

Abra o arquivo docker-compose.yml e localize a seção onde definimos os serviços. Adicione o seguinte trecho abaixono final do arquivo:

YAML
volumes:
  workarea:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: /home/swillians/workarea  #Inclua aqui o seu diretório   

O nome do volume a ser criado será workarea, e referenciará o path /home/swillians/workarea.

Instalação do conector spooldir

Na máquina host, vamos criar o diretório confluent-connectors:

ShellScript
/home/swillians/workarea/confluent-connectors

Faça o download do conector CLICANDO AQUI e o coloque no diretório acima.

Agora, faremos a inclusão do seguinte código no container connect:

YAML
 connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - workarea:/app/workarea
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        echo "Instalando o Conector"
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
        #
        /etc/confluent/docker/run &
        #
        sleep infinity

Agora é hora de subir os containers:

ShellScript
sudo docker compose up -d

[+] Running 11/0
  Container flink-jobmanager   Running                                                    0.0s 
  Container flink-taskmanager  Running                                                    0.0s 
  Container flink-sql-client   Running                                                    0.0s 
  Container broker             Running                                                    0.0s 
  Container schema-registry    Running                                                    0.0s 
  Container connect            Running                                                    0.0s 
  Container control-center     Running                                                    0.0s 
  Container ksqldb-server      Running                                                    0.0s 
  Container ksqldb-cli         Running                                                    0.0s 
  Container ksql-datagen       Running                                                    0.0s 
  Container rest-proxy         Running

Com todos os containers executando, acesse agora a Confluent Platform e verifique se o conector spooldir está disponível:

Confluent Platform e Apache Flink

Se o spooldir estiver disponível, significa que deu tudo certo ! 🙂

Você tem a liberdade de instalar quantos conectores desejar. Como ilustrado na imagem acima, tenho vários outros conectores instalados. Se quiser instalar mais de um, siga o exemplo abaixo:

YAML
confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/streamthoughts-kafka-connect-file-pulse-2.10.0.zip

Assim você pode instalar quantos conectores forem necessários !

Para facilitar, abaixo o docker-compose.yml completo, com as alterações:

YAML
---
services:

  broker:
    image: confluentinc/cp-server:7.5.1
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" 
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - workarea:/app/workarea
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        echo "Instalando o Conector"
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
        #
        /etc/confluent/docker/run &
        #
        sleep infinity

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.5.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.5.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.5.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.5.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.5.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  flink-sql-client:
    image: cnfldemos/flink-sql-client-kafka:1.19.0-scala_2.12-java17
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
    - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
    - ./settings/:/settings

  flink-jobmanager:
    image: cnfldemos/flink-kafka:1.19.0-scala_2.12-java17
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
    - 9081:9081
    command: jobmanager
    environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: flink-jobmanager
      rest.bind-port: 9081

  flink-taskmanager:
    image: cnfldemos/flink-kafka:1.19.0-scala_2.12-java17
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
    - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: flink-jobmanager
      taskmanager.numberOfTaskSlots: 10

volumes:
  workarea:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: /home/swillians/workarea  #Inclua aqui o seu diretório    

Gerando arquivos para o conector

Antes de começarmos a configurar o conector, precisamos gerar uma massa de dados contínua para ser consumida por ele. Para isso, crie o seguinte diretório:

ShellScript
/home/swillians/workarea/confluent-spooldir-test/files

Dentro do diretório files, crie mais duas pastas. A error e a finished.

ShellScript
/home/swillians/workarea/confluent-spooldir-test/files/error

/home/swillians/workarea/confluent-spooldir-test/files/finished

A idéia é ter um script Python que faça a geração dos arquivos de tempos em tempos. Para saber se você possui o python instalado, digite o comando abaixo:

ShellScript
python3

Python 3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> 

Se o resultado acima aparecer, o Python já está instalado. Caso não esteja, faça a instalação utilizando os comandos abaixo:

ShellScript
sudo apt update

sudo apt install python3

Com o Python instalado, poderemos executar o nosso script.

Python
import random
import csv
import time

# Definição de dados fictícios para geração aleatória
first_names = ["John", "Jane", "Alice", "Bob", "Eve", "Mike", "Anna", "Tom", "Lucy", "Jack"]
last_names = ["Smith", "Doe", "Johnson", "Brown", "Williams", "Jones", "Garcia", "Miller", "Davis", "Rodriguez"]
emails = ["example.com", "test.com", "sample.com", "demo.com"]
genders = ["Male", "Female"]
countries = ["USA", "Canada", "UK", "Australia", "Germany", "France", "India", "Brazil", "China", "Japan"]
colors = ["Red", "Blue", "Green", "Yellow", "Black", "White", "Purple", "Orange", "Pink", "Brown"]

# Current ID, starting from 1
current_id = 1

# Função para gerar um IP aleatório
def generate_ip():
    return ".".join(str(random.randint(0, 255)) for _ in range(4))

# Função para gerar uma data de último login aleatória
def generate_last_login():
    return f"{random.randint(2020, 2023)}-{random.randint(1, 12)}-{random.randint(1, 28)}"

def create_tsv_file():
    global current_id
    end_id = current_id + 999  # As we are generating 1000 records

    # Getting the current timestamp for the filename
    timestamp = time.strftime("%Y%m%d%H%M%S")
    filename = f"tsv-spooldir-source_{timestamp}.tsv"

    with open(filename, "w", newline='') as tsvfile:
        writer = csv.writer(tsvfile, delimiter='\t')
        
        # Escrevendo o cabeçalho
        writer.writerow(["id", "first_name", "last_name", "email", "gender", "ip_address", "last_login", "account_balance", "country", "favorite_color"])
        
        # Gerando e escrevendo 1000 registros aleatórios
        for i in range(current_id, end_id + 1):
            writer.writerow([
                i, 
                random.choice(first_names),
                random.choice(last_names),
                f"{random.choice(first_names).lower()}.{random.choice(last_names).lower()}@{random.choice(emails)}",
                random.choice(genders),
                generate_ip(),
                generate_last_login(),
                "{:.2f}".format(random.uniform(0, 10000)),
                random.choice(countries),
                random.choice(colors)
            ])
    
    # Updating the current_id for the next iteration
    current_id = end_id + 1
    
    print(f"Arquivo {filename} criado com sucesso!")

while True:
    create_tsv_file()
    time.sleep(60)  # Espera 1 minuto(s) 

Chame o arquivo de tsv-spooldir-source.py e salve-o no diretório files, que criamos acima.

Para executar o script, utilize o seguinte comando:

ShellScript
cd /workarea/confluent-spooldir-test/files

python3 tsv-spooldir-source.py

Arquivo tsv-spooldir-source_20240512175643.tsv criado com sucesso!

Esse script gerará a cada minuto um arquivo .TSV (Tab Separated Value). Esses arquivos terão o seguinte formato:

Plaintext
id	first_name	last_name	email	gender	ip_address	last_login	account_balance	country	favorite_color
1	Jennine	Burkett	jburkett0@hp.com	Agender	203.96.208.246	2018-03-19T22:09:31Z	20795.26	AL	#1852e1
2	Christiano	Le Grys	clegrys1@boston.com	Male	140.21.114.158	2018-09-30T23:13:42Z	23052.0	CN	#6e5afe
3	Geneva	Livezley	glivezley2@squarespace.com	Non-binary	26.103.199.8	2018-09-21T19:10:51Z	24749.65	ML	#38ce82
4	Gottfried	Kiera	gkiera3@washingtonpost.com	Male	3.40.201.160	2017-11-13T02:18:12Z	12641.46	BR	#052a99
5	Mariska	Nimmo	mnimmo4@accuweather.com	Female	187.198.175.205	2017-02-06T17:20:56Z	7806.15	RU	#a1c21d
6	Aubine	Anthes	aanthes5@google.com.au	Female	13.41.43.20	2014-05-07T05:18:57Z	3539.99	CN	#c2df7e

Pronto ! Agora já temos os arquivos que irão ser consumidos pelo nosso conector ! 🙂

Vamos por o nosso script para funcionar e começar a gerar os arquivos que precisamos, para isso, utilize o comando abaixo:

ShellScript
python3 tsv-spooldir-source.py 

Arquivo tsv-spooldir-source_20240514133725.tsv criado com sucesso!

Configurando o conector SpoolDir

Agora chegou o tão esperado momento ! A configuração do conector SpoolDir !

Os conectores aceitam dois tipos de arquivos de configuração:

  • .properties
  • .json

Como nos meus artigos anteriores eu utilizei arquivos .json, dessa vez utilizarei o arquivo .properties.

Abaixo o conteúdo do nosso arquivo:

Plaintext
name=tsv.spooldirfile.clients.source
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirCsvSourceConnector
input.path=/app/workarea/confluent-spooldir-test/files
error.path=/app/workarea/confluent-spooldir-test/files/error
finished.path=/app/workarea/confluent-spooldir-test/files/finished
input.file.pattern=^tsv-spooldir.*\.tsv$
halt.on.error=false
topic=tsv.spooldirfile.clients
schema.generation.enabled=true
csv.first.row.as.header=true
csv.separator.char=9
offset.flush.interval.ms=10000

Salve o conteúdo acima como spooldirtsv.properties.

Gosto muito de detalhar as coisas nos meus artigos, mas creio que a maior parte das propriedades sejam auto-explicativas. Caso deseje obter mais detalhes sobre o conector, CLIQUE AQUI e leia a documentação oficial.

Vamos agora criar o tópico que receberá os dados de nosso conector, para isso, acesse a Confluent Platform e clique em Topics.

image 31

Clique em Add topic.

Em topic name, coloque tsv.spooldirfile.clients e depois clique em Create with Defaults.

image 32

Nosso tópico foi criado com sucesso !

Vamos configurar agora o conector, para isso, clique em Connect>>connect-default

image 33
image 34

Clique agora no botão Add conector.

image 35

Clique em Upload connector config file e escolha o arquivo spooldirtsv.properties.

image 36

Nesta página você verá todas as propriedades descritas em nosso arquivo e muitas outras disponíveis para a configuração do nosso conector.

Vá até o final da página e Clique em Next.

image 37

Se nenhum erro for encontrado no conector, você verá a seguinte página:

image 38

Agora é só clicar em Launch.

image 39

Conector executando com sucesso !

Vamos dar uma olhada no nosso conector funcionando e alimentando o nosso tópico ? Para isso, clique em Topics e depois no tópico que criamos:

image 40
image 41

Agora clique em Messages.

image 42
image 43

Parabéns ! O conector spooldir está funcionando e produzindo dados ! 🙂

No próximo artigo, começaremos finalmente a utilizar o Apache Flink ! Vejo vocês lá !

Referências

Sergio Willians

Sergio Willians

Sergio Willians é o fundador do GPO (Grupo de Profissionais Oracle) e possui quase 30 anos de experiência em tecnologias Oracle, sendo especialista em desenvolvimento Forms/Reports, PL/SQL e EBS (E-Business Suite) nos módulos Receivables, Payables e General Ledger. Atualmente trabalha na Scania Latin America, onde se dedica à área de integração de dados com Confluent Kafka. Sua paixão é compartilhar conhecimento com a comunidade Oracle, contribuindo para o crescimento e a excelência da plataforma.

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

plugins premium WordPress