RxJava 2 - Vloeibaar
1. Inleiding
RxJava is een Reactive Extensions Java-implementatie waarmee we gebeurtenisgestuurde en asynchrone applicaties kunnen schrijven. Meer informatie over het gebruik van RxJava vindt u in ons intro-artikel hier.
RxJava 2 werd helemaal opnieuw geschreven, wat meerdere nieuwe functies met zich meebracht; waarvan sommige zijn gemaakt als reactie op problemen die in de vorige versie van het framework bestonden.
Een van die functies is de io.reactivex.Flowable.
2. Waarneembaar vs. Vloeiend
In de vorige versie van RxJava was er slechts één basisklasse voor het omgaan met tegendrukbewuste en niet-tegendrukbewuste bronnen - Waarneembaar.
RxJava 2 introduceerde een duidelijk onderscheid tussen deze twee soorten bronnen - tegendrukbewuste bronnen worden nu weergegeven met behulp van een speciale klasse - Vloeiend.
Waarneembaar bronnen ondersteunen geen tegendruk. Daarom zouden we het moeten gebruiken voor bronnen die we alleen consumeren en die we niet kunnen beïnvloeden.
Als we te maken hebben met een groot aantal elementen, kunnen er zich twee mogelijke scenario's voordoen die verband houden met tegendruk, afhankelijk van het type Waarneembaar.
In het geval van gebruik van een zogenaamd “verkoudheid Waarneembaar“, gebeurtenissen worden lui uitgezonden, dus we zijn veilig voor het overstromen van een waarnemer.
Bij gebruik van een “heet Waarneembaar” dit zal echter gebeurtenissen blijven uitzenden, zelfs als de consument het niet bij kan houden.
3. Een Vloeiend
Er zijn verschillende manieren om een Vloeiend. Handig voor ons lijken die methoden op de methoden in Waarneembaar in de eerste versie van RxJava.
3.1. Gemakkelijk Vloeiend
We kunnen een Vloeiend de ... gebruiken alleen maar() methode op dezelfde manier als we zouden kunnen met Waarneembaar:
Flowable integerFlowable = Flowable.just (1, 2, 3, 4);
Ook al gebruik je de alleen maar() is vrij eenvoudig, het is niet erg gebruikelijk om een Vloeiend van statische gegevens, en het wordt gebruikt voor testdoeleinden.
3.2. Vloeiend van Waarneembaar
Als we een Waarneembaar we kunnen het gemakkelijk transformeren naar Vloeiend de ... gebruiken toFlowable () methode:
Waarneembare integerObservable = Observable.just (1, 2, 3); Flowable integerFlowable = integerObservable .toFlowable (BackpressureStrategy.BUFFER);
Merk op dat om de conversie te kunnen uitvoeren, we de Waarneembaar met een Tegendruk Strategie. In de volgende sectie zullen we beschikbare strategieën beschrijven.
3.3. Vloeiend van FlowableOnSubscribe
RxJava 2 introduceerde een functionele interface FlowableOnSubscribe, wat staat voor een Vloeiend die gebeurtenissen begint uit te zenden nadat de consument zich erop heeft geabonneerd.
Hierdoor ontvangen alle klanten dezelfde reeks evenementen, waardoor FlowableOnSubscribe tegendrukveilig.
Als we de FlowableOnSubscribe we kunnen het gebruiken om het Vloeiend:
FlowableOnSubscribe flowableOnSubscribe = flowable -> flowable.onNext (1); Flowable integerFlowable = Flowable .create (flowableOnSubscribe, BackpressureStrategy.BUFFER);
De documentatie beschrijft veel meer methoden om te creëren Vloeiend.
4. VloeiendTegendruk Strategie
Sommige methoden zoals toFlowable () of maken () neem een Tegendruk Strategie als argument.
De Tegendruk Strategie is een opsomming die het tegendrukgedrag definieert dat we op ons toepassen Vloeiend.
Het kan gebeurtenissen cachen of laten vallen of helemaal geen gedrag implementeren, in het laatste geval zullen wij verantwoordelijk zijn voor het definiëren ervan, met behulp van tegendrukoperatoren.
Tegendruk Strategie is gelijkaardig aan Tegendruk Mode aanwezig in de vorige versie van RxJava.
Er zijn vijf verschillende strategieën beschikbaar in RxJava 2.
4.1. Buffer
Als we de Tegendruk Strategie.BUFFER, de bron zal alle gebeurtenissen bufferen totdat de abonnee ze kan consumeren:
public void thenAllValuesAreBufferedAndReceived () {List testList = IntStream.range (0, 100000) .boxed () .collect (Collectors.toList ()); Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = waarneembaar .toFlowable (BackpressureStrategy.BUFFER) .observeOn (Schedulers.computation ()). Test (); testSubscriber.awaitTerminalEvent (); Lijst ontvangenInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (object -> (int) object) .boxed () .collect (Collectors.toList ()); assertEquals (testList, ontvangenInts); }
Het is vergelijkbaar met aanroepen onBackpressureBuffer () methode op Vloeiend, maar het staat niet toe om expliciet een buffergrootte of de actie onOverflow te definiëren.
4.2. Laten vallen
We kunnen de BackpressureStrategy.DROP om de gebeurtenissen die niet kunnen worden geconsumeerd weg te gooien in plaats van ze te bufferen.
Nogmaals, dit is vergelijkbaar met gebruiken onBackpressureDrop() Aan Vloeiend:
openbare leegte whenDropStrategyUsed_thenOnBackpressureDropped () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = waarneembaar .toFlowable (BackpressureStrategy.DROP) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Lijst ontvangenInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (object -> (int) object) .boxed () .collect (Collectors.toList ()); assertThat (ontvangenInts.size () <testList.size ()); assertThat (! ReceivedInts.contains (100000)); }
4.3. Laatste
De ... gebruiken BackpressureStrategy.LATEST zal de bron dwingen om alleen de laatste gebeurtenissen te bewaren, waardoor eventuele eerdere waarden worden overschreven als de consument het niet kan bijhouden:
openbare leegte whenLatestStrategyUsed_thenTheLastElementReceived () {Observable observable = Observable.fromIterable (testList); TestSubscriber testSubscriber = waarneembaar .toFlowable (BackpressureStrategy.LATEST) .observeOn (Schedulers.computation ()) .test (); testSubscriber.awaitTerminalEvent (); Lijst ontvangenInts = testSubscriber.getEvents () .get (0) .stream () .mapToInt (object -> (int) object) .boxed () .collect (Collectors.toList ()); assertThat (ontvangenInts.size () <testList.size ()); assertThat (ontvangenInts. bevat (100000)); }
BackpressureStrategy.LATEST en BackpressureStrategy.DROP lijken erg op elkaar als we naar de code kijken.
Echter, BackpressureStrategy.LATEST zal elementen overschrijven die onze abonnee niet aankan en alleen de laatste behouden, vandaar de naam.
Tegendruk Strategie.DROP, Aan de andere kant worden elementen weggegooid die niet kunnen worden gehanteerd. Dit betekent dat de nieuwste elementen niet per se worden uitgezonden.
4.4. Fout
Wanneer we de Tegendruk Strategie.FOUT, dat zeggen we gewoon we verwachten geen tegendruk. Bijgevolg een MissingBackpressureException moet worden gegooid als de consument de bron niet kan bijhouden:
public void whenErrorStrategyUsed_thenExceptionIsThrown () {Observable observable = Observable.range (1, 100000); TestSubscriber-abonnee = waarneembaar .toFlowable (BackpressureStrategy.ERROR) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }
4.5. Missend
Als we de Tegendruk Strategie ONTBREEKT, zal de bron elementen pushen zonder ze weg te gooien of te bufferen.
De benedenstroomse zal in dit geval te maken krijgen met overstorten:
public void whenMissingStrategyUsed_thenException () {Observable observable = Observable.range (1, 100000); TestSubscriber-abonnee = waarneembaar .toFlowable (BackpressureStrategy.MISSING) .observeOn (Schedulers.computation ()) .test (); subscriber.awaitTerminalEvent (); subscriber.assertError (MissingBackpressureException.class); }
In onze tests zijn we aan het uitzonderen MissingbackpressureException voor beide FOUT en MISSEND strategieën. Omdat beide een dergelijke uitzondering zullen genereren wanneer de interne buffer van de bron wordt overschreden.
Het is echter de moeite waard om op te merken dat ze allebei een ander doel hebben.
We zouden de eerste moeten gebruiken als we helemaal geen tegendruk verwachten, en we willen dat de bron een uitzondering genereert voor het geval dat het zich voordoet.
De laatste kan worden gebruikt als we geen standaardgedrag willen specificeren bij het maken van het Vloeiend. En we gaan tegendrukoperatoren gebruiken om het later te definiëren.
5. Samenvatting
In deze tutorial hebben we de nieuwe klasse gepresenteerd die in RxJava is geïntroduceerd 2 gebeld Vloeiend.
Voor meer informatie over het Vloeiend zichzelf en zijn API kunnen we verwijzen naar de documentatie.
Zoals altijd zijn alle codevoorbeelden te vinden op GitHub.