Het verschil tussen RxJava API en de Java 9 Flow API

1. Inleiding

Java Flow API is geïntroduceerd in Java 9 als een implementatie van Reactive Stream Specification.

In deze tutorial onderzoeken we eerst reactieve streams. Vervolgens leren we over de relatie met RxJava en Flow API.

2. Wat zijn reactieve streams?

Het Reactive Manifesto introduceerde Reactive Streams om een ​​standaard te specificeren voor asynchrone streamverwerking met niet-blokkerende tegendruk.

De reikwijdte van de Reactive Stream-specificatie is te definiëren een minimale set interfaces om die doelen te bereiken:

  • org.reactivestreams.Publisher is een dataprovider die gegevens naar de abonnees publiceert op basis van hun vraag

  • org.reactivestreams.Subscriber is de consument van data - het kan data ontvangen nadat het geabonneerd is op een uitgever

  • org.reactivestreams.Subscription wordt gemaakt wanneer een uitgever een abonnee accepteert

  • org.reactivestreams.Processor is zowel een abonnee als een uitgever - hij is geabonneerd op een uitgever, verwerkt de gegevens en geeft vervolgens de verwerkte gegevens door aan de abonnee

Flow API komt voort uit de specificatie. RxJava gaat eraan vooraf, maar sinds 2.0 ondersteunt RxJava ook de specificatie.

We gaan dieper in op beide, maar laten we eerst een praktische use case bekijken.

3. Gebruiksvoorbeeld

Voor deze zelfstudie gebruiken we een livestream-videoservice als ons geval.

Een livestreamvideo is, in tegenstelling tot videostreaming op aanvraag, niet afhankelijk van de consument. Daarom publiceert de server de stream in zijn eigen tempo en is het de verantwoordelijkheid van de consument om zich aan te passen.

In de meest eenvoudige vorm bestaat ons model uit een videostreamuitgever en een videospeler als abonnee.

Laten we implementeren VideoFrame als ons gegevensitem:

openbare klasse VideoFrame {privé lang nummer; // aanvullende gegevensvelden // constructor, getters, setters}

Laten we dan onze Flow API- en RxJava-implementaties een voor een doornemen.

4. Implementatie met Flow API

De Flow API's in JDK 9 komen overeen met de Reactive Streams-specificatie. Met de Flow API, als de applicatie aanvankelijk N items opvraagt, pusht de uitgever maximaal N items naar de abonnee.

De Flow API-interfaces bevinden zich allemaal in de java.util.concurrent.Flow koppel. Ze zijn semantisch equivalent aan hun respectievelijke Reactive Streams-tegenhangers.

Laten we implementeren VideoStreamServer als uitgever van VideoFrame.

openbare klasse VideoStreamServer breidt SubmissionPublisher uit {openbare VideoStreamServer () {super (Executors.newSingleThreadExecutor (), 5); }}

We hebben onze VideoStreamServer van Indiening Uitgever in plaats van direct te implementeren Flow :: Uitgever. Indiening Uitgever is JDK-implementatie van Flow :: Uitgever voor asynchrone communicatie met abonnees, dus het laat onze VideoStreamServer om in zijn eigen tempo uit te zenden.

Het is ook handig voor tegendruk en bufferbehandeling, want wanneer SubmissionPublisher :: abonneren aangeroepen, maakt het een instantie van Gebufferd abonnement, en voegt vervolgens het nieuwe abonnement toe aan de reeks abonnementen. Gebufferd abonnement kan uitgegeven items bufferen tot SubmissionPublisher # maxBufferCapacity.

Laten we nu definiëren Video speler, die een stroom van VideoFrame. Daarom moet het implementeren Flow :: Abonnee.

openbare klasse VideoPlayer implementeert Flow.Subscriber {Flow.Subscription-abonnement = null; @Override public void onSubscribe (Flow.Subscription-abonnement) {this.subscription = abonnement; abonnement.verzoek (1); } @Override public void onNext (VideoFrame-item) {log.info ("play # {}", item.getNumber ()); abonnement.verzoek (1); } @Override public void onError (Throwable throwable) {log.error ("Er is een fout opgetreden in videostreaming: {}", throwable.getMessage ()); } @Override public void onComplete () {log.error ("Video is beëindigd"); }}

Video speler abonneert zich op VideoStreamServer, dan na een succesvol abonnement Video speler::opSubscribe methode wordt aangeroepen, en het vraagt ​​om één frame. Video speler:: onNext ontvangt het frame en vraagt ​​om een ​​nieuw frame. Het aantal aangevraagde frames is afhankelijk van de use case en Abonnee implementaties.

Laten we tot slot de zaken op een rijtje zetten:

VideoStreamServer streamServer = nieuwe VideoStreamServer (); streamServer.subscribe (nieuwe VideoPlayer ()); // dien videoframes in ScheduledExecutorService executor = Executors.newScheduledThreadPool (1); AtomicLong frameNumber = nieuwe AtomicLong (); executor.scheduleWithFixedDelay (() -> {streamServer.offer (nieuw VideoFrame (frameNumber.getAndIncrement ()), (subscriber, videoFrame) -> {subscriber.onError (nieuwe RuntimeException ("Frame #" + videoFrame.getNumber () + " weggevallen vanwege tegendruk ")); return true;});}, 0, 1, TimeUnit.MILLISECONDS); slaap (1000);

5. Implementatie met RxJava

RxJava is een Java-implementatie van ReactiveX. Het ReactiveX (of Reactive Extensions) project beoogt een reactief programmeerconcept te bieden. Het is een combinatie van het Observer-patroon, het Iterator-patroon en functionele programmering.

De laatste grote versie voor RxJava is 3.x. RxJava ondersteunt Reactive Streams sinds versie 2.x met zijn Vloeiend basisklasse, maar het is een belangrijkere set dan Reactive Streams met verschillende basisklassen zoals Vloeiend, Waarneembaar, Single, Voltooibaar.

Vloeiend als reactieve stroom compliantiecomponent is een stroom van 0 tot N items met tegendrukbehandeling. Vloeiend strekt zich uit Uitgever van Reactive Streams. Daarom accepteren veel RxJava-operators Uitgever direct en maakt directe samenwerking met andere Reactive Streams-implementaties mogelijk.

Laten we nu onze videostreamgenerator maken, een oneindige luie stream:

Stream videoStream = Stream.iterate (nieuw VideoFrame (0), videoFrame -> {// slaap gedurende 1 ms; retourneer nieuw VideoFrame (videoFrame.getNumber () + 1);});

Dan definiëren we een Vloeiend instantie om frames op een aparte thread te genereren:

Vloeiend .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ()))

Het is belangrijk op te merken dat een oneindige stream voldoende voor ons is, maar als we een flexibelere manier nodig hebben om onze stream te genereren, Vloeiend. Creëren is een goede keus.

Flowable .create (new FlowableOnSubscribe () {AtomicLong frame = new AtomicLong (); @Override public void subscribe (@NonNull FlowableEmitter zender) {while (true) {emitter.onNext (nieuw VideoFrame (frame.incrementAndGet ())); / / slaap gedurende 1 ms tot simualte vertraging}}}, / * Stel hier tegendrukstrategie in * /)

Vervolgens, bij de volgende stap, neemt VideoPlayer een abonnement op deze Flowable en observeert items op een aparte thread.

videoFlowable .observeOn (Schedulers.from (Executors.newSingleThreadExecutor ())) .subscribe (item -> {log.info ("play #" + item.getNumber ()); // 30 ms slapen om frameweergave te simuleren}) ;

En tot slot gaan we de strategie voor tegendruk configureren. Als we de video willen stoppen in geval van frameverlies, moeten we daarom gebruiken BackpressureOverflowStrategy :: ERROR als de buffer vol is.

Vloeiend .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ())) .onBackpressureBuffer (5, null, BackpressureOverflowStrategy.ERROR) .observeOn (Schedulers.from (Executors.newSubscribe) (item) -. > {log.info ("play #" + item.getNumber ()); // 30 ms slapen om frameweergave te simuleren});

6. Vergelijking van RxJava en Flow API

Zelfs in deze twee eenvoudige implementaties kunnen we zien hoe de API van RxJava rijk is, vooral voor bufferbeheer, foutafhandeling en tegendrukstrategie. Het geeft ons meer opties en minder regels code met zijn vloeiende API. Laten we nu eens kijken naar meer gecompliceerde gevallen.

Stel dat onze speler geen videoframes kan weergeven zonder een codec. Daarom moeten we met Flow API een Verwerker om de codec te simuleren en tussen server en speler te zitten. Met RxJava kunnen we het doen met Vloeiend :: flatMap of Vloeiend :: kaart.

Of laten we ons voorstellen dat onze speler ook live vertaalaudio gaat uitzenden, dus we moeten streams van video en audio van afzonderlijke uitgevers combineren. Met RxJava kunnen we Flowable :: combineren maar met Flow API is het geen gemakkelijke taak.

Hoewel het mogelijk is om een ​​gewoonte te schrijven Verwerker die zich abonneert op beide streams en de gecombineerde gegevens naar ons verzendt Video speler. De implementatie levert echter hoofdpijn op.

7. Waarom Flow API?

Op dit punt hebben we misschien een vraag: wat is de filosofie achter de Flow API?

Als we zoeken naar Flow API-gebruik in de JDK, kunnen we iets vinden in java.net.http en jdk.internal.net.http.

Verder kunnen we adapters vinden in het reactorproject of reactieve stroompakket. Bijvoorbeeld, org.reactivestreams.FlowAdapters heeft methoden voor het converteren van Flow API-interfaces naar Reactive Stream-interfaces en vice versa. Daarom helpt het de interoperabiliteit tussen Flow API en bibliotheken met ondersteuning voor reactieve streams.

Al deze feiten helpen ons het doel van Flow API te begrijpen: het is gemaakt als een groep van reactieve specificatie-interfaces in JDK zonder relais op derden. Bovendien verwacht Java dat Flow API wordt geaccepteerd als standaardinterfaces voor reactieve specificatie en wordt gebruikt in JDK of andere op Java gebaseerde bibliotheken die de reactieve specificatie voor middlewares en hulpprogramma's implementeren.

8. Conclusies

In deze tutorial hebben we een inleiding tot Reactive Stream Specification, Flow API en RxJava.

Verder hebben we een praktisch voorbeeld gezien van Flow API- en RxJava-implementaties voor een live videostream.

Maar alle aspecten van Flow API en RxJava houden van Stroom :: Processor, Vloeiend :: kaart en Vloeiend :: flatMap of tegendrukstrategieën vallen hier niet onder.

Zoals altijd vind je de volledige code van de tutorial op GitHub.