Nextflow教學 - 以最小可行專案為例


Introduction

workflow framework要學嗎?
其實也不一定要,自己用python之類的自刻一套完整有邏輯的workflow framework(之後簡稱WF)也行

只是想說104上面一堆公司都在那邊最好要會Nextflow
就來學學吧,大家統一都用一樣的WF也好

網路上的教學文章超級少
而且大多都在那邊抄官網的教學翻成中文而已= =
完全沒有一個可以當做友善快速入門的,一堆農場文章
最後我的學習資源還是來自官網說明文件和官方發佈在yt的教學影片
(但都覺得寫的不是很符合需求= =)

Nextflow’s documentation
Nextflow Training Workshop - April 2022

他們有收錄一些常見狀況寫出範例,這個也能參考

https://github.com/nextflow-io/patterns

我認為WF應該只要負責流程管控就好,不應該又再負責運算處理的部分
要運算和分析應該用開發者自己熟悉的語言寫成模組和腳本就好

抱怨完畢可以開始了

我是以最小可行性方案寫一個在組裝基因體workflow的專案
一切都以真實需求為目的導向

下面的解說也都是拿我的專案當範例

https://github.com/hunglin59638/plott

以Nextflow寫一個workflow我認爲最基本的檔案應該要有

  • nextflow.config: 客製化設定都寫在這,像輸入參數和資源配置
  • modules.nf: 所有的process都寫在這,雖然也可放到main.nf,但會顯得很長不符合模組化觀念
  • main.nf: 執行流程寫在這

其他的都太浮誇了,都可以先不用

基本設定 - nextflow.config

這裡的寫法都是需告函式名稱,然後把參數寫在{ }裡
但也可以用函式.參數=,但這樣看起來超冗長

params.input="test.fq"
params.out_dir="output"

manifest

專案的基本資訊都寫在這
比較有用的是mainScript
nextflow run /path/to/project/dir就能自動辨別要執行main.nf這個檔案,當然也可以直接指定要run main.nf

manifest {
    author = 'Hung-Lin, Chen'
    name = 'Plott' 
    homePage = 'http://github.com/hunglin59638/plott'
    description = 'A pipeline to assembly genomes'
    mainScript = 'main.nf'
    version = '1.0.0'
    nextflowVersion = '>= 20.07.0'
}

process

執行每個process的預設設定

  • cpus: 單個process可以允許最多用多少cpu
  • memory: 允許多少記憶體可使用
    其他就看介紹吧

https://www.nextflow.io/docs/latest/process.html#cpus

example:

process {
    cpus = Runtime.getRuntime().availableProcessors()
    memory = {8.Gb*task.attempt}
    errorStrategy = { task.attempt <= 2 ? "retry" : "ignore" }
    maxRetries = 2
    cache = true
}

Runtime.getRuntime().availableProcessors()這個應該是Groovy的語法,跟python的os.cpu_count()應該是一樣的

task也是預留變數,最常用的應該是task.cpus
task.attempt則是記錄該process重新執行幾次了

process的config設定不一定要寫,已經有預設了
像cache就是預設true

params

參數都寫在這,可以寫一樣想要給預設值的參數

example:

params {
    help = false
    output = "plott_out"
    long_reads = ""
    long_read_source = "nano-raw"
    short_1 = ""
    short_2 = ""
    out_dir = "$launchDir/" + params.output
    threads = Runtime.getRuntime().availableProcessors()
    is_meta = false
    sketches = ""
}

即使沒寫在nextflow.config,但在下指令時也是可以寫,nextflow不像python的argparse模組一樣如果在command寫不存在的參數就會報錯(預設的情況下)

像是nextflow run main.nf --input reads.fq
即使在config沒有input這個參數也依然可以執行
且程式執行時可以讀取到params.inputreads.fq

模組化 - modules.nf

process就像是定義函式, 後面接函式名稱

大括弧內基本會寫的是input, output, script
不過這些都不是必填,只是一般情況都會填他們
雖然填寫順序通常是input, output, script
但邏輯順序應該是input寫好後script接收input,最後才是宣告哪些東西是output

input

input大多時候會填的是變數類型是path
如果是path,nextflow就會在預設的work目錄生成一個該名稱的軟連結
不論該路徑是否真實存在

porechop這個process我寫的input有一個path叫read
他就是個變數名,所以可以在script的地方加上$使用

另一個常用的類型是val,可以是數字或字串或是布林值

像是flye的input中read_source是要輸入字串, is_meta是接收布林值

script

自己建議的寫法是一行指令就結束
不要把script的內容寫在nextflow, 應該寫在.py, .sh, .pl之類的腳本再呼叫它們,因爲nextflow應該只要負責流程控管

output

output則是定義哪些路徑或變數要輸出這個process到下個process
如果沒定義,那就沒辦法傳遞出去,跟程式語言的函式最後要定義return一樣

邏輯上的先後順序是在script寫好輸出的路徑名,然後output指定說要把該路徑作爲輸出

像porechop的script可以看到-o trim.fq,所以跑完時會輸出名爲trim.fq的檔案
再來才是用output去指定trim.fq作爲輸出

example:

process porechop {

    input:
        path read
    
    output:
        path 'trim.fq'
    
    script:
    """
    porechop --discard_middle -i $read -o trim.fq --threads $task.cpus
    """

}


process flye {
    input:
        path read
        val read_source
        val is_meta
    
    output:
        path 'flye_out/assembly.fasta'

    script:
    def add_meta = is_meta ? "--meta" : ''
    """
    flye --threads $task.cpus --out-dir flye_out --plasmids $add_meta --iterations 2 --$read_source $read
    """
    

}

額外補充,如果是要依據參數來決定script中指令的參數可以用if else或是簡單點用單行if-else
下面範例變成人類語言就是如果is_meta是true輸出”–meta”,反之輸出’’

def add_meta = is_meta ? "--meta" : ''

完整的if-else範例(conditional-scripts)

https://www.nextflow.io/docs/latest/process.html?highlight=cache#conditional-scripts

Workflow撰寫 - main.nf

DSL版本

簡單說就是nextflow語法的版本,跟docker-compose一樣可以宣告要使用的版本,不同的版本支援的寫法不一樣

建議用第2版的語法,2相較1思維較接近WF應該有的樣子
1的語法還是很像個程式語言而已

nextflow.enable.dsl = 2

匯入process

再來是要import要使用哪些process,注意是用;分隔

include { filtlong; porechop; flye; racon; racon_sec; medaka_consensus; homopolish; publish } from './modules.nf'

宣告參數

如果是檔案的參數可以加上file(),如果輸入的不是存在的檔案就會報錯

long_reads = file(params.long_reads)

workflow

整個專案裡最核心的部分,通常是先寫一個channel

long_reads = channel.fromPath(long_reads)

接下來就跟一般的程式語言的函式很像了
qc_reads和 trim_reads都是process的output,在nextflow裡是channel

qc_reads = filtlong(long_reads)
trim_reads = porechop(qc_reads)

example:

#!/usr/bin/env nextflow
nextflow.enable.dsl = 2
include { filtlong; porechop; flye; racon; racon_sec; medaka_consensus; homopolish; publish } from './modules.nf'


long_reads = file(params.long_reads)
read_source = params.long_read_source
sketches = file(params.sketches)

workflow {
  long_reads = channel.fromPath(long_reads)
  qc_reads = filtlong(long_reads)
  trim_reads = porechop(qc_reads)
  draft_asm = flye(trim_reads, read_source, params.is_meta)
  racon_first = racon(trim_reads, draft_asm)
  racon_second = racon_sec(trim_reads, racon_first)
  consensus_fasta = medaka_consensus(trim_reads, racon_second)
  homopolish_out = homopolish(consensus_fasta, sketches)
  publish(homopolish_out, params.out_dir)

}

執行pipeline

nextflow run plott/main.nf 
--long_reads SRR21601763.fastq --sketches genomes.msh --out_dir plott_out -resume

執行nf之後預設會在現在的位置產生一個名爲work的目錄作為執行的工作目錄
每個process都會產生一個英文和數字組合的目錄如95/06f4d4
裡面的檔案如果是channel過來的就會用軟連結如SRR21601763.fastq -> /home/hunglin/omv/dataset/sra/

output在terminal的log

N E X T F L O W  ~  version 22.10.1
Launching `side_projects/plott/main.nf` [focused_lalande] DSL2 - revision: f2220b5258
executor >  local (3)
[95/06f4d4] process > filtlong (1)         [100%] 1 of 1, cached: 1 ✔
[98/f0914e] process > porechop (1)         [100%] 1 of 1, cached: 1 ✔
[f6/390d36] process > flye (1)             [100%] 1 of 1, cached: 1 ✔
[ed/ebcefb] process > racon (1)            [100%] 1 of 1, cached: 1 ✔
[b7/f3cf24] process > racon_sec (1)        [100%] 1 of 1, cached: 1 ✔
[d8/334735] process > medaka_consensus (1) [100%] 1 of 1 ✔
[44/c757af] process > homopolish (1)       [100%] 1 of 1 ✔
[82/da0cff] process > publish (1)          [100%] 1 of 1 ✔
Completed at: 20-Nov-2022 15:02:21
Duration    : 13m 9s
CPU hours   : 22.6 (88.4% cached)
Succeeded   : 3
Cached      : 5

tree of work directory:

├── 95
│   └── 06f4d46de66d5be5f91f7c9c0a4d54
│       ├── SRR21601763.fastq -> /home/hunglin/omv/dataset/sra/SRR21601763.fastq
│       └── filtlong.fq
├── 98
│   └── f0914e11ce7858ad977ed38d3bdf1c
│       ├── filtlong.fq -> /home/hunglin/omv/work/95/06f4d46de66d5be5f91f7c9c0a4d54/filtlong.fq
│       └── trim.fq
├── f6
│   └── 390d3614faa1f0cb896dae8ef31405
│       ├── trim.fq -> /home/hunglin/omv/work/98/f0914e11ce7858ad977ed38d3bdf1c/trim.fq
│       └── flye_out
├── ed
│   ├── 9b0653e23c6ca289faeb4f37cf8a8d
│   │   ├── consensus.fasta -> /home/hunglin/omv/work/ae/c98ac9d1009eefa7f06da0ce46e02a/medaka_out/consensus.fasta
│   │   ├── genomes.msh -> /home/hunglin/omv/db/homopolish_sketches/genomes.msh
│   │   └── polish_out
│   ├── b95e31b5f9b5066297454fdb756074
│   │   ├── consensus_homopolished.fasta -> /home/hunglin/omv/work/fa/f7fe595d4e329f747cfe2415e29a95/polish_out/consensus_homopolished.fasta
│   │   └── omvplott_out -> /home/hunglin/omvplott_out
│   └── ebcefbc98d9e7c77643ff692b0a0b1
│       ├── assembly.fasta -> /home/hunglin/omv/work/f6/390d3614faa1f0cb896dae8ef31405/flye_out/assembly.fasta
│       ├── trim.fq -> /home/hunglin/omv/work/98/f0914e11ce7858ad977ed38d3bdf1c/trim.fq
│       ├── overlap.paf
│       └── polish.fasta
├── b7
│   └── f3cf24cd2688eeae8e3974c79512ee
│       ├── polish.fasta -> /home/hunglin/omv/work/ed/ebcefbc98d9e7c77643ff692b0a0b1/polish.fasta
│       ├── trim.fq -> /home/hunglin/omv/work/98/f0914e11ce7858ad977ed38d3bdf1c/trim.fq
│       ├── overlap.paf
│       ├── polish_sec.fasta
│       ├── polish_sec.fasta.fai
│       └── polish_sec.fasta.map-ont.mmi

log則是會輸出在.nextflow.log,這裡的log是記錄nextflow產生的stderr
如果要看process的log則是要到work中屬於該process的目錄察看
會寫在.command.log

nextflow指令加上-resume的話可以使用cache,如果在某個process執行失敗,修正問題再執行一次指令時就不會重新跑所有的process

如果所有process產生的檔案都是儲存在work,那要怎麼輸出需要的檔案到外面?
官方是說用publishDir這語法去指定
但這語法只能放在process裡面,但寫在process裡都是寫死的,而且很不自由
不能在workflow自由決定哪些process的channel要輸出

所以我寫一個copy_to_output.py腳本輸出要存到外面的檔案…

https://www.nextflow.io/docs/latest/process.html?highlight=cache#dynamic-output-file-names

先不要碰的雷區

DSL有分第1版跟第2版,直接都學第2版的語法就好
第1版的語法很多都是寫死和不自由類型的

nf-core太複雜了,建議新手不要碰
可以參考modules的寫法,但不要使用他們的模板nf-core create
太多它們社群的制式設定了,全部搞懂前就會想退坑了
學Nextflow最一開始應該只是想整合幾個工具跑流程而已
不會也不需要知道那麼多高端技巧

不知為啥沒有基本的參數解析模組
看了很多nextflow的專案居然都是用println或log.info暴力輸出help message

log.info ''
log.info 'Tychus - Alignment Pipeline'
log.info ''
log.info 'Usage: '
log.info '    nextflow alignment.nf -profile alignment [options]'
log.info ''
log.info 'General Options: '
log.info '    --read_pairs      DIR		Directory of paired FASTQ files'
log.info '    --genome          FILE		Path to the FASTA formatted reference database'
log.info '    --amr_db          FILE		Path to the FASTA formatted resistance database'
log.info '    --vf_db           FILE		Path to the FASTA formatted virulence database'
log.info '    --plasmid_db      FILE		Path to the FASTA formatted plasmid database'
log.info '    --draft           FILE		Path to the FASTA formatted draft databases'
log.info '    --threads         INT		Number of threads to use for each process'
log.info '    --out_dir         DIR		Directory to write output files to'

https://github.com/cdeanj/nextflow-tychus/blob/master/alignment.nf
https://github.com/hoelzer-lab/rnaflow/blob/master/main.nf

目前解法看起來只能自己寫個script處理參數和輸出help message

不知道是我的google search能力差還怎樣,明明就算滿多paper發表用nextflow寫的pipeline
但看到的教學或是文章分享就是不多

台灣有用nextflow的人要加油啊,都是看到中國在那邊寫,雖然大多都是抄官網的範例翻成簡體而已…


Author: Hung-Lin, Chen
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source Hung-Lin, Chen !
  TOC