В этой статье я расскажу о своей попытке использования библиотеки GenStage, а в частности модуля Flow, для реализации одного из алгоритмов биоинформатики. На протяжении последних двух лет я занимался разработкой комплексной системы хранения и поиска результатов метагеномного анализа (метагеномика) углеводородного сырья. Наверное, для многих это китайская грамота. Фактически такой анализ означает выявление всех типов микроорганизмов, обитающих, к примеру, в залежах нефти. Некоторые из этих микроорганизмов, преимущественно бактерии, способны разъедать стальные трубы и создавать множество других неблагоприятных эффектов.
Первый этап метагеномного анализа – секвенирование генома нескольких тысяч особей, найденных в углеводородном сырье. Так как в результате секвенирования «считывается» не весь геном, а только отдельные его участки, то определить, какой особи принадлежит тот или иной участок – вычислительно сложная задача.
Что касается компьютерной реализации, анализ заключается в передаче исходных данных, так называемых азотистых оснований (A, T, U, C, и G), в цепочку процессов. Одной из известных программ для решения этой задачи является Qiime (читается «чайм»). Она состоит из множества связанных друг с другом приложений, написанных на Python. Сначала я разработал свой фреймворк потоковой обработки данных для Elixir, но, как только появился GenStage, мне стало интересно оценить его возможности в проведении исследований подобного рода.
Дано: CGGACTCGACAGATGTGAAGAACGACAATGTGAAGACTCGACACGACAGAGTGAAGAGAAGAGGAAACATTGTAA
Задание: Найти 5-мер, присутствующий хотя бы 4 раза на участке из 50 нуклеотидов.
Решение: CGACA GAAGA
defmodule Bio do
alias Experimental.Flow
def clump(seq, k_mer_len, subseq_len, times) do
|> seq
|> String.to_charlist
|> Stream.chunk(subseq_len, 1)
|> Flow.from_enumerable
|> Flow.partition
|> Flow.map(fn e -> Stream.chunk(e, k_mer_len, 1) end)
|> Flow.map(
fn e ->
Enum.reduce(e, %{},
fn w, acc ->
Map.update(acc, w, 1, & &1 + 1)
end)
end)
|> Flow.flat_map(
fn e ->
Enum.reject(e, fn({_, n}) -> n < times end)
end)
|> Flow.map(fn({seq, _}) -> seq end)
|> Enum.uniq
end
end
Enum.uniq
, которая отсеивает повторяющиеся элементы (важно не то, сколько раз появилась последовательность, а то, что она встретилась определённое количество раз). Stream.chunk/4
. Эта функция реализована в модулях Enum и Stream, но в Flow её нет. Будучи в замешательстве по поводу того, нужна ли отдельная реализация функции chunk
для модуля Flow, я отправил этот вопрос в список рассылки Elixir. Создатель языка, Жозе Валим, любезно на него ответил, и более того, предоставил улучшенную реализацию функции clump
(см. ниже)!Flow.partition
, так как в данном алгоритме секционирование данных не происходит.def clump(seq, k_mer_len, subseq_len, times) do
seq
|> String.to_charlist
|> Stream.chunk(subseq_len, 1)
|> Flow.from_enumerable
|> Flow.flat_map(&find_sequences(&1, k_mer_len, times))
|> Enum.uniq
end
def find_sequences(subseq, k_mer_len, times) do
subseq
|> Stream.chunk(k_mer_len, 1)
|> Enum.reduce(%{}, fn w, acc ->
Map.update(acc, w, 1, & &1 + 1)
end)
|> Enum.reject(fn({_, n}) -> n < times end)
|> Enum.map(fn({seq, _}) -> seq end)
end
К сожалению, не доступен сервер mySQL