# A somewhat useless example of chunking
# let's count the number of occurences of the letter E in a randomly generated long string
import numpy as np
SIZE = 999967 # purposefully not a round number
data = "".join([chr(x) for x in np.random.randint(65, 91, SIZE)])
print(data[:1000])
# set up result counter and loop
BUFSIZE = 200 # process 200 characters at a time
count = 0
pos = 0
while pos < len(data):
chunk = data[pos:pos+BUFSIZE]
count += chunk.count("E")
pos += BUFSIZE
print(f"There are {count} occurences of the letter E in the random string, i.e. {100.*count/SIZE}%.")
datalogistics-arch.html
Design Pattern for large-scale data analysis
git repo: https://gitlab.jsc.fz-juelich.de/esde/training/large-data-lecture.git
PD Dr. Martin Schultz (m.schultz@fz-juelich.de)
1. Motivation
On the origin of the word design pattern see for example https://www.architecture.com/knowledge-and-resources/knowledge-landing-page/pattern-books-creating-the-georgian-ideal
1.1 Data architectures
Classical "large data" systems:
The hierarchical data system at the Jülich Supercomputing Centre (JSC):
Distributed, cloud-based systems:
Example:
When designing such a distributed data system, you will need to take into account further design parameters beyond the classical capacity, latency and bandwidth parameters:
- Resilience (robustness against failures); how to deal with missing components/redundance/monitoring
- Minimisation of data transfer
- Portability across system boundaries
- Harmonisation of data structures and formats
- Temporary saving of results to enhance robustness and speed up reproduction or recovery
1.2 Data structures, data models and access patterns
Before we will explore the design patterns for working with large data, it is useful to develop a basic understanding of typical data structures and the associated data models as well as the data access patterns.
We will not go into detail here, but only provide a couple of keywords. You may search the internet for these to learn more.
Data structures:
- unstructured data
- point clouds
- series and time series, incl.audio data
- tree structures
- relational tables
- graphs
- gridded data
- regular grid
- irregular grid
- Images and videos
Data models:
- CSV
- XML Tree
- NetCDF/HDF
- DICOM
- Image and video formats etc.
Discussion points wrt data models:
- data structure
- connection between data and metadata
- parallelisation and scalability
Access patterns:
- serial / parallel
- sequential / random access
- as bulk / block access / buffered / segmented
- WORM (write once read multiple) etc.
- periodic or (frequently) recurring / episodic / burst
- ETL / streaming
2. Classic design patterns
2.1 Extract - Transform - Load (ETL)
The ETL pattern is an abstract representation for providing data from a service. The fundamental idea is to have data stored in their original form, for example in a database or as files, but that working with the data will generally require some sort of transformation. By separating the workflow for accessing data into three steps, it is easier to optimise performance for specific tasks.
Here is a blog article describing various aspects which should be kept in mind when designing an ETL workflow.
Step 1: It is good practice to always start from the original data and preserve the rich information content of the original data to the extent possible.
Step 2: Before applying the actual transformation of the data, they should be cleaned. This includes a check if it actually makes sense to apply the transformation and if the data contain what is expected (for example a test on the value range). It is good practice to not simply delete questionable or erroneous data but instead add a flag that indicates the data quality or the reason for errors.
A common method for data cleaning involves the use of a (temporary) database:
insert into
select from
This simple command sequence makes use of the database filters to ensure formatting and consistency errors in the data.
When designing the transformations, one should strike a balance between performance and complexity. With few exceptions, the second fastest solution is often better if the code remains more readable, more modular and easier adaptable. Performance optimisation should only be done if it is really necessary.
Rendering of data (i.e. formatting for output) is also a transformation and typically the last action.
Step 3: The loading (or publishing) step should be limited to doing exactly that, i.e. in particular, it should not apply any further transformations. This step is only concerned with delivery of the data.
Aaron Segesmann describes (in the blog article linked above) the following design patterns for data delivery:
- Truncate and load - previous data sets are deleted and the new data replace the old ones. Example: refresh of a web page after an update.
- Data surgery - individual data elements are swapped in and out of the published record. This method is often efficient when less than 40 % of the data were changed.
- Append - the new data will be attached to the existing ones.
- Blue/green - this patterns operates with two copies of the delivery system, i.e. two servers. While system A displays the old data, system B is updated with the new data in the background. Only when this is completed, the view is switched from system A to system B. Then, system A can be used to process the next update again.
2.2 Chunking and tiling
A typical property of large data is that it is not possible to load the entire dataset into memory. Hence, it is necessary to process the data in pieces.
Chunking means to read, process, and write data in blocks. The processing can happen serially or in parallel (see also section 3.5 on Hadoop and MapReduce).
Tiling is used to process two-dimensional data (for example numerical model output or satellite data of Earth system science) in individual pieces- A good example are the tile servers of Google Maps or Open Street Map. Such a server converts the vector information of streets, buildings, etc. into images. Only the information within a given bounding box is processed. The images can be rendered in different zoom levels with varying degrees of detail. And one can determine, which images shall be pre-rendered and cached to expedite delivery. On caching, see also section 3.2.
2.3 Index tables
Relational databases allow for the definition of several indices within a data table and also across tables (foreign key). This creates the possibility to build rather complex index tables, and these in turn are one of the main reasons why relational databases can be extremely efficient for searching and accessing data.
In NoSQL databases (z.B. MongoDB or Apache Cassandra there is only one primary index. However, one can generate more complex index tables manually (see for example [https://learn.microsoft.com/en-us/azure/architecture/patterns/index-table]).
The design principle of index tables can also be used outside of databases. For example, some data formats (e.g. netCDF) store an index header next to the actual data. This header contains metadata information ("self-describing data format") and indices to the individual data records ("variables").
# Programming example with an index table
# create some data arrays of arbitrary size and store them in a file with an index table
import numpy as np
from pprint import pprint
import os
import json
os.makedirs("data", exist_ok=True)
dlength = [ 100, 20000, 57, 23456, 100 ] # length of data arrays
container = [ np.full(length, float(enum+1)) for enum, length in enumerate(dlength) ]
pprint(container)
# create index table as dictionary - note that first entry is 0, and last entry should be removed
index = { "var1": 0 }
mem = 0
for enum, ar in enumerate(container[:-1]):
index[f"var{enum+2}"] = mem + ar.nbytes
mem += ar.nbytes
pprint(index)
# store data as binary file
MAGIC = b"\xf0\xf0\x00\x00"
with open("data/test_index_file.dat", "wb") as f:
# write index table as json string
f.write(json.dumps(index).encode("utf-8"))
# add CRLF as end marker
f.write(MAGIC)
# write data tables
for ar in container:
f.write(ar)
# now let's try to read the file and access var3
# read index table
with open("data/test_index_file.dat", "rb") as f:
buf = f.read(10000)
# search for MAGIC sequnce, i.e. end of index
imagic = buf.find(MAGIC)
index = json.loads(buf[:imagic].decode("utf-8"))
# calculate length of variables
vlen = { f"var{enum}": index[f"var{enum+1}"] - index[f"var{enum}"] for enum in range(1, len(index)) }
# add last element - ends with length of file
f.seek(0, os.SEEK_END)
vlen[f"var{len(index)}"] = f.tell() - index[f"var{len(index)}"]
print("reconstructed index table: ", index, "\nlength of data arrays: ", vlen)
# load var3 array
f.seek(imagic+len(MAGIC)+index["var3"])
ar = np.frombuffer(f.read(vlen["var3"]))
print("reconstructed data array: ", ar)
Notes: This code implicitly assumes that the numpy arrays consist of float (float32) values and that the arrays are one-dimensional. One could of course also store the data type and shape in the header. However, if you find yourself thinking about this, you should consider using an established file format such as netCDF or HDF5 instead.
2.4 Memory mapping
To allow flexible access to portions of data in very large files, it may be useful to employ memory mapping. Here, a portion of the file is mapped into the main memory. However, for the users it appears as if they would work directly on the data in the file. For details, see for example Python 3 Dokumentation.
3. Modern design patterns
Source: https://learn.microsoft.com/en-us/azure/architecture/patterns/
3.1 Asynchronous processing
The exchange of data within a system of modern, distributed servers often happens via REST APIs (see for example our TOAR Database at JSC. The http(s) protocol used in such transfers may lead to failures or delays:
- due to system components or security software running on the server
- due to instable or slow internet connections (in particular when IoT sensors are involved)
- due to load on the systems
- due to the size of the request and/or response
- due to computationally intensive transformations prior to data delivery
Asynchronous requests allow parallel processing of several requests without having to wait for completion of single requests. Of course, the requests must be independent for this to work.
The following programming example shows an asynchronous request to the TOAR database at JSC. It is possible to specify several time series ids in the series
variable and download the data of these time series in a quasi-parallel and asynchronous mode.
# Programming example asynchronous requests
# download a few time series datasets from the TOAR database at JSC in parallel
import requests
import asyncio
from aiohttp import ClientSession
## from aiohttp import ClientOSError, ClientResponseError, ClientSession, ClientTimeout, ServerTimeoutError, TCPConnector
import os
# system settings
TOAR_SERVICE_URL = "https://toar-data.fz-juelich.de/api/v2"
# CONNECT_TIMEOUT = 3.05
# DATA_READ_TIMEOUT = 20
# MAX_TRIES = 3
# POOL_SIZE = 8
# the following are arbitrary dataset ids for demonstration; the last one should fail
series = [ 2834, 3333, 7243, 10112, 999999999 ]
os.makedirs("data", exist_ok=True)
# the download routine
async def download(session, series_id):
url = f"{TOAR_SERVICE_URL}/data/timeseries/{series_id}?format=csv"
print(f"Loading data for time series id {series_id} from {url}.")
async with session.get(url) as resp:
status = resp.status
if status == 200:
content = await resp.text()
with open(f"data/toar_timeseries_{series_id}.csv", "w") as f:
f.write(content)
print("Download of series id {series_id} complete.")
else:
print(f"Download of series {series_id} failed.")
return status
# the asynchronous driver
async def main(session, coroutine, params=None):
tasks = [asyncio.create_task(coroutine(session, s)) for s in params]
results = None
try:
results = await asyncio.gather(*tasks)
except Exception as e:
print("failed", e)
for task in tasks:
task.cancel()
await asyncio.sleep(1)
return results
session = ClientSession()
a = await main(session, download, series)
print(a)
await session.close()
If the data volumes are very large, then processing of individual tasks (here, downloading) may take quite long. In this case, it is useful to adopt the following design pattern:
Source: https://learn.microsoft.com/en-us/azure/architecture/patterns/async-request-reply
With this scheme you can use the status endpoint for example to generate an email to the user when processing is ready. This email may contain a link to a special resource endpoint from where the user can download the processed result.
3.2 Caching
Many data processing workflows generate aggregates from a large volume of data. These aggregates form the basis for further analysis by the users. A typical example from Earth system science is the calculation of monthly mean values from the output of a climate model. The original model output may be hourly or 3-hourly, for example, i.e. one has to read and add between 672 and 744 (or 224 to 248) data fields for each monthly mean value.
If you can assume that several users might be interested in such processed aggregates, then it can be useful to deploy a dedicated caching system. This cache may be hosted on the same (file) system as the original data, or perhaps on a faster disk (e.g. NVMe), or in memory (e.g. redis).
You should distinguish such "explicit" caching from automatic caching, which happens inside the CPU or on the file system under control of the operating system.
Source: https://learn.microsoft.com/en-us/azure/architecture/patterns/cache-aside
An important design consideration for caching is to maintain data consistency. There are different approaches to this depending on the application.
One should also contemplate if it makes sense to grant preference to certain data types over others, for example those, which are requested most frequently or those, which are most difficult to re-compute.
Finally, it is important to reflect the lifetime of data in the cache. In general one has to consider that (fast) cache memory is expensive and a limited resource.
3.3 Messaging and multiple server instances
A common REST API processes incoming requests sequentially. Section 3.1 already discussed asynchronous queries as a means to parallelize requests. However, also asynchronous servers reach their limits in case of a large number of requests or very large data loads per request. One possibility for scaling such data servers is to separate the management of the requests from the actual data processing and set-up several instances of the actual data server. The web service that will be seen by the users then focuses on translating the incoming requests into messages and put these in a pipeline. Each message is then sent to the next available data server in the asynchronous fashion described above.
Source: https://learn.microsoft.com/en-us/azure/architecture/patterns/competing-consumers
In such systems one can even activate and de-activate data servers depending on the demand (see containers and service orchestration). Such dynamic systems are gaining importance, no least to save energy.
It is important for such services to ensure that all requests are idempotent. That means that new requests must not depend on the results of earlier requests, but they can be processed independently. For example, if you want to compute running averages, you will always need a couple of time steps before and after the actual interval you are interested in. In the classical, sequential processing chain, one will create a buffer to save some time steps and then rotates through these buffers, loading each piece of data only once. In the asynchronous, multi-server environment, you always need to load these extra bits of data, because you cannot expect that the processing takes place in a given sequence.
3.4 Sharding
Sharding also involves the set-up of several parallel data (often database) servers. However, in this pattern, the data space is distributed across the servers according to pre-defined rules. From the user perspective, a sharded database appears as one single, huge database.
There are various strategies for the data split in a sharded system. Besides simple concepts (e.g. one shard per calendar year of data), it is particularly interesting to use key hashes. This improves the load balancing as it is unlikely that few systems receive all the load while others remain almost idle.
Source: https://learn.microsoft.com/en-us/azure/architecture/patterns/sharding
3.5 Hadoop and MapReduce
In recent years, several platforms have been developed for the distributed and scalable analysis of large amounts of data. One example for such platforms is Apache Hadoop.
"Hadoop is a [Java-based] open-source framework that allows to store and process big data in a distributed environment across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of [commodity] machines, each offering local computation and storage."
Source: https://www.tutorialspoint.com/hadoop/index.htm, additions by M. Schultz
The key elements of Hadoop are:
- the Hadoop Distributed File System (HDFS)
- the resource management system Yet Another Resource Negotiator (YARN)
- the MapReduce algorithm developed by Google
Source: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html
HDFS is seen by the user as one big file system with directories and files. Internally, the files are split into blocks so that parallel processing of file content becomes possible. There is exactly one namenode in each Hadoop system and an arbitrary number of datanodes. The namenode manages the metadata and access requests, but no data processing occurs on the namenode.
The HDFS was optimized for WORM (write once, read multiple) access and is particularly performant in streaming applications.
Hadoop is based on the assumption that there is always some failure in a large, distributed server environment. Therefore, various safety mechanisms have been implemented to prevent or repair failures. For example, all data blocks are saved redundantly.
The MapReduce framework allows for parallel data processing with a set of relatively simple commands. MapReduce scales on systems with several thousand servers and terabytes of data. Inpu data are divided into chunks (see above), which can then be processed in parallel ("mapping"). The results of these calculations are then combined in the "reduce" step. This is implemented with help of key, value pairs.
The user writes a map function, which generates an output key, value pair with an intermediary key from an input key, value pair. Further, the user writes a reduce function, which merges all data with the same key. Typically, such a reduce function generates exactly one (or no) output value. The MapReduce library takes care of merging all outputs with the same key across the server landscape.
Here is an example of a map and a reduce function for counting words within a huge number of text documents. This example has been taken from the original article on the MapReduce algorithm.
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Original Paper: Jeffrey Dean Sanjay Ghemawat (2004): MapReduce: Simplified Data Processing on Large Clusters, OSDI'04: Sixth Symposium on Operating System Design and Implementation, San Francisco, CA (2004), pp. 137-150.
The management of compute and storage resources is done with YARN. YARN consists of a ResourceManager for the complete Hadoop system, one NodeManager per cluster node and one MRAppMaster per MapReduce application. Every MapReduce application is described via a configuration. At a minimum the configuration contains a description of the input and output data paths and the links to the map and reduce functions. YARN take scare of the scheduling, monitors the execution of tasks and restarts jobs if they finish unexpectedly. Hadoop is primarily designed for batch processing. Calculations generally take place close to the location of the data and unnecessary data transfers are minimized.