Waarneembare gegevens filteren in RxJava

1. Inleiding

Na de inleiding tot RxJava gaan we kijken naar de filteroperatoren.

In het bijzonder gaan we ons concentreren op filteren, overslaan, tijdfiltering en enkele meer geavanceerde filterbewerkingen.

2. Filteren

Bij het werken met Waarneembaar, soms is het handig om alleen een subset van verzonden items te selecteren. Voor dit doeleinde, RxJava biedt verschillende filtermogelijkheden.

Laten we beginnen met kijken naar de filter methode.

2.1. De filter Operator

Simpel gezegd, de filter operator filtert een Waarneembaar ervoor zorgen dat uitgezonden items overeenkomen met de gespecificeerde conditie, die wordt geleverd in de vorm van een Predikaat.

Laten we eens kijken hoe we alleen de oneven waarden uit de uitgezonden waarden kunnen filteren:

Waarneembare sourceObservable = Observable.range (1, 10); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable .filter (i -> i% 2! = 0); gefilterdObservable.subscribe (abonnee); subscriber.assertValues ​​(1, 3, 5, 7, 9);

2.2. De nemen Operator

Bij het filteren met nemen, de logica resulteert in de emissie van de eerste n items terwijl de resterende items worden genegeerd.

Laten we eens kijken hoe we het bronWaarneembaar en zenden alleen de eerste twee items uit:

Waarneembare sourceObservable = Observable.range (1, 10); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable.take (3); gefilterdObservable.subscribe (abonnee); subscriber.assertValues ​​(1, 2, 3);

2.3. De nemen Operator

Tijdens gebruik nemenWhile, het gefilterde Waarneembaar blijft items uitzenden totdat het een eerste element tegenkomt dat niet overeenkomt met de Predikaat.

Laten we eens kijken hoe we de nemen - met een filtering Predikaat:

Waarneembare sourceObservable = Observable.just (1, 2, 3, 4, 3, 2, 1); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable .takeWhile (i -> i <4); gefilterdObservable.subscribe (abonnee); subscriber.assertValues ​​(1, 2, 3);

2.4. De neemFirst Operator

Wanneer we alleen het eerste item willen uitzenden dat aan een bepaalde voorwaarde voldoet, kunnen we gebruiken takeFirst ().

Laten we snel kijken hoe we het eerste item kunnen uitstoten dat groter is dan 5:

Waarneembare bronObservable = Waarneembare .just (1, 2, 3, 4, 5, 7, 6); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable .takeFirst (x -> x> 5); gefilterdObservable.subscribe (abonnee); subscriber.assertValue (7);

2.5. eerste en firstOrDefault Operatoren

Een soortgelijk gedrag kan worden bereikt met de eerste API:

Waarneembare sourceObservable = Observable.range (1, 10); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable.first (); gefilterdObservable.subscribe (abonnee); subscriber.assertValue (1);

Als we echter een standaardwaarde willen specificeren, als er geen items worden verzonden, kunnen we firstOrDefault:

Waarneembare sourceObservable = Observable.empty (); Waarneembare filterObservable = sourceObservable.firstOrDefault (-1); gefilterdObservable.subscribe (abonnee); subscriber.assertValue (-1);

2.6. De takeLast Operator

Vervolgens, als we alleen de laatste willen uitzenden n items uitgezonden door een Waarneembaar, we kunnen gebruiken takeLast.

Laten we eens kijken hoe het mogelijk is om alleen de laatste drie items uit te zenden:

Waarneembare sourceObservable = Observable.range (1, 10); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable.takeLast (3); gefilterdObservable.subscribe (abonnee); subscriber.assertValues ​​(8, 9, 10);

We moeten niet vergeten dat dit de verzending van elk item vanaf de bron vertraagt Waarneembaar totdat het is voltooid.

2.7. laatste en lastOrDefault

Als we alleen het laatste element willen uitzenden, anders dan gebruiken takeLast (1), we kunnen gebruiken laatste.

Dit filtert het Waarneembaar, waarbij alleen het laatste element wordt uitgezonden, dat optioneel een filtering verifieert Predikaat:

Waarneembare sourceObservable = Observable.range (1, 10); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable .last (i -> i% 2! = 0); gefilterdObservable.subscribe (abonnee); subscriber.assertValue (9);

In het geval dat de Waarneembaar is leeg, kunnen we gebruiken lastOrDefault, dat filtert de Waarneembaar het uitzenden van de standaardwaarde.

De standaardwaarde wordt ook verzonden als de lastOrDefault operator wordt gebruikt en er zijn geen items die de filtervoorwaarde verifiëren:

Waarneembare sourceObservable = Observable.range (1, 10); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable.lastOrDefault (-1, i -> i> 10); gefilterdObservable.subscribe (abonnee); subscriber.assertValue (-1);

2.8. elementAt en elementAtOrDefault Operatoren

Met de elementAt operator, kunnen we een enkel item kiezen dat door de bron wordt verzonden Waarneembaar, met vermelding van de index:

Waarneembare bronObservable = Waarneembare .just (1, 2, 3, 5, 7, 11); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable.elementAt (4); gefilterdObservable.subscribe (abonnee); subscriber.assertValue (7);

Echter, elementAt zal een IndexOutOfBoundException als de gespecificeerde index het aantal uitgezonden items overschrijdt.

Om deze situatie te voorkomen, is het mogelijk om elementAtOrDefault - die een standaardwaarde retourneert in het geval de index buiten het bereik valt:

Waarneembare bronObservable = Waarneembare .just (1, 2, 3, 5, 7, 11); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable.elementAtOrDefault (7, -1); gefilterdObservable.subscribe (abonnee); subscriber.assertValue (-1);

2.9. De ofType Operator

Wanneer het Waarneembaar zendt Voorwerp items, is het mogelijk om ze te filteren op basis van hun type.

Laten we eens kijken hoe we alleen de Draad type items verzonden:

Waarneembare sourceObservable = Observable.just (1, "twee", 3, "vijf", 7, 11); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable.ofType (String.class); gefilterdObservable.subscribe (abonnee); subscriber.assertValues ​​("twee", "vijf");

3. Overslaan

Aan de andere kant, wanneer we een aantal van de items die door een Waarneembaar, RxJava biedt een aantal operators als tegenhanger van de filterende, die we eerder hebben besproken.

Laten we beginnen met kijken naar de overspringen operator, de tegenhanger van nemen.

3.1. De overspringen Operator

Wanneer een Waarneembaar een reeks items uitzendt, is het mogelijk om enkele van de eerste verzonden items uit te filteren of over te slaan met overspringen.

Bijvoorbeeld. laten we eens kijken hoe het mogelijk is om de eerste vier elementen over te slaan:

Waarneembare sourceObservable = Observable.range (1, 10); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable.skip (4); gefilterdObservable.subscribe (abonnee); subscriber.assertValues ​​(5, 6, 7, 8, 9, 10);

3.2. De skipWhile Operator

Telkens wanneer we alle eerste waarden willen filteren die worden uitgezonden door een Waarneembaar die niet slagen voor een filterpredikaat, kunnen we de skipWhile operator:

Waarneembare bron Waarneembaar = Waarneembaar .just (1, 2, 3, 4, 5, 4, 3, 2, 1); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable .skipWhile (i -> i <4); gefilterdObservable.subscribe (abonnee); subscriber.assertValues ​​(4, 5, 4, 3, 2, 1);

3.3. De skipLast Operator

De skipLast operator stelt ons in staat om de laatste items over te slaan die door de Waarneembaar alleen degenen accepteren die vóór hen zijn uitgezonden.

Hiermee kunnen we bijvoorbeeld de laatste vijf items overslaan:

Waarneembare sourceObservable = Observable.range (1, 10); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = sourceObservable.skipLast (5); gefilterdObservable.subscribe (abonnee); subscriber.assertValues ​​(1, 2, 3, 4, 5);

3.4. onderscheiden en distinctUntilChanged Operatoren

De onderscheiden operator geeft een Waarneembaar dat alle items uitzendt die door de bron Waarneembaar die onderscheiden zijn:

Waarneembare bronObservable = Waarneembare .just (1, 1, 2, 2, 1, 3, 3, 1); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare distinctObservable = sourceObservable.distinct (); distinctObservable.subscribe (abonnee); subscriber.assertValues ​​(1, 2, 3);

Als we echter een Waarneembaar dat alle items uitzendt die door de bron Waarneembaar die verschillen van hun directe voorganger, kunnen we de distinctUntilChanged operator:

Waarneembare bron Waarneembaar = Waarneembaar .just (1, 1, 2, 2, 1, 3, 3, 1); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare distinctObservable = sourceObservable.distinctUntilChanged (); distinctObservable.subscribe (abonnee); subscriber.assertValues ​​(1, 2, 1, 3, 1);

3.5. De ignoreElements Operator

Telkens wanneer we alle elementen willen negeren die door de bron Waarneembaar, kunnen we gewoon de negeren Elementen:

Waarneembare sourceObservable = Observable.range (1, 10); TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembaar ignoredObservable = sourceObservable.ignoreElements (); ignoredObservable.subscribe (abonnee); subscriber.assertNoValues ​​();

4. Tijdfilterende operators

Bij het werken met een waarneembare reeks is de tijdas onbekend, maar soms kan het handig zijn om tijdige gegevens uit een reeks te halen.

Met dit doel, RxJava biedt een aantal methoden waarmee we kunnen werken Waarneembaar met behulp van ook de tijdas.

Laten we, voordat we naar de eerste gaan, een getimed definiëren Waarneembaar dat elke seconde een item zal uitstoten:

TestScheduler testScheduler = nieuwe TestScheduler (); Observable timedObservable = Observable .just (1, 2, 3, 4, 5, 6) .zipWith (Observable.interval (0, 1, TimeUnit.SECONDS, testScheduler), (item, tijd) -> item);

De TestScheduler is een speciale planner waarmee u de klok handmatig vooruit kunt zetten in welk tempo we maar willen.

4.1. monster en gasklepLast Operatoren

De monster operator filtert het getimed Waarneembaar, retourneert een Waarneembaar die de meest recente items uitzendt die door deze API binnen bepaalde tijdsintervallen worden verzonden.

Laten we eens kijken hoe we het getimed Waarneembaar, alleen het laatst verzonden item elke 2,5 seconden filteren:

TestSubscriber-abonnee = nieuwe TestSubscriber (); Observable sampledObservable = timedObservable .sample (2500L, TimeUnit.MILLISECONDS, testScheduler); sampledObservable.subscribe (abonnee); testScheduler.advanceTimeBy (7, TimeUnit.SECONDS); subscriber.assertValues ​​(3, 5, 6);

Dit soort gedrag kan ook worden bereikt met de gasklepLaatste operator.

4.2. De gas geven Operator

De gas geven operator verschilt van throttleLast / sample aangezien het het eerste item uitzendt dat wordt uitgezonden door de getimed Waarneembaar in elke bemonsteringsperiode in plaats van de meest recent uitgezonden.

Laten we eens kijken hoe we de eerste items kunnen uitzenden, met een bemonsteringsperiode van 4 seconden:

TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = timedObservable .throttleFirst (4100L, TimeUnit.SECONDS, testScheduler); gefilterdObservable.subscribe (abonnee); testScheduler.advanceTimeBy (7, TimeUnit.SECONDS); subscriber.assertValues ​​(1, 6);

4.3. debounce en throttleWithTimeout Operatoren

Met de debounce operator is het mogelijk om alleen een item uit te zenden als een bepaalde tijd is verstreken zonder een ander item uit te zenden.

Daarom, als we een tijdspanne selecteren die groter is dan het tijdsinterval tussen de uitgezonden items van de getimed Waarneembaar, het zal alleen de laatste uitzenden. Aan de andere kant, als het kleiner is, worden alle items uitgezonden door de getimed Waarneembaar.

Laten we eens kijken wat er gebeurt in het eerste scenario:

TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = timedObservable .debounce (2000L, TimeUnit.MILLISECONDS, testScheduler); gefilterdObservable.subscribe (abonnee); testScheduler.advanceTimeBy (7, TimeUnit.SECONDS); subscriber.assertValue (6);

Dit soort gedrag kan ook worden bereikt met throttleWithTimeout.

4.4. De time-out Operator

De time-out operator weerspiegelt de bron Waarneembaar, maar geef een meldingsfout, waardoor de emissie van items wordt afgebroken, als de bron Waarneembaar verzendt geen items tijdens een opgegeven tijdsinterval.

Laten we eens kijken wat er gebeurt als we een time-out van 500 milliseconden specificeren voor onze getimed Waarneembaar:

TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = timedObservable .timeout (500L, TimeUnit.MILLISECONDS, testScheduler); gefilterdObservable.subscribe (abonnee); testScheduler.advanceTimeBy (7, TimeUnit.SECONDS); subscriber.assertError (TimeoutException.class); subscriber.assertValues ​​(1);

5. Meerdere waarneembare filtering

Bij het werken met Waarneembaar, het is zeker mogelijk om te beslissen of items worden gefilterd of overgeslagen op basis van een seconde Waarneembaar.

Laten we, voordat we verder gaan, een vertraagd Waarneembaar, dat slechts 1 item zal uitstoten na 3 seconden:

Observable delayedObservable = Observable.just (1) .delay (3, TimeUnit.SECONDS, testScheduler);

Laten we beginnen met takeUntil operator.

5.1. De takeUntil Operator

De takeUntil operator negeert elk item dat door de bron wordt uitgezonden Waarneembaar (getimed Waarneembaar) na een seconde Waarneembaar (vertraagd Waarneembaar) zendt een item uit of beëindigt:

TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = timedObservable .skipUntil (delayedObservable); gefilterdObservable.subscribe (abonnee); testScheduler.advanceTimeBy (7, TimeUnit.SECONDS); subscriber.assertValues ​​(4, 5, 6);

5.2. De skipUntil Operator

Aan de andere kant, skipUntil verwijdert elk item dat door de bron wordt uitgezonden Waarneembaar (getimed Waarneembaar) tot een seconde Waarneembaar (vertraagd Waarneembaar) zendt een item uit:

TestSubscriber-abonnee = nieuwe TestSubscriber (); Waarneembare filterObservable = timedObservable .takeUntil (delayedObservable); gefilterdObservable.subscribe (abonnee); testScheduler.advanceTimeBy (7, TimeUnit.SECONDS); subscriber.assertValues ​​(1, 2, 3);

6. Conclusie

In deze uitgebreide tutorial hebben we de verschillende filteroperatoren onderzocht die beschikbaar zijn in RxJava, waarbij we van elk een eenvoudig voorbeeld hebben gegeven.

Zoals altijd zijn alle codevoorbeelden in dit artikel te vinden op GitHub.