Gids voor Akka Streams

1. Overzicht

In dit artikel zullen we kijken naar de akka-streams bibliotheek die is gebouwd bovenop het Akka actor-framework, dat zich houdt aan het manifest van reactive streams. De Akka Streams API stelt ons in staat om eenvoudig datatransformatiestromen samen te stellen uit onafhankelijke stappen.

Bovendien vindt alle verwerking plaats op een reactieve, niet-blokkerende en asynchrone manier.

2. Maven afhankelijkheden

Om te beginnen, moeten we het akka-stream en akka-stream-testkit bibliotheken in onze pom.xml:

 com.typesafe.akka akka-stream_2.11 2.5.2 com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. Akka Streams API

Om met Akka Streams te werken, moeten we op de hoogte zijn van de belangrijkste API-concepten:

  • Bron - het toegangspunt tot verwerking in het akka-stream bibliotheek - we kunnen een instantie van deze klasse maken vanuit meerdere bronnen; we kunnen bijvoorbeeld de enkel () methode als we een Bron van een enkele Draad, of we kunnen een Bron van een Herhaalbaar van elementen
  • Stromen - de belangrijkste bouwsteen voor de verwerking - elk Stromen instantie heeft één invoer- en één uitvoerwaarde
  • Materializer - wWe kunnen er een gebruiken als we die van ons willen Stromen om enkele bijwerkingen te hebben, zoals het vastleggen of opslaan van resultaten; meestal passeren we de Niet gebruikt alias als een Materializer om aan te geven dat onze Stromen mag geen bijwerkingen hebben
  • Wastafel operatie - wanneer we een Stromen, het wordt niet uitgevoerd totdat we een Wastafel operatie erop - het is een terminaloperatie die alle berekeningen in het geheel triggert Stromen

4. Creëren Stromen in Akka Streams

Laten we beginnen met het bouwen van een eenvoudig voorbeeld, waarin we laten zien hoe maak en combineer meerdere Stromens - om een ​​stroom van gehele getallen te verwerken en het gemiddelde bewegende venster van paren van gehele getallen uit de stroom te berekenen.

We ontleden een puntkomma als scheidingsteken Draad van gehele getallen als invoer om onze te maken akka-stream Bron voor het voorbeeld.

4.1. Gebruik maken van een Stromen om invoer te parseren

Laten we eerst een DataImporter klasse die een instantie van de ActorSystem die we later zullen gebruiken om onze Stromen:

openbare klasse DataImporter {privé ActorSystem actorSystem; // standaard constructeurs, getters ...}

Laten we vervolgens een parseLine methode die een Lijst van Geheel getal van onze gescheiden invoer Draad. Houd er rekening mee dat we Java Stream API hier alleen gebruiken voor het parseren:

privélijst parseLine (String lijn) {String [] velden = line.split (";"); retourneer Arrays.stream (velden) .map (geheel getal :: parseInt) .collect (Collectors.toList ()); }

Onze initiaal Stromen zal van toepassing zijn parseLine op onze inbreng om een Stromen met invoertype Draad en uitvoertype Geheel getal:

private Flow parseContent () {return Flow.of (String.class) .mapConcat (this :: parseLine); }

Als we de parseLine () methode, weet de compiler dat het argument voor die lambda-functie a zal zijn Draad - hetzelfde als het invoertype voor onze Stromen.

Merk op dat we de mapConcat () methode - gelijk aan de Java 8 flatMap () methode - omdat we de Lijst van Geheel getal geretourneerd door parseLine () in een Stromen van Geheel getal zodat de volgende stappen in onze verwerking niet te maken hebben met de Lijst.

4.2. Gebruik maken van een Stromen om berekeningen uit te voeren

Op dit punt hebben we onze Stromen van ontlede gehele getallen. Nu moeten we implementeer logica die alle invoerelementen in paren groepeert en een gemiddelde van die paren berekent.

Nu, we zullen Maak een Stromen van Geheel getals en groepeer ze met behulp van de gegroepeerd () methode.

Vervolgens willen we een gemiddelde berekenen.

Omdat we niet geïnteresseerd zijn in de volgorde waarin die gemiddelden worden verwerkt, kunnen we dat wel hebben gemiddelden die parallel zijn berekend met behulp van meerdere threads met behulp van de mapAsyncUnordered () methode, het aantal threads doorgeven als een argument voor deze methode.

De actie die zal worden doorgegeven als de lambda naar de Stromen moet een CompletableFuture omdat die actie asynchroon wordt berekend in de afzonderlijke thread:

private Flow computeAverage () {return Flow.of (Integer.class) .grouped (2) .mapAsyncUnordered (8, gehele getallen -> CompletableFuture.supplyAsync (() -> gehele getallen.stream () .mapToDouble (v -> v). gemiddelde () .orElse (-1.0))); }

We berekenen gemiddelden in acht parallelle threads. Merk op dat we de Java 8 Stream API gebruiken voor het berekenen van een gemiddelde.

4.3. Meerdere samenstellen Stromen in een single Stromen

De Stromen API is een vloeiende abstractie waarmee we dat kunnen stel meerdere samen Stromen instanties om ons uiteindelijke verwerkingsdoel te bereiken. We kunnen granulaire stromen hebben waar men bijvoorbeeld aan het ontleden is JSON, een ander is bezig met een transformatie, en een ander is bezig met het verzamelen van statistieken.

Een dergelijke granulariteit helpt ons om meer testbare code te maken, omdat we elke verwerkingsstap onafhankelijk kunnen testen.

We hebben hierboven twee stromen gemaakt die onafhankelijk van elkaar kunnen werken. Nu willen we ze samen samenstellen.

Ten eerste willen we onze input analyseren Draad, en vervolgens willen we een gemiddelde berekenen op basis van een stroom elementen.

We kunnen onze stromen samenstellen met behulp van de via() methode:

Stroom berekenenAverage () {retour Flow.of (String.class) .via (parseContent ()) .via (computeAverage ()); }

We hebben een Stromen met invoertype Draad en twee andere stromen erachter. De parseContent ()Stromen duurt een Draad input en retourneert een Geheel getal als output. De computeAverage () stroom neemt dat Geheel getal en berekent een gemiddeld rendement Dubbele als het uitvoertype.

5. Toevoegen Wastafel naar de Stromen

Zoals we al zeiden, tot nu toe het geheel Stromen is nog niet uitgevoerd omdat het lui is. Om de uitvoering van het Stromen we moeten een Wastafel. De Wastafel bewerking kan bijvoorbeeld gegevens in een database opslaan of resultaten naar een externe webservice verzenden.

Stel dat we een GemiddeldRepository klasse met de volgende sparen() methode die resultaten naar onze database schrijft:

CompletionStage save (dubbel gemiddelde) {return CompletableFuture.supplyAsync (() -> {// schrijf naar database retour gemiddelde;}); }

Nu willen we een Wastafel bewerking die deze methode gebruiken om de resultaten van onze Stromen verwerken. Om onze Wastafel, we moeten eerst Maak een Stromen dat een resultaat van onze verwerking als invoertype heeft. Vervolgens willen we al onze resultaten opslaan in de database.

Nogmaals, we geven niet om de volgorde van de elementen, dus dat kunnen we voer de sparen() operaties parallel de ... gebruiken mapAsyncUnordered () methode.

Om een Wastafel van de Stromen we moeten de toMat () met Sink.ignore () als eerste argument en Rechts aanhouden() als tweede omdat we een status van de verwerking willen retourneren:

privé wastafel storeAverages () {return Flow.of (Double.class) .mapAsyncUnordered (4, averageRepository :: save) .toMat (Sink.ignore (), Keep.right ()); }

6. Een bron definiëren voor Stromen

Het laatste dat we moeten doen, is Maak een Bron van de ingang Draad. We kunnen een berekenenGemiddelde ()Stromen naar deze bron met behulp van de via() methode.

Voeg vervolgens het Wastafel voor de verwerking, moeten we de rennen met() methode en geef de storeAverages () Sink die we zojuist hebben gemaakt:

CompletionStage berekenAverageForContent (String inhoud) {return Source.single (inhoud) .via (berekenAverage ()) .runWith (storeAverages (), ActorMaterializer.create (actorSystem)) .whenComplete ((d, e) -> {if (d! = null) {System.out.println ("Import voltooid");} else {e.printStackTrace ();}}); }

Merk op dat wanneer de verwerking is voltooid, we de wanneer voltooid () callback, waarin we een actie kunnen uitvoeren, afhankelijk van het resultaat van de verwerking.

7. Testen Akka Streams

We kunnen onze verwerking testen met behulp van de akka-stream-testkit.

De beste manier om de feitelijke logica van de verwerking te testen, is door alles te testen Stromen logica en gebruik TestSink om de berekening te activeren en te bevestigen op de resultaten.

In onze test maken we de Stromen die we willen testen, en vervolgens maken we een Bron van de inhoud van de testinvoer:

@Test openbare leegte gegevenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults () {// gegeven Flow getest = nieuwe DataImporter (actorSystem) .calculateAverage (); String input = "1; 9; 11; 0"; // when Source flow = Source.single (input) .via (getest); // stroom dan .runWith (TestSink.probe (actorSystem), ActorMaterializer.create (actorSystem)) .request (4) .expectNextUnordered (5d, 5.5); }

We controleren of we vier invoerargumenten verwachten, en twee resultaten die gemiddelden zijn, kunnen in elke volgorde binnenkomen omdat onze verwerking op de asynchrone en parallelle manier gebeurt.

8. Conclusie

In dit artikel keken we naar de akka-stream bibliotheek.

We hebben een proces gedefinieerd dat meerdere combineert Stromen om het voortschrijdend gemiddelde van elementen te berekenen. Vervolgens hebben we een Bron dat is een ingangspunt van de streamverwerking en een Wastafel dat triggert de daadwerkelijke verwerking.

Ten slotte hebben we een test geschreven voor onze verwerking met behulp van de akka-stream-testkit.

De implementatie van al deze voorbeelden en codefragmenten is te vinden in het GitHub-project - dit is een Maven-project, dus het moet gemakkelijk te importeren en uit te voeren zijn zoals het is.


$config[zx-auto] not found$config[zx-overlay] not found