Workflow description
Introduction
This chapter provides an overview of the JSON workflow description that is supported by the Workflow engine. It will allow you to write workflows by hand, i.e. without using tools such as the Java or Python APIs.
After presenting all the constructs individually, several complete examples are given in Examples.
Overview and simple constructs
The overall workflow document has the following form
{
"inputs": {},
"activities": {},
"subworkflows": {},
"transitions": [],
"variables": [],
"notification": "optional_notification_url",
"tags": ["tag1", "tag2", "..." ],
}
Activities, sub-workflows and transitions make up the workflow logic.
Both activities and sub-workflows are JSON maps (since UNICORE 9.0), where the key is the unique identifier of the element. The 8.x format of using JSON arrays with id elements is still supported.
Here is a simple example of two tasks that are to be run in a sequence:
{
"activities": {
"step1": {
"job": {
"Executable": "echo step1",
}
},
"step2": {
"job": {
"Executable": "echo step2",
}
},
},
"transitions": [
{"from": "step1", "to": "step2" }
]
}
The remaining elements in the workflow description are:
inputs
allows to register external files with the workflow file catalog. See Data handling.tags
is an optional list of initial tags, that can later be used to conveniently filter the list of workflows.notification
(optional) denotes an URL to where UNICORE Workflow server will send aPOST
notification (authenticated via a JWT token signed by the Workflow server) when the workflow has finished processing.
Notification messages sent by the Workflow service have the following content:
{
"href" : "workflow_url",
"group_id": "id of the workflow or sub-workflow",
"status": "...",
"statusMessage": "..."
}
Both of these are analogous to their conterparts for single jobs in UNICORE.
In the next sections the elements of the workflow description will be discussed in detail.
Activities
Activity elements have the following form:
"id": {
"type": "...",
...
}
The id
must be UNIQUE within the workflow. There are different types of activity, which
are distinguished by the type
element.
START
denotes an explicit start activity. If no such activity is present, the processing engine detect the proper starting activities.JOB
denotes a executable (job) activity. In this case, the job sub element holds the JSON job definition (if ajob
element is present, you may leave out thetype
).MODIFY_VARIABLE
allows to modify a workflow variable. An option namedvariable_name
identifies the variable to be modified, and an optionexpression
holds the modification expression in the Groovy programming language syntax (see also the variables section later).SPLIT
: this activity can have multiple outgoing transitions. All transitions with matching conditions will be followed. This is comparable to an “if() … if() … if()” construct in a programming language.BRANCH
: this activity can have multiple outgoing transitions. The transition with the first matching condition will be followed. This is comparable to an “if() … elseif() … else()” construct in a programming language.MERGE
merges multiple flows without synchronising them.SYNCHRONIZE
merges multiple flows and synchronises them.HOLD
stops further processing of the current flow until the client explicitly sends a continue message.
Subworkflows
The workflow description allows nested sub workflows, which have the same formal structure as
the main workflow (without the tags
and inputs
). There is an additional type
element
that is used to distinguish the different control structure types.
{
"id": "unique_id",
"type": "...",
"variables": [],
"activities": {},
"subworkflows": {},
"transitions": [],
"notification" : "optional_notification_url",
}
Job activities
Job activities are the basic executable pieces of a workflow. The embedded JSON job definition will be sent to an execution site (UNICORE/X) for processing.
{
"id": "unique_id",
"type": "job",
"job": {
"... standard UNICORE job ...": ""
},
"options": { },
}
The execution site is specified by the optional Site name
element in the job
{
"id": "unique_id", "type" : "job",
"job": {
"Site name": "DEMO-SITE",
},
}
Note
There is currently no form of brokering in place, it is up to the user to select an execution site.
The job description is covered in detail in Job description format.
The processing of the job can be influenced using the (optional) options
sub-element.
Currently the following options (key-value) can be used:
IGNORE_FAILURE
if set totrue
, the workflow engine will ignore any failure of the task and continue processing as if the activity had been completed successfully.Note
This has nothing to do with the exit code of the actual UNICORE job! Failure means for example data staging failed, or no matching target system for the job could be found.
MAX_RESUBMITS
set to an integer value to control the number of times the activity will be retried. By default, the workflow engine will re-try three times (except in those cases where it makes no sense to retry).
For example,
{
"id": "unique_id",
"job" : {
... standard UNICORE job ...
},
"options": { "IGNORE_FAILURE": "true", },
}
If you need to pass on user preferences to the site, e.g. for selecting your primary group, or
choosing between multiple user IDs, you can specify this in the job
element like this:
...
"job": {
"User prefences": {
"uid": "hpcuser21",
"group": "hpc",
}
}
...
where the allowed field names are role
, uid
, group
and supplementaryGroups
.
Data handling
One of the most common tasks is linking the output of one activity to another activity for
further processing. The UNICORE workflow system supports this by providing a per-workflow
file catalog, where jobs can reference files with special URIs starting with wf:
.
Jobs can register outputs with the file catalog using stage-out directives, for example,
Exports: [
{ "From": "stdout", "To": "wf:step1_stdout" }
]
will register the stdout
file under the name wf:step1_stdout (note that the file will not be
copied anywhere).
Later jobs can reference files from the catalog using stage-in directives, for example,
Imports: [
{ "From": "wf:step1_stdout", "To": "input_file" }
]
The workflow engine will take care of resolving the wf:...
reference to the actual physical location.
Apart from registration of files in jobs, the user can also manually register files using the
inputs
section of the main workflow:
"inputs": {
"wf:input_data_1": "https://some_storage/somefile.pdf",
"wf:input_params": "https://some_storage/parameters.txt"
}
For an example of a workflow, have a look at Simple two-step workflow with data dependency.
The Workflow REST API allows you to list (and modify) the file catalog via
the BASE/{id}/files
endpoint.
Transitions and conditions
The basic flow of control in a workflow is handled using transition elements. These reference from and to activities or subflows, and may have conditions attached. If no condition is present, the transition is followed unconditionally, otherwise the condition is evaluated and the transition is followed only if the condition matches (i.e. evaluates to true).
The syntax for a Transition is as follows:
{
"from" : "from_id",
"to" : "to_id",
"condition": "expression"
}
The from
and to
elements denote activity or subworkflow id’s.
An activity can have outgoing (and incoming) transitions. In general, all outgoing transitions
(where the condition is fulfilled) will be followed. The exception is the Branch
activity,
where only the first matching transition will be followed.
The optional condition element is a string-valued expression. The workflow engine offers some pre-defined functions that can be used in these expressions. For example, you can use the exit code of a job, or check for the existence of a file within these expressions.
eval(expr)
Evaluates the expression expr in Groovy syntax, which must evaluate to a boolean. The expression may contain workflow variables.exitCodeEquals(activityID, value)
Allows to compare the exit code of the UNICORE job associated with the Activity identified by activityID to value.exitCodeNotEquals(activityID, value)
Allows to check the exit code of the UNICORE job associated with the Activity identified by activityID, and check that it is different from value.fileExists(activityID, fileName)
Checks that the working directory of the UNICORE job associated with the given Activity contains a file fileName.fileLengthGreaterThanZero(activityID, fileName)
Checks that the working directory of the UNICORE job associated with the given Activity contains the named file, which has a non-zero length.before(time)
andafter(time)
check whether the current time is before or after the given time (in yyyy-MM-dd HH:mm format).fileContent(activityID, fileName)
Reads the content of the named file in the working directory of the job associated with the given Activity and returns it as a string.
Using workflow variables
Workflow variables need to be declared using an entry in the variables
array before they can be
used.
{
"name": "...",
"type": "...",
"initial_value": "..."
}
Currently variables of type STRING
, INTEGER
, FLOAT
and BOOLEAN
are supported.
Variables can be modified using an activity of type MODIFY_VARIABLE
.
For example, to increment the value of the COUNTER variable, the following Activity is used
{
"type": "MODIFY_VARIABLE",
"id": "incrementCounter",
"variable_name": "COUNTER",
"expression": "COUNTER += 1;"
}
The expression
contains an expression in Groovy syntax (which is very close to Java).
The workflow engine will replace variables in job data staging sections and environment definitions, allowing to inject variables into jobs. Examples for this mechanism will be given in the Examples section.
Loop constructs
Apart from graphs constructed using activity
and transition
elements, the workflow system
supports special looping constructs, for-each, while and repeat-until, which allow to build
complex workflows.
While and repeat-until loops
These allow to loop a certain part of the workflow while (or until) a condition is met. A while loop looks like this
{
"id": "while_example",
"type" : "WHILE",
"variables" : [
{
"name": "C",
"type": "INTEGER",
"initial_value": "1",
}
],
"body":
{
"activities": {
"task" : {
"job": { ... }
},
"mod": {
# this modifies the variable used in the 'while'
# loop's exit condition
"type": "MODIFY_VARIABLE",
"variable_name": "C",
"expression": "C++;",
}
},
"transitions: [
{"from": "task", "to": "mod"}
]
},
"condition": "eval(C<5)",
}
The necessary ingredients are that the loop’s body
modifies the loop variable (“C” in the
example), and the exit condition eventually terminates the loop.
For a full workflow example, see While loop example using workflow variables.
Completely analogously to the while loop, a repeat-until loop is constructed, the only syntactic difference is that the subworkflow now has a different type element:
{
"id": "repeat_example",
"type": "REPEAT_UNTIL",
...
}
Semantically, the repeat-loop will always execute the body at least once, since the condition is checked after executing the body, while in the while case, the condition will be checked before executing the body.
For-each loop
The for-each loop is a complex and powerful feature of the workflow system, since it allows parallel execution of the loop body, and different ways of building the different iterations. Put briefly, one can loop over variables (as in the while and repeat-until case), but one can also loop over enumerated values and (most importantly) over file sets.
The basic syntax is:
{
"id": "for_each_example",
"type": "FOR_EACH",
"iterator_name": "IT",
"body": {
},
# define range to loop over
"values": [...],
# OR variables
"variables": [...],
# OR files
"filesets": [...],
# with optional chunking
"chunking":
}
The iterator_name
element allows to control how the loop iterator variable is to be called,
by default it is named IT.
The values
element
Using value, iteration over a fixed set of strings can be defined. The main use for this is parameter sweeps, i.e. executing the same job multiple times with different arguments or environment variables.
"values": ["1", "2", "3", ],
The following variables are set where IT
is the loop iterator_name
defined
in the for group as shown above:
IT
is set to the current iteration index (1, 2, 3, …)IT_VALUE
is set to the current value
The variables
element
The variables
element allows to define the iteration range using one or more variables,
similar to a for-loop in a programming language.
"variables: [
{
"variable_name": "X",
"type": "INTEGER",
"start_value": "0",
"expression": "Y++",
"end_condition": "Y<2"
},
{
"variable_name": "Y",
"type": "INTEGER",
"start_value": "0",
"expression": "Y++",
"end_condition": "Y<2"
}
],
The sub-elements should be self-explanatory.
Note that you can use more than one variable range, allowing you to quickly create things like parameter studies.
The following variables are set where IT
is the loop iterator_name
defined
in the for group as shown above:
IT
is set to the current iteration index (1, 2, 3, …)IT_VALUE
is set to the current value
The file_sets
element
This variation of the for-each loop, allows to loop over a set of files, optionally chunking together several files in a single iteration.
The basic structure of a file set definition is this:
"file_sets": [
{
"base": "...",
"include": [ "..." ],
"exclude": [ "..." ],
"recurse": "true|false",
"indirection": "true|false",
},
]
The base element defines a base of the filenames, which will be resolved at runtime, and
complemented according to the include and/or exclude elements. The recurse
attribute allows
to control whether the resolution should be done recursively into any subdirectories. The
indirection attribute is explained below.
For example, to recursively collect all PDF files (except two files named unused*.pdf) in a certain directory on a storage:
"file_sets": [
{
"base": "https://mysite/rest/core/storages/my_storage/files/pdf/</s:Base>
"include": [ "*.pdf" ],
"exclude": [ "unused1.pdf", "unused2.pdf", ],
"recurse": "true"
}
]
The following variables are set where IT
is the loop iterator_name
defined
in the for group as shown above:
IT
is set to the current iteration index (1, 2, 3, …)IT_VALUE
is set to the current full file pathIT_FILENAME
is set to the current file name (last element of the path)
Indirection
Sometimes the list of files that should be looped over is not known at workflow design time, but
will be computed at runtime. Or, you wish simply to list the files in a file, and not put them
all in your workflow description. The indirection
attribute on a FileSet allows to do just that.
If indirection
is set to true
, the workflow engine will load the given file(s) in the
fileset at runtime, and read the actual list of files to iterate over from them. As an example,
you might have a file filelist.txt containing a list of UNICORE file URLs:
https://someserver/file1
https://someserver/fileN
...
and the fileset
{
"indirection": "true",
"base": "https://someserver/rest/core/storages/mystorage/files/</s:Base>
"include": [ "filelist.txt" ],
}
You can have more than one file list.
Chunking
Chunking allows to group sets of files into a single iteration, for example for efficiency reasons.
A chunk is either a certain number of files, or a set of files with a certain total size.
"chunking": {
"chunksize": ... ,
"type": "NORMAL|SIZE",
"filename_format": "...,
"expression": "... formula to compute chunksize ...",
}
The chunksize
element is either the number of files in a chunk, or (if type is set to SIZE
)
the total size of a chunk in kbytes.
For example,
To process 10 files per iteration:
"chunking": { "chunksize": "10", }To process 2000 kBytes of data per iteration:
"chunking": { "chunksize": "2000", "type": "SIZE" }
The chunksize
can also be computed at runtime using the expression given in the optional
expression element. In the expression, two special variables may be used. The TOTAL_NUMBER
variable holds the total number of files iterated over, while the TOTAL_SIZE
variable holds
the aggregated size of all files in kbytes. The script must return an integer-valued result.
The type
element is used to choose whether the chunk size is interpreted as number of files or
data size.
For example, to choose a larger chunksize if a certain total file size is exceeded:
"chunking": {
"expression": "if(TOTAL_SIZE>50*1024)return 5*1024 else return 2048;"
"type": "SIZE"
}
The optional filename_format
allows to control how the individual files (which are staged into
the job directory) should be named. By default, the index is prepended, i.e. an import statement
like
"Imports": [{ "From": "${IT_VALUE}", "To" : "infile.txt" }]
would result in 1_infile.txt to N_infile.txt in each chunk.
In the filename_format
pattern you can use the variables {0}
, {1}
and {2}
,
which denote the index, filename without extension and extension respectively.
{0} = 1, 2, 3, ...
{1} = "infile"
{2] = "txt"
For example, if you have a set of PDF files, and you want them to be named file_1.pdf to file_N.pdf, you could use the pattern:
"filename_format": "file_{0}.pdf"
which would ignore the original filename in the To
field completely.
Or, if you prefer to keep the existing extensions, but append an index to the name, use
"filename_format": "{1}{0}.{2}"
which would result in filenames like below:
inputfile1.txt
inputfile2.txt
...
You can also keep the original filenames by setting:
"Imports": [{ "From": "${IT_VALUE}", "To" : "${IT_ORIGINAL_FILENAME}"}]
The following variables are set where IT
is the loop iterator_name
defined
in the for group as shown above:
IT
is set to the current iteration index (1, 2, 3, …)IT_VALUE
is set to the current full file pathIT_ORIGINAL_FILENAME_x
is set to the current file name (last element of the path)IT_ORIGINAL_FILENAMES
is set to a “;”-separated list of all the file names (last elements of the paths) in the current chunk
Examples
This section collects a few simple example workflows. They are intended to be submitted using UNICORE Commandline Client.
Simple two-step workflow with data dependency
This example shows how to link output from one task to the input of another task using the internal file catalog.
The first task, step1, registers its stdout
with the file catalog under the name
wf:step1_out
, and the second task, step2, pulls that file in for further processing.
{
"activities": {
"step1": {
"job": {
"ApplicationName": "Date",
"Exports": [
{"From": "stdout", "To": "wf:step1_out"}
]
}
},
"step2": {
"job": {
"Executable": "md5sum",
"Arguments": ["infile" ],
"Imports": [
{ "From": "wf:step1_out", "To": "infile"}
]
}
}
},
"transitions": [
{"from": "step1", "to": "step2" }
]
}
Simple diamond graph
This example shows how to use transitions for building simple workflow graphs. It consists of four Date jobs arranged in a diamond shape, i.e. date2a and date2b are executed (more or less) simultaneously.
{
"activities": {
"date1": {
"job": { "ApplicationName": "Date" }
},
"date2a": {
"job": { "ApplicationName": "Date" }
},
"date2b": {
"job": { "ApplicationName": "Date" }
},
"date3": {
"job": { "ApplicationName": "Date" }
}
},
"transitions": [
{"from": "date1", "to": "date2a" },
{"from": "date1", "to": "date2b" },
{"from": "date2a", "to": "date3" },
{"from": "date2b", "to": "date3" }
]
}
Conditional execution in an if-else construct
Transitions from one activity to another may be conditional, which allows all sorts of if-else constructs. Here is a simple example:
{
"activities": {
"branch": { "type": "BRANCH" },
"if-job": {
"job": { "ApplicationName": "Date" }
},
"else-job": {
"job": { "ApplicationName": "Date" }
}
},
"transitions": [
{"from": "branch", "to": "if-job", "condition": "2+2==4"},
{"from": "branch", "to": "else-job" }
]
}
Here we use the BRANCH
activity which will only follow the first matching transition.
While loop example using workflow variables
The next example shows some uses of workflow variables in a while loop. The loop variable C is copied into the job’s environment. Another possible use is to use workflow variables in data staging sections, for example to name files.
{
"activities":{},
"subworkflows": {
"while-example": {
"type": "WHILE",
"variables": [
{
"name": "C",
"type": "INTEGER",
"initial_value": "0"
}
],
"condition": "C<5",
"body": {
"activities": {
"job": {
"job": {
"Executable": "echo",
"Arguments": ["$TEST"],
"Environment": ["TEST=${C}"],
"Exports": [
{ "From": "stdout", "To": "wf:/out_${C}" }
]
}
},
"mod": {
"type": "MODIFY_VARIABLE",
"variable_name": "C",
"expression": "C++"
}
},
"transitions": [
{"from": "job", "to": "mod" }
]
}
}
}
}
For-each loop example
The next example shows how to use the for-each loop to loop over a set of files. The jobs will stage-in the current file. Also, the name of the current file is placed into the job environment.
{
"subworkflows": {
"for-example": {
"type": "FOR_EACH",
"iterator_name": "IT",
"body":
{
"activities": {
"job": {
"id": "job",
"job": {
"Executable": "echo",
"Arguments": ["processing: ", "$NAME"],
"Environment": ["NAME=${IT_FILENAME}"],
"Imports": [
{"From": "${IT_VALUE}", "To": "infile"}
],
"Exports": [
{"From": "stdout", "To": "wf:/out_${IT}"}
]
}
}
},
},
"filesets": [
{
"base": "https://mygateway.de:7700/MYSITE/rest/core/storages/my_storage/",
"include": ["*"],
}
]
}
}
}