Spring Batch met Partitioner

1. Overzicht

In onze vorige introductie van Spring Batch hebben we het framework geïntroduceerd als een tool voor batchverwerking. We hebben ook de configuratiedetails en de implementatie onderzocht voor het uitvoeren van een taak met één thread en één proces.

Om een ​​taak met enige parallelle verwerking te implementeren, is er een reeks opties beschikbaar. Op een hoger niveau zijn er twee manieren van parallelle verwerking:

  1. Single-Process, multi-threaded
  2. Multiproces

In dit korte artikel bespreken we de partitionering van Stap, die kan worden geïmplementeerd voor zowel enkelvoudige als meervoudige taken.

2. Partitioneren van een stap

Spring Batch met tussenschot biedt ons de mogelijkheid om de uitvoering van een Stap:

Partitionering Overzicht

De bovenstaande afbeelding toont een implementatie van een Job met een gepartitioneerd Stap.

Er is een Stap genaamd "Master", waarvan de uitvoering is onderverdeeld in enkele "Slave" -stappen. Deze slaven kunnen de plaats van een meester innemen en het resultaat zal nog steeds ongewijzigd zijn. Zowel master als slave zijn voorbeelden van Stap. Slaves kunnen externe services zijn of alleen lokaal uitgevoerde threads.

Indien nodig kunnen we gegevens van de master naar de slave doorgeven. De metagegevens (d.w.z. de JobRepository), zorgt ervoor dat elke slave slechts één keer wordt uitgevoerd in een enkele uitvoering van het Job.

Hier is het sequentiediagram dat laat zien hoe het allemaal werkt:

Partitionering stap

Zoals getoond, is de PartitionStep drijft de executie. De PartitionHandler is verantwoordelijk voor het opsplitsen van het werk van “Master” in de “Slaves”. De meest rechtse Stap is de slaaf.

3. De Maven POM

De Maven-afhankelijkheden zijn dezelfde als vermeld in ons vorige artikel. Dat wil zeggen, Spring Core, Spring Batch en de afhankelijkheid voor de database (in ons geval, SQLite).

4. Configuratie

In ons inleidende artikel zagen we een voorbeeld van het converteren van sommige financiële gegevens van CSV naar XML-bestand. Laten we hetzelfde voorbeeld uitbreiden.

Hier converteren we de financiële informatie van vijf CSV-bestanden naar overeenkomstige XML-bestanden met behulp van een multi-threaded implementatie.

We kunnen dit bereiken met een enkele Job en Stap verdeling. We hebben vijf threads, één voor elk van de CSV-bestanden.

Laten we eerst een vacature maken:

@Bean (name = "partitionerJob") openbare Job partitionerJob () genereert UnexpectedInputException, MalformedURLException, ParseException {return jobs.get ("partitioningJob") .start (partitionStep ()) .build (); }

Zoals we kunnen zien, dit Job begint met de Partitioneren Stap. Dit is onze masterstap die zal worden onderverdeeld in verschillende slave-stappen:

@Bean public Step partitionStep () gooit UnexpectedInputException, MalformedURLException, ParseException {return steps.get ("partitionStep") .partitioner ("slaveStep", partitioner ()) .step (slaveStep ()) .taskExecutor (taskExecutor ()). (); }

Hier maken we de PartitioningStep met behulp van de StepBuilderFactory. Daarvoor moeten we de informatie geven over de SlaveSteps en de Partitioner.

De Partitioner is een interface die de mogelijkheid biedt om een ​​set invoerwaarden voor elk van de slaves te definiëren. Met andere woorden, logica om taken in respectieve threads te verdelen, komt hier.

Laten we er een implementatie van maken, genaamd CustomMultiResourcePartitioner, waar we de invoer- en uitvoerbestandsnamen in de ExecutionContext om door te geven aan elke slaafstap:

openbare klasse CustomMultiResourcePartitioner implementeert Partitioner {@Override openbare kaartpartitie (int gridSize) {Map map = nieuwe HashMap (gridSize); int i = 0, k = 1; for (Resource resource: resources) {ExecutionContext context = nieuwe ExecutionContext (); Assert.state (resource.exists (), "Resource bestaat niet:" + resource); context.putString (keyName, resource.getFilename ()); context.putString ("opFileName", "output" + k +++ ". xml"); map.put (PARTITION_KEY + i, context); i ++; } terugkeer kaart; }}

We maken ook de bean voor deze klas, waar we de brondirectory voor invoerbestanden geven:

@Bean openbare CustomMultiResourcePartitioner partitioner () {CustomMultiResourcePartitioner partitioner = nieuwe CustomMultiResourcePartitioner (); Hulpbronnen [] hulpbronnen; probeer {resources = resoursePatternResolver .getResources ("file: src / main / resources / input / *. csv"); } catch (IOException e) {throw new RuntimeException ("I / O-problemen bij het oplossen" + "het patroon van het invoerbestand.", e); } partitioner.setResources (bronnen); terugkeer partitioner; }

We zullen de slaafstap definiëren, net als elke andere stap met de lezer en de schrijver. De lezer en schrijver zullen dezelfde zijn als we zagen in ons inleidende voorbeeld, behalve dat ze de bestandsnaam-parameter ontvangen van de StepExecutionContext.

Merk op dat deze bonen stapsgewijs moeten zijn, zodat ze de stepExecutionContext params, bij elke stap. Als ze geen step-scoped zouden hebben, worden hun bonen in eerste instantie gemaakt en accepteren ze de bestandsnamen niet op stapniveau:

@StepScope @Bean openbare FlatFileItemReader itemReader (@Value ("# {stepExecutionContext [bestandsnaam]}") String bestandsnaam) genereert UnexpectedInputException, ParseException {FlatFileItemReader reader = nieuwe FlatFileItemReader (); DelimitedLineTokenizer tokenizer = nieuwe DelimitedLineTokenizer (); String [] tokens = {"gebruikersnaam", "userid", "transactiondate", "amount"}; tokenizer.setNames (tokens); reader.setResource (nieuwe ClassPathResource ("input /" + bestandsnaam)); DefaultLineMapper lineMapper = nieuwe DefaultLineMapper (); lineMapper.setLineTokenizer (tokenizer); lineMapper.setFieldSetMapper (nieuwe RecordFieldSetMapper ()); reader.setLinesToSkip (1); reader.setLineMapper (lineMapper); terug lezer; } 
@Bean @StepScope openbare ItemWriter itemWriter (Marshaller marshaller, @Value ("# {stepExecutionContext [opFileName]}") String bestandsnaam) gooit MalformedURLException {StaxEventItemWriter itemWriter = nieuwe StaxEventItemWriter (); itemWriter.setMarshaller (marshaller); itemWriter.setRootTagName ("transactionRecord"); itemWriter.setResource (nieuwe ClassPathResource ("xml /" + bestandsnaam)); retourneer itemWriter; }

Bij het vermelden van de lezer en schrijver in de slave-stap, kunnen we de argumenten als null doorgeven, omdat deze bestandsnamen niet zullen worden gebruikt, aangezien ze de bestandsnamen zullen ontvangen van stepExecutionContext:

@Bean public Step slaveStep () gooit UnexpectedInputException, MalformedURLException, ParseException {return steps.get ("slaveStep"). Chunk (1) .reader (itemReader (null)) .writer (itemWriter (marshaller (), null)) .build (); }

5. Conclusie

In deze zelfstudie hebben we besproken hoe u een taak met parallelle verwerking implementeert met behulp van Spring Batch.

Zoals altijd is de volledige implementatie voor dit voorbeeld beschikbaar op GitHub.