_Dieser Teil würde normalerweise interaktiv in einer Diskussion entwickelt und in einem separaten Notebook zusammengefasst._
Zum Ursprung des Wortes Design Pattern, siehe z.B. https://www.architecture.com/knowledge-and-resources/knowledge-landing-page/pattern-books-creating-the-georgian-ideal
In einem solchen verteilten System kommen zu Kapazität, Wartezeit und Durchsatz noch mehrere wichtige Design-Parameter hinzu. Je nach Anwendung sind beim Aufbau einer Prozessierungskette u.U. zu beachten:
* Ausfallsicherheit bzw. Umgang mit fehlenden Komponenten/Monitoring
* Minimierung von Datentransfers
* Portabilität über Systemgrenzen
* Harmonisierung der Datenstrukturen und -formate
* Zwischenspeicherung von Ergebnissen (Sicherung vor Ausfall, leichtere/schnellere Wiederverwendbarkeit)
### 1.2 Datenstrukturen, Datenmodelle und Zugriffsmuster
Bevor wir uns den Design Patterns für Datenzugriffe zuwenden, empfiehlt es sich, ein gewisses Verständnis der üblichen Datenstrukturen und der damit verbundenen Datenmodelle zu erarbeiten und auch die typischen Zugriffsmuster für die verschiedenen Datentypen kennenzulernen.
Aus Zeitgründen gibt es diesen Teil zunächst nur als Stichpunktliste:
Datenstrukturen:
* unstrukturierte Daten
* Punktwolken
* Serien und Reihen, inkl. Audiodaten
* Baumstrukturen
* Relationale Tabellen
* Graphen
* Gitterdaten
* reguläre Gitter
* irreguläre Gitter
* Bilder und Videos
Datenmodelle:
* CSV
* XML Tree
* NetCDF/HDF
* DICOM
* Bild- und Videoformate
etc.
Diskussionspunkte zu Datenmodellen:
* Datenstruktur
* Zusammenhang zwischen Daten und Metadaten
* Parallelisierbarkeit und Skalierbarkeit
Zugriffsmuster:
* seriell / parallel
* sequentiell / random access
* als Ganzes / blockweise / gepuffert / ausschnittsweise
* WORM (write once read multiple) etc.
* periodisch bzw. (häufig) wiederkehrend / episodisch / burst
Das ETL Schema beschreibt ganz allgemein das Zurverfügungstellen von Daten. Der Grundgedanke ist,
dass die Daten im Originalformat vorliegen, sei es in einer Datenbank oder als Dateien, und dass
man zur Verarbeitung der Daten wahrscheinlich bestimmte Transformationen vornehmen muss. Durch die
Trennung in drei Schritte lassen sich Optimierungen gezielt auf die Problemstellung anpassen.
[Dieser Blog-Artikel](https://www.dbta.com/Editorial/Trends-and-Applications/Designing-an-ETL-Design-Pattern-145438.aspx) beschreibt anschaulich einige Dinge, auf die man beim ETL-Design achten sollte.
__Schritt 1:__ Es ist gute Praxis, immer von den Originaldaten auszugehen und den Informationsreichtum
in den Originaldaten zu erhalten.
__Schritt 2:__ Vor der eigentlichen Transformation sollten die Daten "gereinigt" werden. Dies bedeutet insbesondere eine Überprüfung, ob die Transformation überhaupt sinnvoll ausgeführt werden kann und ob die Daten das beinhalten, was man erwartet (z.B. gültiger Wertebereich). Hierbei ist es gute Praxis, fehlerhafte Daten nicht einfach zu löschen, sondern sie stattdessen mit einem Flag zu versehen, das den Fehlercode angibt (z.B. bei Messdaten).
Eine beliebte Methode des Bereinigens erfolgt mit Hilfe einer (temporären) Datenbank.
```
insert into
select from
```
nutzt die Kontrollfilter der Datenbank, um viele Formatierungsfehler zu finden und auszumerzen.
Beim Aufsetzen der Transformationen sollte man eine gute Balance zwischen Performance und Komplexität finden. Bis auf wenige Ausnahmen ist die zweitschnellste Lösung oft die bessere, wenn der Code dadurch
lesbarer, modularer und besser wartbar wird. Performance-Optimierung sollte man nur vornehmen, wenn man sie wirklich benötigt.
Das Rendern (Formatieren) der Daten zählt ebenfalls zu den Transformationen und ist üblicherweise der
letzte Schritt.
__Schritt 3:__ Der Load (oder auch Publish) Schritt sollte nur genau das machen, also insbesondere keine weiteren Transformationen anwenden. Hier geht es nur noch um die Auslieferung der Daten.
Aaron Segesmann beschreibt in dem oben verlinkten Blog folgende Design-Muster für die Auslieferung von Daten:
1. Truncate and load - d.h. zuvor ausgelieferte Daten werden gelöscht und die neu transformierten Daten an deren Stelle gesetzt. Beispiel: Neuaufbau einer Webseite nach Aktualisierung.
2. Data surgery - einzelne Elemente werden gezielt ausgetauscht. Diese Methode lohnt oft, wenn weniger als 40 % der Daten geändert wurden.
3. Append - die neuen Daten werden an die vorhandenen angehängt.
4. Blue/green - hier gibt es zwei parallel existierende Systeme (z.B. Server), von denen einer aktiv, der andere inaktiv ist. Dann werden im Hintergrund die neuen Daten auf den inaktiven Server gespielt und wenn das passiert ist, wird dieser aktiviert, während der vorher aktive inaktiviert wird.
Ein Merkmal von "Großen Daten" (= Big Data) ist es, dass nicht alle Daten auf einmal in den Speicher passen. Daraus ergibt sich zwangsläufig die Notwendigkeit, die Daten stückweise zu prozessieren.
Beim __Chunking__ (= Stückeln) liest man die Daten blockweise ein, verarbeitet sie und gibt sie wieder aus. Die Verarbeitung kann entweder seriell oder parallel erfolgen (s.a. Abschnitt 3.5 zu Hadoop und MapReduce).
__Tiling__ (= Kacheln) wird verwendet, um 2-dimensionale Daten (z.B. Modell-Output oder Satellitendaten aus der Erdsystemforschung) in einzelnen Abschnitten zu verarbeiten. Ein gutes Beispiel hierfür sind die sogenannten Tile-Server von Google Maps oder [Open Street Map](https://wiki.openstreetmap.org/wiki/Tile_servers). Ein solcher Server wandelt die als Vektordaten gespeicherten Informationen zu Straßen, Plätzen und anderen Landschaftsmerkmalen in Bilder um, wobei jeweils nur die Daten aus einer bestimmten Region (bounding box) prozessiert werden. Die Bilder können in verschiedenen Zoom-Stufen erzeugt (= gerendert) werden und man kann bestimmen, welche Bilder für schnellere Zugriffe in einem Cache abgelegt werden sollen (zu Caching, siehe Abschnitt 3.2).
Relationale Datenbanken erlauben die Definition mehrerer Indizes in einer Datentabelle und auch
über Tabellengrenzen hinweg ([__foreign key__](https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-foreign-key/)). Die Möglichkeit, hierdurch auch komplexere Indextabellen aufzubauen,
ist ein entscheidender Punkt für die Effizienz (=Zugriffsgeschwindigkeit) relationaler Datenbanken.
In NoSQL Datenbanken (z.B. [MongoDB](https://www.mongodb.com/) oder [Apache Cassandra](https://cassandra.apache.org/_/index.html) gibt es keine sekundären Indizes; dort muss man
Indextabellen selbst erstellen (siehe z.B. [https://learn.microsoft.com/en-us/azure/architecture/patterns/index-table]).
Das Prinzip von Indextabellen kann man auch außerhalb von Datenbank-Anwendungen nutzen. So wird z.B. in
Dieser Code setzt implizit voraus, dass die numpy arrays aus float Werten bestehen und die arrays 1-dimensional sind. Man kann natürlich den Datentyp und die array shapes ebenfalls im Header speichern.
Allerdings sollte man stattdessen sehr wahrscheinlich auf ein existierendes Dateiformat wie
Um auf Daten in besonders großen Dateien flexibel zugreifen zu können (random access) bietet sich u.U. __memory mapping__ an. Dabei wird ein Teil der großen Datei in den Hauptspeicher abgebildet; für die Nutzer:innen fühlt es sich aber so an, als würde man direkt in der Datei arbeiten. Für Details hierzu siehe z.B. [Python 3 Dokumentation](https://docs.python.org/3/library/mmap.html).
In modernen, verteilten Systemen erfolgt der Datenaustausch sehr häufig über REST APIs (siehe z.B. auch unsere [TOAR Datenbank](https://toar-data.fz-juelich.de/api/v2/) am JSC. Bei solchen http(s)-basierten Systemen kann es aus verschiedenen Gründen zu Verzögerungen kommen. Z.B.:
* Systemsoftware und Sicherheitskomponenten auf dem Server
* Internetanbindung (v.a. bei IoT Sensoren)
* Auslastung der Server
* Größe der Anfrage und Antwort
* Rechenintensive Prozessierung vor Audslieferung der Daten
Asynchrone Abfragen ermöglichen es, mehrere Abfragen parallel auszuführen, ohne jeweils auf Fertigstellung einzelner Abfragen warten zu müssen.
Das folgende Programmbeispiel realisiert eine asynchrone Abfrage an die TOAR-Datenbank am JSC. Man kann diverse Zeitserien-IDs in dem Feld ```series```angeben und diese Zeitserien werden dann quasi-parallel asynchron heruntergeladen.
Im Falle sehr großer Datenmengen wird die Dauer eines einzelnen Tasks ("download") zu lang. Dann empfiehlt es sich, den Workflow wie folgt aufzusetzen:
In diesem Schema kann man z.B. den Status-Endpunkt dazu nutzen, eine Email an den User zu generieren, die einen Link zu einem speziellen Resource-Endpunkt enthält. Klickt der User auf diesen Link, werden die vorbereiteten Daten heruntergeladen.
oder im Memory (z.B. [redis](https://redis.com/solutions/use-cases/caching/)).
Solch ein "explizites Caching" ist von automatischem Caching zu unterscheiden, das z.B. in der CPU oder durch das Betriebssystem auf dem Dateisystem stattfindet.
Beim sharding werden ebenfalls verteilte Instanzen eines (Datenbank) Servers aufgesetzt. Allerdings wird hier der Datenraum nach festgelegten Regeln auf die einzelnen Server verteilt. Beim Zugriff erscheint das dann wie eine einzige riesige Datenbank.
Es gibt verschiedene Strategien, wie die Daten aufgeteilt werden können. Neben einfach zu überblickenden Konzepten (wie z.B. ein shard pro Kalenderjahr) empfiehlt sich insbesondere eine Aufteilung über __key hashes__. Dies verhindert, dass einzelne shards sehr viel Last bekommen, während andere unterbeschäftigt bleiben.
In den letzten Jahren wurden mehrere Plattformen entwickelt, mit denen eine verteilte, skalierbare Analyse großer Datenmengen ermöglicht wird. Ein Beispiel ist [Apache Hadoop](https://hadoop.apache.org/docs/stable/hadoop-project-dist/).
"Hadoop ist ein [java-basiertes] Open-source Framework, das es ermöglicht große Datenmengen in einer verteilten Umgebung auf mehreren Clustern zu speichern und prozessieren. Dabei werden einfache Programmiermodelle verwendet. Hadoop skaliert von Einzelsystemen bis zu Systemen mit mehreren tausend Maschinen, die jeweils [gewöhnliche Hardware für] Computing und Storage beinhalten."
_Quelle: https://www.tutorialspoint.com/hadoop/index.htm, übersetzt von M. Schultz_
Die wichtigsten Elemente von Hadoop umfassen:
* das Hadoop Distributed File System (HDFS)
* das Resourchen-Managementsystem Yet Another Resource Negotiator (YARN)
* den von Google entwickelten MapReduce Algorithmus
Das __HDFS__ stellt sich dem Benutzer als ein großes Dateisystem dar mit Verzeichnissen und Dateien. Intern werden die Dateien in Blöcke aufgeteilt, sodass eine parallele Prozessierung möglich wird. Es gibt in jedem Hadoop System genau einen __Namenode__ und beliebig viele __Datanodes__. Der Namenode verwaltet die Metadaten und Zugriffe, es fließen jedoch keine Daten über ihn.
Das HDFS wurde optimiert auf __WORM__ (write once, read multiple) Zugriffe und ist vor allem für Streaming performant.
Hadoop baut auf der Prämisse auf, dass in einem großen, verteilten System immer irgend etwas nicht funktioniert. Daher gibt es diverse Sicherheitsmechanismen, um Ausfälle zu vermeiden bzw. schnell reparieren zu können. Beispielsweise werden die Datenblöcke redundant gespeichert.
Das __MapReduce__ framework erlaubt es, mit relativ einfachen Befehlen prallele Datenprozessierung von TByte-scale Datensätzen auf einem verteilten System mit bis zu mehreren Tausend Knoten auszuführen. Vom Konzept her werden Inputdaten in Chunks aufgeteilt (siehe oben), die dann parallel berechnet werden können ("mapping"). Die Ergebnisse dieser Rechnungen werden anschließend im "Reduce" Schritt zusammengeführt. Dabei werden __key, value__ Paare benutzt.
Der/die Nutzer:in schreibt eine __map__ Funktion, die aus einem Input Key-Value-Paar ein Output Key-Value-Paar mit einem intermediären Schlüssel (= key) erzeugt. Ferner erstellt er/sie eine __reduce__ Funktion, welche alle Daten mit demselben Schlüssel zusammenführt (= merge). Typischerweise erzeugt eine reduce-Operation genau einen oder keinen Outputwert. Die MapReduce Bibliothek sorgt dafür, dass alle Key-Value-Paare mit demselben Schlüssel zusammengeführt werden.
Hier ein Beispiel für eine map und eine reduce Funktion zum Zählen von Wörtern in einer großen Menge an text-Dokumenten aus dem Original-Artikel:
```Java
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
```
_Original Paper: Jeffrey Dean Sanjay Ghemawat (2004): MapReduce: Simplified Data Processing on Large Clusters, OSDI'04: Sixth Symposium on Operating System Design and Implementation, San Francisco, CA (2004), pp. 137-150._
Die Resourcenverwaltung obliegt __YARN__, das aus einem __ResourceManager__ für das Gesamtsystem, jeweils einem __NodeManager__ pro Cluster-Knoten und jeweils einem __MRAppMaster__ pro Anwendung besteht. Jede MapReduce Anwendung wird durch eine Konfigursation beschrieben, die zumindest die Input- und Outputdaten spezifiziert und die auszuführenden __map__ und __reduce__ Kommandos. YARN übernimmt das Scheduling, überwacht die Ausführung der einzelnen Tasks und startet abgebrochene Jobs erneut falls nötig. Hadoop ist damit vor allem für Batch-Anwendungen ausgelegt. Die Berechnungen werden möglichst dort ausgeführt, wo die Daten liegen, so dass unnötige Datentransfers vermieden werden.
Die Resourcenverwaltung obliegt __YARN__, das aus einem __ResourceManager__ für das Gesamtsystem, jeweils einem __NodeManager__ pro Cluster-Knoten und jeweils einem __MRAppMaster__ pro Anwendung besteht. Jede MapReduce Anwendung wird durch eine Konfiguration beschrieben, die zumindest die Input- und Outputdaten spezifiziert und die auszuführenden __map__ und __reduce__ Kommandos. YARN übernimmt das Scheduling, überwacht die Ausführung der einzelnen Tasks und startet abgebrochene Jobs erneut falls nötig. Hadoop ist damit vor allem für Batch-Anwendungen ausgelegt. Die Berechnungen werden möglichst dort ausgeführt, wo die Daten liegen, so dass unnötige Datentransfers vermieden werden.
_Dieser Teil würde normalerweise interaktiv in einer Diskussion entwickelt und in einem separaten Notebook zusammengefasst._
Zum Ursprung des Wortes Design Pattern, siehe z.B. https://www.architecture.com/knowledge-and-resources/knowledge-landing-page/pattern-books-creating-the-georgian-ideal
In einem solchen verteilten System kommen zu Kapazität, Wartezeit und Durchsatz noch mehrere wichtige Design-Parameter hinzu. Je nach Anwendung sind beim Aufbau einer Prozessierungskette u.U. zu beachten:
* Ausfallsicherheit bzw. Umgang mit fehlenden Komponenten/Monitoring
* Minimierung von Datentransfers
* Portabilität über Systemgrenzen
* Harmonisierung der Datenstrukturen und -formate
* Zwischenspeicherung von Ergebnissen (Sicherung vor Ausfall, leichtere/schnellere Wiederverwendbarkeit)
### 1.2 Datenstrukturen, Datenmodelle und Zugriffsmuster
Bevor wir uns den Design Patterns für Datenzugriffe zuwenden, empfiehlt es sich, ein gewisses Verständnis der üblichen Datenstrukturen und der damit verbundenen Datenmodelle zu erarbeiten und auch die typischen Zugriffsmuster für die verschiedenen Datentypen kennenzulernen.
Aus Zeitgründen gibt es diesen Teil zunächst nur als Stichpunktliste:
Datenstrukturen:
* unstrukturierte Daten
* Punktwolken
* Serien und Reihen, inkl. Audiodaten
* Baumstrukturen
* Relationale Tabellen
* Graphen
* Gitterdaten
* reguläre Gitter
* irreguläre Gitter
* Bilder und Videos
Datenmodelle:
* CSV
* XML Tree
* NetCDF/HDF
* DICOM
* Bild- und Videoformate
etc.
Diskussionspunkte zu Datenmodellen:
* Datenstruktur
* Zusammenhang zwischen Daten und Metadaten
* Parallelisierbarkeit und Skalierbarkeit
Zugriffsmuster:
* seriell / parallel
* sequentiell / random access
* als Ganzes / blockweise / gepuffert / ausschnittsweise
* WORM (write once read multiple) etc.
* periodisch bzw. (häufig) wiederkehrend / episodisch / burst
Das ETL Schema beschreibt ganz allgemein das Zurverfügungstellen von Daten. Der Grundgedanke ist,
dass die Daten im Originalformat vorliegen, sei es in einer Datenbank oder als Dateien, und dass
man zur Verarbeitung der Daten wahrscheinlich bestimmte Transformationen vornehmen muss. Durch die
Trennung in drei Schritte lassen sich Optimierungen gezielt auf die Problemstellung anpassen.
[Dieser Blog-Artikel](https://www.dbta.com/Editorial/Trends-and-Applications/Designing-an-ETL-Design-Pattern-145438.aspx) beschreibt anschaulich einige Dinge, auf die man beim ETL-Design achten sollte.
__Schritt 1:__ Es ist gute Praxis, immer von den Originaldaten auszugehen und den Informationsreichtum
in den Originaldaten zu erhalten.
__Schritt 2:__ Vor der eigentlichen Transformation sollten die Daten "gereinigt" werden. Dies bedeutet insbesondere eine Überprüfung, ob die Transformation überhaupt sinnvoll ausgeführt werden kann und ob die Daten das beinhalten, was man erwartet (z.B. gültiger Wertebereich). Hierbei ist es gute Praxis, fehlerhafte Daten nicht einfach zu löschen, sondern sie stattdessen mit einem Flag zu versehen, das den Fehlercode angibt (z.B. bei Messdaten).
Eine beliebte Methode des Bereinigens erfolgt mit Hilfe einer (temporären) Datenbank.
```
insert into
select from
```
nutzt die Kontrollfilter der Datenbank, um viele Formatierungsfehler zu finden und auszumerzen.
Beim Aufsetzen der Transformationen sollte man eine gute Balance zwischen Performance und Komplexität finden. Bis auf wenige Ausnahmen ist die zweitschnellste Lösung oft die bessere, wenn der Code dadurch
lesbarer, modularer und besser wartbar wird. Performance-Optimierung sollte man nur vornehmen, wenn man sie wirklich benötigt.
Das Rendern (Formatieren) der Daten zählt ebenfalls zu den Transformationen und ist üblicherweise der
letzte Schritt.
__Schritt 3:__ Der Load (oder auch Publish) Schritt sollte nur genau das machen, also insbesondere keine weiteren Transformationen anwenden. Hier geht es nur noch um die Auslieferung der Daten.
Aaron Segesmann beschreibt in dem oben verlinkten Blog folgende Design-Muster für die Auslieferung von Daten:
1. Truncate and load - d.h. zuvor ausgelieferte Daten werden gelöscht und die neu transformierten Daten an deren Stelle gesetzt. Beispiel: Neuaufbau einer Webseite nach Aktualisierung.
2. Data surgery - einzelne Elemente werden gezielt ausgetauscht. Diese Methode lohnt oft, wenn weniger als 40 % der Daten geändert wurden.
3. Append - die neuen Daten werden an die vorhandenen angehängt.
4. Blue/green - hier gibt es zwei parallel existierende Systeme (z.B. Server), von denen einer aktiv, der andere inaktiv ist. Dann werden im Hintergrund die neuen Daten auf den inaktiven Server gespielt und wenn das passiert ist, wird dieser aktiviert, während der vorher aktive inaktiviert wird.
Ein Merkmal von "Großen Daten" (= Big Data) ist es, dass nicht alle Daten auf einmal in den Speicher passen. Daraus ergibt sich zwangsläufig die Notwendigkeit, die Daten stückweise zu prozessieren.
Beim __Chunking__ (= Stückeln) liest man die Daten blockweise ein, verarbeitet sie und gibt sie wieder aus. Die Verarbeitung kann entweder seriell oder parallel erfolgen (s.a. Abschnitt 3.5 zu Hadoop und MapReduce).
__Tiling__ (= Kacheln) wird verwendet, um 2-dimensionale Daten (z.B. Modell-Output oder Satellitendaten aus der Erdsystemforschung) in einzelnen Abschnitten zu verarbeiten. Ein gutes Beispiel hierfür sind die sogenannten Tile-Server von Google Maps oder [Open Street Map](https://wiki.openstreetmap.org/wiki/Tile_servers). Ein solcher Server wandelt die als Vektordaten gespeicherten Informationen zu Straßen, Plätzen und anderen Landschaftsmerkmalen in Bilder um, wobei jeweils nur die Daten aus einer bestimmten Region (bounding box) prozessiert werden. Die Bilder können in verschiedenen Zoom-Stufen erzeugt (= gerendert) werden und man kann bestimmen, welche Bilder für schnellere Zugriffe in einem Cache abgelegt werden sollen (zu Caching, siehe Abschnitt 3.2).
Relationale Datenbanken erlauben die Definition mehrerer Indizes in einer Datentabelle und auch
über Tabellengrenzen hinweg ([__foreign key__](https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-foreign-key/)). Die Möglichkeit, hierdurch auch komplexere Indextabellen aufzubauen,
ist ein entscheidender Punkt für die Effizienz (=Zugriffsgeschwindigkeit) relationaler Datenbanken.
In NoSQL Datenbanken (z.B. [MongoDB](https://www.mongodb.com/) oder [Apache Cassandra](https://cassandra.apache.org/_/index.html) gibt es keine sekundären Indizes; dort muss man
Indextabellen selbst erstellen (siehe z.B. [https://learn.microsoft.com/en-us/azure/architecture/patterns/index-table]).
Das Prinzip von Indextabellen kann man auch außerhalb von Datenbank-Anwendungen nutzen. So wird z.B. in
Dieser Code setzt implizit voraus, dass die numpy arrays aus float Werten bestehen und die arrays 1-dimensional sind. Man kann natürlich den Datentyp und die array shapes ebenfalls im Header speichern.
Allerdings sollte man stattdessen sehr wahrscheinlich auf ein existierendes Dateiformat wie
Um auf Daten in besonders großen Dateien flexibel zugreifen zu können (random access) bietet sich u.U. __memory mapping__ an. Dabei wird ein Teil der großen Datei in den Hauptspeicher abgebildet; für die Nutzer:innen fühlt es sich aber so an, als würde man direkt in der Datei arbeiten. Für Details hierzu siehe z.B. [Python 3 Dokumentation](https://docs.python.org/3/library/mmap.html).
In modernen, verteilten Systemen erfolgt der Datenaustausch sehr häufig über REST APIs (siehe z.B. auch unsere [TOAR Datenbank](https://toar-data.fz-juelich.de/api/v2/) am JSC. Bei solchen http(s)-basierten Systemen kann es aus verschiedenen Gründen zu Verzögerungen kommen. Z.B.:
* Systemsoftware und Sicherheitskomponenten auf dem Server
* Internetanbindung (v.a. bei IoT Sensoren)
* Auslastung der Server
* Größe der Anfrage und Antwort
* Rechenintensive Prozessierung vor Audslieferung der Daten
Asynchrone Abfragen ermöglichen es, mehrere Abfragen parallel auszuführen, ohne jeweils auf Fertigstellung einzelner Abfragen warten zu müssen.
Das folgende Programmbeispiel realisiert eine asynchrone Abfrage an die TOAR-Datenbank am JSC. Man kann diverse Zeitserien-IDs in dem Feld ```series```angeben und diese Zeitserien werden dann quasi-parallel asynchron heruntergeladen.
Im Falle sehr großer Datenmengen wird die Dauer eines einzelnen Tasks ("download") zu lang. Dann empfiehlt es sich, den Workflow wie folgt aufzusetzen:
In diesem Schema kann man z.B. den Status-Endpunkt dazu nutzen, eine Email an den User zu generieren, die einen Link zu einem speziellen Resource-Endpunkt enthält. Klickt der User auf diesen Link, werden die vorbereiteten Daten heruntergeladen.
oder im Memory (z.B. [redis](https://redis.com/solutions/use-cases/caching/)).
Solch ein "explizites Caching" ist von automatischem Caching zu unterscheiden, das z.B. in der CPU oder durch das Betriebssystem auf dem Dateisystem stattfindet.
Beim sharding werden ebenfalls verteilte Instanzen eines (Datenbank) Servers aufgesetzt. Allerdings wird hier der Datenraum nach festgelegten Regeln auf die einzelnen Server verteilt. Beim Zugriff erscheint das dann wie eine einzige riesige Datenbank.
Es gibt verschiedene Strategien, wie die Daten aufgeteilt werden können. Neben einfach zu überblickenden Konzepten (wie z.B. ein shard pro Kalenderjahr) empfiehlt sich insbesondere eine Aufteilung über __key hashes__. Dies verhindert, dass einzelne shards sehr viel Last bekommen, während andere unterbeschäftigt bleiben.
In den letzten Jahren wurden mehrere Plattformen entwickelt, mit denen eine verteilte, skalierbare Analyse großer Datenmengen ermöglicht wird. Ein Beispiel ist [Apache Hadoop](https://hadoop.apache.org/docs/stable/hadoop-project-dist/).
"Hadoop ist ein [java-basiertes] Open-source Framework, das es ermöglicht große Datenmengen in einer verteilten Umgebung auf mehreren Clustern zu speichern und prozessieren. Dabei werden einfache Programmiermodelle verwendet. Hadoop skaliert von Einzelsystemen bis zu Systemen mit mehreren tausend Maschinen, die jeweils [gewöhnliche Hardware für] Computing und Storage beinhalten."
_Quelle: https://www.tutorialspoint.com/hadoop/index.htm, übersetzt von M. Schultz_
Die wichtigsten Elemente von Hadoop umfassen:
* das Hadoop Distributed File System (HDFS)
* das Resourchen-Managementsystem Yet Another Resource Negotiator (YARN)
* den von Google entwickelten MapReduce Algorithmus
Das __HDFS__ stellt sich dem Benutzer als ein großes Dateisystem dar mit Verzeichnissen und Dateien. Intern werden die Dateien in Blöcke aufgeteilt, sodass eine parallele Prozessierung möglich wird. Es gibt in jedem Hadoop System genau einen __Namenode__ und beliebig viele __Datanodes__. Der Namenode verwaltet die Metadaten und Zugriffe, es fließen jedoch keine Daten über ihn.
Das HDFS wurde optimiert auf __WORM__ (write once, read multiple) Zugriffe und ist vor allem für Streaming performant.
Hadoop baut auf der Prämisse auf, dass in einem großen, verteilten System immer irgend etwas nicht funktioniert. Daher gibt es diverse Sicherheitsmechanismen, um Ausfälle zu vermeiden bzw. schnell reparieren zu können. Beispielsweise werden die Datenblöcke redundant gespeichert.
Das __MapReduce__ framework erlaubt es, mit relativ einfachen Befehlen prallele Datenprozessierung von TByte-scale Datensätzen auf einem verteilten System mit bis zu mehreren Tausend Knoten auszuführen. Vom Konzept her werden Inputdaten in Chunks aufgeteilt (siehe oben), die dann parallel berechnet werden können ("mapping"). Die Ergebnisse dieser Rechnungen werden anschließend im "Reduce" Schritt zusammengeführt. Dabei werden __key, value__ Paare benutzt.
Der/die Nutzer:in schreibt eine __map__ Funktion, die aus einem Input Key-Value-Paar ein Output Key-Value-Paar mit einem intermediären Schlüssel (= key) erzeugt. Ferner erstellt er/sie eine __reduce__ Funktion, welche alle Daten mit demselben Schlüssel zusammenführt (= merge). Typischerweise erzeugt eine reduce-Operation genau einen oder keinen Outputwert. Die MapReduce Bibliothek sorgt dafür, dass alle Key-Value-Paare mit demselben Schlüssel zusammengeführt werden.
Hier ein Beispiel für eine map und eine reduce Funktion zum Zählen von Wörtern in einer großen Menge an text-Dokumenten aus dem Original-Artikel:
```Java
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
```
_Original Paper: Jeffrey Dean Sanjay Ghemawat (2004): MapReduce: Simplified Data Processing on Large Clusters, OSDI'04: Sixth Symposium on Operating System Design and Implementation, San Francisco, CA (2004), pp. 137-150._
Die Resourcenverwaltung obliegt __YARN__, das aus einem __ResourceManager__ für das Gesamtsystem, jeweils einem __NodeManager__ pro Cluster-Knoten und jeweils einem __MRAppMaster__ pro Anwendung besteht. Jede MapReduce Anwendung wird durch eine Konfigursation beschrieben, die zumindest die Input- und Outputdaten spezifiziert und die auszuführenden __map__ und __reduce__ Kommandos. YARN übernimmt das Scheduling, überwacht die Ausführung der einzelnen Tasks und startet abgebrochene Jobs erneut falls nötig. Hadoop ist damit vor allem für Batch-Anwendungen ausgelegt. Die Berechnungen werden möglichst dort ausgeführt, wo die Daten liegen, so dass unnötige Datentransfers vermieden werden.
Die Resourcenverwaltung obliegt __YARN__, das aus einem __ResourceManager__ für das Gesamtsystem, jeweils einem __NodeManager__ pro Cluster-Knoten und jeweils einem __MRAppMaster__ pro Anwendung besteht. Jede MapReduce Anwendung wird durch eine Konfiguration beschrieben, die zumindest die Input- und Outputdaten spezifiziert und die auszuführenden __map__ und __reduce__ Kommandos. YARN übernimmt das Scheduling, überwacht die Ausführung der einzelnen Tasks und startet abgebrochene Jobs erneut falls nötig. Hadoop ist damit vor allem für Batch-Anwendungen ausgelegt. Die Berechnungen werden möglichst dort ausgeführt, wo die Daten liegen, so dass unnötige Datentransfers vermieden werden.