Browse Source

Update importer

master
Joshua Rubingh 1 year ago
parent
commit
6a433cc786
  1. 348
      importer.py

348
importer.py

@ -33,7 +33,7 @@ class dHealthImport():
'CONTACT_DTTM','CONTACT_DT','PROCESSED_DTTM','KENNISDATUM','TSRECORD','DATUM_TOESTAND_VANAF','DATUM_TOESTAND_TM','CONTACT_DATUM_TIJD']))
POSTAL_CODE_FIELDS = list(set(['postalCode','ZipCode','ZCL_POSTCODE']))
DROP_FIELDS = list(set(['EmailAddress','rug_id']))
DROP_FIELDS = list(set(['EmailAddress','rug_id','email_address']))
# Index fields will only set on those fields that are available when optimizing. So the rug_id field will not be added the first optimize run
INDEX_FIELDS = list(set(['AccountId','LegacyAccountId','DacadooId','ORIGINEEL_RELATIE_ID','userId','rug_id','ZCL_REL_NR']))
@ -56,10 +56,10 @@ class dHealthImport():
self.__drop_fields = copy.copy(dHealthImport.DROP_FIELDS)
self.__source_files = []
self.__source_folders = {
'dacadoo' : None,
'menzis' : None,
'netivity' : None,
'vektis' : None,
'dacadoo' : [],
'menzis' : [],
'netivity' : [],
'vektis' : [],
}
self.__error_log = Path('errors.log')
@ -160,18 +160,20 @@ class dHealthImport():
def __load_files(self):
self.__source_files = []
for _, source in self.__source_folders.items():
if source is not None:
self.__source_files += sorted([child for child in source.iterdir()])
for location in source:
self.__source_files += sorted([child for child in location.iterdir()])
def __file_belongs_to_source(self,file):
for name, source in self.__source_folders.items():
if file.parent == source:
if file.parent in source:
return name
def __process_csv_file(self,file,table):
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
conn.execute(f'drop table if exists {table}')
sql = f'drop table if exists {table}'
self.__logmessage(sql)
conn.execute(sql)
panda_data = pd.read_csv(file, sep=dHealthImport.CSV_SEPERATOR, error_bad_lines=False, warn_bad_lines=True)
# Some fields are not detected properly. So the dict dtype holds field names which are not properly detected and specify the needed types
@ -240,7 +242,7 @@ class dHealthImport():
json_data = []
records_done = 0
filesize_done = 0
with file.open() as f:
with file.open(encoding='utf-8') as f:
for line in f:
filesize_done += len(line)
json_line = json.loads(line)
@ -351,9 +353,9 @@ class dHealthImport():
self.__optimize_mysql_tables()
def process_file(self,file = None):
self.__logmessage(f'Processing file: {file} with filesize: {self.__byte_size(file.stat().st_size)}')
start = time()
table = self.__file_belongs_to_source(file) + '_'
self.__logmessage(f'Processing file: {file} with filesize: {self.__byte_size(file.stat().st_size)} with table prefix {table}')
start = time()
if '.json' == file.suffix:
# Stupid Windows does not understand casing.... so force lowercase... :(
@ -361,7 +363,8 @@ class dHealthImport():
self.__process_json_file(file,table)
elif '.csv' == file.suffix:
# Stupid Windows does not understand casing.... so force lowercase... :(
table = '{}{}'.format(table,re.sub(r'(_\d+_\d+)?\.csv', '', file.name)).lower()
# Mensiz is no capable of producing correct export names....
table = '{}{}'.format(table,re.sub(r'([_-]+\d+[_-]+\d+(?:.*)?)?.csv', '', file.name)).lower()
self.__process_csv_file(file,table)
elif '.gz' == file.suffix:
self.__process_gz_file(file)
@ -371,8 +374,6 @@ class dHealthImport():
def filter_on_consent(self):
def fix_vektis_insurance_number(self):
print('fix_vektis_insurance_number')
print(self)
# Apperently it is difficult to combine to CSV files at the source. So we have to fix it here again....
# And they are not able to upload easy to understand file names... bunch of rubbish..
@ -388,7 +389,7 @@ class dHealthImport():
if not source_table_name.lower().startswith('vektis_'):
continue
# Do some cleanup... Menzis rommel
# Do some cleanup... Menzis rommel. They stored numbers in tekst, and apperently forced to scientific format. This we cannot process
sql = f'DELETE FROM {source_table_name} WHERE VERZEKERDENUMMER LIKE "%%e+%%"'
result = conn.execute(sql)
self.__logmessage(f'Deleted {result.rowcount} rows from table {source_table_name}')
@ -430,7 +431,7 @@ class dHealthImport():
print('Fix vektis exception')
print(ex)
if source_table_name is None:
if source_table_name is not None:
try:
# Drop the table, as we do not need it anymore
sql = f'DROP TABLE {source_table_name}'
@ -507,13 +508,13 @@ class dHealthImport():
if table_name in ['netivity_account','netivity_legacyaccountoud']:
continue
source_table = 'netivity_legacyaccountoud' if table_name.startswith('legacy') else 'netivity_account'
source_table = 'netivity_legacyaccountoud' if 'legacy' in table_name else 'netivity_account'
sql = f'DELETE FROM {table_name} WHERE {id_field_name} NOT IN (SELECT {id_field_name} FROM {source_table})'
self.__logmessage(sql)
conn.execute(sql)
# And now how about the Menzis data??? Unclear...
# And now how about the Menzis data??? Unclear... Looks unrelated marketing data
def create_new_rug_ids(self):
self.__drop_fields.append('LegacyAccountId')
@ -526,10 +527,12 @@ class dHealthImport():
`menzis_id` BIGINT(20) NULL DEFAULT '0',
`netivity_legacy_id` BIGINT(20) NULL DEFAULT '0',
`dacadoo_id` VARCHAR(100) NULL DEFAULT '' COLLATE 'utf8_general_ci',
`rug_id` VARCHAR(50) DEFAULT (uuid()),
`email_address` VARCHAR(250) NULL DEFAULT '' COLLATE 'utf8_general_ci',
`rug_id` VARCHAR(50) NOT NULL DEFAULT (uuid()),
INDEX (`menzis_id`),
INDEX (`netivity_legacy_id`),
INDEX (`dacadoo_id`),
INDEX (`email_address`),
UNIQUE INDEX `rug_id` (`rug_id`) USING BTREE)'''
with self.db.connect() as conn, conn.begin():
@ -539,39 +542,43 @@ class dHealthImport():
# Collect the account IDs from Netivity data which holds 3 fields: AccountId, LegacyAccountId and DacadooId. This allow us to join all the data sets
self.__logmessage('Inserting Netivity account data to new rug ids ...')
sql = f'INSERT INTO {new_user_table}(menzis_id,netivity_legacy_id,dacadoo_id) SELECT AccountId, LegacyAccountId, DacadooId FROM netivity_account'
sql = f'INSERT INTO {new_user_table}(menzis_id,netivity_legacy_id,dacadoo_id,email_address) SELECT AccountId, LegacyAccountId, DacadooId, EmailAddress FROM netivity_account'
self.__logmessage(sql)
conn.execute(sql)
self.__logmessage('Inserting Netivity account data to new rug ids is done!')
count = conn.execute(sql)
self.__logmessage(f'Inserted {count.rowcount} Netivity accounts.')
self.__logmessage('Get the legacy account ids based on Dacadoo Ids ...')
sql = f'UPDATE {new_user_table} SET netivity_legacy_id = (SELECT AccountId FROM netivity_legacyaccountoud WHERE netivity_legacyaccountoud.DacadooId = {new_user_table}.dacadoo_id LIMIT 1) WHERE netivity_legacy_id = \'\''
self.__logmessage(sql)
conn.execute(sql)
count = conn.execute(sql)
self.__logmessage(f'Updated {count.rowcount} Netivity legacy accounts based on the Dacadoo IDs.')
# Collect old IDS from legacy tables which holds 2 fields: AccountId, DacadooId. But now we only want NEW records in table rug_userid_conversion. So only request records which are not al ready seen.
self.__logmessage('Inserting Netivity LEGACY account data to new rug ids ...')
sql = f'INSERT INTO {new_user_table}(netivity_legacy_id,dacadoo_id) SELECT AccountId, DacadooId FROM netivity_legacyaccountoud WHERE AccountId NOT IN (SELECT netivity_legacy_id FROM {new_user_table}) AND DacadooId NOT IN (SELECT dacadoo_id FROM {new_user_table})'
self.__logmessage(sql)
conn.execute(sql)
self.__logmessage('Inserting Netivity LEGACY account data to new rug ids is done!')
count = conn.execute(sql)
self.__logmessage(f'Added {count.rowcount} legacy Netivity accounts.')
# Get all Dacadoo IDs which are not seen yet....
self.__logmessage('Loading the remaining Dacadoo Ids which do not have a link with the Menzis or Netivity data')
sql = f'INSERT INTO {new_user_table}(dacadoo_id) SELECT id FROM dacadoo_user WHERE id NOT IN (SELECT dacadoo_id FROM {new_user_table})'
self.__logmessage(sql)
conn.execute(sql)
self.__logmessage('Loaded the remaining Dacadoo Ids which do not have a link with the Menzis or Netivity data')
count = conn.execute(sql)
self.__logmessage(f'Loaded the remaining Dacadoo Ids ({count.rowcount}) which do not have a link with the Menzis or Netivity data')
# Load all the accounts from temp table to memory, so we only have to query once the source table
accounts = []
sql = f'SELECT menzis_id, netivity_legacy_id, dacadoo_id, rug_id FORM {new_user_table} ORDER BY Index'
result = conn.execute(sql)
for row in result:
accounts.append((row[0],row[1],row[2],row[3]))
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
# Load all the accounts from temp table to memory, so we only have to query once the source table
accounts = []
sql = f'SELECT menzis_id, netivity_legacy_id, dacadoo_id, email_address, rug_id FROM {new_user_table} ORDER BY `Index`'
result = conn.execute(sql)
for row in result:
accounts.append((row[0],row[1],row[2],row[3],row[4]))
total_accounts = len(accounts)
self.__logmessage(f'Loaded in total {total_accounts} accounts to re-number')
total_accounts = len(accounts)
self.__logmessage(f'Loaded in total {total_accounts} accounts to re-number')
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
@ -586,6 +593,7 @@ class dHealthImport():
for row in result:
table_counter += 1
table_name = row[0]
try:
sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL'
self.__logmessage(sql)
@ -598,11 +606,15 @@ class dHealthImport():
counter = 0
for account in accounts:
counter += 1
source_id = 'netivity_legacy_id' if 'legacy' in table_name else 'menzis_id'
source_value = account[1] if 'legacy' in table_name else account[0]
if source_value is None:
# This happens due to the fact, that Netivity users are sharing their Dacadoo account. We cannot handle that.
print(account)
continue
sql = f'UPDATE {table_name} SET rug_id = \'{account[3]}\' WHERE {source_id} = {source_value}'
self.__logmessage(sql)
sql = f'UPDATE {table_name} SET rug_id = \'{account[4]}\' WHERE accountid = {source_value}'
#self.__logmessage(sql)
updates = conn.execute(sql)
self.__logmessage(f'[{counter} off {total_accounts}({(counter/total_accounts)*100:.2f}%)]: Updated {updates.rowcount} records for account id {source_value} in table {table_name}')
@ -640,99 +652,99 @@ class dHealthImport():
# self.__logmessage(sql)
# conn.execute(sql)
tables_to_process = []
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
# Get all the Dacadoo tables which holds ids in field name userId
table_name = 'dacadoo_user'
try:
sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL'
self.__logmessage(sql)
conn.execute(sql)
except Exception:
pass
sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.id LIMIT 1)'
self.__logmessage(sql)
conn.execute(sql)
id_field_name = 'userId'
self.__drop_fields.append(id_field_name)
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
result = conn.execute(sql)
for row in result:
tables_to_process.append(row[0])
# Here we loop over the tables that needs to be changed. We open a new DB connection for every table update.
# This will hopefully reduce the undo log and commit earlier the changes
for table_name in tables_to_process:
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
try:
sql = f'ALTER TABLE {table_name} DROP COLUMN rug_id'
self.__logmessage(sql)
conn.execute(sql)
except Exception:
pass
try:
sql = f'ALTER TABLE {table_name} DROP INDEX rug_id'
#self.__logmessage(sql)
conn.execute(sql)
except Exception:
pass
try:
sql = f'ALTER TABLE {table_name} ADD COLUMN rug_id VARCHAR(50) NULL DEFAULT NULL'
self.__logmessage(sql)
conn.execute(sql)
except Exception:
pass
sql = f'FLUSH TABLES'
#self.__logmessage(sql)
conn.execute(sql)
batch_size = 100000
amount_of_records = round(self.__record_count(table_name)/batch_size)+1
for i in range(amount_of_records):
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL LIMIT {batch_size}'
self.__logmessage(f'({i+1}/{amount_of_records}) {sql}')
try:
result = conn.execute(sql)
except Exception as ex:
result = conn.execute(sql)
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
sql = f'DROP TABLE {new_user_table}'
self.__logmessage(sql)
conn.execute(sql)
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
id_field_name = 'rug_id'
sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
result = conn.execute(sql)
for row in result:
table_name = row[0]
sql = f'DELETE FROM {table_name} WHERE {id_field_name} IS NULL'
self.__logmessage(sql)
conn.execute(sql)
# Special case. These are the original Dacadoo ids. Only in the user table they should be deleted.
with self.db.connect() as conn, conn.begin():
self.__init_mysql_connection(conn)
drop_field = 'id'
table_name = 'dacadoo_user'
self.__logmessage(f'Dropping field {drop_field} from table {table_name}')
sql = f'ALTER TABLE {table_name} DROP {drop_field}'
result = conn.execute(sql)
self.__logmessage(f'Dropped field {drop_field} from table {table_name}')
## tables_to_process = []
## with self.db.connect() as conn, conn.begin():
## self.__init_mysql_connection(conn)
## # Get all the Dacadoo tables which holds ids in field name userId
## table_name = 'dacadoo_user'
## try:
## sql = f'ALTER TABLE {table_name} ADD COLUMN `rug_id` VARCHAR(50) NULL DEFAULT NULL'
## self.__logmessage(sql)
## conn.execute(sql)
## except Exception:
## pass
## sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.id LIMIT 1)'
## self.__logmessage(sql)
## conn.execute(sql)
##
## id_field_name = 'userId'
## self.__drop_fields.append(id_field_name)
## sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
## result = conn.execute(sql)
## for row in result:
## tables_to_process.append(row[0])
##
## # Here we loop over the tables that needs to be changed. We open a new DB connection for every table update.
## # This will hopefully reduce the undo log and commit earlier the changes
## for table_name in tables_to_process:
## with self.db.connect() as conn, conn.begin():
## self.__init_mysql_connection(conn)
## try:
## sql = f'ALTER TABLE {table_name} DROP COLUMN rug_id'
## self.__logmessage(sql)
## conn.execute(sql)
## except Exception:
## pass
##
## try:
## sql = f'ALTER TABLE {table_name} DROP INDEX rug_id'
## #self.__logmessage(sql)
## conn.execute(sql)
## except Exception:
## pass
##
## try:
## sql = f'ALTER TABLE {table_name} ADD COLUMN rug_id VARCHAR(50) NULL DEFAULT NULL'
## self.__logmessage(sql)
## conn.execute(sql)
## except Exception:
## pass
##
## sql = f'FLUSH TABLES'
## #self.__logmessage(sql)
## conn.execute(sql)
##
## batch_size = 100000
## amount_of_records = round(self.__record_count(table_name)/batch_size)+1
##
## for i in range(amount_of_records):
## with self.db.connect() as conn, conn.begin():
## self.__init_mysql_connection(conn)
##
## sql = f'UPDATE {table_name} SET rug_id = (SELECT rug_id FROM {new_user_table} WHERE {new_user_table}.dacadoo_id = {table_name}.{id_field_name} LIMIT 1) WHERE rug_id IS NULL LIMIT {batch_size}'
## self.__logmessage(f'({i+1}/{amount_of_records}) {sql}')
## try:
## result = conn.execute(sql)
## except Exception as ex:
## result = conn.execute(sql)
##
## with self.db.connect() as conn, conn.begin():
## self.__init_mysql_connection(conn)
## sql = f'DROP TABLE {new_user_table}'
## self.__logmessage(sql)
## conn.execute(sql)
##
## with self.db.connect() as conn, conn.begin():
## self.__init_mysql_connection(conn)
## id_field_name = 'rug_id'
## sql = f'SELECT table_name FROM information_schema.COLUMNS WHERE COLUMN_NAME = \'{id_field_name}\' AND TABLE_SCHEMA = \'{self.__temp_db_name}\' ORDER BY table_name'
## result = conn.execute(sql)
## for row in result:
## table_name = row[0]
## sql = f'DELETE FROM {table_name} WHERE {id_field_name} IS NULL'
## self.__logmessage(sql)
## conn.execute(sql)
##
## # Special case. These are the original Dacadoo ids. Only in the user table they should be deleted.
## with self.db.connect() as conn, conn.begin():
## self.__init_mysql_connection(conn)
## drop_field = 'id'
## table_name = 'dacadoo_user'
## self.__logmessage(f'Dropping field {drop_field} from table {table_name}')
## sql = f'ALTER TABLE {table_name} DROP {drop_field}'
## result = conn.execute(sql)
## self.__logmessage(f'Dropped field {drop_field} from table {table_name}')
##
def drop_fields(self, drop = True):
@ -776,7 +788,7 @@ class dHealthImport():
for row in result:
table_name = row[0]
self.__logmessage(f'Updateing postal code field {postal_code_field} for table {table_name}')
sql = f'UPDATE {table_name} SET {postal_code_field} = SUBSTRING({postal_code_field},0,4)'
sql = f'UPDATE {table_name} SET {postal_code_field} = SUBSTRING({postal_code_field},1,4)'
result = conn.execute(sql)
self.__logmessage(f'Updated postal code field {postal_code_field} for table {table_name}')
@ -816,36 +828,46 @@ class dHealthImport():
print('')
def addDacadooData(self,location):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['dacadoo'] = location
self.__load_files()
else:
raise RuntimeError(f'Location {location} is not a valid Dacadoo source location')
for location in location.split(','):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['dacadoo'].append(location)
else:
print(f'Location {location} is not a valid Dacadoo source location')
self.__load_files()
def addMenzisData(self,location):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['menzis'] = location
self.__load_files()
else:
raise RuntimeError(f'Location {location} is not a valid Menzis source location')
for location in location.split(','):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['menzis'].append(location)
else:
print(f'Location {location} is not a valid Menzis source location')
self.__load_files()
def addNetivityData(self,location):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['netivity'] = location
self.__load_files()
else:
raise RuntimeError(f'Location {location} is not a valid Netivity source location')
for location in location.split(','):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['netivity'].append(location)
else:
print(f'Location {location} is not a valid Netivity source location')
self.__load_files()
def addVektisData(self,location):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['vektis'] = location
self.__load_files()
else:
raise RuntimeError(f'Location {location} is not a valid Vektis source location')
for location in location.split(','):
location = Path(location)
if location.exists() and location.is_dir():
self.__source_folders['vektis'].append(location)
else:
print(f'Location {location} is not a valid Vektis source location')
self.__load_files()
if __name__ == "__main__":
@ -889,11 +911,11 @@ if __name__ == "__main__":
except KeyError as ex:
print('Not exporting CSV data')
#importer.run(True)
#importer.filter_on_consent()
# importer.create_new_rug_ids()
#importer.clean_birth_days()
#importer.clean_postal_codes()
#importer.drop_fields()
importer.run(reinit_db=True)
importer.filter_on_consent()
#importer.create_new_rug_ids()
importer.clean_birth_days()
importer.clean_postal_codes()
importer.drop_fields()
importer.create_csv_exports()
importer.create_csv_exports(False,False)
Loading…
Cancel
Save