Inleiding tot Apache Flink met Java

1. Overzicht

Apache Flink is een Big Data-verwerkingsraamwerk waarmee programmeurs de enorme hoeveelheid gegevens op een zeer efficiënte en schaalbare manier kunnen verwerken.

In dit artikel introduceren we enkele van de API-kernconcepten en standaard datatransformaties die beschikbaar zijn in het Apache Flink Java API. De vloeiende stijl van deze API maakt het gemakkelijk om te werken met de centrale constructie van Flink - de gedistribueerde collectie.

Eerst kijken we naar Flink's DataSet API-transformaties en gebruik ze om een ​​programma voor het tellen van woorden te implementeren. Daarna kijken we kort naar Flink's Data stroom API, waarmee u streams van evenementen in realtime kunt verwerken.

2. Maven Afhankelijkheid

Om aan de slag te gaan, moeten we Maven-afhankelijkheden toevoegen aan flink-java en flink-test-utils bibliotheken:

 org.apache.flink flink-java 1.2.0 org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Kern-API-concepten

Wanneer we met Flink werken, moeten we enkele dingen weten met betrekking tot de API:

  • Elk Flink-programma voert transformaties uit op gedistribueerde gegevensverzamelingen. Er zijn verschillende functies beschikbaar voor het transformeren van gegevens, waaronder filteren, in kaart brengen, samenvoegen, groeperen en aggregeren
  • EEN wastafel operatie in Flink triggert de uitvoering van een stream om het gewenste resultaat van het programma te produceren, zoals het opslaan van het resultaat naar het bestandssysteem of het afdrukken naar de standaarduitvoer
  • Flink-transformaties zijn lui, wat betekent dat ze pas worden uitgevoerd als a wastafel operatie wordt aangeroepen
  • De Apache Flink API ondersteunt twee bewerkingsmodi: batch en realtime. Als u te maken heeft met een beperkte gegevensbron die in batchmodus kan worden verwerkt, gebruikt u de DataSet API. Als u onbegrensde gegevensstromen in realtime wilt verwerken, moet u de Data stroom API

4. DataSet API-transformaties

Het toegangspunt tot het Flink-programma is een exemplaar van het Uitvoeringsomgeving class - dit definieert de context waarin een programma wordt uitgevoerd.

Laten we een Uitvoeringsomgeving om onze verwerking te starten:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment ();

Merk op dat wanneer u de toepassing op de lokale computer start, deze de verwerking op de lokale JVM zal uitvoeren. Als u de verwerking op een cluster van machines wilt starten, moet u Apache Flink op die machines installeren en het Uitvoeringsomgeving overeenkomstig.

4.1. Een gegevensset maken

Om datatransformaties uit te voeren, moeten we ons programma van de data voorzien.

Laten we een instantie maken van het DataSet klasse met behulp van onze Uitvoeringsomgeving:

DataSet-bedragen = env.fromElements (1, 29, 40, 50);

U kunt een DataSet uit meerdere bronnen, zoals Apache Kafka, een CSV, bestand of vrijwel elke andere gegevensbron.

4.2. Filter en verklein

Zodra u een instantie van het DataSet klasse, kunt u er transformaties op toepassen.

Stel dat u getallen wilt filteren die boven een bepaalde drempel liggen en ze vervolgens allemaal wilt optellen. U kunt de filter() en verminderen() transformaties om dit te bereiken:

int drempel = 30; Lijst verzamelen = bedragen .filter (a -> a> drempel) .reduce ((geheel getal, t1) -> geheel getal + t1) .collect (); assertThat (collect.get (0)). isEqualTo (90); 

Merk op dat de verzamelen() methode is een wastafel operatie die de feitelijke datatransformaties activeert.

4.3. Kaart

Laten we zeggen dat je een DataSet van Persoon voorwerpen:

privé statische klasse Persoon {privé int leeftijd; private String naam; // standard constructors / getters / setters}

Laten we vervolgens een DataSet van deze objecten:

DataSet personDataSource = env.fromCollection (Arrays.asList (nieuwe persoon (23, "Tom"), nieuwe persoon (75, "Michael")));

Stel dat u alleen het leeftijd veld van elk object van de collectie. U kunt de kaart() transformatie om alleen een specifiek veld van de Persoon klasse:

Lijst leeftijden = personDataSource .map (p -> p.age) .collect (); assertThat (leeftijden) .hasSize (2); assertThat (leeftijden) .bevat (23, 75);

4.4. Doe mee

Als u twee datasets heeft, wilt u ze misschien bij enkele ID kaart veld. Hiervoor kunt u de toetreden () transformatie.

Laten we verzamelingen transacties en adressen van een gebruiker maken:

Tuple3-adres = nieuwe Tuple3 (1, "5th Avenue", "London"); DataSet adressen = env.fromElements (adres); Tuple2 firstTransaction = nieuwe Tuple2 (1, "Transaction_1"); DataSet transacties = env.fromElements (firstTransaction, new Tuple2 (12, "Transaction_2")); 

Het eerste veld in beide tupels is een Geheel getal type, en dit is een ID kaart veld waarop we beide gegevenssets willen samenvoegen.

Om de feitelijke samenvoegingslogica uit te voeren, moeten we een KeySelector interface voor adres en transactie:

private statische klasse IdKeySelectorTransaction implementeert KeySelector {@Override public Integer getKey (Tuple2 value) {return value.f0; }} privé statische klasse IdKeySelectorAddress implementeert KeySelector {@Override public Integer getKey (Tuple3 value) {return value.f0; }}

Elke selector retourneert alleen het veld waarop de join moet worden uitgevoerd.

Helaas is het hier niet mogelijk om lambda-expressies te gebruiken omdat Flink generieke typegegevens nodig heeft.

Laten we vervolgens samenvoeglogica implementeren met behulp van die selectors:

Lijst<>> lid = transacties.join (adressen) .where (nieuwe IdKeySelectorTransaction ()) .equalTo (nieuwe IdKeySelectorAddress ()) .collect (); assertThat (toegetreden) .hasSize (1); assertThat (toegetreden) .contains (new Tuple2 (firstTransaction, address)); 

4.5. Soort

Stel dat u de volgende verzameling Tuple2:

Tuple2 secondPerson = nieuwe Tuple2 (4, "Tom"); Tuple2 thirdPerson = nieuwe Tuple2 (5, "Scott"); Tuple2 vierdePerson = nieuwe Tuple2 (200, "Michael"); Tuple2 firstPerson = nieuwe Tuple2 (1, "Jack"); DataSet transacties = env.fromElements (vierde persoon, tweede persoon, derde persoon, eerste persoon); 

Als u deze verzameling op het eerste veld van het tupel wilt sorteren, kunt u de sortPartitions () transformatie:

Lijst gesorteerd = transacties .sortPartition (nieuwe IdKeySelectorTransaction (), Order.ASCENDING) .collect (); assertThat (gesorteerd) .containsExactly (firstPerson, secondPerson, thirdPerson, 4thPerson);

5. Aantal woorden

Het probleem met het aantal woorden is een probleem dat vaak wordt gebruikt om de mogelijkheden van Big Data-verwerkingsraamwerken te demonstreren. De basisoplossing omvat het tellen van woordoccurrences in een tekstinvoer. Laten we Flink gebruiken om een ​​oplossing voor dit probleem te implementeren.

Als eerste stap in onze oplossing creëren we een LineSplitter klasse die onze invoer opsplitst in tokens (woorden), waarbij voor elk token een Tuple2 van sleutel / waarde-paren. In elk van deze tupels is de sleutel een woord dat in de tekst wordt gevonden, en de waarde is het gehele getal één (1).

Deze klasse implementeert het FlatMapFunction interface die Draad als input en produceert een Tuple2:

openbare klasse LineSplitter implementeert FlatMapFunction {@Override public void flatMap (String-waarde, Collector out) {Stream.of (value.toLowerCase (). split ("\ W +")) .filter (t -> t.length ()> 0) .forEach (token -> out.collect (new Tuple2 (token , 1))); }}

We noemen de verzamelen() methode op de Verzamelaar class om gegevens naar voren te duwen in de verwerkingspijplijn.

Onze volgende en laatste stap is om de tupels te groeperen op basis van hun eerste elementen (woorden) en vervolgens een som aggregeren op de tweede elementen om een ​​telling van de woordcombinaties te produceren:

openbare statische DataSet startWordCount (ExecutionEnvironment env, lijstregels) genereert uitzondering {DataSet text = env.fromCollection (regels); return text.flatMap (nieuwe LineSplitter ()) .groupBy (0) .aggregate (Aggregations.SUM, 1); }

We gebruiken drie soorten Flink-transformaties: flatMap (), groupBy (), en aggregaat().

Laten we een test schrijven om te bevestigen dat de implementatie van het aantal woorden werkt zoals verwacht:

List lines = Arrays.asList ("Dit is een eerste zin", "Dit is een tweede zin met één woord"); DataSet result = WordCount.startWordCount (env, regels); Lijst collect = result.collect (); assertThat (collect) .containsExactlyInAnyOrder (nieuwe Tuple2 ("a", 3), nieuwe Tuple2 ("zin", 2), nieuwe Tuple2 ("woord", 1), nieuwe Tuple2 ("is", 2), nieuwe Tuple2 ( "dit", 2), nieuwe Tuple2 ("tweede", 1), nieuwe Tuple2 ("eerste", 1), nieuwe Tuple2 ("met", 1), nieuwe Tuple2 ("één", 1));

6. DataStream API

6.1. Een DataStream maken

Apache Flink ondersteunt ook de verwerking van gebeurtenissenstromen via de DataStream API. Als we evenementen willen gaan consumeren, moeten we eerst de StreamExecutionEnvironment klasse:

StreamExecutionEnvironment executEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ();

Vervolgens kunnen we een stroom evenementen maken met behulp van de ExecutionMilieu uit verschillende bronnen. Het kan een berichtbus zijn zoals Apache Kafka, maar in dit voorbeeld zullen we eenvoudig een bron maken op basis van een aantal stringelementen:

DataStream dataStream = executionEnvironment.fromElements ("Dit is een eerste zin", "Dit is een tweede zin met één woord");

We kunnen transformaties toepassen op elk element van de Data stroom zoals in het normale DataSet klasse:

SingleOutputStreamOperator upperCase = text.map (String :: toUpperCase);

Om de uitvoering te activeren, moeten we een Sink-bewerking aanroepen, zoals afdrukken() dat zal alleen het resultaat van transformaties naar de standaarduitvoer afdrukken, gevolgd door de uitvoeren () methode op de StreamExecutionEnvironment klasse:

upperCase.print (); env.execute ();

Het levert de volgende output op:

1> DIT IS EEN EERSTE ZIN 2> DIT IS EEN TWEEDE ZIN MET EEN WOORD

6.2. Windowing van evenementen

Wanneer u een stroom gebeurtenissen in realtime verwerkt, moet u soms gebeurtenissen groeperen en wat berekeningen toepassen op een venster van die gebeurtenissen.

Stel dat we een stroom gebeurtenissen hebben, waarbij elke gebeurtenis een paar is dat bestaat uit het gebeurtenisnummer en het tijdstempel waarop de gebeurtenis naar ons systeem is verzonden, en dat we gebeurtenissen kunnen tolereren die niet in de juiste volgorde staan, maar alleen als ze niet in orde zijn meer dan twintig seconden te laat.

Laten we voor dit voorbeeld eerst een stream maken die twee gebeurtenissen simuleert die enkele minuten van elkaar verwijderd zijn en een tijdstempelextractor definiëren die onze latentiedrempel specificeert:

SingleOutputStreamOperator windowed = env.fromElements (nieuwe Tuple2 (16, ZonedDateTime.now (). plusMinutes (25) .toInstant (). getEpochSecond ()), nieuwe Tuple2 (15, ZonedDateTime.now (). plusMinutes (2) .toInstant () .getEpochSecond ())) .assignTimestampsAndWatermarks (nieuwe BoundedOutOfOrdernessTimestampExtractor (Time.seconds (20)) {@Override public long extractTimestamp (Tuple2 element) {return element.f1 * 1000; }});

Laten we vervolgens een vensterbewerking definiëren om onze gebeurtenissen in vensters van vijf seconden te groeperen en een transformatie op die gebeurtenissen toe te passen:

SingleOutputStreamOperator gereduceerd = windowed .windowAll (TumblingEventTimeWindows.of (Time.seconds (5))) .maxBy (0, true); gereduceerde afdruk ();

Het krijgt het laatste element van elk venster van vijf seconden, dus het wordt afgedrukt:

1> (15,1491221519)

Houd er rekening mee dat we de tweede gebeurtenis niet zien omdat deze later is aangekomen dan de opgegeven latentiedrempel.

7. Conclusie

In dit artikel hebben we het Apache Flink-framework geïntroduceerd en hebben we gekeken naar enkele transformaties die bij de API worden geleverd.

We hebben een programma voor het tellen van woorden geïmplementeerd met behulp van Flink's vloeiende en functionele DataSet API. Vervolgens keken we naar de DataStream API en implementeerden we een eenvoudige realtime transformatie op een stroom evenementen.

De implementatie van al deze voorbeelden en codefragmenten is te vinden op GitHub - dit is een Maven-project, dus het moet gemakkelijk te importeren en uit te voeren zijn zoals het is.


$config[zx-auto] not found$config[zx-overlay] not found