Gids voor Java Parallel Collectors Library

1. Inleiding

Parallel-collectors is een kleine bibliotheek die een set Java Stream API-collectors biedt die parallelle verwerking mogelijk maken - terwijl tegelijkertijd de belangrijkste tekortkomingen van standaard Parallel Streams worden omzeild.

2. Maven afhankelijkheden

Als we de bibliotheek willen gaan gebruiken, moeten we een enkele vermelding in Maven's toevoegen pom.xml het dossier:

 com.pivovarit parallelle collectoren 1.1.0 

Of een enkele regel in het buildbestand van Gradle:

compileer 'com.pivovarit: parallel-collectors: 1.1.0'

De nieuwste versie is te vinden op Maven Central.

3. Parallelle streams Voorbehoud

Parallel Streams waren een van de hoogtepunten van Java 8, maar ze bleken uitsluitend van toepassing te zijn op zware CPU-verwerking.

De reden hiervoor was het feit dat Parallel Streams werden intern ondersteund door een JVM-brede gedeelde map ForkJoinPool, wat een beperkt parallellisme opleverde en werd gebruikt door alle parallelle streams die op één JVM-instantie werden uitgevoerd.

Stel je voor dat we een lijst met ID's hebben en we deze willen gebruiken om een ​​lijst met gebruikers op te halen en dat deze bewerking duur is.

Daarvoor zouden we Parallel Streams kunnen gebruiken:

Lijst-id's = Arrays.asList (1, 2, 3); Lijstresultaten = ids.parallelStream () .map (i -> fetchById (i)) // elke bewerking duurt een seconde .collect (Collectors.toList ()); System.out.println (resultaten); // [user-1, user-2, user-3]

En inderdaad, we kunnen zien dat er een merkbare versnelling is. Maar het wordt problematisch als we beginnen met het uitvoeren van meerdere parallelle blokkeerbewerkingen… parallel. Dit kan het zwembad snel verzadigen en resulteren in potentieel enorme latenties. Daarom is het belangrijk om schotten te bouwen door afzonderlijke threadpools te maken - om te voorkomen dat niet-gerelateerde taken elkaars uitvoering beïnvloeden.

Om een ​​maatwerk te bieden ForkJoinPool We konden bijvoorbeeld gebruik maken van de truc die hier wordt beschreven, maar deze aanpak was gebaseerd op een niet-gedocumenteerde hack en was defect tot JDK10. We kunnen meer lezen in de uitgave zelf - [JDK8190974].

4. Parallelle verzamelaars in actie

Parallel Collectors, zoals de naam al doet vermoeden, zijn gewoon standaard Stream API Collectors die het mogelijk maken om aanvullende bewerkingen parallel uit te voeren op verzamelen() fase.

ParallelCollectors (welke spiegels Verzamelaars class) class is een gevel die toegang geeft tot de volledige functionaliteit van de bibliotheek.

Als we het bovenstaande voorbeeld opnieuw zouden willen doen, zouden we eenvoudig kunnen schrijven:

ExecutorService executor = Executors.newFixedThreadPool (10); Lijst-id's = Arrays.asList (1, 2, 3); CompletableFuture results = ids.stream () .collect (ParallelCollectors.parallelToList (i -> fetchById (i), uitvoerder, 4)); System.out.println (results.join ()); // [user-1, user-2, user-3]

Het resultaat is echter hetzelfde, we waren in staat om onze aangepaste threadpool te leveren, ons aangepaste parallelliteitsniveau te specificeren en het resultaat kwam verpakt in een CompletableFuture instantie zonder de huidige thread te blokkeren.

Standard Parallel Streams daarentegen konden geen van deze bereiken.

4.1. ParallelCollectors.parallelToList / ToSet ()

Hoe intuïtief het ook wordt, als we een Stroom parallel en verzamel resultaten in een Lijst of Set, kunnen we gewoon gebruiken ParallelCollectors.parallelToList of parallelToSet:

Lijst-id's = Arrays.asList (1, 2, 3); Lijstresultaten = ids.stream () .collect (parallelToList (i -> fetchById (i), uitvoerder, 4)) .join ();

4.2. ParallelCollectors.parallelToMap ()

Als we willen verzamelen Stroom elementen in een Kaart Bijvoorbeeld, net als bij Stream API, hebben we twee mappers nodig:

Lijst-id's = Arrays.asList (1, 2, 3); Kaartresultaten = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), uitvoerder, 4)) .join (); // {1 = gebruiker-1, 2 = gebruiker-2, 3 = gebruiker-3}

We kunnen ook een maatwerk verzorgen Kaart voorbeeld Leverancier:

Kaartresultaten = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, executor, 4)) .join (); 

En een aangepaste strategie voor conflictoplossing:

Lijst-id's = Arrays.asList (1, 2, 3); Kaartresultaten = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, (s1, s2) -> s1, uitvoerder, 4)) .join ();

4.3. ParallelCollectors.parallelToCollection ()

Net als het bovenstaande kunnen we onze gewoonte doorgeven Collectieleverancier als we resultaten willen verkrijgen die zijn verpakt in onze aangepaste container:

Lijstresultaten = ids.stream () .collect (parallelToCollection (i -> fetchById (i), LinkedList :: new, executor, 4)) .join ();

4.4. ParallelCollectors.parallelToStream ()

Als het bovenstaande niet genoeg is, kunnen we daadwerkelijk een Stroom instantie en ga daar door met de aangepaste verwerking:

Kaart results = ids.stream () .collect (parallelToStream (i -> fetchById (i), uitvoerder, 4)) .thenApply (stream -> stream.collect (Collectors.groupingBy (i -> i.length ()))) .join ();

4.5. ParallelCollectors.parallel ()

Met deze kunnen we resultaten streamen in volgorde van voltooiing:

ids.stream () .collect (parallel (i -> fetchByIdWithRandomDelay (i), uitvoerder, 4)) .forEach (System.out :: println); // gebruiker-1 // gebruiker-3 // gebruiker-2 

In dit geval kunnen we verwachten dat de verzamelaar elke keer verschillende resultaten retourneert sinds we een willekeurige verwerkingsvertraging hebben geïntroduceerd.

4.6. ParallelCollectors.parallelOrdered ()

Deze faciliteit maakt streamingresultaten mogelijk zoals hierboven, maar behoudt de oorspronkelijke volgorde:

ids.stream () .collect (parallelOrdered (i -> fetchByIdWithRandomDelay (i), uitvoerder, 4)) .forEach (System.out :: println); // gebruiker-1 // gebruiker-2 // gebruiker-3 

In dit geval handhaaft de verzamelaar altijd de volgorde, maar kan hij langzamer zijn dan de bovenstaande.

5. Beperkingen

Op het moment van schrijven, parallelle verzamelaars werken niet met oneindige stromen zelfs als er kortsluitingsbewerkingen worden gebruikt - het is een ontwerpbeperking opgelegd door Stream API-internals. Simpel gezegd, Strooms behandelen collectoren als operaties die geen kortsluiting veroorzaken, dus de stroom moet alle stroomopwaartse elementen verwerken voordat deze wordt beëindigd.

De andere beperking is dat kortsluitingen onderbreken de resterende taken niet na kortsluiting.

6. Conclusie

We hebben gezien hoe we met de parallel-collectors-bibliotheek parallelle verwerking kunnen uitvoeren met behulp van een aangepaste Java Stream API Verzamelaars en CompletableFutures om aangepaste threadpools, parallellisme en niet-blokkerende stijl van CompletableFutures.

Zoals altijd zijn codefragmenten beschikbaar op GitHub.

Zie voor meer informatie de parallel-collectors-bibliotheek op GitHub, de blog van de auteur en het Twitter-account van de auteur.