I spent quite some time trying to figure out how to connect to a Kapua back end with anything other than a Kura client and could not find a satisfactory answer. This despite the fantastic efforts of community members like Ctron. I was looking for a simple non-Kura option, something that went beyond Kura simulators and the like.
Given that Kapua offers an MQTT interface as one of its connectivity options, one would have expected Paho or any good MQTT client to work. I was quite frustrated to find that a simple Node-RED mqtt node did not work at first. The following are my learnings and the code that finally got it all working:
Things that I learnt:
- You need to figure out where your Kapua broker logs are being stored and watch them closely as you get your installation running. They are exhaustive and an excellent debugging aid
- If you use username/password authentication, as I did initially, you need to get your user roles and permissions right in the Kapua console
- Once I had solved all the problems around getting the message to the broker, I saw a persistent but inexplicable message from Camel about unprocessable messages. Using mqtt-spy I eventually figured out that Kapua does not depend on Kura but has standardized on the Google ProtoBuf-based data format for message encoding over MQTT
- You need to honor Kapua/Kura topic naming semantics - at the very least your topic names should be in the form:
{account-name}/{client-id}/{app-id}
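To make the topic convention concrete, here is a minimal sketch of a helper that assembles a topic from its parts, matching the sample topic `test-account/D-0001/APPID/METRIC1` used in the Java source. The class `KapuaTopic` and its `build` method are hypothetical names of my own, not part of Kapua or Kura, and the trailing resource segments are an assumption based on that sample topic.

```java
// Hypothetical helper, not part of Kapua or Kura: builds a topic of the form
// {account-name}/{client-id}/{app-id}[/{resource}...]
final class KapuaTopic {

    private KapuaTopic() {
    }

    // Joins the mandatory segments and any optional resource segments with '/'.
    static String build(String accountName, String clientId, String appId, String... resource) {
        StringBuilder topic = new StringBuilder();
        appendSegment(topic, accountName);
        appendSegment(topic, clientId);
        appendSegment(topic, appId);
        for (String part : resource) {
            appendSegment(topic, part);
        }
        return topic.toString();
    }

    // Rejects empty segments and characters that have special meaning in MQTT topics.
    private static void appendSegment(StringBuilder topic, String segment) {
        if (segment == null || segment.isEmpty()) {
            throw new IllegalArgumentException("empty topic segment");
        }
        if (segment.indexOf('/') >= 0 || segment.indexOf('#') >= 0 || segment.indexOf('+') >= 0) {
            throw new IllegalArgumentException("illegal character in topic segment: " + segment);
        }
        if (topic.length() > 0) {
            topic.append('/');
        }
        topic.append(segment);
    }

    public static void main(String[] args) {
        // prints "test-account/D-0001/APPID/METRIC1"
        System.out.println(build("test-account", "D-0001", "APPID", "METRIC1"));
    }
}
```

Building the topic in one place keeps the account name and client id consistent with the values used when connecting, which is where I would expect most mismatches to creep in.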
Java source
import java.util.Date;

import org.eclipse.kura.cloud.CloudPayloadProtoBufEncoder;
import org.eclipse.kura.core.cloud.CloudServiceImpl;
import org.eclipse.kura.message.KuraPayload;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MQTTClient {

    public static void main(String[] args) throws Exception {
        // Topic: account name, client id, app id, then the resource
        String topic = "test-account/D-0001/APPID/METRIC1";
        String content = "Message from MqttPublishSample";
        int qos = 2;
        String broker = "tcp://192.168.1.119:1883";
        // The client id matches the second segment of the topic
        String clientId = "D-0001";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);

            // Username/password authentication; the user needs the right
            // roles and permissions in the Kapua console
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("test1");
            connOpts.setPassword("Password".toCharArray());
            connOpts.setCleanSession(true);

            System.out.println("Connecting to broker: " + broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");

            // Build a Kura payload with a timestamp and a single metric
            KuraPayload payload = new KuraPayload();
            payload.setTimestamp(new Date());
            payload.addMetric("test", Math.random() * 1000);

            // CloudServiceImpl implements the ProtoBuf encoder;
            // the second argument (false) means the payload is not gzipped
            CloudPayloadProtoBufEncoder encoder = new CloudServiceImpl();
            byte[] bytePayload = encoder.getBytes(payload, false);
            System.out.println("Publishing encoded payload: " + bytePayload.length + " bytes");

            // A plain string payload like the one below is what produced the
            // unprocessable-message errors; Kapua expects the ProtoBuf encoding
            //MqttMessage message = new MqttMessage(content.getBytes());
            MqttMessage message = new MqttMessage(bytePayload);
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");

            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}
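When chasing encoding problems it can help to see the exact bytes being published and compare them with what a tool such as mqtt-spy captures. The following is a minimal sketch using only the JDK; `PayloadHex` and `toHex` are hypothetical names, and the sample bytes are arbitrary, not a real Kura payload.

```java
// Hypothetical debugging helper, not part of Kapua, Kura or Paho.
final class PayloadHex {

    private PayloadHex() {
    }

    // Renders each byte as two lowercase hex digits, separated by spaces.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 3);
        for (int i = 0; i < bytes.length; i++) {
            if (i > 0) {
                sb.append(' ');
            }
            // Mask to 0..255 so negative bytes are not sign-extended
            sb.append(String.format("%02x", bytes[i] & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Arbitrary sample bytes, for illustration only
        byte[] sample = {0x08, (byte) 0x96, 0x01};
        // prints "08 96 01"
        System.out.println(toHex(sample));
    }
}
```

In the client above, the same helper could be called on the encoded byte array just before it is wrapped in the MQTT message.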
Contents of pom.xml
Things to do: Check out https://www.eclipse.org/kapua/docs/0.3.0/developer-guide/en/client.html
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>mqtt</groupId>
    <artifactId>mqtt</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>my-repo1</id>
            <name>Mvn Repo</name>
            <url>https://repo.eclipse.org/content/repositories/kura-releases/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.kura</groupId>
            <artifactId>org.eclipse.kura.api</artifactId>
            <version>1.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.osgi</groupId>
            <artifactId>osgi_R4_core</artifactId>
            <version>1.0</version>
            <scope>provided</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.osgi</groupId>
            <artifactId>osgi_R4_compendium</artifactId>
            <version>1.0</version>
            <scope>provided</scope>
            <optional>true</optional>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.eclipse.kura/org.eclipse.kura.core.cloud -->
        <dependency>
            <groupId>org.eclipse.kura</groupId>
            <artifactId>org.eclipse.kura.core.cloud</artifactId>
            <version>1.1.200</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.eclipse.kura/org.eclipse.kura.core -->
        <dependency>
            <groupId>org.eclipse.kura</groupId>
            <artifactId>org.eclipse.kura.core</artifactId>
            <version>1.0.300</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.24</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.6.1</version>
            <type>jar</type>
        </dependency>
    </dependencies>
</project>