Inleiding tot RxJava

1. Overzicht

In dit artikel gaan we ons concentreren op het gebruik van Reactive Extensions (Rx) in Java om reeksen gegevens samen te stellen en te gebruiken.

In één oogopslag lijkt de API op Java 8 Streams, maar in feite is hij veel flexibeler en vloeiender, waardoor het een krachtig programmeerparadigma is.

Als je meer wilt lezen over RxJava, bekijk dan dit artikel.

2. Installatie

Om RxJava in ons Maven-project te gebruiken, moeten we de volgende afhankelijkheid toevoegen aan onze pom.xml:

 io.reactivex rxjava $ {rx.java.version} 

Of, voor een Gradle-project:

compileer 'io.reactivex.rxjava: rxjava: x.y.z'

3. Functionele reactieve concepten

Aan de ene kant, functionele programmering is het proces van het bouwen van software door pure functies samen te stellen, waarbij gedeelde status, veranderlijke gegevens en bijwerkingen worden vermeden.

Aan de andere kant, reactief programmeren is een asynchroon programmeerparadigma dat zich bezighoudt met datastromen en de voortplanting van veranderingen.

Samen, functioneel reactief programmeren vormt een combinatie van functionele en reactieve technieken die een elegante benadering van gebeurtenisgestuurd programmeren kunnen vertegenwoordigen - met waarden die in de loop van de tijd veranderen en waarbij de consument reageert op de gegevens die binnenkomen.

Deze technologie brengt verschillende implementaties van de kernprincipes samen. Sommige auteurs kwamen met een document dat de algemene woordenschat definieert voor het beschrijven van het nieuwe type toepassingen.

3.1. Reactief manifest

Het Reactive Manifesto is een online document dat hoge eisen stelt aan toepassingen binnen de softwareontwikkelingsindustrie. Simpel gezegd, reactieve systemen zijn:

  • Responsief - systemen moeten tijdig reageren
  • Message Driven - systemen moeten asynchrone berichtenuitwisseling tussen componenten gebruiken om een ​​losse koppeling te garanderen
  • Elastisch - systemen moeten responsief blijven onder hoge belasting
  • Veerkrachtig - systemen moeten responsief blijven wanneer sommige componenten falen

4. Waarneembare

Er zijn twee hoofdtypen die u moet begrijpen wanneer u ermee werkt Rx:

  • Waarneembaar vertegenwoordigt elk object dat gegevens uit een gegevensbron kan halen en waarvan de staat van belang kan zijn op een manier dat andere objecten een interesse kunnen registreren
  • Een waarnemer is elk object dat op de hoogte wil worden gebracht wanneer de toestand van een ander object verandert

Een waarnemer abonneert zich op een Waarneembaar volgorde. De reeks stuurt items naar de waarnemer een per keer.

De waarnemer behandelt ze allemaal voordat ze de volgende verwerken. Als veel gebeurtenissen asynchroon binnenkomen, moeten ze in een wachtrij worden opgeslagen of worden verwijderd.

In Rx, een waarnemer zal nooit worden gebeld met een item dat niet in orde is of gebeld voordat de terugbelopdracht is teruggekeerd voor het vorige item.

4.1. Types van Waarneembaar

Er zijn twee soorten:

  • Niet-blokkerend - asynchrone uitvoering wordt ondersteund en kan zich op elk moment in de gebeurtenisstream uitschrijven. In dit artikel zullen we ons voornamelijk concentreren op dit soort typen
  • Blokkeren - alle onNext waarnemersoproepen zijn synchroon en het is niet mogelijk om je af te melden midden in een gebeurtenisstroom. We kunnen altijd een Waarneembaar in een Waarneembaar blokkeren, met behulp van de methode naarBlocking:
BlockingObservable blockingObservable = observable.toBlocking ();

4.2. Operatoren

Een operator is een functie die er een nodig heeft Obruikbaar (de bron) als het eerste argument en retourneert een ander Waarneembaar (de bestemming). Vervolgens zal voor elk item dat de waarneembare bron uitzendt, een functie op dat item worden toegepast en vervolgens het resultaat op de bestemming verzenden Waarneembaar.

Operatoren kunnen aan elkaar worden geketend om complexe gegevensstromen te creëren die gebeurtenissen filteren op basis van bepaalde criteria. Er kunnen meerdere operators op hetzelfde worden toegepast waarneembaar.

Het is niet moeilijk om in een situatie te komen waarin een Waarneembaar zendt items sneller uit dan een operator of waarnemer kan ze consumeren. U kunt hier meer lezen over tegendruk.

4.3. Creëer waarneembaar

De basisoperator alleen maar produceert een Waarneembaar die een enkele generieke instantie uitzendt voordat deze is voltooid, de String "Hallo". Als we informatie uit een Waarneembaarimplementeren we een waarnemer interface en vervolgens abonneren op de gewenste Waarneembaar:

Observable observable = Observable.just ("Hallo"); observable.subscribe (s -> resultaat = s); assertTrue (result.equals ("Hallo"));

4.4. OnNext, OnError, en OnCompleted

Er zijn drie methoden op het waarnemer interface die we willen weten over:

  1. OnNext wordt opgeroepen waarnemer elke keer dat een nieuw evenement wordt gepubliceerd naar de bijgevoegde Waarneembaar. Dit is de methode waarbij we een actie uitvoeren op elke gebeurtenis
  2. OnCompleted wordt aangeroepen wanneer de reeks gebeurtenissen die is gekoppeld aan een Waarneembaar is compleet, wat aangeeft dat we niet meer mogen verwachten onNext roept onze waarnemer op
  3. OnError wordt aangeroepen wanneer een niet-verwerkte uitzondering wordt gegenereerd tijdens de RxJava framework code of onze event handling code

De retourwaarde voor de Waarneembareabonneren methode is een abonneren koppel:

String [] letters = {"a", "b", "c", "d", "e", "f", "g"}; Observable observable = Observable.from (letters); observable.subscribe (i -> resultaat + = i, // OnNext Throwable :: printStackTrace, // OnError () -> resultaat + = "_Completed" // OnCompleted); assertTrue (result.equals ("abcdefg_Completed"));

5. Waarneembare transformaties en voorwaardelijke operatoren

5.1. Kaart

Henap-operator transformeert items die worden uitgezonden door een Waarneembaar door een functie op elk item toe te passen.

Laten we aannemen dat er een gedeclareerde reeks strings is die enkele letters uit het alfabet bevat en we willen ze in hoofdlettermodus afdrukken:

Observable.from (letters) .map (String :: toUpperCase) .subscribe (letter -> resultaat + = letter); assertTrue (result.equals ("ABCDEFG"));

De flatMap kan worden gebruikt om af te vlakken Waarneembare telkens als we eindigen met genest Waarneembare.

Meer details over het verschil tussen kaart en flatMap vind je hier.

Ervan uitgaande dat we een methode hebben die een retourneert Waarneembaar uit een lijst met strings. Nu gaan we afdrukken voor elke string van een nieuwe Waarneembaar de lijst met titels op basis van wat Abonnee ziet:

Observable getTitle () {return Observable.from (titleList); } Observable.just ("book1", "book2") .flatMap (s -> getTitle ()) .subscribe (l -> resultaat + = l); assertTrue (result.equals ("titletitle"));

5.2. Scannen

De scanoperator apast een functie toe voor elk item dat wordt uitgezonden door een Waarneembaar opeenvolgend en zendt elke opeenvolgende waarde uit.

Het stelt ons in staat om de status van gebeurtenis naar gebeurtenis over te dragen:

String [] letters = {"a", "b", "c"}; Observable.from (letters) .scan (nieuwe StringBuilder (), StringBuilder :: append) .subscribe (totaal -> resultaat + = total.toString ()); assertTrue (result.equals ("aababc"));

5.3. GroupBy

Groeperen op operator stelt ons in staat om de gebeurtenissen in de invoer te classificeren Waarneembaar in outputcategorieën.

Laten we aannemen dat we een reeks gehele getallen van 0 tot 10 hebben gemaakt en vervolgens toepassen groeperen op dat zal ze in de categorieën verdelen zelfs en vreemd:

Observable.from (numbers) .groupBy (i -> 0 == (i% 2)? "EVEN": "ODD") .subscribe (group -> group.subscribe ((number) -> {if (group.getKey () .toString (). equals ("EVEN")) {EVEN [0] + = getal;} anders {ONEVEN [0] + = getal;}})); assertTrue (EVEN [0] .equals ("0246810")); assertTrue (ODD [0] .equals ("13579"));

5.4. Filter

De operator filter zendt alleen die items uit een waarneembaar die een predikaat test.

Dus laten we filteren in een integer-array voor de oneven getallen:

Observable.from (numbers) .filter (i -> (i% 2 == 1)) .subscribe (i -> resultaat + = i); assertTrue (result.equals ("13579"));

5.5. Voorwaardelijke operators

DefaultIfEmpty zendt item uit de bron Waarneembaar, of een standaarditem als de source Waarneembaar is leeg:

Observable.empty () .defaultIfEmpty ("Observable is leeg") .subscribe (s -> resultaat + = s); assertTrue (result.equals ("Waarneembaar is leeg"));

De volgende code geeft de eerste letter van het alfabet ‘een' omdat de array brieven is niet leeg en dit is wat het bevat in de eerste positie:

Observable.from (letters) .defaultIfEmpty ("Observable is leeg") .first () .subscribe (s -> resultaat + = s); assertTrue (result.equals ("a"));

TakeWhile operator verwijdert items die zijn uitgezonden door een Waarneembaar nadat een opgegeven voorwaarde onwaar wordt:

Observable.from (getallen) .takeWhile (i -> i som [0] + = s); assertTrue (som [0] == 10);

Natuurlijk zijn er nog meer andere operators die in onze behoeften kunnen voorzien, zoals Bevat, SkipWhile, SkipUntil, TakeUntil, enz.

6. Verbindbare waarneembare

EEN Verbindbaar Waarneembaar lijkt op een gewoon Waarneembaar, behalve dat het geen items begint uit te zenden wanneer het is geabonneerd, maar alleen wanneer het aansluiten operator wordt erop toegepast.

Op deze manier kunnen we wachten tot alle beoogde waarnemers zich hebben geabonneerd op het Waarneembaar voor de Waarneembaar begint items uit te zenden:

String [] resultaat = {""}; ConnectableObservable connectable = Observable.interval (200, TimeUnit.MILLISECONDS) .publish (); connectable.subscribe (i -> resultaat [0] + = i); assertFalse (resultaat [0] .equals ("01")); connectable.connect (); Thread.sleep (500); assertTrue (resultaat [0] .equals ("01"));

7. Enig

Single is als een Waarneembaar die, in plaats van een reeks waarden uit te zenden, één waarde of een foutmelding uitzendt.

Met deze gegevensbron kunnen we slechts twee methoden gebruiken om ons te abonneren:

  • OnSuccess geeft een terug Single dat roept ook een methode op die we specificeren
  • OnError geeft ook een Single die abonnees onmiddellijk op de hoogte stelt van een fout
String [] resultaat = {""}; Single single = Observable.just ("Hallo") .toSingle () .doOnSuccess (i -> resultaat [0] + = i) .doOnError (fout -> {gooi nieuwe RuntimeException (error.getMessage ());}); single.subscribe (); assertTrue (resultaat [0] .equals ("Hallo"));

8. Onderwerpen

EEN Onderwerpen is tegelijkertijd twee elementen, a abonnee en een waarneembaar. Als abonnee kan een onderwerp worden gebruikt om de gebeurtenissen te publiceren die afkomstig zijn van meer dan één waarneembaar.

En omdat het ook waarneembaar is, kunnen de gebeurtenissen van meerdere abonnees opnieuw worden uitgezonden als gebeurtenissen voor iedereen die het observeert.

In het volgende voorbeeld zullen we bekijken hoe de waarnemers de gebeurtenissen kunnen zien die plaatsvinden nadat ze zich hebben geabonneerd:

Geheel getal abonnee1 = 0; Geheel getal abonnee2 = 0; Observer getFirstObserver () {return new Observer () {@Override public void onNext (Integer value) {subscriber1 + = value; } @Override public void onError (Throwable e) {System.out.println ("fout"); } @Override public void onCompleted () {System.out.println ("Abonnee1 voltooid"); }}; } Observer getSecondObserver () {retourneer nieuwe Observer () {@Override public void onNext (Integer waarde) {subscriber2 + = value; } @Override public void onError (Throwable e) {System.out.println ("fout"); } @Override public void onCompleted () {System.out.println ("Abonnee2 voltooid"); }}; } PublishSubject subject = PublishSubject.create (); subject.subscribe (getFirstObserver ()); subject.onNext (1); subject.onNext (2); subject.onNext (3); subject.subscribe (getSecondObserver ()); subject.onNext (4); subject.onCompleted (); assertTrue (abonnee1 + abonnee2 == 14)

9. Beheer van middelen

Gebruik makend van werking stelt ons in staat om bronnen te koppelen, zoals een JDBC-databaseverbinding, een netwerkverbinding, of bestanden te openen met onze waarneembare gegevens.

Hier presenteren we in commentaren de stappen die we moeten nemen om dit doel te bereiken en ook een voorbeeld van implementatie:

String [] resultaat = {""}; Waarneembare waarden = Observable.using (() -> "MyResource", r -> {return Observable.create (o -> {for (Character c: r.toCharArray ()) {o.onNext (c);} o. onCompleted ();});}, r -> System.out.println ("Disposed:" + r)); waarden.abonneren (v -> resultaat [0] + = v, e -> resultaat [0] + = e); assertTrue (resultaat [0] .equals ("MyResource"));

10. Conclusie

In dit artikel hebben we het gehad over het gebruik van de RxJava-bibliotheek en ook over het verkennen van de belangrijkste functies.

De volledige broncode voor het project inclusief alle codevoorbeelden die hier worden gebruikt, is te vinden op Github.