You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

406 lines
14 KiB

import os
os.chdir(r'C:\Users\Aki\source\repos\acoustic_model\acoustic_model')
import sys
from collections import Counter
import pickle
import numpy as np
import pandas as pd
import defaultfiles as default
import convert_phoneset
from phoneset import fame_ipa, fame_asr
sys.path.append(default.toolbox_dir)
from htk import pyhtk
#def read_fileFA(fileFA):
# """
# read the result file of HTK forced alignment.
# this function only works when input is one word.
# """
# with open(fileFA, 'r') as f:
# lines = f.read()
# lines = lines.split('\n')
# phones = []
# for line in lines:
# line_split = line.split()
# if len(line_split) > 1:
# phones.append(line_split[2])
# return ' '.join(phones)
#def fame_pronunciation_variant(ipa):
# ipa = ipa.replace('æ', 'ɛ')
# ipa = ipa.replace('ɐ', 'a')
# ipa = ipa.replace('ɑ', 'a')
# ipa = ipa.replace('ɾ', 'r')
# ipa = ipa.replace('ɹ', 'r') # ???
# ipa = ipa.replace('ʁ', 'r')
# ipa = ipa.replace('ʀ', 'r') # ???
# ipa = ipa.replace('ʊ', 'u')
# ipa = ipa.replace('χ', 'x')
# pronvar_list = [ipa]
# while 'ø:' in ' '.join(pronvar_list) or 'œ' in ' '.join(pronvar_list) or 'ɒ' in ' '.join(pronvar_list):
# pronvar_list_ = []
# for p in pronvar_list:
# if 'ø:' in p:
# pronvar_list_.append(p.replace('ø:', 'ö'))
# pronvar_list_.append(p.replace('ø:', 'ö:'))
# if 'œ' in p:
# pronvar_list_.append(p.replace('œ', 'ɔ̈'))
# pronvar_list_.append(p.replace('œ', 'ɔ̈:'))
# if 'ɒ' in p:
# pronvar_list_.append(p.replace('ɒ', 'ɔ̈'))
# pronvar_list_.append(p.replace('ɒ', 'ɔ̈:'))
# pronvar_list = np.unique(pronvar_list_)
# return pronvar_list
#def make_fame2ipa_variants(fame):
# fame = 'rɛös'
# ipa = [fame]
# ipa.append(fame.replace('ɛ', 'æ'))
# ipa.append(fame.replace('a', 'ɐ'))
# ipa.append(fame.replace('a', 'ɑ'))
# ipa.append(fame.replace('r', 'ɾ'))
# ipa.append(fame.replace('r', 'ɹ'))
# ipa.append(fame.replace('r', 'ʁ'))
# ipa.append(fame.replace('r', 'ʀ'))
# ipa.append(fame.replace('u', 'ʊ'))
# ipa.append(fame.replace('x', 'χ'))
# ipa.append(fame.replace('ö', 'ø:'))
# ipa.append(fame.replace('ö:', 'ø:'))
# ipa.append(fame.replace('ɔ̈', 'œ'))
# ipa.append(fame.replace('ɔ̈:', 'œ'))
# ipa.append(fame.replace('ɔ̈', 'ɒ'))
# ipa.append(fame.replace('ɔ̈:', 'ɒ'))
# return ipa
#def make_htk_dict(word, pronvar_, fileDic, output_type):
# """
# make dict files which can be used for HTK.
# param word: target word.
# param pronvar_: pronunciation variant. nx2 (WORD /t pronunciation) ndarray.
# param fileDic: output dic file.
# param output_type: 0:full, 1:statistics, 2:frequency <2% entries are removed. 3:top 3.
# """
# #assert(output_type < 4 and output_type >= 0, 'output_type should be an integer between 0 and 3.')
# WORD = word.upper()
# if output_type == 0: # full
# pronvar = np.unique(pronvar_)
# with open(fileDic, 'w') as f:
# for pvar in pronvar:
# f.write('{0}\t{1}\n'.format(WORD, pvar))
# else:
# c = Counter(pronvar_)
# total_num = sum(c.values())
# with open(fileDic, 'w') as f:
# if output_type == 3:
# for key, value in c.most_common(3):
# f.write('{0}\t{1}\n'.format(WORD, key))
# else:
# for key, value in c.items():
# percentage = value/total_num*100
# if output_type == 1: # all
# f.write('{0}\t{1:.2f}\t{2}\t{3}\n'.format(value, percentage, WORD, key))
# elif output_type == 2: # less than 2 percent
# if percentage < 2:
# f.write('{0}\t{1}\n'.format(WORD, key))
def make_hcopy_scp_from_filelist_in_fame(fame_dir, dataset, feature_dir, hcopy_scp):
""" Make a script file for HCopy using the filelist in FAME! corpus.
Args:
fame_dir (path): the directory of FAME corpus.
dataset (str): 'devel', 'test' or 'train'.
feature_dir (path): the directory where feature will be stored.
hcopy_scp (path): a script file for HCopy to be made.
"""
filelist_txt = os.path.join(fame_dir, 'fame', 'filelists', dataset + 'list.txt')
with open(filelist_txt) as fin:
filelist = fin.read()
filelist = filelist.split('\n')
with open(hcopy_scp, 'w') as fout:
for filename_ in filelist:
filename = filename_.replace('.TextGrid', '')
if len(filename) > 3: # remove '.', '..' and ''
wav_file = os.path.join(fame_dir, 'fame', 'wav', dataset, filename + '.wav')
mfc_file = os.path.join(feature_dir, filename + '.mfc')
fout.write(wav_file + '\t' + mfc_file + '\n')
return
def load_lexicon(lexicon_file):
""" load lexicon file as data frame.
Args:
lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'.
Returns:
lex (df): lexicon as Data Frame, which has columns 'word' and 'pronunciation'.
"""
lex = pd.read_csv(lexicon_file, delimiter='\t', header=None, encoding="utf-8")
lex.rename(columns={0: 'word', 1: 'pronunciation'}, inplace=True)
return lex
def get_phoneset_from_lexicon(lexicon_file, phoneset_name='asr'):
""" Make a list of phones which appears in the lexicon.
Args:
lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'.
phoneset_name (str): the name of phoneset with which lexicon_file is written. 'asr'(default) or 'ipa'.
Returns:
(list_of_phones) (set): the set of phones included in the lexicon_file.
"""
assert phoneset_name in ['asr', 'ipa'], 'phoneset_name should be \'asr\' or \'ipa\''
lex = load_lexicon(lexicon_file)
if phoneset_name == 'asr':
return set(' '.join(lex['pronunciation']).split(' '))
elif phoneset_name == 'ipa':
join_pronunciations = ''.join(lex['pronunciation'])
return set(convert_phone_set.split_word(join_pronunciations, fame_ipa.multi_character_phones))
return
def extract_unknown_phones(ipa, known_phones):
"""extract unknown phones in the pronunciation written in IPA.
Args:
ipa (str): a pronunciation written in IPA.
known_phones (list): list of phones already know.
Returns:
(list_of_phones) (list): unknown phones not included in 'known_phones'.
"""
ipa_split = convert_phone_set.split_word(ipa, fame_ipa.multi_character_phones)
return [i for i in ipa_split if not i in known_phones]
def get_translation_key(lexicon_file_ipa, lexicon_file_asr):
""" get correspondence between lexicon_file_ipa and lexicon_file_asr.
Args:
lexicon_file_ipa (path): lexicon in the format of 'word' /t 'pronunciation (IPA)'.
lexicon_file_asr (path): lexicon in the format of 'word' /t 'pronunciation (asr)'.
the each character of 'pronunciation' should be delimited by ' '.
Returns:
translation_key (dict): translation key from ipa to asr.
(phone_unknown) (list): the list of IPA phones, which does not appear in lexicon_file_asr.
"""
lex_ipa = load_lexicon(lexicon_file_ipa)
lex_asr = load_lexicon(lexicon_file_asr)
phone_unknown = fame_ipa.phoneset[:]
translation_key = dict()
for word in lex_ipa['word']:
if np.sum(lex_ipa['word'] == word) == 1 and np.sum(lex_asr['word'] == word) == 1:
ipa = lex_ipa[lex_ipa['word'] == word].iat[0, 1]
asr = lex_asr[lex_asr['word'] == word].iat[0, 1]
ipa_list = convert_phone_set.split_word(ipa, fame_ipa.multi_character_phones)
asr_list = asr.split(' ')
# if there are phones which is not in phone_unknown
#if len([True for i in asr_list if i in phone_unknown]) > 0:
if(len(ipa_list) == len(asr_list)):
print("{0}: {1} --> {2}".format(word, ipa_list, asr_list))
for ipa_, asr_ in zip(ipa_list, asr_list):
if ipa_ in phone_unknown:
translation_key[ipa_] = asr_
phone_unknown.remove(ipa_)
return translation_key, list(phone_unknown)
def find_phone(lexicon_file, phone, phoneset_name='ipa'):
""" extract rows where the phone is used in the lexicon_file.
Args:
lexicon_file (path): lexicon in the format of 'word' /t 'pronunciation'.
phone (str): the phone to be searched.
phoneset_name (str): the name of phoneset_name with which lexicon_file is written. 'asr' or 'ipa'(default).
Returns:
extracted (df): rows where the phone is used.
ToDo:
* develop when the phonset == 'asr'.
"""
assert phoneset_name in ['asr', 'ipa'], 'phoneset_name should be \'asr\' or \'ipa\''
lex = load_lexicon(lexicon_file)
# to reduce the calculation time, only target rows which include 'phone' at least once.
lex_ = lex[lex['pronunciation'].str.count(phone)>0]
extracted = pd.DataFrame(index=[], columns=['word', 'pronunciation'])
for index, row in lex_.iterrows():
if phoneset_name == 'ipa':
pronunciation = convert_phone_set.split_word(row['pronunciation'], fame_ipa.multi_character_phones)
if phone in pronunciation:
extracted_ = pd.Series([row['word'], pronunciation], index=extracted.columns)
extracted = extracted.append(extracted_, ignore_index=True)
return extracted
def asr2htk_space_delimited(pronunciation):
"""convert phoneset from asr to htk.
Args:
pronunciation (str): space delimited asr phones.
Returns:
(pronunciation) (str): space delimited asr phones in htk format (ascii).
"""
pronunciation_short = [fame_asr.reduction_key.get(i, i) for i in pronunciation.split(' ')
if not i in fame_asr.phones_to_be_removed]
return ' '.join(convert_phoneset.convert_phoneset(
pronunciation_short, fame_asr.translation_key_asr2htk))
def lexicon_asr2htk(lexicon_file_asr, lexicon_file_htk):
""" Convert a lexicon file from asr to htk format (ascii).
Args:
lexicon_file_asr (path): a lexicon file written in asr format e.g. fame/lex.asr.
lexicon_file_htk (path): a lexicon file written in htk format (ascii).
"""
lex_asr = load_lexicon(lexicon_file_asr)
def word2htk_(row):
return word2htk(row['word'])
def asr2htk_space_delimited_(row):
return asr2htk_space_delimited(row['pronunciation'])
lex_htk = pd.DataFrame({
'word': lex_asr.apply(word2htk_, axis=1).str.upper(),
'pronunciation': lex_asr.apply(asr2htk_space_delimited_, axis=1)
})
lex_htk = lex_htk.ix[:, ['word', 'pronunciation']]
lex_htk.to_csv(lexicon_file_htk, header=None, index=None, sep='\t', encoding='utf-8')
return
def combine_lexicon(lexicon_file1, lexicon_file2, lexicon_out):
""" Combine two lexicon files and sort by words.
Args:
lexicon_file1, lexicon_file2 (path): input lexicon files.
Returns:
lexicon_file_out (path): lexicon_file which lexcion_file1 and 2 are combined and sorted.
"""
lex1 = load_lexicon(lexicon_file1)
lex2 = load_lexicon(lexicon_file2)
lex = pd.concat([lex1, lex2])
lex = lex.sort_values(by='word', ascending=True)
lex.to_csv(lexicon_out, index=False, header=False, sep='\t', encoding='utf-8')
def fix_lexicon(lexicon_file):
""" fix lexicon
- add '\' before all single quote at the beginning of words.
- convert special characters to ascii compatible characters.
- add silence.
Args:
lexicon_file (path): lexicon file, which will be overwitten.
"""
lex = load_lexicon(lexicon_file)
lex = lex.dropna() # remove N/A.
# add 'sil'
row = pd.Series(['SILENCE', 'sil'], index=lex.columns)
lex = lex.append(row, ignore_index=True)
lex = lex.sort_values(by='word', ascending=True)
for i in lex[lex['word'].str.startswith('\'')].index.values:
lex.iat[i, 0] = lex.iat[i, 0].replace('\'', '\\\'')
# to_csv does not work with space seperator. therefore all tabs should manually be replaced.
#lex.to_csv(lexicon_file, index=False, header=False, encoding="utf-8", sep=' ', quoting=csv.QUOTE_NONE, escapechar='\\')
lex.to_csv(lexicon_file, index=False, header=False, sep='\t', encoding='utf-8')
return
def word2htk(word):
return ''.join([fame_asr.translation_key_word2htk.get(i, i) for i in word])
def ipa2asr(ipa):
curr_dir = os.path.dirname(os.path.abspath(__file__))
translation_key_ipa2asr = np.load(os.path.join(curr_dir, 'phoneset', 'fame_ipa2asr.npy')).item(0)
#ipa_ = fame_asr.phone_reduction(ipa)
ipa_splitted = convert_phoneset.split_word(ipa, fame_ipa.multi_character_phones)
ipa_splitted = fame_ipa.phone_reduction(ipa_splitted)
asr_splitted = convert_phoneset.convert_phoneset(ipa_splitted, translation_key_ipa2asr)
asr_splitted = fame_asr.phone_reduction(asr_splitted)
return ''.join(asr_splitted)
def ipa2htk(ipa):
curr_dir = os.path.dirname(os.path.abspath(__file__))
translation_key_ipa2asr = np.load(os.path.join(curr_dir, 'phoneset', 'fame_ipa2asr.npy')).item(0)
#translation_key_ipa2asr = np.load(r'c:\Users\Aki\source\repos\acoustic_model\acoustic_model\phoneset\fame_ipa2asr.npy').item(0)
ipa_splitted = convert_phoneset.split_word(ipa, fame_ipa.multi_character_phones)
ipa_splitted = fame_ipa.phone_reduction(ipa_splitted)
asr_splitted = convert_phoneset.convert_phoneset(ipa_splitted, translation_key_ipa2asr)
asr_splitted = fame_asr.phone_reduction(asr_splitted)
htk_splitted = convert_phoneset.convert_phoneset(asr_splitted, fame_asr.translation_key_asr2htk)
return ''.join(htk_splitted)
def performance_on_stimmen(config_dir, stimmen_dir, hmmdefs):
lattice_file = os.path.join(stimmen_dir, 'word_lattice.ltc')
hvite_scp = os.path.join(stimmen_dir, 'hvite.scp')
#fh.make_filelist(os.path.join(stimmen_dir, 'mfc'), hvite_scp, file_type='mfc')
hresult_scp = os.path.join(stimmen_dir, 'hresult.scp')
#fh.make_filelist(os.path.join(stimmen_dir, 'mfc'), hresult_scp, file_type='rec')
lexicon_file = os.path.join(stimmen_dir, 'lexicon_recognition.dic')
# get feature_size from hmmdefs.
with open(hmmdefs) as f:
line = f.readline()
line = f.readline().strip()
feature_size = int(line.split(' ')[2])
chtk = pyhtk.HTK(config_dir, fame_asr.phoneset_htk, lexicon_file, feature_size)
result = chtk.recognition(
lattice_file,
hmmdefs,
hvite_scp
)
per_sentence, per_word = chtk.calc_recognition_performance(hresult_scp)
return per_sentence['accuracy']