Multiprocessing и реконсиляция данных из различных источников +9


Привет, Хабр!

В условиях многообразия распределенных систем, наличие выверенной информации в целевом хранилище является важным критерием непротиворечивости данных.

На этот счет существует немало подходов и методик, а мы остановимся на реконсиляции, теоретические аспекты которой были затронуты вот в этой статье. Предлагаю рассмотреть практическую реализацию данной системы, масштабируемой и адаптированной под большой объем данных.

Как реализовать этот кейс на старом-добром Python — читаем под катом! Поехали!


(Источник картинки)

Введение


Давайте представим, что финансовая организация имеет несколько распределенных систем и перед нами стоит задача сверить транзакции в этих системах и загрузить сверенные данные в целевое хранилище.

В качестве источника данных возьмем большой текстовый файл и таблицу в базе данных PostgreSQL. Предположим, что данные, находящиеся в этих источниках, имеют одни и те же транзакции, но при этом могут иметь различия, и поэтому их необходимо сверить и записать выверенные данные в конечное хранилище для анализа.

Дополнительно необходимо предусмотреть параллельный запуск нескольких реконсиляций на одной базе данных и адаптировать систему под большой объем, применив multiprocessing.

Модуль multiprocessing отлично подходит для распараллеливания операций в Python и позволяет в некотором смысле обходить определенные недостатки GIL. Возможностями данной библиотеки воспользуемся далее.

Архитектура разрабатываемой системы



Используемые компоненты:

  • Генератор случайных данных – Python-скрипт, формирующий CSV файл и на его основе заполняющий таблицу в базе данных;
  • Источники данных – CSV-файл и таблица в БД PostgreSQL;
  • Адаптеры – в данном случае используем два адаптера, которые будут извлекать данные из своих источников (CSV или БД) и заносить информацию в промежуточную БД;
  • Базы данных – в количестве трех штук: сырые данные, промежуточная БД, хранящая информацию снятую адаптерами, и «чистая» база данных, содержащая в себе сверенные транзакции из обоих источников.

Начальная подготовка


В качестве инструмента хранения данных будем использовать БД PostgreSQL в Docker-контейнере и взаимодействовать с нашей базой данных через pgAdmin, запущенном в контейнере:

docker run --name pg -d -e "POSTGRES_USER=my_user" -e "POSTGRES_PASSWORD=my_password" postgres

Запуск pgAdmin:

docker run -p 80:80 -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" -e "PGADMIN_DEFAULT_PASSWORD=12345" -d dpage/pgadmin4

После того, как все запустилось, не забудем указать в конфигурационном файле (conf/db.ini) строку подключения к БД (для учебного примера так можно!):

[POSTGRESQL]
db_url=postgresql://my_user:my_password@172.17.0.2:5432/my_user

В принципе, использование контейнера не является обязательным и Вы можете использовать свой сервер БД.

Генерация входных данных


За генерацию тестовых данных отвечает Python-скрипт generate_test_data, который принимает на вход желаемое количество записей для генерации. Последовательность операции легко проследить по основной функции класса GenerateTestData:

    @m.timing
    def run(self, num_rows):
        """ Run the process """
        m.info('START!')
        self.create_db_schema()
        self.create_folder('data')
        self.create_csv_file(num_rows)
        self.bulk_copy_to_db()
        self.random_delete_rows()
        self.random_update_rows()
        m.info('END!')

Итак, функция выполняет следующие шаги:

  • Создание схем в БД (создаем все основные схемы и таблицы);
  • Создание папки для хранения тестового файла;
  • Генерация тестового файла с заданным количеством строк;
  • Bulk-insert данных в целевую таблицу transaction_db_raw.transaction_log;
  • Случайное удаление нескольких строк в этой таблице;
  • Случайное обновление нескольких строк в этой таблице.

Удаление и внесение изменений необходимо для того, чтобы сравниваемые объекты имели хоть какое-то расхождение. Важно уметь искать эти расхождения!

@m.timing
@m.wrapper(m.entering, m.exiting)
def random_delete_rows(self):
    """ Random deleting some rows from the table """
    sql_command = sql.SQL("""
                    delete from {0}.{1}
                    where ctid = any(array(
                      select ctid
                      from {0}.{1}
                      tablesample bernoulli (1)
                      ))""").format(sql.Identifier(self.schema_raw),
                                    sql.Identifier(self.raw_table_name))
    try:
        rows = self.database.execute(sql_command)
        m.info('Has been deleted [%s rows] from table %s' % (rows, self.raw_table_name))
    except psycopg2.Error as err:
        m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror)

@m.timing
@m.wrapper(m.entering, m.exiting)
def random_update_rows(self):
    """ Random update some rows from the table """
    sql_command = sql.SQL("""
            update {0}.{1}
            set transaction_amount = round(random()::numeric, 2)
            where ctid = any(array(
              select ctid
              from {0}.{1}
              tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw),
                                                      sql.Identifier(self.raw_table_name))
    try:
        rows = self.database.execute(sql_command)
        m.info('Has been updated [%s rows] from table %s' % (rows, self.raw_table_name))
    except psycopg2.Error as err:
        m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror)

Генерация тестового набора данных и последующая запись в текстовой файл формате CSV происходит следующим образом:

  • Создается случайный UID транзакции;
  • Создается случайный UID номер счета (по умолчанию, берем десять уникальных счетов, но это значение можно изменить с помощью конфигурационного файла, поменяв параметр «random_accounts»);
  • Дата транзакции – рандомная дата от заданной в конфиге даты (initial_date);
  • Тип операции (сделка / комиссия);
  • Сумма транзакции;
  • Основную работу в генерации данных выполняет метод generate_test_data_by_chunk класса TestDataCreator:

@m.timing
def generate_test_data_by_chunk(self, chunk_start, chunk_end):
    """ Generating and saving to the file """
    num_rows_mp = chunk_end - chunk_start
    new_rows = []

    for _ in range(num_rows_mp):
        transaction_uid = uuid.uuid4()
        account_uid = choice(self.list_acc)
        transaction_date = (self.get_random_date(self.date_in, 0)
                            .__next__()
                            .strftime('%Y-%m-%d %H:%M:%S'))
        type_deal = choice(self.list_type_deal)
        transaction_amount = randint(-1000, 1000)

        new_rows.append([transaction_uid,
                         account_uid,
                         transaction_date,
                         type_deal,
                         transaction_amount])

    self.write_in_file(new_rows, chunk_start, chunk_end)

Особенность данной функции — запуск в нескольких распараллеленных асинхронных процессах, каждый из которых генерит свою порцию из 50К записей. Эта «фишка» позволит формировать файл на несколько миллионов строк достаточно быстро

def run_csv_writing(self):
    """ Writing the test data into csv file """
    pool = mp.Pool(mp.cpu_count())
    jobs = []

    for chunk_start, chunk_end in self.divide_into_chunks(0, self.num_rows):
        jobs.append(pool.apply_async(self.generate_test_data_by_chunk,
                                     (chunk_start, chunk_end)))
    # wait for all jobs to finish
    for job in jobs:
        job.get()

    # clean up
    pool.close()
    pool.join()

После того, как завершится заполнение текстового файла, отрабатывается команда bulk_insert и все данные из этого файла попадают в таблицу transaction_db_raw.transaction_log.

Далее, в двух источниках будут содержаться совершенно одинаковые данные и реконсиляция не найдет ничего интересного, поэтому удаляем и изменяем несколько случайных строк в базе данных.

Запускаем скрипт и генерим тестовый CSV-файл с транзакциями на 10К строк:

./generate_test_data.py 10000


На скриншоте видно, что получен файл на 10К строк, в БД загружено 10К, но затем из базы данных было удалено 112 строк и изменено еще 108. Итог: файл и таблица в БД отличаются между собой на 220 записей.

«Ну и где тут мультипроцессинг?», — спросите вы.
А его работу можно увидеть, когда будете генерить файл побольше, не на 10К записей, а, к примеру, на 1M. Попробуем?

./generate_test_data.py 1000000


После загрузки данных, удаления и изменения случайных записей, видим отличия текстового файла от таблицы: 19 939 строки (из них 10 022 удалено случайным образом, а 9 917 изменено).

На картинке видно, что генерация записей шла асинхронно, непоследовательно. Это означает, что следующий процесс может начаться без учета порядка запуска как только предыдущий завершится. Нет гарантии, что результат окажется в том же порядке, что и входные данные.

А это точно быстрее?
Один миллион строк не на самой быстрой виртуальной машине был «придуман» за 15.5 секунд — и это достойный вариант. Запустив эту же генерацию последовательно, без использования мультипроцессинга, я получил результат: генерация файла шла медленнее более чем в три раза (свыше 52 секунд вместо 15,5):



Адаптер для CSV


Этот адаптер хэширует строку, оставляя без изменения только первый столбец – идентификатор транзакции и сохраняет полученные данные в файл data/ transaction_hashed.csv. Финальным шагом его работы является загрузка этого файла при помощи команды COPY во временную таблицу схемы reconciliation_db.

Оптимальное чтение файла выполняется несколькими параллельными процессами. Читаем построчно, кусками по 5 мегабайт каждый. Цифра «5 мегабайт» была получена эмпирическим методом. Именно при таком размере одного фрагмента текста, удалось получить наименьшее время чтения больших файлов на своей виртуальной машине. Можно поэкспериментировать на своем окружении с данным параметром и посмотреть, как будет меняться время работы:

@m.timing
def process_wrapper(self, chunk_start, chunk_size):
    """ Read a particular chunk """
    with open(self.file_name_raw, newline='\n') as file:
        file.seek(chunk_start)
        lines = file.read(chunk_size).splitlines()
        for line in lines:
            self.process(line)

def chunkify(self, size=1024*1024*5):
    """ Return a new chunk """
    with open(self.file_name_raw, 'rb') as file:
        chunk_end = file.tell()
        while True:
            chunk_start = chunk_end
            file.seek(size, 1)
            file.readline()
            chunk_end = file.tell()

            if chunk_end > self.file_end:
                chunk_end = self.file_end
                yield chunk_start, chunk_end - chunk_start
                break
            else:
                yield chunk_start, chunk_end - chunk_start

@m.timing
def run_reading(self):
    """ The main method for the reading """
    # init objects
    pool = mp.Pool(mp.cpu_count())
    jobs = []

    m.info('Run csv reading...')
    # create jobs
    for chunk_start, chunk_size in self.chunkify():
        jobs.append(pool.apply_async(self.process_wrapper,
                                     (chunk_start, chunk_size)))

    # wait for all jobs to finish
    for job in jobs:
        job.get()

    # clean up
    pool.close()
    pool.join()

    m.info('CSV file reading has been completed')

Пример чтения созданного ранее файла на 1М записей:


На скриншоте показано создание временной таблицы с уникальным именем для текущего запуска реконсиляции. Далее идет асинхронное чтение файла по частям и взятие хэша каждой строки. Вставка данных от адаптера в целевую таблицу завершает работу с данным адаптером.
Использование временной таблицы с уникальным именем для каждого процесса реконсиляции позволяет дополнительно распараллеливать процесс сверки в одной базе данных.

Адаптер для PostgreSQL


Адаптер для обработки данных, хранящихся в таблице работает примерно по той же логике, что и адаптер для файла:

  • чтение по частям таблицы (если она большая, свыше 100К записей) и взятие хэша по всем столбцам, кроме идентификатора транзакции;
  • затем идет вставка обработанных данных в таблицу reconciliation_db. storage_$(int(time.time()).

Интересной особенностью данного адаптера является то, что он использует пул коннектов к базе данных, которые будут искать по индексу необходимые данные в таблице и обрабатывать их.

Исходя из размеров таблицы, происходит вычисление количества процессов, необходимых для обработки и внутри каждого процесса идет деление на 10 задач.

def read_data(self):
    """
    Read the data from the postgres and shared those records with each
    processor to perform their operation using threads
    """
    threads_array = self.get_threads(0,
                                     self.max_id_num_row,
                                     self.pid_max)

    for pid in range(1, len(threads_array) + 1):
        m.info('Process %s' % pid)

        # Getting connection from the connection pool
        select_conn = self._select_conn_pool.getconn()
        select_conn.autocommit = 1

        # Creating 10 process to perform the operation
        process = Process(target=self.process_data,
                          args=(self.data_queque,
                                pid,
                                threads_array[pid-1][0],
                                threads_array[pid-1][1],
                                select_conn))

        process.daemon = True
        process.start()
        process.join()
        select_conn.close()


Поиск расхождений


Переходим к сверке данных, полученных от двух адаптеров.

Сверка (или получение отчета о расхождениях) происходит на стороне сервера баз данных, используя всю мощь языка SQL.

SQL-запрос совсем нехитрый – это всего лишь джойн таблицы с данными от адаптеров самой на себя по идентификатору транзакции:

sql_command = sql.SQL("""
    select
        s1.adapter_name,
        count(s1.transaction_uid) as tran_count
    from {0}.{1} s1
    full join {0}.{1} s2
        on s2.transaction_uid = s1.transaction_uid
        and s2.adapter_name != s1.adapter_name
        and s2.hash = s1.hash
    where s2.transaction_uid is null
    group by s1.adapter_name;""").format(sql.Identifier(self.schema_target),
                                         sql.Identifier(self.storage_table))

На выходе получаем отчет:


Проверим, все ли правильно на картинке выше. Мы помним, что из таблицы в БД было удалено 9917 и изменено 10 022 строк. Итого 19939 строк, что и видно в отчете.

Итоговая таблица


Осталось только вставить в таблицу-хранилище «чистые» транзакции, которые совпадают по всем параметрам (по хэшу) в разных адаптерах. Этот процесс выполним следующим SQL-запросом:

sql_command = sql.SQL("""
    with reconcil_data as (
        select
            s1.transaction_uid
        from {0}.{1} s1
        join {0}.{1} s2
            on s2.transaction_uid = s1.transaction_uid
            and s2.adapter_name != s1.adapter_name
        where s2.hash = s1.hash
            and s1.adapter_name = 'postresql_adapter'
    )
    insert into {2}.transaction_log
    select
        t.transaction_uid,
        t.account_uid,
        t.transaction_date,
        t.type_deal,
        t.transaction_amount
    from {3}.transaction_log t
    join reconcil_data r
        on t.transaction_uid = r.transaction_uid
    where not exists
        (
            select 1
            from {2}.transaction_log tl
            where tl.transaction_uid = t.transaction_uid
        )
    """).format(sql.Identifier(self.schema_target),
                sql.Identifier(self.storage_table),
                sql.Identifier(self.schema_db_clean),
                sql.Identifier(self.schema_raw))

Временную таблицу, которую мы использовали в качестве промежуточного хранения данных от адаптеров, можно удалить.


Заключение


В ходе проделанной работы была разработана система реконсиляции данных из разных источников: текстовый файл и таблица в базе данных. Использовали минимум дополнительных инструментов.

Возможно, искушенный читатель может заметить, что использование фреймворков типа Apache Spark вкупе с приведением исходных данных к паркетному формату, может значительно ускорить данный процесс, особенно для огромных объемов. Но основная цель данной работы – написание системы на «голом» Python и изучение мультипроцессинговой обработки данных. С чем мы, на мой взгляд, справились.

Исходный код всего проекта лежит в моем репозитории на GitHub, предлагаю с ним ознакомиться.

С удовольствием отвечу на все вопросы и ознакомлюсь с вашими замечаниями.

Желаю успехов!




К сожалению, не доступен сервер mySQL