Gids voor het Fork / Join Framework in Java

1. Overzicht

Het fork / join-framework werd gepresenteerd in Java 7. Het biedt tools om parallelle verwerking te versnellen door te proberen alle beschikbare processorkernen te gebruiken - wat wordt bereikt door een verdeel en heers aanpak.

In de praktijk betekent dit dat het raamwerk eerste "vorken", waarbij de taak recursief wordt opgedeeld in kleinere onafhankelijke subtaken totdat ze eenvoudig genoeg zijn om asynchroon te worden uitgevoerd.

Daarna, het "join" -gedeelte begint, waarin resultaten van alle subtaken recursief worden samengevoegd tot een enkel resultaat, of in het geval van een taak die leegte retourneert, wacht het programma gewoon totdat elke subtaak is uitgevoerd.

Om een ​​effectieve parallelle uitvoering te bieden, gebruikt het fork / join-framework een pool van threads genaamd de ForkJoinPool, dat type worker-threads beheert ForkJoinWorkerThread.

2. ForkJoinPool

De ForkJoinPool is het hart van het raamwerk. Het is een implementatie van de ExecutorService die werkthreads beheert en ons tools biedt om informatie te krijgen over de status en prestaties van de threadpool.

Worker-threads kunnen slechts één taak tegelijk uitvoeren, maar de ForkJoinPool maakt niet voor elke subtaak een aparte thread. In plaats daarvan heeft elke thread in de pool zijn eigen dubbele wachtrij (of deque, uitgesproken dek) waarin taken worden opgeslagen.

Deze architectuur is essentieel voor het balanceren van de werklast van de thread met behulp van de algoritme voor het stelen van werk.

2.1. Werk stelen algoritme

Simpel gezegd - gratis discussielijnen proberen werk te "stelen" van grote hoeveelheden drukke discussies.

Standaard haalt een werkthread taken op van het hoofd van zijn eigen deque. Als het leeg is, neemt de thread een taak over van de staart van de deque van een andere drukke thread of van de globale invoerwachtrij, aangezien hier waarschijnlijk de grootste stukken werk zullen worden gelokaliseerd.

Deze benadering minimaliseert de mogelijkheid dat threads zullen strijden om taken. Het vermindert ook het aantal keren dat de thread op zoek moet naar werk, omdat het eerst aan de grootste beschikbare brokken werk werkt.

2.2. ForkJoinPool Instantiëring

In Java 8 is dit de handigste manier om toegang te krijgen tot de instantie van het ForkJoinPool is om zijn statische methode te gebruiken gemeenschappelijk zwembad (). Zoals de naam doet vermoeden, geeft dit een verwijzing naar de gemeenschappelijke pool, die een standaard threadpool is voor elke ForkJoinTask.

Volgens de documentatie van Oracle vermindert het gebruik van de vooraf gedefinieerde gemeenschappelijke pool het resourceverbruik, aangezien dit het creëren van een aparte threadpool per taak ontmoedigt.

ForkJoinPool commonPool = ForkJoinPool.commonPool ();

Hetzelfde gedrag kan worden bereikt in Java 7 door een ForkJoinPool en het toewijzen aan een openbare statische veld van een utiliteitsklasse:

openbare statische ForkJoinPool forkJoinPool = nieuwe ForkJoinPool (2);

Nu is het gemakkelijk toegankelijk:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

Met ForkJoinPool's constructeurs, is het mogelijk om een ​​aangepaste threadpool te maken met een specifiek niveau van parallellisme, threadfabriek en exception handler. In het bovenstaande voorbeeld heeft de pool een parallelliteitsniveau van 2. Dit betekent dat de pool 2 processorkernen gebruikt.

3. ForkJoinTask

ForkJoinTask is het basistype voor taken die binnen worden uitgevoerd ForkJoinPool. In de praktijk zou een van de twee subklassen moeten worden uitgebreid: de RecursiveAction voor leegte taken en de RecursiveTask voor taken die een waarde retourneren.Ze hebben allebei een abstracte methode berekenen() waarin de logica van de taak wordt gedefinieerd.

3.1. RecursiveAction - een voorbeeld

In het onderstaande voorbeeld wordt de te verwerken werkeenheid weergegeven door een Draad gebeld werkdruk. Voor demonstratiedoeleinden is de taak een onzinnige taak: de invoer wordt simpelweg in hoofdletters geschreven en vastgelegd.

Om het vorkgedrag van het raamwerk aan te tonen, het voorbeeld splitst de taak als werkdruk.lengte() is groter dan een gespecificeerde drempelde ... gebruiken createSubtask () methode.

De String wordt recursief onderverdeeld in subtekenreeksen, waardoor CustomRecursiveTask gevallen die zijn gebaseerd op deze substrings.

Als resultaat retourneert de methode een Lijst.

De lijst wordt voorgelegd aan de ForkJoinPool de ... gebruiken invokeAll () methode:

openbare klasse CustomRecursiveAction breidt RecursiveAction uit {private String workload = ""; private statische finale int THRESHOLD = 4; privé statische Logger-logger = Logger.getAnonymousLogger (); openbare CustomRecursiveAction (String-workload) {this.workload = workload; } @Override beschermde ongeldige compute () {if (workload.length ()> THRESHOLD) {ForkJoinTask.invokeAll (createSubtasks ()); } else {verwerking (werklast); }} privélijst createSubtasks () {Lijst subtaken = nieuwe ArrayList (); String partOne = workload.substring (0, workload.length () / 2); String partTwo = workload.substring (workload.length () / 2, workload.length ()); subtasks.add (nieuwe CustomRecursiveAction (partOne)); subtasks.add (nieuwe CustomRecursiveAction (partTwo)); subtaken teruggeven; } private ongeldige verwerking (String werk) {String resultaat = work.toUpperCase (); logger.info ("Dit resultaat - (" + resultaat + ") - werd verwerkt door" + Thread.currentThread (). getName ()); }}

Dit patroon kan worden gebruikt om uw eigen patroon te ontwikkelen RecursiveAction klassen. Om dit te doen, maakt u een object dat de totale hoeveelheid werk vertegenwoordigt, kiest u een geschikte drempel, definieert u een methode om het werk te verdelen en definieert u een methode om het werk uit te voeren.

3.2. RecursiveTask

Voor taken die een waarde retourneren, is de logica hier vergelijkbaar, behalve dat het resultaat voor elke subtaak is verenigd in één resultaat:

openbare klasse CustomRecursiveTask breidt RecursiveTask {private int [] arr; private statische finale int THRESHOLD = 20; openbare CustomRecursiveTask (int [] arr) {this.arr = arr; } @Override protected Integer compute () {if (arr.length> THRESHOLD) {return ForkJoinTask.invokeAll (createSubtasks ()) .stream () .mapToInt (ForkJoinTask :: join) .sum (); } else {retourverwerking (arr); }} privéverzameling createSubtasks () {Lijst verdeeldTasks = nieuwe ArrayList (); verdeeldeTasks.add (nieuwe CustomRecursiveTask (Arrays.copyOfRange (arr, 0, arr.length / 2))); verdeeldeTasks.add (nieuwe CustomRecursiveTask (Arrays.copyOfRange (arr, arr.length / 2, arr.length))); geef verdeelde taken terug; } private Integer processing (int [] arr) {return Arrays.stream (arr) .filter (a -> a> 10 && a a * 10) .sum (); }}

In dit voorbeeld wordt het werk weergegeven door een array die is opgeslagen in het arr veld van de CustomRecursiveTask klasse. De createSubtasks () methode verdeelt de taak recursief in kleinere stukken werk totdat elk stuk kleiner is dan de drempel. Dan de invokeAll () methode verzendt de subtaken naar de gemeenschappelijke pool en retourneert een lijst met Toekomst.

Om de uitvoering te activeren, moet het toetreden () methode wordt aangeroepen voor elke subtaak.

In dit voorbeeld wordt dit bereikt met behulp van Java 8's Stream API; de som() methode wordt gebruikt als representatie van het combineren van deelresultaten tot het eindresultaat.

4. Taken verzenden naar het ForkJoinPool

Om taken in te dienen bij de threadpool, kunnen enkele benaderingen worden gebruikt.

De indienen () of uitvoeren ()methode (hun gebruiksscenario's zijn hetzelfde):

forkJoinPool.execute (customRecursiveTask); int resultaat = customRecursiveTask.join ();

De beroep doen op()methode verdeelt de taak en wacht op het resultaat, en heeft geen handmatige verbinding nodig:

int resultaat = forkJoinPool.invoke (customRecursiveTask);

De invokeAll () methode is de handigste manier om een ​​reeks ForkJoinTasks naar de ForkJoinPool. Het neemt taken als parameters (twee taken, var args of een verzameling), forks retourneert vervolgens een verzameling Toekomst objecten in de volgorde waarin ze zijn geproduceerd.

Als alternatief kunt u aparte vork() en toetreden () methoden. De vork() methode legt een taak voor aan een pool, maar activeert de uitvoering ervan niet. De toetreden () hiervoor moet de methode worden gebruikt. In het geval van RecursiveAction, de toetreden () geeft niets anders terug dan nul; voor Recursieve taak, het geeft het resultaat van de uitvoering van de taak terug:

customRecursiveTaskFirst.fork (); resultaat = customRecursiveTaskLast.join ();

In onze RecursiveTask voorbeeld hebben we de invokeAll () methode om een ​​reeks subtaken naar de pool te verzenden. Dezelfde klus kan worden gedaan vork() en toetreden (), al heeft dit gevolgen voor de ordening van de resultaten.

Om verwarring te voorkomen, is het over het algemeen een goed idee om invokeAll () methode om meer dan één taak naar de ForkJoinPool.

5. Conclusies

Het gebruik van het fork / join-framework kan de verwerking van grote taken versnellen, maar om dit resultaat te bereiken, moeten enkele richtlijnen worden gevolgd:

  • Gebruik zo min mogelijk threadpools - in de meeste gevallen is de beste beslissing om één threadpool per applicatie of systeem te gebruiken
  • Gebruik de standaard gemeenschappelijke thread-pool, als er geen specifieke afstemming nodig is
  • Gebruik een redelijke drempel voor het splitsen ForkJoinTask in subtaken
  • Voorkom blokkering in uwForkJoinTasks

De voorbeelden die in dit artikel worden gebruikt, zijn beschikbaar in de gekoppelde GitHub-opslagplaats.