EMQX
Use Case: Predictive maintenance
Logistics use case (predictive maintenance)
You want near-real-time telemetry from delivery trucks.
Each truck publishes sensor telemetry to an MQTT topic over GSM.
EMQX is the MQTT broker.
Pentaho Data Integration (PDI) subscribes and processes messages.
You will:
publish telemetry into EMQX (producer transformation)
consume telemetry in PDI (consumer + child transformation)
write minor and major alerts to output files

Before you begin
Make sure EMQX is running and reachable:
MQTT broker (TCP):
localhost:1883EMQX Dashboard (web):
http://localhost:18083
Default dashboard credentials are often:
Username:
adminPassword:
Password123
If you need to set it up first, see EMQX setup.
Make sure the lab files exist:
~/Streaming/emqx/solution/tr_mqtt_producer.ktr~/Streaming/emqx/solution/tr_mqtt_consumer.ktr~/Streaming/emqx/solution/tr_process_sensor_data.ktr
Start Pentaho Data Integration
Launch Data Integration:
Publish telemetry to EMQX (MQTT Producer)
Open:
~/Streaming/emqx/solution/tr_mqtt_producer.ktr

This transformation:
generates data for
vehicle_id = 111every 5 secondsadds a timestamp
builds a
messagepayloadpublishes the payload to EMQX using MQTT Producer
Double-click MQTT Producer and confirm these settings:
Connection: points to your broker (for example
tcp://localhost:1883)Client ID: unique on the broker
Topic: note the value. You will reuse it in the consumer.
QoS: choose based on delivery requirements
Message field:
message

Run the transformation.
Validate in EMQX Dashboard:
Open
http://localhost:18083.Go to Clients. Confirm the producer client is connected.
Go to Topics. Confirm message rate on your topic.
Reference: QoS and session settings
QoS
0 (at most once): lowest latency. Messages can be lost.
1 (at least once): can deliver duplicates.
2 (exactly once): highest overhead.
Clean session
True: broker does not persist state for this client.
False: broker persists subscriptions and queued QoS 1/2 messages.
Consume and process telemetry (MQTT Consumer)
Open:
~/Streaming/emqx/solution/tr_mqtt_consumer.ktr

Double-click MQTT Consumer and confirm:
Topic matches the producer topic.
Child transformation points to:
Batch has at least one trigger set:
Duration (ms) >
0, orNumber of records >
0

Run the transformation.
Validate in EMQX Dashboard:
Go to Clients. Confirm the consumer client is connected.
Go to Subscriptions. Confirm the client is subscribed to the topic.
Reference: batch and backpressure
PDI runs the child transformation when either threshold is met:
Duration (ms): time window for collecting messages
Number of records: message count for collecting messages
Backpressure controls:
Message prefetch limit: max messages PDI queues in memory
Maximum concurrent batches: increases throughput and resource use
Inspect results
Open the child transformation:
~/Streaming/emqx/solution/tr_process_sensor_data.ktr

This transformation:
pulls
messagerecords from the streamadds a timestamp
resolves
sensor_typefilters minor and major alerts
aggregates alerts for reporting
appends results to output files
Open the output files and confirm new rows append:
~/Streaming/emqx/output/major_alert.txt~/Streaming/emqx/output/minor_alert.txt


If you see no output, check these first:
producer and consumer use the same topic
EMQX shows both clients as connected
the consumer batch has Duration (ms) or Number of records set to
> 0the child transformation starts with Get records from stream
Last updated
Was this helpful?
