Java EE 7 batchverwerking

1. Inleiding

Stel je voor dat we handmatig taken moesten uitvoeren zoals het verwerken van loonstroken, het berekenen van rente en het genereren van rekeningen. Het zou behoorlijk saai, foutgevoelig en een eindeloze lijst met handmatige taken worden!

In deze tutorial kijken we naar Java Batch Processing (JSR 352), een onderdeel van het Jakarta EE-platform, en een geweldige specificatie voor het automatiseren van dergelijke taken. Het biedt applicatieontwikkelaars een model voor het ontwikkelen van robuuste batchverwerkingssystemen, zodat ze zich kunnen concentreren op de bedrijfslogica.

2. Maven afhankelijkheden

Aangezien JSR 352 slechts een specificatie is, moeten we de API en implementatie ervan opnemen, zoals jberet:

 javax.batch javax.batch-api 1.0.1 org.jberet jberet-core 1.0.2.Final org.jberet jberet-support 1.0.2.Final org.jberet jberet-se 1.0.2.Final 

We zullen ook een in-memory database toevoegen, zodat we enkele meer realistische scenario's kunnen bekijken.

3. Sleutelbegrippen

JSR 352 introduceert een paar concepten, die we op deze manier kunnen bekijken:

Laten we eerst elk stuk definiëren:

  • We beginnen aan de linkerkant en hebben de JobOperator. Het beheert alle aspecten van de taakverwerking, zoals starten, stoppen en opnieuw starten
  • Vervolgens hebben we de Job. Een baan is een logische verzameling stappen; het omvat een volledig batchproces
  • Een job bevat tussen de 1 en n Staps. Elke stap is een onafhankelijke, opeenvolgende werkeenheid. Een stap is samengesteld uit lezing invoer, verwerken die input, en schrijven output
  • En last but not least hebben we de JobRepository die de lopende informatie van de banen opslaat. Het helpt om banen, hun status en hun voltooiingsresultaten bij te houden

Stappen hebben wat meer details dan dit, dus laten we dat nu eens bekijken. Eerst kijken we naar Brok stappen en dan op Batchlets.

4. Creëren van een Chunk

Zoals eerder vermeld, is een brok een soort stap. We zullen vaak een brok gebruiken om een ​​bewerking uit te drukken die steeds opnieuw wordt uitgevoerd, bijvoorbeeld over een reeks items. Het lijkt een beetje op tussenliggende bewerkingen van Java Streams.

Bij het beschrijven van een brok moeten we aangeven waar we items vandaan kunnen halen, hoe ze moeten worden verwerkt en waar ze ze daarna naartoe moeten sturen.

4.1. Items lezen

Om items te lezen, moeten we implementeren ItemReader.

In dit geval maken we een lezer die eenvoudig de cijfers 1 tot en met 10 uitzendt:

@Named openbare klasse SimpleChunkItemReader breidt AbstractItemReader {privé geheel getal [] tokens uit; aantal privé-gehele getallen; @Inject JobContext jobContext; @Override public Integer readItem () gooit uitzondering {if (count> = tokens.length) {return null; } jobContext.setTransientUserData (aantal); retourneer tokens [count ++]; } @Override public void open (Serializable checkpoint) gooit uitzondering {tokens = nieuw geheel getal [] {1,2,3,4,5,6,7,8,9,10}; count = 0; }}

Nu lezen we hier gewoon uit de interne toestand van de klas. Maar natuurlijk, readItem uit een database kunnen halen, van het bestandssysteem of een andere externe bron.

Merk op dat we een deel van deze interne status opslaan met JobContext # setTransientUserData () wat later van pas zal komen.

Let ook op de controlepunt parameter. We pakken dat ook weer op.

4.2. Artikelen verwerken

De reden dat we aan het chunken zijn, is natuurlijk dat we een of andere bewerking op onze items willen uitvoeren!

Elke keer dat we terugkeren nul van een itemverwerker, laten we dat item uit de batch vallen.

Dus laten we zeggen dat we alleen de even getallen willen behouden. We kunnen een ItemProcessor dat verwerpt de oneven door terug te keren nul:

@Named public class SimpleChunkItemProcessor implementeert ItemProcessor {@Override public Integer processItem (Object t) {Integer item = (Integer) t; item% 2 == 0 retourneren? item: null; }}

processItem wordt één keer gebeld voor elk item dat onze ItemReader zendt.

4.3. Artikelen schrijven

Ten slotte roept de taak het ItemWriter zodat we onze getransformeerde items kunnen schrijven:

@Named openbare klasse SimpleChunkWriter breidt AbstractItemWriter uit {Lijst verwerkt = nieuwe ArrayList (); @Override public void writeItems (List items) gooit Uitzondering {items.stream (). Map (Integer.class :: cast) .forEach (verwerkt :: add); }} 

Hoe lang is items? In een oogwenk zullen we de grootte van een chunk definiëren, die de grootte van de lijst zal bepalen waarnaar wordt verzonden writeItems.

4.4. Een brok in een taak definiëren

Nu zetten we dit allemaal samen in een XML-bestand met JSL of Job Specification Language. Merk op dat we onze lezer, processor, chunker en ook een chunkgrootte zullen vermelden:

De chunkgrootte geeft aan hoe vaak de voortgang in de chunk wordt toegewezen aan de jobrepository, wat belangrijk is om voltooiing te garanderen, mocht een deel van het systeem falen.

We moeten dit bestand in META-INF / batch-taken voor .pot bestanden en in WEB-INF / klassen / META-INF / batch-jobs voor .oorlog bestanden.

We hebben onze baan de ID gegeven "SimpleChunk", dus laten we dat proberen in een unit-test.

Taken worden nu asynchroon uitgevoerd, waardoor ze moeilijk te testen zijn. Bekijk in het voorbeeld onze BatchTestHelper die peilt en wacht totdat de taak is voltooid:

@Test openbare ongeldig gegevenChunk_thenBatch_completesWithSuccess () gooit uitzondering {JobOperator jobOperator = BatchRuntime.getJobOperator (); Long executionId = jobOperator.start ("simpleChunk", nieuwe eigenschappen ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); } 

Dus dat is wat brokken zijn. Laten we nu eens kijken naar batchlets.

5. Een batch maken

Niet alles past keurig in een iteratief model. We hebben bijvoorbeeld een taak die we gewoon nodig hebben eenmaal aanroepen, uitvoeren tot voltooiing en een exitstatus retourneren.

Het contract voor een batchlet is vrij eenvoudig:

@Named openbare klasse SimpleBatchLet breidt AbstractBatchlet uit {@Override public String process () genereert Uitzondering {return BatchStatus.COMPLETED.toString (); }}

Net als de JSL:

En we kunnen het testen met dezelfde aanpak als voorheen:

@Test openbare ongeldige gegevenBatchlet_thenBatch_completeWithSuccess () gooit Uitzondering {JobOperator jobOperator = BatchRuntime.getJobOperator (); Long executionId = jobOperator.start ("simpleBatchLet", nieuwe eigenschappen ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

We hebben dus een aantal verschillende manieren bekeken om stappen te implementeren.

Laten we nu eens kijken naar mechanismen voor voortgang markeren en garanderen.

6. Aangepast controlepunt

Mislukkingen gebeuren midden in een baan. Moeten we gewoon opnieuw beginnen, of kunnen we op de een of andere manier beginnen waar we waren gebleven?

Zoals de naam al doet vermoeden, controlepunten help ons om periodiek een bladwijzer in te stellen in het geval van een storing.

Standaard is het einde van de chunkverwerking een natuurlijk ijkpunt.

We kunnen het echter aanpassen met die van onszelf CheckpointAlgorithm:

@Named openbare klasse CustomCheckPoint breidt AbstractCheckpointAlgorithm {@Inject JobContext jobContext uit; @Override openbare boolean isReadyToCheckpoint () gooit uitzondering {int counterRead = (geheel getal) jobContext.getTransientUserData (); return counterRead% 5 == 0; }}

Herinner je je de telling die we eerder in tijdelijke gegevens hebben geplaatst? Hier, we kunnen het eruit trekken JobContext # getTransientUserDataom aan te geven dat we ons willen vastleggen op elk 5 verwerkt nummer.

Zonder dit zou een commit plaatsvinden aan het einde van elk blok, of in ons geval elk derde nummer.

En dan matchen we dat met de checkout-algoritme richtlijn in onze XML onder ons stuk:

Laten we de code testen, waarbij we nogmaals opmerken dat sommige van de standaardplaatstappen verborgen zijn in BatchTestHelper:

@Test openbare ongeldige gegevenChunk_whenCustomCheckPoint_thenCommitCountIsThree () genereert uitzondering {// ... start taak en wacht op voltooiing jobOperator.getStepExecutions (executionId) .stream () .map (BatchTestHelper :: getCommitCount) .forEach (count -> assertEquals (3 -> assertEquals) .longValue ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

Dus we verwachten misschien een commit-telling van 2, aangezien we tien items hebben en de commits zo hebben geconfigureerd dat ze elk 5e item zijn. Maar, het framework doet aan het einde nog een laatste leescommissie om ervoor te zorgen dat alles is verwerkt, en dat is wat ons op 3 brengt.

Laten we vervolgens kijken hoe we met fouten kunnen omgaan.

7. Afhandeling van uitzonderingen

Standaard, de joboperator zal onze job markeren als MISLUKT in het geval van een uitzondering.

Laten we onze itemlezer wijzigen om er zeker van te zijn dat het mislukt:

@Override openbaar geheel getal readItem () gooit uitzondering {if (tokens.hasMoreTokens ()) {String tempTokenize = tokens.nextToken (); gooi nieuwe RuntimeException (); } retourneer null; }

En test dan:

@Test openbare leegte whenChunkError_thenBatch_CompletesWithFailed () genereert uitzondering {// ... start taak en wacht op voltooiing assertEquals (jobExecution.getBatchStatus (), BatchStatus.FAILED); }

Maar we kunnen dit standaardgedrag op een aantal manieren omzeilen:

  • skip-limiet specificeert het aantal uitzonderingen dat deze stap zal negeren voordat het mislukt
  • retry-limiet specificeert het aantal keren dat de taakoperator de stap opnieuw moet proberen voordat deze mislukt
  • uitzonderingsklasse die kan worden overgeslagen specificeert een reeks uitzonderingen die de chunk-verwerking zal negeren

We kunnen onze taak dus bewerken zodat deze negeert RuntimeException, evenals een paar andere, alleen ter illustratie:

En nu zal onze code passeren:

@Test openbare ongeldig gegevenChunkError_thenErrorSkipped_CompletesWithSuccess () gooit uitzondering {// ... start taak en wacht op voltooiing jobOperator.getStepExecutions (executionId) .stream () .map (BatchTestHelper :: getProcessSkipCount) .forEach (skipCount - (1L, skipCount) .forEach (skipCount - .longValue ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8. Meerdere stappen uitvoeren

We zeiden eerder dat een taak een aantal stappen kan hebben, dus laten we dat nu bekijken.

8.1. Vuur de volgende stap af

Standaard, elke stap is de laatste stap in het werk.

Om de volgende stap binnen een batchtaak uit te voeren, moeten we dit expliciet specificeren door de De volgende attribuut binnen de stapdefinitie:

Als we dit attribuut vergeten, wordt de volgende stap niet uitgevoerd.

En we kunnen zien hoe dit eruit ziet in de API:

@Test openbare leegte gegevenTwoSteps_thenBatch_CompleteWithSuccess () gooit uitzondering {// ... start de taak en wacht op voltooiing assertEquals (2, jobOperator.getStepExecutions (executionId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.2. Stromen

Een reeks stappen kan ook worden ingekapseld in een stromen. Als de stroom is afgelopen, gaat de hele stroom over naar het uitvoeringselement. Ook kunnen elementen binnen de stroom niet overgaan naar elementen buiten de stroom.

We kunnen bijvoorbeeld twee stappen binnen een stroom uitvoeren en die stroom vervolgens laten overgaan naar een geïsoleerde stap:

En we kunnen nog steeds de uitvoering van elke stap onafhankelijk zien:

@Test openbare leegte gegevenFlow_thenBatch_CompleteWithSuccess () gooit uitzondering {// ... start taak en wacht op voltooiing assertEquals (3, jobOperator.getStepExecutions (executionId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.3. Beslissingen

We hebben ook if / else-ondersteuning in de vorm van beslissingen. Beslissingen bieden een aangepaste manier om een ​​volgorde te bepalen tussen stappen, stromen en splitsingen.

Net als trappen werkt het op overgangselementen zoals De volgende die de taakuitvoering kunnen leiden of beëindigen.

Laten we eens kijken hoe de taak kan worden geconfigureerd:

Ieder besluit element moet worden geconfigureerd met een klasse die implementeert Beslisser. Het is zijn taak om een ​​beslissing terug te geven als een Draad.

Elk De volgende binnen besluit is zoals een geval in een schakelaar uitspraak.

8,4. Splitst

Splitst zijn handig omdat ze ons in staat stellen om gelijktijdig stromen uit te voeren:

Natuurlijk, dit betekent dat de bestelling niet gegarandeerd is.

Laten we bevestigen dat ze nog steeds allemaal worden uitgevoerd. De stroomstappen worden in een willekeurige volgorde uitgevoerd, maar de geïsoleerde stap is altijd de laatste:

@Test openbare leegte gegevenSplit_thenBatch_CompletesWithSuccess () genereert uitzondering {// ... start taak en wacht op voltooiing Lijst stepExecutions = jobOperator.getStepExecutions (executionId); assertEquals (3, stepExecutions.size ()); assertEquals ("splitJobSequenceStep3", stepExecutions.get (2) .getStepName ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

9. Partitioneren van een taak

We kunnen ook de batch-eigenschappen binnen onze Java-code gebruiken die in ons werk zijn gedefinieerd.

Ze kunnen op drie niveaus worden ingedeeld: de taak, de stap en het batchartefact.

Laten we enkele voorbeelden bekijken van hoe ze consumeerden.

Wanneer we de eigendommen op functieniveau willen consumeren:

@Inject JobContext jobContext; ... jobProperties = jobContext.getProperties (); ...

Dit kan ook op stapniveau worden geconsumeerd:

@Inject StepContext stepContext; ... stepProperties = stepContext.getProperties (); ...

Wanneer we de eigenschappen op batch-artefactniveau willen consumeren:

@Inject @BatchProperty (name = "name") private String nameString;

Dit is handig bij partities.

Kijk, met splitsingen kunnen we stromen gelijktijdig uitvoeren. Maar we kunnen ook partitie een stap in n sets van items of set aparte inputs, waardoor we een andere manier hebben om het werk over meerdere threads te verdelen.

Om het werksegment te begrijpen dat elke partitie zou moeten doen, kunnen we eigenschappen met partities combineren:

10. Stop en start opnieuw

Nu, dat is het voor het definiëren van banen. Laten we nu even praten over het beheer ervan.

We hebben al gezien in onze unit-tests waarvan we een exemplaar kunnen krijgen JobOperator van BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator ();

En dan kunnen we aan de klus beginnen:

Long executionId = jobOperator.start ("simpleBatchlet", nieuwe eigenschappen ());

We kunnen de klus echter ook stopzetten:

jobOperator.stop (executionId);

En als laatste kunnen we de klus opnieuw starten:

executId = jobOperator.restart (executionId, nieuwe eigenschappen ());

Laten we eens kijken hoe we een lopende taak kunnen stoppen:

@Test openbare ongeldige gegevenBatchLetStarted_whenStopped_thenBatchStopped () genereert Uitzondering {JobOperator jobOperator = BatchRuntime.getJobOperator (); Long executionId = jobOperator.start ("simpleBatchLet", nieuwe eigenschappen ()); JobExecution jobExecution = jobOperator.getJobExecution (executionId); jobOperator.stop (executionId); jobExecution = BatchTestHelper.keepTestStopped (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); }

En als een batch is GESTOPT, dan kunnen we het herstarten:

@Test openbare leegte gegevenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess () {// ... start en stop de taak assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); executionId = jobOperator.restart (jobExecution.getExecutionId (), nieuwe eigenschappen ()); jobExecution = BatchTestHelper.keepTestAlive (jobOperator.getJobExecution (executionId)); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

11. Jobs ophalen

Als er dan een batchtaak wordt verzonden de batch-runtime maakt een exemplaar van Jobuitvoering om het te volgen.

Om het Jobuitvoering voor een uitvoerings-ID kunnen we de JobOperator # getJobExecution (executionId) methode.

En, Stapuitvoering biedt nuttige informatie voor het volgen van de uitvoering van een stap.

Om het Stapuitvoering voor een uitvoerings-ID kunnen we de JobOperator # getStepExecutions (executionId) methode.

En daaruit kunnen we verschillende statistieken over de stap krijgen via StepExecution # getMetrics:

@Test openbare ongeldig gegevenChunk_whenJobStarts_thenStepsHaveMetrics () genereert Uitzondering {// ... start taak en wacht op voltooiing assertTrue (jobOperator.getJobNames (). Bevat ("simpleChunk")); assertTrue (jobOperator.getParameters (executionId) .isEmpty ()); StepExecution stepExecution = jobOperator.getStepExecutions (executionId) .get (0); Kaart metricTest = BatchTestHelper.getMetricsMap (stepExecution.getMetrics ()); assertEquals (10L, metricTest.get (Metric.MetricType.READ_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.FILTER_COUNT) .longValue ()); assertEquals (4L, metricTest.get (Metric.MetricType.COMMIT_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.WRITE_COUNT) .longValue ()); // ... en nog veel meer! }

12. Nadelen

JSR 352 is krachtig, hoewel het op een aantal gebieden ontbreekt:

  • Er lijkt een gebrek te zijn aan lezers en schrijvers die andere formaten zoals JSON kunnen verwerken
  • Er is geen ondersteuning voor generieke geneesmiddelen
  • Partitioneren ondersteunt slechts een enkele stap
  • De API biedt niets om planning te ondersteunen (hoewel J2EE een aparte planningsmodule heeft)
  • Vanwege het asynchrone karakter kan testen een uitdaging zijn
  • De API is behoorlijk uitgebreid

13. Conclusie

In dit artikel hebben we JSR 352 bekeken en geleerd over brokken, batchlets, splitsingen, stromen en nog veel meer. Toch hebben we nauwelijks het oppervlak bekrast.

Zoals altijd is de demo-code te vinden op GitHub.


$config[zx-auto] not found$config[zx-overlay] not found