# EMQX

{% hint style="success" %}

#### Logistics use case (predictive maintenance)

You want near-real-time telemetry from delivery trucks.

Each truck publishes sensor telemetry to an MQTT topic over GSM.

* **EMQX** is the MQTT broker.
* **Pentaho Data Integration (PDI)** subscribes and processes messages.

You will:

* publish telemetry into EMQX (producer transformation)
* consume telemetry in PDI (consumer + child transformation)
* write minor and major alerts to output files
{% endhint %}

<figure><img src="/files/gCqIJiFtHafwQo6jB8vn" alt=""><figcaption><p>Logistics</p></figcaption></figure>

{% hint style="info" %}
**Before you begin**

Make sure EMQX is running and reachable:

* **MQTT broker (TCP):** `localhost:1883`
* **EMQX Dashboard (web):** `http://localhost:18083`

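This lab assumes the following dashboard credentials. A fresh EMQX install starts with `admin` / `public` and asks you to change the password at first login, so use the password you set: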

* **Username:** `admin`
* **Password:** `Password123`

If you need to set it up first, see [EMQX setup](/pentaho-data-integration/setup/use-cases/streaming/mosquitto.md).

Make sure the lab files exist:

* `~/Streaming/emqx/solution/tr_mqtt_producer.ktr`
* `~/Streaming/emqx/solution/tr_mqtt_consumer.ktr`
* `~/Streaming/emqx/solution/tr_process_sensor_data.ktr`
{% endhint %}
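
Both checks can also be run from a terminal. The script below is a minimal sketch that assumes the default ports above and the lab folder under `$HOME/Streaming/emqx/solution`. The function names `port_open`, `check_lab_files`, and `preflight` are hypothetical helpers and are not part of PDI or EMQX. The script relies on bash's `/dev/tcp` redirection, so run it with `bash`, not `sh`.

```bash
#!/usr/bin/env bash
# Hypothetical pre-flight helpers for this lab (not shipped with PDI or EMQX).

# Succeeds if something accepts TCP connections on host:port.
# Uses bash's /dev/tcp pseudo-device; on localhost a closed port fails immediately.
port_open() {
  local host=$1 port=$2
  (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null
}

# Prints each missing lab transformation in directory $1; fails if any is missing.
check_lab_files() {
  local dir=$1 f rc=0
  for f in tr_mqtt_producer.ktr tr_mqtt_consumer.ktr tr_process_sensor_data.ktr; do
    if [[ ! -f "$dir/$f" ]]; then
      echo "missing: $dir/$f"
      rc=1
    fi
  done
  return "$rc"
}

# Runs all checks with the default ports and lab location from this page.
preflight() {
  port_open localhost 1883  || echo "MQTT broker not reachable on localhost:1883"
  port_open localhost 18083 || echo "EMQX Dashboard not reachable on localhost:18083"
  check_lab_files "$HOME/Streaming/emqx/solution" && echo "lab files present"
}
```

Load the functions into a bash session (or append a `preflight` line to the file) and run `preflight`. Any line it prints other than `lab files present` points to the item to fix.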

{% embed url="<https://www.emqx.com/en>" %}

{% stepper %}
{% step %}
**Start Pentaho Data Integration**

Launch Data Integration:

{% tabs %}
{% tab title="macOS / Linux" %}

```bash
cd ~/Pentaho/design-tools/data-integration
./spoon.sh
```

{% endtab %}

{% tab title="Windows" %}

```powershell
cd \Pentaho\design-tools\data-integration
.\spoon.bat
```

{% endtab %}
{% endtabs %}
{% endstep %}

{% step %}
**Publish telemetry to EMQX (MQTT Producer)**

Open:

`~/Streaming/emqx/solution/tr_mqtt_producer.ktr`

<figure><img src="/files/lcamGpXeQ1RCZZ4jOyLv" alt=""><figcaption><p>MQTT Producer</p></figcaption></figure>

This transformation:

* generates data for `vehicle_id = 111` every 5 seconds
* adds a timestamp
* builds a `message` payload
* publishes the payload to EMQX using **MQTT Producer**
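
For testing outside PDI, a similar stream can be produced from the shell. This is a sketch, not the lab transformation. The JSON field names, the `engine_temp` sensor, and the 0 to 120 value range are hypothetical. `publish_loop` assumes the Mosquitto command-line client `mosquitto_pub` is installed; it speaks standard MQTT, so it works against EMQX.

```bash
# Builds one telemetry payload.
# Hypothetical format: the real fields in tr_mqtt_producer.ktr may differ.
build_message() {
  local vehicle_id=$1 sensor_type=$2 value=$3 ts=$4
  printf '{"vehicle_id":%d,"sensor_type":"%s","value":%s,"timestamp":"%s"}\n' \
    "$vehicle_id" "$sensor_type" "$value" "$ts"
}

# Publishes one reading for vehicle 111 every 5 seconds, like the producer.
# Requires mosquitto_pub; runs until interrupted (Ctrl+C).
publish_loop() {
  local topic=$1 ts
  while :; do
    ts=$(date -u +%Y-%m-%dT%H:%M:%SZ)
    # $RANDOM is 0..32767; reduce it to a hypothetical 0..120 reading.
    mosquitto_pub -h localhost -p 1883 -t "$topic" -q 1 \
      -m "$(build_message 111 engine_temp $(( RANDOM % 121 )) "$ts")"
    sleep 5
  done
}
```

Run `publish_loop <your-topic>` with the topic configured in **MQTT Producer**. Stop the PDI producer first if you want to see only these test messages downstream.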

Double-click **MQTT Producer** and confirm these settings:

* **Connection:** points to your broker (for example `tcp://localhost:1883`)
* **Client ID:** unique on the broker. If two clients connect with the same ID, the broker disconnects the older one.
* **Topic:** note the value. You will reuse it in the consumer.
* **QoS:** choose based on delivery requirements
* **Message field:** `message`
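
Some of these settings follow MQTT rules that can be checked mechanically. QoS must be 0, 1, or 2, and a topic used for publishing must not contain the wildcard characters `+` or `#`. The hypothetical helper below encodes those rules, assuming a plain `tcp://host:port` connection URL (TLS URLs are not covered).

```bash
# Hypothetical check of MQTT Producer settings; prints the first problem to stderr.
validate_producer_settings() {
  local url=$1 client_id=$2 topic=$3 qos=$4
  local url_re='^tcp://[^:/]+:[0-9]+$'

  [[ $url =~ $url_re ]] || { echo "bad connection URL: $url" >&2; return 1; }
  [[ -n $client_id ]]   || { echo "client ID is empty" >&2; return 1; }
  # MQTT forbids the wildcards + and # in topic names used for publishing.
  [[ -n $topic && $topic != *[+#]* ]] || { echo "invalid publish topic: $topic" >&2; return 1; }
  [[ $qos == [012] ]]   || { echo "QoS must be 0, 1, or 2 (got $qos)" >&2; return 1; }
}
```

For example, `validate_producer_settings tcp://localhost:1883 pdi-producer-111 trucks/111/telemetry 1` succeeds, while the same call with topic `trucks/+/telemetry` fails. The client ID and topic here are placeholders.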

<figure><img src="/files/aEL93068oB5g56jGqTzR" alt=""><figcaption><p>MQTT Producer</p></figcaption></figure>

Run the transformation.

Validate in EMQX Dashboard:

1. Open `http://localhost:18083`.
2. Go to **Clients**. Confirm the producer client is connected.
3. Go to **Topics**. Confirm the message rate for your topic.
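
The client check can also be scripted against the EMQX REST API. This sketch assumes EMQX 5.x, where the API is served under `/api/v5` on the dashboard port and accepts an API key and secret (created in the Dashboard) through HTTP Basic authentication. `EMQX_API_KEY`, `EMQX_API_SECRET`, and `EMQX_API` are variable names chosen for this example, and client IDs are assumed to be URL-safe.

```bash
# Reads one client object (JSON) on stdin; succeeds if it reports "connected": true.
is_connected_json() {
  grep -Eq '"connected"[[:space:]]*:[[:space:]]*true'
}

# Succeeds if the broker knows client $1 and reports it as connected.
# curl -f turns a 404 (unknown client) into a failure, which leaves grep no input.
client_connected() {
  local client_id=$1 api=${EMQX_API:-http://localhost:18083/api/v5}
  curl -fsS -u "$EMQX_API_KEY:$EMQX_API_SECRET" "$api/clients/$client_id" \
    | is_connected_json
}
```

Pass the **Client ID** you configured in **MQTT Producer**, for example `client_connected pdi-producer-111 && echo connected` (placeholder ID).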

<details>

<summary>Reference: QoS and session settings</summary>

**QoS**

* **0 (at most once):** lowest latency; messages can be lost.
* **1 (at least once):** every message arrives, but duplicates are possible.
* **2 (exactly once):** no loss and no duplicates, at the cost of the highest protocol overhead.

**Clean session**

* **True:** the broker discards the client's session (subscriptions and queued messages) when it disconnects.
* **False:** the broker keeps the client's subscriptions and queues QoS 1/2 messages while the client is disconnected.

</details>
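
To see these settings in action, you can watch the topic with a second subscriber outside PDI. The sketch below assumes the Mosquitto client `mosquitto_sub` is installed; `watch_topic`, the `DRY_RUN` switch, and the default client ID `debug-watch` are hypothetical. Its `-c` flag turns clean session off, which is why it also passes a fixed client ID: the broker needs a stable ID to find the stored session again.

```bash
# Hypothetical helper: subscribe to a topic with a chosen QoS and session mode.
# Usage: watch_topic <topic> [qos] [persistent:true|false] [client_id]
watch_topic() {
  local topic=$1 qos=${2:-1} persistent=${3:-false} client_id=${4:-debug-watch}
  # -v prints the topic name in front of each payload.
  local -a cmd=(mosquitto_sub -h localhost -p 1883 -t "$topic" -q "$qos" -v)
  if [[ $persistent == true ]]; then
    # -c disables clean session: the broker keeps the subscription and queues
    # QoS 1/2 messages while this client is offline.
    cmd+=(-c -i "$client_id")
  fi
  if [[ ${DRY_RUN:-0} == 1 ]]; then
    echo "${cmd[*]}"      # show the command instead of running it
  else
    "${cmd[@]}"
  fi
}
```

With `persistent` set to `true`, stop the watcher, let the producer run for a while, and restart it: the QoS 1 messages published in between should be delivered on reconnect.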
{% endstep %}

{% step %}
**Consume and process telemetry (MQTT Consumer)**

Open:

`~/Streaming/emqx/solution/tr_mqtt_consumer.ktr`

<figure><img src="/files/Y2HdmvU17NUmcAJxxxUV" alt=""><figcaption><p>MQTT Consumer</p></figcaption></figure>

Double-click **MQTT Consumer** and confirm:

* **Topic** matches the producer topic.
* **Child transformation** points to:

  ```
  ${Internal.Entry.Current.Directory}/tr_process_sensor_data.ktr
  ```
* **Batch** has at least one trigger set:
  * **Duration (ms)** > `0`, or
  * **Number of records** > `0`
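
If the consumer subscribes with a wildcard instead of the exact producer topic, "matches" follows the MQTT topic-filter rules. `+` matches exactly one level, `#` (allowed only as the last level) matches the parent level and everything below it, and wildcards in the first level never match topics that start with `$`. The hypothetical bash function below is a sketch of those rules for checking a filter by hand; it is not part of PDI. Whether your PDI version accepts wildcards in the consumer's topic field is an assumption to verify.

```bash
# Returns 0 if MQTT topic filter $1 matches concrete topic name $2.
topic_matches() {
  local filter=$1 topic=$2 i
  local -a f t
  # Appending "/" keeps a trailing empty level (e.g. "a/b/") when splitting.
  IFS=/ read -r -a f <<< "$filter/"
  IFS=/ read -r -a t <<< "$topic/"

  # Wildcards in the first level never match topics that start with '$'.
  if [[ ${topic:0:1} == '$' && ( ${f[0]} == '+' || ${f[0]} == '#' ) ]]; then
    return 1
  fi

  for (( i = 0; i < ${#f[@]}; i++ )); do
    if [[ ${f[i]} == '#' ]]; then
      # '#' is only valid as the last level; it matches the parent and all children.
      (( i == ${#f[@]} - 1 ))
      return
    fi
    (( i < ${#t[@]} )) || return 1                       # topic has too few levels
    [[ ${f[i]} == '+' || ${f[i]} == "${t[i]}" ]] || return 1
  done
  (( ${#f[@]} == ${#t[@]} ))                             # no extra topic levels
}
```

For example, `topic_matches 'trucks/+/telemetry' 'trucks/111/telemetry'` succeeds, while `topic_matches 'trucks/+' 'trucks/111/telemetry'` fails because `+` covers only one level. The topic names are placeholders.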

<figure><img src="/files/iNpsiDuumB0e3v2zSFf2" alt=""><figcaption><p>MQTT Consumer</p></figcaption></figure>

Run the transformation.

Validate in EMQX Dashboard:

* Go to **Clients**. Confirm the consumer client is connected.
* Go to **Subscriptions**. Confirm the client is subscribed to the topic.
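
To confirm the subscription from a script, the EMQX subscriptions endpoint can be filtered by client ID. This sketch assumes EMQX 5.x with the API at `http://localhost:18083/api/v5`, a `clientid` query parameter on `/subscriptions`, and an API key and secret in the example variables `EMQX_API_KEY` and `EMQX_API_SECRET`. The JSON check is a rough text match that assumes the topic contains no whitespace or quotes.

```bash
# Reads the /subscriptions JSON on stdin; succeeds if topic $1 appears as a "topic" value.
# Whitespace is stripped first so both compact and pretty-printed JSON match.
subscribed_to() {
  tr -d '[:space:]' | grep -qF "\"topic\":\"$1\""
}

# Succeeds if client $1 holds a subscription to topic $2.
check_subscription() {
  local client_id=$1 topic=$2 api=${EMQX_API:-http://localhost:18083/api/v5}
  curl -fsS -u "$EMQX_API_KEY:$EMQX_API_SECRET" \
    "$api/subscriptions?clientid=$client_id" | subscribed_to "$topic"
}
```

Call it with the consumer's client ID and the producer topic, for example `check_subscription pdi-consumer trucks/111/telemetry` (placeholder values).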

<details>

<summary>Reference: batch and backpressure</summary>

PDI runs the child transformation when either threshold is met:

* **Duration (ms):** how long PDI collects messages before handing the batch to the child transformation
* **Number of records:** how many messages PDI collects before handing the batch to the child transformation

Backpressure controls:

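* **Message prefetch limit:** maximum number of messages PDI queues in memory while earlier batches are still processing
* **Maximum concurrent batches:** how many batches the child transformation may process in parallel; higher values increase throughput and resource use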

</details>
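
The batch rule can be written as two small predicates. This is a sketch of the behavior described above, not PDI code. It reads "set to `> 0`" as meaning that `0` disables a trigger, and the function names are hypothetical.

```bash
# A batch configuration is usable only if at least one trigger is enabled.
# Arguments follow the dialog order: duration_ms, then number of records.
batch_config_valid() {
  local duration_ms=$1 max_records=$2
  (( duration_ms > 0 || max_records > 0 ))
}

# A batch is ready as soon as either enabled threshold is reached.
# Usage: batch_ready <collected> <elapsed_ms> <duration_ms> <max_records>
batch_ready() {
  local collected=$1 elapsed_ms=$2 duration_ms=$3 max_records=$4
  (( (max_records > 0 && collected >= max_records) ||
     (duration_ms > 0 && elapsed_ms >= duration_ms) ))
}
```

With a 5000 ms window and a 10-record limit, for example, a burst of 10 messages is handed over immediately, while a slow trickle is handed over every 5 seconds with whatever has arrived.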
{% endstep %}

{% step %}
**Inspect results**

Open the child transformation:

`~/Streaming/emqx/solution/tr_process_sensor_data.ktr`

<figure><img src="/files/eBxUlEZs38O678UU0r9w" alt=""><figcaption><p>Process sensor data</p></figcaption></figure>

This transformation:

* pulls `message` records from the stream
* adds a timestamp
* resolves `sensor_type`
* filters minor and major alerts
* aggregates alerts for reporting
* appends results to output files
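
The filter-and-append part of this flow can be illustrated with `awk`. This is a simplified, hypothetical sketch. It assumes one `vehicle_id,sensor_type,value` line per message and a single pair of thresholds for every sensor, whereas the real transformation parses the `message` payload and resolves `sensor_type` first. Like the lab, it appends to `major_alert.txt` and `minor_alert.txt`, and it prints a count per alert level as a stand-in for the aggregation step.

```bash
# Hypothetical alert routing: value >= major -> major alert, value >= minor -> minor alert.
# Usage: route_alerts <minor_threshold> <major_threshold> <output_dir> < messages.csv
route_alerts() {
  local minor=$1 major=$2 out_dir=$3
  mkdir -p "$out_dir"
  awk -F, -v minor="$minor" -v major="$major" -v dir="$out_dir" '
    # ">>" appends, so repeated runs add rows instead of overwriting.
    $3 + 0 >= major + 0 { print >> (dir "/major_alert.txt"); nmajor++; next }
    $3 + 0 >= minor + 0 { print >> (dir "/minor_alert.txt"); nminor++ }
    END { printf "major=%d minor=%d\n", nmajor, nminor }
  '
}
```

Because each run appends, feeding the function a second batch adds rows to the same files, which mirrors what you check in the output files below.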

Open the output files and confirm that new rows are appended:

* `~/Streaming/emqx/output/major_alert.txt`
* `~/Streaming/emqx/output/minor_alert.txt`
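
Instead of reopening the files, you can measure how many rows arrive over an interval. The helper below is a hypothetical sketch using only `wc` and `sleep`. If the result stays at `0`, work through the checks in the warning at the end of this page.

```bash
# Prints how many lines file $1 gains over $2 seconds (default 10).
rows_added() {
  local file=$1 seconds=${2:-10} before after
  before=$(wc -l < "$file")
  sleep "$seconds"
  after=$(wc -l < "$file")
  echo $(( after - before ))     # arithmetic also absorbs the padding macOS wc adds
}
```

For example, `rows_added ~/Streaming/emqx/output/minor_alert.txt 30` reports the rows appended during a 30-second window while both transformations run.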
{% endstep %}
{% endstepper %}

<figure><img src="/files/BzFraUhuQStxtnXK4Anv" alt=""><figcaption><p>major_alert</p></figcaption></figure>

<figure><img src="/files/iRRUVFIZwpZ9ircRIe73" alt=""><figcaption><p>minor_alert</p></figcaption></figure>

{% hint style="warning" %}
If you see no output, check these first:

* producer and consumer use the same **topic**
* EMQX shows both clients as **connected**
* the consumer batch has **Duration (ms)** or **Number of records** set to `> 0`
* the child transformation starts with **Get records from stream**
{% endhint %}


