Java 9 reactieve streams

1. Overzicht

In dit artikel zullen we kijken naar de Java 9 Reactive Streams. Simpel gezegd, we kunnen de Stromen class, die de primaire bouwstenen bevat voor het bouwen van reactieve stroomverwerkingslogica.

Reactieve streams is een standaard voor asynchrone streamverwerking met niet-blokkerende tegendruk. Deze specificatie is gedefinieerd in het Reactief manifest, en er zijn verschillende implementaties van, bijvoorbeeld RxJava of Akka-Streams.

2. Reactive API-overzicht

Om een Stromenkunnen we drie belangrijke abstracties gebruiken en deze samenstellen tot asynchrone verwerkingslogica.

Elke Stromen moet gebeurtenissen verwerken die ernaar zijn gepubliceerd door een Publisher-exemplaar; de Uitgever heeft één methode - abonneren ().

Als een van de abonnees evenementen wil ontvangen die door deze worden gepubliceerd, moeten ze zich abonneren op het gegeven Uitgever.

De ontvanger van berichten moet het Abonnee koppel. Meestal is dit het einde van elk Stromen verwerking omdat de instantie ervan geen berichten verder verzendt.

We kunnen erover nadenken Abonnee als een Wastafel. Dit heeft vier methoden die moeten worden overschreven - onSubscribe (), onNext (), onError (), en onComplete (). We zullen die in de volgende sectie bekijken.

Als we inkomend bericht willen transformeren en verder willen doorgeven aan het volgende Abonnee, we moeten het Verwerker koppel. Dit fungeert zowel als een Abonnee omdat het berichten ontvangt, en als de Uitgever omdat het die berichten verwerkt en verzendt voor verdere verwerking.

3. Berichten publiceren en consumeren

Laten we zeggen dat we een simple willen maken Stromen, waarin we een Uitgever berichten publiceren, en een eenvoudig Abonnee consumerende berichten zodra ze binnenkomen - een voor een.

Laten we een EndSubscriber klasse. We moeten het Abonnee koppel. Vervolgens overschrijven we de vereiste methoden.

De opAbonneren () methode wordt aangeroepen voordat de verwerking begint. Het exemplaar van de Abonnement wordt doorgegeven als argument. Het is een klasse die wordt gebruikt om de berichtenstroom tussen Abonnee en de Uitgever:

openbare klasse EndSubscriber implementeert Subscriber {private Subscription-abonnement; openbare lijst consumedElements = nieuwe LinkedList (); @Override public void onSubscribe (Abonnementsabonnement) {this.subscription = abonnement; abonnement.verzoek (1); }}

We hebben ook een leeg Lijst van consumedElements die zullen worden gebruikt in de tests.

Nu moeten we de resterende methoden van de Abonnee koppel. De belangrijkste methode hier is onNext () - dit wordt aangeroepen wanneer de Uitgever publiceert een nieuw bericht:

@Override public void onNext (T item) {System.out.println ("Got:" + item); abonnement.verzoek (1); }

Merk op dat toen we het abonnement begonnen in de opAbonneren () methode en wanneer we een bericht hebben verwerkt, moeten we de verzoek() methode op de Abonnement om aan te geven dat de huidige Abonnee is klaar om meer berichten te consumeren.

Ten slotte moeten we implementeren onError () - die wordt aangeroepen wanneer een uitzondering in de verwerking wordt gegooid, evenals onComplete () - gebeld toen de Uitgever is gesloten:

@Override public void onError (Throwable t) {t.printStackTrace (); } @Override public void onComplete () {System.out.println ("Klaar"); }

Laten we een test schrijven voor de verwerking Stromen. We gebruiken de Indiening Uitgever class - een constructie uit de java.util.concurrent - die het Uitgever koppel.

We gaan ons indienen N elementen aan de Uitgever - welke onze EndSubscriber ontvangt:

@Test openbare leegte whenSubscribeToIt_thenShouldConsumeAll () gooit InterruptedException {// gegeven SubmissionPublisher publisher = nieuwe SubmissionPublisher (); EndSubscriber-abonnee = nieuwe EndSubscriber (); publisher.subscribe (abonnee); Lijstitems = List.of ("1", "x", "2", "x", "3", "x"); // when assertThat (publisher.getNumberOfSubscribers ()). isEqualTo (1); items.forEach (uitgever :: indienen); publisher.close (); // dan wachten (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (items)); }

Merk op dat we de dichtbij() methode op het exemplaar van de EndSubscriber. Het zal aanroepen onComplete () callback hieronder op elke Abonnee van het gegeven Uitgever.

Als u dat programma uitvoert, wordt de volgende uitvoer geproduceerd:

Kreeg: 1 Kreeg: x Kreeg: 2 Kreeg: x Kreeg: 3 Kreeg: x Gereed

4. Transformatie van berichten

Laten we zeggen dat we een vergelijkbare logica willen bouwen tussen a Uitgever en een Abonnee, maar pas ook wat transformatie toe.

We maken het TransformProcessor klasse die implementeert Verwerker en strekt zich uit Indiening Uitgever - aangezien dit beide zullen zijn P.ublisher en Sabonnee.

We passeren in een Functie die inputs zullen omzetten in outputs:

openbare klasse TransformProcessor breidt uit SubmissionPublisher implementeert Flow.Processor {private Function-functie; privé Flow.Subscription-abonnement; openbare TransformProcessor (functie functie) {super (); this.function = functie; } @Override public void onSubscribe (Flow.Subscription-abonnement) {this.subscription = abonnement; abonnement.verzoek (1); } @Override public void onNext (T item) {submit (function.apply (item)); abonnement.verzoek (1); } @Override public void onError (Throwable t) {t.printStackTrace (); } @Override public void onComplete () {close (); }}

Laten we nu schrijf een snelle test met een verwerkingsstroom waarin de Uitgever publiceert Draad elementen.

Onze TransformProcessor zal het Draad net zo Geheel getal - wat betekent dat hier een conversie moet plaatsvinden:

@Test openbare leegte whenSubscribeAndTransformElements_thenShouldConsumeAll () gooit InterruptedException {// gegeven SubmissionPublisher publisher = nieuwe SubmissionPublisher (); TransformProcessor transformProcessor = nieuwe TransformProcessor (Integer :: parseInt); EndSubscriber-abonnee = nieuwe EndSubscriber (); Lijstitems = List.of ("1", "2", "3"); Lijst verwachteResultaat = List.of (1, 2, 3); // wanneer publisher.subscribe (transformProcessor); transformProcessor.subscribe (abonnee); items.forEach (uitgever :: indienen); publisher.close (); // dan wachten (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (verwachtResultaat)); }

Merk op dat het aanroepen van de dichtbij() methode op de basis Uitgever veroorzaakt de onComplete () methode op de TransformProcessor worden ingeroepen.

Houd er rekening mee dat alle uitgevers in de verwerkingsketen op deze manier gesloten moeten worden.

5. Beheersing van de vraag naar berichten met behulp van de Abonnement

Laten we zeggen dat we alleen het eerste element van het abonnement willen consumeren, wat logica willen toepassen en de verwerking willen voltooien. We kunnen de verzoek() methode om dit te bereiken.

Laten we onze EndSubscriber om slechts N aantal berichten te verbruiken. We zullen dat nummer doorgeven als de howMuchMessagesConsume constructor argument:

openbare klasse EndSubscriber implementeert Subscriber {privé AtomicInteger howMuchMessagesConsume; privéabonnement; openbare lijst consumedElements = nieuwe LinkedList (); openbare EndSubscriber (geheel getal howMuchMessagesConsume) {this.howMuchMessagesConsume = nieuwe AtomicInteger (howMuchMessagesConsume); } @Override public void onSubscribe (Abonnementsabonnement) {this.subscription = abonnement; abonnement.verzoek (1); } @Override openbare leegte onNext (T-item) {howMuchMessagesConsume.decrementAndGet (); System.out.println ("Got:" + item); consumedElements.add (item); if (howMuchMessagesConsume.get ()> 0) {subscription.request (1); }} // ...}

We kunnen elementen opvragen zolang we willen.

Laten we een test schrijven waarin we slechts één element van het gegeven willen consumeren Abonnement:

@Test openbare leegte whenRequestForOnlyOneElement_thenShouldConsumeOne () gooit InterruptedException {// gegeven SubmissionPublisher publisher = nieuwe SubmissionPublisher (); EndSubscriber-abonnee = nieuwe EndSubscriber (1); publisher.subscribe (abonnee); Lijstitems = List.of ("1", "x", "2", "x", "3", "x"); Lijst verwacht = List.of ("1"); // when assertThat (publisher.getNumberOfSubscribers ()). isEqualTo (1); items.forEach (uitgever :: indienen); publisher.close (); // dan wachten (). atMost (1000, TimeUnit.MILLISECONDS) .until (() -> assertThat (subscriber.consumedElements) .containsExactlyElementsOf (verwacht)); }

Hoewel de uitgever publiceert zes elementen, onze EndSubscriber zal slechts één element verbruiken omdat het de vraag signaleert dat alleen dat ene wordt verwerkt.

Door de verzoek() methode op de Abonnement, we kunnen een geavanceerder tegendrukmechanisme implementeren om de snelheid van het berichtenverbruik te regelen.

6. Conclusie

In dit artikel hebben we de Java 9 Reactive Streams bekeken.

We hebben gezien hoe we een verwerking moesten aanmaken Stromen bestaande uit een Uitgever en een Abonnee. We creëerden een meer complexe verwerkingsstroom met de transformatie van elementen met behulp van Processoren.

Ten slotte hebben we de Abonnement om de vraag naar elementen te beheersen door de Abonnee.

De implementatie van al deze voorbeelden en codefragmenten is te vinden in het GitHub-project - dit is een Maven-project, dus het moet gemakkelijk te importeren en uit te voeren zijn zoals het is.


$config[zx-auto] not found$config[zx-overlay] not found