Kafka en Spring Boot testen

1. Overzicht

Apache Kafka is een krachtig, gedistribueerd, fouttolerant stroomverwerkingssysteem. In een eerdere tutorial leerden we werken met Spring en Kafka.

In deze tutorial we zullen voortbouwen op de vorige en leren hoe we betrouwbare, op zichzelf staande integratietests kunnen schrijven die niet afhankelijk zijn van een externe Kafka-server die draait.

We beginnen eerst maar kijken hoe we een ingebedde instantie van Kafka kunnen gebruiken en configureren. Daarna zullen we zien hoe we gebruik kunnen maken van het populaire framework Testcontainers uit onze tests.

2. Afhankelijkheden

Natuurlijk moeten we de standaard toevoegen lente-kafka afhankelijkheid van onze pom.xml:

 org.springframework.kafka spring-kafka 2.6.3.RELEASE 

Dan hebben we nog twee afhankelijkheden nodig, specifiek voor onze tests. Eerst voegen we de lente-kafka-test artefact:

 org.springframework.kafka spring-kafka-test 2.6.3.RELEASE-test 

En tot slot voegen we de Testcontainers Kafka-afhankelijkheid toe, die ook beschikbaar is via Maven Central:

 org.testcontainers kafka 1.15.0 test 

Nu we alle benodigde afhankelijkheden hebben geconfigureerd, kunnen we een eenvoudige Spring Boot-applicatie schrijven met Kafka.

3. Een eenvoudige Kafka Producer-Consumer-applicatie

In deze tutorial zal de focus van onze tests liggen op een eenvoudige Spring Boot Kafka-applicatie voor producenten en consumenten.

Laten we beginnen met het definiëren van ons toegangspunt voor toepassingen:

@SpringBootApplication @EnableAutoConfiguration openbare klasse KafkaProducerConsumerApplication {openbare statische leegte hoofd (String [] args) {SpringApplication.run (KafkaProducerConsumerApplication.class, args); }}

Zoals we kunnen zien, is dit een standaard Spring Boot-applicatie. Waar mogelijk willen we gebruik maken van standaard configuratiewaarden. Met dit in gedachten maken we gebruik van de @EnableAutoConfiguration annotatie om onze applicatie automatisch te configureren.

3.1. Producer instellen

Laten we vervolgens een producer bean bekijken die we zullen gebruiken om berichten naar een bepaald Kafka-onderwerp te sturen:

@Component openbare klasse KafkaProducer {privé statische laatste Logger LOGGER = LoggerFactory.getLogger (KafkaProducer.class); @Autowired privé KafkaTemplate kafkaTemplate; public void send (String-onderwerp, String-payload) {LOGGER.info ("send payload =" {} "to topic =" {} "", payload, onderwerp); kafkaTemplate.send (onderwerp, payload); }}

Onze KafkaProducer boon hierboven gedefinieerd is slechts een wikkel rond de Kafka-sjabloon klasse. Deze klasse biedt threadveilige bewerkingen op hoog niveau, zoals het verzenden van gegevens naar het opgegeven onderwerp, wat precies is wat we doen in onze sturen methode.

3.2. Consumentenconfiguratie

Evenzo zullen we nu een eenvoudige consumentenboon definiëren die naar een Kafka-onderwerp luistert en berichten ontvangt:

@Component openbare klasse KafkaConsumer {privé statische laatste Logger LOGGER = LoggerFactory.getLogger (KafkaConsumer.class); private CountDownLatch latch = nieuwe CountDownLatch (1); private String payload = null; @KafkaListener (topics = "$ {test.topic}") openbare ongeldige ontvangst (ConsumerRecord consumerRecord) {LOGGER.info ("ontvangen payload =" {} "", consumerRecord.toString ()); setPayload (consumerRecord.toString ()); latch.countDown (); } openbare CountDownLatch getLatch () {return latch; } public String getPayload () {retourlading; }}

Onze eenvoudige consument gebruikt de @KafkaListener annotatie op de te ontvangen methode om naar berichten over een bepaald onderwerp te luisteren. We zullen later zien hoe we het test.topic van onze tests.

Bovendien slaat de ontvangstmethode de berichtinhoud op in onze bean en verlaagt het aantal klink variabele. Deze variabele is een eenvoudig thread-safe tellerveld dat we later uit onze tests zullen gebruiken om er zeker van te zijn dat we met succes een bericht hebben ontvangen.

Nu we onze eenvoudige Kafka-applicatie met Spring Boot hebben geïmplementeerd, laten we eens kijken hoe we integratietests kunnen schrijven.

4. Een woord over testen

Over het algemeen moeten we bij het schrijven van schone integratietests niet afhankelijk zijn van externe services die we mogelijk niet kunnen controleren of die plotseling stoppen met werken. Dit kan nadelige gevolgen hebben voor onze testresultaten.

Evenzo, als we afhankelijk zijn van een externe service, in dit geval een actieve Kafka-makelaar, zullen we deze waarschijnlijk niet kunnen instellen, beheren en afbreken op de manier die we willen van onze tests.

4.1. Toepassingseigenschappen

We gaan een zeer lichte set van applicatieconfiguratie-eigenschappen uit onze tests gebruiken. We zullen deze eigenschappen definiëren in onze src / test / resources / application.yml het dossier:

spring: kafka: consumer: auto-offset-reset: vroegste groeps-id: baeldung test: topic: embedded-test-topic

Dit is de minimale set eigenschappen die we nodig hebben wanneer we werken met een ingesloten instantie van Kafka of een lokale broker.

De meeste hiervan spreken voor zich, maar degene die we van bijzonder belang moeten benadrukken, is het eigendom van de consument auto-offset-reset: vroegste. Deze eigenschap zorgt ervoor dat onze consumentengroep de berichten ontvangt die we verzenden, omdat de container kan starten nadat de verzendingen zijn voltooid.

Bovendien configureren we een onderwerpeigenschap met de waarde embedded-test-topic, dat is het onderwerp dat we zullen gebruiken tijdens onze tests.

5. Testen met Embedded Kafka

In deze sectie zullen we bekijken hoe u een in-memory Kafka-instantie kunt gebruiken om onze tests op uit te voeren. Dit wordt ook wel Embedded Kafka genoemd.

De afhankelijkheid lente-kafka-test die we eerder hebben toegevoegd, bevat enkele handige hulpprogramma's om te helpen bij het testen van onze applicatie. Het bevat met name de EmbeddedKafkaBroker klasse.

Met dat in gedachten, laten we doorgaan en onze eerste integratietest schrijven:

@SpringBootTest @DirtiesContext @EmbeddedKafka (partitions = 1, brokerProperties = {"listeners = PLAINTEXT: // localhost: 9092", "port = 9092"}) class EmbeddedKafkaIntegrationTest {@Autowired private KafkaConsumer-consument; @Autowired particuliere KafkaProducer-producent; @Value ("$ {test.topic}") privé String-onderwerp; @Test openbare leegte gegevenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived () gooit uitzondering {producer.send (topic, "Verzenden met eigen eenvoudige KafkaProducer"); consumer.getLatch (). await (10000, TimeUnit.MILLISECONDS); assertThat (consumer.getLatch (). getCount (), equalTo (0L)); assertThat (consumer.getPayload (), containsString ("embedded-test-topic")); }}

Laten we de belangrijkste onderdelen van onze test eens doornemen. Eerst beginnen we met het verfraaien van onze testklas met twee vrij standaard lente-annotaties:

  • De @BuienRadarNL annotatie zorgt ervoor dat onze test de Spring-toepassingscontext opstart
  • We gebruiken ook de @DirtiesContext annotatie, die ervoor zorgt dat deze context wordt opgeschoond en opnieuw wordt ingesteld tussen verschillende tests

Hier komt het cruciale deel, we gebruiken de @EmbeddedKafka annotatie om een ​​instantie van een EmbeddedKafkaBroker in onze tests. Bovendien zijn er verschillende eigenschappen beschikbaar die we kunnen gebruiken om het ingesloten Kafka-knooppunt te configureren:

  • partities - dit is het aantal partities dat per onderwerp wordt gebruikt. Om de zaken leuk en simpel te houden, willen we dat er maar één uit onze tests wordt gebruikt
  • makelaarProperties - aanvullende eigendommen voor de Kafka-makelaar. Ook hier houden we het simpel en specificeren we een luisteraar in platte tekst en een poortnummer

Vervolgens bedraden we onze klant en producent klassen en configureer een onderwerp om de waarde van onze application.properties.

Voor het laatste stukje van de puzzel, we sturen gewoon een bericht naar ons testonderwerp en controleren of het bericht is ontvangen en de naam van ons testonderwerp bevat.

Wanneer we onze test uitvoeren, zien we tussen de uitgebreide Spring-uitvoer:

... 12: 45: 35.099 [main] INFO cbkafka.embedded.KafkaProducer - send payload = "Verzenden met onze eigen eenvoudige KafkaProducer" to topic = "embedded-test-topic" ... 12: 45: 35.103 [org .springframework.kafka.KafkaListenerEndpointContainer # 0-0-C-1] INFO cbkafka.embedded.KafkaConsumer - ontvangen payload = 'ConsumerRecord (topic = embedded-test-topic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, grootte van geserialiseerde sleutel = -1, grootte van geserialiseerde waarde = 41, headers = RecordHeaders (headers = [], isReadOnly = false),  key = null, value = Verzenden met onze eigen eenvoudige KafkaProducer) ' 

Dit bevestigt dat onze test naar behoren werkt. Geweldig! We hebben nu een manier om op zichzelf staande, onafhankelijke integratietests te schrijven met behulp van een in-memory Kafka-makelaar.

6. Kafka testen met TestContainers

Soms zien we kleine verschillen tussen een echte externe service en een ingebedde instantie in het geheugen van een service die specifiek is geleverd voor testdoeleinden. Hoewel het onwaarschijnlijk is, kan het ook zijn dat de poort die tijdens onze test wordt gebruikt, bezet is, waardoor een storing optreedt.

Met dat in gedachten zullen we in deze sectie een variatie zien op onze eerdere benadering van testen met behulp van het Testcontainers-framework. We zullen zien hoe we een externe Apache Kafka-broker kunnen instantiëren en beheren die wordt gehost in een Docker-container van onze integratietest.

Laten we een andere integratietest definiëren die vrij gelijkaardig zal zijn aan degene die we in de vorige sectie hebben gezien:

@RunWith (SpringRunner.class) @Import (com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest (classes = KafkaProducerConsonscontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration. .parse ("confluentinc / cp-kafka: 5.4.3")); @Autowired particuliere KafkaConsumer-consument; @Autowired particuliere KafkaProducer-producent; @Value ("$ {test.topic}") privé String-onderwerp; @Test openbare leegte gegevenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived () gooit Uitzondering {producer.send (onderwerp, "Verzenden met eigen controller"); consumer.getLatch (). await (10000, TimeUnit.MILLISECONDS); assertThat (consumer.getLatch (). getCount (), equalTo (0L)); assertThat (consumer.getPayload (), containsString ("embedded-test-topic")); }}

Laten we deze keer eens kijken naar de verschillen. We verklaren het kafka veld, wat een standaard JUnit is @ClassRule. Dit veld is een instantie van de KafkaContainer klasse die de levenscyclus van onze container met Kafka voorbereidt en beheert.

Om poortconflicten te voorkomen, wijst Testcontainers dynamisch een poortnummer toe wanneer onze dockercontainer start. Om deze reden bieden we een aangepaste fabrieksconfiguratie voor consumenten en producenten met behulp van de klasse KafkaTestContainersConfiguration:

@Bean public Map consumerConfigs () {Map props = new HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); props.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "vroegste"); props.put (ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // meer standaard configuratie retour rekwisieten; } @Bean public ProducerFactory producerFactory () {Map configProps = nieuwe HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); // meer standaardconfiguratie retourneert nieuwe DefaultKafkaProducerFactory (configProps); }

We verwijzen vervolgens naar deze configuratie via de @Importeren annotatie aan het begin van onze test.

De reden hiervoor is dat we een manier nodig hebben om het serveradres in onze applicatie te injecteren, die zoals eerder vermeld dynamisch wordt gegenereerd. We bereiken dit door de getBootstrapServers () methode, die de bootstrap-serverlocatie retourneert:

bootstrap.servers = [PLAINTEXT: // localhost: 32789]

Als we nu onze test uitvoeren, zouden we moeten zien dat Testcontainers verschillende dingen doet:

  • Controleert onze lokale Docker-instellingen.
  • Trekt de confluentinc / cp-kafka: 5.4.3 docker-afbeelding indien nodig
  • Start een nieuwe container en wacht tot deze klaar is
  • Ten slotte wordt de container afgesloten en verwijderd nadat onze test is voltooid

Nogmaals, dit wordt bevestigd door de testoutput te inspecteren:

13: 33: 10.396 [main] INFO 🐳 [confluentinc / cp-kafka: 5.4.3] - Container aanmaken voor afbeelding: confluentinc / cp-kafka: 5.4.3 13: 33: 10.454 [main] INFO 🐳 [confluentinc / cp -kafka: 5.4.3] - beginnend container met ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13: 33: 10,785 [main] INFO 🐳 [confluentinc / cp-kafka: 5.4.3] - container confluentinc / cp-kafka: 5.4.3 begint: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Presto! Een werkende integratietest met behulp van een Kafka-dockercontainer.

7. Conclusie

In dit artikel hebben we een aantal benaderingen geleerd voor het testen van Kafka-applicaties met Spring Boot. In de eerste benadering hebben we gezien hoe we een lokale in-memory Kafka-broker kunnen configureren en gebruiken.

Toen zagen we hoe we Testcontainers konden gebruiken om een ​​externe Kafka-makelaar op te zetten die in een dockercontainer draait uit onze tests.

Zoals altijd is de volledige broncode van het artikel beschikbaar op GitHub.