Inleiding tot RSocket

1. Inleiding

In deze tutorial zullen we een eerste blik werpen op RSocket en hoe het client-server-communicatie mogelijk maakt.

2. Wat is RSocket?

RSocket is een binair point-to-point communicatieprotocol bedoeld voor gebruik in gedistribueerde toepassingen. In die zin biedt het een alternatief voor andere protocollen zoals HTTP.

Een volledige vergelijking tussen RSocket en andere protocollen valt buiten het bestek van dit artikel. In plaats daarvan zullen we ons concentreren op een belangrijk kenmerk van RSocket: de interactiemodellen.

RSocket biedt vier interactiemodellen. Met dat in gedachten zullen we ze allemaal onderzoeken met een voorbeeld.

3. Maven afhankelijkheden

RSocket heeft slechts twee directe afhankelijkheden nodig voor onze voorbeelden:

 io.rsocket rsocket-core 0.11.13 io.rsocket rsocket-transport-netty 0.11.13 

De afhankelijkheden rsocket-core en rsocket-transport-netty zijn beschikbaar op Maven Central.

Een belangrijke opmerking is dat de RSocket-bibliotheek veelvuldig gebruik maakt van reactieve streams. De Flux en Mono klassen worden in dit artikel gebruikt, dus een basiskennis ervan zal nuttig zijn.

4. Serverinstellingen

Laten we eerst het Server klasse:

public class Server {private final Wegwerpserver; openbare server () {this.server = RSocketFactory.receive () .acceptor ((setupPayload, reactiveSocket) -> Mono.just (nieuwe RSocketImpl ())) .transport (TcpServerTransport.create ("localhost", TCP_PORT)) .start (). abonneren (); } public void dispose () {this.server.dispose (); } privéklasse RSocketImpl breidt AbstractRSocket uit {}}

Hier gebruiken we de RSocketFactory om een ​​TCP-socket in te stellen en ernaar te luisteren. We passeren in onze gewoonte RSocketImpl om verzoeken van klanten af ​​te handelen. We zullen methoden toevoegen aan het RSocketImpl als we gaan.

Om de server vervolgens te starten, hoeven we deze alleen maar te instantiëren:

Server server = nieuwe server ();

Een enkele serverinstantie kan meerdere verbindingen verwerken. Als gevolg hiervan ondersteunt slechts één serverinstantie al onze voorbeelden.

Als we klaar zijn, is de weggooien methode stopt de server en geeft de TCP-poort vrij.

4. Interactiemodellen

4.1. Aanvraag antwoord

RSocket biedt een verzoek- / antwoordmodel - elk verzoek ontvangt één antwoord.

Voor dit model maken we een eenvoudige service die een bericht terugstuurt naar de klant.

Laten we beginnen met het toevoegen van een methode aan onze extensie van SamenvattingRSocket, RSocketImpl:

@Override public Mono requestResponse (Payload payload) {try {return Mono.just (payload); // reflecteer de payload terug naar de afzender} catch (uitzondering x) {return Mono.error (x); }}

De aanvraag antwoord methode retourneert een enkel resultaat voor elk verzoek, zoals we kunnen zien aan de Mono reactietype.

Laadvermogen is de klasse die berichtinhoud en metagegevens bevat. Het wordt gebruikt door alle interactiemodellen. De inhoud van de payload is binair, maar er zijn handige methoden die Draad-gebaseerde inhoud.

Vervolgens kunnen we onze klantenklasse maken:

openbare klasse ReqResClient {privé definitieve RSocket-socket; openbare ReqResClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } public String callBlocking (String string) {return socket .requestResponse (DefaultPayload.create (string)) .map (Payload :: getDataUtf8) .block (); } public void dispose () {this.socket.dispose (); }}

De klant gebruikt de RSocketFactory.connect () methode om een ​​socketverbinding met de server tot stand te brengen. Wij gebruiken de aanvraag antwoord methode op de socket om een ​​payload naar de server te sturen.

Onze payload bevat de Draad doorgegeven aan de klant. Wanneer de Mono antwoord komt, kunnen we de getDataUtf8 () methode om toegang te krijgen tot het Draad inhoud van de reactie.

Ten slotte kunnen we de integratietest uitvoeren om het verzoek / antwoord in actie te zien. We sturen een Draad naar de server en controleer of hetzelfde Draad wordt geretourneerd:

@ Test openbare leegte whenSendingAString_thenRevceiveTheSameString () {ReqResClient client = nieuwe ReqResClient (); String string = "Hallo RSocket"; assertEquals (string, client.callBlocking (string)); client.dispose (); }

4.2. Vuur-en-vergeten

Met het vuur-en-vergeet-model, de klant zal geen reactie van de server ontvangen.

In dit voorbeeld stuurt de client gesimuleerde metingen naar de server in intervallen van 50 ms. De server zal de metingen publiceren.

Laten we een afvuren-en-vergeten-handler toevoegen aan onze server in de RSocketImpl klasse:

@Override openbare Mono fireAndForget (Payload-payload) {probeer {dataPublisher.publish (payload); // stuur de payload terug Mono.empty (); } catch (uitzondering x) {return Mono.error (x); }}

Deze handler lijkt erg op de request / response-handler. Echter, fireAndForget geeft terug Mono in plaats van Mono.

De dataPublisher is een voorbeeld van org.reactivestreams.Publisher. Het maakt de payload dus beschikbaar voor abonnees. Daar maken we gebruik van in het request / stream-voorbeeld.

Vervolgens maken we de fire-and-forget-client:

openbare klasse FireNForgetClient {privé definitieve RSocket-socket; privé definitieve lijstgegevens; openbare FireNForgetClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } / ** Verzend binaire snelheid (float) elke 50 ms * / public void sendData () {data = Collections.unmodifiableList (GenereerData ()); Flux.interval (Duration.ofMillis (50)) .take (data.size ()) .map (this :: createFloatPayload) .flatMap (socket :: fireAndForget) .blockLast (); } // ...}

De socket setup is precies hetzelfde als voorheen.

De verstuur data() methode maakt gebruik van een Flux stream om meerdere berichten te verzenden. Voor elk bericht roepen we in socket :: fireAndForget.

We moeten ons abonneren op het Mono antwoord voor elk bericht. Als we ons dan vergeten in te schrijven socket :: fireAndForget zal niet worden uitgevoerd.

De flatMap operator zorgt ervoor dat het Ongeldig reacties worden doorgegeven aan de abonnee, terwijl de blockLast operator treedt op als abonnee.

We wachten tot de volgende sectie om de vuur-en-vergeet-test uit te voeren. Op dat moment maken we een aanvraag / stream-client om de gegevens te ontvangen die door de fire-and-forget-client zijn gepusht.

4.3. Verzoek / stream

In het aanvraag- / streammodel, een enkel verzoek kan meerdere reacties ontvangen. Om dit in actie te zien, kunnen we voortbouwen op het voorbeeld van vuur en vergeten. Om dat te doen, vragen we een stream om de metingen op te halen die we in de vorige sectie hebben verzonden.

Laten we net als eerder beginnen met het toevoegen van een nieuwe luisteraar aan het RSocketImpl op de server:

@Override openbare Flux requestStream (Payload payload) {return Flux.from (dataPublisher); }

De requestStream handler retourneert een Flux stroom. Zoals we ons herinneren uit de vorige sectie, is de fireAndForget handler publiceerde inkomende gegevens naar de dataPublisher. Nu gaan we een Flux stream met datzelfde dataPublisher als de gebeurtenisbron. Hierdoor stromen de meetgegevens asynchroon van onze fire-and-forget client naar onze request / stream client.

Laten we vervolgens de aanvraag / stream-client maken:

openbare klasse ReqStreamClient {privé definitieve RSocket-socket; openbare ReqStreamClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); } openbare Flux getDataStream () {retour socket .requestStream (DefaultPayload.create (DATA_STREAM_NAME)) .map (Payload :: getData) .map (buf -> buf.getFloat ()) .onErrorReturn (null); } public void dispose () {this.socket.dispose (); }}

We maken op dezelfde manier verbinding met de server als onze vorige clients.

In getDataStream ()we gebruiken socket.requestStream () om een ​​Flux-stream van de server te ontvangen. Uit die stroom halen we de Vlotter waarden uit de binaire gegevens. Ten slotte wordt de stream teruggestuurd naar de beller, zodat de beller zich erop kan abonneren en de resultaten kan verwerken.

Laten we nu testen. We verifiëren de heen-en terugreis van fire-and-forget naar request / stream.

We kunnen stellen dat elke waarde wordt ontvangen in dezelfde volgorde als waarin deze is verzonden. Vervolgens kunnen we stellen dat we hetzelfde aantal waarden ontvangen als verzonden:

@Test openbare leegte whenSendingStream_thenReceiveTheSameStream () {FireNForgetClient fnfClient = nieuwe FireNForgetClient (); ReqStreamClient streamClient = nieuwe ReqStreamClient (); Lijstgegevens = fnfClient.getData (); Lijst dataReceived = nieuwe ArrayList (); Wegwerpabonnement = streamClient.getDataStream () .index () .subscribe (tuple -> {assertEquals ("Verkeerde waarde", data.get (tuple.getT1 (). IntValue ()), tuple.getT2 ()); dataReceived. add (tuple.getT2 ());}, err -> LOG.error (err.getMessage ())); fnfClient.sendData (); // ... dispose client & abonnement assertEquals ("Verkeerde datatelling ontvangen", data.size (), dataReceived.size ()); }

4.4. Kanaal

Het kanaalmodel biedt bidirectionele communicatie. In dit model stromen berichtstromen asynchroon in beide richtingen.

Laten we een eenvoudige spelsimulatie maken om dit te testen. In dit spel wordt elke kant van het kanaal een speler. Terwijl het spel loopt, sturen deze spelers met willekeurige tussenpozen berichten naar de andere kant. De andere kant reageert op de berichten.

Ten eerste maken we de handler op de server. Net als eerder voegen we toe aan de RSocketImpl:

@Override openbare Flux requestChannel (payloads uitgever) {Flux.from (payloads) .subscribe (gameController :: processPayload); retourneer Flux.from (gameController); }

De requestChannel handler heeft Laadvermogen streams voor zowel input als output. De Uitgever invoerparameter is een stroom van payloads die van de client wordt ontvangen. Bij aankomst worden deze ladingen doorgegeven aan de gameController :: processPayload functie.

Als reactie sturen we een ander terug Flux stream terug naar de klant. Deze stream is gemaakt op basis van ons gameController, wat ook een Uitgever.

Hier is een samenvatting van de GameController klasse:

public class GameController implementeert uitgever {@Override public void subscribe (abonnee-abonnee) {// verzend Payload-berichten naar de abonnee met willekeurige tussenpozen} public void processPayload (Payload-payload) {// reageer op berichten van de andere speler}}

Wanneer de GameController een abonnee ontvangt, begint het berichten naar die abonnee te verzenden.

Laten we vervolgens de client maken:

openbare klasse ChannelClient {privé definitieve RSocket-socket; privé finale GameController gameController; openbare ChannelClient () {this.socket = RSocketFactory.connect () .transport (TcpClientTransport.create ("localhost", TCP_PORT)) .start () .block (); this.gameController = nieuwe GameController ("Client Player"); } openbare ongeldige playGame () {socket.requestChannel (Flux.from (gameController)) .doOnNext (gameController :: processPayload) .blockLast (); } public void dispose () {this.socket.dispose (); }}

Zoals we in onze vorige voorbeelden hebben gezien, maakt de client op dezelfde manier verbinding met de server als de andere clients.

De client maakt zijn eigen exemplaar van het GameController.

We gebruiken socket.requestChannel () om onze te sturen Laadvermogen stream naar de server. De server reageert met een eigen Payload-stream.

Als payloads ontvangen van de server geven we ze door aan onze gameController :: processPayload handler.

In onze spelsimulatie zijn de client en server spiegelbeelden van elkaar. Dat is, elke kant stuurt een stroom van Laadvermogen en het ontvangen van een stroom van Laadvermogen van de andere kant.

De streams draaien onafhankelijk, zonder synchronisatie.

Laten we tot slot de simulatie in een test uitvoeren:

@Test openbare leegte whenRunningChannelGame_thenLogTheResults () {ChannelClient-client = nieuwe ChannelClient (); client.playGame (); client.dispose (); }

5. Conclusie

In dit inleidende artikel hebben we de interactiemodellen van RSocket onderzocht. De volledige broncode van de voorbeelden is te vinden in onze Github-repository.

Zorg ervoor dat je de RSocket-website bezoekt voor een diepere discussie. Met name de documenten met veelgestelde vragen en motivaties bieden een goede achtergrond.