Привет, Хабр!
В условиях многообразия распределенных систем, наличие выверенной информации в целевом хранилище является важным критерием непротиворечивости данных.
На этот счет существует немало подходов и методик, а мы остановимся на реконсиляции, теоретические аспекты которой были затронуты вот в этой статье. Предлагаю рассмотреть практическую реализацию данной системы, масштабируемой и адаптированной под большой объем данных.
Как реализовать этот кейс на старом-добром Python — читаем под катом! Поехали!
docker run --name pg -d -e "POSTGRES_USER=my_user" -e "POSTGRES_PASSWORD=my_password" postgres
docker run -p 80:80 -e "PGADMIN_DEFAULT_EMAIL=user@domain.com" -e "PGADMIN_DEFAULT_PASSWORD=12345" -d dpage/pgadmin4
[POSTGRESQL]
db_url=postgresql://my_user:my_password@172.17.0.2:5432/my_user
@m.timing
def run(self, num_rows):
""" Run the process """
m.info('START!')
self.create_db_schema()
self.create_folder('data')
self.create_csv_file(num_rows)
self.bulk_copy_to_db()
self.random_delete_rows()
self.random_update_rows()
m.info('END!')
@m.timing
@m.wrapper(m.entering, m.exiting)
def random_delete_rows(self):
""" Random deleting some rows from the table """
sql_command = sql.SQL("""
delete from {0}.{1}
where ctid = any(array(
select ctid
from {0}.{1}
tablesample bernoulli (1)
))""").format(sql.Identifier(self.schema_raw),
sql.Identifier(self.raw_table_name))
try:
rows = self.database.execute(sql_command)
m.info('Has been deleted [%s rows] from table %s' % (rows, self.raw_table_name))
except psycopg2.Error as err:
m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror)
@m.timing
@m.wrapper(m.entering, m.exiting)
def random_update_rows(self):
""" Random update some rows from the table """
sql_command = sql.SQL("""
update {0}.{1}
set transaction_amount = round(random()::numeric, 2)
where ctid = any(array(
select ctid
from {0}.{1}
tablesample bernoulli (1) ))""").format(sql.Identifier(self.schema_raw),
sql.Identifier(self.raw_table_name))
try:
rows = self.database.execute(sql_command)
m.info('Has been updated [%s rows] from table %s' % (rows, self.raw_table_name))
except psycopg2.Error as err:
m.error('Oops! Delete random rows has been FAILED. Reason: %s' % err.pgerror)
@m.timing
def generate_test_data_by_chunk(self, chunk_start, chunk_end):
""" Generating and saving to the file """
num_rows_mp = chunk_end - chunk_start
new_rows = []
for _ in range(num_rows_mp):
transaction_uid = uuid.uuid4()
account_uid = choice(self.list_acc)
transaction_date = (self.get_random_date(self.date_in, 0)
.__next__()
.strftime('%Y-%m-%d %H:%M:%S'))
type_deal = choice(self.list_type_deal)
transaction_amount = randint(-1000, 1000)
new_rows.append([transaction_uid,
account_uid,
transaction_date,
type_deal,
transaction_amount])
self.write_in_file(new_rows, chunk_start, chunk_end)
Особенность данной функции — запуск в нескольких распараллеленных асинхронных процессах, каждый из которых генерит свою порцию из 50К записей. Эта «фишка» позволит формировать файл на несколько миллионов строк достаточно быстро
def run_csv_writing(self):
""" Writing the test data into csv file """
pool = mp.Pool(mp.cpu_count())
jobs = []
for chunk_start, chunk_end in self.divide_into_chunks(0, self.num_rows):
jobs.append(pool.apply_async(self.generate_test_data_by_chunk,
(chunk_start, chunk_end)))
# wait for all jobs to finish
for job in jobs:
job.get()
# clean up
pool.close()
pool.join()
./generate_test_data.py 10000
./generate_test_data.py 1000000
На картинке видно, что генерация записей шла асинхронно, непоследовательно. Это означает, что следующий процесс может начаться без учета порядка запуска как только предыдущий завершится. Нет гарантии, что результат окажется в том же порядке, что и входные данные.
@m.timing
def process_wrapper(self, chunk_start, chunk_size):
""" Read a particular chunk """
with open(self.file_name_raw, newline='\n') as file:
file.seek(chunk_start)
lines = file.read(chunk_size).splitlines()
for line in lines:
self.process(line)
def chunkify(self, size=1024*1024*5):
""" Return a new chunk """
with open(self.file_name_raw, 'rb') as file:
chunk_end = file.tell()
while True:
chunk_start = chunk_end
file.seek(size, 1)
file.readline()
chunk_end = file.tell()
if chunk_end > self.file_end:
chunk_end = self.file_end
yield chunk_start, chunk_end - chunk_start
break
else:
yield chunk_start, chunk_end - chunk_start
@m.timing
def run_reading(self):
""" The main method for the reading """
# init objects
pool = mp.Pool(mp.cpu_count())
jobs = []
m.info('Run csv reading...')
# create jobs
for chunk_start, chunk_size in self.chunkify():
jobs.append(pool.apply_async(self.process_wrapper,
(chunk_start, chunk_size)))
# wait for all jobs to finish
for job in jobs:
job.get()
# clean up
pool.close()
pool.join()
m.info('CSV file reading has been completed')
Использование временной таблицы с уникальным именем для каждого процесса реконсиляции позволяет дополнительно распараллеливать процесс сверки в одной базе данных.
def read_data(self):
"""
Read the data from the postgres and shared those records with each
processor to perform their operation using threads
"""
threads_array = self.get_threads(0,
self.max_id_num_row,
self.pid_max)
for pid in range(1, len(threads_array) + 1):
m.info('Process %s' % pid)
# Getting connection from the connection pool
select_conn = self._select_conn_pool.getconn()
select_conn.autocommit = 1
# Creating 10 process to perform the operation
process = Process(target=self.process_data,
args=(self.data_queque,
pid,
threads_array[pid-1][0],
threads_array[pid-1][1],
select_conn))
process.daemon = True
process.start()
process.join()
select_conn.close()
sql_command = sql.SQL("""
select
s1.adapter_name,
count(s1.transaction_uid) as tran_count
from {0}.{1} s1
full join {0}.{1} s2
on s2.transaction_uid = s1.transaction_uid
and s2.adapter_name != s1.adapter_name
and s2.hash = s1.hash
where s2.transaction_uid is null
group by s1.adapter_name;""").format(sql.Identifier(self.schema_target),
sql.Identifier(self.storage_table))
sql_command = sql.SQL("""
with reconcil_data as (
select
s1.transaction_uid
from {0}.{1} s1
join {0}.{1} s2
on s2.transaction_uid = s1.transaction_uid
and s2.adapter_name != s1.adapter_name
where s2.hash = s1.hash
and s1.adapter_name = 'postresql_adapter'
)
insert into {2}.transaction_log
select
t.transaction_uid,
t.account_uid,
t.transaction_date,
t.type_deal,
t.transaction_amount
from {3}.transaction_log t
join reconcil_data r
on t.transaction_uid = r.transaction_uid
where not exists
(
select 1
from {2}.transaction_log tl
where tl.transaction_uid = t.transaction_uid
)
""").format(sql.Identifier(self.schema_target),
sql.Identifier(self.storage_table),
sql.Identifier(self.schema_db_clean),
sql.Identifier(self.schema_raw))
К сожалению, не доступен сервер mySQL