Browse Source

Update repository with new code

master
Joshua Rubingh 1 year ago
parent
commit
0c33f6f62f
  1. 899
      importer.py
  2. 3
      requirements.txt
  3. 14
      settings.cfg

899
importer.py

@ -0,0 +1,899 @@
import multiprocessing
from pathlib import Path
from datetime import datetime, timedelta
from time import time
import json
import pandas as pd
import re
import gzip
import configparser
import copy
import math
from sqlalchemy import create_engine
from sqlalchemy.types import String, Text, DateTime, Date, BigInteger
from sqlalchemy.exc import IntegrityError, ProgrammingError
class dHealthImport():
    """Import dHealth study data (Dacadoo, Menzis, Netivity, Vektis) into a
    temporary MySQL database, anonymise it, and export the result to CSV.
    """

    CSV_SEPERATOR = ';'            # field separator used for CSV import and export
    CSV_CHUNK_SIZE = 10 ** 5       # NOTE(review): not referenced in this file — confirm before removing
    JSON_BATCH_SIZE = 10000        # JSON records per pandas insert batch
    EXPORT_BATCH_SIZE = 1000000    # rows per SELECT batch during CSV export
    SQL_INSERT_BATCH = 10          # pandas to_sql chunksize
    # Put the fields in a set, so we will not end up with duplicate field names
    BIRTHDAY_FIELDS = list(set(['Geboortedatum','DateOfBirth','dateOfBirth','ZCL_DATUM_GEBOORTE']))
    DATE_FIELDS = list(set(BIRTHDAY_FIELDS + ['DatumAangemeld','DatumAfgemeld','DateAnonymized','DateRegistered','DateUnregistered','_anonymized_timestamp',
        'LastAppUseDate','Timestamp','DateTime','LoginTime','Date','StartDate','LoginDate','StartDatum','DatumAangemaakt','DatumTot','DatumVan',
        'Datum','DatumAangemaakt','DatumGeplaatst','ActiefTot','ActiefVan','DatumToegevoegd','MutationDate','Date','EndTime','StartTime','LastLoginDate','EindDatum',
        'CONTACT_DTTM','CONTACT_DT','PROCESSED_DTTM','KENNISDATUM','TSRECORD','DATUM_TOESTAND_VANAF','DATUM_TOESTAND_TM','CONTACT_DATUM_TIJD']))
    POSTAL_CODE_FIELDS = list(set(['postalCode','ZipCode','ZCL_POSTCODE']))
    DROP_FIELDS = list(set(['EmailAddress','rug_id']))
    # Index fields will only be set on those fields that are available when optimizing. So the rug_id field will not be added in the first optimize run
    INDEX_FIELDS = list(set(['AccountId','LegacyAccountId','DacadooId','ORIGINEEL_RELATIE_ID','userId','rug_id','ZCL_REL_NR']))
    # The pandas SQL export needs to know which fields hold dates
    PANDA_TYPE_FIELDS = {}
    for date_field in DATE_FIELDS:
        PANDA_TYPE_FIELDS[date_field] = DateTime()
    PANDA_TYPE_FIELDS['AccountId'] = BigInteger()
    # Because Menzis could not export numbers, the 'numbers' in the VERZEKERDENUMMER field are TEXT.
    # Therefore we cannot import them as INT, since that fails on values like '6,019e+09'.
    # As a consequence the data has to be CAST during queries, which costs extra time.
    # PANDA_TYPE_FIELDS['VERZEKERDENUMMER'] = BigInteger()
    PANDA_TYPE_FIELDS['DacadooId'] = String(50)
    def __init__(self, mysql_user, mysql_pass, mysql_host, mysql_db, reinit_db = True):
        """Prepare the importer and create the engine for the temporary database.

        mysql_user/mysql_pass/mysql_host/mysql_db : MySQL credentials and target database name.
        reinit_db : accepted for backward compatibility but not used here;
                    the database is (re)created in run() instead.
        """
        self.db_name = mysql_db
        # Destination folder for CSV exports; configured via set_export_location().
        self.export_location = None
        # Instance copy so methods can append collected id fields without mutating the class constant.
        self.__drop_fields = copy.copy(dHealthImport.DROP_FIELDS)
        self.__source_files = []
        self.__source_folders = {
            'dacadoo' : None,
            'menzis' : None,
            'netivity' : None,
            'vektis' : None,
        }
        # Start every run with a fresh error log file.
        self.__error_log = Path('errors.log')
        if self.__error_log.exists():
            self.__error_log.unlink()
        # NOTICE: Multiprocessing does not work on Windows :(. So it is not used
        self.__number_of_cpu = multiprocessing.cpu_count()
        # TODO: Make the import and cleaning run in a newly created temporary database. Afterward, drop the old existing, and rename the temporary to the final database name.
        self.__temp_db_name = f'{self.db_name}_temp'
        self.db = create_engine(f'mysql://{mysql_user}:{mysql_pass}@{mysql_host}/{self.__temp_db_name}?charset=utf8mb4')
def __logmessage(self,message):
print(f'[{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}] {message}',flush=True)
def __byte_size(self,size):
sizes = ['b','Kb','Mb','Gb','Tb']
counter = 0
while size > 1024:
size /= 1024
counter += 1
return f'{size:.3f} {sizes[counter]}'
def __init_mysql_connection(self,conn):
mysql_import_settings = [
'set global max_allowed_packet=1048576000',
'set global connect_timeout=36000',
'set global wait_timeout=36000',
'set global interactive_timeout=36000',
'set global mysqlx_connect_timeout=36000',
'set global mysqlx_read_timeout=36000',
'set global net_read_timeout=36000',
'set global innodb_buffer_pool_size=568435456',
'set global join_buffer_size=64*1024*1024',
'flush tables'
]
for mysql_setting in mysql_import_settings:
try:
conn.execute(mysql_setting)
except Exception as ex:
pass
#print(ex)
def __record_count(self,table):
with self.db.connect() as conn, conn.begin():
sql = f'SELECT COUNT(*) as AmountRecords FROM {table}'
#self.__logmessage(sql)
result = conn.execute(sql)
for row in result:
return row[0]
return None
def __get_all_tables(self):
data = []
with self.db.connect() as conn, conn.begin():
sql = 'SHOW TABLES'
#self.__logmessage(sql)
result = conn.execute(sql)
for row in result:
data.append(row[0])
return data
    def __optimize_mysql_tables(self):
        """Add an index on every known id column where missing, then OPTIMIZE all tables.

        Indexes are only added to columns that exist at the time of the call,
        so e.g. 'rug_id' is skipped during the first optimize run.
        """
        with self.db.connect() as conn, conn.begin():
            self.__init_mysql_connection(conn)
            # Adding indexes to the tables for faster processing
            for index_field in dHealthImport.INDEX_FIELDS:
                # For each table owning this column, the inner SELECT counts existing
                # indexes with the same name, so already-indexed tables can be skipped.
                sql = f'SELECT a.TABLE_NAME, (SELECT COUNT(*) FROM information_schema.statistics AS b WHERE b.TABLE_NAME = a.TABLE_NAME AND b.index_name = a.COLUMN_NAME AND b.TABLE_SCHEMA = a.TABLE_SCHEMA) AS Amount FROM information_schema.COLUMNS AS a WHERE a.COLUMN_NAME = \'{index_field}\' and a.TABLE_SCHEMA = \'{self.__temp_db_name}\''
                result = conn.execute(sql)
                for row in result:
                    # Index is already added. So skip
                    if row[1] > 0:
                        continue
                    table_name = row[0]
                    try:
                        sql = f'ALTER TABLE {table_name} ADD INDEX ({index_field})'
                        self.__logmessage(sql)
                        conn.execute(sql)
                    except Exception as ex:
                        # Best effort: unsupported column types / duplicate indexes are ignored.
                        pass
            # Optimize tables so all data is indexed and ready
            result = conn.execute('show tables')
            for row in result:
                table_name = row[0]
                sql = f'OPTIMIZE TABLE {table_name}'
                self.__logmessage(sql)
                conn.execute(sql)
def __load_files(self):
self.__source_files = []
for _, source in self.__source_folders.items():
if source is not None:
self.__source_files += sorted([child for child in source.iterdir()])
def __file_belongs_to_source(self,file):
for name, source in self.__source_folders.items():
if file.parent == source:
return name
    def __process_csv_file(self, file, table):
        """Import one CSV file into *table*, replacing any existing table.

        On a batch insert failure the whole frame is retried row-by-row so
        only the offending records are skipped (and logged to errors.log).
        """
        with self.db.connect() as conn, conn.begin():
            self.__init_mysql_connection(conn)
            conn.execute(f'drop table if exists {table}')
            # NOTE(review): error_bad_lines/warn_bad_lines were removed in pandas 2.x —
            # confirm the pinned pandas version still supports these keywords.
            panda_data = pd.read_csv(file, sep=dHealthImport.CSV_SEPERATOR, error_bad_lines=False, warn_bad_lines=True)
            # Some fields are not detected properly. So the dict dtype holds field names which are not properly detected and specify the needed types
            try:
                panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=dHealthImport.SQL_INSERT_BATCH, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS)
            except Exception as ex:
                # Something went wrong. The data is not committed to db, so we do it now one by one, to filter out the error records.
                try:
                    panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=1, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS)
                except Exception as ex:
                    with self.__error_log.open('a') as logfile:
                        logfile.write(f'{table}, {file}: {ex}\n')
    def __process_gz_file(self, file, table = None):
        """Replay a gzipped SQL dump into the database, statement by statement.

        Comment, CREATE DATABASE, USE and DROP TABLE lines from the dump are
        skipped, and CREATE TABLE is rewritten to be idempotent, so replaying
        a dump appends rather than destroys. Duplicate-key errors go to
        errors.log; other errors are printed with their statement.
        """
        with self.db.connect() as conn, conn.begin():
            self.__init_mysql_connection(conn)
            if table is not None:
                conn.execute('drop table if exists {}'.format(table))
            with gzip.open(file, 'rt', encoding='utf-8') as f:
                sql_statement = ''
                for line in f:
                    # Here we skip the comment / database / drop-table lines
                    sql_line = line.strip()
                    if sql_line.startswith('--') or sql_line.startswith('CREATE DATABASE') or sql_line.startswith('USE') or sql_line.startswith('DROP TABLE'):
                        # Cleanup step 1: log which table-related lines were dropped
                        if 'table' in sql_line:
                            print('')
                            self.__logmessage(f'[{file.name}] {sql_line}')
                        continue
                    if sql_line.startswith('CREATE TABLE'):
                        # Cleanup step 2: only create the table when it does not exist yet
                        sql_line = sql_line.replace('CREATE TABLE','CREATE TABLE /*!32312 IF NOT EXISTS*/')
                        line = sql_line
                    if '' != sql_line:
                        # Statements may span multiple lines; accumulate until ';'
                        sql_statement = '{} {}'.format(sql_statement,sql_line)
                        if sql_line.endswith(';'):
                            try:
                                conn.execute(sql_statement)
                                conn.execute('commit')
                            except IntegrityError as ex:
                                # Duplicate key errors
                                with self.__error_log.open('a') as logfile:
                                    logfile.write('{}, {}: {}\n'.format(file, ex, sql_statement))
                            except Exception as ex:
                                print('\nError: {}'.format(ex))
                                print(file)
                                print(sql_statement)
                                #with self.__error_log.open('a') as logfile:
                                #    logfile.write('{}, {}: {}\n'.format(file, ex, sql_statement))
                            sql_statement = ''
                            # Progress indicator: one dot per executed statement
                            print('.', end='',flush=True)
            print('')
            self.__logmessage('Processing file {} is done!'.format(file))
    def __process_json_file(self, file, table):
        """Import a newline-delimited JSON file into *table* in batches.

        Known date fields are normalised to 'Y-M-DTH:M:S' form (some sources
        emit day-first dates, some year-first). Records are inserted with
        pandas in JSON_BATCH_SIZE chunks, with a row-by-row fallback when a
        batch insert fails; failures are appended to errors.log.
        """
        with self.db.connect() as conn, conn.begin():
            conn.execute('drop table if exists {}'.format(table))
        json_data = []
        records_done = 0
        filesize_done = 0  # bytes consumed so far, used for progress reporting
        with file.open() as f:
            for line in f:
                filesize_done += len(line)
                json_line = json.loads(line)
                # Stupid date fixes.....
                for date_field in dHealthImport.DATE_FIELDS:
                    try:
                        json_line[date_field] = json_line[date_field].strip().strip('Z').strip()
                        if '' == json_line[date_field]:
                            json_line[date_field] = None
                        elif ' ' in json_line[date_field]:
                            temp = json_line[date_field].split(' ')
                            temp[0] = temp[0].split('-')
                            # Swap year and day field when the date is day-first.... mixing up dates here....
                            if int(temp[0][0]) > 1000:
                                json_line[date_field] = '{}-{}-{}T{}'.format(temp[0][0],temp[0][1],temp[0][2],temp[1])
                            else:
                                json_line[date_field] = '{}-{}-{}T{}'.format(temp[0][2],temp[0][1],temp[0][0],temp[1])
                    except Exception as ex:
                        # Field missing or not a string: leave the record untouched
                        pass
                json_data.append(json_line)
                if len(json_data) == dHealthImport.JSON_BATCH_SIZE:
                    # Batch is full: flush it to the database
                    records_done += dHealthImport.JSON_BATCH_SIZE
                    panda_data = pd.DataFrame(json_data)
                    del(json_data)
                    json_data = []
                    with self.db.connect() as conn, conn.begin():
                        self.__init_mysql_connection(conn)
                        # Some fields are not detected properly. So the dict dtype holds field names which are not properly detected and specify the needed types
                        try:
                            panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=dHealthImport.SQL_INSERT_BATCH, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS)
                        except Exception as ex:
                            # Something went wrong. The data is not committed to db, so we do it now one by one, to filter out the error records.
                            try:
                                panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=1, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS)
                            except Exception as ex:
                                with self.__error_log.open('a') as logfile:
                                    logfile.write('{}, {}: {}\n'.format(table,file,ex))
                    self.__logmessage('Processing at {:.3f}% {}/{}, {} records from JSON data file \'{}\'.'.format((filesize_done / file.stat().st_size) * 100,self.__byte_size(filesize_done),self.__byte_size(file.stat().st_size),records_done,file))
        # Remaining records (final partial batch)
        records_done += len(json_data)
        panda_data = pd.DataFrame(json_data)
        del(json_data)
        with self.db.connect() as conn, conn.begin():
            self.__init_mysql_connection(conn)
            # Some fields are not detected properly. So the dict dtype holds field names which are not properly detected and specify the needed types
            try:
                panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=dHealthImport.SQL_INSERT_BATCH, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS)
            except Exception as ex:
                # Something went wrong. The data is not committed to db, so we do it now one by one, to filter out the error records.
                try:
                    panda_data.to_sql(table, conn, if_exists='append', index=False, chunksize=1, method='multi', dtype=dHealthImport.PANDA_TYPE_FIELDS)
                except Exception as ex:
                    with self.__error_log.open('a') as logfile:
                        logfile.write('{}, {}: {}\n'.format(table,file,ex))
        self.__logmessage('Processing at {:.3f}% {}/{}, {} records from JSON data file \'{}\'.'.format((filesize_done / file.stat().st_size) * 100,self.__byte_size(filesize_done),self.__byte_size(file.stat().st_size),records_done,file))
def set_export_location(self, path):
path = Path(path)
if not path.exists():
self.__logmessage(f'Creating CSV export folder {path}')
path.mkdir()
self.export_location = path
    def run(self, reinit_db = True):
        """Import every registered source file into the temporary database.

        reinit_db : when True, drop and re-create the temporary database first.
        Afterwards every table without a known source prefix (those originate
        from the Dacadoo SQL dump) is renamed to 'dacadoo_<table>', and all
        tables are indexed and optimized.
        """
        if reinit_db:
            # Connect without a schema selected so the database itself can be dropped/created.
            temp_con = create_engine(str(self.db.url).replace(self.__temp_db_name,''))
            try:
                self.__logmessage(f'Dropping existing temporary database: {self.__temp_db_name}')
                temp_con.execute(f'DROP DATABASE IF EXISTS {self.__temp_db_name}')
                self.__logmessage(f'Create new temporary database: {self.__temp_db_name}')
                temp_con.execute(f'CREATE DATABASE {self.__temp_db_name}')
            except Exception as ex:
                print(ex)
        # TODO: Make it multiprocessing.... that is way faster than one file at a time
        for file in self.__source_files:
            self.process_file(file)
        # Start renaming the dacadoo tables. As the original names are coming from the SQL import
        # Here we prefix all the tables that do not have a known source prefix
        with self.db.connect() as conn, conn.begin():
            self.__init_mysql_connection(conn)
            result = conn.execute('show tables')
            for row in result:
                table_name = row[0]
                rename_table = f'dacadoo_{table_name}'.lower()
                if table_name.startswith('menzis_') or table_name.startswith('netivity_') or table_name.startswith('vektis_') or table_name.startswith('dacadoo_'):
                    continue
                self.__logmessage(f'rename table {table_name} TO {rename_table}')
                try:
                    conn.execute(f'rename table {table_name} TO {rename_table}')
                except Exception as ex:
                    print(f'Error executing: {ex}')
        self.__optimize_mysql_tables()
    def process_file(self,file = None):
        """Import a single source file; the table name is derived from the
        owning source plus the cleaned file name.

        .json -> __process_json_file, .csv -> __process_csv_file,
        .gz (SQL dump) -> __process_gz_file (table names come from the dump itself).
        """
        self.__logmessage(f'Processing file: {file} with filesize: {self.__byte_size(file.stat().st_size)}')
        start = time()
        # Prefix with the source name, e.g. 'menzis_' or 'vektis_'.
        table = self.__file_belongs_to_source(file) + '_'
        if '.json' == file.suffix:
            # Stupid Windows does not understand casing.... so force lowercase... :(
            # Strip the export timestamp, e.g. 'users_2020-06-30T12.json' -> 'users'
            table = '{}{}'.format(table,re.sub(r'_\d+-\d+-\d+T.*\.json', '', file.name)).lower()
            self.__process_json_file(file,table)
        elif '.csv' == file.suffix:
            # Stupid Windows does not understand casing.... so force lowercase... :(
            # Strip optional numeric part suffixes, e.g. 'data_1_2.csv' -> 'data'
            table = '{}{}'.format(table,re.sub(r'(_\d+_\d+)?\.csv', '', file.name)).lower()
            self.__process_csv_file(file,table)
        elif '.gz' == file.suffix:
            self.__process_gz_file(file)
        self.__logmessage(f'Processing file {file} done in {timedelta(seconds=(time()-start))}')
def filter_on_consent(self):
def fix_vektis_insurance_number(self):
print('fix_vektis_insurance_number')
print(self)
# Apperently it is difficult to combine to CSV files at the source. So we have to fix it here again....
# And they are not able to upload easy to understand file names... bunch of rubbish..
# Find the 'source' table with the insurance and accountid numbers
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'VERZEKERDENUMMER\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
self.__logmessage(sql)
source_table_name = None
result = conn.execute(sql)
for row in result:
source_table_name = row[0]
if not source_table_name.lower().startswith('vektis_'):
continue
# Do some cleanup... Menzis rommel
sql = f'DELETE FROM {source_table_name} WHERE VERZEKERDENUMMER LIKE "%%e+%%"'
result = conn.execute(sql)
self.__logmessage(f'Deleted {result.rowcount} rows from table {source_table_name}')
self.__logmessage(f'Found source insurrance table at: {source_table_name}')
if source_table_name is None:
return
# Find all the tables that holds the field ZCL_REL_NR
convert_field = 'ZCL_REL_NR'
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{convert_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
result = conn.execute(sql)
for row in result:
table_name = row[0]
try:
sql = f'SELECT AccountID, VERZEKERDENUMMER FROM {source_table_name} ORDER BY AccountID'
account_ids = conn.execute(sql)
total_records = account_ids.rowcount
self.__logmessage(f'Updating {total_records} accounts in the table {table_name}')
counter = 0
for account in account_ids:
counter += 1
sql = f'UPDATE {table_name} SET {convert_field} = {account[0]} WHERE {convert_field} = {account[1]}'
updates = conn.execute(sql)
self.__logmessage(f'[{counter} off {total_records}({(counter/total_records)*100:.2f}%)]: Updated {updates.rowcount} records for account id {account[0]} in table {table_name}')
self.__logmessage(f'Renaming field {convert_field} to AccountId')
sql = f'ALTER TABLE {table_name} CHANGE COLUMN {convert_field} AccountId Bigint(20)'
self.__logmessage(sql)
conn.execute(sql)
sql = f'ALTER TABLE {table_name} ADD INDEX (AccountId)'
self.__logmessage(sql)
conn.execute(sql)
except Exception as ex:
print('Fix vektis exception')
print(ex)
if source_table_name is None:
try:
# Drop the table, as we do not need it anymore
sql = f'DROP TABLE {source_table_name}'
self.__logmessage(sql)
conn.execute(sql)
except Exception as ex:
print(ex)
# Here we clean up the records that should not be here. They have not given a consent
# First we start with the Dacadoo data.
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
try:
# Delete all dacadoo accounts that should not be here.
sql = 'DELETE FROM dacadoo_user WHERE id NOT IN (SELECT id FROM dacadoo_acceptedUser)'
self.__logmessage(sql)
conn.execute(sql)
# Drop the table, as we do not need it anymore
sql = 'DROP TABLE dacadoo_acceptedUser'
self.__logmessage(sql)
conn.execute(sql)
except Exception as ex:
print(ex)
# Now we clean all the other tables that contain the Dacadoo userId field (reference field)
# And delete all records where the userId is not in the account table.
id_field_name = 'userId'
self.__drop_fields.append(id_field_name)
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
result = conn.execute(sql)
for row in result:
table_name = row[0]
sql = f'DELETE FROM {table_name} WHERE {id_field_name} NOT IN (SELECT id FROM dacadoo_user)'
self.__logmessage(sql)
conn.execute(sql)
# Convert Vektis insurance number to Netvitiy AccoundID, so we can clean if later on
fix_vektis_insurance_number(self)
# Now we clean the Netivity data
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
# Delete all Netivity accounts that should not be here.
# But this is based on either legacy or normal account id
try:
sql = 'DELETE FROM netivity_account WHERE AccountId NOT IN (SELECT ToestemmingRUG FROM netivity_rugtoestemmingids) AND LegacyAccountId NOT IN (SELECT ToestemmingRUG FROM netivity_rugtoestemmingids)'
self.__logmessage(sql)
conn.execute(sql)
sql = 'DELETE FROM netivity_legacyaccountoud WHERE AccountId NOT IN (SELECT ToestemmingRUG FROM netivity_rugtoestemmingids)'
self.__logmessage(sql)
conn.execute(sql)
# Drop the table, as we do not need it anymore
sql = 'DROP TABLE netivity_rugtoestemmingids'
self.__logmessage(sql)
conn.execute(sql)
except Exception as ex:
print(ex)
# Now we clean all the other tables that contain the Netivity userId field (reference field)
# And delete all records where the userId is not in the account table.
id_field_name = 'AccountId'
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
result = conn.execute(sql)
for row in result:
table_name = row[0]
if table_name in ['netivity_account','netivity_legacyaccountoud']:
continue
source_table = 'netivity_legacyaccountoud' if table_name.startswith('legacy') else 'netivity_account'
sql = f'DELETE FROM {table_name} WHERE {id_field_name} NOT IN (SELECT {id_field_name} FROM {source_table})'
self.__logmessage(sql)
conn.execute(sql)
# And now how about the Menzis data??? Unclear...
def create_new_rug_ids(self):
self.__drop_fields.append('LegacyAccountId')
self.__drop_fields.append('DacadooId')
new_user_table = 'rug_userid_conversion'
table_sql = '''CREATE TABLE `''' + new_user_table + '''` (
`Index` BIGINT(20) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`menzis_id` BIGINT(20) NULL DEFAULT '0',
`netivity_legacy_id` BIGINT(20) NULL DEFAULT '0',
`dacadoo_id` VARCHAR(100) NULL DEFAULT '' COLLATE 'utf8_general_ci',
`rug_id` VARCHAR(50) DEFAULT (uuid()),
INDEX (`menzis_id`),
INDEX (`netivity_legacy_id`),
INDEX (`dacadoo_id`),
UNIQUE INDEX `rug_id` (`rug_id`) USING BTREE)'''
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
conn.execute(f'DROP TABLE IF EXISTS {new_user_table}')
conn.execute(table_sql)
# Collect the account IDs from Netivity data which holds 3 fields: AccountId, LegacyAccountId and DacadooId. This allow us to join all the data sets
self.__logmessage('Inserting Netivity account data to new rug ids ...')
sql = f'INSERT INTO {new_user_table}(menzis_id,netivity_legacy_id,dacadoo_id) SELECT AccountId, LegacyAccountId, DacadooId FROM netivity_account'
self.__logmessage(sql)
conn.execute(sql)
self.__logmessage('Inserting Netivity account data to new rug ids is done!')
self.__logmessage('Get the legacy account ids based on Dacadoo Ids ...')
sql = f'UPDATE {new_user_table} SET netivity_legacy_id = (SELECT AccountId FROM netivity_legacyaccountoud WHERE netivity_legacyaccountoud.DacadooId = {new_user_table}.dacadoo_id LIMIT 1) WHERE netivity_legacy_id = \'\''
self.__logmessage(sql)
conn.execute(sql)
# Collect old IDS from legacy tables which holds 2 fields: AccountId, DacadooId. But now we only want NEW records in table rug_userid_conversion. So only request records which are not al ready seen.
self.__logmessage('Inserting Netivity LEGACY account data to new rug ids ...')
sql = f'INSERT INTO {new_user_table}(netivity_legacy_id,dacadoo_id) SELECT AccountId, DacadooId FROM netivity_legacyaccountoud WHERE AccountId NOT IN (SELECT netivity_legacy_id FROM {new_user_table}) AND DacadooId NOT IN (SELECT dacadoo_id FROM {new_user_table})'
self.__logmessage(sql)
conn.execute(sql)
self.__logmessage('Inserting Netivity LEGACY account data to new rug ids is done!')
# Get all Dacadoo IDs which are not seen yet....
self.__logmessage('Loading the remaining Dacadoo Ids which do not have a link with the Menzis or Netivity data')
sql = f'INSERT INTO {new_user_table}(dacadoo_id) SELECT id FROM dacadoo_user WHERE id NOT IN (SELECT dacadoo_id FROM {new_user_table})'
self.__logmessage(sql)
conn.execute(sql)
self.__logmessage('Loaded the remaining Dacadoo Ids which do not have a link with the Menzis or Netivity data')
# Load all the accounts from temp table to memory, so we only have to query once the source table
accounts = []
sql = f'SELECT menzis_id, netivity_legacy_id, dacadoo_id, rug_id FORM {new_user_table} ORDER BY Index'
result = conn.execute(sql)
for row in result:
accounts.append((row[0],row[1],row[2],row[3]))
total_accounts = len(accounts)
self.__logmessage(f'Loaded in total {total_accounts} accounts to re-number')
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
# Now we are looking for all tables that holds the old IDS. Based on table name (legacy or not) we choose the field name to match
id_field_name = 'AccountId'
self.__drop_fields.append(id_field_name)
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
result = conn.execute(sql)
table_counter = 0
table_total = result.rowcount
self.__logmessage(f'We found {table_total} tables that needs to be re-numbered for the field {id_field_name}')
for row in result:
table_counter += 1
table_name = row[0]
try:
sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL'
self.__logmessage(sql)
conn.execute(sql)
except Exception:
pass
# Loop over all accounts to update
self.__logmessage(f'Re-numbering table {table_name} ({table_counter} off {table_total} - {(table_counter/table_total)*100:.2f}%)')
counter = 0
for account in accounts:
counter += 1
source_id = 'netivity_legacy_id' if 'legacy' in table_name else 'menzis_id'
source_value = account[1] if 'legacy' in table_name else account[0]
sql = f'UPDATE {table_name} SET rug_id = \'{account[3]}\' WHERE {source_id} = {source_value}'
self.__logmessage(sql)
updates = conn.execute(sql)
self.__logmessage(f'[{counter} off {total_accounts}({(counter/total_accounts)*100:.2f}%)]: Updated {updates.rowcount} records for account id {source_value} in table {table_name}')
# self.__logmessage(f'Updated ')
# sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.{source_id} = {table_name}.{id_field_name} LIMIT 1)'
# self.__logmessage(sql)
# conn.execute(sql)
# with self.db.connect() as conn, conn.begin():
# self.__init_mysql_connection(conn)
# # Get all the Menzis tables which holds ids in field name ORIGINEEL_RELATIE_ID
# id_field_name = 'ORIGINEEL_RELATIE_ID'
# self.__drop_fields.append(id_field_name)
# sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' order by table_name'
# result = conn.execute(sql)
# for row in result:
# table_name = row[0]
# # sql = f'INSERT INTO {new_user_table}(netivity_legacy_id) SELECT DISTINCT {id_field_name} FROM {table_name} WHERE {id_field_name} NOT IN (SELECT menzis_id FROM {new_user_table}) AND {id_field_name} NOT IN (SELECT netivity_legacy_id FROM {new_user_table})'
# # self.__logmessage(sql)
# # conn.execute(sql)
# try:
# sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL'
# self.__logmessage(sql)
# conn.execute(sql)
# except Exception:
# pass
# sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.menzis_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL'
# self.__logmessage(sql)
# conn.execute(sql)
# sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.netivity_legacy_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL'
# self.__logmessage(sql)
# conn.execute(sql)
tables_to_process = []
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
# Get all the Dacadoo tables which holds ids in field name userId
table_name = 'dacadoo_user'
try:
sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL'
self.__logmessage(sql)
conn.execute(sql)
except Exception:
pass
sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.id LIMIT 1)'
self.__logmessage(sql)
conn.execute(sql)
id_field_name = 'userId'
self.__drop_fields.append(id_field_name)
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
result = conn.execute(sql)
for row in result:
tables_to_process.append(row[0])
# Here we loop over the tables that needs to be changed. We open a new DB connection for every table update.
# This will hopefully reduce the undo log and commit earlier the changes
for table_name in tables_to_process:
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
try:
sql = f'ALTER TABLE {table_name} DROP COLUMN rug_id'
self.__logmessage(sql)
conn.execute(sql)
except Exception:
pass
try:
sql = f'ALTER TABLE {table_name} DROP INDEX rug_id'
#self.__logmessage(sql)
conn.execute(sql)
except Exception:
pass
try:
sql = f'ALTER TABLE {table_name} ADD COLUMN rug_id VARCHAR(50) NULL DEFAULT NULL'
self.__logmessage(sql)
conn.execute(sql)
except Exception:
pass
sql = f'FLUSH TABLES'
#self.__logmessage(sql)
conn.execute(sql)
batch_size = 100000
amount_of_records = round(self.__record_count(table_name)/batch_size)+1
for i in range(amount_of_records):
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL LIMIT {batch_size}'
self.__logmessage(f'({i+1}/{amount_of_records}) {sql}')
try:
result = conn.execute(sql)
except Exception as ex:
result = conn.execute(sql)
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
sql = f'DROP TABLE {new_user_table}'
self.__logmessage(sql)
conn.execute(sql)
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
id_field_name = 'rug_id'
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
result = conn.execute(sql)
for row in result:
table_name = row[0]
sql = f'DELETE FROM {table_name} WHERE {id_field_name} IS NULL'
self.__logmessage(sql)
conn.execute(sql)
# Special case. These are the original Dacadoo ids. Only in the user table they should be deleted.
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
drop_field = 'id'
table_name = 'dacadoo_user'
self.__logmessage(f'Dropping field {drop_field} from table {table_name}')
sql = f'ALTER TABLE {table_name} DROP {drop_field}'
result = conn.execute(sql)
self.__logmessage(f'Dropped field {drop_field} from table {table_name}')
def drop_fields(self, drop = True):
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
# Drop all the fields that contain data that is sensitive
# For now, we keep all the different userid fields. As we do not re-create new rug-ids
# for drop_field in self.__drop_fields:
for drop_field in dHealthImport.DROP_FIELDS:
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{drop_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
result = conn.execute(sql)
for row in result:
table_name = row[0]
self.__logmessage(f'Dropping field {drop_field} from table {table_name}')
sql = f'ALTER TABLE {table_name} DROP {drop_field}'
if drop:
result = conn.execute(sql)
self.__logmessage(f'Dropped field {drop_field} from table {table_name}')
def clean_birth_days(self):
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
for birthday_field in dHealthImport.BIRTHDAY_FIELDS:
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{birthday_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
result = conn.execute(sql)
for row in result:
table_name = row[0]
self.__logmessage(f'Updateing birthday field {birthday_field} for table {table_name}')
sql = f'UPDATE {table_name} SET {birthday_field} = DATE_FORMAT({birthday_field},\'%%Y-01-01\')'
result = conn.execute(sql)
self.__logmessage(f'Updated birthday field {birthday_field} for table {table_name}')
def clean_postal_codes(self):
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
for postal_code_field in dHealthImport.POSTAL_CODE_FIELDS:
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{postal_code_field}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
result = conn.execute(sql)
for row in result:
table_name = row[0]
self.__logmessage(f'Updateing postal code field {postal_code_field} for table {table_name}')
sql = f'UPDATE {table_name} SET {postal_code_field} = SUBSTRING({postal_code_field},0,4)'
result = conn.execute(sql)
self.__logmessage(f'Updated postal code field {postal_code_field} for table {table_name}')
    def create_csv_exports(self, summary = True, clean = True):
        """Export every table to a CSV file in self.export_location.

        summary : when truthy, write only the first 1000 rows per table to
                  '<table>_summary.csv'; otherwise export all rows in
                  EXPORT_BATCH_SIZE batches.
        clean   : when True, delete existing files in the export folder first.
        No-op when no export location has been configured.
        """
        # Create the export folder for all the CSV files
        if self.export_location is None:
            return
        if clean:
            self.__logmessage(f'Clean up export location: {self.export_location}')
            for child in self.export_location.iterdir():
                child.unlink()
                self.__logmessage(f'Delete file {child.name}')
        # Reuse the parameter as the filename suffix: '' or '_summary'.
        summary = '' if not summary else '_summary'
        batch_size = dHealthImport.EXPORT_BATCH_SIZE
        for table_name in self.__get_all_tables():
            export_file = f'{self.export_location}/{table_name}{summary}.csv'
            self.__logmessage(f'Exporting to {export_file}')
            if summary:
                # Summary export: a single batch of at most 1000 rows.
                batches = 1
                batch_size = 1000
            else:
                batches = math.ceil(self.__record_count(table_name) / batch_size)
            for x in range(batches):
                with self.db.connect() as conn, conn.begin():
                    self.__init_mysql_connection(conn)
                    sql = f'SELECT * FROM {table_name} LIMIT {x*batch_size}, {batch_size}'
                    sql_data = pd.read_sql(sql, conn)
                    # Add headers when x == 0. This is the first batch, which should create and add headers
                    sql_data.to_csv(export_file, index=False, header=(x==0), mode='a', encoding='utf-8', sep=dHealthImport.CSV_SEPERATOR)
                    print('.', end='',flush=True)
            print('')
def addDacadooData(self,location):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['dacadoo'] = location
self.__load_files()
else:
raise RuntimeError(f'Location {location} is not a valid Dacadoo source location')
def addMenzisData(self,location):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['menzis'] = location
self.__load_files()
else:
raise RuntimeError(f'Location {location} is not a valid Menzis source location')
def addNetivityData(self,location):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['netivity'] = location
self.__load_files()
else:
raise RuntimeError(f'Location {location} is not a valid Netivity source location')
def addVektisData(self,location):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['vektis'] = location
self.__load_files()
else:
raise RuntimeError(f'Location {location} is not a valid Vektis source location')
if __name__ == "__main__":
    # Load credentials and data source locations from settings.cfg.
    config = configparser.ConfigParser()
    try:
        config.read_file(open('settings.cfg'))
    except FileNotFoundError as ex:
        print('Please create a settings.cfg file based on the settings.sample.cfg file.')
        exit()
    importer = dHealthImport(config['database']['user'],config['database']['pass'],config['database']['host'],config['database']['db'])
    # Every data source is optional: a missing [datasources] entry just skips it.
    try:
        importer.addDacadooData(config['datasources']['dacadoo'])
        print('Loaded Dacadoo data')
    except KeyError as ex:
        print('Not loading Dacadoo data')
    try:
        importer.addMenzisData(config['datasources']['menzis'])
        print('Loaded Menzis data')
    except KeyError as ex:
        print('Not loading Menzis data')
    try:
        importer.addNetivityData(config['datasources']['netivity'])
        print('Loaded Netivity data')
    except KeyError as ex:
        print('Not loading Netivity data')
    try:
        importer.addVektisData(config['datasources']['vektis'])
        print('Loaded Vektis data')
    except KeyError as ex:
        print('Not loading Vektis data')
    try:
        importer.set_export_location(config['export']['location'])
        print(f'Export location is set to {importer.export_location}')
    except KeyError as ex:
        print('Not exporting CSV data')
    # The import/anonymisation pipeline steps are currently disabled; only the
    # CSV exports run. Uncomment the steps below to rebuild from raw sources.
    #importer.run(True)
    #importer.filter_on_consent()
    # importer.create_new_rug_ids()
    #importer.clean_birth_days()
    #importer.clean_postal_codes()
    #importer.drop_fields()
    importer.create_csv_exports()
    importer.create_csv_exports(False,False)

3
requirements.txt

@ -0,0 +1,3 @@
sqlalchemy
mysqlclient
pandas

14
settings.cfg

@ -0,0 +1,14 @@
[database]
host=129.125.108.73
user=datamanager
pass=datamanager
db=healthpro
[datasources]
dacadoo=G:\HealthPro\RAW\Received 2020-06-30\Dacadoo
menzis=G:\HealthPro\RAW\Received 2019-11-06\Menzis
netivity=G:\HealthPro\RAW\Received 2020-09-17\Netivity
vektis=G:\HealthPro\RAW\Received 2020-11-18\Vektis
[export]
location=G:\HealthPro\CLEAN
Loading…
Cancel
Save