Inleiding tot Apache Storm

1. Overzicht

Deze tutorial is een inleiding tot Apache Storm, een gedistribueerd real-time rekensysteem.

We zullen ons concentreren op en behandelen:

  • Wat is Apache Storm precies en welke problemen lost het op?
  • Zijn architectuur, en
  • Hoe het in een project te gebruiken

2. Wat is Apache Storm?

Apache Storm is een gratis en open source gedistribueerd systeem voor realtime berekeningen.

Het biedt fouttolerantie, schaalbaarheid en garandeert gegevensverwerking, en is vooral goed in het verwerken van onbegrensde gegevensstromen.

Enkele goede use-cases voor Storm zijn het verwerken van creditcardbewerkingen voor fraudedetectie of het verwerken van gegevens van slimme huizen om defecte sensoren te detecteren.

Storm maakt integratie mogelijk met verschillende databases en wachtrijsystemen die op de markt verkrijgbaar zijn.

3. Maven Afhankelijkheid

Voordat we Apache Storm gebruiken, moeten we de storm-core-afhankelijkheid in ons project opnemen:

 org.apache.storm storm-core 1.2.2 voorzien 

We zouden alleen de voorziene ruimte als we van plan zijn onze applicatie op het Storm-cluster uit te voeren.

Om de applicatie lokaal uit te voeren, kunnen we een zogenaamde lokale modus gebruiken die het Storm-cluster in een lokaal proces simuleert, in dat geval moeten we de voorzien.

4. Gegevensmodel

Het datamodel van Apache Storm bestaat uit twee elementen: tuples en streams.

4.1. Tuple

EEN Tuple is een geordende lijst van benoemde velden met dynamische typen. Dit betekent dat we de typen velden niet expliciet hoeven aan te geven.

Storm moet weten hoe alle waarden die in een tuple worden gebruikt, moeten worden geserialiseerd. Standaard kan het primitieve typen al serialiseren, Snaren en byte arrays.

En aangezien Storm Kryo-serialisering gebruikt, moeten we de serialisator registreren met Config om de aangepaste typen te gebruiken. We kunnen dit op twee manieren doen:

Ten eerste kunnen we de klasse registreren om te serialiseren met de volledige naam:

Config config = nieuwe Config (); config.registerSerialization (User.class);

In dat geval zal Kryo de klasse serialiseren met FieldSerializer. Standaard worden hiermee alle niet-tijdelijke velden van de klasse geserialiseerd, zowel privé als openbaar.

Of in plaats daarvan kunnen we zowel de klasse leveren die moet worden geserialiseerd als de serialisator die Storm voor die klasse moet gebruiken:

Config config = nieuwe Config (); config.registerSerialization (User.class, UserSerializer.class);

Om de aangepaste serialisator te maken, moeten we de generieke klasse uitbreiden Serializer dat heeft twee methoden schrijven en lezen.

4.2. Stroom

EEN Stroom is de kern abstractie in het Storm-ecosysteem. De Stroom is een onbegrensde opeenvolging van tupels.

Storms maakt het mogelijk om meerdere streams parallel te verwerken.

Elke stream heeft een id die wordt verstrekt en toegewezen tijdens de aangifte.

5. Topologie

De logica van de real-time Storm-applicatie is verpakt in de topologie. De topologie bestaat uit tuiten en bouten.

5.1. Tuit

Tuiten zijn de bronnen van de stromen. Ze zenden tupels uit naar de topologie.

Tuples kunnen worden gelezen vanuit verschillende externe systemen zoals Kafka, Kestrel of ActiveMQ.

Tuiten kunnen zijn betrouwbaar of onbetrouwbaar. Betrouwbaar betekent dat de tuit kan antwoorden dat het tupel niet door Storm is verwerkt. Onbetrouwbaar betekent dat de tuit niet antwoordt omdat hij een vuur-en-vergeet-mechanisme zal gebruiken om de tupels uit te zenden.

Om de aangepaste uitloop te maken, moeten we de IRichSpout interface of breid een klasse uit die de interface al implementeert, bijvoorbeeld een abstract BaseRichSpout klasse.

Laten we een onbetrouwbaar tuit:

openbare klasse RandomIntSpout breidt BaseRichSpout uit {privé Willekeurig willekeurig; privé SpoutOutputCollector outputCollector; @Override public void open (Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {random = new Random (); outputCollector = spoutOutputCollector; } @Override public void nextTuple () {Utils.sleep (1000); outputCollector.emit (nieuwe waarden (random.nextInt (), System.currentTimeMillis ())); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nieuwe velden ("randomInt", "timestamp")); }}

Onze gewoonte RandomIntSpout genereert elke seconde een willekeurig geheel getal en een tijdstempel.

5.2. Bout

Bouten verwerken tupels in de stroom. Ze kunnen verschillende bewerkingen uitvoeren, zoals filteren, aggregaties of aangepaste functies.

Sommige bewerkingen vereisen meerdere stappen en daarom zullen we in dergelijke gevallen meerdere bouten moeten gebruiken.

Om het aangepaste Bout, we moeten implementeren IRichBolt of voor eenvoudigere bewerkingen IBasicBolt koppel.

Er zijn ook meerdere helperklassen beschikbaar om te implementeren Bout. In dit geval gebruiken we BaseBasicBolt:

public class PrintingBolt breidt BaseBasicBolt uit {@Override public void execute (Tuple tuple, BasicOutputCollector basicOutputCollector) {System.out.println (tuple); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {}}

Deze gewoonte AfdrukkenBolt zal gewoon alle tuples naar de console afdrukken.

6. Een eenvoudige topologie maken

Laten we deze ideeën samenbrengen in een eenvoudige topologie. Onze topologie heeft één uitloop en drie bouten.

6.1. RandomNumberTuit

In het begin maken we een onbetrouwbare tuit. Het genereert elke seconde willekeurige gehele getallen uit het bereik (0,100):

openbare klasse RandomNumberSpout breidt BaseRichSpout uit {privé Willekeurig willekeurig; privé SpoutOutputCollector-verzamelaar; @Override public void open (Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {random = new Random (); collector = tuitOutputCollector; } @Override public void nextTuple () {Utils.sleep (1000); int operatie = random.nextInt (101); lange tijdstempel = System.currentTimeMillis (); Waardenwaarden = nieuwe waarden (bewerking, tijdstempel); collector.emit (waarden); } @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nieuwe velden ("operatie", "timestamp")); }}

6.2. FilterenBout

Vervolgens maken we een bout waarmee alle elementen worden uitgefilterd operatie gelijk aan 0:

public class FilteringBolt breidt BaseBasicBolt uit {@Override public void execute (Tuple tuple, BasicOutputCollector basicOutputCollector) {int operation = tuple.getIntegerByField ("operation"); if (operatie> 0) {basicOutputCollector.emit (tuple.getValues ​​()); }} @Override public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (nieuwe velden ("operatie", "timestamp")); }}

6.3. AggregatingBolt

Laten we vervolgens een ingewikkelder maken Bout dat zal alle positieve operaties van elke dag samenvoegen.

Voor dit doel gebruiken we een specifieke klasse die speciaal is gemaakt voor het implementeren van bouten die op vensters werken in plaats van op enkele tuples: BaseWindowedBolt.

ramen zijn een essentieel concept in stroomverwerking, waarbij de oneindige stromen in eindige brokken worden opgesplitst. We kunnen dan berekeningen toepassen op elk blok. Er zijn over het algemeen twee soorten vensters:

Tijdvensters worden gebruikt om elementen uit een bepaalde tijdsperiode te groeperen met behulp van tijdstempels. Tijdvensters kunnen een ander aantal elementen hebben.

Telvensters worden gebruikt om vensters met een gedefinieerde grootte te maken. In dat geval hebben alle vensters dezelfde grootte en het venster ook worden niet uitgezonden als er minder elementen zijn dan de gedefinieerde grootte.

Onze AggregatingBolt genereert de som van alle positieve bewerkingen uit een tijdsvenster samen met de begin- en eindtijdstempels:

openbare klasse AggregatingBolt breidt BaseWindowedBolt {private OutputCollector outputCollector uit; @Override openbare leegte voorbereiden (Map stormConf, TopologyContext-context, OutputCollector-verzamelaar) {this.outputCollector = verzamelaar; } @Override public void declareOutputFields (OutputFieldsDeclarer declarer) {declarer.declare (new Fields ("sumOfOperations", "beginTimestamp", "endTimestamp")); } @Override openbare leegte uitvoeren (TupleWindow tupleWindow) {Lijst tuples = tupleWindow.get (); tuples.sort (Comparator.comparing (this :: getTimestamp)); int sumOfOperations = tuples.stream () .mapToInt (tuple -> tuple.getIntegerByField ("operatie")) .sum (); Lang beginTimestamp = getTimestamp (tuples.get (0)); Long endTimestamp = getTimestamp (tuples.get (tuples.size () - 1)); Waarden waarden = nieuwe waarden (sumOfOperations, beginTimestamp, endTimestamp); outputCollector.emit (waarden); } private Long getTimestamp (Tuple tuple) {return tuple.getLongByField ("timestamp"); }}

Merk op dat het in dit geval veilig is om het eerste element van de lijst direct te krijgen. Dat komt omdat elk venster wordt berekend met behulp van de tijdstempel veld van de Tuple, zo er moet zijn ten minste één element in elk venster.

6.4. FileWritingBolt

Ten slotte maken we een bout waarmee alle elementen kunnen worden meegenomen sumOfOperations groter dan 2000, serialiseer ze en schrijf ze naar het bestand:

openbare klasse FileWritingBolt breidt BaseRichBolt uit {openbare statische Logger-logger = LoggerFactory.getLogger (FileWritingBolt.class); privé BufferedWriter-schrijver; private String filePath; privé ObjectMapper objectMapper; @Override public void cleanup () {probeer {writer.close (); } catch (IOException e) {logger.error ("Kan schrijver niet sluiten!"); }} @Override openbare leegte voorbereiden (Map map, TopologyContext topologyContext, OutputCollector outputCollector) {objectMapper = new ObjectMapper (); objectMapper.setVisibility (PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); probeer {schrijver = nieuwe BufferedWriter (nieuwe FileWriter (filePath)); } catch (IOException e) {logger.error ("Kan een bestand niet openen om te schrijven.", e); }} @Override public void execute (Tuple tuple) {int sumOfOperations = tuple.getIntegerByField ("sumOfOperations"); long beginTimestamp = tuple.getLongByField ("beginTimestamp"); long endTimestamp = tuple.getLongByField ("endTimestamp"); if (sumOfOperations> 2000) {AggregatedWindow aggregatedWindow = nieuw AggregatedWindow (sumOfOperations, beginTimestamp, endTimestamp); probeer {writer.write (objectMapper.writeValueAsString (aggregatedWindow)); writer.newLine (); schrijver.flush (); } catch (IOException e) {logger.error ("Kan geen gegevens naar bestand schrijven.", e); }}} // openbare constructor en andere methoden}

Merk op dat we de uitvoer niet hoeven te declareren, aangezien dit de laatste bout in onze topologie zal zijn

6.5. De topologie uitvoeren

Eindelijk kunnen we alles samenvoegen en onze topologie uitvoeren:

openbare statische leegte runTopology () {TopologyBuilder builder = nieuwe TopologyBuilder (); Uitloop random = nieuwe RandomNumberSpout (); builder.setSpout ("randomNumberSpout"); Boutfiltering = nieuwe FilteringBolt (); builder.setBolt ("filteringBolt", filtering) .shuffleGrouping ("randomNumberSpout"); Boutaggregatie = nieuwe AggregatingBolt () .withTimestampField ("timestamp") .withLag (BaseWindowedBolt.Duration.seconds (1)) .withWindow (BaseWindowedBolt.Duration.seconds (5)); builder.setBolt ("aggregatingBolt", aggregating) .shuffleGrouping ("filteringBolt"); String filePath = "./src/main/resources/data.txt"; Bolt-bestand = nieuwe FileWritingBolt (filePath); builder.setBolt ("fileBolt", bestand) .shuffleGrouping ("aggregatingBolt"); Config config = nieuwe Config (); config.setDebug (false); LocalCluster-cluster = nieuwe LocalCluster (); cluster.submitTopology ("Test", config, builder.createTopology ()); }

Om de gegevens door elk stuk in de topologie te laten stromen, moeten we aangeven hoe ze met elkaar moeten worden verbonden. shuffleGroup stelt ons in staat om die gegevens te vermelden voor filteringBolt zal komen van randomNumberSpout.

Voor elk Bout, we moeten toevoegen shuffleGroup die de bron van elementen voor deze bout definieert. De bron van elementen kan een Tuit of een ander Bout. En als we dezelfde bron instellen voor meer dan één bout, de bron zal alle elementen naar elk van hen sturen.

In dit geval gebruikt onze topologie de LocalCluster om de taak lokaal uit te voeren.

7. Conclusie

In deze tutorial hebben we Apache Storm geïntroduceerd, een gedistribueerd real-time rekensysteem. We hebben een tuit gemaakt, enkele bouten en ze samengetrokken tot een complete topologie.

En, zoals altijd, zijn alle codevoorbeelden te vinden op GitHub.