Pular para o conteúdo

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte VI – Apache Flink e PostgreSQL

Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte VI – Apache Flink e PostgreSQL

No artigo anterior, iniciamos com o Flink e o Kafka. Dando continuidade à nossa série de artigos, vamos agora conectar o Flink ao PostgreSQL. Nosso objetivo final é utilizar uma tabela que será criada no PostgreSQL para enriquecer os dados provenientes do Kafka.

O que iremos instalar em nosso ambiente:

  • Banco de dados PostgreSQL 13
  • PGADMIN
  • Driver JDBC PostgreSQL para o Flink

Adivinhe o que precisaremos fazer para instalar mais esses recursos em nosso ambiente ? Alterar o nosso docker-compose.yml ! rs

Mas como prometido, começaremos a melhorá-lo para que tenhamos um docker-compose.yml mais limpo e fácil de alterar se for necessário. Para isso, introduziremos o arquivo .env

O arquivo .env no contexto do Docker Compose é um arquivo de configuração usado para definir variáveis de ambiente que podem ser referenciadas no arquivo docker-compose.yml. Ele permite que você separe a configuração do código e mantenha suas variáveis de ambiente fora do código-fonte, o que facilita a gestão de diferentes configurações para diferentes ambientes (desenvolvimento, teste, produção, etc.). No nosso caso, servirá principalmente para salvar as credenciais de acesso ao banco de dados.

Vamos criar o nosso .env, com o conteúdo abaixo:

Plaintext
# PostgreSQL
DATABASE_NAME=<preencha aqui>
DATABASE_USER=<preencha aqui>
DATABASE_PASSWORD=<preencha aqui>

# PGADMIN
PGADMIN_USER=<preencha aqui>
PGADMIN_PASSWORD=<preencha aqui>

Onde estiver escrito <preencha aqui>, substitua pelos conteúdos que achar mais conveniente para o seu ambiente. Antes que eu me esqueça, o arquivo .env tem que estar no mesmo path do docker-compose.yml.

Agora vamos fazer as mudanças em nosso docker-compose.yml:

YAML
---
services:
  postgres:
    image: postgres:13
    container_name: postgres_container
    environment:
      POSTGRES_USER: ${DATABASE_USER}
      POSTGRES_PASSWORD: ${DATABASE_PASSWORD}
      POSTGRES_DB: ${DATABASE_NAME}
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data

  pgadmin:
    image: dpage/pgadmin4
    container_name: pgadmin_container
    environment:
      PGADMIN_DEFAULT_EMAIL: ${PGADMIN_USER}
      PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_PASSWORD}
    ports:
      - "5050:80"
    depends_on:
      - postgres
    volumes:
      - pgadmin_data:/var/lib/pgadmin

  broker:
    image: confluentinc/cp-server:7.5.1
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid" 
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - workarea:/app/workarea
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-http-1.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-oracle-cdc-2.2.1.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-sftp-3.2.4.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
        confluent-hub install --no-prompt /app/workarea/confluent-connectors/streamthoughts-kafka-connect-file-pulse-2.10.0.zip
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.5.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.5.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.5.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.5.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.5.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  flink-init:
    image: busybox
    container_name: flink-init
    command: >
      sh -c "
      echo 'Downloading Flink Kafka connector...' &&
      wget -O /opt/flink/plugins/kafka/flink-sql-connector-kafka-3.2.0-1.19.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar --no-check-certificate &&
      echo 'Connector downloaded successfully.'
      echo 'Downloading Flink JDBC connector...' &&
      wget -O /opt/flink/plugins/kafka/postgresql-42.7.3.jar https://jdbc.postgresql.org/download/postgresql-42.7.3.jar --no-check-certificate &&
      echo 'Connector downloaded successfully.'
      "
    volumes:
      - flink-plugins:/opt/flink/plugins/kafka
    depends_on:
      - broker

  flink-jobmanager:
    image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
    hostname: flink-jobmanager
    container_name: flink-jobmanager
    ports:
      - 9081:9081
    command: jobmanager
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        rest.bind-port: 9081
    volumes:
      - flink-plugins:/opt/flink/plugins/kafka
    depends_on:
      - flink-init

  flink-taskmanager:
    image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
    hostname: flink-taskmanager
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 10
    volumes:
      - flink-plugins:/opt/flink/plugins/kafka

  flink-sql-client:
    image: cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17
    hostname: flink-sql-client
    container_name: flink-sql-client
    depends_on:
      - flink-jobmanager
    environment:
      FLINK_JOBMANAGER_HOST: flink-jobmanager
    volumes:
      - ./settings/:/settings
      - flink-plugins:/opt/flink/plugins/kafka

volumes:
  workarea:
    driver: local
    driver_opts:
      type: none
      o: bind
      device: /home/swillians/workarea
  flink-plugins:
    driver: local
  pgdata:  
  pgadmin_data:

Após subir o ambiente, vamos verificar se está tudo certo:

image 61

Observe que o pgadmin e o postgres_container subiram juntos com os outros containers. Vamos verificar se conseguimos acessá-los:

http://localhost:5050/login?next=/browser/

image 62

Entre com as credenciais que você utilizou no arquivo .env:

image 63

Agora clique em Add New Server para registramos o nosso banco de dados PostgreSQL:

image 64

Preencha os campos a seguir. Eu irei colocar a minha configuração, mas você pode substítui-la por aquela que achar mais conveniente para você:

image 65

Se atente que o hostname deverá ser o nome do container onde o postgreSQL está rodando:

image 66

Clique em Save:

image 67
image 68

Perfeito ! Agora vamos preparar a nossa tabela e populá-la ! Para isso, primeiro vamos instalar a biblioteca psycopg2 do Python:

Python
pip install psycopg2-binary

A biblioteca psycopg2 (e sua variante pré-compilada psycopg2-binary) é um adaptador de banco de dados PostgreSQL para a linguagem de programação Python. Ela que nos ajudará a conectar no banco e rodar o nosso script que fará a criação da tabela e também populará os dados.

Você pode reaproveitar o arquivo .env que utilizou com o docker-compose.yml para utilizar as credenciais do banco de dados que serão os mesmos. Para isso, copie o arquivo .env para o mesmo diretório do script Python, e inclua a variável DATABASE_HOST.

Plaintext
# PostgreSQL
DATABASE_HOST=localhost
DATABASE_NAME=<preencha aqui>
DATABASE_USER=<preencha aqui>
DATABASE_PASSWORD=<preencha aqui>

# PGADMIN
PGADMIN_USER=<preencha aqui>
PGADMIN_PASSWORD=<preencha aqui>

Abaixo o nosso script Python chamado populate_table.py:

Python
import psycopg2
from psycopg2 import sql
import random
from dotenv import load_dotenv
import os

# Carregar variáveis de ambiente do arquivo .env
load_dotenv()

# Configurações de conexão com o banco de dados
DB_HOST = os.getenv("DATABASE_HOST")
DB_NAME = os.getenv("DATABASE_NAME")
DB_USER = os.getenv("DATABASE_USER")
DB_PASS = os.getenv("DATABASE_PASSWORD")

# Conexão com o banco de dados PostgreSQL
conn = psycopg2.connect(
    host=DB_HOST,
    dbname=DB_NAME,
    user=DB_USER,
    password=DB_PASS
)

# Criação de um cursor para executar comandos SQL
cur = conn.cursor()

# Comando SQL para criar a tabela CLIENTS_ADDRESS
create_table_query = sql.SQL("""
    CREATE TABLE IF NOT EXISTS CLIENTS_ADDRESS (
        FIRST_NAME VARCHAR(50),
        LAST_NAME VARCHAR(50),
        ADDRESS VARCHAR(255)
    )
""")

# Execução do comando SQL para criar a tabela
cur.execute(create_table_query)

# Listas de nomes
first_names = ["John", "Jane", "Alice", "Bob", "Eve", "Mike", "Anna", "Tom", "Lucy", "Jack"]
last_names = ["Smith", "Doe", "Johnson", "Brown", "Williams", "Jones", "Garcia", "Miller", "Davis", "Rodriguez"]

# Função para gerar um endereço aleatório
def generate_random_address():
    streets = ["Main St", "High St", "Park Ave", "Oak St", "Pine St", "Maple St", "Cedar St", "Elm St", "View St", "Lake St"]
    street_number = random.randint(1, 1000)
    return f"{street_number} {random.choice(streets)}"

# Inserção dos dados na tabela
for first_name in first_names:
    for last_name in last_names:
        address = generate_random_address()
        insert_query = sql.SQL("""
            INSERT INTO CLIENTS_ADDRESS (FIRST_NAME, LAST_NAME, ADDRESS)
            VALUES (%s, %s, %s)
        """)
        cur.execute(insert_query, (first_name, last_name, address))

# Confirmação das alterações no banco de dados
conn.commit()

# Fechamento do cursor e da conexão
cur.close()
conn.close()

print("Tabela CLIENTS_ADDRESS criada e populada com sucesso.")

Vamos executá-lo agora:

Python
python3 populate_table.py 

Tabela CLIENTS_ADDRESS criada e populada com sucesso.

Vamos verificar a tabela através do PGADMIN:

image 69

Com a nossa tabela criada e populada, vamos voltar ao Flink e criar lá a nossa tabela baseada no PostgreSQL.

Agora vamos executar o sql client do Flink:

Bash
sudo docker compose exec flink-sql-client sql-client.sh embedded --library /opt/flink/plugins/kafka
Apache Flink e PostgreSQL

Vamos agora criar a tabela, para isso utilize o comando abaixo:

SQL
CREATE TABLE clients_address (
    first_name STRING,
    last_name STRING,
    address STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://postgres_container:5432/flink_db',
    'table-name' = 'clients_address',
    'username' = '<coloque aqui o username do arquivo .env>',
    'password' = '<coloque aqui a senha do arquivo .env>',
    'driver' = 'org.postgresql.Driver'
);

Agora vamos dar um SELECT em nossa tabela:

SQL
SELECT * FROM clients_address;

E o resultado é:

image 59

Parabéns ! A sua tabela no Flink foi criada com sucesso !

No próximo artigo vamos enriquecer os dados que recebemos do Kafka, utilizando os dados do PostgreSQL.

Até lá !

Referências

Sergio Willians

Sergio Willians

Sergio Willians é o fundador do GPO (Grupo de Profissionais Oracle) e possui quase 30 anos de experiência em tecnologias Oracle, sendo especialista em desenvolvimento Forms/Reports, PL/SQL e EBS (E-Business Suite) nos módulos Receivables, Payables e General Ledger. Atualmente trabalha na Scania Latin America, onde se dedica à área de integração de dados com Confluent Kafka. Sua paixão é compartilhar conhecimento com a comunidade Oracle, contribuindo para o crescimento e a excelência da plataforma.

Deixe um comentário

O seu endereço de e-mail não será publicado. Campos obrigatórios são marcados com *

plugins premium WordPress