{"id":176395,"date":"2024-07-04T09:39:13","date_gmt":"2024-07-04T12:39:13","guid":{"rendered":"https:\/\/www.profissionaloracle.com.br\/?p=176395"},"modified":"2024-07-04T09:39:18","modified_gmt":"2024-07-04T12:39:18","slug":"kafka-confluent-platform-e-apache-flink-uma-combinacao-perfeita-parte-v-iniciando-com-o-apache-flink","status":"publish","type":"post","link":"https:\/\/www.profissionaloracle.com.br\/2024\/07\/04\/kafka-confluent-platform-e-apache-flink-uma-combinacao-perfeita-parte-v-iniciando-com-o-apache-flink\/","title":{"rendered":"Kafka, Confluent Platform e Apache Flink: Uma combina\u00e7\u00e3o perfeita ! | Parte V \u2013 Iniciando com o Apache Flink"},"content":{"rendered":"\n

<\/p>\n\n\n\n

Kafka, Confluent Platform e Apache Flink: Uma combina\u00e7\u00e3o perfeita ! | Parte V \u2013 Iniciando com o Apache Flink<\/h3>\n\n\n\n

No artigo anterior, discutimos o Rest Proxy e testamos sua funcionalidade utilizando o Confluent CLI<\/a>. Mas agora, o t\u00e3o esperado momento chegou! Vamos come\u00e7ar a utilizar o Flink. No entanto (sempre h\u00e1 um por\u00e9m), precisaremos fazer uma altera\u00e7\u00e3o em nosso arquivo docker-compose.yml<\/code>.<\/p>\n\n\n\n

N\u00e3o fiquem bravos, essas mudan\u00e7as ser\u00e3o ben\u00e9ficas para que tenhamos mais traquilidade nos pr\u00f3ximos cap\u00edtulos de nossa saga.<\/p>\n\n\n\n

Abaixo o nosso novo docker-compose.yml<\/code><\/strong><\/p>\n\n\n\n

YAML<\/span><\/path><\/path><\/svg><\/span>
---<\/span><\/span>\nservices<\/span>:<\/span><\/span>\n<\/span>\n  <\/span>broker<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>confluentinc\/cp-server:7.5.1<\/span><\/span>\n    <\/span>hostname<\/span>:<\/span> <\/span>broker<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>broker<\/span><\/span>\n    <\/span>ports<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>"<\/span>9092:9092<\/span>"<\/span><\/span>\n      <\/span>-<\/span> <\/span>"<\/span>9101:9101<\/span>"<\/span><\/span>\n    <\/span>environment<\/span>:<\/span><\/span>\n      <\/span>KAFKA_NODE_ID<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>KAFKA_LISTENER_SECURITY_PROTOCOL_MAP<\/span>:<\/span> <\/span>'<\/span>CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT<\/span>'<\/span><\/span>\n      <\/span>KAFKA_ADVERTISED_LISTENERS<\/span>:<\/span> <\/span>'<\/span>PLAINTEXT:\/\/broker:29092,PLAINTEXT_HOST:\/\/localhost:9092<\/span>'<\/span><\/span>\n      <\/span>KAFKA_METRIC_REPORTERS<\/span>:<\/span> <\/span>io.confluent.metrics.reporter.ConfluentMetricsReporter<\/span><\/span>\n      <\/span>KAFKA_CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS<\/span>:<\/span> <\/span>'<\/span>broker:9092<\/span>'<\/span><\/span>\n      <\/span>KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS<\/span>:<\/span> <\/span>0<\/span><\/span>\n      <\/span>KAFKA_TRANSACTION_STATE_LOG_MIN_ISR<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>KAFKA_JMX_PORT<\/span>:<\/span> <\/span>9101<\/span><\/span>\n      <\/span>KAFKA_JMX_HOSTNAME<\/span>:<\/span> <\/span>localhost<\/span><\/span>\n      <\/span>KAFKA_PROCESS_ROLES<\/span>:<\/span> <\/span>'<\/span>broker,controller<\/span>'<\/span><\/span>\n      <\/span>KAFKA_CONTROLLER_QUORUM_VOTERS<\/span>:<\/span> <\/span>'<\/span>1@broker:29093<\/span>'<\/span><\/span>\n      <\/span>KAFKA_LISTENERS<\/span>:<\/span> <\/span>'<\/span>PLAINTEXT:\/\/broker:29092,CONTROLLER:\/\/broker:29093,PLAINTEXT_HOST:\/\/0.0.0.0:9092<\/span>'<\/span><\/span>\n      <\/span>KAFKA_INTER_BROKER_LISTENER_NAME<\/span>:<\/span> <\/span>'<\/span>PLAINTEXT<\/span>'<\/span><\/span>\n      <\/span>KAFKA_CONTROLLER_LISTENER_NAMES<\/span>:<\/span> <\/span>'<\/span>CONTROLLER<\/span>'<\/span><\/span>\n      <\/span>KAFKA_LOG_DIRS<\/span>:<\/span> <\/span>'<\/span>\/tmp\/kraft-combined-logs<\/span>'<\/span><\/span>\n      <\/span># Replace CLUSTER_ID with a unique base64 UUID using "bin\/kafka-storage.sh random-uuid" <\/span><\/span>\n      <\/span># See https:\/\/docs.confluent.io\/kafka\/operations-tools\/kafka-tools.html#kafka-storage-sh<\/span><\/span>\n      <\/span>CLUSTER_ID<\/span>:<\/span> <\/span>'<\/span>MkU3OEVBNTcwNTJENDM2Qk<\/span>'<\/span><\/span>\n<\/span>\n  <\/span>schema-registry<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>confluentinc\/cp-schema-registry:7.5.1<\/span><\/span>\n    <\/span>hostname<\/span>:<\/span> <\/span>schema-registry<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>schema-registry<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>broker<\/span><\/span>\n    <\/span>ports<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>"<\/span>8081:8081<\/span>"<\/span><\/span>\n    <\/span>environment<\/span>:<\/span><\/span>\n      <\/span>SCHEMA_REGISTRY_HOST_NAME<\/span>:<\/span> <\/span>schema-registry<\/span><\/span>\n      <\/span>SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS<\/span>:<\/span> <\/span>'<\/span>broker:29092<\/span>'<\/span><\/span>\n      <\/span>SCHEMA_REGISTRY_LISTENERS<\/span>:<\/span> <\/span>http:\/\/0.0.0.0:8081<\/span><\/span>\n<\/span>\n  <\/span>connect<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>cnfldemos\/cp-server-connect-datagen:0.6.2-7.5.0<\/span><\/span>\n    <\/span>hostname<\/span>:<\/span> <\/span>connect<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>connect<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>broker<\/span><\/span>\n      <\/span>-<\/span> <\/span>schema-registry<\/span><\/span>\n    <\/span>ports<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>"<\/span>8083:8083<\/span>"<\/span><\/span>\n    <\/span>volumes<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>workarea:\/app\/workarea<\/span><\/span>\n    <\/span>environment<\/span>:<\/span><\/span>\n      <\/span>CONNECT_BOOTSTRAP_SERVERS<\/span>:<\/span> <\/span>'<\/span>broker:29092<\/span>'<\/span><\/span>\n      <\/span>CONNECT_REST_ADVERTISED_HOST_NAME<\/span>:<\/span> <\/span>connect<\/span><\/span>\n      <\/span>CONNECT_GROUP_ID<\/span>:<\/span> <\/span>compose-connect-group<\/span><\/span>\n      <\/span>CONNECT_CONFIG_STORAGE_TOPIC<\/span>:<\/span> <\/span>docker-connect-configs<\/span><\/span>\n      <\/span>CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>CONNECT_OFFSET_FLUSH_INTERVAL_MS<\/span>:<\/span> <\/span>10000<\/span><\/span>\n      <\/span>CONNECT_OFFSET_STORAGE_TOPIC<\/span>:<\/span> <\/span>docker-connect-offsets<\/span><\/span>\n      <\/span>CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>CONNECT_STATUS_STORAGE_TOPIC<\/span>:<\/span> <\/span>docker-connect-status<\/span><\/span>\n      <\/span>CONNECT_STATUS_STORAGE_REPLICATION_FACTOR<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>CONNECT_KEY_CONVERTER<\/span>:<\/span> <\/span>org.apache.kafka.connect.storage.StringConverter<\/span><\/span>\n      <\/span>CONNECT_VALUE_CONVERTER<\/span>:<\/span> <\/span>io.confluent.connect.avro.AvroConverter<\/span><\/span>\n      <\/span>CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL<\/span>:<\/span> <\/span>http:\/\/schema-registry:8081<\/span><\/span>\n      <\/span># CLASSPATH required due to CC-2422<\/span><\/span>\n      <\/span>CLASSPATH<\/span>:<\/span> <\/span>\/usr\/share\/java\/monitoring-interceptors\/monitoring-interceptors-7.5.0.jar<\/span><\/span>\n      <\/span>CONNECT_PRODUCER_INTERCEPTOR_CLASSES<\/span>:<\/span> <\/span>"<\/span>io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor<\/span>"<\/span><\/span>\n      <\/span>CONNECT_CONSUMER_INTERCEPTOR_CLASSES<\/span>:<\/span> <\/span>"<\/span>io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor<\/span>"<\/span><\/span>\n      <\/span>CONNECT_PLUGIN_PATH<\/span>:<\/span> <\/span>"<\/span>\/usr\/share\/java,\/usr\/share\/confluent-hub-components<\/span>"<\/span><\/span>\n      <\/span>CONNECT_LOG4J_LOGGERS<\/span>:<\/span> <\/span>org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR<\/span><\/span>\n    <\/span>command<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>bash<\/span><\/span>\n      <\/span>-<\/span> <\/span>-c<\/span><\/span>\n      <\/span>-<\/span> <\/span>|<\/span><\/span>\n        echo "Installing Connector"<\/span><\/span>\n        confluent-hub install --no-prompt \/app\/workarea\/confluent-connectors\/confluentinc-kafka-connect-jdbc-10.7.6.zip<\/span><\/span>\n        confluent-hub install --no-prompt \/app\/workarea\/confluent-connectors\/confluentinc-kafka-connect-http-1.7.6.zip<\/span><\/span>\n        confluent-hub install --no-prompt \/app\/workarea\/confluent-connectors\/confluentinc-kafka-connect-jdbc-10.7.6.zip<\/span><\/span>\n        confluent-hub install --no-prompt \/app\/workarea\/confluent-connectors\/confluentinc-kafka-connect-oracle-cdc-2.2.1.zip<\/span><\/span>\n        confluent-hub install --no-prompt \/app\/workarea\/confluent-connectors\/confluentinc-kafka-connect-sftp-3.2.4.zip<\/span><\/span>\n        confluent-hub install --no-prompt \/app\/workarea\/confluent-connectors\/jcustenborder-kafka-connect-spooldir-2.0.65.zip<\/span><\/span>\n        confluent-hub install --no-prompt \/app\/workarea\/confluent-connectors\/streamthoughts-kafka-connect-file-pulse-2.10.0.zip<\/span><\/span>\n        #<\/span><\/span>\n        echo "Launching Kafka Connect worker"<\/span><\/span>\n        \/etc\/confluent\/docker\/run &<\/span><\/span>\n        #<\/span><\/span>\n        sleep infinity<\/span><\/span>\n<\/span>\n  <\/span>control-center<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>confluentinc\/cp-enterprise-control-center:7.5.1<\/span><\/span>\n    <\/span>hostname<\/span>:<\/span> <\/span>control-center<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>control-center<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>broker<\/span><\/span>\n      <\/span>-<\/span> <\/span>schema-registry<\/span><\/span>\n      <\/span>-<\/span> <\/span>connect<\/span><\/span>\n    <\/span>ports<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>"<\/span>9021:9021<\/span>"<\/span><\/span>\n    <\/span>environment<\/span>:<\/span><\/span>\n      <\/span>CONTROL_CENTER_BOOTSTRAP_SERVERS<\/span>:<\/span> <\/span>'<\/span>broker:29092<\/span>'<\/span><\/span>\n      <\/span>CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER<\/span>:<\/span> <\/span>'<\/span>connect:8083<\/span>'<\/span><\/span>\n      <\/span>CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT<\/span>:<\/span> <\/span>'<\/span>\/connectors<\/span>'<\/span><\/span>\n      <\/span>CONTROL_CENTER_KSQL_KSQLDB1_URL<\/span>:<\/span> <\/span>"<\/span>http:\/\/ksqldb-server:8088<\/span>"<\/span><\/span>\n      <\/span>CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL<\/span>:<\/span> <\/span>"<\/span>http:\/\/localhost:8088<\/span>"<\/span><\/span>\n      <\/span>CONTROL_CENTER_SCHEMA_REGISTRY_URL<\/span>:<\/span> <\/span>"<\/span>http:\/\/schema-registry:8081<\/span>"<\/span><\/span>\n      <\/span>CONTROL_CENTER_REPLICATION_FACTOR<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>CONFLUENT_METRICS_TOPIC_REPLICATION<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>PORT<\/span>:<\/span> <\/span>9021<\/span><\/span>\n<\/span>\n  <\/span>ksqldb-server<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>confluentinc\/cp-ksqldb-server:7.5.1<\/span><\/span>\n    <\/span>hostname<\/span>:<\/span> <\/span>ksqldb-server<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>ksqldb-server<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>broker<\/span><\/span>\n      <\/span>-<\/span> <\/span>connect<\/span><\/span>\n    <\/span>ports<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>"<\/span>8088:8088<\/span>"<\/span><\/span>\n    <\/span>environment<\/span>:<\/span><\/span>\n      <\/span>KSQL_CONFIG_DIR<\/span>:<\/span> <\/span>"<\/span>\/etc\/ksql<\/span>"<\/span><\/span>\n      <\/span>KSQL_BOOTSTRAP_SERVERS<\/span>:<\/span> <\/span>"<\/span>broker:29092<\/span>"<\/span><\/span>\n      <\/span>KSQL_HOST_NAME<\/span>:<\/span> <\/span>ksqldb-server<\/span><\/span>\n      <\/span>KSQL_LISTENERS<\/span>:<\/span> <\/span>"<\/span>http:\/\/0.0.0.0:8088<\/span>"<\/span><\/span>\n      <\/span>KSQL_CACHE_MAX_BYTES_BUFFERING<\/span>:<\/span> <\/span>0<\/span><\/span>\n      <\/span>KSQL_KSQL_SCHEMA_REGISTRY_URL<\/span>:<\/span> <\/span>"<\/span>http:\/\/schema-registry:8081<\/span>"<\/span><\/span>\n      <\/span>KSQL_PRODUCER_INTERCEPTOR_CLASSES<\/span>:<\/span> <\/span>"<\/span>io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor<\/span>"<\/span><\/span>\n      <\/span>KSQL_CONSUMER_INTERCEPTOR_CLASSES<\/span>:<\/span> <\/span>"<\/span>io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor<\/span>"<\/span><\/span>\n      <\/span>KSQL_KSQL_CONNECT_URL<\/span>:<\/span> <\/span>"<\/span>http:\/\/connect:8083<\/span>"<\/span><\/span>\n      <\/span>KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR<\/span>:<\/span> <\/span>1<\/span><\/span>\n      <\/span>KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE<\/span>:<\/span> <\/span>'<\/span>true<\/span>'<\/span><\/span>\n      <\/span>KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE<\/span>:<\/span> <\/span>'<\/span>true<\/span>'<\/span><\/span>\n<\/span>\n  <\/span>ksqldb-cli<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>confluentinc\/cp-ksqldb-cli:7.5.1<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>ksqldb-cli<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>broker<\/span><\/span>\n      <\/span>-<\/span> <\/span>connect<\/span><\/span>\n      <\/span>-<\/span> <\/span>ksqldb-server<\/span><\/span>\n    <\/span>entrypoint<\/span>:<\/span> <\/span>\/bin\/sh<\/span><\/span>\n    <\/span>tty<\/span>:<\/span> <\/span>true<\/span><\/span>\n<\/span>\n  <\/span>ksql-datagen<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>confluentinc\/ksqldb-examples:7.5.1<\/span><\/span>\n    <\/span>hostname<\/span>:<\/span> <\/span>ksql-datagen<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>ksql-datagen<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>ksqldb-server<\/span><\/span>\n      <\/span>-<\/span> <\/span>broker<\/span><\/span>\n      <\/span>-<\/span> <\/span>schema-registry<\/span><\/span>\n      <\/span>-<\/span> <\/span>connect<\/span><\/span>\n    <\/span>command<\/span>:<\/span> <\/span>"<\/span>bash -c 'echo Waiting for Kafka to be ready... && <\/span>\\<\/span><\/span>\n                       cub kafka-ready -b broker:29092 1 40 && <\/span>\\<\/span><\/span>\n                       echo Waiting for Confluent Schema Registry to be ready... && <\/span>\\<\/span><\/span>\n                       cub sr-ready schema-registry 8081 40 && <\/span>\\<\/span><\/span>\n                       echo Waiting a few seconds for topic creation to finish... && <\/span>\\<\/span><\/span>\n                       sleep 11 && <\/span>\\<\/span><\/span>\n                       tail -f \/dev\/null'<\/span>"<\/span><\/span>\n    <\/span>environment<\/span>:<\/span><\/span>\n      <\/span>KSQL_CONFIG_DIR<\/span>:<\/span> <\/span>"<\/span>\/etc\/ksql<\/span>"<\/span><\/span>\n      <\/span>STREAMS_BOOTSTRAP_SERVERS<\/span>:<\/span> <\/span>broker:29092<\/span><\/span>\n      <\/span>STREAMS_SCHEMA_REGISTRY_HOST<\/span>:<\/span> <\/span>schema-registry<\/span><\/span>\n      <\/span>STREAMS_SCHEMA_REGISTRY_PORT<\/span>:<\/span> <\/span>8081<\/span><\/span>\n<\/span>\n  <\/span>rest-proxy<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>confluentinc\/cp-kafka-rest:7.5.1<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>broker<\/span><\/span>\n      <\/span>-<\/span> <\/span>schema-registry<\/span><\/span>\n    <\/span>ports<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>8082:8082<\/span><\/span>\n    <\/span>hostname<\/span>:<\/span> <\/span>rest-proxy<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>rest-proxy<\/span><\/span>\n    <\/span>environment<\/span>:<\/span><\/span>\n      <\/span>KAFKA_REST_HOST_NAME<\/span>:<\/span> <\/span>rest-proxy<\/span><\/span>\n      <\/span>KAFKA_REST_BOOTSTRAP_SERVERS<\/span>:<\/span> <\/span>'<\/span>broker:29092<\/span>'<\/span><\/span>\n      <\/span>KAFKA_REST_LISTENERS<\/span>:<\/span> <\/span>"<\/span>http:\/\/0.0.0.0:8082<\/span>"<\/span><\/span>\n      <\/span>KAFKA_REST_SCHEMA_REGISTRY_URL<\/span>:<\/span> <\/span>'<\/span>http:\/\/schema-registry:8081<\/span>'<\/span><\/span>\n<\/span>\n  <\/span>flink-init<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>busybox<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>flink-init<\/span><\/span>\n    <\/span>command<\/span>:<\/span> <\/span>><\/span><\/span>\n      sh -c "<\/span><\/span>\n      echo 'Downloading Flink Kafka connector...' &&<\/span><\/span>\n      wget -O \/opt\/flink\/plugins\/kafka\/flink-sql-connector-kafka-3.2.0-1.19.jar https:\/\/repo1.maven.org\/maven2\/org\/apache\/flink\/flink-sql-connector-kafka\/3.2.0-1.19\/flink-sql-connector-kafka-3.2.0-1.19.jar --no-check-certificate &&<\/span><\/span>\n      echo 'Connector downloaded successfully.'<\/span><\/span>\n      "<\/span><\/span>\n    <\/span>volumes<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>flink-plugins:\/opt\/flink\/plugins\/kafka<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>broker<\/span><\/span>\n<\/span>\n  <\/span>flink-jobmanager<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>cnfldemos\/flink-kafka:1.19.1-scala_2.12-java17<\/span><\/span>\n    <\/span>hostname<\/span>:<\/span> <\/span>flink-jobmanager<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>flink-jobmanager<\/span><\/span>\n    <\/span>ports<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>9081:9081<\/span><\/span>\n    <\/span>command<\/span>:<\/span> <\/span>jobmanager<\/span><\/span>\n    <\/span>environment<\/span>:<\/span><\/span>\n      <\/span>FLINK_PROPERTIES<\/span>:<\/span> <\/span>|<\/span><\/span>\n        jobmanager.rpc.address: flink-jobmanager<\/span><\/span>\n        rest.bind-port: 9081<\/span><\/span>\n    <\/span>volumes<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>flink-plugins:\/opt\/flink\/plugins\/kafka<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>flink-init<\/span><\/span>\n<\/span>\n  <\/span>flink-taskmanager<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>cnfldemos\/flink-kafka:1.19.1-scala_2.12-java17<\/span><\/span>\n    <\/span>hostname<\/span>:<\/span> <\/span>flink-taskmanager<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>flink-taskmanager<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>flink-jobmanager<\/span><\/span>\n    <\/span>command<\/span>:<\/span> <\/span>taskmanager<\/span><\/span>\n    <\/span>scale<\/span>:<\/span> <\/span>1<\/span><\/span>\n    <\/span>environment<\/span>:<\/span><\/span>\n      <\/span>FLINK_PROPERTIES<\/span>:<\/span> <\/span>|<\/span><\/span>\n        jobmanager.rpc.address: flink-jobmanager<\/span><\/span>\n        taskmanager.numberOfTaskSlots: 10<\/span><\/span>\n    <\/span>volumes<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>flink-plugins:\/opt\/flink\/plugins\/kafka<\/span><\/span>\n<\/span>\n  <\/span>flink-sql-client<\/span>:<\/span><\/span>\n    <\/span>image<\/span>:<\/span> <\/span>cnfldemos\/flink-sql-client-kafka:1.19.1-scala_2.12-java17<\/span><\/span>\n    <\/span>hostname<\/span>:<\/span> <\/span>flink-sql-client<\/span><\/span>\n    <\/span>container_name<\/span>:<\/span> <\/span>flink-sql-client<\/span><\/span>\n    <\/span>depends_on<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>flink-jobmanager<\/span><\/span>\n    <\/span>environment<\/span>:<\/span><\/span>\n      <\/span>FLINK_JOBMANAGER_HOST<\/span>:<\/span> <\/span>flink-jobmanager<\/span><\/span>\n    <\/span>volumes<\/span>:<\/span><\/span>\n      <\/span>-<\/span> <\/span>.\/settings\/:\/settings<\/span><\/span>\n      <\/span>-<\/span> <\/span>flink-plugins:\/opt\/flink\/plugins\/kafka<\/span><\/span>\n<\/span>\nvolumes<\/span>:<\/span><\/span>\n  <\/span>workarea<\/span>:<\/span><\/span>\n    <\/span>driver<\/span>:<\/span> <\/span>local<\/span><\/span>\n    <\/span>driver_opts<\/span>:<\/span><\/span>\n      <\/span>type<\/span>:<\/span> <\/span>none<\/span><\/span>\n      <\/span>o<\/span>:<\/span> <\/span>bind<\/span><\/span>\n      <\/span>device<\/span>:<\/span> <\/span>\/home\/swillians\/workarea<\/span><\/span>\n  <\/span>flink-plugins<\/span>:<\/span><\/span>\n    <\/span>driver<\/span>:<\/span> <\/span>local<\/span><\/span><\/code><\/pre><\/div>\n\n\n\n

<\/p>\n\n\n\n

Fiz algumas altera\u00e7\u00f5es no arquivo, para que fosse copiada a vers\u00e3o correta do conector kafka para o Flink<\/strong>. N\u00e3o est\u00e1 muito elegante, inclusive, eu poderia ter utilizado um arquivo .env<\/code><\/strong> para organizar os paths. Mas isso ficar\u00e1 para um outro momento. Prometo que at\u00e9 o final da s\u00e9rie, montaremos um docker-compose.yml<\/code><\/strong> bem interessante !<\/p>\n\n\n\n

Levante o ambiente ! Ap\u00f3s todos os containers subirem, vamos executar o client do Flink:<\/p>\n\n\n\n

Bash<\/span><\/path><\/path><\/svg><\/span>
sudo<\/span> <\/span>docker<\/span> <\/span>compose<\/span> <\/span>exec<\/span> <\/span>flink-sql-client<\/span> <\/span>sql-client.sh<\/span> <\/span>embedded<\/span> <\/span>--library<\/span> <\/span>\/opt\/flink\/plugins\/kafka<\/span><\/span><\/code><\/pre><\/div>\n\n\n\n

<\/p>\n\n\n\n

O comando –library \/opt\/flink\/plugins\/kafka<\/strong> serve para indicar ao client onde os plugins est\u00e3o armazenados.<\/p>\n\n\n

\n
\"\"<\/figure><\/div>\n\n\n

<\/p>\n\n\n\n

Vamos criar a nossa primeira tabela baseada em um t\u00f3pico kafka<\/strong>:<\/p>\n\n\n\n

SQL<\/span><\/path><\/path><\/svg><\/span>
CREATE<\/span> <\/span>TABLE<\/span> <\/span>flink_clients_account_balance<\/span> (<\/span><\/span>\n  first_name STRING,<\/span><\/span>\n  last_name STRING,<\/span><\/span>\n  account_balance <\/span>DECIMAL<\/span>(<\/span>10<\/span>,<\/span>2<\/span>),<\/span><\/span>\n  <\/span>`<\/span>ts<\/span>`<\/span> <\/span>TIMESTAMP<\/span>(<\/span>3<\/span>) METADATA <\/span>FROM<\/span> <\/span>'<\/span>timestamp<\/span>'<\/span>,<\/span><\/span>\n  WATERMARK <\/span>FOR<\/span> <\/span>`<\/span>ts<\/span>`<\/span> <\/span>AS<\/span> <\/span>`<\/span>ts<\/span>`<\/span><\/span>\n) <\/span>WITH<\/span> (<\/span><\/span>\n    <\/span>'<\/span>connector<\/span>'<\/span> <\/span>=<\/span> <\/span>'<\/span>kafka<\/span>'<\/span>, <\/span><\/span>\n    <\/span>'<\/span>topic<\/span>'<\/span> <\/span>=<\/span> <\/span>'<\/span>tsv.spooldirfile.clients<\/span>'<\/span>, <\/span><\/span>\n    <\/span>'<\/span>scan.startup.mode<\/span>'<\/span> <\/span>=<\/span> <\/span>'<\/span>earliest-offset<\/span>'<\/span>, <\/span><\/span>\n    <\/span>'<\/span>properties.bootstrap.servers<\/span>'<\/span> <\/span>=<\/span> <\/span>'<\/span>broker:29092<\/span>'<\/span>, <\/span><\/span>\n    <\/span>'<\/span>value.format<\/span>'<\/span> <\/span>=<\/span> <\/span>'<\/span>json<\/span>'<\/span><\/span>\n);<\/span><\/span><\/code><\/pre><\/div>\n\n\n\n

<\/p>\n\n\n\n

Explicando o c\u00f3digo<\/h4>\n\n\n\n

Defini\u00e7\u00e3o da Tabela e Colunas<\/strong><\/h5>\n\n\n\n
SQL<\/span><\/path><\/path><\/svg><\/span>
CREATE<\/span> <\/span>TABLE<\/span> <\/span>flink_clients_account_balance<\/span><\/span><\/code><\/pre><\/div>\n\n\n\n

<\/p>\n\n\n\n