Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision

Target

Select target project
  • esde/training/large-data-lecture
1 result
Select Git revision
Show changes
Commits on Source (3)
%% Cell type:markdown id:6cec99d8-cf59-4320-9fa2-4a3ddfcada34 tags:
 
# Design Pattern für die Verarbeitung von großen Datenmengen
 
## Inhalte
 
 
 
%% Cell type:markdown id:f5b5c09e-7599-4768-a7f6-8cb2490f2290 tags:
 
## 1. Motivation
 
_Dieser Teil würde normalerweise interaktiv in einer Diskussion entwickelt und in einem separaten Notebook zusammengefasst._
 
Zum Ursprung des Wortes Design Pattern, siehe z.B. https://www.architecture.com/knowledge-and-resources/knowledge-landing-page/pattern-books-creating-the-georgian-ideal
 
### 1.1 Daten-Architekturen
 
Klassische "large data" Systeme:
 
![grafik.png](attachment:8254ff1d-93b1-404d-8e8c-f3d2712faf09.png)
 
Das hierarchische Datensystem am JSC:
 
![grafik.png](attachment:0e033fc4-51d5-44cf-9a5a-86a4e10c6eb4.png)
 
Verteilte, cloud-basierte Systeme:
 
Beispiel:
![grafik.png](attachment:61573f5c-a3b6-438c-87a0-f54d8f397973.png)
 
_Source: https://learn.microsoft.com/en-us/azure/architecture/example-scenario/data/geospatial-data-processing-analytics-azure_
_Quelle: https://learn.microsoft.com/en-us/azure/architecture/example-scenario/data/geospatial-data-processing-analytics-azure_
 
In einem solchen verteilten System kommen zu Kapazität, Wartezeit und Durchsatz noch mehrere wichtige Design-Parameter hinzu. Je nach Anwendung sind beim Aufbau einer Prozessierungskette u.U. zu beachten:
* Ausfallsicherheit bzw. Umgang mit fehlenden Komponenten/Monitoring
* Minimierung von Datentransfers
* Portabilität über Systemgrenzen
* Harmonisierung der Datenstrukturen und -formate
* Zwischenspeicherung von Ergebnissen (Sicherung vor Ausfall, leichtere/schnellere Wiederverwendbarkeit)
 
%% Cell type:markdown id:43e1d3bd-e9b7-43cb-a9fb-cec345ba0a26 tags:
 
### 1.2 Datenstrukturen, Datenmodelle und Zugriffsmuster
 
Bevor wir uns den Design Patterns für Datenzugriffe zuwenden, empfiehlt es sich, ein gewisses Verständnis der üblichen Datenstrukturen und der damit verbundenen Datenmodelle zu erarbeiten und auch die typischen Zugriffsmuster für die verschiedenen Datentypen kennenzulernen.
 
Aus Zeitgründen gibt es diesen Teil zunächst nur als Stichpunktliste:
 
Datenstrukturen:
* unstrukturierte Daten
* Punktwolken
* Serien und Reihen, inkl. Audiodaten
* Baumstrukturen
* Relationale Tabellen
* Graphen
* Gitterdaten
* reguläre Gitter
* irreguläre Gitter
* Bilder und Videos
 
Datenmodelle:
* CSV
* XML Tree
* NetCDF/HDF
* DICOM
* Bild- und Videoformate
etc.
 
Diskussionspunkte zu Datenmodellen:
* Datenstruktur
* Zusammenhang zwischen Daten und Metadaten
* Parallelisierbarkeit und Skalierbarkeit
 
Zugriffsmuster:
* seriell / parallel
* sequentiell / random access
* als Ganzes / blockweise / gepuffert / ausschnittsweise
* WORM (write once read multiple) etc.
* periodisch bzw. (häufig) wiederkehrend / episodisch / burst
* ETL / streaming
 
%% Cell type:markdown id:6cdedf41-b175-4a6c-b805-13e81902754b tags:
 
## 2. Klassische Design-Patterns
 
### 2.1 Extract - Transform - Load (ETL)
 
Das ETL Schema beschreibt ganz allgemein das Zurverfügungstellen von Daten. Der Grundgedanke ist,
dass die Daten im Originalformat vorliegen, sei es in einer Datenbank oder als Dateien, und dass
man zur Verarbeitung der Daten wahrscheinlich bestimmte Transformationen vornehmen muss. Durch die
Trennung in drei Schritte lassen sich Optimierungen gezielt auf die Problemstellung anpassen.
 
[Dieser Blog-Artikel](https://www.dbta.com/Editorial/Trends-and-Applications/Designing-an-ETL-Design-Pattern-145438.aspx) beschreibt anschaulich einige Dinge, auf die man beim ETL-Design achten sollte.
 
__Schritt 1:__ Es ist gute Praxis, immer von den Originaldaten auszugehen und den Informationsreichtum
in den Originaldaten zu erhalten.
 
__Schritt 2:__ Vor der eigentlichen Transformation sollten die Daten "gereinigt" werden. Dies bedeutet insbesondere eine Überprüfung, ob die Transformation überhaupt sinnvoll ausgeführt werden kann und ob die Daten das beinhalten, was man erwartet (z.B. gültiger Wertebereich). Hierbei ist es gute Praxis, fehlerhafte Daten nicht einfach zu löschen, sondern sie stattdessen mit einem Flag zu versehen, das den Fehlercode angibt (z.B. bei Messdaten).
 
Eine beliebte Methode des Bereinigens erfolgt mit Hilfe einer (temporären) Datenbank.
```
insert into
select from
```
nutzt die Kontrollfilter der Datenbank, um viele Formatierungsfehler zu finden und auszumerzen.
 
Beim Aufsetzen der Transformationen sollte man eine gute Balance zwischen Performance und Komplexität finden. Bis auf wenige Ausnahmen ist die zweitschnellste Lösung oft die bessere, wenn der Code dadurch
lesbarer, modularer und besser wartbar wird. Performance-Optimierung sollte man nur vornehmen, wenn man sie wirklich benötigt.
 
Das Rendern (Formatieren) der Daten zählt ebenfalls zu den Transformationen und ist üblicherweise der
letzte Schritt.
 
__Schritt 3:__ Der Load (oder auch Publish) Schritt sollte nur genau das machen, also insbesondere keine weiteren Transformationen anwenden. Hier geht es nur noch um die Auslieferung der Daten.
 
Aaron Segesmann beschreibt in dem oben verlinkten Blog folgende Design-Muster für die Auslieferung von Daten:
1. Truncate and load - d.h. zuvor ausgelieferte Daten werden gelöscht und die neu transformierten Daten an deren Stelle gesetzt. Beispiel: Neuaufbau einer Webseite nach Aktualisierung.
2. Data surgery - einzelne Elemente werden gezielt ausgetauscht. Diese Methode lohnt oft, wenn weniger als 40 % der Daten geändert wurden.
3. Append - die neuen Daten werden an die vorhandenen angehängt.
4. Blue/green - hier gibt es zwei parallel existierende Systeme (z.B. Server), von denen einer aktiv, der andere inaktiv ist. Dann werden im Hintergrund die neuen Daten auf den inaktiven Server gespielt und wenn das passiert ist, wird dieser aktiviert, während der vorher aktive inaktiviert wird.
 
%% Cell type:markdown id:91b32cfe-06ba-4984-9849-151171967e7a tags:
 
### 2.2 Chunking und Tiling
 
Ein Merkmal von "Großen Daten" (= Big Data) ist es, dass nicht alle Daten auf einmal in den Speicher passen. Daraus ergibt sich zwangsläufig die Notwendigkeit, die Daten stückweise zu prozessieren.
 
Beim __Chunking__ (= Stückeln) liest man die Daten blockweise ein, verarbeitet sie und gibt sie wieder aus. Die Verarbeitung kann entweder seriell oder parallel erfolgen (s.a. Abschnitt 3.5 zu Hadoop und MapReduce).
 
__Tiling__ (= Kacheln) wird verwendet, um 2-dimensionale Daten (z.B. Modell-Output oder Satellitendaten aus der Erdsystemforschung) in einzelnen Abschnitten zu verarbeiten. Ein gutes Beispiel hierfür sind die sogenannten Tile-Server von Google Maps oder [Open Street Map](https://wiki.openstreetmap.org/wiki/Tile_servers). Ein solcher Server wandelt die als Vektordaten gespeicherten Informationen zu Straßen, Plätzen und anderen Landschaftsmerkmalen in Bilder um, wobei jeweils nur die Daten aus einer bestimmten Region (bounding box) prozessiert werden. Die Bilder können in verschiedenen Zoom-Stufen erzeugt (= gerendert) werden und man kann bestimmen, welche Bilder für schnellere Zugriffe in einem Cache abgelegt werden sollen (zu Caching, siehe Abschnitt 3.2).
 
%% Cell type:code id:cafca7df-b8b4-4caa-966c-d034efff0611 tags:
 
``` python
# Etwas sinnfreies Beispielprogramm zum Chunking
# let's count the number of occurences of the letter E in a randomly generated long string
import numpy as np
SIZE = 999967 # purposefully not a round number
data = "".join([chr(x) for x in np.random.randint(65, 91, SIZE)])
print(data[:1000])
# set up result counter and loop
BUFSIZE = 200 # process 200 characters at a time
count = 0
pos = 0
while pos < len(data):
chunk = data[pos:pos+BUFSIZE]
count += chunk.count("E")
pos += BUFSIZE
print(f"There are {count} occurences of the letter E in the random string, i.e. {100.*count/SIZE}%.")
```
 
%% Cell type:markdown id:52da6084-96a9-4bdc-b3f3-c429ff7ba865 tags:
 
### 2.3 Indextabellen
 
Relationale Datenbanken erlauben die Definition mehrerer Indizes in einer Datentabelle und auch
über Tabellengrenzen hinweg ([__foreign key__](https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-foreign-key/)). Die Möglichkeit, hierdurch auch komplexere Indextabellen aufzubauen,
ist ein entscheidender Punkt für die Effizienz (=Zugriffsgeschwindigkeit) relationaler Datenbanken.
 
In NoSQL Datenbanken (z.B. [MongoDB](https://www.mongodb.com/) oder [Apache Cassandra](https://cassandra.apache.org/_/index.html) gibt es keine sekundären Indizes; dort muss man
Indextabellen selbst erstellen (siehe z.B. [https://learn.microsoft.com/en-us/azure/architecture/patterns/index-table]).
 
Das Prinzip von Indextabellen kann man auch außerhalb von Datenbank-Anwendungen nutzen. So wird z.B. in
manchen Dateiformaten (z.B. [netCDF](https://www.unidata.ucar.edu/software/netcdf/))
neben den eigentlichen Daten ein header gespeichert, der Metadaten enthält und
eben Indizes zu den einzelnen Datenfeldern.
 
![grafik.png](attachment:b8918922-d5e0-4e89-9818-6e75200c57c2.png)
 
%% Cell type:code id:6ff96347-7076-4fdb-a11f-c832e6eaba3c tags:
 
``` python
# Programmbeispiel
# create some data arrays of arbitrary size and store them in a file with an index table
import numpy as np
from pprint import pprint
import os
import json
 
os.makedirs("data", exist_ok=True)
 
dlength = [ 100, 20000, 57, 23456, 100 ] # length of data arrays
container = [ np.full(length, float(enum+1)) for enum, length in enumerate(dlength) ]
pprint(container)
 
# create index table as dictionary - note that first entry is 0, and last entry should be removed
index = { "var1": 0 }
mem = 0
for enum, ar in enumerate(container[:-1]):
index[f"var{enum+2}"] = mem + ar.nbytes
mem += ar.nbytes
pprint(index)
# store data as binary file
MAGIC = b"\xf0\xf0\x00\x00"
with open("data/test_index_file.dat", "wb") as f:
# write index table as json string
f.write(json.dumps(index).encode("utf-8"))
# add CRLF as end marker
f.write(MAGIC)
# write data tables
for ar in container:
f.write(ar)
 
# now let's try to read the file and access var3
# read index table
with open("data/test_index_file.dat", "rb") as f:
buf = f.read(10000)
# search for MAGIC sequnce, i.e. end of index
imagic = buf.find(MAGIC)
index = json.loads(buf[:imagic].decode("utf-8"))
# calculate length of variables
vlen = { f"var{enum}": index[f"var{enum+1}"] - index[f"var{enum}"] for enum in range(1, len(index)) }
# add last element - ends with length of file
f.seek(0, os.SEEK_END)
vlen[f"var{len(index)}"] = f.tell() - index[f"var{len(index)}"]
print("reconstructed index table: ", index, "\nlength of data arrays: ", vlen)
# load var3 array
f.seek(imagic+len(MAGIC)+index["var3"])
ar = np.frombuffer(f.read(vlen["var3"]))
print("reconstructed data array: ", ar)
```
 
%% Cell type:markdown id:0c7393f8-7ab4-4ce0-a845-0b2fe15cd110 tags:
 
__Anmerkungen:__
Dieser Code setzt implizit voraus, dass die numpy arrays aus float Werten bestehen und die arrays 1-dimensional sind. Man kann natürlich den Datentyp und die array shapes ebenfalls im Header speichern.
Allerdings sollte man stattdessen sehr wahrscheinlich auf ein existierendes Dateiformat wie
netCDF oder HDF5 wechseln.
 
%% Cell type:markdown id:a9c70979-53d7-4f20-adf2-fd863a3ee92a tags:
 
### 2.4 Memory Mapping
 
Um auf Daten in besonders großen Dateien flexibel zugreifen zu können (random access) bietet sich u.U. __memory mapping__ an. Dabei wird ein Teil der großen Datei in den Hauptspeicher abgebildet; für die Nutzer:innen fühlt es sich aber so an, als würde man direkt in der Datei arbeiten. Für Details hierzu siehe z.B. [Python 3 Dokumentation](https://docs.python.org/3/library/mmap.html).
 
%% Cell type:markdown id:0ecf8add-47f6-498f-8943-bd1a4aa0852c tags:
 
## 3. Moderne Design-Patterns
 
Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/
 
### 3.1 Asynchrone Prozessierung
 
In modernen, verteilten Systemen erfolgt der Datenaustausch sehr häufig über REST APIs (siehe z.B. auch unsere [TOAR Datenbank](https://toar-data.fz-juelich.de/api/v2/) am JSC. Bei solchen http(s)-basierten Systemen kann es aus verschiedenen Gründen zu Verzögerungen kommen. Z.B.:
* Systemsoftware und Sicherheitskomponenten auf dem Server
* Internetanbindung (v.a. bei IoT Sensoren)
* Auslastung der Server
* Größe der Anfrage und Antwort
* Rechenintensive Prozessierung vor Audslieferung der Daten
 
Asynchrone Abfragen ermöglichen es, mehrere Abfragen parallel auszuführen, ohne jeweils auf Fertigstellung einzelner Abfragen warten zu müssen.
 
Das folgende Programmbeispiel realisiert eine asynchrone Abfrage an die TOAR-Datenbank am JSC. Man kann diverse Zeitserien-IDs in dem Feld ```series```angeben und diese Zeitserien werden dann quasi-parallel asynchron heruntergeladen.
 
%% Cell type:code id:9ad4595f-ce4e-4619-b526-200b6b3701e0 tags:
 
``` python
# Programm-Beispiel
# download a few time series datasets from the TOAR database at JSC in parallel
 
import requests
import asyncio
from aiohttp import ClientSession
## from aiohttp import ClientOSError, ClientResponseError, ClientSession, ClientTimeout, ServerTimeoutError, TCPConnector
import os
 
# system settings
TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2"
# CONNECT_TIMEOUT = 3.05
# DATA_READ_TIMEOUT = 20
# MAX_TRIES = 3
# POOL_SIZE = 8
 
# the following are arbitrary dataset ids for demonstration; the last one should fail
series = [ 2834, 3333, 7243, 10112, 999999999 ]
 
os.makedirs("data", exist_ok=True)
 
# the download routine
async def download(session, series_id):
url = f"{TOAR_SERVICE_URL}/data/timeseries/{series_id}?format=csv"
print(f"Loading data for time series id {series_id} from {url}.")
async with session.get(url) as resp:
status = resp.status
if status == 200:
content = await resp.text()
with open(f"data/toar_timeseries_{series_id}.csv", "w") as f:
f.write(content)
print("Download of series id {series_id} complete.")
else:
print(f"Download of series {series_id} failed.")
return status
 
 
# the asynchronous driver
async def main(session, coroutine, params=None):
tasks = [asyncio.create_task(coroutine(session, s)) for s in params]
results = None
try:
results = await asyncio.gather(*tasks)
except Exception as e:
print("failed", e)
for task in tasks:
task.cancel()
await asyncio.sleep(1)
return results
 
 
session = ClientSession()
a = await main(session, download, series)
print(a)
await session.close()
```
 
%% Cell type:markdown id:b3df5141-3564-491a-adde-34e7cbea24d7 tags:
 
Im Falle sehr großer Datenmengen wird die Dauer eines einzelnen Tasks ("download") zu lang. Dann empfiehlt es sich, den Workflow wie folgt aufzusetzen:
![grafik.png](attachment:dec7c0f8-232d-468a-b055-b7115fbb0051.png)
 
_Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/async-request-reply_
 
In diesem Schema kann man z.B. den Status-Endpunkt dazu nutzen, eine Email an den User zu generieren, die einen Link zu einem speziellen Resource-Endpunkt enthält. Klickt der User auf diesen Link, werden die vorbereiteten Daten heruntergeladen.
 
%% Cell type:markdown id:c0c24d27-933b-4aa1-ae85-a81a6c78b6ae tags:
 
### 3.2 Caching
 
Viele Datenprozessierungsketten erzeugen aggregierte Daten aus einer großen Datenmenge, die dann
zur weiteren Verarbeitung oder Analyse an die Nutzerin geschickt werden. Ein typisches Beispiel aus
der Erdsystemforschung ist die Erzeugung von (Monats)Mittelwerten aus den Dateien eines Klimamodells.
Der originale Output des Modells liegt z.B. stündlich oder 3-stündlich vor, d.h. pro Monatsmittelwert
müssen 672 bis 744 (bzw. 224 bis 248) Datenfelder gelesen und aufaddiert werden.
 
Wenn man davon ausgehen kann, dass viele Nutzerinnen an solchen aggregierten Daten interessiert sind, dann
kann sich das Caching dieser Daten lohnen. Der Cache kann entweder auf demselben Speichersystem (Festplatte)
liegen wie die Originaldaten, auf einem schnelleren Dateisystem
(z.B. [NVMe](https://www.cisco.com/c/en/us/solutions/computing/what-is-nvme.html\#~benefits-of-nvme)),
oder im Memory (z.B. [redis](https://redis.com/solutions/use-cases/caching/)).
 
Solch ein "explizites Caching" ist von automatischem Caching zu unterscheiden, das z.B. in der CPU oder durch das Betriebssystem auf dem Dateisystem stattfindet.
 
![grafik.png](attachment:08124af3-a92f-4e74-a2dc-ab5fdadc047e.png)
 
_Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/cache-aside_
 
Zu beachten ist bei Caching vor allem, dass die Datenkonsistenz gewährleistet werden sollte. Dazu gibt es
verschiedene Ansätze, je nach Anwendung.
 
_Diskussion über verschiedene Anwendungsfälle und deren Auswirkungen auf den Caching-Mechanismus_
 
Ferner sollte man überlegen, ob es eine Bevorzugung bestimmter Daten geben sollte, die z.B. besonders
häufig nachgefragt werden oder besonders aufwendig zu berechnen sind. Und dann spielt bei der Auslegung
eines Caches noch eine Rolle, wie lange die dort enthaltenen Daten leben sollen.
 
Generell gilt: Cache-Speicherplatz ist teuer und begrenzt.
 
%% Cell type:markdown id:761a342b-ccbf-44e6-9c36-8cbffbd9dc24 tags:
 
### 3.3 Messaging und mehrere Server-Instanzen
 
Eine gewöhnliche REST API prozessiert die eingehenden Anfragen seriell. In Abschnitt 3.1 haben
wir bereits asynchrone Abfragen kennengelernt. Allerdings erreichen auch diese ihr Limit bei sehr
vielen Anfragen bzw. sehr datenintensiven Transfers. Eine Möglichkeit, solche Datenservices zu
skalieren besteht darin, die Entgegennahme der Anfragen von ihrer Bearbeitung zu trennen und dann
mehrere Instanzen des eigentlichen Datenservers aufzusetzen. Der Webservice, den die Nutzer:innen zu sehen
bekommen, übernimmt dann lediglich eine Umsetzung der Anfragen in eine Warteschlange von Nachrichten,
die dann asynchron an die nächste freie Datenserver-Instanz gesendet werden.
 
![grafik.png](attachment:cee91b15-0e8d-4c39-ad44-ece70750892e.png)
 
_Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers_
 
Man kann u.U. sogar die einzelnen Datenserver dynamisch hoch- oder runterfahren, je nach aktueller
Auslastung. _--> siehe Container und Service-Orchestrierung_ So etwas wird immer bedeutender, auch um
Energie zu sparen.
 
Wichtig ist bei diesem Pattern, dass alle Anfragen __idempotent__ sind, d.h. dass nachfolgende Abfragen
nicht auf den Ergebnissen früherer Abfragen basieren. Möchte man z.B. gleitende Mittelwerte aus
Modelldateien berechnen, dann benötigt man jeweils einige Zeitschritte vor und nach dem eigentlichen
Zeitraum, der prozessiert werden soll. Bei der klassischen, sequentiellen Bearbeitung kann man jeweils
ein paar Zeitschritte zwischenspeichern und macht dann einfach weiter. Im asynchronen Umfeld müssen
diese zusätzlichen Zeitschritte jeweils neu geladen werden, da man sich nicht auf die Reihenfolge der
Prozessierung verlassen kann.
 
%% Cell type:markdown id:1575bc17-2a54-408b-a984-9b827df8de42 tags:
 
### 3.4 Sharding
 
Beim sharding werden ebenfalls verteilte Instanzen eines (Datenbank) Servers aufgesetzt. Allerdings wird hier der Datenraum nach festgelegten Regeln auf die einzelnen Server verteilt. Beim Zugriff erscheint das dann wie eine einzige riesige Datenbank.
 
Es gibt verschiedene Strategien, wie die Daten aufgeteilt werden können. Neben einfach zu überblickenden Konzepten (wie z.B. ein shard pro Kalenderjahr) empfiehlt sich insbesondere eine Aufteilung über __key hashes__. Dies verhindert, dass einzelne shards sehr viel Last bekommen, während andere unterbeschäftigt bleiben.
 
![grafik.png](attachment:51af7cf4-ff41-4d39-8126-87e582be0af8.png)
 
_Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/sharding_
 
%% Cell type:markdown id:b7de3976-13f8-4433-b9e9-65d944c31603 tags:
 
### 3.5 Hadoop und MapReduce
 
In den letzten Jahren wurden mehrere Plattformen entwickelt, mit denen eine verteilte, skalierbare Analyse großer Datenmengen ermöglicht wird. Ein Beispiel ist [Apache Hadoop](https://hadoop.apache.org/docs/stable/hadoop-project-dist/).
 
"Hadoop ist ein [java-basiertes] Open-source Framework, das es ermöglicht große Datenmengen in einer verteilten Umgebung auf mehreren Clustern zu speichern und prozessieren. Dabei werden einfache Programmiermodelle verwendet. Hadoop skaliert von Einzelsystemen bis zu Systemen mit mehreren tausend Maschinen, die jeweils [gewöhnliche Hardware für] Computing und Storage beinhalten."
 
_Quelle: https://www.tutorialspoint.com/hadoop/index.htm, übersetzt von M. Schultz_
 
Die wichtigsten Elemente von Hadoop umfassen:
* das Hadoop Distributed File System (HDFS)
* das Resourchen-Managementsystem Yet Another Resource Negotiator (YARN)
* den von Google entwickelten MapReduce Algorithmus
 
![grafik.png](attachment:42493d13-6d3a-464e-b4c6-07129962566c.png)
 
_Quelle: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html_
 
Das __HDFS__ stellt sich dem Benutzer als ein großes Dateisystem dar mit Verzeichnissen und Dateien. Intern werden die Dateien in Blöcke aufgeteilt, sodass eine parallele Prozessierung möglich wird. Es gibt in jedem Hadoop System genau einen __Namenode__ und beliebig viele __Datanodes__. Der Namenode verwaltet die Metadaten und Zugriffe, es fließen jedoch keine Daten über ihn.
 
Das HDFS wurde optimiert auf __WORM__ (write once, read multiple) Zugriffe und ist vor allem für Streaming performant.
 
Hadoop baut auf der Prämisse auf, dass in einem großen, verteilten System immer irgend etwas nicht funktioniert. Daher gibt es diverse Sicherheitsmechanismen, um Ausfälle zu vermeiden bzw. schnell reparieren zu können. Beispielsweise werden die Datenblöcke redundant gespeichert.
 
Das __MapReduce__ framework erlaubt es, mit relativ einfachen Befehlen prallele Datenprozessierung von TByte-scale Datensätzen auf einem verteilten System mit bis zu mehreren Tausend Knoten auszuführen. Vom Konzept her werden Inputdaten in Chunks aufgeteilt (siehe oben), die dann parallel berechnet werden können ("mapping"). Die Ergebnisse dieser Rechnungen werden anschließend im "Reduce" Schritt zusammengeführt. Dabei werden __key, value__ Paare benutzt.
 
Der/die Nutzer:in schreibt eine __map__ Funktion, die aus einem Input Key-Value-Paar ein Output Key-Value-Paar mit einem intermediären Schlüssel (= key) erzeugt. Ferner erstellt er/sie eine __reduce__ Funktion, welche alle Daten mit demselben Schlüssel zusammenführt (= merge). Typischerweise erzeugt eine reduce-Operation genau einen oder keinen Outputwert. Die MapReduce Bibliothek sorgt dafür, dass alle Key-Value-Paare mit demselben Schlüssel zusammengeführt werden.
 
Hier ein Beispiel für eine map und eine reduce Funktion zum Zählen von Wörtern in einer großen Menge an text-Dokumenten aus dem Original-Artikel:
 
```Java
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
 
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
```
 
_Original Paper: Jeffrey Dean Sanjay Ghemawat (2004): MapReduce: Simplified Data Processing on Large Clusters, OSDI'04: Sixth Symposium on Operating System Design and Implementation, San Francisco, CA (2004), pp. 137-150._
 
Die Resourcenverwaltung obliegt __YARN__, das aus einem __ResourceManager__ für das Gesamtsystem, jeweils einem __NodeManager__ pro Cluster-Knoten und jeweils einem __MRAppMaster__ pro Anwendung besteht. Jede MapReduce Anwendung wird durch eine Konfigursation beschrieben, die zumindest die Input- und Outputdaten spezifiziert und die auszuführenden __map__ und __reduce__ Kommandos. YARN übernimmt das Scheduling, überwacht die Ausführung der einzelnen Tasks und startet abgebrochene Jobs erneut falls nötig. Hadoop ist damit vor allem für Batch-Anwendungen ausgelegt. Die Berechnungen werden möglichst dort ausgeführt, wo die Daten liegen, so dass unnötige Datentransfers vermieden werden.
Die Resourcenverwaltung obliegt __YARN__, das aus einem __ResourceManager__ für das Gesamtsystem, jeweils einem __NodeManager__ pro Cluster-Knoten und jeweils einem __MRAppMaster__ pro Anwendung besteht. Jede MapReduce Anwendung wird durch eine Konfiguration beschrieben, die zumindest die Input- und Outputdaten spezifiziert und die auszuführenden __map__ und __reduce__ Kommandos. YARN übernimmt das Scheduling, überwacht die Ausführung der einzelnen Tasks und startet abgebrochene Jobs erneut falls nötig. Hadoop ist damit vor allem für Batch-Anwendungen ausgelegt. Die Berechnungen werden möglichst dort ausgeführt, wo die Daten liegen, so dass unnötige Datentransfers vermieden werden.
 
%% Cell type:code id:fdb0e25c-b363-48cc-aefc-66c914cf223b tags:
 
``` python
```
......
This diff is collapsed.