Inleiding tot MBassador

1. Overzicht

Simpel gezegd, MBassador is een high-performance eventbus die gebruik maakt van de publish-subscribe semantiek.

Berichten worden naar een of meer peers verzonden zonder dat u vooraf weet hoeveel abonnees er zijn of hoe ze het bericht gebruiken.

2. Maven Afhankelijkheid

Voordat we de bibliotheek kunnen gebruiken, moeten we de mbassador-afhankelijkheid toevoegen:

 net.engio mbassador 1.3.1 

3. Basisafhandeling van gebeurtenissen

3.1. Eenvoudig voorbeeld

We beginnen met een eenvoudig voorbeeld van het publiceren van een bericht:

privé MBassador-dispatcher = nieuwe MBassador (); privé String messageString; @Before public void preparTests () {dispatcher.subscribe (dit); } @Test openbare leegte whenStringDispatched_thenHandleString () {dispatcher.post ("TestString"). Now (); assertNotNull (messageString); assertEquals ("TestString", messageString); } @Handler public void handleString (String bericht) {messageString = bericht; } 

Bovenaan deze testklasse zien we de creatie van een MBassador met zijn standaard constructor. Vervolgens in de @Voordat methode, noemen we abonneren () en een verwijzing naar de klas zelf doorgeven.

In abonneren (), de dispatcher inspecteert de abonnee voor @Handler annotaties.

En bij de eerste test bellen we dispatcher.post (...) .now () om het bericht te verzenden - wat resulteert in handleString () gebeld worden.

Deze eerste test demonstreert verschillende belangrijke concepten. Ieder Voorwerp kan een abonnee zijn, zolang het een of meer methoden heeft die zijn geannoteerd met @Handler. Een abonnee kan een willekeurig aantal afhandelaars hebben.

We gebruiken testobjecten die voor de eenvoud op zichzelf zijn geabonneerd, maar in de meeste productiescenario's zullen berichtverdelers in verschillende klassen van consumenten komen.

Handlermethoden hebben slechts één invoerparameter - het bericht, en kunnen geen aangevinkte uitzonderingen genereren.

Net als bij de abonneren () methode, accepteert de post-methode elke Voorwerp. Dit Voorwerp wordt geleverd aan abonnees.

Wanneer een bericht wordt gepost, wordt het afgeleverd bij alle luisteraars die zich op het berichttype hebben geabonneerd.

Laten we nog een berichtafhandelaar toevoegen en een ander berichttype verzenden:

privé Geheel getal messageInteger; @Test openbare leegte whenIntegerDispatched_thenHandleInteger () {dispatcher.post (42) .now (); assertNull (messageString); assertNotNull (messageInteger); assertTrue (42 == messageInteger); } @Handler public void handleInteger (Geheel getal bericht) {messageInteger = bericht; } 

Zoals verwacht, wanneer we verzendeneen Geheel getal, handleInteger () heet, en handleString () is niet. Een enkele dispatcher kan worden gebruikt om meer dan één berichttype te verzenden.

3.2. Dode berichten

Dus waar gaat een bericht naartoe als er geen handler voor is? Laten we een nieuwe gebeurtenishandler toevoegen en vervolgens een derde berichttype verzenden:

privé-object deadEvent; @Test openbare leegte whenLongDispatched_thenDeadEvent () {dispatcher.post (42L) .now (); assertNull (messageString); assertNull (messageInteger); assertNotNull (deadEvent); assertTrue (deadEvent instantie van Long); assertTrue (42L == (Long) deadEvent); } @Handler openbare ongeldige handleDeadEvent (DeadMessage-bericht) {deadEvent = message.getMessage (); } 

In deze test verzenden we een Lang in plaats van een Geheel getal. Geen van beide handleInteger () noch handleString () worden genoemd, maar handleDeadEvent () is.

Als er geen afhandelaars zijn voor een bericht, wordt het verpakt in een DeadMessage voorwerp. Omdat we een handler hebben toegevoegd voor Dood bericht, we vangen het op.

DeadMessage kan veilig worden genegeerd; als een applicatie dode berichten niet hoeft op te sporen, mogen ze nergens heen.

4. Een gebeurtenishiërarchie gebruiken

Bezig met verzenden Draad en Geheel getal gebeurtenissen zijn beperkend. Laten we een paar berichtklassen maken:

openbare klasse Bericht {} openbare klasse AckMessage breidt bericht {} openbare klasse RejectMessage breidt bericht {int code uit; // setters en getters}

We hebben een eenvoudige basisklasse en twee klassen die deze uitbreiden.

4.1. Een basisklasse verzenden Bericht

We beginnen met Bericht evenementen:

private MBassador-dispatcher = nieuwe MBassador (); privébericht bericht; privé AckMessage ackMessage; private RejectMessage rejectMessage; @Before public void preparTests () {dispatcher.subscribe (dit); } @Test openbare leegte whenMessageDispatched_thenMessageHandled () {dispatcher.post (nieuw bericht ()). Now (); assertNotNull (bericht); assertNull (ackMessage); assertNull (rejectMessage); } @Handler public void handleMessage (Berichtbericht) {this.message = bericht; } @Handler openbare ongeldige handleRejectMessage (RejectMessage-bericht) {rejectMessage = bericht; } @Handler openbare ongeldige handleAckMessage (AckMessage-bericht) {ackMessage = bericht; }

Ontdek MBassador - een high-performance pub-sub-evenementenbus. Dit beperkt ons tot het gebruik van Berichten maar voegt een extra laag typeveiligheid toe.

Wanneer we een Bericht, handleMessage () ontvangt het. De andere twee afhandelaars doen dat niet.

4.2. Een subklasse-bericht verzenden

Laten we een WeigerenMessage:

@Test openbare leegte whenRejectDispatched_thenMessageAndRejectHandled () {dispatcher.post (nieuwe RejectMessage ()). Now (); assertNotNull (bericht); assertNotNull (rejectMessage); assertNull (ackMessage); }

Wanneer we een WeigerenMessage beide handleRejectMessage () en handleMessage () ontvang het.

Sinds WeigerenMessage strekt zich uit Bericht, de Bericht handler ontving het, naast de RejectMessage handler.

Laten we dit gedrag verifiëren met een AckMessage:

@Test openbare leegte whenAckDispatched_thenMessageAndAckHandled () {dispatcher.post (nieuwe AckMessage ()). Now (); assertNotNull (bericht); assertNotNull (ackMessage); assertNull (rejectMessage); }

Precies zoals we hadden verwacht, toen we een AckMessage, beide handleAckMessage () en handleMessage () ontvang het.

5. Berichten filteren

Berichten ordenen op type is al een krachtige functie, maar we kunnen ze nog meer filteren.

5.1. Filter op Klasse en Subklasse

Toen we een WeigerenMessage of AckMessage, hebben we de gebeurtenis ontvangen in zowel de gebeurtenishandler voor het specifieke type als in de basisklasse.

We kunnen dit type hiërarchieprobleem oplossen door Bericht abstract en het creëren van een klasse zoals GenericMessage. Maar wat als we deze luxe niet hebben?

We kunnen berichtfilters gebruiken:

privébericht baseMessage; privébericht subMessage; @Test openbare leegte whenMessageDispatched_thenMessageFiltered () {dispatcher.post (nieuw bericht ()). Now (); assertNotNull (baseMessage); assertNull (subMessage); } @Test openbare leegte whenRejectDispatched_thenRejectFiltered () {dispatcher.post (nieuwe RejectMessage ()). Now (); assertNotNull (subMessage); assertNull (baseMessage); } @Handler (filters = {@Filter (Filters.RejectSubtypes.class)}) public void handleBaseMessage (berichtbericht) {this.baseMessage = bericht; } @Handler (filters = {@Filter (Filters.SubtypesOnly.class)}) openbare ongeldige handleSubMessage (berichtbericht) {this.subMessage = bericht; }

De filters parameter voor de @Handler annotatie accepteert een Klasse dat implementeert IMessageFilter. De bibliotheek biedt twee voorbeelden:

De Filters.RejectSubtypes doet wat de naam doet vermoeden: het filtert alle subtypen uit. In dit geval zien we dat WeigerenMessage wordt niet afgehandeld door handleBaseMessage ().

De Filters.SubtypesOnly doet ook wat de naam doet vermoeden: het filtert alle basistypen uit. In dit geval zien we dat Bericht wordt niet afgehandeld door handleSubMessage ().

5.2. IMessageFilter

De Filters.RejectSubtypes en de Filters.SubtypesOnly beide implementeren IMessageFilter.

RejectSubTypes vergelijkt de klasse van het bericht met de gedefinieerde berichttypen en staat alleen berichten toe die gelijk zijn aan een van de typen, in tegenstelling tot alle subklassen.

5.3. Filter met voorwaarden

Gelukkig is er een eenvoudigere manier om berichten te filteren. MBassador ondersteunt een subset van Java EL-expressies als voorwaarden voor het filteren van berichten.

Laten we een Draad bericht op basis van de lengte:

privé String testString; @Test openbare leegte whenLongStringDispatched_thenStringFiltered () {dispatcher.post ("foobar!"). Now (); assertNull (testString); } @Handler (voorwaarde = "msg.length () <7") public void handleStringMessage (String bericht) {this.testString = bericht; }

De "foobar!" bericht is zeven tekens lang en wordt gefilterd. Laten we een kortere sturen Draad:

 @Test openbare leegte whenShortStringDispatched_thenStringHandled () {dispatcher.post ("foobar"). Now (); assertNotNull (testString); }

Nu is de "foobar" slechts zes tekens lang en wordt deze doorgegeven.

Onze WeigerenMessage bevat een veld met een accessor. Laten we daar een filter voor schrijven:

private RejectMessage rejectMessage; @Test openbare leegte whenWrongRejectDispatched_thenRejectFiltered () {RejectMessage testReject = nieuwe RejectMessage (); testReject.setCode (-1); dispatcher.post (testReject) .now (); assertNull (rejectMessage); assertNotNull (subMessage); assertEquals (-1, ((RejectMessage) subMessage) .getCode ()); } @Handler (voorwaarde = "msg.getCode ()! = -1") public void handleRejectMessage (RejectMessage rejectMessage) {this.rejectMessage = rejectMessage; }

Ook hier kunnen we een methode op een object opvragen en het bericht filteren of niet.

5.4. Leg gefilterde berichten vast

Gelijkwaardig aan DeadEvents, misschien willen we gefilterde berichten vastleggen en verwerken. Er is ook een speciaal mechanisme om gefilterde gebeurtenissen vast te leggen. Gefilterde gebeurtenissen worden anders behandeld dan "dode" gebeurtenissen.

Laten we een test schrijven die dit illustreert:

privé String testString; privé FilteredMessage gefilterdMessage; privé DeadMessage deadMessage; @Test openbare leegte whenLongStringDispatched_thenStringFiltered () {dispatcher.post ("foobar!"). Now (); assertNull (testString); assertNotNull (gefilterdMessage); assertTrue (filterMessage.getMessage () instanceof String); assertNull (deadMessage); } @Handler (voorwaarde = "msg.length () <7") public void handleStringMessage (String bericht) {this.testString = bericht; } @Handler public void handleFilterMessage (FilteredMessage bericht) {this.filteredMessage = bericht; } @Handler public void handleDeadMessage (DeadMessage deadMessage) {this.deadMessage = deadMessage; } 

Met de toevoeging van een FilteredMessage handler, kunnen we volgen Snaren die worden gefilterd vanwege hun lengte. De filterMessage bevat onze te lang Draad terwijl deadMessage stoffelijk overschot nul.

6. Asynchrone verzending en verwerking van berichten

Tot dusverre hebben al onze voorbeelden synchrone berichtverzending gebruikt; toen we belden post.now () de berichten werden afgeleverd bij elke handler in dezelfde thread die we hebben gebeld post() van.

6.1. Asynchrone verzending

De MBassador.post () geeft een SyncAsyncPostCommand terug. Deze klasse biedt verschillende methoden, waaronder:

  • nu() - berichten synchroon verzenden; de oproep wordt geblokkeerd totdat alle berichten zijn afgeleverd
  • asynchroon () - voert de berichtpublicatie asynchroon uit

Laten we asynchrone verzending gebruiken in een voorbeeldklasse. We gebruiken Awaitility in deze tests om de code te vereenvoudigen:

private MBassador-dispatcher = nieuwe MBassador (); privé String testString; private AtomicBoolean gereed = nieuwe AtomicBoolean (false); @Test openbare leegte whenAsyncDispatched_thenMessageReceived () {dispatcher.post ("foobar"). Asynchroon (); wachten (). tillAtomic (ready, equalTo (true)); assertNotNull (testString); } @Handler public void handleStringMessage (String bericht) {this.testString = bericht; ready.set (true); }

Wij bellen asynchroon () in deze test en gebruik een AtomicBoolean als vlag bij wachten() om te wachten tot de bezorgthread het bericht bezorgt.

Als we commentaar geven op de oproep naar wachten(), lopen we het risico dat de test mislukt, omdat we controleren testString voordat de afleveringsdraad is voltooid.

6.2. Aanroep van asynchrone handler

Asynchrone verzending stelt de berichtaanbieder in staat terug te keren naar de berichtverwerking voordat de berichten worden afgeleverd bij elke handler, maar het roept nog steeds elke handler op volgorde aan en elke handler moet wachten tot de vorige klaar is.

Dit kan tot problemen leiden als een handler een dure operatie uitvoert.

MBassador biedt een mechanisme voor het aanroepen van een asynchrone handler. Handlers die hiervoor zijn geconfigureerd, ontvangen berichten in hun thread:

privé Geheel getal testGeheel getal; private String invocationThreadName; private AtomicBoolean gereed = nieuwe AtomicBoolean (false); @Test openbare leegte whenHandlerAsync_thenHandled () {dispatcher.post (42) .now (); wachten (). tillAtomic (ready, equalTo (true)); assertNotNull (testInteger); assertFalse (Thread.currentThread (). getName (). is gelijk aan (invocationThreadName)); } @Handler (levering = Invoke.Asynchronously) public void handleIntegerMessage (bericht geheel getal) {this.invocationThreadName = Thread.currentThread (). GetName (); this.testInteger = bericht; ready.set (true); }

Handlers kunnen asynchrone aanroep aanvragen met de delivery = Invoke.Asynchronously eigendom op de Handler annotatie. We verifiëren dit in onze test door de Draad namen in de verzendmethode en de handler.

7. MBassador aanpassen

Tot dusver hebben we een instantie van MBassador met zijn standaardconfiguratie gebruikt. Het gedrag van de coördinator kan worden gewijzigd met annotaties, vergelijkbaar met degene die we tot nu toe hebben gezien; we zullen er nog een paar behandelen om deze tutorial af te maken.

7.1. Afhandeling van uitzonderingen

Handlers kunnen geen aangevinkte uitzonderingen definiëren. In plaats daarvan kan de dispatcher worden voorzien van een IPublicationErrorHandler als argument voor de constructor:

openbare klasse MBassadorConfigurationTest implementeert IPublicationErrorHandler {privé MBassador-dispatcher; privé String messageString; privé Throwable-fout Oorzaak; @Before public void preparTests () {dispatcher = nieuwe MBassador (dit); dispatcher.subscribe (dit); } @Test openbare leegte whenErrorOccurs_thenErrorHandler () {dispatcher.post ("Fout"). Now (); assertNull (messageString); assertNotNull (errorCause); } @Test openbare leegte whenNoErrorOccurs_thenStringHandler () {dispatcher.post ("Fout"). Now (); assertNull (errorCause); assertNotNull (messageString); } @Handler public void handleString (String bericht) {if ("Error" .equals (bericht)) {throw new Error ("BOOM"); } messageString = bericht; } @Override public void handleError (PublicationError-fout) {errorCause = error.getCause (). GetCause (); }}

Wanneer handleString () gooit een Fout, het wordt opgeslagen in fout Oorzaak.

7.2. Prioriteit van de handler

Handlers worden aangeroepen in omgekeerde volgorde van hoe ze zijn toegevoegd, maar dit is geen gedrag waarop we willen vertrouwen. Zelfs met de mogelijkheid om handlers in hun threads te bellen, moeten we misschien nog weten in welke volgorde ze zullen worden gebeld.

We kunnen de prioriteit van de handler expliciet instellen:

private LinkedList list = nieuwe LinkedList (); @Test openbare leegte whenRejectDispatched_thenPriorityHandled () {dispatcher.post (nieuwe RejectMessage ()). Nu (); // Items moeten pop () in omgekeerde volgorde van prioriteit assertTrue (1 == list.pop ()); assertTrue (3 == list.pop ()); assertTrue (5 == list.pop ()); } @Handler (prioriteit = 5) openbare ongeldige handleRejectMessage5 (RejectMessage rejectMessage) {list.push (5); } @Handler (prioriteit = 3) openbare ongeldige handleRejectMessage3 (RejectMessage rejectMessage) {list.push (3); } @Handler (prioriteit = 2, rejectSubtypes = true) public void handleMessage (Message rejectMessage) logger.error ("Reject handler # 3"); lijst.push (3); } @Handler (prioriteit = 0) openbare ongeldige handleRejectMessage0 (RejectMessage rejectMessage) {list.push (1); } 

Handlers worden aangeroepen van de hoogste prioriteit naar de laagste. Handlers met de standaardprioriteit, die nul is, worden als laatste aangeroepen. We zien dat de handlernummers knal() uit in omgekeerde volgorde.

7.3. Verwerp subtypen, de gemakkelijke manier

wat is er gebeurd met handleMessage () in de bovenstaande test? We hoeven niet te gebruiken RejectSubTypes.class om onze subtypes te filteren.

RejectSubTypes is een booleaanse vlag die dezelfde filtering biedt als de klasse, maar met betere prestaties dan de IMessageFilter implementatie.

We moeten echter nog steeds de filtergebaseerde implementatie gebruiken om alleen subtypen te accepteren.

8. Conclusie

MBassador is een eenvoudige en ongecompliceerde bibliotheek voor het doorgeven van berichten tussen objecten. Berichten kunnen op verschillende manieren worden georganiseerd en kunnen synchroon of asynchroon worden verzonden.

En, zoals altijd, is het voorbeeld beschikbaar in dit GitHub-project.