В прошлой статье я рассказал о том, каким образом поисковая система может узнать о том, что существует та или иная веб-страница, и сохранить ее себе в хранилище. Но узнать о том, что веб-страница существует, — это только самое начало. Гораздо более важно за доли секунды успеть найти те страницы, которые содержат ключевые слова, введенные пользователем. О том, как это работает, я и расскажу в сегодняшней статье, проиллюстрировав свой рассказ «учебной» реализацией, которая тем не менее спроектирована таким образом, чтобы иметь возможность масштабироваться до размеров индексирования всего Интернета и учитывать современное состояние технологий анализа больших объемов данных.
Заодно у меня получилось рассмотреть основные функции и методы Apache Spark, так что данную статью можно рассматривать еще и как небольшой туториал по спарку.
Более формальная постановка задачи, которую я сегодня разберу: имеется хранилище, содержащее набор веб-страниц, скачанных из Интернета краулером. Необходимо спроектировать механизм, который позволит за доли секунды указывать ссылки на все веб-страницы из этого хранилища, включающие все ключевые слова, содержащиеся в пользовательском запросе. Этот механизм должен быть:
Несколько важных ограничений разбираемой сегодня задачи:
Давайте теперь разберем, как решить задачу в рамках поставленных ограничений.
Рассмотрим следующую структуру данных: словарь, ключами которого будут слова из нашего языка, значениями — множества веб-страниц, на которых это слово встречается:
Такая структура данных называется инвертированным индексом, и она является ключевой для работы поисковой системы. Настолько ключевой, что, например, «Яндекс» даже называется в честь нее (yandex — это не что иное, как yet another index).
В реальности этот словарь будет иметь размер намного больший, чем в приведенном примере: количество элементов в нем будет равно количеству разнообразных слов на веб-страницах, а максимальный размер множества для одного элемента — все веб-страницы в индексируемой части Интернета.
Допустим, мы смогли построить такую структуру данных. В этом случае поиск веб-страниц, содержащих слова из запроса, будет происходить следующим образом:
Например, если мы ищем все веб-страницы по запросу «алгоритм визуализация», а обратный индекс соответствует приведенному в таблице, то результирующее множество будет содержать всего лишь одну веб-страницу — habr.ru/post/325422/, так как только она содержится в пересечении множеств для слов «алгоритм» и «визуализация».
Для того чтобы построить такую структуру данных, можно воспользоваться подходом MapReduce. Об этом подходе у меня есть отдельная статья, но основная идея заключается в следующем:
Ниже в разделе с имплементацией я покажу, как реализовать описанный алгоритм, используя популярный open source инструмент для работы с большими данными — apache spark.
Под аббревиатурой NLP в области анализа данных понимают компьютерную обработку естественного языка (Natural Language Processing, не путать с псевдонаучным нейролингвистическим программированием). При работе с поисковой системой не избежать хотя бы отдаленного столкновения с обработкой языка, поэтому нам понадобятся некоторые понятия и инструменты из этой области. В качестве библиотеки работы с естественным языком я буду использовать популярную библиотеку для python NLTK.
Первое понятие из области NLP, которое нам понадобится, — это токенизация. При описании работы с реверсивным индексом я пользовался понятием слово. Однако слово — это не очень хороший термин для разработчика поисковой системы, так как на веб-страницах встречается много разнообразных наборов символов, не являющихся словом в прямом смысле этого слова (например, masha545 или 31337). Поэтому мы будем вместо этого пользоваться токенами. В библиотеке NLTK есть специальный модуль для выделения токенов: nltk.tokenize. Там есть разнообразные способы разбить текст на токены. Мы воспользуемся самым простым способом выделить токены — токенизацией по регекспу:
#code
from nltk.tokenize import RegexpTokenizer
tokenizer = RegexpTokenizer(r'[а-яёa-z0-9]+')
text = "Съешь ещё этих мягких французских булок, да выпей же чаю."
tokenizer.tokenize(text.lower())
#result
['съешь', 'ещё', 'этих', 'мягких', 'французских', 'булок', 'да', 'выпей', 'же', 'чаю']
Многие языки, и русский в особенности, богаты на формы слов. Понятно, что, когда мы ищем слово «компьютер», мы ожидаем, что найдутся страницы, содержащие слово «компьютера», «компьютеров» и так далее. Для этого все токены нужно привести в так называемую «нормальную форму». Это можно сделать при помощи разных инструментов. Например, на github есть библиотека pymystem, являющаяся оберткой над библиотекой, разработанной яндексом. Я для простоты воспользуюсь методом стемминга — отброса незначащего окончания — и использую для этого стеммер русского языка, входящий в библиотеку nltk:
#code
from nltk.stem.snowball import RussianStemmer
stemmer = RussianStemmer()
tokens = ['съешь', 'ещё', 'этих', 'мягких', 'французских', 'булок', 'да', 'выпей', 'же', 'чаю']
stemmed_tokens = [stemmer.stem(token) for token in tokens]
print(stemmed_tokens)
#result
['съеш', 'ещ', 'эт', 'мягк', 'французск', 'булок', 'да', 'вып', 'же', 'ча']
В обратном индексе для каждого слова мы храним множество URL-страниц, в которых это слово опубликовано. Проблема заключается в том, что некоторые слова (например, предлог «в») встречаются практически на каждой веб-странице. При этом информативность наличия или отсутствия таких слов на странице очень маленькая. Поэтому, для того чтобы не хранить огромные массивы в индексе и не делать лишнюю работу, такие слова мы будем просто игнорировать.
Чтобы определить множество слов, которые будем игнорировать, можно для каждого слова подсчитать долю веб-страниц, на которых это слово встречается, и поставить некоторую границу по этой частоте. Расчет частотности слов — это классическая задача в парадигме map-reduce, подробно об этом можно почитать в моих статьях.
Я для своей реализации вычислял не просто частоту слов, а некоторое ее монотонное преобразование — Inverse Document Frequency (IDF), которое нам в дальнейшем пригодится еще и для ранжирования документов. Методом «пристального взгляда» я определил, что подходящая константа для отсечения слов будет примерно равна 1.12, между словами «за» и «код» (слово «код» очень часто встречается на хабре).
Архитектура моего учебного поискового движка
Документов, которые я индексирую, довольно много — несколько миллионов. Для их обработки необходим инструментарий для работы с большими данными. Я выбрал apache spark, который является одним из самых популярных фреймворков на сегодняшний день. Так как я использую amazon web services для своей имплементации, я воспользовался дистрибутивом спарка, входящим в состав elastic map reduce. Apache Spark имеет несколько вариантов представления датасетов. Один из основных — так называемый Resilient Distributed Dataset (RDD) — по сути представляет собой распределенный массив данных, которые можно обрабатывать параллельно. Я буду использовать его для своей реализации (хотя есть и другие API для работы со спарком, которые в некоторых случаях могут быть быстрее, см например Dataframe API)
Так как данные в нашем случае у нас хранятся на объектном хранилище амазона (S3), то сначала спарку необходимо сообщить необходимую информацию для работы с этим хранилищем:
#sc-это spark context, он содержит параметры подключения к спарку
def init_aws_spark(sc, config):
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", config.AWS_ACCESS_KEY)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", config.AWS_SECRET_KEY)
Дальше можно создать RDD из данных, хранящихся на S3 (которые туда сохранил краулер), и заодно сразу распарсить документы из json-формата:
rdd = sc.textFile("s3a://minicrawl/habrahabr.ru/*",)
jsons_rdd = rdd.map(lambda doc: json.loads(doc))
Тут мы применили одну из базовых функций спарка — map, которая применяет функцию ко всем элементам массива, делая это параллельно на всех узлах кластера.
Далее мы несколько раз применим эту функцию для предварительной обработки текста:
Тут ничего особо интересного, использую библиотеку lxml для парсинга html-а и удаления разметки:
import copy
import lxml.etree as etree
def stringify_children(node):
if str(node.tag).lower() in {'script', 'style'}:
return []
from lxml.etree import tostring
parts = [node.text]
for element in node.getchildren():
parts += stringify_children(element)
return ''.join(filter(None, parts)).lower()
def get_tree(html):
parser = etree.HTMLParser()
tree = etree.parse(StringIO(html), parser)
return tree.getroot()
def remove_tags(html):
tree = get_tree(html)
return stringify_children(tree)
def get_text(doc):
res = copy.deepcopy(doc)
res['html'] = res['text']
res['text'] = remove_tags(res['html'])
return res
clean_text_rdd = jsons_rdd.map(get_text).cache()
from nltk.tokenize import RegexpTokenizer
from nltk.stem.snowball import RussianStemmer
def tokenize(doc):
tokenizer = RegexpTokenizer(r'[а-яёa-z0-9]+')
res = copy.deepcopy(doc)
tokens = tokenizer.tokenize(res['text'])
res['tokens'] = list(filter(lambda x: len(x) < 15, tokens))
return res
def stem(doc):
stemmer = RussianStemmer()
res = copy.deepcopy(doc)
res['stemmed'] = [stemmer.stem(token) for token in res['tokens']]
return res
stemmed_docs = clean_text_rdd.map(tokenize).map(stem).cache()
Функция cache(), вызванная после функции map(), подсказывает, что этот датасет необходимо закешировать. Если этого не сделать, при многократном использовании спарк будет его рассчитывать заново.
Как я писал, фильтровать будем слова, для которых мера IDF меньше чем 1.12. Для этого нам сначала надо посчитать частоты всех слов. Это прямо классическая задача на анализ больших данных:
def get_words(doc):
return [(word, 1) for word in set(doc['stemmed'])]
word_counts = stemmed_docs.flatMap(get_words) .reduceByKey(lambda x, y: x+y)
Здесь используются две интересные функции спарка:
Далее рассчитаем IDF для всех токенов:
doc_count = stemmed_docs.count()
def get_idf(doc_count, doc_with_word_count):
return math.log(doc_count/doc_with_word_count)
idf = word_counts.mapValues(lambda word_count: get_idf(doc_count, word_count))
Получим список высокочастотных стоп-слов:
idf_border = 1.12
stop_words_list =idf.filter(lambda x: x[1] < idf_border).keys().collect()
stop_words = set(sop_words)
Тут используются функции спарка:
У меня получилось 50 стоп-слов, среди которых присутствуют как очевидные: "в", "о", "не", так и менее очевидные, но логичные для хабра: "хабрахабр", "мобильн", "песочниц", "поддержк", "регистрац".
Задача построить датасет типа слово -> множество URL очень похожа на задачу посчитать количество документов, в которых встречается слово, с одним различием: мы будем не прибавлять единицу каждый раз, а добавлять новый URL в множество.
def token_urls(doc):
res = []
for token in set(doc['stemmed']):
if token not in stop_words:
res.append((token, doc['url']))
return res
index_rdd = stemmed_docs.flatMap(token_urls) .aggregateByKey(set(), lambda x, y: x.union({y}),
lambda x, y: x.union(y))
Тут в дополнение к уже использованной ранее функции flatMap используется еще и функция aggregateByKey, которая очень похожа на reduceByKey, но принимает три параметра:
В общем, все просто:
import pickle
storage = LazyAerospike(config.AEROSPIKE_ADDRESS)
results = index_rdd.map(lambda x: storage.put(x[0], pickle.dumps(x[1]))).collect()
Тут я использую pickle — стандартный питоновский способ сериализации почти любых объектов. Также использую небольшую обертку над стандартным клиентом aerospike, которая позволяет инициализировать соединение с базой данных в момент первой записи или чтения. Это нужно, так как spark не может распараллелить подключение к базе данных по всем узлам кластера, приходится каждый раз подключаться заново.
class LazyAerospike(object):
import aerospike
def __init__(self, addr, namespace='test', table='index'):
self.addr = addr
self.connection = None
self.namespace = namespace
self.table = table
def check_connection(self):
if self.connection is None:
config = {
'hosts': [ (self.addr, 3000) ]
}
self.connection = self.aerospike.Client(config).connect()
def _get_full_key(self, key):
return (self.namespace, self.table, key)
def put(self, key, value):
self.check_connection()
key_full = self._get_full_key(key)
self.connection.put(key_full, {'value': value})
return True
def get(self, key):
self.check_connection()
key_full = self._get_full_key(key)
value = self.connection.get(key_full)
return value[2]['value']
Осталось написать функцию, которая будет исполняться во время пользовательского запроса. С ней все просто: разбиваем запрос на токены, извлекаем множества URL-ов для каждого токена и пересекаем их:
def index_get_urls(keyword):
raw_urls = storage.get(keyword)
return pickle.loads(raw_urls)
def search(query):
stemmer = RussianStemmer()
tokenizer = RegexpTokenizer(r'[а-яёa-z0-9]+')
keywords_all = [stemmer.stem(token) for token in tokenizer.tokenize(query)]
keywords = list(filter(lambda token: token not in stop_words, keywords_all))
if len(keywords) == 0:
return []
result_set= index_get_urls(keywords[0])
for keyword in keywords[1:]:
result_set= result_set.intersection(index_get_urls(keyword))
return result_set
Запускаем и убеждаемся, что все работает как надо (тут я запускал на небольшом семпле):
Настоящая поисковая система, конечно, устроена гораздо сложнее. Например, я храню множества очень неоптимально, достаточно было бы вместо самих url-ов хранить только их id-шники. Тем не менее архитектура получилась распределенная и потенциально может действительно работать на большой части Интернета и под большими нагрузками.
В продакшене вам, скорее всего, не нужно будет реализовывать поисковую систему руками — лучше воспользоваться готовыми решениями, например таким, как ElasticSearch.
Однако понимание основ того, как работает обратный индекс, может помочь его правильно настроить и использовать, ну и для решения похожих задач может оказаться очень полезным.
В этой статье я не затронул самую интересную, на мой взгляд, часть поиска — ранжирование. О нем речь пойдет в следующих статьях.
К сожалению, не доступен сервер mySQL