Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 22 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,45 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
- **Where**: *mqtt_streaming_module/src/mqtt_client_fb_impl.cpp, include/mqtt_streaming_module/...*
- **Purpose**: Represents the MQTT broker as an openDAQ function block - the connection point through which function blocks are created.
- **Main properties:**
- *BrokerAddress* (string) - MQTT broker address. It can be an IP address or a hostname. By default, it is set to *"127.0.0.1"*.
- *BrokerPort* (integer) - Port number for the MQTT broker connection. By default, it is set to *1883*.
- *BrokerAddress* (string) MQTT broker address. It can be an IP address or a hostname. By default, it is set to *"127.0.0.1"*.
- *BrokerPort* (integer) Port number for the MQTT broker connection. By default, it is set to *1883*.
- *Username* (string) — Username for MQTT broker authentication. By default, it is empty.
- *Password* (string) — Password for MQTT broker authentication. By default, it is empty.
- *ConnectionTimeout* (integer) — Timeout in milliseconds for the initial connection to the MQTT broker. If the connection fails, an exception is thrown. By default, it is set to *3000 ms*.
2) **MQTT publisher Function Block (MQTTJSONPublisherFB)**:
- **Where**: *include/mqtt_streaming_module/mqtt_publisher_fb_impl.h, src/mqtt_publisher_fb_impl.cpp*
- **Purpose**: Publishes openDAQ signal data to MQTT topics. There are **four** general data publishing schemes:
1) One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample. Example: *{"AI0": 1.1, "timestamp": 1763716736100000}*
- **Purpose**: Publishes openDAQ signal data to MQTT topics.
There are **five** general data publishing schemes:
1) Raw data publishing. When using this approach, the data is transmitted in raw binary form without any additional wrapper. Domain data is not transmitted.
2) JSON message data publishing. One MQTT message per signal / one message per sample / one topic per signal / one timestamp for each sample. Example: *{"AI0": 1.1, "timestamp": 1763716736100000}*

2) One MQTT message per signal / one message containing several samples / one topic per signal / one timestamp per sample (array of samples). Example: *{"AI0": [1.1, 2.2, 3.3], "timestamps": [1763716736100000, 1763716736200000, 1763716736300000]}*
3) JSON message data publishing. One MQTT message per signal / one message containing several samples / one topic per signal / one timestamp per sample (array of samples). Example: *{"AI0": [1.1, 2.2, 3.3], "timestamps": [1763716736100000, 1763716736200000, 1763716736300000]}*

3) One MQTT message for all signals / one message per sample containing all signals / one topic for all signals / one shared timestamp for all signals. Example: *{"AI0": 1.1, "AI1": 2, "timestamp": 1763716736100000}*
4) JSON message data publishing. One MQTT message for all signals / one message per sample containing all signals / one topic for all signals / one shared timestamp for all signals. Example: *{"AI0": 1.1, "AI1": 2, "timestamp": 1763716736100000}*

4) One MQTT message for all signals / one message containing several samples for all signals / one topic for all signals / one shared timestamp for all signals (array of samples). Example: *{"AI0": [1.1, 2.2, 3.3], "AI1": [4.1, 4.2, 4.3], "timestamp": [1763716736100000, 1763716736200000, 1763716736300000]}*
5) JSON message data publishing. One MQTT message for all signals / one message containing several samples for all signals / one topic for all signals / one shared timestamp for all signals (array of samples). Example: *{"AI0": [1.1, 2.2, 3.3], "AI1": [4.1, 4.2, 4.3], "timestamp": [1763716736100000, 1763716736200000, 1763716736300000]}*

The schemes are configured through combinations of properties.

- **Main properties**:
- *TopicMode* (list) — Selects whether to publish all signals to separate MQTT topics (one per signal, *TopicPerSignal mode*) or to a single topic (*SingleTopic mode*), one for all signals. Choose *0* for *TopicPerSignal* mode and *1* for *SingleTopic* mode. By default, it is set to *TopicPerSignal* mode.
- *Mode* (list) — Selects the mode of publishing (JSON, Raw). Choose *0* for *JSON* mode and *1* for *Raw* mode. In JSON mode, the function block converts signal samples into JSON messages and publishes them to MQTT topics. In Raw mode, the function block publishes raw signal samples to MQTT topics without any conversion. By default it is set to JSON mode.
- *TopicMode* (list) — The property is used **only** in *JSON* mode. Selects whether to publish all signals to separate MQTT topics (one per signal, *TopicPerSignal mode*) or to a single topic (*SingleTopic mode*), one for all signals. Choose *0* for *TopicPerSignal* mode and *1* for *SingleTopic* mode. By default, it is set to *TopicPerSignal* mode.
- *QoS* (list) — MQTT Quality of Service level. It can be *0* (at most once), *1* (at least once), or *2* (exactly once). By default, it is set to *1*.
- *Topic* (string) — Topic name for publishing in *SingleTopic* mode. If left empty, the Publisher's *Global ID* is used as the topic name.
- *Topic* (string) — The property is used **only** in *JSON* mode. Topic name for publishing in *SingleTopic* mode. If left empty, the Publisher's *Global ID* is used as the topic name.
- *Topics* (list of strings, read-only) — Contains a list of topics used for publishing data in the *TopicPerSignal* mode. The order in the list is the same as the input ports order.
- *GroupValues* (bool) — Enables the use of a sample pack for a signal. By default, it is set to *false*.
- *SignalValueJSONKey* (list) — Describes how to name a JSON value field. By default it is set to *GlobalID*.
- *SamplesPerMessage* (integer) — Sets the size of the sample pack when publishing grouped values. By default, it is set to *1*.
- *GroupValues* (bool) — The property is used **only** in *JSON* mode. Enables the use of a sample pack for a signal. By default, it is set to *false*.
- *SignalValueJSONKey* (list) — The property is used **only** in *JSON* mode. Describes how to name a JSON value field. By default it is set to *GlobalID*.
- *SamplesPerMessage* (integer) — The property is used **only** in *JSON* mode. Sets the size of the sample pack when publishing grouped values. By default, it is set to *1*.
- *ReaderWaitPeriod* (integer) — Polling period in milliseconds, specifying how often the server calls an internal reader to collect and publish the connected signals’ data to an MQTT broker. By default, it is set to *20 ms*.
- *EnablePreviewSignal* (bool) — Enable the creation of preview signals: one signal in *SingleTopic* mode and one signal per connected input port in *TopicPerSignal* mode. These signals contain the same JSON string data that is published to MQTT topics.
- *Schema* (string, read-only) - Describes the general representation of a JSON data packet according to the current function block settings.

To configure the publishing schemes, set the properties as follows:
1) *TopicMode(0), GroupValues(false)*;
2) *TopicMode(0), GroupValues(true), SamplesPerMessage(<pack_size>)*;
3) *TopicMode(1), GroupValues(false)*;
4) *TopicMode(1), GroupValues(true), SamplesPerMessage(<pack_size>)*;
1) *Mode(1)*;
2) *Mode(0), TopicMode(0), GroupValues(false)*;
3) *Mode(0), TopicMode(0), GroupValues(true), SamplesPerMessage(<pack_size>)*;
4) *Mode(0), TopicMode(1), GroupValues(false)*;
5) *Mode(0), TopicMode(1), GroupValues(true), SamplesPerMessage(<pack_size>)*;


3) **MQTT subscriber Function Block (MQTTSubscriberFB)**:
Expand All @@ -61,6 +65,7 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
- *Topic* (string) — MQTT topic to subscribe to for receiving raw binary data.
- *QoS* (list) — MQTT Quality of Service level. It can be *0* (at most once), *1* (at least once), or *2* (exactly once). By default, it is set to *1*.
- *EnablePreviewSignal* (bool) — Enable the creation of a preview signal. This signal contains the raw binary data received from an MQTT topic.
- *DomainMode* (list) — Defines the domain of the preview signal. By default it is set to *None* (0), which means that the preview signal doesn't have a timestamp. If set to *System time* (1), the preview signal's timestamp is set to the system time when the MQTT message is received.
- *MessageIsString* (bool) — Interpret a received message as a string.
- *JSONConfigFile* (string) — path to file with **JSON configuration string**. See the *JSONConfig* property for more details. This property could be set only at creation. It is not visible.
- *JSONConfig* (string) — **JSON configuration string** that defines the MQTT topic and the corresponding signals to subscribe to. This property could be set only at creation. It is not visible. A typical string structure:
Expand Down Expand Up @@ -126,6 +131,7 @@ MQTT module for the [OpenDAQ SDK](https://github.com/openDAQ/openDAQ). The modul
- **Purpose**: To parse JSON string data to extract a value and a timestamp, and to send data and domain samples based on this data.
- **Main properties**:
- *ValueKey* (string) — Specifies the JSON field name from which value data will be extracted. This property is required. It should be contained in the incoming JSON messages. Otherwise, a parsing error will occur.
- *DomainMode* (list) — Defines how the timestamp of the decoded signal is generated. By default it is set to *None* (0), which means that the decoded signal doesn't have a timestamp. If set to *Extract from message* (1), the JSON decoder will try to extract the timestamp from the incoming JSON messages (see *DomainKey* property). If set to *System time* (2), the timestamp of the decoded signal is set to the system time when the JSON message is received.
- *DomainKey* (string) — Specifies the JSON field name from which timestamp will be extracted. This property is optional. If it is set it should be contained in the incoming JSON messages. Otherwise, a parsing error will occur.
- *Unit* (string) — Specifies the unit symbol for the decoded value. This property is optional.
---
Expand Down
2 changes: 1 addition & 1 deletion cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ include(FetchContent)
FetchContent_Declare(
opendaq-cmake-utils
GIT_REPOSITORY https://github.com/openDAQ/opendaq-cmake-utils.git
GIT_TAG v1.0.0
GIT_TAG v1.0.1
GIT_PROGRESS ON
)
FetchContent_MakeAvailable(opendaq-cmake-utils)
4 changes: 4 additions & 0 deletions external/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
opendaq_get_cmake_mode(_MQTT_CMAKE_MODERN_MODE_SAVED)
opendaq_set_cmake_mode(ANCIENT)
opendaq_set_cmake_folder_context(TARGET_FOLDER_NAME)

if (${CMAKE_SOURCE_DIR} STREQUAL ${CMAKE_BINARY_DIR})
Expand All @@ -11,3 +13,5 @@ endif()
add_subdirectory(boost)
add_subdirectory(paho_mqtt_c)
add_subdirectory(rapidjson)

opendaq_set_cmake_mode(${_MQTT_CMAKE_MODERN_MODE_SAVED})
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ class AtomicSignalAtomicSampleHandler : public HandlerBase
ListPtr<IString> getTopics(const std::vector<SignalContext>& signalContexts) override;
std::string getSchema() override;
protected:
SignalValueJSONKey signalNamesMode;

virtual MqttData processSignalContext(SignalContext& signalContext);
void
processSignalDescriptorChanged(SignalContext& signalCtx, const DataDescriptorPtr& valueSigDesc, const DataDescriptorPtr& domainSigDesc);
MqttDataSample processDataPacket(SignalContext& signalContext, const DataPacketPtr& dataPacket, size_t offset);
MqttDataSamplePtr processDataPacket(SignalContext& signalContext, const DataPacketPtr& dataPacket, size_t offset);
std::string toString(const std::string valueFieldName, daq::DataPacketPtr packet, size_t offset);
std::string buildTopicName(const SignalContext& signalContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class AtomicSignalSampleArrayHandler : public AtomicSignalAtomicSampleHandler
std::unordered_map<std::string, SignalBuffer> signalBuffers;

MqttData processSignalContext(SignalContext& signalContext) override;
MqttDataSample processDataPackets(SignalContext& signalContext);
MqttDataSamplePtr processDataPackets(SignalContext& signalContext);
std::string toString(const std::string valueFieldName, SignalContext& signalContext);
std::pair<DataPacketPtr, size_t> getSample(SignalContext& signalContext);
void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@ static constexpr const char* PROPERTY_NAME_SUB_JSON_CONFIG_FILE = "JSONConfigFil
static constexpr const char* PROPERTY_NAME_SUB_QOS = "QoS";
static constexpr const char* PROPERTY_NAME_SUB_TOPIC = "Topic";
static constexpr const char* PROPERTY_NAME_SUB_PREVIEW_SIGNAL = "EnablePreviewSignal";
static constexpr const char* PROPERTY_NAME_SUB_PREVIEW_SIGNAL_TS_MODE = "DomainMode";
static constexpr const char* PROPERTY_NAME_SUB_PREVIEW_SIGNAL_IS_STRING = "MessageIsString";

static constexpr const char* PROPERTY_NAME_DEC_VALUE_NAME = "ValueKey";
static constexpr const char* PROPERTY_NAME_DEC_TS_MODE = "DomainMode";
static constexpr const char* PROPERTY_NAME_DEC_TS_NAME = "DomainKey";
static constexpr const char* PROPERTY_NAME_DEC_UNIT = "Unit";

static constexpr const char* PROPERTY_NAME_PUB_MODE = "Mode";
static constexpr const char* PROPERTY_NAME_PUB_TOPIC_MODE = "TopicMode";
static constexpr const char* PROPERTY_NAME_PUB_TOPIC_NAME = "Topic";
static constexpr const char* PROPERTY_NAME_PUB_GROUP_VALUES = "GroupValues";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class GroupSignalSharedTsHandler : public HandlerBase
protected:
const size_t buffersSize;
const std::string topic;
SignalValueJSONKey signalNamesMode;
std::vector<void*> dataBuffers;
bool firstDescriptorChange;
daq::MultiReaderPtr reader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
class HandlerBase
{
public:
HandlerBase(WeakRefPtr<IFunctionBlock> parentFb, SignalValueJSONKey signalNamesMode)
: parentFb(parentFb),
signalNamesMode(signalNamesMode)
HandlerBase(WeakRefPtr<IFunctionBlock> parentFb)
: parentFb(parentFb)
{
}
virtual ~HandlerBase() = default;
Expand Down Expand Up @@ -58,7 +57,6 @@ class HandlerBase

protected:
WeakRefPtr<IFunctionBlock> parentFb;
SignalValueJSONKey signalNamesMode;

static std::pair<uint64_t, uint64_t> calculateRatio(const DataDescriptorPtr descriptor)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <mqtt_streaming_module/atomic_signal_sample_arr_handler.h>
#include <mqtt_streaming_module/group_signal_shared_ts_handler.h>
#include <mqtt_streaming_module/signal_arr_atomic_sample_handler.h>
#include <mqtt_streaming_module/raw_handler.h>

BEGIN_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE

Expand All @@ -31,22 +32,31 @@ class HandlerFactory
static std::unique_ptr<HandlerBase>
create(WeakRefPtr<IFunctionBlock> parentFb, const PublisherFbConfig config, const std::string& publisherFbGlobalId)
{
if (config.topicMode == TopicMode::Single)
if (config.mode == PublisherMode::Raw)
{
const auto topic = config.topicName.empty() ? publisherFbGlobalId : config.topicName;
if (config.groupValues)
return std::make_unique<GroupSignalSharedTsArrHandler>(parentFb, config.valueFieldName, topic, config.groupValuesPackSize);
else
return std::make_unique<GroupSignalSharedTsHandler>(parentFb, config.valueFieldName, topic);
return std::make_unique<RawHandler>(parentFb);
}
else if (config.topicMode == TopicMode::PerSignal)
else
{
if (config.groupValues)
return std::make_unique<AtomicSignalSampleArrayHandler>(parentFb, config.valueFieldName, config.groupValuesPackSize);
else
return std::make_unique<AtomicSignalAtomicSampleHandler>(parentFb, config.valueFieldName);
if (config.topicMode == TopicMode::Single)
{
const auto topic = config.topicName.empty() ? publisherFbGlobalId : config.topicName;
if (config.groupValues)
return std::make_unique<GroupSignalSharedTsArrHandler>(parentFb,
config.valueFieldName,
topic,
config.groupValuesPackSize);
else
return std::make_unique<GroupSignalSharedTsHandler>(parentFb, config.valueFieldName, topic);
}
else if (config.topicMode == TopicMode::PerSignal)
{
if (config.groupValues)
return std::make_unique<AtomicSignalSampleArrayHandler>(parentFb, config.valueFieldName, config.groupValuesPackSize);
else
return std::make_unique<AtomicSignalAtomicSampleHandler>(parentFb, config.valueFieldName);
}
}

return std::make_unique<AtomicSignalAtomicSampleHandler>(parentFb, config.valueFieldName);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

#pragma once
#include "MqttAsyncClient.h"
#include "MqttSettings.h"
#include "mqtt_streaming_protocol/MqttAsyncClient.h"
#include "mqtt_streaming_protocol/MqttSettings.h"
#include <future>
#include <mqtt_streaming_module/common.h>
#include <opendaq/function_block_impl.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

#pragma once
#include "MqttDataWrapper.h"
#include "mqtt_streaming_protocol/MqttDataWrapper.h"
#include <mqtt_streaming_module/common.h>
#include <opendaq/function_block_impl.h>

Expand All @@ -38,12 +38,13 @@ class MqttJsonDecoderFbImpl final : public FunctionBlock
const FunctionBlockTypePtr& type,
const PropertyObjectPtr& config = nullptr);

static FunctionBlockTypePtr CreateType();
void processMessage(const std::string& json);
DAQ_MQTT_STREAM_MODULE_API static FunctionBlockTypePtr CreateType();
DAQ_MQTT_STREAM_MODULE_API void processMessage(const std::string& json, const uint64_t externalTs);
protected:

struct FbConfig {
std::string valueFieldName;
mqtt::MqttDataWrapper::DomainSignalMode tsMode;
std::string tsFieldName;
std::string unitSymbol;
};
Expand All @@ -57,7 +58,9 @@ class MqttJsonDecoderFbImpl final : public FunctionBlock
std::atomic<bool> configValid;
std::string configMsg;
std::atomic<bool> parsingSucceeded;
std::atomic<bool> externalTsDuplicate;
std::string parsingMsg;
uint64_t lastExternalTs;

FbConfig config;

Expand All @@ -69,11 +72,10 @@ class MqttJsonDecoderFbImpl final : public FunctionBlock

void initProperties(const PropertyObjectPtr& config);
void readProperties();
template <typename retT, typename intfT>
retT readProperty(const std::string& propertyName, const retT defaultValue);
void propertyChanged();

void updateStatuses();
void checkExternalTs(const uint64_t externalTs);
};

END_NAMESPACE_OPENDAQ_MQTT_STREAMING_MODULE
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/

#pragma once
#include "MqttAsyncClient.h"
#include "MqttDataWrapper.h"
#include "mqtt_streaming_protocol/MqttAsyncClient.h"
#include "mqtt_streaming_protocol/MqttDataWrapper.h"
#include "mqtt_streaming_module/handler_base.h"
#include "mqtt_streaming_module/status_helper.h"
#include <mqtt_streaming_module/common.h>
Expand Down Expand Up @@ -112,8 +112,6 @@ class MqttPublisherFbImpl final : public FunctionBlock
void validateInputPorts();
void updateTopics();
void updateSchema();
template <typename retT, typename intfT>
retT readProperty(const std::string& propertyName, const retT defaultValue);
void runReaderThread();
void readerLoop();
void sendMessages(const MqttData& data);
Expand Down
Loading
Loading