12 oktober 2022

Serverless data pipelines: ETL-workflow met Step Functions en Athena

Deze blog is deel 3 van een miniseries rond het analyseren van het verkeer in Vlaanderen en het benutten van de kracht van cloud componenten! Deze miniserie werd geschreven in samenwerking met Nick Van Hoof van TransIT.

Doel

Deze blog wil het gebruik onderzoeken van de AWS Glue service in combinatie met de AWS Athena service om ruwe streaming van data events te herverdelen. We hebben deze gebeurtenissen eerder neergezet op een Amazon S3 bucket verdeeld volgens de verwerkingstijd op Kinesis.

Wij zouden deze gebeurtenissen echter graag herverdeeld zien op basis van gebeurtenistijdstempels om een zinvolle batchanalyse mogelijk te maken.

Wat is AWS Glue?

AWS Glue (geïntroduceerd in augustus 2017) is een serverloze Extract, Transform and Load (ETL) cloud-geoptimaliseerde dienst. Deze dienst kan worden gebruikt om ETL-processen te automatiseren die datasets organiseren, lokaliseren, verplaatsen en transformeren die zijn opgeslagen binnen een verscheidenheid aan gegevensbronnen, zodat gebruikers deze datasets efficiënt kunnen voorbereiden op gegevensanalyse. Deze gegevensbronnen kunnen bijvoorbeeld data lakes zijn in Amazon Simple Storage Service (S3), data warehouses in Amazon Redshift of andere databases die deel uitmaken van de Amazon Relational Database Service. Andere soorten databases zoals MySQL, Oracle, Microsoft SQL Server en PostgreSQL worden ook ondersteund in AWS Glue.

Aangezien AWS Glue een serverloze dienst is, hoeven gebruikers geen servers beschikbaar te stellen, te configureren en op te starten, en hoeven zij geen tijd te besteden aan het beheer van servers.

AWS Glue Catalogue

Het hart van AWS Glue is de Catalogus, een gecentraliseerde metadata opslagplaats voor alle data-assets. Alle relevante informatie over data-assets (zoals tabel definities, data locaties, bestandstypen, schema informatie) wordt opgeslagen in dit archief.

Om deze informatie in de catalogus te krijgen, gebruikt AWS Glue crawlers. Deze crawlers kunnen data stores scannen en automatisch het schema afleiden van alle gestructureerde en semi-gestructureerde gegevens die zich in de data stores kunnen bevinden. Bovendien kan het ook:

  • bestandstypen ontdekken
  • het schema opvragen
  • automatisch datasets ontdekken
  • al deze informatie opslaan in de gegevenscatalogus

Wanneer de gegevens zijn gecatalogiseerd, zijn ze toegankelijk en kunnen er ETL-taken op worden uitgevoerd. AWS Glue biedt de mogelijkheid om automatisch ETL-scripts te genereren, die kunnen worden gebruikt als uitgangspunt. Dat betekent dat gebruikers niet vanaf nul hoeven te beginnen bij het ontwikkelen van ETL-processen. In deze blog zullen we ons echter richten op het gebruik van een alternatief voor de AWS Glue ETL jobs. We zullen gebruik maken van SQL-queries die in AWS Athena zijn geïmplementeerd om het ETL-proces uit te voeren.

Voor de implementatie en orkestratie van complexere ETL processen biedt AWS Glue gebruikers de mogelijkheid om workflows te gebruiken. Deze kunnen worden gebruikt voor de coördinatie van complexere ETL-activiteiten waarbij meerdere crawlers, jobs en triggers betrokken zijn. Wij zullen echter een alternatief gebruiken voor deze AWS Glue-workflows, namelijk een toestandsmachine met stapfuncties om ons ETL-proces te coördineren.

Nogmaals, AWS Glue heeft 3 hoofdcomponenten:

  • De Gegevenscatalogus: een gecentraliseerde metadata opslagplaats, waar alle metadata informatie over uw gegevens is opgeslagen. Dit omvat informatie over tabellen (die de metadata representaties of schema’s van de opgeslagen datasets definiëren), schema’s en partities. De metadata-eigenschappen worden binnen de gegevensbronnen afgeleid door crawlers, die er ook verbindingen mee leggen.
  • De Apache Spark ETL-engine: zodra metadata beschikbaar zijn in de gegevenscatalogus en bron- en doelgegevensopslagplaatsen kunnen worden geselecteerd uit de catalogus, maakt de Apache Spark ETL-engine het mogelijk ETL-banen te creëren waarmee de gegevens kunnen worden verwerkt.
  • De Scheduler: hiermee kunnen gebruikers een schema opstellen voor hun AWS ETL jobs. Deze planning kan worden gekoppeld aan een specifieke trigger (bv. de voltooiing van een andere ETL job), een bepaald tijdstip, of een job kan worden ingesteld om on-demand te draaien.

AWS state machine

Zoals hierboven vermeld, zijn onze doelstellingen het creëren van een ELT-pipeline die de gegevens die we al in een S3-data lake hebben neergezet, opnieuw zal herverdelen. Deze herverdeling zal ervoor zorgen dat de gegevens worden verdeeld op basis van een tijdstempel binnen de gebeurtenis. Dit past bij onze analysedoelstelling, in tegenstelling tot herverdeling op basis van het tijdstip waarop het record in Kinesis Firehose kwam.

Om dit te bereiken, bouwen we een ETL-taak om de bestaande gegevens uit S3 te halen, deze te transformeren door nieuwe kolommen te maken op basis van het tijdstempel van de gebeurtenis binnen de gegevens en deze te laten landen in nieuwe delen.

De ETL-taken bereikten het volgende:

  • Eerst moesten we uitzoeken hoe de huidige gegevens eruit zagen. Met andere woorden, we moesten een schema registreren voor onze brongegevens (d.w.z. de gegevens herverdeeld volgens Kinesis Firehose timestamps, die we in S3 hebben neergezet) in de Glue-catalogus.
  • Om dit schema te bepalen, moesten we een crawler draaien, die de bestaande gegevens verkende en dit gegevensformaat bepaalde. Het uitvoeren van de crawler creëerde een schema voor de brongegevens en registreerde dat schema met de Glue-catalogus.
  • Vervolgens moesten we een ETL-proces uitvoeren om de gegevens om te zetten in een nieuwe verdeling. Zoals reeds vermeld in deze blog, zullen we ons concentreren op het gebruik van Athena om de herverdeling tot stand te brengen. Lees onze volgende blog om te zien hoe het met AWS Glue wordt gedaan.
  • Nadat de gegevens waren herverdeeld, wilden we natuurlijk de gegevens kunnen bevragen voor analysedoeleinden. Daarvoor moesten we opnieuw een crawler draaien om vast te stellen hoe de herverdeelde gegevens eruit zagen. De crawler registreerde vervolgens het nieuwe schema in de Glue-catalogus.

Dit proces continu uitvoeren zou niet erg efficiënt zijn. Anderzijds zou het niet frequent genoeg uitvoeren van dit proces (bijvoorbeeld slechts eenmaal per week) betekenen dat we te lang zouden moeten wachten om over nieuwe gegevens te kunnen rapporteren.

We hebben een proces van een paar gerichte stappen (crawler uitvoeren, schema registreren, ETL-taak uitvoeren, crawler uitvoeren) dat we regelmatig moeten uitvoeren. Daarom zou het ideaal zijn voor orkestratie met behulp van AWS Step Functions.

In AWS Step Functions definieer je je workflows in de Amazon States Language. De Step Functions console biedt een grafische voorstelling van die toestandsmachine om jouw applicatielogica te helpen visualiseren. States zijn elementen in de state machine. Een state wordt aangeduid met zijn naam, die elke willekeurige string kan zijn, maar die uniek moet zijn binnen het bereik van de gehele state machine.

Hier is een overzicht van hoe onze state machine eruit zag. Zoals je ziet, hebben we een eindig aantal stappen die na elkaar worden uitgevoerd.

ASL — Amazon State Language

ASL is een op JSON gebaseerde taal om de stappen van je state machine te definiëren. Later zullen we dieper ingaan op de logica die in elke stap wordt uitgevoerd. Laten we eerst eens kijken naar de ASL die deze stappen definieert.

AWS Sam en het Serverless Framework staan beide toe dat je de ASL specificeert als YAML. Wij vonden dat het gebruik van YAML de leesbaarheid verbeterde. Daarom hebben we onze ASL als volgt gedefinieerd (het volledige ASL-schema is hier beschikbaar):

Deze ASL beschrijft dezelfde workflow als de afbeelding hierboven. Het is alleen veel moeilijker leesbaar.

Merk op dat we inderdaad de stappen hebben: crawler uitvoeren, schema registreren, ETL-taak uitvoeren en crawler opnieuw uitvoeren. Maar we hebben ook ‘wacht’-stappen waarbij we periodiek controleren of een crawler klaar is met zijn werk. En we hebben failed states die we gebruiken om te reageren op mogelijke fouten in ons proces.

Deze blog gaat over data en niet over hoe je state machines bouwt. Maar mocht je meer willen weten over AWS State Machines en Step Functions, klik dan hier.

Logica van Step functions

Nu is het tijd om wat dieper in te gaan op wat er tijdens elke stap gebeurt.

Tip: kies beschrijvende namen voor je stappen, zodat het meteen duidelijk is wat er in een bepaalde stap gebeurt.

Hier zijn een paar van onze stappen (nogmaals, bekijk het archief als je alle logica wilt zien):

RunDataCrawler: Dit triggert het uitvoeren van een Lambda Function die op zijn beurt een Glue Crawler triggert.

GetCrawlerState: We controleren periodiek de status van de lopende crawler. Aangezien er (nog) geen directe integratie is voor crawler events met step functies, moeten we dit controleren met behulp van een Lambda functie.

Dit geeft de status van de crawler terug, en vertelt ons dus of de crawler klaar is of niet. Zoals je kunt zien in het diagram en de ASL, zullen we deze status gebruiken om een keuze te maken wat de volgende stap is om uit te voeren.

RunETLInsertAthena: Als de crawler klaar is, is het tijd om de ETL job uit te voeren. Dit gebeurt met behulp van AWS Athena. Lees meer over het hoe en wat van Athena in de volgende paragraaf.

Het is echter de taak van een Lambda-functie om de ETL-taak in Athena te starten en te controleren wanneer deze klaar is.

ETL job

De handler van de lambda-functie die de ETL-taak start ziet er als volgt uit.

  • Definieer de queries, waarbij je aangeeft welk gegevensbereik je opnieuw wilt verdelen.
  • Geef deze queries door aan Athena.
  • Geef de Athena-uitvoerings-ID terug. Een ID die we kunnen gebruiken om de status van de ETL job met Athena te controleren.

De volgende functie controleert of de ETL taak klaar is, door gebruik te maken van de uitvoerings-ID die werd teruggegeven van de laatste stap.

De QueryExecutionIds van de vorige stap wordt nu gebruikt om de status van een specifieke query te krijgen.

We zagen de stappen die nodig zijn in de workflow om onze gegevens te herpartitioneren. Deze herverdeling werd bereikt met Athena. Hieronder gaan we hier dieper op in.

Zoals gezegd hebben we AWS Athena gebruikt om de ETL job uit te voeren, in plaats van een Glue ETL job met een automatisch gegenereerd script.

Het bevragen van datasets en gegevensbronnen die in de Glue Data Catalogue zijn geregistreerd, wordt door AWS Athena ondersteund. Dit betekent dat Athena de Glue Data Catalogue gebruikt als een gecentraliseerde locatie waar tabelmetadata worden opgeslagen en opgehaald. Deze metadata instrueren de Athena query engine waar het data moet lezen en op welke manier het de data moet lezen, en geven aanvullende informatie die nodig is om de gegevens te verwerken. Het is bijvoorbeeld mogelijk een INSERT INTO DML query uit te voeren naar een brontabel die in de gegevenscatalogus is geregistreerd. Deze query zal rijen invoegen in de bestemmingstabel op basis van een SELECT statement tegen de brontabel.

Hieronder tonen we een deel van onze volledige INSERT INTO DML query, met aanvullende subqueries waarin gegevens uit de brontabel stap voor stap worden omgezet zodat ze opnieuw kunnen worden herverdeeld en gebruikt voor analyse.

De (deel van de) INSERT INTO DML query die direct hierboven is weergegeven, voerde het volgende uit:

  • Definitieve selectie van relevante informatie voor de gegevensanalyse. Niet alle informatie in de ruwe gegevens was nuttig voor de analyse, en sommige gegevens waren mogelijk ongeldig (bv. als gevolg van slecht functionerende meetapparatuur).
  • De berekening van geaggregeerde waarden en afgeleide velden voor analysedoeleinden. Bijvoorbeeld de berekening van de gemiddelde snelheid en de implementatie van de logica van wat wij als een file beschouwen.
  • De herverdeling van de gegevens naar tijd van de gebeurtenis (d.w.z. de waarden voor jaar, maand en dag van de oorspronkelijke timestamp). Herverdeling wordt bereikt door eerst een doeltabel te definiëren in de AWS Glue-catalogus waarin jaar, maand, dag en uur bigint-velden werden aangewezen als verdeelsleutels. Vervolgens hebben wij de jaar-, maand- en dagwaarden van de oorspronkelijke timestamp geëxtraheerd (d.w.z. het tijdstempel van de meting zelf, niet het tijdstempel van de verwerkingstijd op Kinesis) en ten slotte zijn deze waarden toegewezen aan de bigint-velden jaar, maand, dag en uur, die wij als verdeelsleutels in de doeltabel hebben aangewezen.

De aanvullende subqueries voerden het volgende uit:

  • De selectie en transformatie (waar nodig) van relevante informatie uit de brongegevens voor de berekening van geaggregeerde waarden en afgeleide velden.
  • De selectie van een subset van locaties uit het totale aantal van 4600 meetlocaties en de natuurlijke hergroepering van deze locaties (bv. groepering van reeksen rijstroken op dezelfde weg).
  • De opsplitsing van queries in gegevensbereiken van (maximaal) 4 dagen (d.w.z. een bereik tussen een begindag en een einddag). Omdat Amazon een limiet oplegt van 100 gelijktijdig geschreven partities met een INSERT INTO statement, hebben we een Lambda-functie geïmplementeerd om meerdere gelijktijdige queries uit te voeren. Het splitsen van de query’s beperkt het aantal gelijktijdig geschreven partities tot 96 uur.

Voor een link naar de volledige INSERT INTO DML query, zie Github.
Voor een link naar de uitleg van de velddefinities, zie deze link.

Conclusie

Wanneer AWS Athena wordt gebruikt om de ETL job uit te voeren, in tegenstelling tot Glue ETL jobs, is er geen functionaliteit om automatisch het volgende proces in een workflow te starten. Daarom hebben we ook een pollingmechanisme geïmplementeerd om periodiek te controleren of de crawler/ETL query is voltooid.

Zoals reeds vermeld, hadden we ook Glue ETL jobs kunnen gebruiken voor de implementatie van de ETL workflow. Deze ETL jobs handelen alle verwerking en herverdeling van de gegevens af via python scripts met Spark.

In onze volgende blog in deze zullen we de praktische uitvoering van deze alternatieve oplossing onderzoeken en de voor- en nadelen vergelijken van het gebruik van Glue ETL jobs vs. AWS Athena ETL queries voor de uitvoering van ETL workflows.

Leer meer

Deze blog is deel 3 van een miniseries rond het analyseren van het verkeer in Vlaanderen en het benutten van de kracht van cloud componenten.

Facebook
Twitter
LinkedIn

Schrijf je in op de nieuwsbrief om nooit een blog te missen!