Use Cases
Complete hands-on workshops for learning Kafka-based streaming data integration with Pentaho Data Integration (PDI) Kafka Enterprise Edition plugin.
Ensure the Kafka EE plugin is installed.

Start Pentaho Data Integration.
Select a Use Case:
Scenario: Basic Kafka Consumer - Real-time User Activity Stream
Your company tracks user registrations across web and mobile platforms. User registration events are published to a Kafka topic in real-time.
Your task is to build a streaming pipeline that continuously reads these events, parses the JSON payload, transforms timestamps, and loads the data into a MySQL data warehouse - enabling real-time dashboards and analytics.
Kafka Consumer
The Kafka Consumer step pulls streaming data from Kafka into a transformation. In the step's properties you enter the path to a child transformation, which is executed in near real-time for each batch of messages, sized by record count or duration. The child transformation must start with the Get records from stream step.
Additionally, from the Kafka Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. This allows records processed by a Kafka Consumer step in a parent transformation to be passed downstream to any other steps included within the same parent transformation.
Architecture Overview
This workshop uses PDI's parent/child transformation pattern for Kafka streaming:
How it works: The parent transformation's Kafka Consumer step reads messages in batches (every 5 seconds or 100 records, whichever comes first) and passes each batch to the child transformation for processing. The child transformation parses, transforms, and writes each batch to MySQL.
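The "whichever comes first" batching rule can be sketched as follows (the names and function are illustrative, not PDI internals):

```python
# Batch tab thresholds: a batch is handed to the child transformation as
# soon as EITHER the duration or the record-count threshold is reached.
DURATION_MS = 5000   # Batch tab: Duration (ms)
MAX_RECORDS = 100    # Batch tab: Number of records

def batch_ready(elapsed_ms: int, records_collected: int) -> bool:
    """Return True when either threshold has been reached."""
    return elapsed_ms >= DURATION_MS or records_collected >= MAX_RECORDS

# At ~1 msg/sec the duration threshold fires first, with ~5 records:
print(batch_ready(5000, 5))    # True  (duration reached)
print(batch_ready(1000, 100))  # True  (record count reached)
print(batch_ready(1000, 5))    # False (keep collecting)
```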
Open the following transformation:
~/Workshop--Data-Integration/Labs/Module 7 - Use Cases/Streaming Data/Kafka/transformations/users-to-db-parent.ktr
Double-click on the Kafka Consumer step to review the properties:
Setup tab

| Option | Description | Value |
| --- | --- | --- |
| Transformation | Child transformation to process the records. | ${Internal.Entry.Current.Directory}/users-to-db-child.ktr |
| Connection | Direct: specify Bootstrap servers. Cluster: specify a Hadoop cluster configuration. | localhost:9092 (Direct) |
| Topics | Kafka topics to consume from. | pdi-users |
| Consumer group | Each Kafka Consumer step starts a single thread. When part of a consumer group, each consumer is assigned a subset of topic partitions. | pdi-warehouse-users |
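To see why the consumer group matters: each Kafka Consumer step is one consumer, and Kafka divides the topic's partitions among the members of a group, so multiple steps sharing a group id split the load. A toy round-robin sketch of that idea (Kafka's real partition assignors are more sophisticated):

```python
# Illustrative partition assignment within a consumer group: each consumer
# receives a disjoint subset of the topic's partitions.
def assign(partitions, consumers):
    groups = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        groups[consumers[i % len(consumers)]].append(p)
    return groups

print(assign([0, 1, 2, 3], ["c1", "c2"]))
# {'c1': [0, 2], 'c2': [1, 3]}
```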
Batch tab
How batching works: Whichever threshold is reached first (duration or record count) triggers the batch to be sent to the child transformation. With pdi-users producing ~1 msg/sec, the 5-second duration will usually trigger first, sending ~5 records per batch.

| Option | Description | Value |
| --- | --- | --- |
| Duration (ms) | Time (in milliseconds) to collect records before executing the child transformation. | 5000 |
| Number of records | Number of records to collect before executing the child transformation. | 100 |
| Maximum concurrent batches | Maximum number of batches to collect at the same time. | 1 |
| Message prefetch limit | Limit on incoming messages queued for processing. | 100000 |
Offset Management
- Commit when record read: commit the offset as each record is read.
- Commit when batch completed: commit offsets after the batch is processed.

Selected: Commit when batch completed
Fields tab

| Option | Description |
| --- | --- |
| Input name | Incoming field received from Kafka streams (defaults listed below). |
| Output name | Output field name. |
| Type | Data type. |

Default input fields:
- key: Determines message distribution to partitions. If no key is present, messages are randomly distributed.
- message: The message value.
- topic: Topic name.
- partition: Partition number.
- offset: Sequential ID that uniquely identifies the record within the partition.
- timestamp: Time the message was received on the server.
Results fields tab

Return fields from
Step name in the child transformation that returns fields to the parent transformation.
Options tab

| Option | Description | Value |
| --- | --- | --- |
| auto.offset.reset | Sets where consumption starts when no committed offset exists: latest or earliest. | earliest |
Get records from stream
This step returns records that were passed into the transformation by a streaming input step (for example, Kafka Consumer or Kinesis Consumer) in the parent transformation. Because it produces rows, it cannot be placed mid-stream: it must be the first step in the child transformation. If you want to add this data to an existing stream, use a join step.

Open the following transformation:
~/Workshop--Data-Integration/Labs/Module 7 - Use Cases/Streaming Data/Kafka/transformations/users-to-db-child.ktr
JSON Input
Use the JSON Input step to read data from JSON structures, files, or incoming fields.
The step uses a JSONPath expression to extract data and output rows. JSONPath expressions can use either dot notation or square bracket notation.
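For a message from the pdi-users topic, the JSONPath expressions $.userid (dot notation) and $['userid'] (bracket notation) select the same member. A minimal Python sketch of that extraction using only the standard library (the message value is illustrative):

```python
import json

# Hypothetical user registration event matching the pdi-users fields.
raw = ('{"registertime": 1493899960000, "userid": "User_1", '
       '"regionid": "Region_9", "gender": "FEMALE"}')
doc = json.loads(raw)

# JSONPath $.userid and $['userid'] both address this member; in plain
# Python that is a single key lookup on the parsed object:
user_id = doc["userid"]
print(user_id)  # User_1
```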
Double-click on the JSON Input step to display the settings:
File tab
Ensure Source is from previous step is enabled, then select the message field from the drop-down.

Content tab
Suppress errors

Fields tab
You will have to manually enter the path.

You can get an idea of the JSON object by viewing the messages in the Control Center.
The pdi-users topic receives user registration events at ~1 message/second from the datagen connector.
Sample message:
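An event from the datagen connector looks like this (values are illustrative, shaped to match the field descriptions below):

```json
{
  "registertime": 1493899960000,
  "userid": "User_1",
  "regionid": "Region_9",
  "gender": "FEMALE"
}
```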
Field descriptions:
| Field | Type | Description |
| --- | --- | --- |
| registertime | Long | Registration timestamp (epoch milliseconds) |
| userid | String | User identifier (e.g., User_1) |
| regionid | String | Region identifier (e.g., Region_9) |
| gender | String | Gender (MALE or FEMALE) |
Double-click on the step to display the properties.
Click on the Metadata tab.

This tab sets the data type and length metadata for each field. This is critical for MySQL: without explicit lengths, PDI maps String fields to TINYTEXT, which breaks MySQL indexes and causes errors such as "BLOB/TEXT column used in key specification without a key length".
| Field | Type | Length |
| --- | --- | --- |
| user_id | String | 100 |
| region_id | String | 100 |
| gender | String | 20 |
| register_time_epoch | Integer | 15 |
| kafka_topic | String | 255 |
| kafka_partition | Integer | 9 |
| kafka_offset | Integer | 15 |
| key | String | 100 |
| message | String | 5000 |
| timestamp | Integer | 15 |
Why these lengths? They match the MySQL table column definitions: user_id VARCHAR(100), region_id VARCHAR(100), gender VARCHAR(20), kafka_topic VARCHAR(255). Setting the correct lengths ensures PDI generates VARCHAR instead of TINYTEXT.
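For reference, a sketch of a user_events DDL consistent with these lengths. The actual table is created by the workshop's MySQL setup, so exact types and constraints may differ:

```sql
-- Illustrative only: columns and lengths mirror the metadata above plus the
-- auto-handled event_id and ingestion_timestamp columns.
CREATE TABLE user_events (
  event_id            BIGINT AUTO_INCREMENT PRIMARY KEY,
  user_id             VARCHAR(100),
  region_id           VARCHAR(100),
  gender              VARCHAR(20),
  register_time       TIMESTAMP,
  kafka_topic         VARCHAR(255),
  kafka_partition     INT,
  kafka_offset        BIGINT,
  ingestion_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```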
Formula
The Formula step evaluates formula expressions within a data stream. It can handle simple calculations like [A]+[B] as well as more complex business logic with deeply nested if/then conditions.

Double-click to configure the Formula step:
| New field | Formula | Value type | Length | Precision | Replace value |
| --- | --- | --- | --- | --- | --- |
| register_time_seconds | [register_time_epoch] / 1000 | Integer | -1 | -1 | (blank) |
Why Formula instead of Calculator? The Calculator step requires both operands to be existing stream fields — you cannot enter a literal constant like 1000 as Field B. The Formula step supports inline constants in expressions.
What this does: The datagen produces registertime as epoch milliseconds (e.g., 1493899960000). MySQL's TIMESTAMP column expects epoch seconds, so we divide by 1000 to get 1493899960.
Alternative using Calculator: Add an Add constants step before Calculator with a field divisor = 1000 (Integer). Then use Calculator with operation A / B where A = register_time_epoch and B = divisor.
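The conversion itself is simple; as a sanity check, in Python integer division gives the same result the Formula expression is meant to produce for whole milliseconds:

```python
# Epoch milliseconds -> epoch seconds, mirroring [register_time_epoch] / 1000.
register_time_epoch = 1493899960000   # sample value from the datagen
register_time_seconds = register_time_epoch // 1000  # drop the millisecond part
print(register_time_seconds)  # 1493899960
```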
Table Output
The Table Output step loads data into a database table; it is equivalent to a SQL INSERT statement.
If you only need to update rows, use the Update step.
To perform both INSERT and UPDATE, use Insert/Update.
This step provides configuration options for a target table and performance-related options such as Commit size and Use batch update for inserts.
Create Database Connection in Spoon
Open Spoon (PDI)
Go to View panel (left side) → right-click Database connections → New
Configure:
| Setting | Value |
| --- | --- |
| Connection Name | warehouse_db |
| Connection Type | MySQL |
| Access | Native (JDBC) |
| Host Name | localhost |
| Database Name | kafka_warehouse |
| Port Number | 3306 |
| User Name | kafka_user |
| Password | kafka_password |
Click the Options tab and add these parameters:
| Parameter | Value |
| --- | --- |
| useServerPrepStmts | false |
| rewriteBatchedStatements | true |
| cachePrepStmts | true |
| prepStmtCacheSize | 250 |
| useCompression | true |
Click Test — should show "Connection successful"
Click OK to save
Main Settings
Double-click on the Table output step to configure:

| Setting | Value | Notes |
| --- | --- | --- |
| Connection | warehouse_db | The MySQL connection from Step 2 |
| Target schema | (leave blank) | Important: do NOT set this for MySQL |
| Target table | user_events | |
| Commit size | 1000 | |
| Truncate table | No | |
| Ignore insert errors | No | |
| Use batch updates | Yes | |
| Specify database fields | Yes | Must be Yes to control field mapping |
Critical: Leave Target schema blank. MySQL uses the database name from the connection, not a separate schema. Setting it to kafka_warehouse causes PDI to qualify the table as kafka_warehouse.user_events which can fail or cause unexpected behavior.
Database Fields
Click Specify database fields: Yes, then configure the field mapping:
| Table field | Stream field |
| --- | --- |
| user_id | user_id |
| region_id | region_id |
| gender | gender |
| register_time | register_time_seconds |
| kafka_topic | kafka_topic |
| kafka_partition | kafka_partition |
| kafka_offset | kafka_offset |
Do NOT map these columns - MySQL handles them automatically:
- event_id: AUTO_INCREMENT primary key
- ingestion_timestamp: DEFAULT CURRENT_TIMESTAMP
Tip: You can use the Get Fields button to auto-populate the mapping, then remove event_id and ingestion_timestamp and fix the register_time mapping (the stream field should be register_time_seconds, not register_time).
Prerequisites
Before using the transformation templates, ensure:
- Kafka cluster is running: make start or docker compose up -d from the Kafka-Docker directory
- Datagen connectors are deployed: make deploy-connectors or ./connectors/deploy-connectors.sh
- MySQL database is running: make mysql-setup
Run the commands listed above to start all the required Kafka containers, connectors, and MySQL. User event data is now being streamed to the brokers.

Start users-to-db-parent.ktr

The user events are being consumed and processed, and the stream is written to the user_events table.
In DBeaver, display the data in the user_events table.

Verify Data in MySQL
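A quick check in DBeaver (or any MySQL client), assuming the user_events table described above:

```sql
-- Row count should grow as batches are committed (~5 rows every 5 seconds).
SELECT COUNT(*) AS total_rows FROM user_events;

-- Inspect the most recently ingested events.
SELECT user_id, region_id, gender, register_time, kafka_offset
FROM user_events
ORDER BY event_id DESC
LIMIT 10;
```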
E-Commerce Purchases
Your e-commerce platform publishes purchase transactions to Kafka using Avro serialization with a schema managed by Confluent Schema Registry. Avro provides schema enforcement at the producer — incompatible schema changes are rejected before data enters the pipeline, unlike raw JSON where structural issues are only discovered at parse time.
Your task is to configure the PDI Kafka Consumer to deserialize Avro messages, extract nested JSON fields (the address object), and load purchase data into MySQL.