Inleiding tot Thread Pools in Java

1. Inleiding

Dit artikel is een blik op threadpools in Java - te beginnen met de verschillende implementaties in de standaard Java-bibliotheek en vervolgens te kijken naar de Guava-bibliotheek van Google.

2. De Thread Pool

In Java worden threads toegewezen aan threads op systeemniveau die de bronnen van het besturingssysteem zijn. Als u oncontroleerbare threads maakt, kunt u snel zonder deze bronnen komen te zitten.

Het wisselen van context tussen threads wordt ook gedaan door het besturingssysteem - om parallellisme te emuleren. Een simplistische mening is dat - hoe meer threads je spawt, hoe minder tijd elke thread besteedt aan echt werk.

Het Thread Pool-patroon helpt om bronnen te besparen in een multithreaded applicatie, en ook om het parallellisme binnen bepaalde vooraf gedefinieerde limieten te houden.

Als je een thread-pool gebruikt, jij schrijf uw gelijktijdige code in de vorm van parallelle taken en verzend deze voor uitvoering naar een instantie van een threadpool. Deze instantie beheert verschillende hergebruikte threads voor het uitvoeren van deze taken.

Het patroon staat je toe controle over het aantal threads dat de applicatie aanmaakt, hun levenscyclus, evenals om de uitvoering van taken te plannen en binnenkomende taken in een wachtrij te houden.

3. Thread Pools in Java

3.1. Uitvoerders, Uitvoerder en ExecutorService

De Uitvoerders helper-klasse bevat verschillende methoden voor het maken van vooraf geconfigureerde threadpoolinstanties voor u. Die lessen zijn een goede plek om mee te beginnen - gebruik ze als u geen aangepaste fijnafstelling hoeft toe te passen.

De Uitvoerder en ExecutorService interfaces worden gebruikt om te werken met verschillende threadpool-implementaties in Java. Meestal zou je dat moeten doen houd uw code losgekoppeld van de daadwerkelijke implementatie van de threadpool en gebruik deze interfaces in uw hele applicatie.

De Uitvoerder interface heeft een enkele uitvoeren methode om in te dienen Runnable instanties voor uitvoering.

Hier is een snel voorbeeld van hoe u de Uitvoerders API om een Uitvoerder instantie ondersteund door een enkele thread-pool en een onbegrensde wachtrij voor het opeenvolgend uitvoeren van taken. Hier voeren we een enkele taak uit die simpelweg 'Hallo Wereld" op het scherm. De taak wordt ingediend als een lambda (een Java 8-functie) waarvan wordt aangenomen dat het Runnable.

Executor executor = Executors.newSingleThreadExecutor (); executor.execute (() -> System.out.println ("Hallo wereld"));

De ExecutorService interface bevat een groot aantal methoden voor het bewaken van de voortgang van de taken en het beheren van de beëindiging van de dienst. Met behulp van deze interface kunt u de taken indienen voor uitvoering en ook hun uitvoering controleren met behulp van de geretourneerde Toekomst voorbeeld.

In het volgende voorbeeldmaken we een ExecutorService, dien een taak in en gebruik vervolgens het geretourneerde Toekomst‘S krijgen methode om te wachten tot de ingediende taak is voltooid en de waarde wordt geretourneerd:

ExecutorService executorService = Executors.newFixedThreadPool (10); Future future = executorService.submit (() -> "Hallo wereld"); // enkele bewerkingen String resultaat = future.get ();

In een realistisch scenario wilt u natuurlijk meestal niet bellen future.get () maar stel het aanroepen uit totdat je de waarde van de berekening echt nodig hebt.

De indienen methode is overbelast om een ​​van beide te nemen Runnable of Oproepbaar beide zijn functionele interfaces en kunnen worden doorgegeven als lambdas (beginnend met Java 8).

RunnableDe enkele methode genereert geen uitzondering en retourneert geen waarde. De Oproepbaar interface is misschien handiger, omdat het ons in staat stelt een uitzondering te genereren en een waarde te retourneren.

Eindelijk - om de compiler het Oproepbaar type, retourneer eenvoudig een waarde uit de lambda.

Voor meer voorbeelden van het gebruik van de ExecutorService interface en toekomst, kijk dan eens naar "A Guide to the Java ExecutorService".

3.2. ThreadPoolExecutor

De ThreadPoolExecutor is een uitbreidbare thread pool-implementatie met veel parameters en hooks voor fijnafstelling.

De belangrijkste configuratieparameters die we hier zullen bespreken, zijn: corePoolSize, maximumPoolSize, en keepAliveTime.

De pool bestaat uit een vast aantal kerndraden die de hele tijd binnen worden gehouden, en enkele buitensporige draden die kunnen worden uitgezet en vervolgens worden beëindigd wanneer ze niet meer nodig zijn. De corePoolSize parameter is het aantal kernthreads dat wordt geïnstantieerd en in de pool wordt bewaard. Als er een nieuwe taak binnenkomt, als alle kernthreads bezet zijn en de interne wachtrij vol is, mag de pool uitgroeien tot maximumPoolSize.

De keepAliveTime parameter is het tijdsinterval waarvoor de overmatige threads (geïnstantieerd boven de corePoolSize) mogen bestaan ​​in de inactieve toestand. Standaard is het ThreadPoolExecutor beschouwt alleen niet-kerndraden voor verwijdering. Om hetzelfde verwijderingsbeleid toe te passen op kernthreads, kunnen we de allowCoreThreadTimeOut (true) methode.

Deze parameters dekken een breed scala aan gebruiksscenario's, maar de meest typische configuraties zijn voorgedefinieerd in het Uitvoerders statische methoden.

Bijvoorbeeld, newFixedThreadPool methode creëert een ThreadPoolExecutor met gelijk corePoolSize en maximumPoolSize parameterwaarden en een nul keepAliveTime. Dit betekent dat het aantal threads in deze threadpool altijd hetzelfde is:

ThreadPoolExecutor uitvoerder = (ThreadPoolExecutor) Executors.newFixedThreadPool (2); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); assertEquals (2, uitvoerder.getPoolSize ()); assertEquals (1, executor.getQueue (). size ());

In het bovenstaande voorbeeld instantiëren we een ThreadPoolExecutor met een vast aantal threads van 2. Dit betekent dat als het aantal gelijktijdig uitgevoerde taken altijd kleiner of gelijk is aan twee, ze meteen worden uitgevoerd. Anders, sommige van deze taken kunnen in een wachtrij worden geplaatst om op hun beurt te wachten.

We hebben er drie gemaakt Oproepbaar taken die zwaar werk imiteren door 1000 milliseconden te slapen. De eerste twee taken worden in één keer uitgevoerd en de derde zal in de wachtrij moeten wachten. We kunnen het verifiëren door het getPoolSize () en getQueue (). size () methoden onmiddellijk na het indienen van de taken.

Nog een voorgeconfigureerd ThreadPoolExecutor kan worden gemaakt met de Executors.newCachedThreadPool () methode. Deze methode krijgt helemaal geen aantal threads. De corePoolSize is feitelijk ingesteld op 0, en de maximumPoolSize ingesteld op Geheel getal.MAX_VALUE voor dit geval. De keepAliveTime is 60 seconden voor deze.

Deze parameterwaarden betekenen dat de threadpool in de cache kan onbeperkt groeien om een ​​willekeurig aantal ingediende taken te accommoderen. Maar als de draden niet meer nodig zijn, worden ze na 60 seconden inactiviteit weggegooid. Een typische use case is wanneer u veel kortstondige taken in uw applicatie heeft.

ThreadPoolExecutor uitvoerder = (ThreadPoolExecutor) Executors.newCachedThreadPool (); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); executor.submit (() -> {Thread.sleep (1000); return null;}); assertEquals (3, uitvoerder.getPoolSize ()); assertEquals (0, executor.getQueue (). size ());

De wachtrijgrootte in het bovenstaande voorbeeld is altijd nul omdat intern een Synchrone wachtrij instantie wordt gebruikt. In een Synchrone wachtrij, paren van invoegen en verwijderen bewerkingen vinden altijd gelijktijdig plaats, dus de wachtrij bevat eigenlijk nooit iets.

De Executors.newSingleThreadExecutor () API creëert een andere typische vorm van ThreadPoolExecutor met een enkele draad. De single thread executor is ideaal voor het creëren van een event loop. De corePoolSize en maximumPoolSize parameters zijn gelijk aan 1, en de keepAliveTime is nul.

Taken in het bovenstaande voorbeeld worden opeenvolgend uitgevoerd, dus de vlagwaarde is 2 na voltooiing van de taak:

AtomicInteger counter = nieuwe AtomicInteger (); ExecutorService executor = Executors.newSingleThreadExecutor (); executor.submit (() -> {counter.set (1);}); executor.submit (() -> {counter.compareAndSet (1, 2);});

Bovendien is dit ThreadPoolExecutor is versierd met een onveranderlijke verpakking, dus het kan niet opnieuw worden geconfigureerd nadat het is gemaakt. Merk op dat dit ook de reden is dat we het niet kunnen casten naar een ThreadPoolExecutor.

3.3. ScheduledThreadPoolExecutor

De ScheduledThreadPoolExecutor breidt de ThreadPoolExecutor class en implementeert ook de ScheduledExecutorService interface met verschillende aanvullende methoden:

  • schema methode maakt het mogelijk om een ​​taak één keer uit te voeren na een gespecificeerde vertraging;
  • scheduleAtFixedRate methode maakt het mogelijk om een ​​taak uit te voeren na een gespecificeerde initiële vertraging en deze vervolgens herhaaldelijk uit te voeren met een bepaalde periode; de periode argument is de tijd gemeten tussen de starttijden van de taken, dus het uitvoeringspercentage ligt vast;
  • scheduleWithFixedDelay methode is vergelijkbaar met scheduleAtFixedRate in dat het herhaaldelijk de gegeven taak uitvoert, maar de gespecificeerde vertraging is gemeten tussen het einde van de vorige taak en het begin van de volgende; de uitvoeringssnelheid kan variëren afhankelijk van de tijd die nodig is om een ​​bepaalde taak uit te voeren.

De Executors.newScheduledThreadPool () methode wordt meestal gebruikt om een ScheduledThreadPoolExecutor met een gegeven corePoolSize, grenzeloos maximumPoolSize en nul keepAliveTime. Hier leest u hoe u een taak plant voor uitvoering in 500 milliseconden:

ScheduledExecutorService executor = Executors.newScheduledThreadPool (5); executor.schedule (() -> {System.out.println ("Hello World");}, 500, TimeUnit.MILLISECONDS);

De volgende code laat zien hoe u een taak uitvoert na een vertraging van 500 milliseconden en deze vervolgens elke 100 milliseconden herhaalt. Nadat we de taak hebben gepland, wachten we tot deze drie keer wordt geactiveerd met behulp van de CountDownLatch slot, annuleer het vervolgens met de Future.cancel () methode.

CountDownLatch lock = nieuwe CountDownLatch (3); ScheduledExecutorService executor = Executors.newScheduledThreadPool (5); ScheduledFuture future = executor.scheduleAtFixedRate (() -> {System.out.println ("Hallo wereld"); lock.countDown ();}, 500, 100, TimeUnit.MILLISECONDS); lock.await (1000, TimeUnit.MILLISECONDS); future.cancel (true);

3.4. ForkJoinPool

ForkJoinPool is het centrale deel van de vork / samenvoegen framework geïntroduceerd in Java 7. Het lost een veelvoorkomend probleem op van het voortbrengen van meerdere taken in recursieve algoritmen. Met behulp van een simpele ThreadPoolExecutor, zul je snel zonder threads komen te zitten, omdat elke taak of subtaak zijn eigen thread nodig heeft om te kunnen draaien.

In een vork / samenvoegen framework, kan elke taak spawnen (vork) een aantal subtaken en wacht op hun voltooiing met behulp van de toetreden methode. Het voordeel van de vork / samenvoegen framework is dat het maakt niet voor elke taak of subtaak een nieuwe thread, in plaats daarvan het algoritme Work Stealing implementeren. Dit framework wordt uitvoerig beschreven in het artikel "Guide to the Fork / Join Framework in Java"

Laten we eens kijken naar een eenvoudig voorbeeld van gebruik ForkJoinPool om een ​​boom met knooppunten te doorlopen en de som van alle bladwaarden te berekenen. Hier is een eenvoudige implementatie van een boom die bestaat uit een knooppunt, een int waarde en een set onderliggende knooppunten:

statische klasse TreeNode {int waarde; Stel kinderen in; TreeNode (int waarde, TreeNode ... kinderen) {this.value = waarde; this.children = Sets.newHashSet (kinderen); }}

Als we nu alle waarden in een boomstructuur parallel willen optellen, moeten we een RecursiveTask koppel. Elke taak krijgt zijn eigen knooppunt en voegt zijn waarde toe aan de som van de waarden van zijn kinderen. Om de som te berekenen van kinderen waarden doet taakimplementatie het volgende:

  • streamt het kinderen set,
  • kaarten over deze stream, waardoor een nieuw CountingTask voor elk element,
  • voert elke subtaak uit door deze te forken,
  • verzamelt de resultaten door de toetreden methode voor elke gevorkte taak,
  • somt de resultaten op met behulp van de Collectors.summingInt verzamelaar.
openbare statische klasse CountingTask breidt RecursiveTask {privé laatste TreeNode-knooppunt uit; openbare CountingTask (TreeNode-knooppunt) {this.node = knooppunt; } @Override protected Integer compute () {return node.value + node.children.stream () .map (childNode -> new CountingTask (childNode) .fork ()) .collect (Collectors.summingInt (ForkJoinTask :: join)) ; }}

De code om de berekening op een echte boom uit te voeren is heel eenvoudig:

TreeNode tree = nieuwe TreeNode (5, nieuwe TreeNode (3), nieuwe TreeNode (2, nieuwe TreeNode (2), nieuwe TreeNode (8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool (); int sum = forkJoinPool.invoke (nieuwe CountingTask (boom));

4. Thread Pool's implementatie in Guava

Guava is een populaire Google-bibliotheek met hulpprogramma's. Het heeft veel nuttige gelijktijdigheidsklassen, waaronder verschillende handige implementaties van ExecutorService. De implementatieklassen zijn niet toegankelijk voor directe instantiatie of subclassificatie, dus het enige toegangspunt voor het maken van hun instanties is de Meer uitvoerders helper klasse.

4.1. Guava toevoegen als een Maven-afhankelijkheid

Voeg de volgende afhankelijkheid toe aan uw Maven pom-bestand om de Guava-bibliotheek aan uw project toe te voegen. Je kunt de nieuwste versie van de Guava-bibliotheek vinden in de Maven Central-repository:

 com.google.guava guave 19.0 

4.2. Directe uitvoerder en directe uitvoerder

Soms wilt u de taak uitvoeren in de huidige thread of in een threadpool, afhankelijk van bepaalde voorwaarden. U zou liever een enkele gebruiken Uitvoerder interface en verander gewoon de implementatie. Hoewel het niet zo moeilijk is om een ​​implementatie van Uitvoerder of ExecutorService die de taken in de huidige thread uitvoert, vereist het nog steeds het schrijven van wat standaardcode.

Graag biedt Guava ons vooraf gedefinieerde voorbeelden.

Hier is een voorbeeld dat de uitvoering van een taak in dezelfde thread demonstreert. Hoewel de geleverde taak 500 milliseconden slaapt, is het blokkeert de huidige thread, en het resultaat is onmiddellijk beschikbaar na de uitvoeren oproep is beëindigd:

Executor executor = MoreExecutors.directExecutor (); AtomicBoolean uitgevoerd = nieuwe AtomicBoolean (); executor.execute (() -> {probeer {Thread.sleep (500);} vangst (InterruptedException e) {e.printStackTrace ();} uitgevoerd.set (true);}); assertTrue (uitgevoerd.get ());

Het exemplaar dat wordt geretourneerd door het directExecutor () methode is eigenlijk een statische singleton, dus het gebruik van deze methode levert helemaal geen overhead op bij het maken van objecten.

U moet deze methode verkiezen boven de MoreExecutors.newDirectExecutorService () omdat die API bij elke aanroep een volwaardige executor-service-implementatie creëert.

4.3. Executor Services afsluiten

Een ander veel voorkomend probleem is het afsluiten van de virtuele machine terwijl een threadpool nog steeds zijn taken uitvoert. Zelfs met een annuleringsmechanisme is er geen garantie dat de taken zich netjes zullen gedragen en hun werk zullen stoppen wanneer de executor-service wordt afgesloten. Dit kan ertoe leiden dat JVM voor onbepaalde tijd vastloopt terwijl de taken hun werk blijven doen.

Om dit probleem op te lossen, introduceert Guava een familie van spannende executor-services. Ze zijn gebaseerd op daemon-threads die eindigen samen met de JVM.

Deze services voegen ook een afsluitingshaak toe met de Runtime.getRuntime (). AddShutdownHook () methode en voorkom dat de VM wordt beëindigd gedurende een geconfigureerde hoeveelheid tijd voordat de vastgelopen taken worden opgegeven.

In het volgende voorbeeld dienen we de taak in die een oneindige lus bevat, maar we gebruiken een bestaande executor-service met een geconfigureerde tijd van 100 milliseconden om te wachten op de taken bij het beëindigen van de VM. Zonder de exitingExecutorService op zijn plaats zou deze taak ervoor zorgen dat de VM voor onbepaalde tijd vastloopt:

ThreadPoolExecutor uitvoerder = (ThreadPoolExecutor) Executors.newFixedThreadPool (5); ExecutorService executorService = MoreExecutors.getExitingExecutorService (uitvoerder, 100, TimeUnit.MILLISECONDS); executorService.submit (() -> {while (true) {}});

4.4. Luisteren decorateurs

Luisterende decorateurs laten je de ExecutorService en ontvang LuisterbaarFuture instanties bij het indienen van een taak in plaats van eenvoudig Toekomst gevallen. De LuisterbaarFuture interface breidt zich uit Toekomst en heeft een enkele aanvullende methode addListener. Met deze methode kan een listener worden toegevoegd die wordt aangeroepen bij toekomstige voltooiing.

U zult zelden willen gebruiken ListenableFuture.addListener () methode rechtstreeks, maar het is essentieel voor de meeste hulpmethoden in de Futures hulpprogramma klasse. Bijvoorbeeld met de Futures.allAsList () methode kun je er meerdere combineren Luisterbare toekomst instanties in een enkele Luisterbare toekomst dat is voltooid na de succesvolle voltooiing van alle futures gecombineerd:

ExecutorService executorService = Executors.newCachedThreadPool (); ListeningExecutorService luisterenExecutorService = MoreExecutors.listeningDecorator (executorService); ListenableFuture future1 = listeningExecutorService.submit (() -> "Hallo"); ListenableFuture future2 = listeningExecutorService.submit (() -> "Wereld"); String-begroeting = Futures.allAsList (future1, future2) .get () .stream () .collect (Collectors.joining ("")); assertEquals ("Hallo wereld", groet);

5. Conclusie

In dit artikel hebben we het Thread Pool-patroon en de implementaties ervan in de standaard Java-bibliotheek en in de Guava-bibliotheek van Google besproken.

De broncode voor het artikel is beschikbaar op GitHub.