Kafka, Confluent Platform e Apache Flink: Uma combinação perfeita ! | Parte VI – Apache Flink e PostgreSQL
No artigo anterior, iniciamos com o Flink e o Kafka. Dando continuidade à nossa série de artigos, vamos agora conectar o Flink ao PostgreSQL. Nosso objetivo final é utilizar uma tabela que será criada no PostgreSQL para enriquecer os dados provenientes do Kafka.
O que iremos instalar em nosso ambiente:
- Banco de dados PostgreSQL 13
- PGADMIN
- Driver JDBC PostgreSQL para o Flink
Adivinhe o que precisaremos fazer para instalar mais esses recursos em nosso ambiente ? Alterar o nosso docker-compose.yml
! rs
Mas como prometido, começaremos a melhorá-lo para que tenhamos um docker-compose.yml
mais limpo e fácil de alterar se for necessário. Para isso, introduziremos o arquivo .env
O arquivo .env
no contexto do Docker Compose é um arquivo de configuração usado para definir variáveis de ambiente que podem ser referenciadas no arquivo docker-compose.yml
. Ele permite que você separe a configuração do código e mantenha suas variáveis de ambiente fora do código-fonte, o que facilita a gestão de diferentes configurações para diferentes ambientes (desenvolvimento, teste, produção, etc.). No nosso caso, servirá principalmente para salvar as credenciais de acesso ao banco de dados.
Vamos criar o nosso .env
, com o conteúdo abaixo:
# PostgreSQL
DATABASE_NAME=<preencha aqui>
DATABASE_USER=<preencha aqui>
DATABASE_PASSWORD=<preencha aqui>
# PGADMIN
PGADMIN_USER=<preencha aqui>
PGADMIN_PASSWORD=<preencha aqui>
Onde estiver escrito <preencha aqui>, substitua pelos conteúdos que achar mais conveniente para o seu ambiente. Antes que eu me esqueça, o arquivo .env
tem que estar no mesmo path do docker-compose.yml
.
Agora vamos fazer as mudanças em nosso docker-compose.yml
:
---
services:
postgres:
image: postgres:13
container_name: postgres_container
environment:
POSTGRES_USER: ${DATABASE_USER}
POSTGRES_PASSWORD: ${DATABASE_PASSWORD}
POSTGRES_DB: ${DATABASE_NAME}
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
pgadmin:
image: dpage/pgadmin4
container_name: pgadmin_container
environment:
PGADMIN_DEFAULT_EMAIL: ${PGADMIN_USER}
PGADMIN_DEFAULT_PASSWORD: ${PGADMIN_PASSWORD}
ports:
- "5050:80"
depends_on:
- postgres
volumes:
- pgadmin_data:/var/lib/pgadmin
broker:
image: confluentinc/cp-server:7.5.1
hostname: broker
container_name: broker
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: 'broker:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
KAFKA_PROCESS_ROLES: 'broker,controller'
KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
# Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
# See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
schema-registry:
image: confluentinc/cp-schema-registry:7.5.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
connect:
image: cnfldemos/cp-server-connect-datagen:0.6.2-7.5.0
hostname: connect
container_name: connect
depends_on:
- broker
- schema-registry
ports:
- "8083:8083"
volumes:
- workarea:/app/workarea
environment:
CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
# CLASSPATH required due to CC-2422
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.0.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
command:
- bash
- -c
- |
echo "Installing Connector"
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-http-1.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-jdbc-10.7.6.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-oracle-cdc-2.2.1.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/confluentinc-kafka-connect-sftp-3.2.4.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/jcustenborder-kafka-connect-spooldir-2.0.65.zip
confluent-hub install --no-prompt /app/workarea/confluent-connectors/streamthoughts-kafka-connect-file-pulse-2.10.0.zip
#
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
sleep infinity
control-center:
image: confluentinc/cp-enterprise-control-center:7.5.1
hostname: control-center
container_name: control-center
depends_on:
- broker
- schema-registry
- connect
ports:
- "9021:9021"
environment:
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: '/connectors'
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
CONTROL_CENTER_REPLICATION_FACTOR: 1
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
PORT: 9021
ksqldb-server:
image: confluentinc/cp-ksqldb-server:7.5.1
hostname: ksqldb-server
container_name: ksqldb-server
depends_on:
- broker
- connect
ports:
- "8088:8088"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
KSQL_HOST_NAME: ksqldb-server
KSQL_LISTENERS: "http://0.0.0.0:8088"
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
ksqldb-cli:
image: confluentinc/cp-ksqldb-cli:7.5.1
container_name: ksqldb-cli
depends_on:
- broker
- connect
- ksqldb-server
entrypoint: /bin/sh
tty: true
ksql-datagen:
image: confluentinc/ksqldb-examples:7.5.1
hostname: ksql-datagen
container_name: ksql-datagen
depends_on:
- ksqldb-server
- broker
- schema-registry
- connect
command: "bash -c 'echo Waiting for Kafka to be ready... && \
cub kafka-ready -b broker:29092 1 40 && \
echo Waiting for Confluent Schema Registry to be ready... && \
cub sr-ready schema-registry 8081 40 && \
echo Waiting a few seconds for topic creation to finish... && \
sleep 11 && \
tail -f /dev/null'"
environment:
KSQL_CONFIG_DIR: "/etc/ksql"
STREAMS_BOOTSTRAP_SERVERS: broker:29092
STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
STREAMS_SCHEMA_REGISTRY_PORT: 8081
rest-proxy:
image: confluentinc/cp-kafka-rest:7.5.1
depends_on:
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
flink-init:
image: busybox
container_name: flink-init
command: >
sh -c "
echo 'Downloading Flink Kafka connector...' &&
wget -O /opt/flink/plugins/kafka/flink-sql-connector-kafka-3.2.0-1.19.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar --no-check-certificate &&
echo 'Connector downloaded successfully.'
echo 'Downloading Flink JDBC connector...' &&
wget -O /opt/flink/plugins/kafka/postgresql-42.7.3.jar https://jdbc.postgresql.org/download/postgresql-42.7.3.jar --no-check-certificate &&
echo 'Connector downloaded successfully.'
"
volumes:
- flink-plugins:/opt/flink/plugins/kafka
depends_on:
- broker
flink-jobmanager:
image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
hostname: flink-jobmanager
container_name: flink-jobmanager
ports:
- 9081:9081
command: jobmanager
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: flink-jobmanager
rest.bind-port: 9081
volumes:
- flink-plugins:/opt/flink/plugins/kafka
depends_on:
- flink-init
flink-taskmanager:
image: cnfldemos/flink-kafka:1.19.1-scala_2.12-java17
hostname: flink-taskmanager
container_name: flink-taskmanager
depends_on:
- flink-jobmanager
command: taskmanager
scale: 1
environment:
FLINK_PROPERTIES: |
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 10
volumes:
- flink-plugins:/opt/flink/plugins/kafka
flink-sql-client:
image: cnfldemos/flink-sql-client-kafka:1.19.1-scala_2.12-java17
hostname: flink-sql-client
container_name: flink-sql-client
depends_on:
- flink-jobmanager
environment:
FLINK_JOBMANAGER_HOST: flink-jobmanager
volumes:
- ./settings/:/settings
- flink-plugins:/opt/flink/plugins/kafka
volumes:
workarea:
driver: local
driver_opts:
type: none
o: bind
device: /home/swillians/workarea
flink-plugins:
driver: local
pgdata:
pgadmin_data:
Após subir o ambiente, vamos verificar se está tudo certo:
Observe que o pgadmin e o postgres_container subiram juntos com os outros containers. Vamos verificar se conseguimos acessá-los:
http://localhost:5050/login?next=/browser/
Entre com as credenciais que você utilizou no arquivo .env
:
Agora clique em Add New Server para registramos o nosso banco de dados PostgreSQL:
Preencha os campos a seguir. Eu irei colocar a minha configuração, mas você pode substítui-la por aquela que achar mais conveniente para você:
Se atente que o hostname deverá ser o nome do container onde o postgreSQL está rodando:
Clique em Save:
Perfeito ! Agora vamos preparar a nossa tabela e populá-la ! Para isso, primeiro vamos instalar a biblioteca psycopg2 do Python:
pip install psycopg2-binary
A biblioteca psycopg2
(e sua variante pré-compilada psycopg2-binary
) é um adaptador de banco de dados PostgreSQL para a linguagem de programação Python. Ela que nos ajudará a conectar no banco e rodar o nosso script que fará a criação da tabela e também populará os dados.
Você pode reaproveitar o arquivo .env
que utilizou com o docker-compose.yml
para utilizar as credenciais do banco de dados que serão os mesmos. Para isso, copie o arquivo .env
para o mesmo diretório do script Python, e inclua a variável DATABASE_HOST.
# PostgreSQL
DATABASE_HOST=localhost
DATABASE_NAME=<preencha aqui>
DATABASE_USER=<preencha aqui>
DATABASE_PASSWORD=<preencha aqui>
# PGADMIN
PGADMIN_USER=<preencha aqui>
PGADMIN_PASSWORD=<preencha aqui>
Abaixo o nosso script Python chamado populate_table.py
:
import psycopg2
from psycopg2 import sql
import random
from dotenv import load_dotenv
import os
# Carregar variáveis de ambiente do arquivo .env
load_dotenv()
# Configurações de conexão com o banco de dados
DB_HOST = os.getenv("DATABASE_HOST")
DB_NAME = os.getenv("DATABASE_NAME")
DB_USER = os.getenv("DATABASE_USER")
DB_PASS = os.getenv("DATABASE_PASSWORD")
# Conexão com o banco de dados PostgreSQL
conn = psycopg2.connect(
host=DB_HOST,
dbname=DB_NAME,
user=DB_USER,
password=DB_PASS
)
# Criação de um cursor para executar comandos SQL
cur = conn.cursor()
# Comando SQL para criar a tabela CLIENTS_ADDRESS
create_table_query = sql.SQL("""
CREATE TABLE IF NOT EXISTS CLIENTS_ADDRESS (
FIRST_NAME VARCHAR(50),
LAST_NAME VARCHAR(50),
ADDRESS VARCHAR(255)
)
""")
# Execução do comando SQL para criar a tabela
cur.execute(create_table_query)
# Listas de nomes
first_names = ["John", "Jane", "Alice", "Bob", "Eve", "Mike", "Anna", "Tom", "Lucy", "Jack"]
last_names = ["Smith", "Doe", "Johnson", "Brown", "Williams", "Jones", "Garcia", "Miller", "Davis", "Rodriguez"]
# Função para gerar um endereço aleatório
def generate_random_address():
streets = ["Main St", "High St", "Park Ave", "Oak St", "Pine St", "Maple St", "Cedar St", "Elm St", "View St", "Lake St"]
street_number = random.randint(1, 1000)
return f"{street_number} {random.choice(streets)}"
# Inserção dos dados na tabela
for first_name in first_names:
for last_name in last_names:
address = generate_random_address()
insert_query = sql.SQL("""
INSERT INTO CLIENTS_ADDRESS (FIRST_NAME, LAST_NAME, ADDRESS)
VALUES (%s, %s, %s)
""")
cur.execute(insert_query, (first_name, last_name, address))
# Confirmação das alterações no banco de dados
conn.commit()
# Fechamento do cursor e da conexão
cur.close()
conn.close()
print("Tabela CLIENTS_ADDRESS criada e populada com sucesso.")
Vamos executá-lo agora:
python3 populate_table.py
Tabela CLIENTS_ADDRESS criada e populada com sucesso.
Vamos verificar a tabela através do PGADMIN:
Com a nossa tabela criada e populada, vamos voltar ao Flink e criar lá a nossa tabela baseada no PostgreSQL.
Agora vamos executar o sql client do Flink:
sudo docker compose exec flink-sql-client sql-client.sh embedded --library /opt/flink/plugins/kafka
Vamos agora criar a tabela, para isso utilize o comando abaixo:
CREATE TABLE clients_address (
first_name STRING,
last_name STRING,
address STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://postgres_container:5432/flink_db',
'table-name' = 'clients_address',
'username' = '<coloque aqui o username do arquivo .env>',
'password' = '<coloque aqui a senha do arquivo .env>',
'driver' = 'org.postgresql.Driver'
);
Agora vamos dar um SELECT em nossa tabela:
SELECT * FROM clients_address;
E o resultado é:
Parabéns ! A sua tabela no Flink foi criada com sucesso !
No próximo artigo vamos enriquecer os dados que recebemos do Kafka, utilizando os dados do PostgreSQL.
Até lá !
Referências