Inleiding tot reactorkern

1. Inleiding

Reactor Core is een Java 8-bibliotheek die het reactieve programmeermodel implementeert. Het is gebouwd bovenop de Reactive Streams-specificatie, een standaard voor het bouwen van reactieve applicaties.

Gezien de achtergrond van niet-reactieve Java-ontwikkeling, kan reactief gaan een behoorlijk steile leercurve zijn. Dit wordt uitdagender als je het vergelijkt met de Java 8 Stroom API, omdat ze kunnen worden aangezien voor dezelfde abstracties op hoog niveau.

In dit artikel zullen we proberen dit paradigma te ontraadselen. We nemen kleine stappen door Reactor totdat we een beeld hebben gemaakt van hoe reactieve code kan worden samengesteld, waarmee we de basis leggen voor meer geavanceerde artikelen die in een latere serie zullen verschijnen.

2. Specificatie van reactieve stromen

Voordat we naar Reactor kijken, moeten we naar de Reactive Streams-specificatie kijken. Dit is wat Reactor implementeert, en het legt de basis voor de bibliotheek.

In wezen is Reactive Streams een specificatie voor asynchrone streamverwerking.

Met andere woorden, een systeem waarin veel gebeurtenissen asynchroon worden geproduceerd en verbruikt. Denk aan een stroom van duizenden voorraadupdates per seconde die een financiële applicatie binnenkomen, en dat deze tijdig op die updates moet reageren.

Een van de belangrijkste doelen hiervan is om het probleem van tegendruk aan te pakken. Als we een producent hebben die gebeurtenissen sneller naar een consument stuurt dan hij ze kan verwerken, dan zal de consument uiteindelijk overweldigd worden door gebeurtenissen, waardoor de systeembronnen opraken.

Tegendruk houdt in dat onze consument de producent moet kunnen vertellen hoeveel data hij moet opsturen om dit te voorkomen, en dat staat in de specificatie.

3. Maven afhankelijkheden

Laten we voordat we beginnen onze Maven-afhankelijkheden toevoegen:

 io.projectreactor reactorkern 3.3.9.RELEASE ch.qos.logback logback-classic 1.1.3 

We voegen ook Logback toe als afhankelijkheid. Dit komt omdat we de output van Reactor zullen loggen om de gegevensstroom beter te begrijpen.

4. Een stroom gegevens produceren

Om een ​​applicatie reactief te laten zijn, moet het eerste wat het moet kunnen, een datastroom produceren.

Dit kan zoiets zijn als het voorbeeld van de voorraadupdate dat we eerder hebben gegeven. Zonder deze gegevens zouden we niets hebben om op te reageren, daarom is dit een logische eerste stap.

Reactive Core geeft ons twee gegevenstypen waarmee we dit kunnen doen.

4.1. Flux

De eerste manier om dit te doen is met een Flux. Het is een stroom die kan uitzenden 0..n elementen. Laten we proberen een eenvoudige te maken:

Flux just = Flux.just (1, 2, 3, 4);

In dit geval hebben we een statische stroom van vier elementen.

4.2. Mono

De tweede manier om dit te doen is met een Mono, dat is een stroom van 0..1 elementen. Laten we proberen er een te instantiëren:

Mono just = Mono.just (1);

Dit ziet er bijna hetzelfde uit en gedraagt ​​zich als het Flux, alleen zijn we deze keer beperkt tot niet meer dan één element.

4.3. Waarom niet alleen Flux?

Voordat u verder gaat experimenteren, is het de moeite waard om te benadrukken waarom we deze twee gegevenstypen hebben.

Ten eerste moet worden opgemerkt dat zowel a Flux en Mono zijn implementaties van de Reactive Streams Uitgever koppel. Beide klassen voldoen aan de specificatie en we zouden deze interface in plaats daarvan kunnen gebruiken:

Uitgever just = Mono.just ("foo");

Maar het is echt nuttig om deze kardinaliteit te kennen. Dit komt doordat een paar bewerkingen alleen zinvol zijn voor een van de twee typen, en omdat het expressiever kan zijn (stel je voor vind een() in een repository).

5. Abonneren op een stream

Nu hebben we een overzicht op hoog niveau van hoe we een datastroom kunnen produceren, we moeten ons erop abonneren om ervoor te zorgen dat het de elementen uitzendt.

5.1. Elementen verzamelen

Laten we de abonneren () methode om alle elementen in een stream te verzamelen:

Lijstelementen = nieuwe ArrayList (); Flux.just (1, 2, 3, 4) .log () .subscribe (elementen :: toevoegen); assertThat (elementen) .containsExactly (1, 2, 3, 4);

De gegevens beginnen pas te stromen als we ons abonneren. Merk op dat we ook wat logboekregistratie hebben toegevoegd, dit zal handig zijn als we kijken naar wat er achter de schermen gebeurt.

5.2. De stroom van elementen

Met inloggen kunnen we het gebruiken om te visualiseren hoe de gegevens door onze stream stromen:

20: 25: 19.550 [hoofd] INFO reactor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 25: 19.553 [hoofd] INFO reactor.Flux.Array.1 - | request (onbegrensd) 20: 25: 19.553 [main] INFO reactor.Flux.Array.1 - | onNext (1) 20: 25: 19.553 [hoofd] INFO reactor.Flux.Array.1 - | onNext (2) 20: 25: 19.553 [hoofd] INFO reactor.Flux.Array.1 - | onNext (3) 20: 25: 19.553 [hoofd] INFO reactor.Flux.Array.1 - | onNext (4) 20: 25: 19.553 [hoofd] INFO reactor.Flux.Array.1 - | onComplete ()

Allereerst draait alles op de rode draad. Laten we hier niet in detail op ingaan, aangezien we later in dit artikel nader zullen ingaan op concurrency. Het maakt de zaken echter eenvoudig, omdat we alles op orde kunnen afhandelen.

Laten we nu de volgorde bekijken die we een voor een hebben geregistreerd:

  1. opAbonneren () - Dit wordt genoemd wanneer we ons abonneren op onze stream
  2. verzoek (onbegrensd) - Als we bellen abonneren, achter de schermen maken we een Abonnement. Dit abonnement vraagt ​​om elementen uit de stream. In dit geval is het standaard grenzeloos, wat betekent dat elk afzonderlijk beschikbaar element wordt aangevraagd
  3. onNext () - Dit wordt op elk afzonderlijk element aangeroepen
  4. onComplete () - Dit heet last, na ontvangst van het laatste element. Er is eigenlijk een onError () ook, die zou worden aangeroepen als er een uitzondering is, maar in dit geval niet

Dit is de stroom die in de Abonnee interface als onderdeel van de Reactive Streams-specificatie, en in werkelijkheid is dat wat er achter de schermen is geïnstantieerd in onze oproep aan onSubscribe (). Het is een handige methode, maar om beter te begrijpen wat er gebeurt, laten we een Abonnee interface rechtstreeks:

Flux.just (1, 2, 3, 4) .log () .subscribe (nieuwe abonnee () {@Override public void onSubscribe (Subscription s) {s.request (Long.MAX_VALUE);} @Override public void onNext ( Integer geheel getal) {elements.add (integer);} @Override public void onError (Throwable t) {} @Override public void onComplete () {}});

We kunnen zien dat elke mogelijke fase in de bovenstaande stroom wordt toegewezen aan een methode in de Abonnee implementatie. Het gebeurt gewoon dat de Flux heeft ons een hulpmethode gegeven om deze breedsprakigheid te verminderen.

5.3. Vergelijking met Java 8 Streams

Het lijkt er nog steeds op dat we iets hebben dat synoniem is met een Java 8 Stroom verzamelen:

Verzamelde lijst = Stream.of (1, 2, 3, 4) .collect (toList ());

Alleen wij niet.

Het belangrijkste verschil is dat Reactive een push-model is, terwijl de Java 8 Streams zijn een pull-model. Bij een reactieve benadering zijn gebeurtenissen dat wel geduwd aan de abonnees zodra ze binnenkomen.

Het volgende dat opvalt is een Streams terminaloperator is precies dat, terminal, trekt alle gegevens op en retourneert een resultaat. Met Reactive kunnen we een oneindige stroom krijgen die binnenkomt vanaf een externe bron, met meerdere abonnees die op een ad-hocbasis zijn aangesloten en verwijderd. We kunnen ook dingen doen zoals stromen combineren, stromen smoren en tegendruk toepassen, die we hierna zullen behandelen.

6. Tegendruk

Het volgende dat we moeten overwegen, is tegendruk. In ons voorbeeld vertelt de abonnee de producent om elk afzonderlijk element tegelijk te pushen. Dit zou uiteindelijk overweldigend kunnen worden voor de abonnee en al zijn middelen opslokken.

Tegendruk is wanneer een stroomafwaarts een stroomopwaarts kan vertellen om minder gegevens te verzenden om te voorkomen dat deze wordt overweldigd.

We kunnen onze Abonnee implementatie om tegendruk toe te passen. Laten we de stroomopwaarts vertellen om slechts twee elementen tegelijk te verzenden met behulp van verzoek():

Flux.just (1, 2, 3, 4) .log () .subscribe (nieuwe Subscriber () {privéabonnement s; int onNextAmount; @Override public void onSubscribe (Abonnement s) {this.s = s; s.request (2);} @Override public void onNext (Integer integer) {elements.add (integer); onNextAmount ++; if (onNextAmount% 2 == 0) {s.request (2);}} @Override public void onError (Throwable t) {} @Override public void onComplete () {}});

Als we onze code nu opnieuw uitvoeren, zien we het verzoek (2) wordt aangeroepen, gevolgd door twee onNext () roept dan verzoek (2) opnieuw.

23: 31: 15.395 [hoofd] INFO reactor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 23: 31: 15.397 [hoofd] INFO reactor.Flux.Array.1 - | request (2) 23: 31: 15.397 [hoofd] INFO reactor.Flux.Array.1 - | onNext (1) 23: 31: 15.398 [hoofd] INFO reactor.Flux.Array.1 - | onNext (2) 23: 31: 15.398 [hoofd] INFO reactor.Flux.Array.1 - | verzoek (2) 23: 31: 15.398 [hoofd] INFO reactor.Flux.Array.1 - | onNext (3) 23: 31: 15.398 [hoofd] INFO reactor.Flux.Array.1 - | onNext (4) 23: 31: 15.398 [hoofd] INFO reactor.Flux.Array.1 - | verzoek (2) 23: 31: 15.398 [hoofd] INFO reactor.Flux.Array.1 - | onComplete ()

In wezen is dit reactieve tegendruk. We vragen de upstream om slechts een bepaald aantal elementen te pushen, en alleen als we er klaar voor zijn.

Als we ons voorstellen dat we tweets van Twitter worden gestreamd, is het aan de upstream om te beslissen wat te doen. Als er tweets binnenkwamen, maar er zijn geen verzoeken van de downstream, dan zou de upstream items kunnen laten vallen, ze in een buffer kunnen opslaan of een andere strategie.

7. Werken op een stream

We kunnen ook bewerkingen uitvoeren op de gegevens in onze stream en reageren op gebeurtenissen zoals we dat nodig achten.

7.1. Gegevens in een stream in kaart brengen

Een eenvoudige bewerking die we kunnen uitvoeren, is het toepassen van een transformatie. Laten we in dit geval alle nummers in onze stream verdubbelen:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribe (elementen :: toevoegen);

kaart() wordt toegepast wanneer onNext () wordt genoemd.

7.2. Twee streams combineren

We kunnen de dingen dan interessanter maken door een andere stream met deze te combineren. Laten we dit proberen door te gebruiken zip () functie:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .zipWith (Flux.range (0, Integer.MAX_VALUE), (één, twee) -> String.format ("First Flux:% d, Second Flux:% d", one, two)) .subscribe (elementen :: add); assertThat (elements) .containsExactly ("First Flux: 2, Second Flux: 0", "First Flux: 4, Second Flux: 1", "First Flux: 6, Second Flux: 2", "First Flux: 8, Second Flux: 3 ");

Hier creëren we een andere Flux dat blijft met één toenemen en het samen met onze originele streamen. We kunnen zien hoe deze samenwerken door de logboeken te inspecteren:

20: 04: 38.064 [hoofd] INFO reactor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 04: 38.065 [hoofd] INFO reactor.Flux.Array.1 - | onNext (1) 20: 04: 38.066 [hoofd] INFO reactor.Flux.Range.2 - | onSubscribe ([Synchronous Fuseable] FluxRange.RangeSubscription) 20: 04: 38.066 [main] INFO reactor.Flux.Range.2 - | onNext (0) 20: 04: 38.067 [hoofd] INFO reactor.Flux.Array.1 - | onNext (2) 20: 04: 38.067 [hoofd] INFO reactor.Flux.Range.2 - | onNext (1) 20: 04: 38.067 [hoofd] INFO reactor.Flux.Array.1 - | onNext (3) 20: 04: 38.067 [hoofd] INFO reactor.Flux.Range.2 - | onNext (2) 20: 04: 38.067 [hoofd] INFO reactor.Flux.Array.1 - | onNext (4) 20: 04: 38.067 [hoofd] INFO reactor.Flux.Range.2 - | onNext (3) 20: 04: 38.067 [hoofd] INFO reactor.Flux.Array.1 - | onComplete () 20: 04: 38.067 [hoofd] INFO reactor.Flux.Array.1 - | annuleren () 20: 04: 38.067 [hoofd] INFO reactor.Flux.Range.2 - | annuleren()

Merk op hoe we nu één abonnement per hebben Flux. De onNext () aanroepen worden ook afgewisseld, dus de index van elk element in de stream komt overeen wanneer we de zip () functie.

8. Hot Streams

Momenteel hebben we ons voornamelijk gericht op koude stromen. Dit zijn statische stromen met een vaste lengte die gemakkelijk te verwerken zijn. Een meer realistische use case voor reactief kan iets zijn dat oneindig gebeurt.

We kunnen bijvoorbeeld een stroom muisbewegingen hebben waarop constant moet worden gereageerd of een twitterfeed. Dit soort streams worden hot streams genoemd, omdat ze altijd actief zijn en op elk moment kunnen worden geabonneerd, waarbij de start van de gegevens ontbreekt.

8.1. Een ConnectableFlux

Een manier om een ​​hete stroom te creëren, is door een koude stroom in een te veranderen. Laten we een Flux dat voor altijd duurt, en de resultaten naar de console uitvoert, wat een oneindige stroom gegevens zou simuleren die afkomstig zijn van een externe bron:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .publish ();

Door te bellen publiceren() we krijgen een ConnectableFlux. Dit betekent dat bellen abonneren () zal niet veroorzaken dat het begint uit te zenden, waardoor we meerdere abonnementen kunnen toevoegen:

publish.subscribe (System.out :: println); publish.subscribe (System.out :: println);

Als we deze code proberen uit te voeren, gebeurt er niets. Pas als we bellen aansluiten(), dat de Flux zal beginnen met uitzenden:

publish.connect ();

8.2. Throttling

Als we onze code uitvoeren, wordt onze console overweldigd door logboekregistratie. Dit simuleert een situatie waarin te veel gegevens worden doorgegeven aan onze consumenten. Laten we proberen dit te omzeilen met beperking:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .sample (ofSeconds (2)) .publish ();

Hier hebben we een monster() methode met een interval van twee seconden. Nu worden de waarden alleen om de twee seconden naar onze abonnee gestuurd, wat betekent dat de console een stuk minder hectisch zal zijn.

Er zijn natuurlijk meerdere strategieën om de hoeveelheid gegevens die stroomafwaarts wordt verzonden te verminderen, zoals windowing en buffering, maar deze worden buiten het bereik van dit artikel gelaten.

9. Gelijktijdigheid

Al onze bovenstaande voorbeelden zijn momenteel op de hoofdthread uitgevoerd. We kunnen echter bepalen op welke thread onze code wordt uitgevoerd als we dat willen. De Planner interface biedt een abstractie rond asynchrone code, waarvoor veel implementaties voor ons zijn voorzien. Laten we proberen ons te abonneren op een andere thread dan main:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribeOn (Schedulers.parallel ()) .subscribe (elementen :: toevoegen);

De Parallel planner zorgt ervoor dat ons abonnement op een andere thread wordt uitgevoerd, wat we kunnen bewijzen door naar de logboeken te kijken. We zien dat de eerste invoer afkomstig is van de hoofd thread en de Flux wordt uitgevoerd in een andere thread met de naam parallel-1.

20:03:27.505 [hoofd] DEBUG reactor.util.Loggers $ LoggerFactory - Slf4j logging framework gebruiken 20: 03: 27.529 [parallel-1] INFO reactor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 03: 27.531 [parallel-1] INFO reactor.Flux.Array.1 - | request (onbegrensd) 20: 03: 27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext (1) 20: 03: 27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext (2) 20: 03: 27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext (3) 20: 03: 27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onNext (4) 20: 03: 27.531 [parallel-1] INFO reactor.Flux.Array.1 - | onComplete ()

Gelijktijdigheid wordt interessanter dan dit, en het zal de moeite waard zijn om het in een ander artikel te onderzoeken.

10. Conclusie

In dit artikel hebben we een end-to-end overzicht van Reactive Core op hoog niveau gegeven. We hebben uitgelegd hoe we streams kunnen publiceren en abonneren, tegendruk kunnen toepassen, streams kunnen bedienen en ook gegevens asynchroon kunnen verwerken. Dit zou hopelijk een basis moeten leggen voor ons om reactieve applicaties te schrijven.

Latere artikelen in deze serie zullen meer geavanceerde gelijktijdigheid en andere reactieve concepten behandelen. Er is ook een ander artikel over Reactor met veer.

De broncode voor onze applicatie is beschikbaar op meer dan op GitHub; dit is een Maven-project dat zou moeten kunnen draaien zoals het is.


$config[zx-auto] not found$config[zx-overlay] not found