Waarneembare gegevens combineren in RxJava

1. Inleiding

In deze korte tutorial bespreken we verschillende manieren om te combineren Waarneembare in RxJava.

Als je nieuw bent bij RxJava, bekijk dan zeker eerst deze intro-tutorial.

Laten we nu meteen naar binnen springen.

2. Waarneembare

Waarneembaar sequenties, of gewoon Waarneembare, zijn representaties van asynchrone datastromen.

Deze zijn gebaseerd op het Observer-patroon waarin een object genaamd an Waarnemer, abonneert zich op items die worden uitgezonden door een Waarneembaar.

Het abonnement blokkeert niet omdat het Waarnemer staat om te reageren op wat dan ook Waarneembaar zal uitzenden in de toekomst. Dit vergemakkelijkt op zijn beurt gelijktijdigheid.

Hier is een eenvoudige demonstratie in RxJava:

Waarneembaar .from (nieuwe String [] {"John", "Doe"}) .subscribe (naam -> System.out.println ("Hallo" + naam))

3. Combineren van observabelen

Bij het programmeren met behulp van een reactief raamwerk, is het een veel voorkomende use-case om verschillende te combineren Waarneembare.

In een webtoepassing hebben we bijvoorbeeld mogelijk twee sets asynchrone gegevensstromen nodig die onafhankelijk van elkaar zijn.

In plaats van te wachten tot de vorige stream is voltooid voordat we de volgende stream aanvragen, kunnen we beide tegelijkertijd bellen en ons abonneren op de gecombineerde streams.

In dit gedeelte bespreken we enkele van de verschillende manieren waarop we meerdere kunnen combineren Waarneembare in RxJava en de verschillende use-cases waarop elke methode van toepassing is.

3.1. Samenvoegen

We kunnen de samenvoegen operator om de output van meerdere te combineren Waarneembare zodat ze zich gedragen als een:

@Test openbare ongeldig gegevenTwoObservables_whenMerged_shouldEmitCombinedResults () {TestSubscriber testSubscriber = nieuwe TestSubscriber (); Observable.merge (Observable.from (nieuwe String [] {"Hallo", "Wereld"}), Observable.from (nieuwe String [] {"I love", "RxJava"})) .subscribe (testSubscriber); testSubscriber.assertValues ​​("Hallo", "Wereld", "I love", "RxJava"); }

3.2. MergeDelayError

De mergeDelayError methode is hetzelfde als samenvoegen in dat het meerdere combineert Waarneembare in een, maar als er fouten optreden tijdens het samenvoegen, kunnen foutloze items worden voortgezet voordat de fouten worden doorgegeven:

@Test openbare leegte gegevenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError () {TestSubscriber testSubscriber = nieuwe TestSubscriber (); Observable.mergeDelayError (Observable.from (new String [] {"hallo", "world"}), Observable.error (nieuwe RuntimeException ("Some exception")), Observable.from (nieuwe String [] {"rxjava"} )). abonneren (testSubscriber); testSubscriber.assertValues ​​("hallo", "wereld", "rxjava"); testSubscriber.assertError (RuntimeException.class); }

Het bovenstaande voorbeeld zendt alle foutloze waarden uit:

hallo wereld rxjava

Merk op dat als we samenvoegen in plaats van mergeDelayError, de Draadrxjava ” zal niet worden uitgezonden omdat samenvoegen stopt onmiddellijk de gegevensstroom van Waarneembare wanneer er een fout optreedt.

3.3. Ritssluiting

De zip uitbreidingsmethode brengt twee reeksen waarden samen als paren:

@Test openbare ongeldig gegevenTwoObservables_whenZipped_thenReturnCombinedResults () {Lijst zippedStrings = nieuwe ArrayList (); Observable.zip (Observable.from (new String [] {"Simple", "Moderate", "Complex"}), Observable.from (new String [] {"Solutions", "Success", "Hierarchy"}), (str1, str2) -> str1 + "" + str2) .subscribe (zippedStrings :: add); assertThat (zippedStrings) .isNotEmpty (); assertThat (zippedStrings.size ()). isEqualTo (3); assertThat (zippedStrings) .contains ("Eenvoudige oplossingen", "Matig succes", "Complexe hiërarchie"); }

3.4. Ritssluiting met interval

In dit voorbeeld zullen we een stream zippen met interval die in feite de emissie van elementen van de eerste stroom zal vertragen:

@Test openbare ongeldig gegevenAStream_whenZippedWithInterval_shouldDelayStreamEmmission () {TestSubscriber testSubscriber = nieuwe TestSubscriber (); Waarneembare gegevens = Observable.just ("een", "twee", "drie", "vier", "vijf"); Waarneembare interval = Observable.interval (1L, TimeUnit.SECONDS); Waarneembare .zip (data, interval, (strData, tick) -> String.format ("[% d] =% s", tick, strData)) .toBlocking (). Subscribe (testSubscriber); testSubscriber.assertCompleted (); testSubscriber.assertValueCount (5); testSubscriber.assertValues ​​("[0] = één", "[1] = twee", "[2] = drie", "[3] = vier", "[4] = vijf"); }

4. Samenvatting

In dit artikel hebben we enkele van de methoden voor combineren gezien Waarneembare met RxJava. U kunt meer te weten komen over andere methoden, zoals combinerenLatest, toetreden, groepJoin, switchOnNext, in de officiële RxJava-documentatie.

Zoals altijd is de broncode voor dit artikel beschikbaar in onze GitHub-opslagplaats.