Workflows
In Nextflow, a workflow is a specialized function for composing processes and dataflow logic:
- An entry workflow is the entrypoint of a pipeline. It can take parameters as inputs using the params block, and it can publish outputs using the output block.
- A named workflow is a workflow that can be called by other workflows. It can define its own inputs and outputs, which are called takes and emits.
- Both entry workflows and named workflows can contain dataflow logic such as calling processes, workflows, and channel operators.
Entry workflow
A script can define up to one entry workflow, which does not have a name and serves as the entrypoint of the script:
workflow {
    channel.of('Bonjour', 'Ciao', 'Hello', 'Hola')
        .map { v -> "$v world!" }
        .view()
}
Parameters
Parameters can be declared by assigning a default value to a params property:
params.input = '/some/data/file'
params.save_intermeds = false
workflow {
    if( params.input )
        analyze(params.input, params.save_intermeds)
    else
        analyze(fake_input(), params.save_intermeds)
}
The default value can be overridden by the command line, params file, or config file. Parameters from multiple sources are resolved in the order described in Pipeline parameters.
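For instance, a default declared in the script can be overridden with a double-dash option on the command line, or with a params file passed via the -params-file option (the values and the my-params.yaml file name below are illustrative):

```
# override individual params on the command line
nextflow run main.nf --input '/other/data' --save_intermeds true

# or supply several at once from a YAML or JSON params file
nextflow run main.nf -params-file my-params.yaml
```

Here my-params.yaml would contain entries such as `input: /other/data` and `save_intermeds: true`.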
Outputs
Workflow outputs are intended to replace the publishDir directive. See Migrating to workflow outputs for guidance on migrating from publishDir to workflow outputs.
A script can define an output block to declare the top-level workflow outputs. Each output should be assigned in the publish section of the entry workflow. Any channel in the workflow can be assigned to an output, including process and subworkflow outputs.
Example:
process fetch {
    // ...

    output:
    path 'sample.txt'

    // ...
}

workflow {
    main:
    ch_samples = fetch(params.input)

    publish:
    samples = ch_samples
}

output {
    samples {
        path '.'
    }
}
In the above example, the output of process fetch is assigned to the samples workflow output. How this output is published to a directory structure is described in the next section.

Publishing files
Each workflow output can define how files are published from the work directory to a designated output directory.

Output directory
You can set the top-level output directory for a run using the -output-dir command-line option or the outputDir config option:

nextflow run main.nf -output-dir 'my-results'

// nextflow.config
outputDir = 'my-results'

The default output directory is results in the launch directory.

Publish path
By default, Nextflow publishes all output files to the output directory. Each workflow output can define where to publish files within the output directory using the path directive:

workflow {
    main:
    ch_step1 = step1()
    ch_step2 = step2(ch_step1)

    publish:
    step1 = ch_step1
    step2 = ch_step2
}

output {
    step1 {
        path 'step1'
    }
    step2 {
        path 'step2'
    }
}

The following directory structure is created:

results/
├── step1/
│   └── ...
└── step2/
    └── ...

Nextflow publishes all files received by an output into the specified directory. Nextflow recursively scans lists, maps, and tuples for nested files:

workflow {
    main:
    ch_samples = channel.of(
        tuple( [id: 'SAMP1'], [ file('1.txt'), file('2.txt') ] )
    )

    publish:
    samples = ch_samples // 1.txt and 2.txt are published
}

Files that do not originate from the work directory are not published.

Dynamic publish path
The path directive can also be a closure which defines a custom publish path for each channel value:

workflow {
    main:
    ch_samples = channel.of(
        [id: 'SAMP1', fastq_1: file('1.fastq'), fastq_2: file('2.fastq')]
    )

    publish:
    samples = ch_samples
}

output {
    samples {
        path { sample -> "fastq/${sample.id}/" }
    }
}

The above example publishes each channel value to a different subdirectory. In this case, each pair of FASTQ files is published into a subdirectory based on the sample ID.

Alternatively, you can define a different path for each individual file using the >> operator:

output {
    samples {
        path { sample ->
            sample.fastq_1 >> "fastq/${sample.id}/"
            sample.fastq_2 >> "fastq/${sample.id}/"
        }
    }
}

Each >> specifies a source file and a publish target. The source file should be a file or collection of files, and the publish target should be a directory or file name. If the publish target ends with a slash, Nextflow treats it as the directory in which to publish the source files.

When using this syntax, only files captured with the >> operator are saved to the output directory.

Conditional publishing
Outputs can be conditionally published using pipeline parameters:

output {
    samples {
        path { sample ->
            sample.fastqc >> "fastqc/"
            sample.bam >> (params.save_bams ? "align/" : null)
        }
    }
}

In the above example, the BAM files specified by sample.bam are published only when params.save_bams is true.

Index files
Index files are structured metadata files that catalog published outputs and their associated metadata. An index file preserves the structure of channel values, including metadata, which is more robust than encoding this information into file paths. The index file can be a CSV (.csv), JSON (.json), or YAML (.yml, .yaml) file. The channel values should be files, lists, maps, or tuples.

Each output can create an index file of its published values:

workflow {
    main:
    ch_samples = channel.of(
        [id: 1, name: 'sample 1', fastq_1: '1a.fastq', fastq_2: '1b.fastq'],
        [id: 2, name: 'sample 2', fastq_1: '2a.fastq', fastq_2: '2b.fastq'],
        [id: 3, name: 'sample 3', fastq_1: '3a.fastq', fastq_2: null]
    )

    publish:
    samples = ch_samples
}

output {
    samples {
        path 'fastq'
        index {
            path 'samples.csv'
        }
    }
}

The above example writes the following CSV file to results/samples.csv:

"1","sample 1","results/fastq/1a.fastq","results/fastq/1b.fastq"
"2","sample 2","results/fastq/2a.fastq","results/fastq/2b.fastq"
"3","sample 3","results/fastq/3a.fastq",""

You can customize the index file with additional directives, for example:

index {
    path 'samples.csv'
    header true
    sep '|'
}

This example produces the following index file:

"id"|"name"|"fastq_1"|"fastq_2"
"1"|"sample 1"|"results/fastq/1a.fastq"|"results/fastq/1b.fastq"
"2"|"sample 2"|"results/fastq/2a.fastq"|"results/fastq/2b.fastq"
"3"|"sample 3"|"results/fastq/3a.fastq"|""

Files that do not originate from the work directory are not published, but they are included in the index file.

See Output directives for the list of available index directives.

Labels
You can apply labels to each workflow output using the label directive:

output {
    multiqc_report {
        label 'qc'
        label 'summary'
    }
}

Labels can be used to find and filter output files across workflow runs with data lineage. See Data lineage for details on how to query output files by label.

Output directives
The following directives are available for each output in the output block:

- index: Create an index file containing a record of each published value. The following directives are available in an index definition:
  - header: When true, the keys of the first record are used as the column names (default: false). Can also be a list of column names. Only used for CSV files.
  - path: The name of the index file relative to the base output directory (required). Can be a CSV, JSON, or YAML file.
  - sep: The character used to separate values (default: ','). Only used for CSV files.
- label: Attach a label to every file published by this output. Can be specified multiple times to attach multiple labels. Labels are stored in the labels field of FileOutput records in the lineage store.
- path: Specify the publish path relative to the output directory (default: '.'). Can be a path, a closure that defines a custom directory for each published value, or a closure that publishes individual files using the >> operator.

Additionally, the following options from the workflow config scope can be specified as directives: contentType, enabled, ignoreErrors, mode, overwrite, storageClass, tags. For example:

output {
    samples {
        mode 'copy'
    }
}
Named workflows
A named workflow is a workflow that can be called by other workflows:
workflow my_workflow {
    hello()
    bye( hello.out.collect() )
}

workflow {
    my_workflow()
}
The above example defines a workflow named my_workflow which is called by the entry workflow. Both hello and bye could be any other process or workflow.

Takes and emits
The take: section declares the inputs of a named workflow:

workflow my_workflow {
    take:
    data1
    data2

    main:
    hello(data1, data2)
    bye(hello.out)
}

Inputs can be specified like arguments when calling the workflow:

workflow {
    my_workflow( channel.of('/some/data'), channel.of('/other/data') )
}

The emit: section declares the outputs of a named workflow:

workflow my_workflow {
    take:
    data

    main:
    hello(data)
    bye(hello.out)

    emit:
    bye.out
}

When calling the workflow, the output can be accessed using the out property, i.e. my_workflow.out.

If an output is assigned to a name, the name can be used to reference the output from the calling workflow. For example:

workflow my_workflow {
    take:
    data

    main:
    hello(data)
    bye(hello.out)

    emit:
    my_data = bye.out
}

The result of the above workflow can be accessed using my_workflow.out.my_data.

Every output must be assigned to a name when multiple outputs are declared.
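A workflow that declares more than one output must name each of them. As a sketch of what this looks like (hello and bye stand in for any processes defined elsewhere, and the emit names are illustrative):

```
workflow my_workflow {
    take:
    data

    main:
    hello(data)
    bye(hello.out)

    emit:
    // with two outputs declared, both must be assigned to names
    greetings = hello.out
    farewells = bye.out
}

workflow {
    my_workflow( channel.of('/some/data') )
    my_workflow.out.greetings.view()
    my_workflow.out.farewells.view()
}
```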
Dataflow
Workflows consist of dataflow logic, in which processes are connected to each other through dataflow channels and dataflow values.

Channels
A dataflow channel (or simply channel) is an asynchronous sequence of values. The values in a channel cannot be accessed directly, but only through an operator or process. For example:

channel.of(1, 2, 3).view { v -> "channel emits ${v}" }

channel emits 1
channel emits 2
channel emits 3

Factories
A channel can be created by factories in the channel namespace. For example, the channel.fromPath() factory creates a channel from a file name or glob pattern, similar to the files() function:

channel.fromPath('input/*.txt').view()

See Channel factories for the full list of channel factories.

Operators
Channel operators, or operators for short, are functions that consume and produce channels. Because channels are asynchronous, operators are necessary to manipulate the values in a channel. Operators are particularly useful for implementing glue logic between processes. Commonly used operators include:

- collect: collect the channel values into a collection
- combine: emit the combinations of two channels
- filter: emit only the channel values that satisfy a condition
- flatMap: emit multiple values for each channel value with a closure
- groupTuple: group the channel values based on a grouping key
- join: join the values from two channels based on a matching key
- map: transform each channel value with a mapping function
- mix: emit the values from multiple channels
- view: print each channel value to standard output

See Operators for the full set of operators.

Values
A dataflow value is an asynchronous value. Dataflow values can be created using the channel.value factory, and they are created by processes (under certain conditions). A dataflow value cannot be accessed directly, but only through an operator or process. For example:

channel.value(1).view { v -> "dataflow value is ${v}" }

dataflow value is 1

See Value<V> for the set of available methods for dataflow values.

Calling processes and workflows
Processes and workflows are called like functions, passing their inputs as arguments:

process hello {
    output:
    path 'hello.txt', emit: txt

    script:
    """
    your_command > hello.txt
    """
}

process bye {
    input:
    path 'hello.txt'

    output:
    path 'bye.txt', emit: txt

    script:
    """
    another_command hello.txt > bye.txt
    """
}

workflow hello_bye {
    take:
    data

    main:
    hello()
    bye(data)
}

workflow {
    data = channel.fromPath('/some/path/*.txt')
    hello_bye(data)
}

Processes and workflows have a few extra rules for how they can be called:

- Processes and workflows can only be called by workflows.
- A given process or workflow can only be called once in a given workflow. To use a process or workflow multiple times in the same workflow, use module aliases.

The "return value" of a process or workflow call is the process outputs or workflow emits, respectively. The return value can be assigned to a variable or passed into another call:

workflow hello_bye {
    take:
    data

    main:
    bye_out = bye(hello(data))

    emit:
    bye_out
}

workflow {
    data = channel.fromPath('/some/path/*.txt')
    bye_out = hello_bye(data)
}

Named outputs can be accessed as properties of the return value:

workflow hello_bye {
    take:
    data

    main:
    hello_out = hello(data)
    bye_out = bye(hello_out.txt)

    emit:
    bye = bye_out.txt
}

workflow {
    data = channel.fromPath('/some/path/*.txt')
    flow_out = hello_bye(data)
    bye_out = flow_out.bye
}

As a convenience, process and workflow outputs can also be accessed without first assigning to a variable, by using the .out property of the process or workflow name:

workflow hello_bye {
    take:
    data

    main:
    hello(data)
    bye(hello.out)

    emit:
    bye = bye.out
}

workflow {
    data = channel.fromPath('/some/path/*.txt')
    hello_bye(data)
    hello_bye.out.bye.view()
}

Process named outputs are defined using the emit option on a process output. See naming process outputs for more information.

Process and workflow outputs can also be accessed by index (e.g., hello.out[0], hello.out[1], etc.). As a best practice, multiple outputs should be accessed by name.

Workflows can be composed in the same way:

workflow flow1 {
    take:
    data

    main:
    tick(data)
    tack(tick.out)

    emit:
    tack.out
}

workflow flow2 {
    take:
    data

    main:
    tick(data)
    tock(tick.out)

    emit:
    tock.out
}

workflow {
    data = channel.fromPath('/some/path/*.txt')
    flow1(data)
    flow2(flow1.out)
}

The same process can be called in different workflows without using an alias, like tick in the above example, which is used in both flow1 and flow2. The workflow call stack determines the fully qualified process name, which is used to distinguish the different process calls, i.e. flow1:tick and flow2:tick in the above example.

The fully qualified process name can be used as a process selector in a Nextflow configuration file, and it takes priority over the simple process name.

Special operators
The following operators have a special meaning when used in a workflow with process and workflow calls. As a best practice, avoid these operators when type checking is enabled, as using them will prevent the type checker from validating your code.

Pipe
The | pipe operator can be used to chain processes, operators, and workflows:

process greet {
    input:
    val data

    output:
    val result

    exec:
    result = "$data world"
}

workflow {
    channel.of('Hello', 'Hola', 'Ciao')
        | greet
        | map { v -> v.toUpperCase() }
        | view
}

The above snippet defines a process named greet and invokes it with the input channel. The result is then piped to the map operator, which converts each string to uppercase, and finally to the view operator, which prints it.

The same code can also be written as:

workflow {
    ch_input = channel.of('Hello', 'Hola', 'Ciao')
    ch_greet = greet(ch_input)
    ch_greet
        .map { v -> v.toUpperCase() }
        .view()
}

And
The & and operator can be used to call multiple processes in parallel with the same channel(s):

process greet {
    input:
    val data

    output:
    val result

    exec:
    result = "$data world"
}

process to_upper {
    input:
    val data

    output:
    val result

    exec:
    result = data.toUpperCase()
}

workflow {
    channel.of('Hello')
        | map { v -> v.reverse() }
        | (greet & to_upper)
        | mix
        | view
}

In the above snippet, the initial channel is piped to the map operator, which reverses the string value. Then, the result is passed to the processes greet and to_upper, which are executed in parallel. Each process outputs a channel, and the two channels are combined using the mix operator. Finally, the result is printed using the view operator.

The same code can also be written as:

workflow {
    ch = channel.of('Hello').map { v -> v.reverse() }
    ch_greet = greet(ch)
    ch_upper = to_upper(ch)
    ch_greet.mix(ch_upper).view()
}

Process and workflow recursion
This is a preview feature and requires the nextflow.preview.recursion feature flag to be enabled. The syntax and behavior may change in future releases.

Processes can be invoked recursively using the recurse method:

nextflow.preview.recursion = true

params.start = 10

workflow {
    count_down
        .recurse(params.start)
        .until { v -> v == 0 }
        .view { v -> "${v}..." }
}

process count_down {
    input:
    val v

    output:
    val v

    exec:
    sleep(1000)
    v = v - 1
}

9...
8...
7...
6...
5...
4...
3...
2...
1...
0...

In the above example, the count_down process is first invoked with the value of params.start. On each subsequent iteration, the process is invoked again using the output from the previous iteration. The recursion continues until the condition defined by the until method is satisfied, which terminates the recursion.

The recursive output can also be limited using the times method:

count_down
    .recurse(params.start)
    .times(3)
    .view { v -> "${v}..." }

Workflows can also be invoked recursively:

nextflow.preview.recursion = true

params.input = "recurse-workflow.in"

workflow {
    clock
        .recurse(file(params.input))
        .until { file -> file.size() > 64 }
        .view { file -> file.text }
}

workflow clock {
    take:
    logfile

    emit:
    tock(tick(logfile))
}

process tick {
    input:
    path 'input.txt'

    output:
    path 'result.txt'

    script:
    """
    cat input.txt > result.txt
    echo "Task ${task.index} : tick" >> result.txt
    """
}

process tock {
    input:
    path 'input.txt'

    output:
    path 'result.txt'

    script:
    """
    cat input.txt > result.txt
    echo "Task ${task.index} : tock" >> result.txt
    """
}

hello
Task 1 : tick
Task 1 : tock
hello
Task 1 : tick
Task 1 : tock
Task 2 : tick
Task 2 : tock
hello
Task 1 : tick
Task 1 : tock
Task 2 : tick
Task 2 : tock
Task 3 : tick
Task 3 : tock

Limitations
A recursive process or workflow must have matching inputs and outputs, such that the outputs of each iteration can be supplied as the inputs for the next iteration. Recursive workflows cannot use reduction operators such as collect, reduce, and toList, because these operators cause the recursion to hang indefinitely after the initial iteration.
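As a sketch of the matching-inputs-and-outputs constraint, the following hypothetical accumulate process (not from the examples above) takes a single value and emits a single value, so each iteration's output can be fed back in as the next iteration's input:

```
nextflow.preview.recursion = true

workflow {
    accumulate
        .recurse('x')
        .until { s -> s.size() >= 8 }
        .view()
}

// one val in, one val out: the shapes match across iterations,
// following the same pattern as count_down above
process accumulate {
    input:
    val s

    output:
    val s

    exec:
    s = s + s
}
```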