NT FASTA近億等級數量索引


情境說明,自己玩的專案中需要建立NT資料庫
並且需要經過資料清洗,像是將資料庫分成human和non human的fasta
並將無taxon分類的sequences排除
可怕的是目前的NT的序列數已達到9400萬條

策略是先遊歷一遍整個檔案
並記錄每個sequence的taxid及file offset (f.tell())儲存到dictionary

透過記錄的offset,使用f.seek()快速到達指定的sequence位置,並依據taxid判斷是屬於human或non human的sequence分別寫入到兩個fasta

因此本篇測試有哪些key-value類型的資料結構可以快速索引fasta中seqid於檔案中位置
且符合時間成本、硬體資源等效益

測試1: dictionary

首先使用內建的dictionary資料結構
以seqid爲Key, value記錄其在fasta中的start offset, end offset, sequence length, taxid

acc2taxid_trie = Acc2Taxid(acc2taxid_marisa)

taxids = set()
c = 0
removed = []
positions = {}
last_pos = 0
with open_gz(fasta) as f:
    while True:
        line = f.readline()
        if not line:
            break
        offset = f.tell()
        line_len = offset - last_pos
        if line[0] == ">":
            seqid = line.strip(" >\n").split(".")[0]
            taxid = acc2taxid_trie.search_taxid(seqid)
            taxid = str(taxid) if taxid is not None else taxid
            if taxid:
                taxids.add(taxid)
            else:
                removed.append(seqid)
                seqid = None
            positions[seqid] = {
                "header": {"fr": last_pos, "len": line_len},
                "seq": {"fr": last_pos + line_len + 1, "len": 0, "seqlen": 0},
            }
            c += 1
            if c % 1000000 == 0:
                print(f"Processed {c} sequences")
        else:
            if seqid is None:
                continue
            seqid_end = (
                positions[seqid]["header"]["fr"] + positions[seqid]["header"]["len"]
            )
            positions[seqid]["seq"]["len"] = offset - seqid_end
            positions[seqid]["seq"]["seqlen"] += line_len - 1

        last_pos = offset

可惜時間花費約1005m (~16hr),並且佔據約86GB
雖然數據量這麼龐大,但dict搜尋速度也是順殺
不符合效益,且記憶體佔用這麼誇張大概沒多少家用主機可以承受…

測試2: marisa_trie

marisa_trie並非python內建套件
該資料結構是C++寫成

https://pypi.org/project/marisa-trie/

記憶體佔用減少很多,只佔約2G
但時間花費更久了,達到可怕的1850m (~30hr)
雖然在這個案例它不適合,但在其他需要大量且快速索引的情境下有他的優勢

  1. 佔用的記憶體很少,同樣數量,dictionary需要80G,marisa_trie只需要2G
  2. 可快速輸出成檔案,並反覆讀取,且不需要全部寫入記憶體才開始索引,跟HDF5和MySQL一樣

marisa_trie的範例

trie = marisa_trie.RecordTrie(fmt, zip(keys, values))

從上面的程式碼可看出它需要一次建立完成,不能像dictionary分次逐一寫入
因此不可能事先將fasta全部寫入記憶體再存到marisa_trie,所以需要以generater的方式批次寫入
由於marisa_trie寫入的資料形態需要事先定義,而且只接受tupple
所以就只存放最重要的每個sequence的start offset, end offset, sequence length, taxid
format爲fmt="<QQLL"

format的字串代表資料形態,

acc2taxid_trie = Acc2Taxid(acc2taxid_marisa)

def iter_fasta():
    c = 0
    taxids = set()
    pos = {"seqid": None}
    with open_gz(fasta) as f:
        while True:
            line_start = f.tell()
            line = f.readline()
            if not line:
                break
            offset = f.tell()
            line_len = offset - line_start
            if line[0] == ">":
                seqid = line.strip(" >\n").split(".")[0]

                if pos["seqid"] is not None:
                    yield pos["seqid"], (
                        pos["start"],
                        pos["end"],
                        pos["seqlen"],
                        taxid,
                    )
                pos = {
                    "seqid": seqid,
                    "start": line_start,
                    "end": offset,
                    "seqlen": 0,
                }

                taxid = acc2taxid_trie.search_taxid(seqid)
                taxid = taxid if taxid is not None else 0
                if taxid:
                    taxids.add(taxid)

                else:
                    pos["seqid"] = None
                c += 1
                if c % 50000 == 0:
                    print(f"Processed {c} sequences")
                    

            else:
                if pos["seqid"] is None:
                    continue
                pos["seqlen"] += line_len - 1
                pos["end"] = offset

    with open(out_dir/"taxids.txt", "w") as taxid_f:
        taxid_f.write("\n".join(map(str, taxids)))


pos_trie = marisa_trie.RecordTrie("<QQLL", iter_fasta())

結語

可惜經過以上測試,或許直接用500GB (file size) 9400萬條序列的FASTA來處理是不恰當的
明顯是受到I/O性能限制,題外話,執行的位置是M2 SSD,實際讀寫約3000MB/s

但這次測試隱約感覺到寫進dictionary或marisa_trie的key-value累加越多
所花費時間就越長,比如說第1M條10分鐘,但到第100M卻花200分鐘
時間不是以等差級數上去的
但我也沒有再進一步測試了,還要考量到每條序列的長度不同,每條序列的迴圈數是不一樣的等因素

最終使用的方案爲將blastdb格式的NT下載後使用blastdbcmd -taxids來篩選human和non human的序列並用awk濾除sequence length太短的序列
雖然整套流程下來的時間預計也需要8hr以上了


Author: Hung-Lin, Chen
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source Hung-Lin, Chen !
 Previous
Nextflow平行與合併處理 Nextflow平行與合併處理
前言這篇文章是爲了自己要學習幾個Nextflow的重要功能而寫的並寫一個程式實踐這些功能 Nextflow的基本概念會跳過不講可以參考我之前寫的文章快速了解或是看官方文件 Nextflow教學 - 以最小可行專案為例
2023-08-06
Next 
記憶體插滿4條無法開機 記憶體插滿4條無法開機
這篇是成功案例,但每次更換電子零件都是場賭注如果更換完按下開機鍵有不如預期的狀況真的很想死 先講結論,調整記憶體電壓就成功了,但要發現是電壓問題…
2023-06-12
  TOC