hexagon-equalsEMQX

Use Case: Predictive maintenance

circle-check

Logistics use case (predictive maintenance)

Logistics
circle-info

Before you begin

Make sure EMQX is running and reachable:

  • MQTT broker (TCP): localhost:1883

  • EMQX Dashboard (web): http://localhost:18083

Default dashboard credentials are often:

  • Username: admin

  • Password: Password123

If you need to set it up first, see EMQX setup.

Make sure the lab files exist:

  • ~/Streaming/emqx/solution/tr_mqtt_producer.ktr

  • ~/Streaming/emqx/solution/tr_mqtt_consumer.ktr

  • ~/Streaming/emqx/solution/tr_process_sensor_data.ktr

1

Start Pentaho Data Integration

Launch Data Integration:

2

Publish telemetry to EMQX (MQTT Producer)

Open:

~/Streaming/emqx/solution/tr_mqtt_producer.ktr

MQTT Producer

This transformation:

  • generates data for vehicle_id = 111 every 5 seconds

  • adds a timestamp

  • builds a message payload

  • publishes the payload to EMQX using MQTT Producer

Double-click MQTT Producer and confirm these settings:

  • Connection: points to your broker (for example tcp://localhost:1883)

  • Client ID: unique on the broker

  • Topic: note the value. You will reuse it in the consumer.

  • QoS: choose based on delivery requirements

  • Message field: message

MQTT Producer

Run the transformation.

Validate in EMQX Dashboard:

  1. Open http://localhost:18083.

  2. Go to Clients. Confirm the producer client is connected.

  3. Go to Topics. Confirm message rate on your topic.

chevron-rightReference: QoS and session settingshashtag

QoS

  • 0 (at most once): lowest latency. Messages can be lost.

  • 1 (at least once): can deliver duplicates.

  • 2 (exactly once): highest overhead.

Clean session

  • True: broker does not persist state for this client.

  • False: broker persists subscriptions and queued QoS 1/2 messages.

3

Consume and process telemetry (MQTT Consumer)

Open:

~/Streaming/emqx/solution/tr_mqtt_consumer.ktr

MQTT Consumer

Double-click MQTT Consumer and confirm:

  • Topic matches the producer topic.

  • Child transformation points to:

  • Batch has at least one trigger set:

    • Duration (ms) > 0, or

    • Number of records > 0

MQTT Consumer

Run the transformation.

Validate in EMQX Dashboard:

  • Go to Clients. Confirm the consumer client is connected.

  • Go to Subscriptions. Confirm the client is subscribed to the topic.

chevron-rightReference: batch and backpressurehashtag

PDI runs the child transformation when either threshold is met:

  • Duration (ms): time window for collecting messages

  • Number of records: message count for collecting messages

Backpressure controls:

  • Message prefetch limit: max messages PDI queues in memory

  • Maximum concurrent batches: increases throughput and resource use

4

Inspect results

Open the child transformation:

~/Streaming/emqx/solution/tr_process_sensor_data.ktr

Process sensor data

This transformation:

  • pulls message records from the stream

  • adds a timestamp

  • resolves sensor_type

  • filters minor and major alerts

  • aggregates alerts for reporting

  • appends results to output files

Open the output files and confirm new rows append:

  • ~/Streaming/emqx/output/major_alert.txt

  • ~/Streaming/emqx/output/minor_alert.txt

major_alert
minor_alert
circle-exclamation

Last updated

Was this helpful?