Gelijktijdigheid met LMAX Disruptor - een inleiding

1. Overzicht

Dit artikel introduceert de LMAX Disruptor en bespreekt hoe het helpt om softwareconcurrency te bereiken met een lage latentie. We zullen ook een basisgebruik van de Disruptor-bibliotheek zien.

2. Wat is een disruptor?

Disruptor is een open source Java-bibliotheek geschreven door LMAX. Het is een gelijktijdig programmeerraamwerk voor de verwerking van een groot aantal transacties, met een lage latentie (en zonder de complexiteit van gelijktijdige code). De prestatie-optimalisatie wordt bereikt door een software-ontwerp dat gebruik maakt van de efficiëntie van onderliggende hardware.

2.1. Mechanische sympathie

Laten we beginnen met het kernconcept van mechanische sympathie - dat gaat erom te begrijpen hoe de onderliggende hardware werkt en programmeert op een manier die het beste met die hardware werkt.

Laten we bijvoorbeeld eens kijken hoe de organisatie van de CPU en het geheugen de softwareprestaties kan beïnvloeden. De CPU heeft verschillende cachelagen tussen de CPU en het hoofdgeheugen. Wanneer de CPU een bewerking uitvoert, zoekt deze eerst in L1 naar de gegevens, dan L2, dan L3 en tenslotte het hoofdgeheugen. Hoe verder het moet, hoe langer de operatie zal duren.

Als dezelfde bewerking meerdere keren op een gegevensstuk wordt uitgevoerd (bijvoorbeeld een lusteller), is het logisch om die gegevens op een plaats zeer dicht bij de CPU te laden.

Enkele indicatieve cijfers voor de kosten van cache-missers:

Latentie van CPU naarCPU-cycliTijd
Hoofd geheugenMeerdere~ 60-80 ns
L3-cache~ 40-45 cycli~ 15 ns
L2-cache~ 10 cycli~ 3 ns
L1-cache~ 3-4 cycli~ 1 ns
Registreren1 cyclusHeel erg snel

2.2. Waarom geen wachtrijen

Wachtrij-implementaties hebben de neiging om schrijfconflicten te hebben over de variabelen head, tail en size. Wachtrijen zijn doorgaans altijd bijna vol of bijna leeg vanwege de snelheidsverschillen tussen consumenten en producenten. Ze opereren zeer zelden in een evenwichtige middenweg waar de snelheid van productie en consumptie gelijkmatig op elkaar zijn afgestemd.

Om met de schrijfconflicten om te gaan, gebruikt een wachtrij vaak vergrendelingen, die een contextwisseling naar de kernel kunnen veroorzaken. Wanneer dit gebeurt, verliest de betrokken processor waarschijnlijk de gegevens in zijn caches.

Om het beste cachegedrag te krijgen, moet het ontwerp slechts één kern hebben die naar een willekeurige geheugenlocatie kan worden geschreven (meerdere lezers zijn prima, aangezien processors vaak speciale hogesnelheidsverbindingen tussen hun caches gebruiken). Wachtrijen voldoen niet aan het principe van één schrijver.

Als twee afzonderlijke threads naar twee verschillende waarden schrijven, maakt elke kern de cacheregel van de andere ongeldig (gegevens worden overgedragen tussen het hoofdgeheugen en de cache in blokken van vaste grootte, cacheregels genoemd). Dat is een schrijfconflict tussen de twee threads, ook al schrijven ze naar twee verschillende variabelen. Dit wordt vals delen genoemd, omdat elke keer dat de kop wordt benaderd, ook de staart wordt benaderd, en vice versa.

2.3. Hoe de disruptor werkt

Disruptor heeft een array-gebaseerde circulaire datastructuur (ringbuffer). Het is een array met een pointer naar het volgende beschikbare slot. Het is gevuld met vooraf toegewezen transferobjecten. Producenten en consumenten schrijven en lezen gegevens naar de ring zonder vergrendeling of twist.

In een Disruptor worden alle gebeurtenissen naar alle consumenten gepubliceerd (multicast), voor parallel gebruik via afzonderlijke downstream-wachtrijen. Vanwege parallelle verwerking door consumenten is het noodzakelijk om de afhankelijkheden tussen de consumenten te coördineren (afhankelijkheidsgrafiek).

Producenten en consumenten hebben een volgordeteller om aan te geven aan welk slot in de buffer momenteel wordt gewerkt. Elke producent / consument kan zijn eigen sequentieteller schrijven, maar kan de sequentietellers van anderen lezen. Producenten en consumenten lezen de tellers om er zeker van te zijn dat de gleuf waarop ze willen schrijven zonder sloten beschikbaar is.

3. De Disruptor-bibliotheek gebruiken

3.1. Afhankelijkheid van Maven

Laten we beginnen met het toevoegen van Disruptor-bibliotheekafhankelijkheid in pom.xml:

 com.lmax disruptor 3.3.6 

De laatste versie van de afhankelijkheid kan hier worden gecontroleerd.

3.2. Een evenement definiëren

Laten we de gebeurtenis definiëren die de gegevens draagt:

openbare statische klasse ValueEvent {privé int waarde; openbare definitieve statische EventFactory EVENT_FACTORY = () -> nieuwe ValueEvent (); // standaard getters en setters} 

De EventFactory laat de Disruptor de gebeurtenissen vooraf toewijzen.

3.3. Klant

Consumenten lezen gegevens uit de ringbuffer. Laten we een consument definiëren die de gebeurtenissen zal afhandelen:

openbare klasse SingleEventPrintConsumer {... openbare EventHandler [] getEventHandler () {EventHandler eventHandler = (gebeurtenis, reeks, endOfBatch) -> print (gebeurtenis.getValue (), reeks); retourneer nieuwe EventHandler [] {eventHandler}; } private void print (int id, lange sequentie-id) {logger.info ("Id is" + id + "reeks-id die werd gebruikt is" + sequentie-id); }}

In ons voorbeeld drukt de consument gewoon af naar een logboek.

3.4. De disruptor construeren

Construeer de disruptor:

ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE; WaitStrategy waitStrategy = nieuwe BusySpinWaitStrategy (); Disruptor disruptor = nieuwe Disruptor (ValueEvent.EVENT_FACTORY, 16, threadFactory, ProducerType.SINGLE, waitStrategy); 

In de constructor van Disruptor wordt het volgende gedefinieerd:

  • Event Factory - Verantwoordelijk voor het genereren van objecten die tijdens de initialisatie in de ringbuffer worden opgeslagen
  • De grootte van de ringbuffer - We hebben 16 gedefinieerd als de grootte van de ringbuffer. Het moet een macht van 2 zijn, anders genereert het een uitzondering tijdens het initialiseren. Dit is belangrijk omdat het gemakkelijk is om de meeste bewerkingen uit te voeren met behulp van logische binaire operatoren, bijv. mod operatie
  • Thread Factory - Fabriek om threads te maken voor eventprocessors
  • Producer Type - Geeft aan of we één of meerdere producenten hebben
  • Wachtstrategie - Bepaalt hoe we willen omgaan met langzame abonnees die het tempo van de producent niet bijhouden

Verbind de consumentenhandler:

disruptor.handleEventsWith (getEventHandler ()); 

Het is mogelijk om meerdere consumenten te voorzien van Disruptor om de door de producent geproduceerde data te verwerken. In het bovenstaande voorbeeld hebben we slechts één consument, ook wel event handler genoemd.

3.5. De disruptor starten

Om de Disruptor te starten:

RingBuffer ringBuffer = disruptor.start ();

3.6. Evenementen produceren en publiceren

Producenten plaatsen de gegevens in een volgorde in de ringbuffer. Producenten moeten op de hoogte zijn van het volgende beschikbare slot, zodat ze gegevens die nog niet worden gebruikt, niet overschrijven.

Gebruik de RingBuffer van Disruptor voor publicatie:

voor (int eventCount = 0; eventCount <32; eventCount ++) {long sequenceId = ringBuffer.next (); ValueEvent valueEvent = ringBuffer.get (sequenceId); valueEvent.setValue (eventCount); ringBuffer.publish (sequenceId); } 

Hier produceert en publiceert de producent items op volgorde. Het is belangrijk op te merken dat Disruptor vergelijkbaar werkt met het 2-fasen commit-protocol. Het leest een nieuw sequentie-id en publiceert. De volgende keer zou het moeten zijn sequentie-id +1 als de volgende sequentie-id.

4. Conclusie

In deze tutorial hebben we gezien wat een disruptor is en hoe deze gelijktijdigheid bereikt met een lage latentie. We hebben het concept van mechanische sympathie gezien en hoe het kan worden uitgebuit om een ​​lage latentie te bereiken. We hebben toen een voorbeeld gezien met de Disruptor-bibliotheek.

De voorbeeldcode is te vinden in het GitHub-project - dit is een op Maven gebaseerd project, dus het zou gemakkelijk moeten kunnen worden geïmporteerd en uitgevoerd zoals het is.


$config[zx-auto] not found$config[zx-overlay] not found