:toc-title: Table of Contents
:toc:
// configure EN settings for asciidoc
include::src/config.adoc[]
// numbering from here on
:numbered:
[[section-introduction-and-goals]]
== Introduction and Goals
The following describes the architecture of the eFlows4HPC Data Logistics Service. The service
provides scientific users with a means to prepare, conduct, and monitor data
movements and transformations. The primary use case is to facilitate the data ingestion
process.
Main features:

* facilitate and formalize data movements
* keep track of data sources (Data Catalog)
* monitor the data transfers and implement restart techniques
=== Requirements Overview
[cols="1,2,4"]
|===
| **ID** | **Requirement** | **Explanation**
| R1 | Orchestrate movement of data to and from target locations | Ensure that the data required for computation is moved from external repositories to target locations
| R2 | Transfer computation results | Results of computations should be put in target repositories
| R3 | Integrate user tools | There are already some codes to download, upload, and do QA
| R4 | Integrate new data sources and targets | New sources and targets can be added at the later stage
| R5 | Self-service | In the long-term move from developers to data scientists
|===
=== Quality Goals
[cols="1,1,1,4"]
|===
| **ID** | **Prio**| **Quality**| **Explanation**
| Q1 | 1 | Performance/Reliability | Required data should be moved to target in a timely manner (accounting for retransfers upon errors)
| Q2 | 3 | Extensibility/Interoperability | New sources (up to 10 at the same time) and targets (repositories) can be added. Integration with project ML libraries
| Q3 | 2 | Transparency/Reproducibility | Other researchers should be able to verify and redo the data movement and processing steps
| Q4 | 4 | Elasticity | Future changes in workload should be accounted for
|===
[[section-architecture-constraints]]
== Architecture Constraints
[cols="1,4"]
|===
| **Constraint** | **Explanation**
| Local orientation | The user shall have the impression of owning the local process and codes. The data should be brought to the Data Logistics Service
| Python-compatible | There are already some modules, codes, and scripts in Python; they should be reused/integrated
| Reuse infra | Take advantage of existing services (Splunk, Grafana, GitLab)
| External developer | Some of the code developed externally should be included in the transformation step, in a non-intrusive way
|===
[[section-system-scope-and-context]]
== System Scope and Context
=== Business Context
image:business_context.png["Business view"]
=== Technical Context
image:technical_context.png["Technical L1 view"]
**Mapping Input/Output to Channels**

* Scheduler -> Worker: asynchronous, through Redis messaging
* Worker -> Data Source (real-time): HTTP-based, synchronous
* Worker -> Data Source (initial replication): API (S3 for OpenAQ)
* Worker -> Local Store: API in the first attempt; direct Postgres access could also be an option
* Worker -> Model Repository: HTTP API through CLI/BashOperator
* DAG Repository -> {Workers, Scheduler}: GitLab CI/CD pipeline with a local Docker image repository
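
To make the real-time channel concrete, the sketch below shows how a worker-side task could poll a data source over HTTP. This is an illustration only: the endpoint URL, task id, and callable are hypothetical, and Airflow 2.x import paths are assumed.

[source,Python]
----
import requests
from airflow.operators.python import PythonOperator

def fetch_latest_measurements(**context):
    # Hypothetical endpoint; the real data source URL is deployment-specific.
    response = requests.get("https://data-source.example.org/api/latest",
                            timeout=60)
    # Raising on HTTP errors lets Airflow's retry mechanism take over.
    response.raise_for_status()
    return response.json()

fetch_latest = PythonOperator(
    task_id='fetch_latest',
    python_callable=fetch_latest_measurements,
    dag=dag)  # assumes a DAG object defined earlier in the file
----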
[[section-solution-strategy]]
== Solution Strategy
Fundamental decisions and solution strategies.
=== Async Communication
We are not planning to address streaming use cases, thus the communication between the
scheduler and the workers will be done asynchronously with a message bus. Airflow offers this.
This is a single-leader asynchronous replication use case.
=== Airflow
The pipelines in Airflow are defined in Python (rather than in a special workflow language); in the long
term the data scientists should be able to create their own pipelines.
Airflow pipelines are executed on workers but can interact with storages and their in-storage processing
capabilities. This can be beneficial when moving to Spark for more performance-demanding jobs.
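
For illustration, a minimal Python-defined pipeline could look as follows; the DAG id, schedule, and task are made up for this sketch (Airflow 2.x import paths assumed).

[source,Python]
----
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
        dag_id='example_ingestion',       # hypothetical pipeline name
        schedule_interval='@daily',
        start_date=datetime(2021, 1, 1),
        catchup=True) as dag:             # backfill missed intervals
    download = BashOperator(
        task_id='download',
        bash_command='echo downloading')
----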
=== "Pure functions" workflows
The idea is to build the workflows as pure functions; no orthogonal concerns (invocation,
scheduling, input/output locations) should be included. The primary reason for this
is testability outside of Airflow.
The functions should also be idempotent. In case we have to rerun
the pipeline, the results should be the same, e.g., a measurement series is recreated
rather than new measurements being added. The motivation for this is the ability to re-run
the workflows; it is also relevant for continuous deployment. If some tasks are lost, we
can restart them; in the worst case, the work is done twice.
In general, we want to have the possibility to recreate full data sets. A minimal sketch of
such an idempotent task is shown below.
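
The sketch uses hypothetical table and function names and Python's built-in sqlite3 module for brevity; the point is that the task recreates the day's series instead of appending, so re-running it yields the same state.

[source,Python]
----
import sqlite3

def ingest_measurements(conn, day, records):
    """Idempotent ingest: recreate the day's series rather than append to it."""
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute("DELETE FROM measurements WHERE day = ?", (day,))
        conn.executemany(
            "INSERT INTO measurements (day, value) VALUES (?, ?)",
            [(day, value) for value in records])

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE measurements (day TEXT, value REAL)")
ingest_measurements(conn, "2021-06-01", [1.0, 2.5])
ingest_measurements(conn, "2021-06-01", [1.0, 2.5])  # re-run changes nothing
----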
=== Deployment
Reuse our CD infrastructure. See <<Deployment>> for more details.
[[section-building-block-view]]
== Building Block View
image:technical_context.png["Technical L1 view"]
=== Data Logistics UI
A web service offering a view of the defined pipelines, execution details, and performance metrics.
It also offers a view of the data sources. The view can be extended with plugins.
_Interfaces_
The primary interface is the web view, but there is also a CLI used for deployment and for defining
data sources.
_Fulfilled Requirements_
* Self-service (to some extent)
Qualities:
* Monitoring/Transparency
Extensibility: DAGs in Python
_Open Issues/Problems/Risks_
To work properly, the UI requires the DAGs to be injected (locally). This becomes an issue
when DAGs use e.g. +PythonOperators+ and thus require additional libraries at runtime.
Either build the pipelines in such a way that this is not necessary (deferred imports, see the
sketch below) or rebuild the UI on each change (which again causes conflicts).
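
The deferred-import workaround could look like this; it is an assumption on our side (the task and dependency are hypothetical), not a recorded decision. Heavy dependencies are imported only when the task runs, so merely parsing the DAG file, as the UI does, does not need them.

[source,Python]
----
from airflow.operators.python import PythonOperator

def transform(**context):
    # Imported only at task run time, not at DAG parse time.
    import pandas as pd  # hypothetical runtime-only dependency
    return pd.DataFrame()

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag)  # assumes a DAG object defined in the same file
----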
=== Data Logistics Scheduler + Executor
Not directly accessible to the users. It keeps track of scheduling tasks on workers. It requires a
connection to the workers (which can be indirect, through the messaging system). It also keeps track
of the executions (through the metadata store) and restarts failed tasks.
_Interfaces_
A stand-alone system; it reads the DAGs (locally) and the metadata store to find out if new
instantiations are required. The communication with the workers uses two queuing backends:

* Queue Broker (receives the commands to be executed)
* Result Backend (reports the status of the completed tasks)

A multi-queue deployment is used for the initial snapshot and for updates, as in the example below.
.Operator Example
[source,Python]
----
from airflow.operators.bash import BashOperator  # Airflow 2.x import path

t3 = BashOperator(
    task_id='print_host',
    bash_command='hostname >> /tmp/dag_output.log',
    queue='local_queue',  # route this task to a dedicated queue
    dag=dag)
----
_Fulfilled Requirements_
Quality:
* Extensibility
* Elasticity
_Open Issues/Problems/Risks_
To some extent, the scheduler is a single point of failure. It is relatively hard to restart upon a
rebuild, and it is not fully stateless.
=== Data Logistics Worker
Workers are responsible for executing the tasks from the pipelines. They instantiate operators
of different types. Operators that are executed on a worker need to have their
dependencies met in that context. The worker needs to have access to its DAGS_FOLDER,
and the filesystems need to be synchronized by your own means; we use git for that.
The number of workers can be changed to enable some elasticity in the system.
The workers also provide the scaffolding for the operators.
_Interfaces_
Not directly accessible to the users. The commands are received from the scheduler through the
messaging system. Workers get the task execution context and can access the metadata store to get
information about the required connections (directly from the metadata DB).
Each worker can listen on multiple task queues. This can be used if the workers are
heterogeneous, e.g., a separate queue for Spark jobs, or to differentiate between different kinds
of load (see the command sketch below).
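
Which queues a worker serves is decided when it is started; with Airflow 2.x and the Celery executor this would look roughly as follows. The queue names are illustrative: `local_queue` matches the operator example above, `spark_queue` is hypothetical.

[source,bash]
----
# Start a worker that only consumes tasks from the listed queues
airflow celery worker --queues local_queue,spark_queue
----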
The workers do not communicate directly with each other. Small amounts of data can be exchanged
through a third party in the form of XCom, as sketched below. This was tested to work with a shared
storage, which can be an option for deployment.
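
A minimal sketch of such an exchange between two tasks; the task ids and the key are made up, only `xcom_push`/`xcom_pull` are standard Airflow calls.

[source,Python]
----
from airflow.operators.python import PythonOperator

def producer(**context):
    # Push a small value; it is stored via the metadata backend.
    context['ti'].xcom_push(key='row_count', value=42)

def consumer(**context):
    count = context['ti'].xcom_pull(task_ids='produce', key='row_count')
    print(f'rows transferred: {count}')

produce = PythonOperator(task_id='produce', python_callable=producer, dag=dag)
consume = PythonOperator(task_id='consume', python_callable=consumer, dag=dag)
produce >> consume
----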
The worker offers an interface to access the log files; this is used by the UI.
_Quality/Performance Characteristics_
* Integrate user tools
* New sources
Extensibility: multiple workers, multiple queues
Interoperability: Operators to access new data sources
Self-service: possibility to create and publish one's own pipelines
=== DAG Repo
This repo stores the information about the pipelines in the form of DAGs. A
pipeline definition comprises: code (in Python), dependencies (if required), and
config metadata (frequency, backfill, etc.).
To store and share the DAGs, we use a DAG Repo hosted on the locally available GitLab of the
requesting research group.
For some parts of the DAGs we will have unit tests (see the sketch below).
The deployment of the DAGs is done through GitLab Continuous Deployment.
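
A common form of such a test, sketched here as an assumption rather than the project's actual suite, checks that every DAG in the folder imports cleanly:

[source,Python]
----
from airflow.models import DagBag

def test_dags_import_without_errors():
    # Parsing the DAG folder surfaces syntax and import errors early.
    dag_bag = DagBag(include_examples=False)
    assert not dag_bag.import_errors, dag_bag.import_errors
----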
_Interfaces_
Standard GitLab access.
_Fulfilled Requirements_
Extensibility, Self-service
<<<<
[[section-runtime-view]]
== Runtime View
=== Scheduling
image:scheduling.png["Scheduling process"]
=== Deployment
[[Deployment]]
image:deployment.png["Deployment and image using process"]
// <<<<
// 7. Deployment View
// #include::src/07_deployment_view.adoc[]
// <<<<
// 8. Concepts
// include::src/08_concepts.adoc[]
// <<<<
// 9. Design Decisions
// include::src/09_design_decisions.adoc[]
// <<<<
// 10. Quality Scenarios
// include::src/10_quality_scenarios.adoc[]
// <<<<
// 11. Technical Risks
// include::src/11_technical_risks.adoc[]
// <<<<
// 12. Glossary
// include::src/12_glossary.adoc[]