Betrouwbare berichtenuitwisseling met JGroups

1. Overzicht

JGroups is een Java API voor betrouwbare berichtenuitwisseling. Het heeft een eenvoudige interface die het volgende biedt:

  • een flexibele protocolstack, inclusief TCP en UDP
  • fragmentatie en opnieuw samenvoegen van grote berichten
  • betrouwbare unicast en multicast
  • foutdetectie
  • stroomregeling

Evenals vele andere functies.

In deze tutorial maken we een eenvoudige applicatie om uit te wisselen Draad berichten tussen applicaties en het verstrekken van een gedeelde status aan nieuwe applicaties wanneer ze zich bij het netwerk aansluiten.

2. Installatie

2.1. Afhankelijkheid van Maven

We moeten een enkele afhankelijkheid toevoegen aan onze pom.xml:

 org.jgroups jgroups 4.0.10.Final 

De nieuwste versie van de bibliotheek kan worden gecontroleerd op Maven Central.

2.2. Netwerken

JGroups zal standaard proberen IPV6 te gebruiken. Afhankelijk van onze systeemconfiguratie kan dit ertoe leiden dat applicaties niet kunnen communiceren.

Om dit te voorkomen, stellen we de java.net.preferIPv4Stack naar waar property bij het uitvoeren van onze applicaties hier:

java -Djava.net.preferIPv4Stack = waar com.baeldung.jgroups.JGroupsMessenger 

3. JChannels

Onze verbinding met een JGroups-netwerk is een JChannel. Het kanaal sluit zich aan bij een cluster en verzendt en ontvangt berichten, evenals informatie over de status van het netwerk.

3.1. Een kanaal maken

We creëren een JChannel met een pad naar een configuratiebestand. Als we de bestandsnaam weglaten, zoekt het naar udp.xml in de huidige werkdirectory.

We maken een kanaal met een expliciet benoemd configuratiebestand:

JChannel-kanaal = nieuw JChannel ("src / main / resources / udp.xml"); 

De configuratie van JGroups kan erg ingewikkeld zijn, maar de standaard UDP- en TCP-configuraties zijn voldoende voor de meeste toepassingen. We hebben het bestand voor UDP in onze code opgenomen en zullen het voor deze tutorial gebruiken.

Zie de JGroups-handleiding hier voor meer informatie over het configureren van het transport.

3.2. Een kanaal verbinden

Nadat we ons kanaal hebben gemaakt, moeten we lid worden van een cluster. Een cluster is een groep knooppunten die berichten uitwisselen.

Om lid te worden van een cluster is een clusternaam vereist:

channel.connect ("Baeldung"); 

Het eerste knooppunt dat probeert lid te worden van een cluster, maakt het als het niet bestaat. We zullen dit proces hieronder in actie zien.

3.3. Een kanaal een naam geven

Knooppunten worden geïdentificeerd met een naam, zodat peers gerichte berichten kunnen verzenden en meldingen kunnen ontvangen over wie het cluster binnenkomt en verlaat. JGroups wijst automatisch een naam toe, of we kunnen onze eigen naam instellen:

channel.name ("gebruiker1");

We gebruiken deze namen hieronder om bij te houden wanneer knooppunten het cluster binnenkomen en verlaten.

3.4. Een kanaal sluiten

Kanaalopruiming is essentieel als we willen dat peers tijdig een melding ontvangen dat we zijn vertrokken.

We sluiten een JChannel met zijn close-methode:

kanaal.close ()

4. Wijzigingen in clusterweergave

Met een aangemaakt JChannel zijn we nu klaar om de status van peers in het cluster te zien en berichten met hen uit te wisselen.

JGroups handhaaft de clusterstatus binnen het Visie klasse. Elk kanaal heeft een Visie van het netwerk. Wanneer de weergave verandert, wordt deze afgeleverd via het viewAccepted () Bel terug.

Voor deze tutorial breiden we het OntvangerAdaptor API-klasse die alle interfacemethoden implementeert die nodig zijn voor een toepassing.

Het is de aanbevolen manier om callbacks te implementeren.

Laten we toevoegen viewAccepted op onze applicatie:

public void viewAccepted (View newView) {private View lastView; if (lastView == null) {System.out.println ("Ontvangen eerste weergave:"); newView.forEach (System.out :: println); } else {System.out.println ("Nieuwe weergave ontvangen."); Lijst newMembers = View.newMembers (lastView, newView); System.out.println ("Nieuwe leden:"); newMembers.forEach (System.out :: println); Lijst exMembers = View.leftMembers (lastView, newView); System.out.println ("Verlaten leden:"); exMembers.forEach (System.out :: println); } lastView = newView; } 

Elk Visie bevat een Lijst van Adres objecten, die elk lid van het cluster vertegenwoordigen. JGroups biedt handige methoden om de ene weergave met de andere te vergelijken, die we gebruiken om nieuwe of verlaten leden van het cluster te detecteren.

5. Berichten verzenden

Het afhandelen van berichten in JGroups is eenvoudig. EEN Bericht bevat een byte array en Adres objecten die overeenkomen met de afzender en de ontvanger.

Voor deze tutorial gebruiken we Snaren gelezen vanaf de opdrachtregel, maar het is gemakkelijk te zien hoe een toepassing andere gegevenstypen zou kunnen uitwisselen.

5.1. Uitgezonden berichten

EEN Bericht wordt gemaakt met een bestemming en een byte-array; JChannel stelt de afzender voor ons in. Als het doelwit is nul, het hele cluster zal het bericht ontvangen.

We accepteren tekst van de opdrachtregel en sturen deze naar het cluster:

System.out.print ("Voer een bericht in:"); String line = in.readLine (). ToLowerCase (); Bericht bericht = nieuw bericht (null, line.getBytes ()); channel.send (bericht); 

Als we meerdere instanties van ons programma uitvoeren en dit bericht verzenden (nadat we het te ontvangen() methode hieronder), zouden ze het allemaal ontvangen, inclusief de afzender.

5.2. Onze berichten blokkeren

Als we onze berichten niet willen zien, kunnen we daarvoor een eigenschap instellen:

channel.setDiscardOwnMessages (true); 

Wanneer we de vorige test uitvoeren, ontvangt de afzender van het bericht zijn uitgezonden bericht niet.

5.3. Directe berichten

Voor het verzenden van een direct bericht is een geldig Adres. Als we met de naam naar knooppunten verwijzen, hebben we een manier nodig om een Adres. Gelukkig hebben we de Visie daarom.

De huidige Visie is altijd beschikbaar via de JChannel:

private Optioneel getAddress (String naam) {View view = channel.view (); terug view.getMembers (). stream () .filter (adres -> naam.equals (adres.toString ())) .findAny (); } 

Adres namen zijn beschikbaar via de klas toString () methode, dus we zoeken alleen de Lijst van clusterleden voor de naam die we willen.

We kunnen dus een naam accepteren van de console, de bijbehorende bestemming zoeken en een direct bericht sturen:

Adres bestemming = null; System.out.print ("Voer een bestemming in:"); String bestemmingsnaam = in.readLine (). ToLowerCase (); bestemming = getAddress (bestemmingsnaam) .orElseThrow (() -> nieuwe uitzondering ("Bestemming niet gevonden"); Berichtbericht = nieuw bericht (bestemming, "Hallo!"); channel.send (bericht); 

6. Berichten ontvangen

We kunnen berichten verzenden, laten we nu proberen om ze nu te ontvangen.

Laten we negeren ReceiverAdaptor's lege ontvangstmethode:

openbare ongeldige ontvangst (Berichtbericht) {String line = Bericht ontvangen van: "+ message.getSrc () +" aan: "+ message.getDest () +" -> "+ message.getObject (); System.out.println (lijn);} 

Omdat we weten dat het bericht een Draad, we kunnen veilig passeren getObject () naar System.out.

7. Staatsbeurs

Wanneer een knooppunt het netwerk binnenkomt, moet het mogelijk statusinformatie over het cluster ophalen. JGroups biedt hiervoor een mechanisme voor staatsoverdracht.

Wanneer een knooppunt zich bij het cluster voegt, roept het gewoon aan getState (). Het cluster haalt de staat meestal op bij het oudste lid van de groep: de coördinator.

Laten we een aantal uitzendberichten aan onze applicatie toevoegen. We zullen een nieuwe lidvariabele toevoegen en deze binnenin verhogen te ontvangen():

privé geheel getal messageCount = 0; openbare ongeldige ontvangst (Berichtbericht) {String line = "Bericht ontvangen van:" + message.getSrc () + "aan:" + message.getDest () + "->" + message.getObject (); System.out.println (lijn); if (message.getDest () == null) {messageCount ++; System.out.println ("Aantal berichten:" + messageCount); }} 

We controleren op een nul bestemming, want als we directe berichten tellen, heeft elk knooppunt een ander nummer.

Vervolgens overschrijven we nog twee methoden in OntvangerAdaptor:

openbare ongeldige setState (InputStream-invoer) {probeer {messageCount = Util.objectFromStream (nieuwe DataInputStream (input)); } catch (uitzondering e) {System.out.println ("Fout bij het deserialiseren!"); } System.out.println (messageCount + "is het huidige aantal berichten."); } public void getState (OutputStream output) gooit uitzondering {Util.objectToStream (messageCount, nieuwe DataOutputStream (output)); } 

Net als bij berichten, draagt ​​JGroups de status over als een array van bytes.

JGroups levert een InputStream naar de coördinator om de staat naar te schrijven, en een OutputStream zodat het nieuwe knooppunt kan worden gelezen. De API biedt gemaksklassen voor het serialiseren en deserialiseren van de gegevens.

Merk op dat bij productiecode toegang tot statusinformatie thread-safe moet zijn.

Ten slotte voegen we de oproep toe aan getState () naar onze startup, nadat we verbinding hebben gemaakt met het cluster:

channel.connect (clusterName); channel.getState (null, 0); 

getState () accepteert een bestemming waarvan de staat moet worden opgevraagd en een time-out in milliseconden. EEN nul bestemming geeft de coördinator aan en 0 betekent geen time-out.

Wanneer we deze app uitvoeren met een paar knooppunten en uitzendberichten uitwisselen, zien we het aantal berichten toenemen.

Als we vervolgens een derde client toevoegen of een van hen stoppen en starten, zullen we zien dat het nieuw verbonden knooppunt het juiste aantal berichten afdrukt.

8. Conclusie

In deze tutorial hebben we JGroups gebruikt om een ​​applicatie te maken voor het uitwisselen van berichten. We hebben de API gebruikt om te controleren welke knooppunten zijn verbonden met en het cluster hebben verlaten en ook om de clusterstatus over te dragen naar een nieuw knooppunt wanneer het lid werd.

Codevoorbeelden zijn, zoals altijd, te vinden op GitHub.


$config[zx-auto] not found$config[zx-overlay] not found