Omgaan met tegendruk met RxJava

1. Overzicht

In dit artikel zullen we kijken naar de manier waarop de RxJava-bibliotheek ons ​​helpt om met tegendruk om te gaan.

Simpel gezegd - RxJava gebruikt een concept van reactieve streams door Waarneembare, naar welke een of meerdere Waarnemers kan zich abonneren op. Omgaan met mogelijk oneindige stromen is een grote uitdaging, omdat we het probleem van tegendruk onder ogen moeten zien.

Het is niet moeilijk om in een situatie te komen waarin een Waarneembaar zendt items sneller uit dan een abonnee ze kan consumeren. We zullen de verschillende oplossingen bekijken voor het probleem van een groeiende buffer van niet-geconsumeerde items.

2. Heet Waarneembare Versus koud Waarneembare

Laten we eerst een eenvoudige consumentenfunctie maken die zal worden gebruikt als een gebruiker van elementen uit Waarneembare die we later zullen definiëren:

openbare klasse ComputeFunction {openbare statische ongeldige berekening (geheel getal v) {probeer {System.out.println ("reken geheel getal v:" + v); Thread.sleep (1000); } catch (InterruptedException e) {e.printStackTrace (); }}}

Onze berekenen() functie is gewoon het argument afdrukken. Het belangrijkste dat hier moet worden opgemerkt, is het aanroepen van een Draad.sleep (1000) methode - we doen het om een ​​langlopende taak te emuleren die Waarneembaar om sneller met items te vullen Waarnemer kan ze consumeren.

We hebben twee soorten Observables - Heet en Verkoudheid - die totaal anders zijn als het gaat om het hanteren van tegendruk.

2.1. Verkoudheid Waarneembare

Een verkoudheid Waarneembaar zendt een bepaalde reeks items uit, maar kan beginnen met het uitzenden van deze reeks wanneer zijn Waarnemer vindt het handig, en in welk tempo dan ook de Waarnemer verlangens, zonder de integriteit van de reeks te verstoren. Verkoudheid Waarneembaar levert items op een luie manier.

De Waarnemer neemt alleen elementen als het klaar is om dat item te verwerken, en items hoeven niet te worden gebufferd in een Waarneembaar omdat ze op een pull-manier worden aangevraagd.

Als u bijvoorbeeld een Waarneembaar gebaseerd op een statische reeks elementen van één tot één miljoen, dat Waarneembaar zou dezelfde reeks items uitzenden, ongeacht hoe vaak die items worden waargenomen:

Observable.range (1, 1_000_000) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute);

Wanneer we ons programma starten, worden items berekend door Waarnemer lui en zal op een pull-manier worden opgevraagd. De Schedulers.computation () methode betekent dat we onze Waarnemer binnen een berekening thread pool in RxJava.

De output van een programma zal bestaan ​​uit een resultaat van een berekenen() methode aangeroepen voor één voor één item vanuit een Waarneembaar:

bereken geheel getal v: 1 bereken geheel getal v: 2 bereken geheel getal v: 3 bereken geheel getal v: 4 ...

Verkoudheid Waarneembare hoeven geen enkele vorm van tegendruk te hebben, omdat ze trekkend werken. Voorbeelden van items die vrijkomen bij verkoudheid Waarneembaar kan de resultaten van een databasequery, het ophalen van bestanden of een webverzoek bevatten.

2.2. Heet Waarneembare

Een heet Waarneembaar begint met het genereren van items en geeft ze onmiddellijk weer wanneer ze worden gemaakt. Het is in strijd met een verkoudheid Waarneembare pull-model van verwerking. Heet Waarneembaar zendt items in zijn eigen tempo uit, en het is aan de waarnemers om bij te blijven.

Wanneer de Waarnemer kan items niet zo snel consumeren als ze worden geproduceerd door een Waarneembaar ze moeten worden gebufferd of op een andere manier worden afgehandeld, omdat ze het geheugen vullen en uiteindelijk veroorzaken OutOfMemoryException.

Laten we eens kijken naar een voorbeeld van hot Waarneembaar, dat is een miljoen items produceren voor een eindgebruiker die deze items verwerkt. Wanneer een berekenen() methode in de Waarnemer kost wat tijd om elk item te verwerken, de Waarneembaar begint een geheugen met items te vullen, waardoor een programma mislukt:

PublishSubject source = PublishSubject.create (); source.observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); IntStream.range (1, 1_000_000) .forEach (bron :: onNext); 

Het uitvoeren van dat programma zal mislukken met een MissingBackpressureException omdat we geen manier hebben gedefinieerd om met overproductie om te gaan Waarneembaar.

Voorbeelden van items die worden uitgestoten door een hete Waarneembaar kan muis- en toetsenbordgebeurtenissen, systeemgebeurtenissen of aandelenkoersen omvatten.

3. Overproductie bufferen Waarneembaar

De eerste manier om overproductie aan te pakken Waarneembaar is om een ​​soort buffer te definiëren voor elementen die niet kunnen worden verwerkt door een Waarnemer.

We kunnen het doen door een buffer() methode:

PublishSubject source = PublishSubject.create (); source.buffer (1024) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); 

Als u een buffer met een grootte van 1024 definieert, krijgt u een Waarnemer enige tijd om een ​​overproducerende bron in te halen. De buffer zal items opslaan die nog niet zijn verwerkt.

We kunnen een buffergrootte vergroten om voldoende ruimte te hebben voor geproduceerde waarden.

Merk echter op dat in het algemeen dit kan slechts een tijdelijke oplossing zijn omdat de overflow nog steeds kan optreden als de bron de voorspelde buffergrootte te veel produceert.

4. Verzonden items batchgewijs

We kunnen overgeproduceerde items in vensters van N-elementen batchen.

Wanneer Waarneembaar produceert elementen sneller dan Waarnemer kunnen we ze verwerken, kunnen we dit verlichten door geproduceerde elementen samen te groeperen en een batch elementen naar te sturen Waarnemer die in staat is om een ​​verzameling elementen in plaats van element één voor één te verwerken:

PublishSubject source = PublishSubject.create (); source.window (500) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); 

Gebruik makend van venster() methode met argument 500, zal vertellen Waarneembaar om elementen in batches van 500 te groeperen. Deze techniek kan een probleem van overproductie verminderen Waarneembaar wanneer Waarnemer kan een batch elementen sneller verwerken in vergelijking met het één voor één verwerken van elementen.

5. Elementen overslaan

Als sommige waarden geproduceerd door Waarneembaar kan veilig worden genegeerd, we kunnen de bemonstering binnen een bepaalde tijd en throttling-operators gebruiken.

De methodes monster() en throttleFirst () nemen duur als parameter:

  • De sruim() method kijkt periodiek naar de opeenvolging van elementen en zendt het laatste item uit dat is geproduceerd binnen de duur die is opgegeven als een parameter
  • De throttleFirst () method zendt het eerste item uit dat werd geproduceerd na de duur die is opgegeven als een parameter

De duur is een tijd waarna een specifiek element wordt gekozen uit de reeks geproduceerde elementen. We kunnen een strategie specificeren voor het omgaan met tegendruk door elementen over te slaan:

PublishSubject source = PublishSubject.create (); source.sample (100, TimeUnit.MILLISECONDS) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace);

We hebben gespecificeerd dat de strategie om elementen over te slaan een monster() methode. We willen een sample van een reeks met een duur van 100 milliseconden. Dat element wordt verzonden naar het Waarnemer.

Onthoud echter dat deze operators alleen het tempo van waardeontvangst door de downstream verminderen Waarnemer en dus kunnen ze nog steeds leiden tot MissingBackpressureException.

6. Omgaan met een vulling Waarneembaar Buffer

In het geval dat onze strategieën van bemonstering of batching elementen niet helpen bij het vullen van een buffer, we moeten een strategie implementeren voor het afhandelen van gevallen waarin een buffer vol raakt.

We moeten een onBackpressureBuffer () methode om te voorkomen BufferOverflowException.

De onBackpressureBuffer () methode heeft drie argumenten: een capaciteit van een Waarneembaar buffer, een methode die wordt aangeroepen wanneer een buffer vol raakt, en een strategie voor het afhandelen van elementen die uit een buffer moeten worden verwijderd. Strategieën voor overflow zijn in een Tegendruk Overstroom klasse.

Er zijn 4 soorten acties die kunnen worden uitgevoerd wanneer de buffer vol raakt:

  • ON_OVERFLOW_ERROR - dit is de standaard gedragssignalering a BufferOverflowException als de buffer vol is
  • ON_OVERFLOW_DEFAULT - momenteel is het hetzelfde als ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST - als er een overstroming zou plaatsvinden, wordt de huidige waarde eenvoudigweg genegeerd en worden alleen de oude waarden eenmaal stroomafwaarts afgeleverd Waarnemer verzoeken
  • ON_OVERFLOW_DROP_OLDEST - laat het oudste element in de buffer vallen en voegt er de huidige waarde aan toe

Laten we eens kijken hoe we die strategie kunnen specificeren:

Observable.range (1, 1_000_000) .onBackpressureBuffer (16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .observeOn (Schedulers.computation ()) .subscribe (e -> {}, Throwable :: printStackTrace); 

Hier is onze strategie voor het omgaan met de overvolle buffer het oudste element in een buffer te laten vallen en het nieuwste item toe te voegen dat is geproduceerd door een Waarneembaar.

Merk op dat de laatste twee strategieën een discontinuïteit in de stream veroorzaken als ze elementen weglaten. Bovendien zullen ze niet signaleren BufferOverflowException.

7. Alle overgeproduceerde elementen laten vallen

Altijd stroomafwaarts Waarnemer niet klaar is om een ​​element te ontvangen, kunnen we een onBackpressureDrop () methode om dat element uit de reeks te verwijderen.

We kunnen die methode zien als een onBackpressureBuffer () methode met een capaciteit van een buffer op nul gezet met een strategie ON_OVERFLOW_DROP_LATEST.

Deze operator is handig wanneer we waarden van een bron veilig kunnen negeren Waarneembaar (zoals muisbewegingen of huidige GPS-locatiesignalen), aangezien er later meer actuele waarden zullen zijn:

Observable.range (1, 1_000_000) .onBackpressureDrop () .observeOn (Schedulers.computation ()) .doOnNext (ComputeFunction :: compute) .subscribe (v -> {}, Throwable :: printStackTrace);

De methode onBackpressureDrop () elimineert een probleem van overproductie Waarneembaar maar moet met de nodige voorzichtigheid worden gebruikt.

8. Conclusie

In dit artikel hebben we gekeken naar een probleem van overproductie Waarneembaar en manieren om met tegendruk om te gaan. We hebben gekeken naar strategieën voor het bufferen, batchen en overslaan van elementen wanneer de Waarnemer kan elementen niet zo snel consumeren als ze worden geproduceerd door een Waarneembaar.

De implementatie van al deze voorbeelden en codefragmenten is te vinden in het GitHub-project - dit is een Maven-project, dus het moet gemakkelijk te importeren en uit te voeren zijn zoals het is.


$config[zx-auto] not found$config[zx-overlay] not found