Spring integreren met AWS Kinesis

1. Inleiding

Kinesis is een tool voor het verzamelen, verwerken en analyseren van datastromen in realtime, ontwikkeld bij Amazon. Een van de belangrijkste voordelen is dat het helpt bij de ontwikkeling van gebeurtenisgestuurde applicaties.

In deze tutorial gaan we enkele bibliotheken verkennen stellen onze Spring-applicatie in staat om records van een Kinesis Stream te produceren en te consumeren. De codevoorbeelden tonen de basisfunctionaliteit, maar vertegenwoordigen niet de productieklare code.

2. Voorwaarde

Voordat we verder gaan, moeten we twee dingen doen.

De eerste is om een ​​Spring-project te maken, aangezien het doel hier is om vanuit een Spring-project te communiceren met Kinesis.

De tweede is om een ​​Kinesis-gegevensstroom te maken. We kunnen dit doen vanuit een webbrowser in ons AWS-account. Een alternatief voor de AWS CLI-fans onder ons is om de opdrachtregel te gebruiken. Omdat we ermee communiceren vanuit code, moeten we ook de AWS IAM-referenties, de toegangssleutel en geheime sleutel en de regio bij de hand hebben.

Al onze producenten zullen dummy IP-adresrecords maken, terwijl de consumenten die waarden zullen lezen en ze in de applicatieconsole zullen vermelden.

3. AWS SDK voor Java

De allereerste bibliotheek die we zullen gebruiken, is de AWS SDK voor Java. Het voordeel is dat het ons in staat stelt om veel onderdelen van het werken met Kinesis Data Streams te beheren. Wij kunnen gegevens lezen, gegevens produceren, gegevensstromen maken en gegevensstromen opnieuw hard maken. Het nadeel is dat we, om productieklare code te hebben, aspecten als resharding, foutafhandeling of een daemon moeten coderen om de consument in leven te houden.

3.1. Afhankelijkheid van Maven

De Maven-afhankelijkheid van amazon-kinesis-client biedt alles wat we nodig hebben om werkende voorbeelden te hebben. We zullen het nu toevoegen aan ons pom.xml het dossier:

 com.amazonaws amazon-kinesis-client 1.11.2 

3.2. Lente setup

Laten we het AmazonKinesis object dat nodig is om te communiceren met onze Kinesis Stream. We maken het als een @Boon binnen onze @SpringBootApplication klasse:

@Bean openbare AmazonKinesis buildAmazonKinesis () {BasicAWSCredentials awsCredentials = nieuwe BasicAWSCredentials (accessKey, secretKey); retourneer AmazonKinesisClientBuilder.standard () .withCredentials (nieuwe AWSStaticCredentialsProvider (awsCredentials)) .withRegion (Regions.EU_CENTRAL_1) .build (); }

Laten we vervolgens de aws.access.key en aws.secret.key, nodig voor de lokale machine, in application.properties:

aws.access.key = mijn-aws-access-key-gaat-hier aws.secret.key = mijn-aws-geheime-sleutel-gaat-hier

En we zullen ze lezen met behulp van de @Waarde annotatie:

@Value ("$ {aws.access.key}") private String accessKey; @Value ("$ {aws.secret.key}") private String secretKey;

Eenvoudigheidshalve gaan we erop vertrouwen @Verwacht methoden om records te maken en te consumeren.

3.3. Klant

De AWS SDK Kinesis Consumer maakt gebruik van een pull-model, wat betekent dat onze code records zal trekken uit de scherven van de Kinesis-datastroom:

GetRecordsRequest recordsRequest = nieuwe GetRecordsRequest (); recordsRequest.setShardIterator (shardIterator.getShardIterator ()); recordsRequest.setLimit (25); GetRecordsResult recordsResult = kinesis.getRecords (recordsRequest); while (! recordsResult.getRecords (). isEmpty ()) {recordsResult.getRecords (). stream () .map (record -> nieuwe String (record.getData (). array ())) .forEach (System.out: : println); recordsRequest.setShardIterator (recordsResult.getNextShardIterator ()); recordsResult = kinesis.getRecords (recordsRequest); }

De GetRecordsRequest object bouwt het verzoek om stroomgegevens. In ons voorbeeld hebben we een limiet van 25 records per verzoek gedefinieerd en we blijven lezen totdat er niets meer te lezen is.

We kunnen ook opmerken dat we voor onze iteratie een GetShardIteratorResult voorwerp. We hebben dit object gemaakt in een @PostConstruct-methode, zodat we meteen beginnen met het bijhouden van records:

privé GetShardIteratorResult shardIterator; @PostConstruct private void buildShardIterator () {GetShardIteratorRequest readShardsRequest = nieuwe GetShardIteratorRequest (); readShardsRequest.setStreamName (IPS_STREAM); readShardsRequest.setShardIteratorType (ShardIteratorType.LATEST); readShardsRequest.setShardId (IPS_SHARD_ID); this.shardIterator = kinesis.getShardIterator (readShardsRequest); }

3.4. Producent

Laten we nu eens kijken hoe het aanmaken van records voor onze Kinesis-datastroom afhandelen.

We voegen gegevens in met behulp van een PutRecordsRequest voorwerp. Voor dit nieuwe object voegen we een lijst toe die uit meerdere bestaat PutRecordsRequestEntry voorwerpen:

Lijstitems = IntStream.range (1, 200) .mapToObj (ipSuffix -> {PutRecordsRequestEntry entry = nieuwe PutRecordsRequestEntry (); entry.setData (ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())) ; entry.setPartitionKey (IPS_PARTITION_KEY); return entry;}). collect (Collectors.toList ()); PutRecordsRequest createRecordsRequest = nieuwe PutRecordsRequest (); createRecordsRequest.setStreamName (IPS_STREAM); createRecordsRequest.setRecords (vermeldingen); kinesis.putRecords (createRecordsRequest);

We hebben een basisconsument gecreëerd en een producent van gesimuleerde IP-records. Het enige dat u nu hoeft te doen, is ons Spring-project uit te voeren en de IP-adressen in onze applicatieconsole te zien.

4. KCL en KPL

Kinesis Client Library (KCL) is een bibliotheek die het gebruik van records vereenvoudigt. Het is ook een abstractielaag over de AWS SDK Java API's voor Kinesis Data Streams. Achter de schermen zorgt de bibliotheek voor taakverdeling over vele instanties, reageert op instantiefouten, controleert verwerkte records en reageert op resharding.

Kinesis Producer Library (KPL) is een bibliotheek die handig is om naar een Kinesis-gegevensstroom te schrijven. Het biedt ook een abstractielaag die boven de AWS SDK Java API's voor Kinesis Data Streams zit. Voor betere prestaties verwerkt de bibliotheek automatisch batchverwerking, multi-threading en logica voor opnieuw proberen.

KCL en KPL hebben beide het grote voordeel dat ze gebruiksvriendelijk zijn, zodat we ons kunnen concentreren op het produceren en consumeren van platen.

4.1. Afhankelijkheden van Maven

De twee bibliotheken kunnen indien nodig afzonderlijk in ons project worden gebracht. Om KPL en KCL in ons Maven-project op te nemen, moeten we ons pom.xml-bestand bijwerken:

 com.amazonaws amazon-kinesis-producer 0.13.1 com.amazonaws amazon-kinesis-client 1.11.2 

4.2. Lente setup

De enige voorbereiding op de lente die we nodig hebben, is ervoor zorgen dat we de IAM-inloggegevens bij de hand hebben. De waarden voor aws.access.key en aws.secret.key zijn gedefinieerd in onze application.properties bestand zodat we ze kunnen lezen met @Waarde wanneer nodig.

4.3. Klant

Ten eerste zullen we maak een klasse die de IRecordProcessor interface en definieert onze logica voor het omgaan met Kinesis-gegevensstroomrecords, dat is om ze af te drukken in de console:

public class IpProcessor implementeert IRecordProcessor {@Override public void initialize (InitializationInput initializationInput) {} @Override public void processRecords (ProcessRecordsInput processRecordsInput) {processRecordsInput.getRecords () .forEach (record - >getData (record - >getData (record - >getData (record) () .array ()))); } @Override public void shutdown (ShutdownInput shutdownInput) {}}

De volgende stap is om definieer een fabrieksklasse die de IRecordProcessorFactory koppel en retourneert een eerder gemaakt IpProcessor voorwerp:

openbare klasse IpProcessorFactory implementeert IRecordProcessorFactory {@Override openbare IRecordProcessor createProcessor () {retourneer nieuwe IpProcessor (); }}

En nu voor de laatste stap, we gebruiken een Werknemer bezwaar maken om onze consumentenpijplijn te definiëren. We hebben een ... nodig KinesisClientLibConfiguration object dat, indien nodig, de IAM-referenties en de AWS-regio zal definiëren.

We passeren de KinesisClientLibConfiguration, en onze IpProcessorFactory bezwaar tegen onze Werknemer en start het dan in een aparte thread. We houden deze logica van het consumeren van records altijd levend met het gebruik van de Werknemer klas, dus we lezen nu continu nieuwe records:

BasicAWSCredentials awsCredentials = nieuwe BasicAWSCredentials (accessKey, secretKey); KinesisClientLibConfiguration consumerConfig = nieuwe KinesisClientLibConfiguration (APP_NAME, IPS_STREAM, nieuwe AWSStaticCredentialsProvider (awsCredentials), IPS_WORKER) .withRegionName (Regions.EU_CENTRAL_1.getName ()); laatste werknemer werknemer = nieuwe Worker.Builder () .recordProcessorFactory (nieuwe IpProcessorFactory ()) .config (consumerConfig) .build (); CompletableFuture.runAsync (worker.run ());

4.4. Producent

Laten we nu de KinesisProducerConfiguration object, door de IAM-referenties en de AWS-regio toe te voegen:

BasicAWSCredentials awsCredentials = nieuwe BasicAWSCredentials (accessKey, secretKey); KinesisProducerConfiguration producerConfig = nieuwe KinesisProducerConfiguration () .setCredentialsProvider (nieuwe AWSStaticCredentialsProvider (awsCredentials)) .setVerifyCertificate (false) .setRegion (Regions.EU_CENTRAL_1.getName ()); this.kinesisProducer = nieuwe KinesisProducer (producerConfig);

We zullen de kinesisProducer object dat eerder is gemaakt in een @Verwacht werk en produceer continu records voor onze Kinesis-datastroom:

IntStream.range (1, 200) .mapToObj (ipSuffix -> ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())) .forEach (entry -> kinesisProducer.addUserRecord (IPS_STREAM, IPS_PARTITION_KEY, entry )_KEY );

5. Spring Cloud Stream Binder Kinesis

We hebben al twee bibliotheken gezien, beide gemaakt buiten het Spring-ecosysteem. Wel nu zie hoe de Spring Cloud Stream Binder Kinesis ons leven verder kan vereenvoudigen terwijl u bovenop Spring Cloud Stream bouwt.

5.1. Afhankelijkheid van Maven

De Maven-afhankelijkheid die we in onze applicatie voor de Spring Cloud Stream Binder Kinesis moeten definiëren, is:

 org.springframework.cloud spring-cloud-stream-binder-kinesis 1.2.1.RELEASE 

5.2. Lente setup

Wanneer u op EC2 draait, worden de vereiste AWS-eigenschappen automatisch ontdekt, dus het is niet nodig om ze te definiëren. Omdat we onze voorbeelden op een lokale computer uitvoeren, moeten we onze IAM-toegangssleutel, geheime sleutel en regio voor ons AWS-account definiëren. We hebben ook de automatische CloudFormation-stacknaamdetectie uitgeschakeld voor de applicatie:

cloud.aws.credentials.access-key = mijn-aws-toegangssleutel cloud.aws.credentials.secret-key = mijn-aws-geheime-sleutel cloud.aws.region.static = eu-central-1 cloud.aws .stack.auto = false

Spring Cloud Stream is gebundeld met drie interfaces die we kunnen gebruiken in onze stream-binding:

  • De Wastafel is voor gegevensopname
  • De Bron wordt gebruikt voor het publiceren van records
  • De Verwerker is een combinatie van beide

We kunnen ook onze eigen interfaces definiëren als dat nodig is.

5.3. Klant

Het definiëren van een consument is een tweedelige taak. Eerst zullen we definiëren, in de application.properties, de datastroom waaruit we zullen consumeren:

spring.cloud.stream.bindings.input.destination = live-ips spring.cloud.stream.bindings.input.group = live-ips-groep spring.cloud.stream.bindings.input.content-type = text / plain

En laten we vervolgens een lente definiëren @Component klasse. De annotatie @EnableBinding (Sink.class) zal ons in staat stellen om uit de Kinesis-stream te lezen met behulp van de methode die is geannoteerd met @StreamListener (Sink.INPUT):

@EnableBinding (Sink.class) openbare klasse IpConsumer {@StreamListener (Sink.INPUT) openbare leegte consumeren (String ip) {System.out.println (ip); }}

5.4. Producent

De producent kan ook in tweeën worden gesplitst. Eerst moeten we onze stream-eigenschappen binnenin definiëren application.properties:

spring.cloud.stream.bindings.output.destination = live-ips spring.cloud.stream.bindings.output.content-type = text / plain

En dan we voegen toe @EnableBinding (Source.class) op een lente @Component en maak nieuwe testberichten om de paar seconden:

@Component @EnableBinding (Source.class) openbare klasse IpProducer {@Autowired privé bronbron; @Scheduled (fixedDelay = 3000L) private void produce () {IntStream.range (1, 200) .mapToObj (ipSuffix -> "192.168.0." + IpSuffix) .forEach (entry -> source.output (). Send ( MessageBuilder.withPayload (entry) .build ())); }}

Dat is alles wat we nodig hebben om Spring Cloud Stream Binder Kinesis te laten werken. We kunnen de applicatie nu gewoon starten.

6. Conclusie

In dit artikel hebben we gezien hoe we ons Spring-project kunnen integreren met twee AWS-bibliotheken voor interactie met een Kinesis-gegevensstroom. We hebben ook gezien hoe we de Spring Cloud Stream Binder Kinesis-bibliotheek kunnen gebruiken om de implementatie nog eenvoudiger te maken.

De broncode voor dit artikel is te vinden op Github.