Use Cases

Before you begin:

  1. Ensure the Kafka EE plugin is installed.
  2. Start Pentaho Data Integration.


Select a Use Case:


Scenario: Basic Kafka Consumer - Real-time User Activity Stream


Kafka Consumer

The Kafka Consumer step pulls streaming data from Kafka into a transformation. In the step's properties, you specify the path to a child transformation that is executed for each batch of messages, sized by record count or duration, in near real-time. The child transformation must start with the Get records from stream step.

Additionally, from the Kafka Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. This allows records processed by a Kafka Consumer step in a parent transformation to be passed downstream to any other steps included within the same parent transformation.

Architecture Overview

This workshop uses PDI's parent/child transformation pattern for Kafka streaming:


How it works: The parent transformation's Kafka Consumer step reads messages in batches (every 5 seconds or 100 records, whichever comes first) and passes each batch to the child transformation for processing. The child transformation parses, transforms, and writes each batch to MySQL.
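The duration-or-count trigger described above can be sketched in plain Python. This is a broker-free toy: the `consume_batches` generator and its `(arrival_time, payload)` input pairs are illustrative assumptions, not a PDI or Kafka API.

```python
def consume_batches(messages, max_duration_s=5.0, max_records=100):
    """Group a message stream into batches, closing each batch when either
    the duration or the record-count threshold is reached -- whichever
    comes first. `messages` yields (arrival_time, payload) pairs so the
    logic is testable without a live broker (hypothetical helper)."""
    batch, batch_start = [], None
    for arrival, payload in messages:
        if batch_start is None:
            batch_start = arrival            # first record opens the batch
        batch.append(payload)
        if len(batch) >= max_records or arrival - batch_start >= max_duration_s:
            yield batch                      # hand off to the "child transformation"
            batch, batch_start = [], None
    if batch:
        yield batch                          # flush the final partial batch
```

With a slow topic (a message per second or so), the duration threshold fires before the record count does, so batches stay small; a burst of messages closes batches on the record-count threshold instead.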


  1. Open the following transformation:

~/Workshop--Data-Integration/Labs/Module 7 - Use Cases/Streaming Data/Kafka/transformations/users-to-db-parent.ktr

  2. Double-click the Kafka Consumer step to review its properties:

Setup Tab

| Property | Description | Value |
| --- | --- | --- |
| Transformation | Child transformation that processes the records. | ${Internal.Entry.Current.Directory}/users-to-db-child.ktr |
| Connection | Direct: specify the Bootstrap servers. Cluster: specify a Hadoop cluster configuration. | localhost:9092 |
| Topics | Kafka topics to consume from. | pdi-users |
| Consumer Group | Each Kafka Consumer step starts a single thread. When part of a consumer group, each consumer is assigned a subset of the topic's partitions. | pdi-warehouse-users |
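The Consumer Group setting means each consumer in the group handles only a subset of the topic's partitions. A minimal sketch of that split, assuming a simple round-robin assignor (Kafka's actual assignment strategy is pluggable and more sophisticated; `assign_partitions` is a hypothetical helper):

```python
def assign_partitions(partitions, consumers):
    """Round-robin sketch of how a consumer group splits a topic's
    partitions: each partition goes to exactly one consumer, so adding
    consumers (up to the partition count) spreads the load."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment
```

For a six-partition topic and three consumers in the same group, each consumer ends up reading two partitions; a consumer in a different group gets its own full copy of the stream.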


Batch tab


How batching works: Whichever threshold is reached first (duration or record count) triggers the batch to be sent to the child transformation. With pdi-users producing ~1 msg/sec, the 5-second duration will usually trigger first, sending ~5 records per batch.

| Property | Description | Value |
| --- | --- | --- |
| Duration (ms) | Time (in milliseconds) to collect records before executing the child transformation. | 500 |
| Number of records | Number of records to collect before executing the child transformation. | 100 |
| Maximum concurrent batches | Maximum number of batches to process at the same time. | 1 |
| Message prefetch limit | Limit on incoming messages queued for processing. | 100000 |
| Offset Management | Commit when record read: commit the offset as each record is read. Commit when batch completed: commit offsets after the batch is processed. | Commit when batch completed |
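The practical difference between the two Offset Management options is what survives a crash. A toy simulation of both modes (the `delivered` function and its crash model are illustrative assumptions, not Kafka's actual protocol):

```python
def delivered(messages, batch_size, crash_batch, commit_per_record):
    """Which messages end up processed when the consumer crashes just
    before finishing batch number `crash_batch` (0-based) and then
    restarts from the last committed offset."""
    committed = 0
    out = []
    # First run: crashes before processing batch `crash_batch`.
    for b in range(crash_batch + 1):
        batch = messages[b * batch_size:(b + 1) * batch_size]
        if commit_per_record:
            committed += len(batch)      # offsets committed as records are read
        if b < crash_batch:
            out.extend(batch)            # batch fully processed
            if not commit_per_record:
                committed += len(batch)  # commit only after the batch completes
    # Restart: resume from the committed offset.
    out.extend(messages[committed:])
    return out
```

Committing per record can silently skip messages that were read but never processed; committing per completed batch replays them on restart, giving at-least-once delivery, which is usually what you want when loading a database.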


Fields tab

Incoming fields received from Kafka streams. Default inputs include:

| Input Name | Description |
| --- | --- |
| key | Determines message distribution to partitions. If no key is present, messages are randomly distributed. |
| message | The message value. |
| topic | The topic name. |
| partition | The partition number. |
| offset | Sequential ID that uniquely identifies the record within the partition. |
| timestamp | The time the message was received on the server. |

Each input field also has an Output Name (the output field name) and a Type (the data type).
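The key field's role in partitioning can be illustrated with a stable-hash sketch. Kafka's default partitioner actually uses murmur2; the md5-based `pick_partition` below is a hypothetical stand-in, not the real algorithm.

```python
import hashlib
import random

def pick_partition(key, num_partitions, rng=random):
    """Sketch of key-based partitioning: a stable hash of the key modulo
    the partition count, so all messages with the same key land on the
    same partition (preserving their relative order). Keyless messages
    are spread across partitions arbitrarily."""
    if key is None:
        return rng.randrange(num_partitions)   # no key: arbitrary partition
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions
```

Because the mapping is deterministic for a given key, all events for one user hash to the same partition and are consumed in order by the same member of the consumer group.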


Results fields tab

| Property | Description | Value |
| --- | --- | --- |
| Return fields from | Step name in the child transformation that returns fields to the parent transformation. | |

Options tab

| Property | Description | Value |
| --- | --- | --- |
| auto.offset.reset | Sets where to start reading when the group has no committed offset: earliest (from the beginning of the topic) or latest (only new messages). | earliest |
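auto.offset.reset only takes effect when the consumer group has no committed offset yet, such as on first run. A one-function sketch of that rule (the `starting_offset` helper is illustrative, not a Kafka API, and ignores log retention/truncation):

```python
def starting_offset(log_end_offset, committed, auto_offset_reset):
    """Where a consumer starts reading: from its committed offset if the
    group has one, otherwise according to auto.offset.reset --
    'earliest' replays the log from offset 0, 'latest' reads only
    messages that arrive after the consumer starts."""
    if committed is not None:
        return committed                 # committed offset always wins
    return 0 if auto_offset_reset == "earliest" else log_end_offset
```

With earliest, the first run of this workshop replays every message already sitting in the pdi-users topic; later runs resume from the group's committed offset regardless of this setting.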
