How I Dockerized Apache Flink, Kafka, and PostgreSQL for Real-Time Data Streaming
Integrating pyFlink, Kafka, and PostgreSQL using DockerGet your pyFlink applications ready using docker — author generated image using https://www.dall-efree.com/Why Read This?Real-World Insights: Get practical tips from my personal journey of overcoming integration hurdles.Complete Setup: Learn how to integrate Flink, Kafka, and PostgreSQL seamlessly using Docker-Compose.Step-by-Step Guide: Perfect for both beginners and experienced developers looking to streamline their data streaming stack.Setting Up the SceneI embarked on a mission to integrate Apache Flink with Kafka and PostgreSQL using Docker. What makes this endeavor particularly exciting is the use of pyFlink — the Python flavor of Flink — which is both powerful and relatively rare. This setup aims to handle real-time data processing and storage efficiently. In the following sections, I’ll demonstrate how I achieved this, discussing the challenges encountered and how I overcame them. I’ll conclude with a step-by-step guide so you can build and experiment with this streaming pipeline yourself.The infrastructure we’ll build is illustrated below. Externally, there’s a publisher module that simulates IoT sensor messages, similar to what was discussed in a previous post. Inside the Docker container, we will create two Kafka topics. The first topic, sensors, will store incoming messages from IoT devices in real-time. A Flink application will then consume messages from this topic, filter those with temperatures above 30°C, and publish them to a second topic, alerts. Additionally, the Flink application will insert the consumed messages into a PostgreSQL table created specifically for this purpose. This setup allows us to persist sensor data in a structured, tabular format, providing opportunities for further transformation and analysis. Visualization tools like Tableau or Power BI can be connected to this data for real-time plotting and dashboards.Moreover, the alerts topic can be consumed by other clients to initiate actions based on the messages it holds, such as activating air conditioning systems or triggering fire safety protocols.Services included in the docker container — image by authorIn order to follow up the tutorial, you can clone the following repo. A docker-compose.yml is placed in the root of the project so you can initialize the multi-container application. Furthermore, you can find detailed instructions in the README file.Issues With Kafka Ports in docker-compose.ymlInitially, I encountered problems with Kafka’s port configuration when using the confluentinc Kafka Docker image, a popular choice for such setups. This issue became apparent through the logs, emphasizing the importance of not running docker-compose up in detached mode (-d) during initial setup and troubleshooting phases.The reason for the failure was that the internal and external hosts were using the same port, which led to connectivity problems. I fixed this by changing the internal port to 19092. I’ve found this blog post pretty clarifying.KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092,PLAINTEXT_HOST://localhost:9092Configuring Flink in Session ModeTo run Flink in session mode (allowing multiple jobs in a single cluster), I’m using the following directives in the docker-compose.yml.https://medium.com/media/782b0c801dae990cb0589055d18c21c6/hrefCustom Docker Image for PyFlinkGiven the limitations of the default Apache Flink Docker image, which doesn’t include Python support, I created a custom Docker image for pyFlink. This custom image ensures that Flink can run Python jobs and includes the necessary dependencies for integration with Kafka and PostgreSQL. The Dockerfile used for this is located in the pyflink subdirectory.https://medium.com/media/148bf4c640f1747b197bdfc004e94953/hrefBase Image: We start with the official Flink image.Python Installation: Python and pip are installed, upgrading pip to the latest version.Dependency Management: Dependencies are installed via requirements.txt. Alternatively, lines are commented to demonstrate how to manually install dependencies from local files, useful for deployment in environments without internet access.Connector Libraries: Connectors for Kafka and PostgreSQL are downloaded directly into the Flink lib directory. This enables Flink to interact with Kafka and PostgreSQL during job execution.Script Copying: Scripts from the repository are copied into the /opt/flink directory to be executed by the Flink task manager.With this custom Docker image, we ensure pyFlink can run properly within the Docker container, equipped with the necessary libraries to interact with Kafka and PostgreSQL seamlessly. This approach provides flexibility and is suitable for both development and production environments.Note: Ensure that any network or security considerations for downloading connectors and other dependencies are addressed according to your deployment environment’s policies.Integrating PostgreSQLTo connect Apache Flink to the PostgreSQL database, a proper JDBC connector is required. The custom Docker image for pyFlink downloads the JDBC connector for PostgreSQL, which is compatible with PostgreSQL 16.To simplify this process, a download_libs.sh script is included in the repository, mirroring the actions performed in the Flink Docker container. This script automates the download of the necessary libraries, ensuring consistency between the Docker and local environments.Note: Connectors usually have two versions. In this particular case, since I’m using Flink 1.18, the latest stable version available, I’ve downloaded 3.1.2–1.18. My guess is that the first version tracks JDBC implementation for several databases. They’re available in the maven directory.env.add_jars( f"file://{current_dir}/flink-connector-jdbc-3.1.2–1.18.jar", f"file://{current_dir}/postgresql-42.7.3.jar")Defining JDBC SinkIn our Flink task, there’s a crucial function named configure_postgre_sink located in the usr_jobs/postgres_sink.py file. This function is responsible for configuring a generic PostgreSQL sink. To use it effectively, you need to provide the SQL Data Manipulation Language (DML) statement and the corresponding value types. The types used in the streaming data are defined as TYPE_INFO … it took me a while to come up with the correct declaration 😅.Notice also that the JdbcSink has an optional parameter to define the ExecutionOptions. For this particular case, I’ll use an update interval of 1 second and limit the amount of rows to 200. You can find more information in the official documentation. Yes, you guessed it, since I’m defining an interval, this can be considered a micro-batch ETL. However, due to Flink parallelism you can handle multiple streams at once in a simple script which is at the same time, easy to follow.https://medium.com/media/b35f42f394f1e542537a01ab328ca53b/hrefNote: Don’t forget to create the raw_sensors_data table in Postgres, where raw data coming from the IoT sensors will be received. This is covered in the step-by-step guide in the sections below.Sinking Data to KafkaI’ve covered how to consume data from a Kafka topic in a previous discussion. However, I haven’t configured a sink yet and that’s what we’ll do. The configuration has some intricacies and it’s defined in a function, similarly to the Postgres sink. Additionally, you have to define the type for the data stream before sinking it to Kafka. Notice that the alarms_data stream is properly casted as a string with output_type=Types.STRING() before sinking it to Kafka, since I’ve declared the serializer as SimpleStringSchema().https://medium.com/media/1083c8b9a478512580e779af00c647c8/hrefI’ll show you how to fetch data from the alerts topic in the following steps.Local or Containerized configurationOne of the greatest things about this docker configuration is that you can run Flink from local or inside the container as a managed task. The local Flink setup is depicted in the following figure, where you can see our Flink application detached from the docker container. This may help to troubleshoot Flink, which doesn’t have a good suite of native observability tools. Actually, we would like to give a try to datorios tools for Flink, they are very promising for monitoring purposes.Runing Flink applications in local with other services running inside the container — image by authorIf you want to try the Flink application locally, you have to correctly define the hosts and ports used by the script which actually are two constants in the usr_jobs/postgres_sink.py file:For container run, use:KAFKA_HOST = "kafka:19092"POSTGRES_HOST = "postgres:5432"For local run, use:KAFKA_HOST = "localhost:9092"POSTGRES_HOST = "localhost:5432"By default the repo sets up the Flink application to run inside the container. You can monitor the jobs running using the web UI, accessing from http://localhost:8081. You won’t be able to see it if you choose to run the job locally.Screenshot of the Flink web UI with the running job — image by authorNote: If you run the job locally, you need to install the Flink dependencies located in the requirements.txt. Also a pyproject.toml file is provided if you like to set up the environment with poetry.Step-by-Step Guide to Run the Streaming PipelineStep 1: Launch the multi-container applicationLaunch the containers by running docker-compose. I preferred to do it without detached mode to see the logs while the containers are spinning up and then running.docker-compose upCheck for the logs to see if the services are running properly.Step 2: Create the Kafka topicsNext, we’re going to create the topics to receive data from the IoT sensors and store the alerts filtered by the Flink application.docker-compose exec kafka kafka-topics \ -- create - topic sensors \ -- bootstrap-server localhost:9092 \ -- partitions 1 \ -- replication-factor 1docker-compose exec kafka kafka-topics \ -- create - topic alerts \ -- bootstrap-server localhost:9092 \ -- partitions 1 \ -- replication-factor 1To check if the topics were created correctly you can execute the following commanddocker-compose exec kafka kafka-topics \ -- bootstrap-server localhost:9092 \ -- listStep 3: Create Postgres tableLogin to the postgres consolepsql -h localhost -U flinkuser -d flinkdbEnter the password flinkpassword to log into the postgres console, remember this is a local configuration so default access has been configured in the docker-compose.yml. Then create the tableCREATE TABLE raw_sensors_data (message_id VARCHAR(255) PRIMARY KEY,sensor_id INT NOT NULL,message TEXT NOT NULL,timestamp TIMESTAMPTZ NOT NULL);You can check if the table is properly created by doing the followingflinkdb=# \d raw_sensors_dataThis will show you a result similar to the following one:Step 4: Launching the Kafka producerCreate a local environment with conda or poetry and install python kafka package:pip install kafka-pythonThen execute the kafka producer, which mimics IoT sensor messages and publishes messages to the sensors topic.python pyflink/usr_jobs/kafka_producer.pyLeave it running for the rest of the tutorial.Step 5: Initializing the Flink taskWe’re going to launch the Flink application from within the container, so you can monitor it from the web UI through localhost:8081. Run the following command from the repository root:docker-compose exec flink-jobmanager flink run \ -py /opt/flink/usr_jobs/postgres_sink.pyYou’ll see some logging information, additionally alerts will also be displayed in the flink-jobmanager container logs. Also, you can check if the job is running from the Flink web UI http://localhost:8081/#/job/running.Details of running job — image by authorApparently the monitoring tells that there are no messages going through the Flink job, which is not true, since alerts can be seen in the docker log.We’ll check the messages using the Postgres table and read the alerts topic, which were created for this purpose.Step 6: Read Alerts in Kafka TopicTo read data in the alerts topic, you can execute the following command:docker-compose exec kafka kafka-console-consumer \ -- bootstrap-server localhost:9092 \ -- topic alerts \ -- from-beginningThat will bring all the messages that the topic has received so far.Step 7: Read raw data from Postgres tableAdditionally you can query the raw messages from the IoT sensor and even parse the JSON data in PostgreSQL:SELECT *, (message::json->>'temperature')::numeric as temperatureFROM raw_sensors_dataLIMIT 10;Step 8: Stopping ServicesYou can easily stop everything by doing ctrl-c on the docker terminal. If you prefer, to make proper shutdown, proceed with the following steps:Cancel the Flink job by clicking in the top right corner of job details in the web UI.Stop the kafka_producer.py script which was running locally.Ctrl-c on the docker terminal to stop the servicesThe information exchanged in the session, while the services were running, is permanently stored. So in the case you want to query the Postgres table or the Kafka topics, the data is going to be there.Insights on Using Multiple Sinks in a PyFlink JobIn the Flink job used for demonstration, I’m managing 2 data streams simultaneously, in the same task. The one that writes raw data coming from the sensors topic (IoT devices) and the filtered alerts which are set to another topic. This has some advantages and drawbacks, as a simple summary, here are the pros and cons:Pros of Single Job with Multiple Sinks:- Simplicity in resource management.- Consistency in data flow.Cons of Single Job:- Can become complex as logic grows.- Scalability might be an issue.Pros of Multiple Jobs:- Better fault isolation.- Focused optimization.Cons of Multiple Jobs:- Resource overhead.- Coordination complexity.ConclusionThis setup offers a robust solution for real-time data streaming and processing, integrating Flink, Kafka, and PostgreSQL effectively. The main purpose of using Postgres in the loop is to check the raw messages coming from the IoT devices without relying on queries to the topic itself. It also helped to demonstrate how to sink data using a JDBC connector, which might be pretty standard. The message transformations were done using the DataStream API. I would like to dive further into the SQL API which introduces a friendlier interface. Finally, regarding how to manage data streams, choose between single or multiple jobs based on your specific requirements ensuring scalability and maintainability.Next Steps1. Use SQL API to make transformations.2. Optimize resource usage based on job complexity.3. Explore advanced Flink features for complex data processing tasks.Happy streaming! 🚀Stay tuned for more tutorials on integrating and scaling data engineering solutions with Docker!Feel free to reach out for any questions or suggestions in the comments below!Ready to Optimize Your Streaming Data Applications?Unlock the full potential of your data with our expert consulting services, tailored for streaming data applications. Whether you’re looking to enhance real-time analytics, streamline data pipelines, or optimize performance, we’re here to help.Referenceshttps://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/https://mvnrepository.com/https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/jdbc/https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#session-modehttps://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-dockerhttps://medium.com/@sant1/flink-docker-kafka-faee9c0f1580How I Dockerized Apache Flink, Kafka, and PostgreSQL for Real-Time Data Streaming was originally published in Towards Data Science on Medium, where people are continuing the conversation by highlighting and responding to this story.
Welcome to Billionaire Club Co LLC, your gateway to a brand-new social media experience! Sign up today and dive into over 10,000 fresh daily articles and videos curated just for your enjoyment. Enjoy the ad free experience, unlimited content interactions, and get that coveted blue check verification—all for just $1 a month!
Account Frozen
Your account is frozen. You can still view content but cannot interact with it.
Please go to your settings to update your account status.
Open Profile Settings