Een gids voor Apache Crunch

1. Inleiding

In deze tutorial demonstreren we Apache Crunch met een voorbeeld van een gegevensverwerkingsapplicatie. We zullen deze applicatie uitvoeren met behulp van het MapReduce-framework.

We beginnen met een korte bespreking van enkele Apache Crunch-concepten. Dan springen we in een voorbeeld-app. In deze app doen we tekstverwerking:

  • Allereerst lezen we de regels uit een tekstbestand
  • Later zullen we ze in woorden splitsen en enkele veelvoorkomende woorden verwijderen
  • Vervolgens groeperen we de resterende woorden om een ​​lijst met unieke woorden en hun aantal te krijgen
  • Ten slotte schrijven we deze lijst naar een tekstbestand

2. Wat is crunch?

MapReduce is een gedistribueerd, parallel programmeerraamwerk voor het verwerken van grote hoeveelheden gegevens op een cluster van servers. Softwareframeworks zoals Hadoop en Spark implementeren MapReduce.

Crunch biedt een raamwerk voor het schrijven, testen en uitvoeren van MapReduce-pijplijnen in Java. Hier schrijven we de MapReduce-banen niet rechtstreeks. In plaats daarvan definiëren we datapijplijn (d.w.z. de bewerkingen om invoer-, verwerkings- en uitvoerstappen uit te voeren) met behulp van de Crunch-API's. Crunch Planner wijst ze toe aan de MapReduce-taken en voert ze uit wanneer dat nodig is.

Daarom wordt elke Crunch-gegevenspijplijn gecoördineerd door een instantie van de Pijpleiding koppel. Deze interface definieert ook methoden voor het lezen van gegevens in een pijplijn via Bron instanties en het wegschrijven van gegevens uit een pijplijn naar Doelwit gevallen.

We hebben 3 interfaces voor het weergeven van gegevens:

  1. PCollection - een onveranderlijke, gedistribueerde verzameling elementen
  2. PTable<>, V> - een onveranderlijke, gedistribueerde, ongeordende multi-map van sleutels en waarden
  3. PGroupedTable<>, V> - een gedistribueerde, gesorteerde kaart van sleutels van het type K tot een Herhaalbaar V die precies één keer kan worden herhaald

DoFn is de basisklasse voor alle gegevensverwerkingsfuncties. Het komt overeen met Mapper, Verloopstuk en Combiner klassen in MapReduce. We besteden het grootste deel van de ontwikkelingstijd aan het schrijven en testen van logische berekeningen.

Nu we meer bekend zijn met Crunch, gaan we het gebruiken om de voorbeeldtoepassing te bouwen.

3. Opzetten van een Crunch Project

Laten we eerst een Crunch-project opzetten met Maven. We kunnen dit op twee manieren doen:

  1. Voeg de vereiste afhankelijkheden toe in het pom.xml bestand van een bestaand project
  2. Gebruik een archetype om een ​​startproject te genereren

Laten we beide benaderingen snel bekijken.

3.1. Afhankelijkheden van Maven

Laten we, om Crunch aan een bestaand project toe te voegen, de vereiste afhankelijkheden toevoegen in het pom.xml het dossier.

Laten we eerst het crunch-core bibliotheek:

 org.apache.crunch crunch-core 0.15.0 

Laten we vervolgens het hadoop-client bibliotheek om te communiceren met Hadoop. We gebruiken de versie die overeenkomt met de Hadoop-installatie:

 org.apache.hadoop hadoop-client 2.2.0 verstrekt 

We kunnen Maven Central controleren op de nieuwste versies van crunch-core en hadoop-client-bibliotheken.

3.2. Maven-archetype

Een andere benadering is om snel een startproject te genereren met behulp van het Maven-archetype van Crunch:

mvn-archetype: genereer -Dfilter = org.apache.crunch: crunch-archetype 

Wanneer daarom wordt gevraagd door het bovenstaande commando, bieden we de Crunch-versie en de details van het projectartefact.

4. Crunch-pijpleiding instellen

Nadat we het project hebben opgezet, moeten we een Pijpleiding voorwerp. Crunch heeft 3 Pijpleiding implementaties:

  • MRPipeline - wordt uitgevoerd binnen Hadoop MapReduce
  • SparkPipeline - wordt uitgevoerd als een reeks Spark-pijpleidingen
  • MemPipeline - voert in-memory uit op de client en is handig voor het testen van eenheden

Meestal ontwikkelen en testen we met behulp van een instantie van MemPipeline. Later gebruiken we een instantie van MRPipeline of SparkPipeline voor daadwerkelijke uitvoering.

Als we een in-memory pipeline nodig hadden, zouden we de statische methode kunnen gebruiken getInstance om het MemPipeline voorbeeld:

Pijplijnpijplijn = MemPipeline.getInstance ();

Maar laten we voor nu een instantie maken van MRPipeline om de applicatie uit te voeren met Hadoop:

Pijplijnpijplijn = nieuwe MRPipeline (WordCount.class, getConf ());

5. Lees de invoergegevens

Nadat we het pijplijnobject hebben gemaakt, willen we invoergegevens lezen. De Pijpleiding interface biedt een gemakkelijke methode om invoer uit een tekstbestand te lezen, readTextFile (padnaam).

Laten we deze methode noemen om het invoertekstbestand te lezen:

PCollection lines = pipeline.readTextFile (inputPath);

De bovenstaande code leest het tekstbestand als een verzameling van Draad.

Laten we als volgende stap een testcase schrijven voor het lezen van invoer:

@Test openbare leegte gegevenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead () {Pipeline pipeline = MemPipeline.getInstance (); PCollection lines = pipeline.readTextFile (INPUT_FILE_PATH); assertEquals (21, lines.asCollection () .getValue () .size ()); }

In deze test controleren we of we het verwachte aantal regels krijgen bij het lezen van een tekstbestand.

6. Stappen voor gegevensverwerking

Na het lezen van de invoergegevens moeten we deze verwerken. Crunch API bevat een aantal subklassen van DoFn om algemene gegevensverwerkingsscenario's af te handelen:

  • FilterFn - filtert leden van een collectie op basis van een booleaanse voorwaarde
  • MapFn - wijst elk invoerrecord toe aan precies één uitvoerrecord
  • CombineerFn - combineert een aantal waarden tot één waarde
  • Doe mee metFn - voert joins uit zoals inner join, left buiten join, right buiten join en full outer join

Laten we de volgende logica voor gegevensverwerking implementeren door deze klassen te gebruiken:

  1. Splits elke regel in het invoerbestand in woorden
  2. Verwijder de stopwoorden
  3. Tel de unieke woorden

6.1. Splits een regel tekst in woorden

Laten we eerst het Tokenizer klasse om een ​​regel in woorden te splitsen.

We verlengen de DoFn klasse. Deze klasse heeft een abstracte methode genaamd werkwijze. Deze methode verwerkt de invoerrecords van een PCollection en stuurt de output naar een Emitter.

We moeten de splitsingslogica in deze methode implementeren:

openbare klasse Tokenizer breidt DoFn uit {privé statische laatste Splitter SPLITTER = Splitter .onPattern ("\ s +") .omitEmptyStrings (); @Override public void process (String line, Emitter emitter) {for (String word: SPLITTER.split (line)) {emitter.emit (word); }}} 

In de bovenstaande implementatie hebben we de Splitser klasse uit de Guava-bibliotheek om woorden uit een regel te extraheren.

Laten we vervolgens een eenheidstest schrijven voor de Tokenizer klasse:

@RunWith (MockitoJUnitRunner.class) openbare klasse TokenizerUnitTest {@Mock private Emitter-zender; @Test openbare ongeldig gegevenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEmitted () {Tokenizer splitter = nieuwe Tokenizer (); splitter.process ("hallo wereld", zender); verifieer (zender) .emit ("hallo"); verifieer (zender) .emit ("wereld"); verifiërenNoMoreInteractions (zender); }}

De bovenstaande test verifieert dat de juiste woorden worden geretourneerd.

Laten we tot slot de regels splitsen die uit het invoertekstbestand worden gelezen met behulp van deze klasse.

De parallelDo methode van PCollection interface past het gegeven DoFn aan alle elementen en retourneert een nieuw PCollection.

Laten we deze methode aanroepen voor de verzameling regels en een instantie van Tokenizer:

PCollection words = lines.parallelDo (new Tokenizer (), Writables.strings ()); 

Als resultaat krijgen we de lijst met woorden in het invoertekstbestand. We verwijderen de stopwoorden in de volgende stap.

6.2. Verwijder stopwoorden

Laten we, net als bij de vorige stap, een StopWordFilter klasse om stopwoorden uit te filteren.

We zullen echter verlengen FilterFn in plaats van DoFn. FilterFn heeft een abstracte methode genaamd aanvaarden. We moeten de filterlogica in deze methode implementeren:

openbare klasse StopWordFilter breidt FilterFn {// Engelse stopwoorden uit, geleend van Lucene. private static final Set STOP_WORDS = ImmutableSet .copyOf (new String [] {"a", "en", "are", "as", "at", "be", "but", "by", "for" , "if", "in", "into", "is", "it", "no", "not", "of", "on", "of", "s", "such", " t "," dat "," de "," hun "," dan "," daar "," deze "," zij "," dit "," naar "," was "," zal "," met " }); @Override public boolean accept (String word) {return! STOP_WORDS.contains (word); }}

Laten we vervolgens de unit-test schrijven voor StopWordFilter klasse:

openbare klasse StopWordFilterUnitTest {@Test openbare leegte gegevenFilter_whenStopWordPassed_thenFalseReturned () {FilterFn filter = nieuwe StopWordFilter (); assertFalse (filter.accept ("de")); assertFalse (filter.accept ("a")); } @Test openbare ongeldig gegevenFilter_whenNonStopWordPassed_thenTrueReturned () {FilterFn filter = nieuw StopWordFilter (); assertTrue (filter.accept ("Hallo")); assertTrue (filter.accept ("Wereld")); } @Test openbare leegte gegevenWordCollection_whenFiltered_thenStopWordsRemoved () {PCollection words = MemPipeline .collectionOf ("Dit", "is", "a", "test", "zin"); PCollection noStopWords = words.filter (nieuw StopWordFilter ()); assertEquals (ImmutableList.of ("This", "test", "zin"), Lists.newArrayList (noStopWords.materialize ())); }}

Deze test controleert of de filterlogica correct wordt uitgevoerd.

Laten we tot slot gebruiken StopWordFilter om de lijst met woorden te filteren die in de vorige stap zijn gegenereerd. De filter methode van PCollection interface past het gegeven FilterFn aan alle elementen en retourneert een nieuw PCollection.

Laten we deze methode aanroepen voor de woordenverzameling en een instantie van doorgeven StopWordFilter:

PCollection noStopWords = words.filter (nieuw StopWordFilter ());

Als resultaat krijgen we de gefilterde verzameling woorden.

6.3. Tel unieke woorden

Nadat we de gefilterde verzameling woorden hebben opgehaald, willen we tellen hoe vaak elk woord voorkomt. PCollection interface heeft een aantal methoden om algemene aggregaties uit te voeren:

  • min - geeft het minimumelement van de verzameling terug
  • max. hoogte - geeft het maximale element van de verzameling terug
  • lengte - geeft het aantal elementen in de verzameling terug
  • tellen - geeft een PTable dat de telling van elk uniek element van de collectie bevat

Laten we de tellen methode om de unieke woorden samen met hun tellingen te krijgen:

// De telmethode past een reeks Crunch-primitieven toe en retourneert // een kaart van de unieke woorden in de invoer PCollection naar hun tellingen. PTable counts = noStopWords.count ();

7. Specificeer Uitvoer

Als resultaat van de vorige stappen hebben we een tabel met woorden en hun tellingen. We willen dit resultaat naar een tekstbestand schrijven. De Pijpleiding interface biedt gemakkelijke methoden om uitvoer te schrijven:

ongeldig schrijven (PCollection-verzameling, Target-doel); void write (PCollection-verzameling, Target-doel, Target.WriteMode writeMode); void writeTextFile (PCollection-verzameling, String pathName);

Laten we daarom de writeTextFile methode:

pipeline.writeTextFile (counts, outputPath); 

8. Beheer de uitvoering van de pijplijn

Alle stappen tot nu toe hebben zojuist de datapijplijn gedefinieerd. Er is geen invoer gelezen of verwerkt. Dit is zo omdat Crunch maakt gebruik van een lui executiemodel.

Het voert de MapReduce-taken niet uit totdat een methode die de taakplanning en -uitvoering regelt, wordt aangeroepen op de Pipeline-interface:

  • rennen - stelt een uitvoeringsplan op om de vereiste outputs te creëren en voert deze vervolgens synchroon uit
  • gedaan - voert alle resterende taken uit die nodig zijn om output te genereren en ruimt vervolgens alle tussenliggende databestanden op
  • runAsync - vergelijkbaar met de run-methode, maar wordt op een niet-blokkerende manier uitgevoerd

Laten we daarom de gedaan methode om de pijplijn uit te voeren als MapReduce-taken:

PipelineResult resultaat = pipeline.done (); 

De bovenstaande instructie voert de MapReduce-taken uit om invoer te lezen, deze te verwerken en het resultaat naar de uitvoermap te schrijven.

9. De pijpleiding samenvoegen

Tot dusver hebben we de logica ontwikkeld en getest om invoergegevens te lezen, te verwerken en naar het uitvoerbestand te schrijven.

Laten we ze vervolgens samenvoegen om de volledige datapijplijn te bouwen:

public int run (String [] args) gooit Uitzondering {String inputPath = args [0]; String outputPath = args [1]; // Maak een object om het maken en uitvoeren van pijpleidingen te coördineren. Pijplijnpijplijn = nieuwe MRPipeline (WordCount.class, getConf ()); // Verwijs naar een bepaald tekstbestand als een verzameling strings. PCollection lines = pipeline.readTextFile (inputPath); // Definieer een functie die elke regel in een PCollection of Strings opsplitst in // een PCollection die bestaat uit de afzonderlijke woorden in het bestand. // Het tweede argument stelt het serialiseringsformaat in. PCollection words = lines.parallelDo (new Tokenizer (), Writables.strings ()); // Neem de verzameling woorden en verwijder bekende stopwoorden. PCollection noStopWords = words.filter (nieuw StopWordFilter ()); // De telmethode past een reeks Crunch-primitieven toe en retourneert // een kaart van de unieke woorden in de invoer PCollection naar hun tellingen. PTable counts = noStopWords.count (); // Geef de pijplijn opdracht om de resulterende tellingen naar een tekstbestand te schrijven. pipeline.writeTextFile (counts, outputPath); // Voer de pijplijn uit als een MapReduce. PipelineResult resultaat = pipeline.done (); return resultaat.succeeded ()? 0: 1; }

10. Startconfiguratie Hadoop

De datapijplijn is dus klaar.

We hebben de code echter nodig om het te starten. Laten we daarom het hoofd methode om de applicatie te starten:

openbare klasse WordCount breidt Geconfigureerde implementaties uit Tool {openbare statische leegte hoofd (String [] args) gooit Uitzondering {ToolRunner.run (nieuwe configuratie (), nieuwe WordCount (), args); }

ToolRunner.run parseert de Hadoop-configuratie vanaf de opdrachtregel en voert de MapReduce-taak uit.

11. Start de applicatie

De volledige applicatie is nu klaar. Laten we de volgende opdracht uitvoeren om het te bouwen:

mvn-pakket 

Als resultaat van het bovenstaande commando krijgen we de verpakte applicatie en een speciale job jar in de doeldirectory.

Laten we deze job jar gebruiken om de applicatie op Hadoop uit te voeren:

hadoop jar-doel / crunch-1.0-SNAPSHOT-job.jar 

De applicatie leest het invoerbestand en schrijft het resultaat naar het uitvoerbestand. Het uitvoerbestand bevat unieke woorden samen met hun tellingen, vergelijkbaar met het volgende:

[Toevoegen, 1] [Toegevoegd, 1] [Bewondering, 1] [Toegegeven, 1] [Toelage, 1]

Naast Hadoop kunnen we de applicatie binnen IDE draaien, als stand-alone applicatie of als unit tests.

12. Conclusie

In deze zelfstudie hebben we een gegevensverwerkingsapplicatie gemaakt die draait op MapReduce. Apache Crunch maakt het gemakkelijk om MapReduce-pipelines in Java te schrijven, te testen en uit te voeren.

Zoals gewoonlijk is de volledige broncode te vinden op Github.