NT FASTA近億等級數量索引

情境說明,自己玩的專案中需要建立NT資料庫
並且需要經過資料清洗,像是將資料庫分成human和non human的fasta 並將無taxon分類的sequences排除
可怕的是目前的NT的序列數已達到9400萬條

策略是先遊歷一遍整個檔案 並記錄每個sequence的taxid及file offset (f.tell())儲存到dictionary

透過記錄的offset,使用f.seek()快速到達指定的sequence位置,並依據taxid判斷是屬於human或non human的sequence分別寫入到兩個fasta

因此本篇測試有哪些key-value類型的資料結構可以快速索引fasta中seqid於檔案中位置
且符合時間成本、硬體資源等效益

測試1: dictionary

首先使用內建的dictionary資料結構 以seqid爲Key, value記錄其在fasta中的start offset, end offset, sequence length, taxid

acc2taxid_trie = Acc2Taxid(acc2taxid_marisa)

taxids = set()
c = 0
removed = []
positions = {}
last_pos = 0
with open_gz(fasta) as f:
    while True:
        line = f.readline()
        if not line:
            break
        offset = f.tell()
        line_len = offset - last_pos
        if line[0] == ">":
            seqid = line.strip(" >\n").split(".")[0]
            taxid = acc2taxid_trie.search_taxid(seqid)
            taxid = str(taxid) if taxid is not None else taxid
            if taxid:
                taxids.add(taxid)
            else:
                removed.append(seqid)
                seqid = None
            positions[seqid] = {
                "header": {"fr": last_pos, "len": line_len},
                "seq": {"fr": last_pos + line_len + 1, "len": 0, "seqlen": 0},
            }
            c += 1
            if c % 1000000 == 0:
                print(f"Processed {c} sequences")
        else:
            if seqid is None:
                continue
            seqid_end = (
                positions[seqid]["header"]["fr"] + positions[seqid]["header"]["len"]
            )
            positions[seqid]["seq"]["len"] = offset - seqid_end
            positions[seqid]["seq"]["seqlen"] += line_len - 1

        last_pos = offset

可惜時間花費約1005m (~16hr),並且佔據約86GB
雖然數據量這麼龐大,但dict搜尋速度也是順殺
不符合效益,且記憶體佔用這麼誇張大概沒多少家用主機可以承受…

測試2: marisa_trie

marisa_trie並非python內建套件
該資料結構是C++寫成

https://pypi.org/project/marisa-trie/

記憶體佔用減少很多,只佔約2G
但時間花費更久了,達到可怕的1850m (~30hr)
雖然在這個案例它不適合,但在其他需要大量且快速索引的情境下有他的優勢

  1. 佔用的記憶體很少,同樣數量,dictionary需要80G,marisa_trie只需要2G
  2. 可快速輸出成檔案,並反覆讀取,且不需要全部寫入記憶體才開始索引,跟HDF5和MySQL一樣

marisa_trie的範例

trie = marisa_trie.RecordTrie(fmt, zip(keys, values))

從上面的程式碼可看出它需要一次建立完成,不能像dictionary分次逐一寫入 因此不可能事先將fasta全部寫入記憶體再存到marisa_trie,所以需要以generater的方式批次寫入
由於marisa_trie寫入的資料形態需要事先定義,而且只接受tupple
所以就只存放最重要的每個sequence的start offset, end offset, sequence length, taxid
format爲fmt="<QQLL"

format的字串代表資料形態,

  • Q是unsigned long long,範圍爲0 到 18,446,744,073,709,551,615 L是
  • L: unsigned long,範圍爲0 到 4,294,967,295

資料類型範圍
https://docs.python.org/3/library/struct.html#format-strings

acc2taxid_trie = Acc2Taxid(acc2taxid_marisa)

def iter_fasta():
    c = 0
    taxids = set()
    pos = {"seqid": None}
    with open_gz(fasta) as f:
        while True:
            line_start = f.tell()
            line = f.readline()
            if not line:
                break
            offset = f.tell()
            line_len = offset - line_start
            if line[0] == ">":
                seqid = line.strip(" >\n").split(".")[0]

                if pos["seqid"] is not None:
                    yield pos["seqid"], (
                        pos["start"],
                        pos["end"],
                        pos["seqlen"],
                        taxid,
                    )
                pos = {
                    "seqid": seqid,
                    "start": line_start,
                    "end": offset,
                    "seqlen": 0,
                }

                taxid = acc2taxid_trie.search_taxid(seqid)
                taxid = taxid if taxid is not None else 0
                if taxid:
                    taxids.add(taxid)

                else:
                    pos["seqid"] = None
                c += 1
                if c % 50000 == 0:
                    print(f"Processed {c} sequences")
                    

            else:
                if pos["seqid"] is None:
                    continue
                pos["seqlen"] += line_len - 1
                pos["end"] = offset

    with open(out_dir/"taxids.txt", "w") as taxid_f:
        taxid_f.write("\n".join(map(str, taxids)))


pos_trie = marisa_trie.RecordTrie("<QQLL", iter_fasta())

結語

可惜經過以上測試,或許直接用500GB (file size) 9400萬條序列的FASTA來處理是不恰當的
明顯是受到I/O性能限制,題外話,執行的位置是M2 SSD,實際讀寫約3000MB/s

但這次測試隱約感覺到寫進dictionary或marisa_trie的key-value累加越多
所花費時間就越長,比如說第1M條10分鐘,但到第100M卻花200分鐘
時間不是以等差級數上去的
但我也沒有再進一步測試了,還要考量到每條序列的長度不同,每條序列的迴圈數是不一樣的等因素

最終使用的方案爲將blastdb格式的NT下載後使用blastdbcmd -taxids來篩選human和non human的序列並用awk濾除sequence length太短的序列
雖然整套流程下來的時間預計也需要8hr以上了