Management Summary

In diesem Beitrag sollen fünf Technologien vorgestellt werden, die jeder Data Engineer für seine tägliche Arbeit kennen und beherrschen sollte. Aufgeführt werden Spark als Data Processing Tool im Big Data Umfeld, Kafka als Streaming Platform, Airflow und Serverless-Architektur zur Koordinierung bzw. Orchestrierung. Zuvor werden Stellenwert und Rolle von SQL (Structured Query Language) und relationalen Datenbanken besprochen.

Trotz des stetigen Wandels hat sich SQL eine Sonderposition herausgearbeitet und findet sich auch in neuen Entwicklungen als Schnittstelle wieder. Mitnichten ist es aber so, dass wie vor Jahrzehnten ein stabiles Wissen von dieser Query-Sprache ausreicht, um das Gros der Datenarbeiten bewältigen zu können. Dafür ist zum einen die Datenlandschaft, zum anderen die verarbeiteten Daten mittlerweile zu heterogen geworden. Darüber hinaus reicht es in vielen Fällen nicht mehr aus, Daten nach einem zeitlich fixierten Schema zu aktualisieren oder die zu prozessierenden Datenmengen sind schlicht nicht mehr von klassischen ETL (Extract-Transform-Load) Prozessen in Datenbanken zu schultern.

Data Streaming Plattformen wie Apache Kafka sind eine Antwort auf die Real-time-Problematik, dabei hat es die Möglichkeit mit den Anforderungen zu skalieren und für Ausfallsicherheit zu sorgen. Bezüglich der Möglichkeit des Prozessierens enorm großer Datenmengen ist Spark das Tool der Wahl. Ebenso wie Kafka kann es mit den Anforderungen skalieren. Ferner bietet es eine großzügige Auswahl an Implementierungssprachen an.

Auf die Heterogenität der eingesetzten Data Stores und Prozessierungsframeworks kann man mit zwei Ansätzen antworten: eine zentrale (globale) Orchestrierung, die die verschiedenen Komponenten bedienen kann, wie es Airflow auf beeindruckende Weise tut; eine dezentrale (lokale) Lösung, die nur auf spezifische Signale einzelner Komponenten reagiert, wie es der Serverless-Ansatz vorsieht. Alle gängigen Cloud-Anbieter können einen solchen Service vorweisen. Zusammenfassend findet man hier also eine Mischung an Tools als Antwort auf die aktuellen Fragen im Data Engineering.

Intro

Die Datenlandschaft war in den letzten Jahrzehnten geprägt von einer zunehmenden Dynamik. Galten bis zum Jahrtausendwechsel Data Warehouses und relationale Datenbank-Managementsysteme (RDBMS) noch als der Goldstandard für die Datenhaltung und -aufbereitung, so durchbrach vor allem die Verbreitung und die Dynamik des Internets diese Alleinstellung. Zum einen vervielfachte sich damit die Datenmenge, zum anderen galt nun das Interesse vermehrt auch semi-strukturierten und unstrukturierteren Daten. Man war also im Big Data Zeitalter angekommen. Der nächste Schub wurde durch die von mobilen Endgeräten und Sensoren generierten Datenflüsse (Stichwort: Internet of Things) verursacht. Es ging nun nicht mehr nur darum, den erneut enorm gestiegenen Datenaufwand zu bewältigen, sondern Ereignisse aus vielen Datenpunkten in Echtzeit zu erkennen und darauf reagieren zu können. Schlussendlich hat das Cloud Computing zusätzliches Potential für Datenanalysen und -verarbeitung geborgen, indem Infrastruktur nun in vielerlei Hinsicht günstig zur Verfügung steht und diverse Technologien mit geringer Einstiegsschwelle genutzt werden können. Aus der anfänglich monolithischen Welt der Datenbanken ist eine heterogene und dynamische Datenlandschaft geworden, die Data Engineers benötigt, um den Anforderungen gerecht zu werden.

Anhand von fünf Technologien soll mit diesem Artikel eine Orientierung zur Lösung aktueller Problemstellungen im Bereich „Data Engineering“ gegeben werden. In den Gebieten „Batch Processing“ und „Streaming“ werden die etablierten Technologien Apache Spark und Apache Kafka vorgestellt. Mit Apache Airflow und der Serverless-Technologie Lambda werden zwei unterschiedliche Konzepte präsentiert, um Abläufe zu steuern. Zuletzt und entgegen der Stoßrichtung der Einleitung findet sich auch ein Kapitel über SQL und relationale Datenbanken.

* Unter Data Engineering ist hier ein sehr enger Begriff verstanden, nämlich die Tätigkeit, eine kontinuierliche Datenversorgung und -aufbereitung herzustellen.

SQL und relationale Datenbanksysteme

SQL ist eine Abkürzung für structured query language und ist fester Bestandteil von relationalen Datenbanksystemen. Entgegen des allgemeinen Tones und trotz diverser (Weiter-)Entwicklungen von NoSQL[1]-Lösungen, spielen SQL-Systeme weiterhin eine tragende Rolle in der modernen Datenarchitektur. Das zeigt auch eine Umfrage aus dem Jahr 2019, die sich folgendermaßen zusammenfassen lässt: Der Einsatz von NoSQL-Systemen ist in den allermeisten Fällen keine Abkehr, sondern ein Zusatz zu bestehenden Systemen, die auf SQL setzen.

Auch in modernen Frameworks spiegelt sich die Popularität von SQL wider: So zeugen die Entwicklung in Spark und Kafka (siehe beide in den nachstehenden Absätzen) vom Stellenwert, der SQL eingeräumt wird. Schließlich gibt es Tools, die SQL-Queries für NoSQL-Systeme kompatibel machen, hier beispielhaft zu nennen Apache Drill.

Ein Grund für die Popularität von SQL, neben seinen Vorzügen als einfach und semantisch an die englische Sprache angelehnt, liegt in seiner weiten Verbreitung. Auch außerhalb des Datenbankmilieus finden sich Analysten und Beschäftigte im Reporting, die SQL beherrschen.

SELECT
      DEPARTMENT,
      MANAGER,
      COUNT(USER_ID)
FROM TBL_EMPLOYEE
WHERE IS_EXTERNAL = 1
GROUP BY DEPARTMENT, MANAGER

Klar lesbar führt diese Query eine Selektion, Filterung und Gruppierung von Daten aus.

Relationale Datenbanken sind eng mit SQL verbunden. Diese ausgereifte Technologie sticht vor allem durch Konsistenz und Dauerhaftigkeit hervor. Zudem muss das Tabellenschema vor dem ersten Schreiben definiert sein, was zwar zu Erwartbarkeit und Sicherheit führt, aber auch als aufwendig in der Verwaltung und starr angesehen werden kann. Der Umgang mit Daten, deren Struktur nicht explizit angegeben werden kann oder wechselhaft ist, kann sich also in relationalen Datenbanken als beschwerlich gestalten. Gleiches gilt für komplex-strukturierte Daten, da auch diese in Tabellen und Relationen eingepasst werden müssen, damit sie von der Datenbank effektiv behandelt werden können.

Vorzüge von relationalen Datenbanken:

  • Eine seit den 1970er Jahren entwickelte und damit ausgereifte Technologie, die von vielen Experten beherrscht wird
  • Eine starke Typisierung und der Definitionszwang a priori von Tabellenschemata garantieren Erwartbarkeit und Sicherheit
  • Mit SQL eine weitverbreitete, verständliche Query-Sprache
  • Die Implementierung des ACID-Schemas, das Konsistenz, Sicherheit und Dauerhaftigkeit von Datenständen garantiert
  • Redundanzarmer Speicherverbrauch durch Normalisierung und Referenzmöglichkeiten

Spark

Fast schon ein Klassiker und nun bereits zur Version 3 gereift, ist Spark der Standard, um sehr große Datenvolumen effizient zu prozessieren. Die Performance steht vor allem auf zwei Säulen: Zum einen werden Verarbeitungsschritte auf eine Schar an Worker-Nodes verteilt, was große Datenmengen parallelisiert bearbeitbar macht. Zum anderen ist es ein intelligentes System, um Zwischenergebnisse im Arbeitsspeicher zu halten, um Berechnungsstrecken abzukürzen und Zugriffszeiten zu verkürzen.

Die Bearbeitung durch das Spark-Cluster wird erst dann ausgelöst, wenn die Strecke End-to-End (von den Ausgangsdaten über Transformationen bis zum Endprodukt) definiert ist. Wie angedeutet, versucht Spark die Aufgabenlast möglichst parallelisiert auf den Worker-Nodes auszuführen. Gleich einem Query-Optimizer in einer relationalen Datenbank, sucht Spark einen möglichst performanten Weg, die Definition in einzelne Schritte zu zerlegen und diese wiederum als Tasks an die Worker zu verteilen.

Anhand eines einfachen Beispiels soll dieses Schema veranschaulicht werden. Dabei wird angenommen, dass Daten aus einer großen Menge Dateien prozessiert werden, die in einem Textformat vorliegen. Es werden diverse Transformation durchgeführt, um die wichtigen Informationen zu extrahieren. Aus einer zweiten Datenquelle, die ebenfalls in einem rohen Zustand im Textformat vorliegt, sollen weitere Informationen entnommen werden und an das Zwischenprodukt angereichert werden.

Die unten dargestellte Abbildung, die Spark intern generiert, kann folgendes beobachtet werden:

Die Ausgangsdaten, die als Textdateien vorliegen, werden zu Anbeginn der Stages 8 und 10 eingelesen und anschließend transformiert, was sich in den Aktionen „map“ und „distinct“ widerspiegelt. Zuletzt werden die Ergebnisse mit einer „join“ Aktion (wie man sie auch von SQL kennt) in Stage 10 vereint.

Ein weiterer Vorteil von Spark ist die Kompatibilität mit verschiedenen Datenformaten. So erlaubt Spark Lese- und Schreibvorgänge in unter anderem JSON, XML, CSV, Avro, kann aber auch ebenso gut mit Datenbankverbindungen umgehen – seien es nun klassische RDBMS oder Big-Data Datenbanken wie Hive.  

Zwar basiert die Engine auf Java, längst aber hat Python (in Form der PySpark Bibliothek) als primäre Implementierungssprache die mit Abstand weiteste Verbreitung[2]. Entwicklungen wie die von Koalas, welches die beliebte Datenanalyse-Bibliothek Pandas in Spark integriert, unterstreicht dies zusätzlich. Weitere unterstütze Sprachen sind R, Scala und SQL.

Sparks primäre Stärke ist die Abarbeitungen definierter Prozessierungs-Jobs, in allen Facetten und Farben. Für Use-Cases hingegen, die geringe Latenzen auf Datenanfragen erwarten, sollte man besser auf andere Tools setzen, wie etwa Presto, Impala oder Dremio. 

Vorzüge von Spark:

  • Verarbeitet große Datenmengen (TB-Bereich) sehr effizient
  • Skaliert durch Hinzufügen weiterer Worker Nodes zum Cluster
  • Unterstützt viele Datenformate und Anbindungen zu Datenbanken
  • Prozessierungs-Abfragen lassen sich in SQL, R, Scala, Java und Python schreiben

Spark gibt es in allen möglichen Ausgaben und Größen:

  • Jupyter Notebooks mit PySpark und lokalem Spark Context (zum Beispiel als Docker-Container)
  • On-premise Lösungen wie Cloudera oder Hortonworks
  • Notebook zentrierte Cloud-Lösung von Databricks
  • Cluster in der Cloud: EMR in AWS, HDInsights in Azure, Dataproc in GCP

Kafka

Kafka ist eine verteilte, fehler-tolerante und performante Streaming-Plattform. Wenn es darum geht, hohes Datenaufkommen in Echtzeit zu verarbeiten, kommt häufig Kafka ins Spiel. Die Plattform wird auch für Use Cases genutzt, in denen eine große Anzahl heterogener Systeme mit gegenseitigen Abhängigkeiten auftreten. Anders als bei gewöhnlichen Data Stores, ist ein Stream per Definition nicht erschöpft und Operationen darauf sind ebenso kontinuierlich.

Messages werden als Key-Value-Paare behandelt und in Topics gegliedert. Topics wiederum sind in Partitionen unterteilt, die redundant und damit gegen Ausfälle gesichert auf mehreren Nodes gehalten werden. Und schließlich gibt es noch die Teilnehmer an beiden Enden: Konsumenten (Consumers) und Produzenten (Producers), die aus den Topics lesen bzw. in jene schreiben.

In der Abbildung sieht man, welche Stelle Kafka im System einnimmt, nämlich als Datenhub: Von unten liefern die Quellsysteme hauptsächlich ins Kafka-System (Producer) und von oben werden die (mithin aufbereiteten) Streams konsumiert (Consumer).

Durch die Streaming API und die Entwicklung von ksql, das es ermöglicht, direkt SQL Queries auf Streams auszuführen, steht eine high-level Interaktionsmöglichkeit zur Verfügung, um mit Streams zu interagieren und aus bestehenden neue zu bauen. Dabei sind alle Transformationen vertreten, die man auch aus anderen Processing Frameworks (wie Spark) oder Datenbank-SQL-Dialekten kennt: Filtern, Gruppieren, Joinen, Mappen, etc. Darüber hinaus gibt es aber auch die Streams immanente Zeitachse, die mit Windowing Funktionen behandelt werden kann, also die Fragestellungen danach, wie oft Events pro Zeitabschnitt stattgefunden haben, z.B. Klicks auf Elementen von Web Pages.

Die Vorzüge von Kafka nochmal auf einem Blick:

  • Ermöglicht Analysen und Operationen in real time
  • Integriert heterogene Systeme zu einem zentralen Datenhub (Producer und Consumer API)
  • Skalierbar und ausfallsicher durch redundantes Halten von Partitionen
  • Stellt mit ksqldb eine Technologie bereit, um mit SQL Queries Streams zu transformieren

Ein gutes Starter-Kit findet man hier. In allen gängigen Cloud Plattformen gibt es auch Services (Amazon MSK, Azure HD Insight, …).

Airflow

Ein weiteres Themenfeld ist die Strukturierung des Ablaufs der einzelnen Datenaufbereitungs- und Datenverarbeitungsschritte. Besonders bei einer hohen Anzahl an beteiligten und einer hohen Heterogenität in den Komponenten ist es höchst ratsam ein Orchestrierungstool einzusetzen, um einen stabilen Ablauf zu garantieren. Für moderne Orchestrierungstools gibt es dabei einen breiten Anforderungskatalog, der in folgende Kategorien eingeteilt werden kann:

  • Kontrollierbarkeit: Die beteiligten Komponenten und Schritte sind wohldefiniert. Die Ausführung soll nachvollziehbar sein, sowohl im Laufen als auch im Nachgang.
  • Integrierbarkeit: Diverse und meist sehr heterogene Komponenten sollen ansprechbar und ihre Ausführung soll beobachtbar sein.
  • Reaktivität: Ergebnisse einzelner Schritte sollen ausgewertet werden können und in den Ablaufprozess einfließen. So soll zum Beispiel definiert sein, was im Fall des Scheiterns eines ganzen Schrittes passieren soll: eine Wiederholung, eine Abweichung vom normalen Prozessweg, ein Ignorieren?
  • Skalierbarkeit: Das Wachsen der Ablaufstruktur kann durch die Erweiterung der Ausführungsinfrastruktur möglichst einfach aufgefangen werden.

Apache Airflow erweist sich für diese Anforderungen als optimaler Kandidat. Komplexe Abläufe werden als directed acyclic graph (DAG) beschrieben, die Schritte sind als Knoten durch Bedingungen untereinander verknüpft. Diese DAGs bilden eine ausführbare Einheit, ihre Ausführung lässt sich automatisieren. Die Beschreibung wird rein in Python geschrieben. Das hat den Vorteil, dass die Entwicklung schnell geht und Operatoren gegebenenfalls leicht selbst geschrieben werden können. Zumeist ist das jedoch gar nicht nötig, da in Apache Airflow bereits mit eine Fülle an diversen und gängigen Operatoren implementiert ist. Darunter fallen: Triggern von Spark Jobs, Bereitstellung von Cloud Infrastruktur und das Ausführen von Datenbank-Queries.

Im Beispiel unten sieht man einen beispielhaften DAG-Ablauf, bei dem mehrere Stränge parallel ausgeführt werden und zuletzt zusammengeführt werden.

In der Abbildung sieht man, wie DAGs mehrere Richtungen nehmen können. Der letzte Schritt wertet die Ergebnisse von beiden Vorgängern aus und führt die Stränge wieder zusammen.

Beim Thema Skalierbarkeit liefert Airflow zwei Lösungswege mit: Der Celery Executor verteilt Tasks über einen Message Broker auf die dort registrierten Arbeiterprozesse. Kubernetes lässt sich als Ausführungsplattform über den Kubernetes Executor anschließen.

Zuletzt soll noch die Verwaltung von Zugängen zu Fremdsystemen (seien es nun Datenbanken oder Cloud Computing Instanzen) angesprochen werden. Dafür gibt es den sogenannten Secrets Manager, um übersichtlich, zentral und verschlüsselt die Verbindungsschlüssel zu den Systemen hinterlegen kann.

Die Vorzüge von Airflow auf einem Blick:

  • Komplexe Abläufe werden in DAGs beschrieben und angesteuert
  • Airflow bietet ein Füllhorn an Operatoren zur Kommunikation mit Fremdsystemen
  • Die Ausführung kann skalierbar konfiguriert werden mithilfe von Celery und Kubernetes Executors
  • Secrets von Datenbanken und anderen Systemen werden von Airflow zentral verwaltet und können im Code referenziert werden

Es gibt bereits fertige Docker Images, um Airflow auszuprobieren. Die Installation und Einrichtung ist aber auch recht einfach. Darüber hinaus bietet die Google Cloud auch den verwalteten Airflow Service Cloud Composer an, mit dem man sofort loslegen kann.

Serverless

Zuletzt soll hier noch ein komplett anderer Ansatz der Ablaufsteuerung angeführt werden, ein Ansatz der sich der Event-Driven Architektur zuordnen lässt. Statt einer zentralen Steuereinheit, die die Abläufe orchestriert, wird hierbei dezentral definiert, mit welchen Abläufen auf spezifische Events reagiert werden soll. Das kann zum Beispiel eine Veränderung im Data Lake sein, ein Infrastrukturevent wie die Bereitstellung einer Recheninstanz oder auch schlicht ein Zeit-Event. Auf Seiten der Funktionsaufrufe sind typische Beispiele das Anstoßen einer Data Ingestion, das Ausführen eines Tabellenupdates oder das Senden einer Benachrichtigung. Durch lokale Konfiguration der Reaktivität, wird hiermit also ein Ablaufkomplex zusammengebaut.

In der Abbildung sieht man das Schema der Serverless-Architektur, entnommen von OpenWhisk, einer Open-Source Serverless Plattform. Ein Feed stellt den Eventfluss zur Verfügung. Eine Regel verbindet das Auftreten eines Events (Trigger) mit der Aktion (Action).

Eine weitere Besonderheit ist das Serverless-Paradigma. Demnach entfällt der Aufwand für die Bereitstellung von Rechenressourcen zu sorgen. Stattdessen bestimmt die unterliegende Umgebung (also z.B. eine Cloud-Infrastruktur), wie Ressourcen allokiert werden und wo Code ausgeführt wird. Ein Vertreter einer solchen Applikation zur Verwaltung und Deployment von Ressourcen ist Kubernetes. Dem Entwickler wird also die Annehmlichkeit geboten, seinen Fokus ganz auf die Entwicklung zu legen. Zudem ist die Ausführung sprachagnostisch und alle gängigen modernen Programmiersprachen werden unterstützt. Damit können Ausführungen in verschiedenen Sprachen koexistieren und auch miteinander gekoppelt werden.  

Einige Beispiele für den Einsatz:

  • Eine neue Datei ist im ADL (Azure Data Lake) abgelegt worden und eine Data Ingestion soll angestoßen werden.
  • Ein Tabellen-Inhalt der No-SQL-Datenbank DynamoDB wurde verändert und dies soll ins Archiv geschrieben werden (das entspricht Triggers in Datenbanken).
  • Zeitereignisse: Jede Stunde sollen Daten zum Modelltraining bereitgestellt werden (vergleichbar mit cron jobs in der serverzentrierten Welt).

Die Vorteile nochmal auf einem Blick:

  • Keine Aufwände für Infrastrukturbereitstellung, stattdessen Konzentration auf Implementierung der Funktionalität.
  • Event-Driven statt zentraler Orchestrierung.
  • Funktionen können in verschiedenen Programmiersprachen geschrieben werden .
  • Funktionen verschiedener Sprache können koexistieren.

Die Anbieter findet man meistens in der Cloud: Lambda in AWS, Cloud Functions in GCP, Azure Functions in der Azure Cloud – ohne das Open-Source-Projekt OpenWhisk verschweigen zu wollen.

Zusammenfassung

Dem Wandel und den Brüchen zum Trotz bildet SQL weiterhin die zentrale Schnittstelle zu Daten, sowohl was Queries als auch die Prozessierung betrifft. Womöglich überdauert SQL sogar noch die Welt der relationalen Datenbanken, aus der sie ursprünglich kam.

Andererseits lässt sich ebenso feststellen, dass es längst nicht mehr genügt, SQL zu beherrschen, um modernen Problemstellungen gerecht zu werden, was in der vormaligen Welt der Data Warehouses galt. Verlässt man den bereitgestellten Standard an Funktionalitäten in Spark oder Kafka, den SQL mitliefert, um spezifische Funktionen zu implementieren (man spricht hier von User Defined Functions), so benötigt es zusätzliches Wissen in Programmiersprachen wie Python oder Java. Ebenso erfordert der Umgang mit Airflow oder der Serverless-Technologie, wie in den vorherigen Abschnitten gezeigt wurde, die Beherrschung der Programmiersprache Python – oder im Fall von Serverless der einer anderen modernen Sprache.

Wählt man fünf Technologien aus, fallen im Gegenzug andere herunter, die dennoch hier Erwähnung finden sollen:

  • MongoDB als Vertreter von Dokumentdatenbanken (NoSQL), die mit Flexibilität und Skalierbarkeit glänzen können.
  • Apache Cassandra als Vertreter von Key-Value-Datenbanken (NoSQL), das vor allem für hohe Skalierbarkeit steht.
  • Apache Flink als Data Streaming Framework in Ergänzung zu Apache Kafka.
  • Apache Beam als Abstraktionsschicht zur Datenpipeline-Definition, die dann in Apache Flink oder in Apache Spark ausgeführt werden können.

[1] NoSQL bedeutet in erster Linie die Abkehr vom SQL Paradigma und solche Datenbanken bilden keine homogene Klasse. Stattdessen kann man grob drei Unterklassen definieren: Graphendatenbanken, Dokumentendatenbanken und Key-Value-Datenbanken.

[2] laut Studie mit einem Anteil von 70% in Notebooks

In this blog post, I want to present two models of how to secure a REST API. Both models work with JSON Web Tokens (JWT). We suppose that token creation has already happened somewhere else, e.g., on the customer side. We at STATWORX are often interested in the verification part itself.

The first model extends a running unsecured interface without changing any code in it. This model does not even rely on Web Tokens and can be applied in a broader context. The second model demonstrates how token verification can be implemented inside a Flask application.

About JWT

JSON Web Tokens are more or less JSON objects. They contain authority information like the issuer, the type of signing, information on their validity span, and custom information like the username, user groups and further personal display data. To be more precise: it consists of three parts: the header, the payload, and the signature. All three parts are JSON objects, are base64url encoded and put together in the following fashion, where the dots separate the three parts:

base64url(header).base64url(payload).base64url(signature)

All information is transparent and readable to whoever the token has (just decode the parts). Its security and validity stem from its digital signing, the signing guarantees that a Web Token is from its issuer.

There are two different ways of digital signing: with a secret or with a public-private-key pair in various forms and algorithms. This choice also influences the verification possibilities: The symmetric algorithm with a password can only be verified by the owner of the password – the issuer – whereas, for asymmetric algorithms, the public part can be distributed and, therefore, used for verification.

Besides this feature, JWT offers additional advantages and features. Here is a brief overview

  • Decoupling: The application or the API will not need to implement a secure way to exchange passwords and verify them against a backend.
  • Purpose oriented: Tokens can be issued for a particular purpose (defined within the payload) and are only valid for this purpose.
  • Less password exchange: Tokens can be used multiple times without password interactions.
  • Expiration: Tokens expire. Even in the event of theft and criminal intent, the amount of information that can be obtained is limited to its validity span.

More information about JWT and also a debugger can be found at https://jwt.io

Authorization Header

Moving over to consideration on how Authorization is exchanged within HTTP requests.

To authenticate against a server, you can use the Request Header Authorization, of which various types exist. Down below are two common examples:

  • Basic
  Authorization: Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==

where the latter part is a base64 encoded string of user:password

  • Bearer
  Authorization: Bearer eyJhbGciOiJIUzI1NiJ9.eyJpbmZvIjoiSSdtIGEgc2lnbmVkIHRva2VuIn0.rjnRMAKcaRamEHnENhg0_Fqv7Obo-30U4bcI_v-nfEM

where the latter part is a JWT.

Note that this information is totally transparent on transfer unless HTTPS protocol is used.

Pattern 1 – Sidecar Verification

Let’s start with the first pattern that I’d like to introduce.

In this section, I’ll introduce a setup with nginx and use its feature of sub-requests. This is especially useful in cases where the implementation of an API can not be changed.

Nginx is a versatile tool. It acts as a web server, a load balancer, a content cache, and a reversed proxy. With the latest Web Server Survey of April 2020, nginx is the most used web server for public web sites.

By using nginx and sub-requests, each incoming request has to go through nginx, and the header, including the Authorization part, is passed to a sub-module (calling it Auth Service). This sub-module then checks the validity of the Authorization before sending it through to the actual REST API or declining it. This decision process depends on the following status codes of the Auth Service:

  • 20x: nginx passes the request over to the actual resource service
  • 401 or 403: nginx denies the access and sends the response from the authentication service instead

As you might have noticed, this setup does not exclusively build on JWTs, and therefore other authorization types can be used.

Sub-request model.

Schema of request handling: 1) pass over to Auth Service for verification, 2) pass over to REST API if the verification was successful.

Nginx configuration

Two configuration amendments need to be taken to have nginx configured for token validation:

  • Add an (internal) directive to the authorization service which verifies the request
  • Add authentication parameters to the directive that needs to be secured

The authorization directive is configured like this

location /auth {
        internal;
        proxy_pass                          http://auth-service/verify;
        proxy_pass_request_body off;
        proxy_set_header                Content-Length "";
        proxy_set_header                X-Original-URI $request-uri;
}

This cuts off the body and sends the rest of the request to the authorization service.

Next, the configuration of the directive which forwards to your REST API:

location /protected {
        auth_request    auth/;
        proxy_pass      http://rest-service;
}

Note that auth_request points to the authorization directive we have set up before.

Please see the Nginx documentation for additional information on the sub-request pattern.

Basic Implementation of the Auth Service

In the sub-module, we use the jwcrypto library for the token operations. It offers all features and algorithms needed for the authentication task. Many other libraries can do similar things, which can be found at Jwt.io.

Suppose the token was created by an asymmetric cryptographical algorithm like RSA, and you have access to the public key (here called: public.pem)

Following our configuration on nginx, we will add the directive /verify that does the verification job:

from jwcrypto import jwt, jwk
from flask import Flask

app = Flask(__name__)

# Load the public key
with open('public.pem', 'rb') as pemfile:
        public_key = jwk.JWK.from_pem(pemfile.read())

def parse_token(auth_header):
      # your implementation of Authorization Header extraction
    pass

@app.route("/verify")
def verify():
      token_raw = parse_token(request.headers["Authorization"])
      try:
            # decode token
        token = jwt.JWT(jwt=token_raw, key=public_key)
        return 200, "this is a secret information"
    except:
          return 403, ""

The script consists of three parts: Reading the public key with the start of the API, extracting the header information (not given here), and the actual verification that is embedded in a try-catch expression.

Pattern 2 – Verify within the API

In this section, we will implement the verification within our Flask API.

There are packages in the Flask universe available like flask_jwt; however, they do not offer the full scope of features and, in particular, not for our case here. Instead, we again use the library jwcrypto like before.

It is further assumed that you have access to the public key again, but this time, a specific directive is secured (here called: /secured ) – and also, access should only be granted to admin users.

from jwcrypto import jwt, jwk
from flask import Flask

app = Flask(__name__)

# Load the public key
with open('public.pem', 'rb') as pemfile:
        public_key = jwk.JWK.from_pem(pemfile.read())


@app.route("/secured")
def secured():
      token_raw = parse_token(request.headers["Authorization"])
      try:
            # decode token
        token = jwt.JWT(jwt=token_raw, key=public_key)
        if "admin" in token.claims["groups"]:
              return 200, "this is a secret information"
        else:
              return 403, ""
    except:
          return 403, ""

The setup is the same as in the previous example, but an additional check was added: whether the user is part of the admin group.

Summary

Two patterns were discussed, which both use Web Tokens to authenticate the request. While the first pattern’s charm lies in the decoupling of Authorization and API functionality, the second approach is more compact. It perfectly fits situations where the number of APIs is low, or the overhead caused by a separate service is too high. Instead, the sidecar pattern is perfect when you provide several APIs and like to unify the Authorization in one separate service.

Web Tokens are used in popular authorization schemes like OAuth and OpenID Connect. In another blog post, I will focus on how they work and how they connect to the verification I presented.

I hope you have enjoyed the post. Happy coding!

REST APIs have become a quasi-standard, be it to provide an interface to your application processes, be by setting up a flexible microservice architecture. Sooner or later, you might ask yourself what a proper testing schema would look like and which tools can support you. Some time ago, we at STATWORX asked this ourselves. A toolset that helps us with this task is Newman and Postman, which I will present to you in this blog post.

Many of you, who are regularly using and developing REST, might already be quite familiar with Postman. It’s a handy and comfortable desktop tool that comes with some excellent features (see below). Newman, instead, is a command-line agent that runs the previously defined requests. Because of its lean interface, it can be used in several situations, for instance, it can be easily integrated into testing stages of Pipelines.

In the following, I will explain how these siblings can be used to implement a neat testing environment. We start with Postman’s feature sets, then move on to the ability to interact with Newman. We will further have a look at a testing schema, touching some test cases, and lastly, integrate it into a Jenkins pipeline.

About Postman

Postman is a convenient desktop tool handling REST request. Furthermore, Postman gives you the possibility to define test cases (in JavaScript), has a feature to switch environments, and provides you with Pre-Request steps to set up the setting before your calls. In the following, I will give you examples of some interesting features.

Collection and Requests

Requests are the basic unit in Postman, and everything else spins around them. As I said previously, Postman’s GUI provides you with a comfortable way to define these: request method can be picked from a drop-down list, header information is presented clearly, there is a helper for authorization, and many more.

You should have at least one collection per REST interface defined to bundle your requests. At the very end of the definition process, collections can be exported into JSON format. This result will, later on, be exploited for Newman.

Environments

Postman also implements the concept of environment variables. This means: Depending on where your requests are fired from, the variables adapt. The API’s hostname is a good example that should be kept variable: In the development stage, it may be just your localhost but could be different in a dockerized environment.

The syntax of environment variables is double-curly brackets. If you want to use the hostname variable hostname put it like this: {{ hostname }}

Like for collections, environments can be exported into JSON files. We should keep this in mind when we move to Newman.

Tests

Each API request in Postman should come along with at least one test. I propose the following list as an orientation on what to test:

  • the status code: Check the status code according to your expectation: regular GET requests are supposed to return 200 OK, POST requests 201 Created if successful. On the other hand, authorization should be tested as well as invalid client requests which are supposed to return 40x. – See below a POST request test:
pm.test("Successful POST request", function () {
     pm.expect(pm.response.code).to.be.oneOf([201,202]);
 });
  • whether data is returned Test if the response has any data as a first approximation
  • the schema of returned data Test if the structure of the request data fits the expectations: non-nullable fields, data types, names of properties. Find below an example of a schema validation:
pm.test("Response has correct schema", function () {
    var schema = {"type":"object",
                  "properties":{
                      "access_token":{"type":"string"},
                      "created_on":{"type":"string"},
                      "expires_seconds":{"type":"number"}
                  }};
    var jsonData = pm.response.json();
    pm.expect(tv4.validate(jsonData,schema)).to.be.true;
});
  • values of returned data: Check if the values of the response data are sound; for non-negative values:
pm.test("Expires non negative", function() {
    pm.expect(pm.response.json().expires_seconds).to.be.above(0);
})
  • Header values Check the header of the response if useful relevant is stored there.

All tests have to be written in JavaScript. Postman ships with its own library and tv4 for schema validation.

Below you find a complete running test:

Introduction to Newman

As mentioned before, Newman acts as an executor of what was defined in Postman. To generate results, Newman uses reporters. Reporters can be the command line interface itself, but also known standards as JUnit can be found.
The simplest way to install newman is via NPM (Node package manager). There are ready to use docker images of NodeJS on DockerHub. Install the package via npm install -g newman.

There are two ways to call Newman: command-line interface and within JS code. We will only focus on the first.

Calling the CLI

To run a predefined test collections use the command newman run. Please see the example below:

newman run
            --reporters cli,junit
            --reporter-junit-export /test/out/report.xml
            -e /test/env/auth_jwt-docker.pmenv.json
            /test/src/auth_jwt-test.pmc.json

Let us take a closer look: Recall that we have previously exported the collection and the environment from Postman. The environment can be attached with the -e option. Moreover, two reporters were specified: the cli itself which prints into the terminal and junit which additional shall export a report to the file report.xml

The CLI reporter prints the following (Note that the first three test cases are those from the test schema proposal):

→ jwt-new-token
  POST http://tp_auth_jwt:5000/new-token/bot123 [201 CREATED, 523B, 42ms]
  ✓  Successful POST request
  ✓  Response has correct schema
  ✓  Expires non negative

→ jwt-auth
  POST http://tp_auth_jwt:5000/new-token/test [201 CREATED, 521B, 11ms]
  GET http://tp_auth_jwt:5000/auth [200 OK, 176B, 9ms]
  ✓  Status code is 200
  ✓  Login name is correct

→ jwt-auth-no-token
  GET http://tp_auth_jwt:5000/auth [401 UNAUTHORIZED, 201B, 9ms]
  ✓  Status is 401 or 403

→ jwt-auth-bad-token
  GET http://tp_auth_jwt:5000/auth [403 FORBIDDEN, 166B, 6ms]
  ✓  Status is 401 or 403

Integration into Jenkins

Newman functionality can now be integrated into (almost?) any Pipeline tool. For Jenkins, we create a docker image based on NodeJS and with Newman installed. Next, we either pack or mount both the environment and the collection file into the docker container. When running the container, we use Newman as a command-line tool, just as we did before. To use this in a test stage of a Pipeline, we have to make sure that the REST API is actually running when Newman is executed.

In the following example, the functionalities were defined as targets of a Makefile:

  • run to run the REST API with all dependencies
  • test to run Newman container which itself runs the testing collections
  • rm to stop and remove the REST API

After the API has been tested the report from JUnit is digested by Jenkins with the command junit <report>

See below a Pipeline snippet of a test run:

node{
       stage('Test'){
            try{
                sh "cd docker && make run"
                sh "sleep 5"
                sh "cd docker && make test"
                junit "source/test/out/report.xml"

            } catch (Exception e){
                    echo e
            } finally {
                    sh "cd docker && make rm"
            }
        }
}

Summary

Now it’s time to code tests for your REST API. Please also try to integrate it into your build-test cycle and into your automation pipeline because automation and defined processes are crucial to delivering reliable code and packages.
I hope with this blog post, you now have a better understanding of how Postman and Newman can be used to implement a test framework for REST APIs. Postman was used as a definition tool, whereas Newman was the runner of these definitions. Because of his nature, we have also seen that Newman is the tool for your build pipeline.

Happy coding!

We’re hiring!

Data Engineering is your jam and you’re looking for a job? We’re currently looking for Junior Consultants and Consultants in Data Engineering. Check the requirements and benefits of working with us on our career site. We’re looking forward to your application!

Livy is a REST web service for submitting Spark Jobs or accessing – and thus sharing – long-running Spark Sessions from a remote place. Instead of tedious configuration and installation of your Spark client, Livy takes over the work and provides you with a simple and convenient interface.

We at STATWORX use Livy to submit Spark Jobs from Apache’s workflow tool Airflow on volatile Amazon EMR cluster. Besides, several colleagues with different scripting language skills share a running Spark cluster.

Another great aspect of Livy, namely, is that you can choose from a range of scripting languages: Java, Scala, Python, R. As it is the case for Spark, which one of them you actually should/can use, depends on your use case (and on your skills).

livy-architecture
Architecture – https://livy.incubator.apache.org/

Apache Livy is still in the Incubator state, and code can be found at the Git project.

When you should use it

Since REST APIs are easy to integrate into your application, you should use it when:

  • multiple clients want to share a Spark Session.
  • the clients are lean and should not be overloaded with installation and configuration.
  • you need a quick setup to access your Spark cluster.
  • you want to Integrate Spark into an app on your mobile device.
  • you have volatile clusters, and you do not want to adapt configuration every time.
  • a remote workflow tool submits spark jobs.

Preconditions

Livy is generally user-friendly, and you do not really need too much preparation. All you basically need is an HTTP client to communicate to Livy’s REST API. REST APIs are known to be easy to access (states and lists are accessible even by browsers), HTTP(s) is a familiar protocol (status codes to handle exceptions, actions like GET and POST, etc.) while providing all security measures needed.

Since Livy is an agent for your Spark requests and carries your code (either as script-snippets or packages for submission) to the cluster, you actually have to write code (or have someone writing the code for you or have a package ready for submission at hand).

I opted to maily use python as Spark script language in this blog post and to also interact with the Livy interface itself. Some examples were executed via curl, too.

How to use Livy

There are two modes to interact with the Livy interface:

  • Interactive Sessions have a running session where you can send statements over. Provided that resources are available, these will be executed, and output can be obtained. It can be used to experiment with data or to have quick calculations done.
  • Jobs/Batch submit code packages like programs. A typical use case is a regular task equipped with some arguments and workload done in the background. This could be a data preparation task, for instance, which takes input and output directories as parameters.

In the following, we will have a closer look at both cases and the typical process of submission. Each case will be illustrated by examples.

Interactive Sessions

Let’s start with an example of an interactive Spark Session. Throughout the example, I use python and its requests package to send requests to and retrieve responses from the REST API. As mentioned before, you do not have to follow this path, and you could use your preferred HTTP client instead (provided that it also supports POST and DELETE requests).

Starting with a Spark Session. There is a bunch of parameters to configure (you can look up the specifics at Livy Documentation), but for this blog post, we stick to the basics, and we will specify its name and the kind of code. If you have already submitted Spark code without Livy, parameters like executorMemory, (YARN) queue might sound familiar, and in case you run more elaborate tasks that need extra packages, you will definitely know that the jars parameter needs configuration as well.

To initiate the session we have to send a POST request to the directive /sessions along with the parameters.

import requests
LIVY_HOST = 'http://livy-server'

directive = '/sessions'
headers = {'Content-Type': 'application/json'}

data = {'kind':'pyspark','name':'first-livy'}

resp = requests.post(LIVY_HOST+directive, headers=headers, data=json.dumps(data))

if resp.status_code == requests.codes.created:
    session_id = resp.json()['id']
else:
    raise CustomError()

Livy, in return, responds with an identifier for the session that we extract from its response.

Note that the session might need some boot time until YARN (a resource manager in the Hadoop world) has allocated all the resources. Meanwhile, we check the state of the session by querying the directive: /sessions/{session_id}/state. Once the state is idle, we are able to execute commands against it.

To execute spark code, statements are the way to go. The code is wrapped into the body of a POST request and sent to the right directive: sessions/{session_id}/statements.

directive = f'/sessions/{session_id}/statements'

data = {'code':'...'}

resp = request.post(LIVY_HOST+directive, headers=headers, data=json.dumps(data))

As response message, we are provided with the following attributes:

attribute meaning
id to identify the statement
code the code, once again, that has been executed
state the state of the execution
output the output of the statement

The statement passes some states (see below) and depending on your code, your interaction (statement can also be canceled) and the resources available, it will end up more or less likely in the success state. The crucial point here is that we have control over the status and can act correspondingly.

statement-state

By the way, cancelling a statement is done via GET request /sessions/{session_id}/statements/{statement_id}/cancel

It is time now to submit a statement: Let us imagine to be one of the classmates of Gauss and being asked to sum up the numbers from 1 to 1000. Luckily you have access to a spark cluster and – even more luckily – it has the Livy REST API running which we are connected to via our mobile app: what we just have to do is write the following spark code:

import textwrap

code = textwrap.dedent("""df = spark.createDataFrame(list(range(1,1000)),'int')
df.groupBy().sum().collect()[0]['sum(value)']""")

code_packed = {'code':code}

This is all the logic we need to define. The rest is the execution against the REST API:

import time

directive = f'/sessions/{session_id}/statements'
resp = requests.post(LIVY_HOST+directive, headers=headers, data=json.dumps(code_packed))
if resp.status_code == requests.codes.created:
    stmt_id = resp.json()['id']

    while True:
        info_resp = requests.get(LIVY_HOST+f'/sessions/{session_id}/statements/{stmt_id}')
        if info_resp.status_code == requests.codes.ok:
            state = info_resp.json()['state']
                if state in ('waiting','running'):
                    time.sleep(2)
                elif state in ('cancelling','cancelled','error'):
                    raise CustomException()
                else:
                    break
        else:
            raise CustomException()
    print(info_resp.json()['output'])

else:
  #something went wrong with creation
  raise CustomException()

Every 2 seconds, we check the state of statement and treat the outcome accordingly: So we stop the monitoring as soon as state equals available. Obviously, some more additions need to be made: probably error state would be treated differently to the cancel cases, and it would also be wise to set up a timeout to jump out of the loop at some point in time.

Assuming the code was executed successfully, we take a look at the output attribute of the response:

{'status': 'ok', 'execution_count': 2, 'data': {'text/plain': '499500'}}

There we go, the answer is 499500.

Finally, we kill the session again to free resources for others:

directive = f'/sessions/{session_id}/statements'
requests.delete(LIVY_HOST+directive)

Job Submission

We now want to move to a more compact solution. Say we have a package ready to solve some sort of problem packed as a jar or as a python script. What only needs to be added are some parameters – like input files, output directory, and some flags.

For the sake of simplicity, we will make use of the well known Wordcount example, which Spark gladly offers an implementation of: Read a rather big file and determine how often each word appears. We again pick python as Spark language. This time curl is used as an HTTP client.

As an example file, I have copied the Wikipedia entry found when typing in Livy. The text is actually about the roman historian Titus Livius.

I have moved to the AWS cloud for this example because it offers a convenient way to set up a cluster equipped with Livy, and files can easily be stored in S3 by an upload handler. Let’s now see, how we should proceed:

curl -X POST --data '{"file":"s3://livy-example/wordcount.py","args":[s3://livy-example/livy_life.txt"]}' 
-H "Content-Type: application/json" http://livy-server:8998/batches

The structure is quite similar to what we have seen before. By passing over the batch to Livy, we get an identifier in return along with some other information like the current state.

{"id":1,"name":null,"state":"running","appId":"application_1567416002081_0005",...}

To monitor the progress of the job, there is also a directive to call: /batches/{batch_id}/state

Most probably, we want to guarantee at first that the job ran successfully. In all other cases, we need to find out what has happened to our job. The directive /batches/{batchId}/log can be a help here to inspect the run.

Finally, the session is removed by:

curl -X DELETE http://livy-server:8998/batches/1 

which returns: {"msg":"deleted"} and we are done.

Trivia

  • AWS‘ Hadoop cluster service EMR supports Livy natively as Software Configuration option.
aws-emr
  • Apache’s notebook tool Zeppelin supports Livy as an Interpreter, i.e. you write code within the notebook and execute it directly against Livy REST API without handling HTTP yourself.
  • Be cautious not to use Livy in every case when you want to query a Spark cluster: Namely, In case you want to use Spark as Query backend and access data via Spark SQL, rather check out Thriftserver instead of building around Livy.
  • Kerberos can be integrated into Livy for authentication purposes.

Livy is a REST web service for submitting Spark Jobs or accessing – and thus sharing – long-running Spark Sessions from a remote place. Instead of tedious configuration and installation of your Spark client, Livy takes over the work and provides you with a simple and convenient interface.

We at STATWORX use Livy to submit Spark Jobs from Apache’s workflow tool Airflow on volatile Amazon EMR cluster. Besides, several colleagues with different scripting language skills share a running Spark cluster.

Another great aspect of Livy, namely, is that you can choose from a range of scripting languages: Java, Scala, Python, R. As it is the case for Spark, which one of them you actually should/can use, depends on your use case (and on your skills).

livy-architecture
Architecture – https://livy.incubator.apache.org/

Apache Livy is still in the Incubator state, and code can be found at the Git project.

When you should use it

Since REST APIs are easy to integrate into your application, you should use it when:

Preconditions

Livy is generally user-friendly, and you do not really need too much preparation. All you basically need is an HTTP client to communicate to Livy’s REST API. REST APIs are known to be easy to access (states and lists are accessible even by browsers), HTTP(s) is a familiar protocol (status codes to handle exceptions, actions like GET and POST, etc.) while providing all security measures needed.

Since Livy is an agent for your Spark requests and carries your code (either as script-snippets or packages for submission) to the cluster, you actually have to write code (or have someone writing the code for you or have a package ready for submission at hand).

I opted to maily use python as Spark script language in this blog post and to also interact with the Livy interface itself. Some examples were executed via curl, too.

How to use Livy

There are two modes to interact with the Livy interface:

In the following, we will have a closer look at both cases and the typical process of submission. Each case will be illustrated by examples.

Interactive Sessions

Let’s start with an example of an interactive Spark Session. Throughout the example, I use python and its requests package to send requests to and retrieve responses from the REST API. As mentioned before, you do not have to follow this path, and you could use your preferred HTTP client instead (provided that it also supports POST and DELETE requests).

Starting with a Spark Session. There is a bunch of parameters to configure (you can look up the specifics at Livy Documentation), but for this blog post, we stick to the basics, and we will specify its name and the kind of code. If you have already submitted Spark code without Livy, parameters like executorMemory, (YARN) queue might sound familiar, and in case you run more elaborate tasks that need extra packages, you will definitely know that the jars parameter needs configuration as well.

To initiate the session we have to send a POST request to the directive /sessions along with the parameters.

import requests
LIVY_HOST = 'http://livy-server'

directive = '/sessions'
headers = {'Content-Type': 'application/json'}

data = {'kind':'pyspark','name':'first-livy'}

resp = requests.post(LIVY_HOST+directive, headers=headers, data=json.dumps(data))

if resp.status_code == requests.codes.created:
    session_id = resp.json()['id']
else:
    raise CustomError()

Livy, in return, responds with an identifier for the session that we extract from its response.

Note that the session might need some boot time until YARN (a resource manager in the Hadoop world) has allocated all the resources. Meanwhile, we check the state of the session by querying the directive: /sessions/{session_id}/state. Once the state is idle, we are able to execute commands against it.

To execute spark code, statements are the way to go. The code is wrapped into the body of a POST request and sent to the right directive: sessions/{session_id}/statements.

directive = f'/sessions/{session_id}/statements'

data = {'code':'...'}

resp = request.post(LIVY_HOST+directive, headers=headers, data=json.dumps(data))

As response message, we are provided with the following attributes:

attribute meaning
id to identify the statement
code the code, once again, that has been executed
state the state of the execution
output the output of the statement

The statement passes some states (see below) and depending on your code, your interaction (statement can also be canceled) and the resources available, it will end up more or less likely in the success state. The crucial point here is that we have control over the status and can act correspondingly.

statement-state

By the way, cancelling a statement is done via GET request /sessions/{session_id}/statements/{statement_id}/cancel

It is time now to submit a statement: Let us imagine to be one of the classmates of Gauss and being asked to sum up the numbers from 1 to 1000. Luckily you have access to a spark cluster and – even more luckily – it has the Livy REST API running which we are connected to via our mobile app: what we just have to do is write the following spark code:

import textwrap

code = textwrap.dedent("""df = spark.createDataFrame(list(range(1,1000)),'int')
df.groupBy().sum().collect()[0]['sum(value)']""")

code_packed = {'code':code}

This is all the logic we need to define. The rest is the execution against the REST API:

import time

directive = f'/sessions/{session_id}/statements'
resp = requests.post(LIVY_HOST+directive, headers=headers, data=json.dumps(code_packed))
if resp.status_code == requests.codes.created:
    stmt_id = resp.json()['id']

    while True:
        info_resp = requests.get(LIVY_HOST+f'/sessions/{session_id}/statements/{stmt_id}')
        if info_resp.status_code == requests.codes.ok:
            state = info_resp.json()['state']
                if state in ('waiting','running'):
                    time.sleep(2)
                elif state in ('cancelling','cancelled','error'):
                    raise CustomException()
                else:
                    break
        else:
            raise CustomException()
    print(info_resp.json()['output'])

else:
  #something went wrong with creation
  raise CustomException()

Every 2 seconds, we check the state of statement and treat the outcome accordingly: So we stop the monitoring as soon as state equals available. Obviously, some more additions need to be made: probably error state would be treated differently to the cancel cases, and it would also be wise to set up a timeout to jump out of the loop at some point in time.

Assuming the code was executed successfully, we take a look at the output attribute of the response:

{'status': 'ok', 'execution_count': 2, 'data': {'text/plain': '499500'}}

There we go, the answer is 499500.

Finally, we kill the session again to free resources for others:

directive = f'/sessions/{session_id}/statements'
requests.delete(LIVY_HOST+directive)

Job Submission

We now want to move to a more compact solution. Say we have a package ready to solve some sort of problem packed as a jar or as a python script. What only needs to be added are some parameters – like input files, output directory, and some flags.

For the sake of simplicity, we will make use of the well known Wordcount example, which Spark gladly offers an implementation of: Read a rather big file and determine how often each word appears. We again pick python as Spark language. This time curl is used as an HTTP client.

As an example file, I have copied the Wikipedia entry found when typing in Livy. The text is actually about the roman historian Titus Livius.

I have moved to the AWS cloud for this example because it offers a convenient way to set up a cluster equipped with Livy, and files can easily be stored in S3 by an upload handler. Let’s now see, how we should proceed:

curl -X POST --data '{"file":"s3://livy-example/wordcount.py","args":[s3://livy-example/livy_life.txt"]}' 
-H "Content-Type: application/json" http://livy-server:8998/batches

The structure is quite similar to what we have seen before. By passing over the batch to Livy, we get an identifier in return along with some other information like the current state.

{"id":1,"name":null,"state":"running","appId":"application_1567416002081_0005",...}

To monitor the progress of the job, there is also a directive to call: /batches/{batch_id}/state

Most probably, we want to guarantee at first that the job ran successfully. In all other cases, we need to find out what has happened to our job. The directive /batches/{batchId}/log can be a help here to inspect the run.

Finally, the session is removed by:

curl -X DELETE http://livy-server:8998/batches/1 

which returns: {"msg":"deleted"} and we are done.

Trivia

aws-emr