MQTT-client in Java

1. Overzicht

In deze tutorial zullen we zien hoe we MQTT-berichten kunnen toevoegen aan een Java-project met behulp van de bibliotheken van het Eclipse Paho-project.

2. MQTT-primer

MQTT (MQ Telemetry Transport) is een berichtenprotocol dat is gemaakt om tegemoet te komen aan de behoefte aan een eenvoudige en lichtgewicht methode om gegevens over te dragen van / naar apparaten met een laag vermogen, zoals die worden gebruikt in industriële toepassingen.

Met de toegenomen populariteit van IoT-apparaten (Internet of Things), wordt MQTT steeds vaker gebruikt, wat heeft geleid tot standaardisatie door OASIS en ISO.

Het protocol ondersteunt een enkel berichtenpatroon, namelijk het patroon publiceren-abonneren: elk bericht dat door een klant wordt verzonden, bevat een bijbehorend "onderwerp" dat door de makelaar wordt gebruikt om het naar geabonneerde klanten te routeren. Onderwerpennamen kunnen eenvoudige tekenreeksen zijn, zoals 'olietemp'Of een padachtige tekenreeks'motor / 1 / tpm“.

Om berichten te ontvangen, abonneert een klant zich op een of meer onderwerpen met de exacte naam of een tekenreeks met een van de ondersteunde jokertekens ("#" voor onderwerpen met meerdere niveaus en "+" voor onderwerpen met één niveau ").

3. Projectconfiguratie

Om de Paho-bibliotheek in een Maven-project op te nemen, moeten we de volgende afhankelijkheid toevoegen:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

De nieuwste versie van de Eclipse Paho Java-bibliotheekmodule kan worden gedownload vanaf Maven Central.

4. Clientconfiguratie

Wanneer u de Paho-bibliotheek gebruikt, is het eerste dat we moeten doen om berichten van een MQTT-makelaar te verzenden en / of ontvangen, verkrijg een implementatie van de IMqttClient koppel. Deze interface bevat alle methoden die een applicatie nodig heeft om een ​​verbinding met de server tot stand te brengen, berichten te verzenden en te ontvangen.

Paho komt uit de doos met twee implementaties van deze interface, een asynchrone (MqttAsyncClient) en een synchrone (MqttClient).In ons geval zullen we ons concentreren op de synchrone versie, die een eenvoudigere semantiek heeft.

De installatie zelf is een proces in twee stappen: we maken eerst een instantie van het MqttClient class en dan verbinden we het met onze server. In de volgende subsectie worden deze stappen beschreven.

4.1. Een nieuw IMqttClient Voorbeeld

Het volgende codefragment laat zien hoe u een nieuw IMqttClient synchrone instantie:

String publisherId = UUID.randomUUID (). ToString (); IMqttClient publisher = nieuwe MqttClient ("tcp: //iot.eclipse.org: 1883", publisherId);

In dit geval, we gebruiken de eenvoudigste constructor die beschikbaar is, die het eindpuntadres van onze MQTT-makelaar en een klant-ID gebruikt, die onze klant uniek identificeert.

In ons geval hebben we een willekeurige UUID gebruikt, dus bij elke run wordt een nieuwe client-ID gegenereerd.

Paho biedt ook aanvullende constructors die we kunnen gebruiken om het persistentiemechanisme aan te passen dat wordt gebruikt om niet-bevestigde berichten en / of de ScheduledExecutorService gebruikt om achtergrondtaken uit te voeren die vereist zijn voor de implementatie van de protocolengine.

Het server-eindpunt dat we gebruiken, is een openbare MQTT-makelaar die wordt gehost door het Paho-project, waarmee iedereen met een internetverbinding clients kan testen zonder enige authenticatie.

4.2. Verbinding maken met de server

Onze nieuw gecreëerde MqttClient instantie is niet verbonden met de server. We doen dit door haar te bellen aansluiten() methode, optioneel een MqttConnectOptions instantie waarmee we enkele aspecten van het protocol kunnen aanpassen.

In het bijzonder kunnen we die opties gebruiken om aanvullende informatie door te geven, zoals beveiligingsreferenties, sessieherstelmodus, herverbindingsmodus enzovoort.

De MqttConnectionOptions class stellen die opties bloot als eenvoudige eigenschappen die we kunnen instellen met behulp van normale setter-methoden. We hoeven alleen de eigenschappen in te stellen die vereist zijn voor ons scenario - de overige zullen standaardwaarden aannemen.

De code die wordt gebruikt om een ​​verbinding met de server tot stand te brengen, ziet er doorgaans als volgt uit:

MqttConnectOptions-opties = nieuwe MqttConnectOptions (); options.setAutomaticReconnect (true); options.setCleanSession (true); options.setConnectionTimeout (10); publisher.connect (opties);

Hier definiëren we onze verbindingsopties zodat:

  • De bibliotheek zal automatisch proberen om opnieuw verbinding te maken met de server in het geval van een netwerkstoring
  • Het zal niet-verzonden berichten van een vorige run verwijderen
  • De time-out voor de verbinding is ingesteld op 10 seconden

5. Berichten verzenden

Berichten verzenden met een reeds verbonden MqttClient is heel eenvoudig. We gebruiken een van de publiceren() methodevarianten om de payload, die altijd een byte-array is, naar een bepaald onderwerp te sturen, met behulp van een van de volgende Quality-of-Service-opties:

  • 0 - "ten hoogste één keer" semantiek, ook bekend als "vuur-en-vergeten". Gebruik deze optie wanneer het verlies van een bericht acceptabel is, aangezien het geen enkele vorm van bevestiging of persistentie vereist
  • 1 - "ten minste één keer" semantiek. Gebruik deze optie als het verlies van berichten niet acceptabel is en uw abonnees kunnen duplicaten aan
  • 2 - "precies één keer" semantiek. Gebruik deze optie als het verlies van berichten niet acceptabel is en uw abonnees kunnen geen duplicaten aan

In ons voorbeeldproject is het EngineTemperatureSensor klasse speelt de rol van een nepsensor die elke keer dat we deze aanroepen een nieuwe temperatuurmeting produceert bellen () methode.

Deze klasse implementeert het Oproepbaar interface, zodat we het gemakkelijk kunnen gebruiken met een van de ExecutorService implementaties beschikbaar in het java.util.concurrent pakket:

openbare klasse EngineTemperatureSensor implementeert Callable {// ... privé-leden hebben openbare EngineTemperatureSensor weggelaten (IMqttClient-client) {this.client = client; } @Override public Void call () genereert uitzondering {if (! Client.isConnected ()) {return null; } MqttMessage msg = readEngineTemp (); msg.setQos (0); msg.setRetained (true); client.publish (TOPIC, msg); null teruggeven; } privé MqttMessage readEngineTemp () {dubbele temp = 80 + rnd.nextDouble () * 20.0; byte [] payload = String.format ("T:% 04.2f", temp) .getBytes (); retourneer nieuwe MqttMessage (payload); }}

De MqttMessage omvat de payload zelf, de gevraagde Quality-of-Service en ook de behouden vlag voor het bericht. Deze vlag geeft aan de makelaar aan dat het dit bericht moet bewaren totdat het door een abonnee wordt geconsumeerd.

We kunnen deze functie gebruiken om een ​​"laatst bekend goed" -gedrag te implementeren, dus wanneer een nieuwe abonnee verbinding maakt met de server, ontvangt deze het bewaarde bericht meteen.

6. Berichten ontvangen

Om berichten van de MQTT-makelaar te ontvangen, we moeten een van de abonneren () methode varianten, waarmee we kunnen specificeren:

  • Een of meer onderwerpfilters voor berichten die we willen ontvangen
  • De bijbehorende QoS
  • De callback-handler om ontvangen berichten te verwerken

In het volgende voorbeeld laten we zien hoe u een berichtenlistener kunt toevoegen aan een bestaand IMqttClient instantie om berichten van een bepaald onderwerp te ontvangen. We gebruiken een CountDownLatch als een synchronisatiemechanisme tussen onze callback en de hoofduitvoeringsthread, waarbij het elke keer dat er een nieuw bericht binnenkomt, wordt verlaagd.

In de voorbeeldcode hebben we een andere IMqttClient instantie om berichten te ontvangen. We hebben het alleen gedaan om duidelijker te maken welke client wat doet, maar dit is geen beperking van Paho - als je wilt, kun je dezelfde client gebruiken voor het publiceren en ontvangen van berichten:

CountDownLatch ontvangenSignal = nieuwe CountDownLatch (10); subscriber.subscribe (EngineTemperatureSensor.TOPIC, (topic, msg) -> {byte [] payload = msg.getPayload (); // ... payload-afhandeling weggelaten ontvangstsignal.countDown ();}); ontvangenSignal.await (1, TimeUnit.MINUTES);

De abonneren () variant die hierboven wordt gebruikt, heeft een IMqttMessageListener instantie als zijn tweede argument.

In ons geval gebruiken we een eenvoudige lambda-functie die de payload verwerkt en een teller verlaagt. Als er niet genoeg berichten binnen het opgegeven tijdvenster (1 minuut) binnenkomen, wordt het wachten() methode genereert een uitzondering.

Bij het gebruik van Paho hoeven we de ontvangst van het bericht niet expliciet te bevestigen. Als het terugbellen normaal terugkeert, gaat Paho ervan uit dat het een succesvolle consumptie is en stuurt een bevestiging naar de server.

Als de callback een Uitzondering, wordt de client uitgeschakeld. Houd er rekening mee dat dit zal resulteren in het verlies van berichten die zijn verzonden met QoS-niveau van 0.

Berichten die met QoS-niveau 1 of 2 worden verzonden, worden opnieuw door de server verzonden zodra de client opnieuw is verbonden en zich weer op het onderwerp abonneert.

7. Conclusie

In dit artikel hebben we laten zien hoe we ondersteuning voor het MQTT-protocol kunnen toevoegen aan onze Java-toepassingen met behulp van de bibliotheek van het Eclipse Paho-project.

Deze bibliotheek verwerkt alle protocoldetails op laag niveau, waardoor we ons kunnen concentreren op andere aspecten van onze oplossing, terwijl er voldoende ruimte overblijft om belangrijke aspecten van de interne functies aan te passen, zoals persistentie van berichten.

De code die in dit artikel wordt getoond, is beschikbaar op GitHub.