# EMQX

{% hint style="success" %}

#### Logistics use case (predictive maintenance)

You want near-real-time telemetry from delivery trucks.

Each truck publishes sensor telemetry to an MQTT topic over GSM.

* **EMQX** is the MQTT broker.
* **Pentaho Data Integration (PDI)** subscribes and processes messages.

You will:

* publish telemetry into EMQX (producer transformation)
* consume telemetry in PDI (consumer + child transformation)
* write minor and major alerts to output files

{% endhint %}

<figure><img src="https://3680356391-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FZpCSy6Skj215f4oWypdc%2Fuploads%2Fgit-blob-1be96f79c83bd174c3aca1a215a785e2abb276e1%2Flogistics.png?alt=media" alt=""><figcaption><p>Logistics</p></figcaption></figure>

{% hint style="info" %}
**Before you begin**

Make sure EMQX is running and reachable:

* **MQTT broker (TCP):** `localhost:1883`
* **EMQX Dashboard (web):** `http://localhost:18083`

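The lab assumes these dashboard credentials:

* **Username:** `admin`
* **Password:** `Password123`

A fresh EMQX install defaults to `admin` / `public`. If you changed the password at first login, use yours instead.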

If you need to set it up first, see [EMQX setup](https://academy.pentaho.com/pentaho-data-integration/setup/use-cases/streaming/mosquitto).

Make sure the lab files exist:

* `~/Streaming/emqx/solution/tr_mqtt_producer.ktr`
* `~/Streaming/emqx/solution/tr_mqtt_consumer.ktr`
* `~/Streaming/emqx/solution/tr_process_sensor_data.ktr`

{% endhint %}
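
To confirm both endpoints answer before you open Spoon, a plain TCP connect test is enough. The sketch below uses only the Python standard library and the two ports listed above; the function name `is_reachable` is illustrative. A successful connect only shows that something is listening on the port, not that it is EMQX or that your credentials work.

```python
import socket


def is_reachable(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, timed out, unresolvable host, ...
        return False


if __name__ == "__main__":
    # Ports from the checklist: MQTT over TCP and the EMQX Dashboard.
    for name, port in (("MQTT broker", 1883), ("EMQX Dashboard", 18083)):
        status = "reachable" if is_reachable("localhost", port) else "NOT reachable"
        print(f"{name} localhost:{port}: {status}")
```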

{% embed url="https://www.emqx.com/en" %}

{% stepper %}
{% step %}
**Start Pentaho Data Integration**

Launch Data Integration:

{% tabs %}
{% tab title="macOS / Linux" %}

```bash
cd ~/Pentaho/design-tools/data-integration
./spoon.sh
```

{% endtab %}

{% tab title="Windows" %}

```powershell
cd \Pentaho\design-tools\data-integration
.\spoon.bat
```

{% endtab %}
{% endtabs %}
{% endstep %}

{% step %}
**Publish telemetry to EMQX (MQTT Producer)**

Open:

`~/Streaming/emqx/solution/tr_mqtt_producer.ktr`

<figure><img src="https://3680356391-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FZpCSy6Skj215f4oWypdc%2Fuploads%2Fgit-blob-bcfa3f52b52c792eefd9d0ee191f401995f84964%2Fimage.png?alt=media" alt=""><figcaption><p>MQTT Producer</p></figcaption></figure>

This transformation:

* generates data for `vehicle_id = 111` every 5 seconds
* adds a timestamp
* builds a `message` payload
* publishes the payload to EMQX using **MQTT Producer**
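
The exact payload layout is defined inside `tr_mqtt_producer.ktr` and is not spelled out here. As a hypothetical stand-in, the sketch below assumes a JSON string carrying `vehicle_id`, a UTC timestamp, and a `readings` object; the `readings` field names (`engine_temp`, `vibration`) and the function `build_message` are invented for illustration.

```python
import json
from datetime import datetime, timezone
from typing import Dict, Optional


def build_message(vehicle_id: int, readings: Dict[str, float],
                  now: Optional[datetime] = None) -> str:
    """Serialize one telemetry sample into a string for the `message` field.

    Hypothetical layout: only vehicle_id and the timestamp come from the lab
    description; the keys inside `readings` are illustrative.
    """
    timestamp = (now or datetime.now(timezone.utc)).isoformat()
    return json.dumps(
        {"vehicle_id": vehicle_id, "timestamp": timestamp, "readings": readings},
        sort_keys=True,  # stable key order makes payloads easy to diff in logs
    )


if __name__ == "__main__":
    # The lab transformation emits one sample like this every 5 seconds for vehicle 111.
    print(build_message(111, {"engine_temp": 96.5, "vibration": 2.1}))
```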

Double-click **MQTT Producer** and confirm these settings:

* **Connection:** points to your broker (for example `tcp://localhost:1883`)
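* **Client ID:** must be unique on the broker. If a second client connects with the same ID, the broker disconnects the first one.
* **Topic:** note the value. You will reuse it in the consumer.
* **QoS:** choose `0`, `1`, or `2` based on your delivery requirements (see the reference at the end of this step).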
* **Message field:** `message`
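
Two of these settings follow hard rules from the MQTT protocol: a topic you publish to must not contain the wildcards `+` or `#`, and QoS can only be 0, 1, or 2. A minimal pre-flight check, assuming a broker URL of the form `tcp://host:port`; the function `check_producer_settings` and the topic `trucks/111/telemetry` are hypothetical.

```python
from typing import List
from urllib.parse import urlparse


def check_producer_settings(broker_url: str, topic: str, qos: int) -> List[str]:
    """Return a list of problems with the producer settings (empty list means OK)."""
    problems: List[str] = []
    parsed = urlparse(broker_url)
    if not parsed.hostname or parsed.port is None:
        problems.append(f"broker URL {broker_url!r} needs a host and port, e.g. tcp://localhost:1883")
    if not topic:
        problems.append("topic is empty")
    elif "+" in topic or "#" in topic:
        # Wildcards are only valid in subscription filters, never in a publish topic.
        problems.append(f"publish topic {topic!r} must not contain '+' or '#'")
    if qos not in (0, 1, 2):
        problems.append(f"QoS {qos} is invalid; MQTT defines 0, 1, and 2")
    return problems


if __name__ == "__main__":
    print(check_producer_settings("tcp://localhost:1883", "trucks/111/telemetry", 1))
```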

<figure><img src="https://3680356391-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FZpCSy6Skj215f4oWypdc%2Fuploads%2Fgit-blob-af6122659c753ba17f04f4d164490125e5ee1338%2Fimage.png?alt=media" alt=""><figcaption><p>MQTT Producer</p></figcaption></figure>

Run the transformation.

Validate in EMQX Dashboard:

1. Open `http://localhost:18083`.
2. Go to **Clients**. Confirm the producer client is connected.
3. Go to **Topics**. Confirm message rate on your topic.
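
The client check can also be scripted instead of clicked. This sketch assumes EMQX 5, an API key and secret created in the Dashboard, and the `GET /api/v5/clients` endpoint, whose JSON response lists clients under `data` with `clientid` and a boolean `connected` field. Verify the path and field names against the REST API reference for your EMQX version; the helper names are illustrative.

```python
import base64
import json
import urllib.request
from typing import List


def fetch_clients(base_url: str, api_key: str, api_secret: str) -> str:
    """GET /api/v5/clients using HTTP Basic auth with an EMQX API key and secret.

    Assumes EMQX 5; other versions may use a different path.
    """
    credentials = base64.b64encode(f"{api_key}:{api_secret}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(
        f"{base_url.rstrip('/')}/api/v5/clients",
        headers={"Authorization": f"Basic {credentials}"},
    )
    with urllib.request.urlopen(request, timeout=5) as response:
        return response.read().decode("utf-8")


def connected_client_ids(response_body: str) -> List[str]:
    """Client IDs marked as connected in a /api/v5/clients response body.

    Assumes clients are listed under "data", each with "clientid" and "connected".
    """
    payload = json.loads(response_body)
    return [c["clientid"] for c in payload.get("data", []) if c.get("connected")]
```

For example, `connected_client_ids(fetch_clients("http://localhost:18083", "<key>", "<secret>"))` should include the producer's Client ID while the transformation runs.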

<details>

<summary>Reference: QoS and session settings</summary>

**QoS**

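* **0 (at most once):** fire-and-forget with the lowest latency. Messages can be lost.
* **1 (at least once):** the sender retries until the message is acknowledged, so it is not lost but can arrive more than once.
* **2 (exactly once):** each message arrives exactly once, at the cost of a four-packet handshake and the highest overhead.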

**Clean session**

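* **True:** the broker discards the client's session (subscriptions and queued messages) when it disconnects.
* **False:** the broker keeps the client's subscriptions and queues QoS 1/2 messages while it is offline, keyed by Client ID.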

</details>
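
Delivery guarantees are set on both ends. In MQTT, the broker delivers each message to a subscriber at the lower of the QoS the publisher used and the QoS granted to the subscription, so a QoS 2 producer paired with a QoS 0 subscription yields at-most-once delivery. A small sketch of that rule; the function name is illustrative.

```python
def effective_qos(publish_qos: int, subscription_qos: int) -> int:
    """QoS at which a subscriber receives a message: the lower of the publisher's
    QoS and the QoS granted for the subscription (MQTT 3.1.1 and 5.0 rule)."""
    for qos in (publish_qos, subscription_qos):
        if qos not in (0, 1, 2):
            raise ValueError(f"invalid QoS {qos}; MQTT defines 0, 1, and 2")
    return min(publish_qos, subscription_qos)


if __name__ == "__main__":
    # Rows: producer QoS. Columns: consumer subscription QoS.
    print("pub\\sub  0  1  2")
    for pub in (0, 1, 2):
        print(f"{pub:>7}  " + "  ".join(str(effective_qos(pub, sub)) for sub in (0, 1, 2)))
```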
{% endstep %}

{% step %}
**Consume and process telemetry (MQTT Consumer)**

Open:

`~/Streaming/emqx/solution/tr_mqtt_consumer.ktr`

<figure><img src="https://3680356391-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FZpCSy6Skj215f4oWypdc%2Fuploads%2Fgit-blob-b72b28bbc98ec4dc72420e9a03574d39131222f5%2Fimage.png?alt=media" alt=""><figcaption><p>MQTT Consumer</p></figcaption></figure>

Double-click **MQTT Consumer** and confirm:

* **Topic** matches the producer topic.
* **Child transformation** points to:

  ```
  ${Internal.Entry.Current.Directory}/tr_process_sensor_data.ktr
  ```
* **Batch** has at least one trigger set:
  * **Duration (ms)** > `0`, or
  * **Number of records** > `0`
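
The simplest way to make the consumer topic match is to copy the producer topic verbatim. MQTT subscriptions are filters, though, so a wildcard filter such as `trucks/+/telemetry` would also cover the producer's topic, assuming the consumer passes its topic to the broker unchanged. The sketch implements the standard MQTT matching rules; the topic names are hypothetical.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a subscription filter.

    '+' matches exactly one level; '#' (last level only) matches the parent
    level and everything below it.
    """
    f_levels = topic_filter.split("/")
    t_levels = topic.split("/")
    # Filters that start with a wildcard do not match topics beginning with '$' (e.g. $SYS).
    if topic.startswith("$") and f_levels[0] in ("+", "#"):
        return False
    for i, level in enumerate(f_levels):
        if level == "#":
            return i == len(f_levels) - 1  # '#' is only valid as the final level
        if i >= len(t_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != t_levels[i]:
            return False
    return len(f_levels) == len(t_levels)


if __name__ == "__main__":
    print(topic_matches("trucks/+/telemetry", "trucks/111/telemetry"))
```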

<figure><img src="https://3680356391-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FZpCSy6Skj215f4oWypdc%2Fuploads%2Fgit-blob-23e5e58fe72e6eb769843979df4afa6825b186b9%2Fimage.png?alt=media" alt=""><figcaption><p>MQTT Consumer</p></figcaption></figure>

Run the transformation.

Validate in EMQX Dashboard:

* Go to **Clients**. Confirm the consumer client is connected.
* Go to **Subscriptions**. Confirm the client is subscribed to the topic.

<details>

<summary>Reference: batch and backpressure</summary>

PDI hands the collected messages to the child transformation as soon as either threshold is reached:

* **Duration (ms):** how long PDI collects messages before it starts a batch
* **Number of records:** how many messages PDI collects before it starts a batch

Backpressure controls:

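* **Message prefetch limit:** maximum number of incoming messages PDI queues in memory. Beyond that, the broker holds the backlog.
* **Maximum concurrent batches:** number of batches collected at the same time (default `1`). Raise it only when the child transformation cannot keep pace. It increases throughput, CPU use, and memory use.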

</details>
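
The "whichever threshold comes first" behavior can be simulated offline to choose values. This is a simplified sketch, not PDI's implementation: it checks the duration only when a message arrives (PDI uses a timer), opens a window at the first message of each batch, and treats `0` as "trigger disabled". The function name is hypothetical.

```python
from typing import List, Sequence


def batch_messages(arrivals_ms: Sequence[int], duration_ms: int,
                   max_records: int) -> List[List[int]]:
    """Group message arrival times (ms) into batches.

    A batch closes when it holds `max_records` messages or when `duration_ms`
    has elapsed since it opened, whichever comes first. 0 disables a trigger.
    """
    if duration_ms <= 0 and max_records <= 0:
        raise ValueError("set Duration (ms) or Number of records to a value > 0")
    batches: List[List[int]] = []
    current: List[int] = []
    opened_at = 0
    for t in arrivals_ms:
        # Simplification: the duration trigger is evaluated only on arrival.
        if current and duration_ms > 0 and t - opened_at >= duration_ms:
            batches.append(current)
            current = []
        if not current:
            opened_at = t  # a new batch window starts with this message
        current.append(t)
        if max_records > 0 and len(current) >= max_records:
            batches.append(current)
            current = []
    if current:
        batches.append(current)  # flush the remainder when the stream stops
    return batches


if __name__ == "__main__":
    every_5s = [0, 5000, 10000, 15000, 20000]  # the producer's 5-second cadence
    print(batch_messages(every_5s, duration_ms=6000, max_records=0))
```

With a 5-second producer, a duration shorter than 5000 ms yields one message per batch, so the duration trigger mostly controls latency while the record count caps batch size.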
{% endstep %}

{% step %}
**Inspect results**

Open the child transformation:

`~/Streaming/emqx/solution/tr_process_sensor_data.ktr`

<figure><img src="https://3680356391-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FZpCSy6Skj215f4oWypdc%2Fuploads%2Fgit-blob-9b35bd9533af4880b647e0e77079d24f5fdee9a4%2Fimage.png?alt=media" alt=""><figcaption><p>Process sensor data</p></figcaption></figure>

This transformation:

* reads each batch of `message` records with **Get records from stream**
* adds a timestamp
* resolves `sensor_type`
* filters minor and major alerts
* aggregates alerts for reporting
* appends results to output files
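
The filter, aggregate, and append stages can be sketched outside PDI. The thresholds below are hypothetical (the real rules live in `tr_process_sensor_data.ktr`), readings are assumed to arrive already resolved to `(vehicle_id, sensor_type, value)`, and the `;`-separated line format is invented. The key detail is opening files in append mode, which is why each batch adds rows instead of overwriting them.

```python
import os
import tempfile
from collections import Counter
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical (minor, major) thresholds per sensor_type; not taken from the lab files.
THRESHOLDS: Dict[str, Tuple[float, float]] = {
    "engine_temp": (100.0, 115.0),
    "vibration": (5.0, 8.0),
}


def classify(sensor_type: str, value: float) -> Optional[str]:
    """Return 'major', 'minor', or None for one reading (higher value = worse)."""
    minor, major = THRESHOLDS[sensor_type]
    if value >= major:
        return "major"
    if value >= minor:
        return "minor"
    return None


def summarize(readings: Iterable[Tuple[int, str, float]]) -> Dict[str, Counter]:
    """Count alerts per (vehicle_id, sensor_type), split by alert level."""
    summary: Dict[str, Counter] = {"minor": Counter(), "major": Counter()}
    for vehicle_id, sensor_type, value in readings:
        level = classify(sensor_type, value)
        if level is not None:
            summary[level][(vehicle_id, sensor_type)] += 1
    return summary


def append_alerts(summary: Dict[str, Counter], out_dir: str) -> None:
    """Append one line per (vehicle_id, sensor_type) to <level>_alert.txt."""
    for level, counts in summary.items():
        path = os.path.join(out_dir, f"{level}_alert.txt")
        with open(path, "a", encoding="utf-8") as fh:  # "a" keeps earlier batches
            for (vehicle_id, sensor_type), n in sorted(counts.items()):
                fh.write(f"{vehicle_id};{sensor_type};{n}\n")


if __name__ == "__main__":
    batch = [(111, "engine_temp", 104.0), (111, "engine_temp", 118.0), (111, "vibration", 3.2)]
    with tempfile.TemporaryDirectory() as out_dir:
        append_alerts(summarize(batch), out_dir)
        append_alerts(summarize(batch), out_dir)  # a second batch appends more rows
        with open(os.path.join(out_dir, "major_alert.txt"), encoding="utf-8") as fh:
            print(fh.read(), end="")
```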

Open the output files and confirm that new rows are appended after each batch:

* `~/Streaming/emqx/output/major_alert.txt`
* `~/Streaming/emqx/output/minor_alert.txt`
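
Rather than reopening the files, you can compare row counts. A minimal sketch that counts lines in the two output paths listed above (a missing file counts as 0 rows); the helper names are illustrative.

```python
import os
from typing import Dict, Iterable


def count_rows(path: str) -> int:
    """Number of lines in a text file, or 0 if it has not been created yet."""
    try:
        with open(path, encoding="utf-8", errors="replace") as fh:
            return sum(1 for _ in fh)
    except FileNotFoundError:
        return 0


def snapshot(paths: Iterable[str]) -> Dict[str, int]:
    """Current row count for each output file."""
    return {path: count_rows(path) for path in paths}


def rows_added(before: Dict[str, int], after: Dict[str, int]) -> Dict[str, int]:
    """Rows appended to each file between two snapshots."""
    return {path: after[path] - before.get(path, 0) for path in after}


if __name__ == "__main__":
    outputs = [
        os.path.expanduser("~/Streaming/emqx/output/major_alert.txt"),
        os.path.expanduser("~/Streaming/emqx/output/minor_alert.txt"),
    ]
    for path, rows in snapshot(outputs).items():
        print(f"{path}: {rows} rows")
```

Run it twice, a few batches apart. Growing counts confirm that the child transformation is appending.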

{% endstep %}
{% endstepper %}

<figure><img src="https://3680356391-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FZpCSy6Skj215f4oWypdc%2Fuploads%2FnbBkVs2ZwOwvFGEbkp72%2Fimage.png?alt=media&#x26;token=8c7bf943-eb6c-4346-9c77-eeb9da75ab57" alt=""><figcaption><p>major_alert</p></figcaption></figure>

<figure><img src="https://3680356391-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FZpCSy6Skj215f4oWypdc%2Fuploads%2FYo1tKPxKm1CFLCgBHhNW%2Fimage.png?alt=media&#x26;token=67961f0b-f258-4363-ba7e-4f82aeb9044f" alt=""><figcaption><p>minor_alert</p></figcaption></figure>

{% hint style="warning" %}
If you see no output, check these first:

* producer and consumer use the same **topic**
* EMQX shows both clients as **connected**
* the consumer batch has **Duration (ms)** or **Number of records** set to `> 0`
* the child transformation starts with **Get records from stream**

{% endhint %}
