Skip to content
Snippets Groups Projects
Commit fc6ae7fe authored by Martin Schultz's avatar Martin Schultz
Browse files

Added Outlook section

parent 028c282d
No related branches found
No related tags found
No related merge requests found
%% Cell type:markdown id:6cec99d8-cf59-4320-9fa2-4a3ddfcada34 tags: %% Cell type:markdown id:6cec99d8-cf59-4320-9fa2-4a3ddfcada34 tags:
   
# Design Pattern für die Verarbeitung von großen Datenmengen # Design Pattern für die Verarbeitung von großen Datenmengen
   
## Inhalte ## Inhalte
   
   
   
%% Cell type:markdown id:f5b5c09e-7599-4768-a7f6-8cb2490f2290 tags: %% Cell type:markdown id:f5b5c09e-7599-4768-a7f6-8cb2490f2290 tags:
   
## 1. Motivation ## 1. Motivation
   
_Dieser Teil würde normalerweise interaktiv in einer Diskussion entwickelt und in einem separaten Notebook zusammengefasst._ _Dieser Teil würde normalerweise interaktiv in einer Diskussion entwickelt und in einem separaten Notebook zusammengefasst._
   
### 1.1 Klassische "large data" Systeme ### 1.1 Klassische "large data" Systeme
   
![grafik.png](attachment:8254ff1d-93b1-404d-8e8c-f3d2712faf09.png) ![grafik.png](attachment:8254ff1d-93b1-404d-8e8c-f3d2712faf09.png)
   
### 1.2 Moderne, verteilte, cloud-basierte Systeme ### 1.2 Moderne, verteilte, cloud-basierte Systeme
   
Beispiel: Beispiel:
![grafik.png](attachment:61573f5c-a3b6-438c-87a0-f54d8f397973.png) ![grafik.png](attachment:61573f5c-a3b6-438c-87a0-f54d8f397973.png)
   
_Source: https://learn.microsoft.com/en-us/azure/architecture/example-scenario/data/geospatial-data-processing-analytics-azure_ _Source: https://learn.microsoft.com/en-us/azure/architecture/example-scenario/data/geospatial-data-processing-analytics-azure_
   
In einem solchen verteilten System kommen zu Kapazität, Wartezeit und Durchsatz noch mehrere wichtige Design-Parameter hinzu. Je nach Anwendung sind beim Aufbau einer Prozessierungskette u.U. zu beachten: In einem solchen verteilten System kommen zu Kapazität, Wartezeit und Durchsatz noch mehrere wichtige Design-Parameter hinzu. Je nach Anwendung sind beim Aufbau einer Prozessierungskette u.U. zu beachten:
* Ausfallsicherheit bzw. Umgang mit fehlenden Komponenten/Monitoring * Ausfallsicherheit bzw. Umgang mit fehlenden Komponenten/Monitoring
* Minimierung von Datentransfers * Minimierung von Datentransfers
* Portabilität über Systemgrenzen * Portabilität über Systemgrenzen
* Harmonisierung der Datenstrukturen und -formate * Harmonisierung der Datenstrukturen und -formate
* Zwischenspeicherung von Ergebnissen (Sicherung vor Ausfall, leichtere/schnellere Wiederverwendbarkeit) * Zwischenspeicherung von Ergebnissen (Sicherung vor Ausfall, leichtere/schnellere Wiederverwendbarkeit)
   
%% Cell type:markdown id:6cdedf41-b175-4a6c-b805-13e81902754b tags: %% Cell type:markdown id:6cdedf41-b175-4a6c-b805-13e81902754b tags:
   
## 2. Klassische Design-Patterns ## 2. Klassische Design-Patterns
   
### 2.1 Extract - Transform - Load (ETL) ### 2.1 Extract - Transform - Load (ETL)
   
Das ETL Schema beschreibt ganz allgemein das Zurverfügungstellen von Daten. Der Grundgedanke ist, Das ETL Schema beschreibt ganz allgemein das Zurverfügungstellen von Daten. Der Grundgedanke ist,
dass die Daten im Originalformat vorliegen, sei es in einer Datenbank oder als Dateien, und dass dass die Daten im Originalformat vorliegen, sei es in einer Datenbank oder als Dateien, und dass
man zur Verarbeitung der Daten wahrscheinlich bestimmte Transformationen vornehmen muss. Durch die man zur Verarbeitung der Daten wahrscheinlich bestimmte Transformationen vornehmen muss. Durch die
Trennung in drei Schritte lassen sich Optimierungen gezielt auf die Problemstellung anpassen. Trennung in drei Schritte lassen sich Optimierungen gezielt auf die Problemstellung anpassen.
   
[Dieser Blog-Artikel](https://www.dbta.com/Editorial/Trends-and-Applications/Designing-an-ETL-Design-Pattern-145438.aspx) beschreibt anschaulich einige Dinge, auf die man beim ETL-Design achten sollte. [Dieser Blog-Artikel](https://www.dbta.com/Editorial/Trends-and-Applications/Designing-an-ETL-Design-Pattern-145438.aspx) beschreibt anschaulich einige Dinge, auf die man beim ETL-Design achten sollte.
   
__Schritt 1:__ Es ist gute Praxis, immer von den Originaldaten auszugehen und den Informationsreichtum __Schritt 1:__ Es ist gute Praxis, immer von den Originaldaten auszugehen und den Informationsreichtum
in den Originaldaten zu erhalten. in den Originaldaten zu erhalten.
   
__Schritt 2:__ Vor der eigentlichen Transformation sollten die Daten "gereinigt" werden. Dies bedeutet insbesondere eine Überprüfung, ob die Transformation überhaupt sinnvoll ausgeführt werden kann und ob die Daten das beinhalten, was man erwartet (z.B. gültiger Wertebereich). Hierbei ist es gute Praxis, fehlerhafte Daten nicht einfach zu löschen, sondern sie stattdessen mit einem Flag zu versehen, das den Fehlercode angibt (z.B. bei Messdaten). __Schritt 2:__ Vor der eigentlichen Transformation sollten die Daten "gereinigt" werden. Dies bedeutet insbesondere eine Überprüfung, ob die Transformation überhaupt sinnvoll ausgeführt werden kann und ob die Daten das beinhalten, was man erwartet (z.B. gültiger Wertebereich). Hierbei ist es gute Praxis, fehlerhafte Daten nicht einfach zu löschen, sondern sie stattdessen mit einem Flag zu versehen, das den Fehlercode angibt (z.B. bei Messdaten).
   
Eine beliebte Methode des Bereinigens erfolgt mit Hilfe einer (temporären) Datenbank. Eine beliebte Methode des Bereinigens erfolgt mit Hilfe einer (temporären) Datenbank.
``` ```
insert into insert into
select from select from
``` ```
nutzt die Kontrollfilter der Datenbank, um viele Formatierungsfehler zu finden und auszumerzen. nutzt die Kontrollfilter der Datenbank, um viele Formatierungsfehler zu finden und auszumerzen.
   
Beim Aufsetzen der Transformationen sollte man eine gute Balance zwischen Performance und Komplexität finden. Bis auf wenige Ausnahmen ist die zweitschnellste Lösung oft die bessere, wenn der Code dadurch Beim Aufsetzen der Transformationen sollte man eine gute Balance zwischen Performance und Komplexität finden. Bis auf wenige Ausnahmen ist die zweitschnellste Lösung oft die bessere, wenn der Code dadurch
lesbarer, modularer und besser wartbar wird. Performance-Optimierung sollt eman nur vornehmen, wenn man sie wirklich benötigt. lesbarer, modularer und besser wartbar wird. Performance-Optimierung sollt eman nur vornehmen, wenn man sie wirklich benötigt.
   
Das Rendern (Formatieren) der Daten zählt ebenfalls zu den Transformationen und ist üblicherweise der Das Rendern (Formatieren) der Daten zählt ebenfalls zu den Transformationen und ist üblicherweise der
letzte Schritt. letzte Schritt.
   
__Schritt 3:__ Der Load (oder auch Publish) Schritt sollte nur genau das machen, also insbesondere keine weiteren Transformationen anwenden. Hier geht es nur noch um die Auslieferung der Daten. __Schritt 3:__ Der Load (oder auch Publish) Schritt sollte nur genau das machen, also insbesondere keine weiteren Transformationen anwenden. Hier geht es nur noch um die Auslieferung der Daten.
   
Aaron Segesmann beschreibt in dem oben verlinkten Blog folgende Design-Muster für die Auslieferung von Daten: Aaron Segesmann beschreibt in dem oben verlinkten Blog folgende Design-Muster für die Auslieferung von Daten:
1. Truncate and load - d.h. zuvor ausgelieferte Daten werden gelöscht und die neu transformierten Daten an deren Stelle gesetzt. Beispiel: Neuaufbau einer Webseite nach Aktualisierung. 1. Truncate and load - d.h. zuvor ausgelieferte Daten werden gelöscht und die neu transformierten Daten an deren Stelle gesetzt. Beispiel: Neuaufbau einer Webseite nach Aktualisierung.
2. Data surgery - einzelne Elemente werden gezielt ausgetauscht. Diese Methode lohnt oft, wenn weniger als 40 % der Daten geändert wurden. 2. Data surgery - einzelne Elemente werden gezielt ausgetauscht. Diese Methode lohnt oft, wenn weniger als 40 % der Daten geändert wurden.
3. Append - die neuen Daten werden an die vorhandenen angehängt. 3. Append - die neuen Daten werden an die vorhandenen angehängt.
4. Blue/green - hier gibt es zwei parallel existierende Systeme (z.B. Server), von denen einer aktiv, der andere inaktiv ist. Dann werden im Hintergrund die neuen Daten auf den inaktiven Server gespielt und wenn das passiert ist, wird dieser aktiviert, während der vorher aktive inaktiviert wird. 4. Blue/green - hier gibt es zwei parallel existierende Systeme (z.B. Server), von denen einer aktiv, der andere inaktiv ist. Dann werden im Hintergrund die neuen Daten auf den inaktiven Server gespielt und wenn das passiert ist, wird dieser aktiviert, während der vorher aktive inaktiviert wird.
   
%% Cell type:markdown id:91b32cfe-06ba-4984-9849-151171967e7a tags: %% Cell type:markdown id:91b32cfe-06ba-4984-9849-151171967e7a tags:
   
### 2.2 Chunking und Tiling ### 2.2 Chunking und Tiling
   
Ein Merkmal von "Großen Daten" (= Big Data) ist es, dass nicht alle Daten auf einmal in den Speicher passen. Daraus ergibt sich zwangsläufig die Notwendigkeit, die Daten stückweise zu prozessieren. Ein Merkmal von "Großen Daten" (= Big Data) ist es, dass nicht alle Daten auf einmal in den Speicher passen. Daraus ergibt sich zwangsläufig die Notwendigkeit, die Daten stückweise zu prozessieren.
   
Beim Chunking (= Stückeln) liest man die Daten blockweise ein, verarbeitet sie und gibt sie wieder aus oder hängt die Ergebnisse an eine Liste an. Beim Chunking (= Stückeln) liest man die Daten blockweise ein, verarbeitet sie und gibt sie wieder aus oder hängt die Ergebnisse an eine Liste an.
   
Tiling (= Kacheln) wird verwendet, um 2-dimensionale Daten (z.B. Modell-Output oder Satellitendaten aus der Erdsystemforschung) in einzelnen Abschnitten zu verarbeiten. Ein gutes Beispiel hierfür sind die sogenannten Tile-Server von Google Maps oder [Open Street Map](https://wiki.openstreetmap.org/wiki/Tile_servers). Ein solcher Server wandelt die als Vektordaten gespeicherten Informationen zu Straßen, Plätzen und anderen Landschaftsmerkmalen in Bilder um, wobei jeweils nur die Daten aus einer bestimmten Region (bounding box) prozessiert werden. Die Bilder können in verschiedenen Zoom-Stufen erzeugt (= gerendert) werden und man kann bestimmen, welche Bilder für schnellere Zugriffe in einem Cache abgelgt werden sollen (zu Caching, siehe Abschnitt 3.x). Tiling (= Kacheln) wird verwendet, um 2-dimensionale Daten (z.B. Modell-Output oder Satellitendaten aus der Erdsystemforschung) in einzelnen Abschnitten zu verarbeiten. Ein gutes Beispiel hierfür sind die sogenannten Tile-Server von Google Maps oder [Open Street Map](https://wiki.openstreetmap.org/wiki/Tile_servers). Ein solcher Server wandelt die als Vektordaten gespeicherten Informationen zu Straßen, Plätzen und anderen Landschaftsmerkmalen in Bilder um, wobei jeweils nur die Daten aus einer bestimmten Region (bounding box) prozessiert werden. Die Bilder können in verschiedenen Zoom-Stufen erzeugt (= gerendert) werden und man kann bestimmen, welche Bilder für schnellere Zugriffe in einem Cache abgelgt werden sollen (zu Caching, siehe Abschnitt 3.x).
   
%% Cell type:code id:cafca7df-b8b4-4caa-966c-d034efff0611 tags: %% Cell type:code id:cafca7df-b8b4-4caa-966c-d034efff0611 tags:
   
``` python ``` python
# Etwas sinnfreies Beispielprogramm zum Chunking # Etwas sinnfreies Beispielprogramm zum Chunking
# let's count the number of occurences of the letter E in a randomly generated long string # let's count the number of occurences of the letter E in a randomly generated long string
import numpy as np import numpy as np
SIZE = 999967 # purposefully not a round number SIZE = 999967 # purposefully not a round number
data = "".join([chr(x) for x in np.random.randint(65, 91, SIZE)]) data = "".join([chr(x) for x in np.random.randint(65, 91, SIZE)])
print(data[:1000]) print(data[:1000])
# set up result counter and loop # set up result counter and loop
BUFSIZE = 200 # process 200 characters at a time BUFSIZE = 200 # process 200 characters at a time
count = 0 count = 0
pos = 0 pos = 0
while pos < len(data): while pos < len(data):
chunk = data[pos:pos+BUFSIZE] chunk = data[pos:pos+BUFSIZE]
count += chunk.count("E") count += chunk.count("E")
pos += BUFSIZE pos += BUFSIZE
print(f"There are {count} occurences of the letter E in the random string, i.e. {100.*count/SIZE}%.") print(f"There are {count} occurences of the letter E in the random string, i.e. {100.*count/SIZE}%.")
``` ```
   
%% Cell type:markdown id:52bcba8e-32f3-4694-979c-801591f7e701 tags: %% Cell type:markdown id:52bcba8e-32f3-4694-979c-801591f7e701 tags:
   
Als Alternative zum Chunking weisen moderne Tools oft Konzepte wie __memory mapping__ auf. Dabei wird ein Teil einer sehr großen Datei in den Hauptspeicher abgebildet; für den User fühlt es sich aber so an, als würde man direkt in der Datei arbeiten. Für Details hierzu siehe z.B. [Python 3 Dokumentation](https://docs.python.org/3/library/mmap.html). Als Alternative zum Chunking weisen moderne Tools oft Konzepte wie __memory mapping__ auf. Dabei wird ein Teil einer sehr großen Datei in den Hauptspeicher abgebildet; für den User fühlt es sich aber so an, als würde man direkt in der Datei arbeiten. Für Details hierzu siehe z.B. [Python 3 Dokumentation](https://docs.python.org/3/library/mmap.html).
   
%% Cell type:markdown id:52da6084-96a9-4bdc-b3f3-c429ff7ba865 tags: %% Cell type:markdown id:52da6084-96a9-4bdc-b3f3-c429ff7ba865 tags:
   
### 2.3 Indextabellen ### 2.3 Indextabellen
   
Relationale Datenbanken erlauben die Definition mehrerer Indizes in einer Datentabelle und auch Relationale Datenbanken erlauben die Definition mehrerer Indizes in einer Datentabelle und auch
über Tabellengrenzen hinweg ([__foreign key__](https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-foreign-key/)). Die Möglichkeit, hierdurch auch komplexere Indextabellen aufzubauen, über Tabellengrenzen hinweg ([__foreign key__](https://www.postgresqltutorial.com/postgresql-tutorial/postgresql-foreign-key/)). Die Möglichkeit, hierdurch auch komplexere Indextabellen aufzubauen,
ist ein entscheidender Punkt für die Effizienz (=Zugriffsgeschwindigkeit) relationaler Datenbanken. ist ein entscheidender Punkt für die Effizienz (=Zugriffsgeschwindigkeit) relationaler Datenbanken.
   
In NoSQL Datenbanken (z.B. [MongoDB](https://www.mongodb.com/) oder [Apache Cassandra](https://cassandra.apache.org/_/index.html) gibt es keine sekundären Indizes; dort muss man In NoSQL Datenbanken (z.B. [MongoDB](https://www.mongodb.com/) oder [Apache Cassandra](https://cassandra.apache.org/_/index.html) gibt es keine sekundären Indizes; dort muss man
Indextabellen selbst bauen (siehe z.B. [https://learn.microsoft.com/en-us/azure/architecture/patterns/index-table]). Indextabellen selbst bauen (siehe z.B. [https://learn.microsoft.com/en-us/azure/architecture/patterns/index-table]).
   
Das Prinzip von Indextabellen kann man auch außerhalb von Datenbank-Anwendungen nutzen. So wird z.B. in Das Prinzip von Indextabellen kann man auch außerhalb von Datenbank-Anwendungen nutzen. So wird z.B. in
manchen Dateiformaten (z.B. [netCDF](https://www.unidata.ucar.edu/software/netcdf/)) manchen Dateiformaten (z.B. [netCDF](https://www.unidata.ucar.edu/software/netcdf/))
neben den eigentlichen Daten ein header gespeichert, der Metadaten enthält und neben den eigentlichen Daten ein header gespeichert, der Metadaten enthält und
eben Indizes zu den einzelnen Datenfeldern. eben Indizes zu den einzelnen Datenfeldern.
   
![grafik.png](attachment:b8918922-d5e0-4e89-9818-6e75200c57c2.png) ![grafik.png](attachment:b8918922-d5e0-4e89-9818-6e75200c57c2.png)
   
%% Cell type:code id:6ff96347-7076-4fdb-a11f-c832e6eaba3c tags: %% Cell type:code id:6ff96347-7076-4fdb-a11f-c832e6eaba3c tags:
   
``` python ``` python
# Programmbeispiel # Programmbeispiel
# create some data arrays of arbitrary size and store them in a file with an index table # create some data arrays of arbitrary size and store them in a file with an index table
import numpy as np import numpy as np
from pprint import pprint from pprint import pprint
import os import os
import json import json
   
os.makedirs("data", exist_ok=True) os.makedirs("data", exist_ok=True)
   
dlength = [ 100, 20000, 57, 23456, 100 ] # length of data arrays dlength = [ 100, 20000, 57, 23456, 100 ] # length of data arrays
container = [ np.full(length, float(enum+1)) for enum, length in enumerate(dlength) ] container = [ np.full(length, float(enum+1)) for enum, length in enumerate(dlength) ]
pprint(container) pprint(container)
   
# create index table as dictionary - note that first entry is 0, and last entry should be removed # create index table as dictionary - note that first entry is 0, and last entry should be removed
index = { "var1": 0 } index = { "var1": 0 }
mem = 0 mem = 0
for enum, ar in enumerate(container[:-1]): for enum, ar in enumerate(container[:-1]):
index[f"var{enum+2}"] = mem + ar.nbytes index[f"var{enum+2}"] = mem + ar.nbytes
mem += ar.nbytes mem += ar.nbytes
pprint(index) pprint(index)
# store data as binary file # store data as binary file
MAGIC = b"\xf0\xf0\x00\x00" MAGIC = b"\xf0\xf0\x00\x00"
with open("data/test_index_file.dat", "wb") as f: with open("data/test_index_file.dat", "wb") as f:
# write index table as json string # write index table as json string
f.write(json.dumps(index).encode("utf-8")) f.write(json.dumps(index).encode("utf-8"))
# add CRLF as end marker # add CRLF as end marker
f.write(MAGIC) f.write(MAGIC)
# write data tables # write data tables
for ar in container: for ar in container:
f.write(ar) f.write(ar)
   
# now let's try to read the file and access var3 # now let's try to read the file and access var3
# read index table # read index table
with open("data/test_index_file.dat", "rb") as f: with open("data/test_index_file.dat", "rb") as f:
buf = f.read(10000) buf = f.read(10000)
# search for MAGIC sequnce, i.e. end of index # search for MAGIC sequnce, i.e. end of index
imagic = buf.find(MAGIC) imagic = buf.find(MAGIC)
index = json.loads(buf[:imagic].decode("utf-8")) index = json.loads(buf[:imagic].decode("utf-8"))
# calculate length of variables # calculate length of variables
vlen = { f"var{enum}": index[f"var{enum+1}"] - index[f"var{enum}"] for enum in range(1, len(index)) } vlen = { f"var{enum}": index[f"var{enum+1}"] - index[f"var{enum}"] for enum in range(1, len(index)) }
# add last element - ends with length of file # add last element - ends with length of file
f.seek(0, os.SEEK_END) f.seek(0, os.SEEK_END)
vlen[f"var{len(index)}"] = f.tell() - index[f"var{len(index)}"] vlen[f"var{len(index)}"] = f.tell() - index[f"var{len(index)}"]
print("reconstructed index table: ", index, "\nlength of data arrays: ", vlen) print("reconstructed index table: ", index, "\nlength of data arrays: ", vlen)
# load var3 array # load var3 array
f.seek(imagic+len(MAGIC)+index["var3"]) f.seek(imagic+len(MAGIC)+index["var3"])
ar = np.frombuffer(f.read(vlen["var3"])) ar = np.frombuffer(f.read(vlen["var3"]))
print("reconstructed data array: ", ar) print("reconstructed data array: ", ar)
``` ```
   
%% Cell type:markdown id:0c7393f8-7ab4-4ce0-a845-0b2fe15cd110 tags: %% Cell type:markdown id:0c7393f8-7ab4-4ce0-a845-0b2fe15cd110 tags:
   
__Anmerkungen:__ __Anmerkungen:__
Dieser Code setzt implizit voraus, dass die numpy arrays aus float Werten bestehen und die arrays 1-dimensional sind. Man kann natürlich den Datentyp und die array shapes ebenfalls im Header speichern. Dieser Code setzt implizit voraus, dass die numpy arrays aus float Werten bestehen und die arrays 1-dimensional sind. Man kann natürlich den Datentyp und die array shapes ebenfalls im Header speichern.
Allerdings sollte man stattdessen sehr wahrscheinlich auf ein existierendes Dateiformat wie Allerdings sollte man stattdessen sehr wahrscheinlich auf ein existierendes Dateiformat wie
netCDF oder HDF5 wechseln. netCDF oder HDF5 wechseln.
   
%% Cell type:markdown id:0ecf8add-47f6-498f-8943-bd1a4aa0852c tags: %% Cell type:markdown id:0ecf8add-47f6-498f-8943-bd1a4aa0852c tags:
   
## 3. Moderne Design-Patterns ## 3. Moderne Design-Patterns
   
Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/ Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/
   
### 3.1 Asynchrone Prozessierung ### 3.1 Asynchrone Prozessierung
   
In modernen, verteilten Systemen erfolgt der Datenaustausch sehr häufig über REST APIs (siehe z.B. auch unsere [TOAR Datenbank](https://toar-data.fz-juelich.de/api/v2/) am JSC. Bei solchen http(s)-basierten Systemen kann es aus verschiedenen Gründen zu Verzögerungen kommen. Z.B.: In modernen, verteilten Systemen erfolgt der Datenaustausch sehr häufig über REST APIs (siehe z.B. auch unsere [TOAR Datenbank](https://toar-data.fz-juelich.de/api/v2/) am JSC. Bei solchen http(s)-basierten Systemen kann es aus verschiedenen Gründen zu Verzögerungen kommen. Z.B.:
* Systemsoftware und Sicherheitskomponenten auf dem Server * Systemsoftware und Sicherheitskomponenten auf dem Server
* Internetanbindung (v.a. bei IoT Sensoren) * Internetanbindung (v.a. bei IoT Sensoren)
* Auslastung der Server * Auslastung der Server
* Größe der Anfrage und Antwort * Größe der Anfrage und Antwort
* Rechenintensive Prozessierung vor Audslieferung der Daten * Rechenintensive Prozessierung vor Audslieferung der Daten
   
Asynchrone Abfragen ermöglichen es, mehrere Abfragen parallel auszuführen, ohne jeweils auf Fertigstellung einzelner Abfragen warten zu müssen. Asynchrone Abfragen ermöglichen es, mehrere Abfragen parallel auszuführen, ohne jeweils auf Fertigstellung einzelner Abfragen warten zu müssen.
   
%% Cell type:code id:9ad4595f-ce4e-4619-b526-200b6b3701e0 tags: %% Cell type:code id:9ad4595f-ce4e-4619-b526-200b6b3701e0 tags:
   
``` python ``` python
# Programm-Beispiel # Programm-Beispiel
# download a few time series datasets from the TOAR database at JSC in parallel # download a few time series datasets from the TOAR database at JSC in parallel
   
import requests import requests
import asyncio import asyncio
from aiohttp import ClientSession from aiohttp import ClientSession
## from aiohttp import ClientOSError, ClientResponseError, ClientSession, ClientTimeout, ServerTimeoutError, TCPConnector ## from aiohttp import ClientOSError, ClientResponseError, ClientSession, ClientTimeout, ServerTimeoutError, TCPConnector
import os import os
   
# system settings # system settings
TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2" TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2"
# CONNECT_TIMEOUT = 3.05 # CONNECT_TIMEOUT = 3.05
# DATA_READ_TIMEOUT = 20 # DATA_READ_TIMEOUT = 20
# MAX_TRIES = 3 # MAX_TRIES = 3
# POOL_SIZE = 8 # POOL_SIZE = 8
   
# the following are arbitrary dataset ids for demonstration; the last one should fail # the following are arbitrary dataset ids for demonstration; the last one should fail
series = [ 2834, 3333, 7243, 10112, 999999999 ] series = [ 2834, 3333, 7243, 10112, 999999999 ]
   
os.makedirs("data", exist_ok=True) os.makedirs("data", exist_ok=True)
   
# the download routine # the download routine
async def download(session, series_id): async def download(session, series_id):
url = f"{TOAR_SERVICE_URL}/data/timeseries/{series_id}?format=csv" url = f"{TOAR_SERVICE_URL}/data/timeseries/{series_id}?format=csv"
print(f"Loading data for time series id {series_id} from {url}.") print(f"Loading data for time series id {series_id} from {url}.")
async with session.get(url) as resp: async with session.get(url) as resp:
status = resp.status status = resp.status
if status == 200: if status == 200:
content = await resp.text() content = await resp.text()
with open(f"data/toar_timeseries_{series_id}.csv", "w") as f: with open(f"data/toar_timeseries_{series_id}.csv", "w") as f:
f.write(content) f.write(content)
print("Download of series id {series_id} complete.") print("Download of series id {series_id} complete.")
else: else:
print(f"Download of series {series_id} failed.") print(f"Download of series {series_id} failed.")
return status return status
   
   
# the asynchronous driver # the asynchronous driver
async def main(session, coroutine, params=None): async def main(session, coroutine, params=None):
tasks = [asyncio.create_task(coroutine(session, s)) for s in params] tasks = [asyncio.create_task(coroutine(session, s)) for s in params]
results = None results = None
try: try:
results = await asyncio.gather(*tasks) results = await asyncio.gather(*tasks)
except Exception as e: except Exception as e:
print("failed", e) print("failed", e)
for task in tasks: for task in tasks:
task.cancel() task.cancel()
await asyncio.sleep(1) await asyncio.sleep(1)
return results return results
   
   
session = ClientSession() session = ClientSession()
a = await main(session, download, series) a = await main(session, download, series)
print(a) print(a)
await session.close() await session.close()
``` ```
   
%% Cell type:markdown id:b3df5141-3564-491a-adde-34e7cbea24d7 tags: %% Cell type:markdown id:b3df5141-3564-491a-adde-34e7cbea24d7 tags:
   
Im Falle sehr großer Datenmengen wird die Dauer eines einzelnen Tasks ("download") zu lang. Dann empfiehlt es sich, den Workflow wie folgt aufzusetzen: Im Falle sehr großer Datenmengen wird die Dauer eines einzelnen Tasks ("download") zu lang. Dann empfiehlt es sich, den Workflow wie folgt aufzusetzen:
![grafik.png](attachment:dec7c0f8-232d-468a-b055-b7115fbb0051.png) ![grafik.png](attachment:dec7c0f8-232d-468a-b055-b7115fbb0051.png)
   
_Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/async-request-reply_ _Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/async-request-reply_
   
Dann kann man z.B. den Status-Endpunkt dazu nutzen, eine Email an den User zu generieren, die einen Link zu einem speziellen Resource-Endpunkt enthält. Klickt der User auf diesen Link, werden die vorbereiteten Daten heruntergeladen. Dann kann man z.B. den Status-Endpunkt dazu nutzen, eine Email an den User zu generieren, die einen Link zu einem speziellen Resource-Endpunkt enthält. Klickt der User auf diesen Link, werden die vorbereiteten Daten heruntergeladen.
   
%% Cell type:markdown id:c0c24d27-933b-4aa1-ae85-a81a6c78b6ae tags: %% Cell type:markdown id:c0c24d27-933b-4aa1-ae85-a81a6c78b6ae tags:
   
### 3.2 Caching ### 3.2 Caching
   
Viele Datenprozessierungsketten erzeugen aggregierte Daten aus einer großen Datenmenge, die dann Viele Datenprozessierungsketten erzeugen aggregierte Daten aus einer großen Datenmenge, die dann
zur weiteren Verarbeitung oder Analyse an den Nutzer geschickt werden. Ein typisches Beispiel aus zur weiteren Verarbeitung oder Analyse an den Nutzer geschickt werden. Ein typisches Beispiel aus
der Erdsystemforschung ist die Erzeugung von (Monats)Mittelwerten aus den Dateien eines Klimamodells. der Erdsystemforschung ist die Erzeugung von (Monats)Mittelwerten aus den Dateien eines Klimamodells.
Der originale Output des Modells liegt z.B. stündlich oder 3-stündlich vor, d.h. pro Monatsmittelwert Der originale Output des Modells liegt z.B. stündlich oder 3-stündlich vor, d.h. pro Monatsmittelwert
müssen 672 bis 744 (bzw. 224 bis 248) Datenfelder gelesen und aufaddiert werden. müssen 672 bis 744 (bzw. 224 bis 248) Datenfelder gelesen und aufaddiert werden.
   
Wenn man davon ausgehen kann, dass viele Nutzer an solchen aggregierten Daten interessiert sind, dann Wenn man davon ausgehen kann, dass viele Nutzer an solchen aggregierten Daten interessiert sind, dann
kann sich das Caching dieser Daten lohnen. Der Cache kann entweder auf demselben Speichersystem (Festplatte) kann sich das Caching dieser Daten lohnen. Der Cache kann entweder auf demselben Speichersystem (Festplatte)
liegen wie die Originaldaten, auf einem schnelleren Dateisystem liegen wie die Originaldaten, auf einem schnelleren Dateisystem
(z.B. [NVMe](https://www.cisco.com/c/en/us/solutions/computing/what-is-nvme.html\#~benefits-of-nvme)), (z.B. [NVMe](https://www.cisco.com/c/en/us/solutions/computing/what-is-nvme.html\#~benefits-of-nvme)),
oder im Memory (z.B. [redis](https://redis.com/solutions/use-cases/caching/)). oder im Memory (z.B. [redis](https://redis.com/solutions/use-cases/caching/)).
   
![grafik.png](attachment:08124af3-a92f-4e74-a2dc-ab5fdadc047e.png) ![grafik.png](attachment:08124af3-a92f-4e74-a2dc-ab5fdadc047e.png)
   
_Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/cache-aside_ _Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/cache-aside_
   
Zu beachten ist bei Caching vor allem, dass die Datenkonsistenz gewährleistet werden sollte. Dazu gibt es Zu beachten ist bei Caching vor allem, dass die Datenkonsistenz gewährleistet werden sollte. Dazu gibt es
verschiedene Ansätze, je nach Anwendung. verschiedene Ansätze, je nach Anwendung.
   
_Diskussion über verschiedene Anwendungsfälle und deren Auswirkungen auf den Cachine-Mechanismus_ _Diskussion über verschiedene Anwendungsfälle und deren Auswirkungen auf den Cachine-Mechanismus_
   
Ferner sollte man überlegen, ob es eine Bevorzugung bestimmter Daten geben sollte, die z.B. besonders Ferner sollte man überlegen, ob es eine Bevorzugung bestimmter Daten geben sollte, die z.B. besonders
häufig nachgefragt werden oder besonders aufwendig zu berechnen sind. Und dann spielt beoi der Auslegung häufig nachgefragt werden oder besonders aufwendig zu berechnen sind. Und dann spielt beoi der Auslegung
eines Caches noch eine Rolle, wie lange die dort enthaltenen Daten leben sollen. eines Caches noch eine Rolle, wie lange die dort enthaltenen Daten leben sollen.
   
Generell gilt: Cache-Speicherplatz ist teuer und begrenzt. Generell gilt: Cache-Speicherplatz ist teuer und begrenzt.
   
%% Cell type:markdown id:761a342b-ccbf-44e6-9c36-8cbffbd9dc24 tags: %% Cell type:markdown id:761a342b-ccbf-44e6-9c36-8cbffbd9dc24 tags:
   
### 3.3 Messaging und mehrere Server-Instanzen ### 3.3 Messaging und mehrere Server-Instanzen
   
Eine gewöhnliche REST API prozessiert die eingehenden Anfragen seriell. In Abschnitt 3.x haben Eine gewöhnliche REST API prozessiert die eingehenden Anfragen seriell. In Abschnitt 3.x haben
wir bereits asynchrone Abfragen kennengelernt. Allerdings erreichen auch diese ihr Limit bei sehr wir bereits asynchrone Abfragen kennengelernt. Allerdings erreichen auch diese ihr Limit bei sehr
vielen Anfragen bzw. sehr datenintensiven Transfers. Eine Möglichkeit, solche Datenservices zu vielen Anfragen bzw. sehr datenintensiven Transfers. Eine Möglichkeit, solche Datenservices zu
skalieren besteht darin, die Entgegennahme der Anfragen von ihrer Bearbeitung zu trennen und dann skalieren besteht darin, die Entgegennahme der Anfragen von ihrer Bearbeitung zu trennen und dann
mehrere Instanzen des eigentlichen Datenservers aufzusetzen. Der Webservice, den der User zu sehen mehrere Instanzen des eigentlichen Datenservers aufzusetzen. Der Webservice, den der User zu sehen
bekommt, übernimmt dann lediglich eine Umsetzung der Anfragen in eine Warteschlange von Nachrichten, bekommt, übernimmt dann lediglich eine Umsetzung der Anfragen in eine Warteschlange von Nachrichten,
die dann asynchron an die nächste freie Datenserver-Instanz gesendet werden. die dann asynchron an die nächste freie Datenserver-Instanz gesendet werden.
   
![grafik.png](attachment:cee91b15-0e8d-4c39-ad44-ece70750892e.png) ![grafik.png](attachment:cee91b15-0e8d-4c39-ad44-ece70750892e.png)
   
_Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers_ _Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers_
   
Man kann u.U. sogar die einzelnen Datenserver dynamisch hoch- oder runterfahren, je nach aktueller Man kann u.U. sogar die einzelnen Datenserver dynamisch hoch- oder runterfahren, je nach aktueller
Auslastung. _--> siehe Container und Service-Orchestrierung_ So etwas wird immer bedeutender, auch um Auslastung. _--> siehe Container und Service-Orchestrierung_ So etwas wird immer bedeutender, auch um
Energie zu sparen. Energie zu sparen.
   
Wichtig ist bei diesem Pattern, dass alle Anfragen __idempotent__ sind, d.h. dass nachfolgende Abfragen Wichtig ist bei diesem Pattern, dass alle Anfragen __idempotent__ sind, d.h. dass nachfolgende Abfragen
nicht auf den Ergebnissen früherer Abfragen basieren. Möchte man z.B. gleitende Mittelwerte aus nicht auf den Ergebnissen früherer Abfragen basieren. Möchte man z.B. gleitende Mittelwerte aus
Modelldateien berechnen, dann benötigt man jeweils einige Zeitschritte vor und nach dem eigentlichen Modelldateien berechnen, dann benötigt man jeweils einige Zeitschritte vor und nach dem eigentlichen
Zeitraum, der prozessiert werden soll. Bei der klassischen, sequentiellen Bearbeitung kann man jeweils Zeitraum, der prozessiert werden soll. Bei der klassischen, sequentiellen Bearbeitung kann man jeweils
ein paar Zeitschritte zwischenspeichern und macht dann einfach weiter. Im asynchronen Umfeld müssen ein paar Zeitschritte zwischenspeichern und macht dann einfach weiter. Im asynchronen Umfeld müssen
diese zusätzlichen Zeitschritte jeweils neu geladen werden, da man sich nicht auf die Reihenfolge der diese zusätzlichen Zeitschritte jeweils neu geladen werden, da man sich nicht auf die Reihenfolge der
Prozessierung verlassen kann. Prozessierung verlassen kann.
   
%% Cell type:markdown id:1575bc17-2a54-408b-a984-9b827df8de42 tags: %% Cell type:markdown id:1575bc17-2a54-408b-a984-9b827df8de42 tags:
   
### 3.4 Sharding ### 3.4 Sharding
   
Beim sharding werdn ebenfalls verteilte Instanzen eines (Datenbank) Servers aufgesetzt. Allerdings wird hier der Datenraum nach festgelegten Regeln auf die einzelnen Server verteilt. Von außen betrachtet sieht das dann aus wie eine einzige riesige Datenbank. Beim sharding werdn ebenfalls verteilte Instanzen eines (Datenbank) Servers aufgesetzt. Allerdings wird hier der Datenraum nach festgelegten Regeln auf die einzelnen Server verteilt. Von außen betrachtet sieht das dann aus wie eine einzige riesige Datenbank.
   
Es gibt verschiedene Strategien, wie die Daten aufgeteilt werden können. Neben einfach zu überblickenden Konzepten (wie z.B. ein shard pro Kalenderjahr) empfiehlt sich insbesondere eine Aufteilung über __key hashes__. Dies verhindert, dass einzelne shards sehr viel Last bekommen, während andere unterbeschäftigt bleiben. Es gibt verschiedene Strategien, wie die Daten aufgeteilt werden können. Neben einfach zu überblickenden Konzepten (wie z.B. ein shard pro Kalenderjahr) empfiehlt sich insbesondere eine Aufteilung über __key hashes__. Dies verhindert, dass einzelne shards sehr viel Last bekommen, während andere unterbeschäftigt bleiben.
   
![grafik.png](attachment:51af7cf4-ff41-4d39-8126-87e582be0af8.png) ![grafik.png](attachment:51af7cf4-ff41-4d39-8126-87e582be0af8.png)
   
_Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/sharding_ _Quelle: https://learn.microsoft.com/en-us/azure/architecture/patterns/sharding_
   
%% Cell type:code id:142b232f-5922-434e-bd47-edb67af8cb78 tags: %% Cell type:markdown id:b7de3976-13f8-4433-b9e9-65d944c31603 tags:
## 4. Ausblick
In den letzten Jahren wurden mehrere Plattformen entwickelt, mit denen eine verteilte, skalierbare Analyse großer Datenmengen ermöglicht wird. Ein Beispiel ist [Apache Hadoop](https://hadoop.apache.org/docs/stable/hadoop-project-dist/). Der [Wikipedia-Eintrag](https://de.wikipedia.org/wiki/Apache_Hadoop) dazu fasst es gut zusammen:
"Apache Hadoop ist ein freies, in Java geschriebenes Framework für skalierbare, verteilt arbeitende Software. Es basiert auf dem MapReduce-Algorithmus von Google Inc. sowie auf Vorschlägen des Google-Dateisystems und ermöglicht es, intensive Rechenprozesse mit großen Datenmengen (Big Data, Petabyte-Bereich) auf Computerclustern durchzuführen. Hadoop wurde vom Lucene-Erfinder Doug Cutting initiiert und 2006 erstmals veröffentlicht.\[1\] Am 23. Januar 2008 wurde es zum Top-Level-Projekt der Apache Software Foundation. Nutzer sind unter anderem Facebook, a9.com, AOL, Baidu, IBM, ImageShack und Yahoo.\[2\]"
Die wichtigsten Elemente von Hadoop umfassen:
* das Hadoop Distributed File System (HDFS)
* das Resourchen-Managementsystem Yet Another Resource Negotiator (YARN)
* den von Google entwickelten MapReduce Algorithmus
![grafik.png](attachment:42493d13-6d3a-464e-b4c6-07129962566c.png)
_Quelle: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html_
Das __HDFS__ stellt sich dem Benutzer als ein großes Dateisystem dar mit Verzeichnissen und Dateien. Intern werden die Dateien in Blöcke aufgeteilt, sodass eine parallele Prozessierung möglich wird. Es gibt in jedem Hadoop System genau einen __Namenode__ und beliebig viele __Datanodes__. Der Namenode verwaltet die Metadaten und Zugriffe, es fließen jedoch keine Daten über ihn.
Das HDFS wurde optimiert auf __WORM__ (write once, read multiple) Zugriffe und ist vor allem für Streaming performant.
Hadoop baut auf der Prämisse auf, dass in einem großen, verteilten System immer irgend etwas nicht funktioniert. Daher gibt es diverse Sicherheitsmechanismen, um Ausfälle zu vermeiden bzw. schnell reparieren zu können. Beispielsweise werden die Datenblöcke redundant gespeichert.
Das __MapReduce__ framework erlaubt es, mit relativ einfachen Befehlen prallele Datenprozessierung von TByte-scale Datensätzen auf einem verteilten System mit bis zu mehreren Tausend Knoten auszuführen. Vom Konzept her werden Inputdaten in Chunks aufgeteilt (siehe oben), die dann parallel berechnet werden können ("mapping"). Die Ergebnisse dieser Rechnungen werden anschließend im "Reduce" Schritt zusammengeführt.
MapReduce beinhaltet einen Workflow-Manager, der die Ausführung der einzelnen Tasks aufsetzt und überwacht sowie abgebrochene Jobs erneut startet. Hadoop ist damit vor allem für Batch-Anwendungen ausgelegt. Die Berechnungen werden möglichst dort ausgeführt, wo die Daten liegen, so dass unnötige Datentransfers vermieden werden.
Die Resourcenverwaltung obliegt __YARN__, das aus einem __ResourceManager__ für das Gesamtsystem, jeweils einem __NodeManager__ pro Cluster-Knoten und jeweils einem __MRAppMaster__ pro Anwendung besteht. Jede MapReduce Anwendung wird durch eine Konfigursation beschrieben, die zumindest die Input- und Outputdaten spezifiziert und die auszuführenden __map__ und __reduce__ Kommandos.
%% Cell type:code id:fdb0e25c-b363-48cc-aefc-66c914cf223b tags:
   
``` python ``` python
``` ```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment