Planners in RxJava

1. Overzicht

In dit artikel gaan we ons concentreren op verschillende soorten Planners die we gaan gebruiken bij het schrijven van multithreading-programma's op basis van RxJava Observable's abonneren op en observeren methoden.

Planners geef de mogelijkheid om te specificeren waar en waarschijnlijk wanneer taken moeten worden uitgevoerd die verband houden met de werking van een Waarneembaar ketting.

We kunnen een Planner van de fabrieksmethoden die in de klas worden beschreven Planners.

2. Standaardgedrag inrijgen

Standaard,Rx heeft één schroefdraad wat inhoudt dat een Waarneembaar en de keten van operators die we erop kunnen toepassen, zal zijn waarnemers op de hoogte brengen op dezelfde thread waarop het abonneren () methode wordt genoemd.

De observeren en abonneren op methoden nemen als argument a Planner, dat, zoals de naam al doet vermoeden, een tool is die we kunnen gebruiken voor het plannen van individuele acties.

We maken onze implementatie van een Planner door de creërenWerknemer methode, die een Planner. Werknemer. EEN werknemer accepteert acties en voert ze opeenvolgend uit op een enkele thread.

In zekere zin een werknemer is een Scheduler zelf, maar we zullen er niet naar verwijzen als een Planner om verwarring te voorkomen.

2.1. Een actie plannen

We kunnen voor elk een klus plannen Planner door een nieuw werknemer en het plannen van enkele acties:

Scheduler scheduler = Schedulers.immediate (); Scheduler.Worker werknemer = scheduler.createWorker (); worker.schedule (() -> resultaat + = "actie"); Assert.assertTrue (result.equals ("action"));

De actie wordt vervolgens in de wachtrij geplaatst op de thread waaraan de werknemer is toegewezen.

2.2. Een actie annuleren

Planner. Werknemer strekt zich uit Abonnement. Bellen met het Afmelden methode op een werknemer zal ertoe leiden dat de wachtrij wordt geleegd en alle lopende taken worden geannuleerd. We kunnen dat aan de hand van een voorbeeld zien:

Scheduler scheduler = Schedulers.newThread (); Scheduler.Worker werknemer = scheduler.createWorker (); worker.schedule (() -> {resultaat + = "First_Action"; worker.unsubscribe ();}); worker.schedule (() -> resultaat + = "Second_Action"); Assert.assertTrue (result.equals ("First_Action"));

De tweede taak wordt nooit uitgevoerd omdat de taak ervoor de hele bewerking heeft geannuleerd. Acties die werden uitgevoerd, worden onderbroken.

3. Schedulers.newThread

Deze planner start eenvoudig een nieuwe thread elke keer dat deze wordt aangevraagd via abonneren op () of observationOn ().

Het is bijna nooit een goede keuze, niet alleen vanwege de latentie bij het starten van een thread, maar ook omdat deze thread niet opnieuw wordt gebruikt:

Observable.just ("Hallo") .observeOn (Schedulers.newThread ()) .doOnNext (s -> resultaat2 + = Thread.currentThread (). GetName ()) .observeOn (Schedulers.newThread ()) .subscribe (s - > result1 + = Thread.currentThread (). getName ()); Thread.sleep (500); Assert.assertTrue (result1.equals ("RxNewThreadScheduler-1")); Assert.assertTrue (result2.equals ("RxNewThreadScheduler-2"));

Wanneer de Werknemer klaar is, eindigt de thread gewoon. Dit Planner kan alleen worden gebruikt als taken grofkorrelig zijn: het kost veel tijd om te voltooien, maar er zijn er maar heel weinig, zodat het onwaarschijnlijk is dat threads opnieuw worden gebruikt.

Scheduler scheduler = Schedulers.newThread (); Scheduler.Worker werknemer = scheduler.createWorker (); worker.schedule (() -> {resultaat + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> resultaat + = "_worker_"); resultaat + = "_End";} ); Thread.sleep (3000); Assert.assertTrue (result.equals ("RxNewThreadScheduler-1_Start_End_worker_"));

Toen we de werknemer op een NewThreadScheduler, we zagen dat de arbeider aan een bepaalde draad was gebonden.

4. Planners. Onmiddellijk

Planners. Onmiddellijk is een speciale planner die een taak binnen de clientthread op een blokkerende manier aanroept in plaats van asynchroon en terugkeert wanneer de actie is voltooid:

Scheduler scheduler = Schedulers.immediate (); Scheduler.Worker werknemer = scheduler.createWorker (); worker.schedule (() -> {resultaat + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> resultaat + = "_worker_"); resultaat + = "_End";} ); Thread.sleep (500); Assert.assertTrue (result.equals ("main_Start_worker__End"));

In feite abonneren op een Waarneembaar via onmiddellijke planner heeft doorgaans hetzelfde effect als het niet inschrijven bij een bepaalde Scheduler helemaal:

Observable.just ("Hallo") .subscribeOn (Schedulers.immediate ()) .subscribe (s -> resultaat + = Thread.currentThread (). GetName ()); Thread.sleep (500); Assert.assertTrue (result.equals ("main"));

5. Planners. Trampoline

De trampolinePlanner lijkt erg op onmiddellijk omdat het ook taken in dezelfde thread plant en effectief blokkeert.

De aanstaande taak wordt echter uitgevoerd wanneer alle eerder geplande taken zijn voltooid:

Observable.just (2, 4, 6, 8) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> resultaat + = "" + i); Observable.just (1, 3, 5, 7, 9) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> resultaat + = "" + i); Thread.sleep (500); Assert.assertTrue (result.equals ("246813579"));

Onmiddellijk roept meteen een bepaalde taak op, terwijl trampoline wacht tot de huidige taak is voltooid.

De trampoline‘S werknemer voert elke taak uit op de thread die de eerste taak heeft gepland. De eerste oproep aan schema blokkeert totdat de wachtrij is geleegd:

Scheduler scheduler = Schedulers.trampoline (); Scheduler.Worker werknemer = scheduler.createWorker (); worker.schedule (() -> {resultaat + = Thread.currentThread (). getName () + "Start"; worker.schedule (() -> {resultaat + = "_middleStart"; worker.schedule (() -> result + = "_worker_"); result + = "_middleEnd";}); result + = "_mainEnd";}); Thread.sleep (500); Assert.assertTrue (resultaat .equals ("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Planners. Van

Planners zijn intern complexer dan Uitvoerders van java.util.concurrent - dus een aparte abstractie was nodig.

Maar omdat ze conceptueel erg op elkaar lijken, is het niet verwonderlijk dat er een wikkel is die kan draaien Uitvoerder in Planner de ... gebruiken van fabrieks methode:

private ThreadFactory threadFactory (String patroon) {retourneer nieuwe ThreadFactoryBuilder () .setNameFormat (patroon) .build (); } @Test openbare leegte gegevenExecutors_whenSchedulerFrom_thenReturnElements () gooit InterruptedException {ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched-A-% d")); Scheduler schedulerA = Schedulers.from (poolA); ExecutorService poolB = newFixedThreadPool (10, threadFactory ("Sched-B-% d")); Scheduler schedulerB = Schedulers.from (poolB); Observable observable = Observable.create (subscriber -> {subscriber.onNext ("Alfa"); subscriber.onNext ("Beta"); subscriber.onCompleted ();}) ;; waarneembaar .subscribeOn (schedulerA) .subscribeOn (schedulerB) .subscribe (x -> resultaat + = Thread.currentThread (). getName () + x + "_", Throwable :: printStackTrace, () -> resultaat + = "_Completed "); Thread.sleep (2000); Assert.assertTrue (result.equals ("Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

SchedulerB wordt voor een korte tijd gebruikt, maar er wordt nauwelijks een nieuwe actie op gepland plannerA, die al het werk doet. Dus meerdere subscribeOn methoden worden niet alleen genegeerd, maar introduceren ook een kleine overhead.

7. Schedulers.io

Dit Planner is vergelijkbaar met de newThread behalve het feit dat reeds gestarte threads worden gerecycled en mogelijk toekomstige verzoeken kunnen afhandelen.

Deze implementatie werkt op dezelfde manier als ThreadPoolExecutor van java.util.concurrent met een grenzeloze pool van draden. Elke keer een nieuwe werknemer wordt aangevraagd, wordt ofwel een nieuwe thread gestart (en later enige tijd inactief gehouden) of wordt de inactieve thread hergebruikt:

Observable.just ("io") .subscribeOn (Schedulers.io ()) .subscribe (i -> resultaat + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxIoScheduler-2"));

We moeten voorzichtig zijn met onbeperkte bronnen van welke aard dan ook - in het geval van langzame of niet-reagerende externe afhankelijkheden zoals webservices, ioplanner kan een enorm aantal threads starten, waardoor onze eigen applicatie niet meer reageert.

In de praktijk volgt Schedulers.io is bijna altijd een betere keuze.

8. Planners. Berekening

Berekening Scheduler beperkt standaard het aantal threads dat parallel loopt tot de waarde van beschikbare processors (), zoals gevonden in het Runtime.getRuntime () hulpprogramma klasse.

Dus we moeten een berekeningsplanner wanneer taken volledig CPU-gebonden zijn; dat wil zeggen, ze hebben rekenkracht nodig en hebben geen blokkeringscode.

Het gebruikt een onbegrensde wachtrij voor elke thread, dus als de taak is gepland, maar alle kernen bezet zijn, wordt deze in de wachtrij geplaatst. De wachtrij net voor elke thread zal echter blijven groeien:

Observable.just ("berekening") .subscribeOn (Schedulers.computation ()) .subscribe (i -> resultaat + = Thread.currentThread (). GetName ()); Assert.assertTrue (result.equals ("RxComputationScheduler-1"));

Als we om wat voor reden dan ook een ander aantal threads nodig hebben dan de standaard, kunnen we altijd de rx.scheduler.max-berekening-threads systeemeigenschap.

Door minder threads te nemen, kunnen we ervoor zorgen dat er altijd een of meer CPU-kernen inactief zijn, en zelfs onder zware belasting, berekening thread pool verzadigt de server niet. Het is simpelweg niet mogelijk om meer reken-threads dan cores te hebben.

9. Schedulers.test

Dit Planner wordt alleen gebruikt voor testdoeleinden en we zullen het nooit in productiecode zien. Het belangrijkste voordeel is de mogelijkheid om de klok vooruit te zetten en de tijd die willekeurig verstrijkt te simuleren:

Lijstletters = Arrays.asList ("A", "B", "C"); TestScheduler scheduler = Schedulers.test (); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembaar vinkje = Waarneembaar .interval (1, TimeUnit.SECONDS, planner); Observable.from (letters) .zipWith (tick, (string, index) -> index + "-" + string) .subscribeOn (planner) .subscribe (abonnee); subscriber.assertNoValues ​​(); subscriber.assertNotCompleted (); scheduler.advanceTimeBy (1, TimeUnit.SECONDS); subscriber.assertNoErrors (); subscriber.assertValueCount (1); subscriber.assertValues ​​("0-A"); scheduler.advanceTimeTo (3, TimeUnit.SECONDS); subscriber.assertCompleted (); subscriber.assertNoErrors (); subscriber.assertValueCount (3); assertThat (subscriber.getOnNextEvents (), hasItems ("0-A", "1-B", "2-C"));

10. Standaardplanners

Sommige Waarneembaar operators in RxJava hebben alternatieve vormen waarmee we kunnen instellen welke Planner de operator zal gebruiken voor zijn werking. Anderen werken niet op een bepaald gebied Planner of werken op een bepaalde standaard Planner.

Bijvoorbeeld de vertraging operator neemt stroomopwaartse gebeurtenissen en duwt ze stroomafwaarts na een bepaalde tijd. Het is duidelijk dat het de originele draad gedurende die periode niet kan vasthouden, dus het moet een andere draad gebruiken Planner:

ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched1-")); Scheduler schedulerA = Schedulers.from (poolA); Observable.just ('A', 'B') .delay (1, TimeUnit.SECONDS, schedulerA) .subscribe (i -> resultaat + = Thread.currentThread (). GetName () + i + ""); Thread.sleep (2000); Assert.assertTrue (result.equals ("Sched1-A Sched1-B"));

Zonder een maatwerk te leveren plannerA, alle onderstaande operators vertraging zou de berekeningsplanner.

Andere belangrijke operators die maatwerk ondersteunen Planners zijn buffer, interval, bereik, timer, overspringen, nemen, time-out, en verschillende anderen. Als we geen Planner aan dergelijke operators, berekening scheduler wordt gebruikt, wat in de meeste gevallen een veilige standaard is.

11. Conclusie

In echt reactieve applicaties, waarvoor alle langlopende bewerkingen asynchroon zijn, zeer weinig threads en dus Planners zijn nodig.

Mastering schedulers zijn essentieel voor het schrijven van schaalbare en veilige code met RxJava. Het verschil tussen abonneren op en observeren is vooral belangrijk onder hoge belasting, waar elke taak precies moet worden uitgevoerd wanneer we verwachten.

En last but not least: dat moeten we zeker weten Planners stroomafwaarts gebruikt kan de lo-advertentie bijhouden die wordt gegenereerd door Planners stroomopwaarts. Voor meer informatie is er dit artikel over tegendruk.

De implementatie van al deze voorbeelden en codefragmenten is te vinden in het GitHub-project - dit is een Maven-project, dus het moet gemakkelijk te importeren en uit te voeren zijn zoals het is.