# HiveMQ

{% hint style="success" %}

#### **SCADA**

Nowadays, plants use a wide selection of industrial sensors, each with a unique design and application, to collect and analyze data.

These **Supervisory Control and Data Acquisition (SCADA)** systems consist of software and hardware components that enable remote and on-site gathering of data from industrial equipment.

Pentaho Data Integration enables you to collect data from any source in real time, augment data streams in a single interface, and transform raw data into actionable manufacturing insights.
{% endhint %}

{% hint style="info" %}
Let's take a brief look at a couple of the challenges involved in implementing a SCADA system:

* **Data Silos:** Brownfield factories will have manufacturing equipment and backend systems from a wide variety of vendors that produce data in proprietary formats. These formats often create data silos that hinder deep analysis across the entire factory operation.
* **IT/OT Priorities:** A successful modernization project needs to include experts from both the operations side (OT) and the enterprise IT side (IT).
  {% endhint %}

<figure><img src="/files/xf9LphQ6DRu8vNRwcQpL" alt=""><figcaption><p>SCADA</p></figcaption></figure>

{% hint style="info" %}
The goal is to connect the various functions of the factory across a standardized IIoT bus:

* **Automation area:** Factory machines, sensors and gateways. Data needs to be able to flow between the machines and the sensors and gateways. The gateways are typically used to communicate with other areas in the factory architecture.
* **Manufacturing area:** Systems used to control the factory equipment such as SCADA and MES systems.
* **Factory area:** Systems used to manage the entire factory such as PLM (Product Lifecycle Management) and OEE (Overall Equipment Effectiveness) systems.
* **Cloud:** Connectivity to the enterprise IT systems of the organization that allows for deeper integration between the OT and IT systems.

The majority of SCADA and MES systems on the market come with support for MQTT.
{% endhint %}
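As a toy illustration (all names invented), the areas above are often mapped onto a hierarchical MQTT topic namespace, so any system on the bus can subscribe to exactly the slice of the factory it cares about. The sketch below shows topic construction and a minimal version of MQTT's wildcard matching, where `+` matches one level and `#` matches the remainder:

```python
def make_topic(*levels: str) -> str:
    """Join hierarchy levels into an MQTT topic path."""
    return "/".join(levels)

def matches(topic: str, filter_: str) -> bool:
    """Minimal MQTT topic matching: '+' matches one level, '#' the rest.
    (Simplified - the real spec has extra edge cases, e.g. 'a/#' also
    matches 'a' itself.)"""
    t, f = topic.split("/"), filter_.split("/")
    for i, part in enumerate(f):
        if part == "#":
            return True
        if i >= len(t) or (part != "+" and part != t[i]):
            return False
    return len(t) == len(f)

topic = make_topic("acme", "plant1", "assembly", "robot7", "temperature")
print(matches(topic, "acme/plant1/#"))                  # True  - whole plant
print(matches(topic, "acme/+/assembly/+/temperature"))  # True  - all assembly temps
print(matches(topic, "acme/plant2/#"))                  # False - different plant
```

Subscribers in the manufacturing, factory, and cloud areas can then each pick the subtree they need without any point-to-point integration.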

{% tabs %}
{% tab title="1. Install HiveMQ" %}
{% hint style="info" %}
Unified Namespace (UNS) is a novel solution that allows you to collect data from various industrial IoT (IIoT) systems, add context to it, and transform it into a format that other systems can understand.
{% endhint %}

<figure><img src="/files/BJ3FBE6PGJiBkUE8ssPy" alt=""><figcaption><p>UNS</p></figcaption></figure>

{% embed url="https://www.hivemq.com/docs/hivemq/4.14/user-guide/introduction.html" %}
Link to HiveMQ documentation
{% endembed %}

***

{% hint style="danger" %}
Remember to stop the mosquitto container.
{% endhint %}

1. Ensure the Mosquitto Broker has been stopped.

```bash
docker stop mosquitto
```

2. Copy over the required files.

```bash
cd
mkdir -p ~/Streaming/HiveMQ4 && cd "$_"
cp -R ~/Workshop--Data-Integration/Labs/'Module 7 - Workflows'/'Streaming Data'/HiveMQ/* .
```

**Docker Network**

{% hint style="info" %}
As we're running quite a few containers on the same server, let's ensure that the containers for each How-To are isolated from each other.
{% endhint %}

```bash
docker network create -d bridge hivemq
```

**HiveMQ Container**

{% hint style="info" %}

* This HiveMQ deployment is not secure: it lacks authentication and authorization.
* Right now, any MQTT client can connect to the broker with a full set of permissions.
* For production usage, add an appropriate security extension and remove the hivemq-allow-all extension.
* You can download security extensions from the HiveMQ Marketplace (https://www.hivemq.com/extensions/).
{% endhint %}

3. Run the HiveMQ Docker container.

```bash
docker run --ulimit nofile=500000:500000 --name=hivemq4 -p 9090:8080 -p 9000:9000 -p 1883:1883 --net=hivemq hivemq/hivemq4
```

<table><thead><tr><th width="185">Flag</th><th>Description</th></tr></thead><tbody><tr><td>--ulimit</td><td>Limits the system resource amounts that individual users can consume</td></tr><tr><td>nofile</td><td>The maximum number of open files/file descriptors the user can have at one time</td></tr><tr><td>--name</td><td>Name of the container</td></tr><tr><td>-p 9090</td><td>Mapped container port. Exposes the HiveMQ Control Center (container port 8080) on host port 9090</td></tr><tr><td>-p 9000</td><td>Mapped container port. Exposes the HiveMQ WebSocket listener on port 9000</td></tr><tr><td>-p 1883</td><td>Mapped container port. Exposes the HiveMQ TCP listener on port 1883</td></tr><tr><td>--net</td><td>Name of the isolated Docker network: hivemq</td></tr><tr><td>hivemq/hivemq4</td><td>Docker Hub image</td></tr></tbody></table>

4. Log into the HiveMQ Control Center.

{% embed url="http://localhost:9090" %}
Link to HiveMQ Control Center
{% endembed %}

<table data-header-hidden><thead><tr><th width="220"></th><th></th></tr></thead><tbody><tr><td>User</td><td>admin</td></tr><tr><td>Password</td><td>hivemq</td></tr></tbody></table>

<figure><img src="/files/JIfok0S5dz7xgsSLE6Qa" alt=""><figcaption><p>HiveMQ Control Center</p></figcaption></figure>

➡️ Next: [**Generate industrial robot sensor data**](#id-2.-sensor)
{% endtab %}

{% tab title="2. Sensor" %}
{% hint style="info" %}
Let's publish some sensor data to our HiveMQ broker. Depending on the factory area, the data needs to be processed within different timeframes.

* Integration of the automation and manufacturing areas (levels 0-3) requires reliable data exchange between various machines, PLCs, and sensors that have very little memory or computing power. The raw data is used in positive and negative feedback loops to automatically trigger actuators and alerts.
* Level 4 aggregated data is usually augmented to provide not only daily, weekly, and monthly OT OEE reports, but also OT metrics across the organisation - for example, the number of cars assembled based on engine output per factory area, chassis manufacturing, and so on.
{% endhint %}
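As a hypothetical illustration of that level 4 aggregation, OEE is commonly computed as availability × performance × quality. All figures below are invented for the example:

```python
# Hypothetical OEE calculation, illustrating the kind of level 4
# aggregation an OEE system performs (names and figures are invented).

def oee(availability: float, performance: float, quality: float) -> float:
    """OEE = Availability x Performance x Quality, each expressed as a ratio."""
    return availability * performance * quality

# Example shift: 7.5 h running out of 8 h planned, 450 parts produced
# against an ideal of 500, and 441 of those parts within spec.
availability = 7.5 / 8.0   # 0.9375
performance = 450 / 500    # 0.90
quality = 441 / 450        # 0.98

print(f"OEE: {oee(availability, performance, quality):.1%}")  # OEE: 82.7%
```

The raw sensor stream published in this step is the kind of input such a report would eventually be aggregated from.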

1. Start the HiveMQ Broker.

```bash
docker start hivemq4
```

{% embed url="https://localhost:9443/#!/home" %}
Link to Portainer
{% endembed %}

2. Take a look at:

\~/Streaming/HiveMQ4/scripts/sensor.py

```bash
cd
cd ~/Streaming/HiveMQ4/scripts
cat sensor.py
```

```python
# python 3.10
# Note: This script requires the 'paho-mqtt' package to be installed.
# pip3 install paho-mqtt python-etcd for V2
# pip3 install "paho-mqtt<2.0.0" for V1

import random
import time
import json
from paho.mqtt import client as mqtt_client
# username = 'emqx'  not required as HiveMQ has no security
# password = 'public'

# MQTT settings
broker = 'localhost'
port = 1883
topic = "industrial/robot/sensor"
# Generate a Client ID with the subscribe prefix.
client_id = f'python-mqtt-{random.randint(0, 1000)}'

# Connect to MQTT broker
def connect_mqtt():
    def on_connect(client, userdata, flags, rc):
    # For paho-mqtt 2.0.0, you need to add the properties parameter.
    # def on_connect(client, userdata, flags, rc, properties):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {rc}")
    
    # Set Connecting Client ID
    # client = mqtt_client.Client(client_id)

    # For paho-mqtt 2.0.0, you need to set callback_api_version.
    client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)

    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

# Publish sensor data
def publish(client):
    while True:
        temperature = random.uniform(20.0, 100.0)  # Simulate temperature sensor data
        position = {'x': random.uniform(-10.0, 10.0), 'y': random.uniform(-10.0, 10.0), 'z': random.uniform(-10.0, 10.0)}  # Simulate position sensor data
        message = json.dumps({'temperature': temperature, 'position': position})
        result = client.publish(topic, message)
        status = result[0]
        if status == 0:
            print(f"Sent `{message}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        time.sleep(1)

# Main function
def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)

# Execute the main function
if __name__ == '__main__':
    run()
```

{% hint style="info" %}
The sensor.py script:

* loads the required libraries
* sets the HiveMQ Broker connection details
* connects to the HiveMQ MQTT broker
* sets the client\_id
* generates random sensor data
* builds the message - a data dictionary in JSON format
* publishes the topic + message every second
{% endhint %}

**Execute sensor.py**

{% hint style="warning" %}
[Release 2.0.0](https://github.com/eclipse/paho.mqtt.python/releases/tag/v2.0.0) of the Paho Python MQTT client includes breaking changes (11 Feb 2024)
{% endhint %}

1. Execute sensor.py script.

```bash
cd
cd ~/Streaming/HiveMQ4/scripts
python3 sensor.py
```

```
/home/pentaho/Streaming/HiveMQ4/scripts/sensor.py:34: DeprecationWarning: Callback API version 1 is deprecated, update to latest version
  client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
Connected to MQTT Broker!
Sent `{"temperature": 65.6060308859324, "position": {"x": 5.196954639657465, "y": -5.148951217854867, "z": 7.25115088212948}}` to topic `industrial/robot/sensor`
Sent `{"temperature": 35.29565078000626, "position": {"x": -4.7272734097139875, "y": -1.6080980541298366, "z": 1.2319865862071566}}` to topic `industrial/robot/sensor`
Sent `{"temperature": 60.21167757015797, "position": {"x": 8.249639900333282, "y": -8.689339716964462, "z": -0.3634263425774815}}` to topic `industrial/robot/sensor`
Sent `{"temperature": 29.38714431589844, "position": {"x": 2.680389418417885, "y": 9.119972168648552, "z": -3.499766582369035}}` to topic `industrial/robot/sensor`
...
```

2. Check that the sensor data is being successfully published to HiveMQ.

<figure><img src="/files/rLaOiqM4LcGJfTtCL4iE" alt=""><figcaption><p>HiveMQ - Dashboard</p></figcaption></figure>

{% hint style="info" %}
Press CTRL + C to stop the Python script.
{% endhint %}

***

**Execute subscribe.py**

1. Take a look at:

\~/Streaming/HiveMQ4/scripts/subscribe.py

```bash
cd
cd ~/Streaming/HiveMQ4/scripts
cat subscribe.py
```

```python
# python 3.10

import random

from paho.mqtt import client as mqtt_client

# MQTT settings
broker = 'localhost'
port = 1883
topic = "industrial/robot/sensor"
# Generate a Client ID with the subscribe prefix.
client_id = f'subscribe-{random.randint(0, 100)}'
# username = 'emqx'
# password = 'public'

# Connect to MQTT broker
def connect_mqtt() -> mqtt_client.Client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print(f"Failed to connect, return code {rc}")

    # client = mqtt_client.Client(client_id)
    client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client

# Subscribe sensor data
def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message

# Main function
def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()

if __name__ == '__main__':
    run()
```

2. Execute the subscribe.py script.

```bash
cd
cd ~/Streaming/HiveMQ4/scripts
python3 subscribe.py
```

```
/home/pentaho/Streaming/HiveMQ4/scripts/subscribe.py:25: DeprecationWarning: Callback API version 1 is deprecated, update to latest version
  client = mqtt_client.Client(client_id=client_id, callback_api_version=mqtt_client.CallbackAPIVersion.VERSION1)
Connected to MQTT Broker!
Received `{"temperature": 58.80126840794128, "position": {"x": -9.382200219332637, "y": 6.256113402110607, "z": 1.3091624489302767}}` from `industrial/robot/sensor` topic
Received `{"temperature": 80.70263220990923, "position": {"x": 7.8459965093831165, "y": -0.37157016433597967, "z": -4.3821611625288455}}` from `industrial/robot/sensor` topic
Received `{"temperature": 87.65337962266882, "position": {"x": 8.240244717092018, "y": -8.806589253052007, "z": -6.419154857552536}}` from `industrial/robot/sensor` topic
Received `{"temperature": 81.6254980952253, "position": {"x": -5.722946590492448, "y": -9.03693881691259, "z": -4.436963415897499}}` from `industrial/robot/sensor` topic
...
```

3. Take a look at the stats.

<figure><img src="/files/N9pvPm07WVidX9uN0wpI" alt=""><figcaption><p>Inbound - Outbound messages</p></figcaption></figure>

➡️ Next: [**Consume the data stream in PDI**](#id-3.-pentaho-data-integration)
{% endtab %}

{% tab title="3. Pentaho Data Integration" %}
{% hint style="info" %}
The data will also be processed within different timeframes:

**Level 0**: This is the level of sensors, signals, machines, and real-time capabilities. At this level, microseconds and milliseconds are extremely important.

**Level 1**: On the PLC (programmable logic controller) or control level, sensing and manipulating take place within seconds. Real time also matters here for the predictability of production processes.

**Level 2**: This level handles process management for monitoring and supervising. Here, SCADA (supervisory control and data acquisition) systems and HMI (human-machine interface) give operators a first visualization of what’s going on within the factory.

**Level 3**: The next level is for MES (manufacturing execution systems). This is where manufacturing operations management takes place. Here, we move from the factory floor to the top floor and measure activity in hours.

**Level 4**: Finally, at the top, there are ERP (enterprise resource planning) systems for business planning and logistics. Processes on this level are typically measured in days and months.

To achieve integration, data must flow between systems on the same level as well as between levels.
{% endhint %}

**Pentaho Data Integration**

{% hint style="info" %}
Let's subscribe to the industrial/robot/sensor topic in HiveMQ and use Pentaho Data Integration to process the data.
{% endhint %}

1. Start Pentaho Data Integration:

```bash
cd
cd Pentaho/design-tools/data-integration
./spoon.sh
```

{% tabs %}
{% tab title="3.1 Sensor" %}
{% hint style="info" %}
Again, we'll use sensor.py to generate the data and consume it in Pentaho Data Integration.
{% endhint %}

1. Execute sensor.py script.

```bash
cd
cd ~/Streaming/HiveMQ4/scripts
python3 sensor.py
```

2. Log into HiveMQ Control Center.

{% embed url="http://localhost:9090" %}
Link to HiveMQ Control Center
{% endembed %}

<table data-header-hidden><thead><tr><th width="220"></th><th></th></tr></thead><tbody><tr><td>User</td><td>admin</td></tr><tr><td>Password</td><td>hivemq</td></tr></tbody></table>

3. Check the inbound connections and traffic.

<figure><img src="/files/F0mmvUTHi67YC0ndfutX" alt=""><figcaption><p>HiveMQ Control Center - Connections</p></figcaption></figure>

4. You can also view the stream in MQTT Explorer.

<figure><img src="/files/M3xxu67PDj2xr6pN9E3Y" alt=""><figcaption><p>MQTT Explorer - industrial / robot / sensor</p></figcaption></figure>
{% endtab %}

{% tab title="3.2 MQTT Consumer" %}
{% hint style="info" %}
The PDI client can pull streaming data from an MQTT broker or clients through an MQTT transformation. The parent MQTT Consumer step runs a child transformation that executes according to the message batch size or duration, allowing you to process a continuous stream of records in near real-time. The child transformation must start with the Get records from stream step.

Additionally, from the MQTT Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. This capability allows records processed by an MQTT Consumer step in a parent transformation to be passed downstream to any other steps included within the same parent transformation.
{% endhint %}
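The batch semantics described above can be sketched in plain Python (a hypothetical helper, not PDI code): messages are buffered until either the batch size or the maximum duration is reached, and each batch is then handed off, much as the MQTT Consumer step hands each batch to the child transformation.

```python
import json
import time

def batch_stream(messages, batch_size=3, max_wait=1.0, clock=time.monotonic):
    """Group an incoming message stream into batches, flushing whenever
    `batch_size` messages have accumulated or `max_wait` seconds have
    elapsed - mirroring the duration / number-of-records settings of
    the MQTT Consumer step. Illustrative sketch only."""
    batch, started = [], clock()
    for msg in messages:
        batch.append(msg)
        if len(batch) >= batch_size or clock() - started >= max_wait:
            yield batch
            batch, started = [], clock()
    if batch:  # flush whatever is left when the stream ends
        yield batch

# Each batch would be handed to the child transformation for processing.
readings = [json.dumps({"temperature": 20 + i}) for i in range(7)]
for batch in batch_stream(readings, batch_size=3):
    print(len(batch), "records")  # prints 3, 3, then 1 records
```

Tuning the batch size is the usual trade-off: larger batches amortize per-batch overhead, smaller batches keep end-to-end latency closer to real time.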

1. Open the following transformation:

\~/Streaming/HiveMQ4/tr\_hive\_consumer.ktr

2. Double click on the MQTT Consumer step.

For further details on the settings: [**MQTT Consumer**](/pentaho-data-integration/use-cases/streaming-data/mqtt/mosquitto.md#consumer)

<figure><img src="/files/ymFLeZUqEGRv25ZASoi4" alt=""><figcaption><p>MQTT Consumer - industrial/robot/sensor</p></figcaption></figure>

Transformation

```
${Internal.Entry.Current.Directory}/tr_process_sensor_data.ktr
```

➡️ Next: [**Process the sensor records**](#id-3.3-get-records)
{% endtab %}

{% tab title="3.3 Get records" %}
{% hint style="info" %}
This step returns records that were previously generated by another transformation in a job. The records were passed to this step using either the Copy rows to result step or the Transformation Executor step. You can enter the metadata of the fields you are expecting from the previous transformation in a job.
{% endhint %}

1. Open the following transformation:

\~/Streaming/HiveMQ4/tr\_process\_sensor\_data.ktr

<figure><img src="/files/tqCTLMz769lZAR725ZnR" alt=""><figcaption><p>Get records - industrial/robot/sensor</p></figcaption></figure>

2. Double-click on the JSON Input step and configure with the following settings.

<figure><img src="/files/WP97oKLzX4DvzDJrM22N" alt=""><figcaption><p>JSON Input - Fields</p></figcaption></figure>

{% hint style="info" %}

* Pull the 'message' records from the stream
* Add a timestamp
* Read the JSON stream
* Append to a file in the Project/HiveMQ directory
{% endhint %}
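What the child transformation does to each record can be approximated in a few lines of Python (a hypothetical sketch of the JSON Input step's field extraction; the actual field names and paths are whatever you configure in the step):

```python
import json
from datetime import datetime, timezone

def flatten_sensor_record(message: str) -> dict:
    """Parse one MQTT payload and flatten it into the columns appended
    to robot_sensor.txt - equivalent to JSONPath-style paths such as
    $.temperature and $.position.x in the JSON Input step."""
    data = json.loads(message)
    return {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "temperature": data["temperature"],
        "x": data["position"]["x"],
        "y": data["position"]["y"],
        "z": data["position"]["z"],
    }

# One payload in the shape published by sensor.py:
sample = '{"temperature": 58.8, "position": {"x": -9.38, "y": 6.25, "z": 1.30}}'
row = flatten_sensor_record(sample)
print(row["temperature"], row["x"])  # 58.8 -9.38
```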

3. Open the following file:

\~/Streaming/HiveMQ4/HiveMQ4/output/robot\_sensor.txt

<figure><img src="/files/cB8dA24T5WKO1ZPeMTAu" alt=""><figcaption><p>robot_sensor</p></figcaption></figure>

{% hint style="info" %}
Notice the records are being appended.
{% endhint %}
{% endtab %}
{% endtabs %}
{% endtab %}
{% endtabs %}

