You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
289 lines
11 KiB
Python
289 lines
11 KiB
Python
#!/home/agropunx/anaconda3/envs/geografia/bin/python
|
|
|
|
import time, subprocess, copy,os, sys, numpy as np, pandas as pd
|
|
import config
|
|
import spacy
|
|
|
|
def spoken_kind(doc):
    """Classify a parsed sentence by its surface form.

    A sentence is treated as interrogative when a '?' occurs within the last
    four characters of its stripped text.  Interrogatives are "open" when at
    least one token text is listed in ``config.open_question_words`` and
    "closed" otherwise.  Any other sentence is "negative" when at least one
    token text is listed in ``config.negative_words``, else "affirmative".

    Args:
        doc: Object exposing ``.text`` and iterating over tokens that expose
            ``.text`` (registered below as a getter on spaCy ``Doc``).

    Returns:
        One of 'interrogative_open', 'interrogative_closed', 'negative' or
        'affirmative'.
    """
    # Plain bools + conditional expressions: indexing a list with a numpy
    # bool (the previous approach) is deprecated by NumPy.
    if '?' in doc.text.strip()[-4:]:
        is_open = any(tok.text in config.open_question_words for tok in doc)
        return 'interrogative_open' if is_open else 'interrogative_closed'

    is_negative = any(tok.text in config.negative_words for tok in doc)
    return 'negative' if is_negative else 'affirmative'
|
|
|
|
def spoken_subject(doc):
    """Return the slice of *doc* spanned by the first subject subtree.

    The first token whose dependency label contains "subj" is selected; the
    result is ``doc[first:last + 1]`` where ``first``/``last`` are the ``.i``
    positions of the first and last tokens of that token's subtree.

    Returns:
        The sliced span, or ``None`` when no token has a "subj" dependency.
    """
    head = next((tok for tok in doc if "subj" in tok.dep_), None)
    if head is None:
        return None

    members = list(head.subtree)
    first, last = members[0], members[-1]
    return doc[first.i:last.i + 1]
|
|
|
|
def spoken_dobject(doc):
    """Return the slice of *doc* spanned by the first direct-object subtree.

    The first token whose dependency label contains "dobj" is selected; the
    result is ``doc[first:last + 1]`` where ``first``/``last`` are the ``.i``
    positions of the first and last tokens of that token's subtree.

    Returns:
        The sliced span, or ``None`` when no token has a "dobj" dependency.
    """
    head = next((tok for tok in doc if "dobj" in tok.dep_), None)
    if head is None:
        return None

    members = list(head.subtree)
    first, last = members[0], members[-1]
    return doc[first.i:last.i + 1]
|
|
|
|
def spoken_verb(doc):
    """Collect the verb-related tokens of *doc*.

    A token qualifies when its dependency label is one of 'ROOT', 'cop',
    'advmod' or 'aux' and its text is listed in neither ``config.time_terms``
    nor ``config.negative_words``.

    Returns:
        A list of ``doc[i]`` items, in document order (empty when none match).
    """
    verb_deps = ['ROOT', 'cop', 'advmod', 'aux']
    return [
        doc[pos]
        for pos, tok in enumerate(doc)
        if tok.dep_ in verb_deps
        and tok.text not in config.time_terms + config.negative_words
    ]
|
|
|
|
def spoken_time(doc):
    """Locate the time expression of *doc*.

    Entities are checked first: the first entity labelled 'DATE' or 'TIME'
    yields ``doc[ent.start:ent.end]``.  Failing that, the first token whose
    text is listed in ``config.time_terms`` yields ``doc[i]``.

    Returns:
        A slice of *doc* (entity hit), a single item of *doc* (term hit), or
        ``None`` when nothing matches.
    """
    entity = next((ent for ent in doc.ents if ent.label_ in ('DATE', 'TIME')), None)
    if entity is not None:
        return doc[entity.start:entity.end]

    for pos, tok in enumerate(doc):
        if tok.text in config.time_terms:
            return doc[pos]
    return None
|
|
|
|
def spoken_negative(doc):
    """Return the position of the first negative word in *doc*.

    A token counts as negative when its text is listed in
    ``config.negative_words``.

    Returns:
        The zero-based position of the first match, or ``None`` when there is
        none.  Position 0 is a valid result and is falsy, so callers should
        compare against ``None`` rather than rely on truthiness.
    """
    return next(
        (pos for pos, tok in enumerate(doc) if tok.text in config.negative_words),
        None,
    )
|
|
|
|
|
|
# Register the sentence-level helpers as computed attributes on spaCy Doc
# objects (doc._.kind, doc._.subject, ...).  force=True overwrites any
# extension already registered under the same name.
for _ext_name, _ext_getter in (
    ("kind", lambda doc: spoken_kind(doc)),
    ("subject", lambda doc: spoken_subject(doc)),
    ("dobject", lambda doc: spoken_dobject(doc)),
    ("verb", lambda doc: spoken_verb(doc)),
    ("time", lambda doc: spoken_time(doc)),
    ("negative", lambda doc: spoken_negative(doc)),
):
    spacy.tokens.Doc.set_extension(_ext_name, getter=_ext_getter, force=True)
# Do not leave the loop variables behind in the module namespace.
del _ext_name, _ext_getter
|
|
|
|
def bash_command(cmd):
    """Start *cmd* through /bin/bash without waiting for it to finish.

    Args:
        cmd: Command line string handed to the shell as-is.

    Returns:
        The ``subprocess.Popen`` handle of the started process, so the caller
        can ``wait()`` for it or read its ``returncode``.  Callers that only
        want fire-and-forget behaviour may ignore the return value.
    """
    # NOTE(review): shell=True lets the shell interpret cmd; only pass
    # strings that come from trusted sources.
    return subprocess.Popen(cmd, shell=True, executable='/bin/bash')
|
|
|
|
class CCQ:
    """Rule-based converter between spoken sentences and gloss token lists.

    "spoken" input is text parsed with spaCy; "gloss" input is a sequence of
    pairs whose first item is the token text and whose second item is a role
    label (the code checks for the labels 'subject' and 'verb').
    """

    def __init__(self, model_name="it_core_news_lg", ner_model_name="it_nerIta_trf"):
        """Load the spaCy pipelines.

        Args:
            model_name: Name of the main pipeline passed to ``spacy.load``.
            ner_model_name: Name of a separate pipeline used for entities.
                When falsy, the main pipeline is loaded with its own 'ner'
                component and ``self.ner`` is ``None``.
        """
        if ner_model_name:
            # The main pipeline drops its own NER; entities come from self.ner.
            self.nlp = spacy.load(model_name, exclude=['ner'])
            self.ner = spacy.load(ner_model_name)
        else:
            self.nlp = spacy.load(model_name)
            self.ner = None

    def analyze(self, sentence, mode='spoken'):
        """Turn *sentence* into the document object used by the other methods.

        Args:
            sentence: Text to parse ('spoken') or an already tokenised gloss
                sequence ('gloss'), which is returned unchanged.
            mode: 'spoken' or 'gloss'.

        Returns:
            The parsed spaCy document, or *sentence* itself in gloss mode.

        Raises:
            ValueError: If *mode* is neither 'spoken' nor 'gloss'.
        """
        if mode == 'spoken':
            doc = self.nlp(sentence)
            if self.ner:
                nerdoc = self.ner(doc)
                doc.ents = nerdoc.ents
                # NOTE(review): "time" is registered with a getter only, so
                # this assignment presumably does not change what doc._.time
                # returns afterwards -- confirm against spaCy's extension docs.
                doc._.time = nerdoc._.time
        elif mode == 'gloss':
            doc = sentence
        else:
            raise ValueError(f'unknown mode {mode}')
        return doc

    def check_feasibility(self, doc, valid_tokens, mode='spoken', rules=None):
        """Check *doc* against the per-mode limits in *rules*.

        Args:
            doc: Parsed document or raw string ('spoken'; strings are parsed
                with ``self.nlp``), or a gloss sequence of (text, role) pairs.
            valid_tokens: Per-token booleans; their sum is compared with
                ``rules['valid_tokens']`` in spoken mode.
            mode: 'spoken' or 'gloss'.
            rules: Mapping ``mode -> limits``.  Defaults to ``config.rules``,
                looked up at call time.

        Returns:
            Dict of named boolean checks plus 'glob', which is True only when
            every individual check passed.

        Raises:
            ValueError: If *mode* is neither 'spoken' nor 'gloss'.
        """
        # Validate the mode before indexing rules, otherwise an unknown mode
        # surfaces as a KeyError and the explicit error is never reached.
        if mode not in ('spoken', 'gloss'):
            raise ValueError(f'unknown mode {mode}')
        if rules is None:
            rules = config.rules
        limits = rules[mode]

        if mode == 'spoken':
            if isinstance(doc, str):
                doc = self.nlp(doc)
            status = {
                'max_tokens': len(doc) < limits['max_tokens'],
                'valid_tokens': sum(valid_tokens) < limits['valid_tokens'],
                'punct': sum([t.is_punct for t in doc]) <= 1,
            }
            # doc._.negative is a token position: 0 is a real hit, so compare
            # with None instead of testing truthiness.
            if doc._.negative is not None:
                status['negative_interrogative'] = 'interrogative' not in doc._.kind
            verb = doc._.verb
            if verb:
                status['verb'] = len(verb) < limits['max_verb_terms']
            time_hit = doc._.time
            if time_hit:
                status['time'] = len(time_hit.text.split()) < limits['max_time_terms']
            subject = doc._.subject
            if subject:
                status['subject'] = len(subject.text.split()) < limits['max_subject_terms']
        else:
            status = {
                'subject': sum([t[1] == 'subject' for t in doc]) < limits['subject'],
                'verb': sum([t[1] == 'verb' for t in doc]) < limits['verb'],
            }

        status['glob'] = all(status.values())
        return status

    def check_kind(self, doc, mode='spoken'):
        """Return the sentence kind of *doc*.

        Args:
            doc: Parsed document or raw string ('spoken'), or a gloss sequence
                of (text, role) pairs.
            mode: 'spoken' or 'gloss'.

        Returns:
            'interrogative_open', 'interrogative_closed', 'negative' or
            'affirmative'.

        Raises:
            ValueError: If *mode* is neither 'spoken' nor 'gloss'.
        """
        if mode == 'spoken':
            if isinstance(doc, str):
                doc = self.nlp(doc)
            return doc._.kind

        if mode == 'gloss':
            if any('?' in t[0] for t in doc):
                is_open = any(t[0] in config.open_question_words for t in doc)
                return 'interrogative_open' if is_open else 'interrogative_closed'
            is_negative = any(t[0] in config.negative_words for t in doc)
            return 'negative' if is_negative else 'affirmative'

        raise ValueError(f'unknown mode {mode}')

    def tok_validator(self, doc, mode='spoken'):
        """Flag which tokens of *doc* are kept when reordering.

        In spoken mode a token is rejected when its part of speech is 'DET' or
        'ADP', its text is listed in ``config.spoken_stopwords``, it is
        punctuation, or its text is '?'.  In gloss mode every token is kept.

        Returns:
            List of booleans, one per token.

        Raises:
            ValueError: If *mode* is neither 'spoken' nor 'gloss'.
        """
        if mode == 'spoken':
            if isinstance(doc, str):
                doc = self.nlp(doc)
            return [
                not (
                    tok.pos_ in ['DET', 'ADP']
                    or tok.text in config.spoken_stopwords
                    or tok.is_punct
                    or tok.text == '?'
                )
                for tok in doc
            ]
        if mode == 'gloss':
            return [True for _ in doc]
        raise ValueError(f'unknown mode {mode}')

    @staticmethod
    def _token_indices(doc, item, skip=None):
        """Return the positions in *doc* covered by *item*, leaving out *skip*.

        *item* is either a spaCy Span (positions ``start``..``end - 1``) or a
        single token, which is located through its character offset ``idx``.
        """
        if isinstance(item, spacy.tokens.span.Span):
            positions = range(item.start, item.end)
        else:
            positions = [i for i, tok in enumerate(doc) if tok.idx == item.idx]
        return [i for i in positions if i != skip]

    def reorder(self, doc, valid_tokens=None, direction='gloss2spoken', kind='affirmative'):
        """Reorder the tokens of *doc* for the requested *direction*.

        Args:
            doc: spaCy ``Doc`` for 'spoken2gloss'; sequence of (text, role)
                pairs for 'gloss2spoken'.
            valid_tokens: Per-token booleans used by 'spoken2gloss' to drop
                tokens from the output; ``None`` or empty keeps every token.
            direction: 'spoken2gloss' or 'gloss2spoken'.
            kind: Sentence kind as returned by ``check_kind``.

        Returns:
            List of strings.  For 'spoken2gloss' these are lowercased lemmas;
            with a negation the last item is replaced by 'no', and for
            interrogatives a '?' (preceded by the question word for open
            questions) is appended.  For 'gloss2spoken' these are the token
            texts ordered subjects, question words, verbs, everything else.

        Raises:
            ValueError: If *direction* is unknown.
            AssertionError: If 'spoken2gloss' receives a non-``Doc`` input or
                a document whose kind is not one of the four known kinds.
        """
        reordered = []
        idx_done = [False] * len(doc)

        def mark(indices):
            for i in indices:
                idx_done[i] = True

        if direction == 'spoken2gloss':
            # SVO -> SOV: time first, verb (and negation) moved to the tail.
            assert isinstance(doc, spacy.tokens.doc.Doc), type(doc)
            assert doc._.kind in ['affirmative', 'negative', 'interrogative_open', 'interrogative_closed'], doc._.kind

            if valid_tokens is None or len(valid_tokens) == 0:
                valid_tokens = [True for _ in doc]

            # Each doc._.<name> access re-runs its getter; read them once.
            time_hit = doc._.time
            subject = doc._.subject
            verb = doc._.verb
            negative = doc._.negative

            head_idx = []   # token positions emitted first, in this order
            tail_idx = []   # token positions emitted last
            wh_idx = None   # position of the open-question word, if any

            def keep(indices):
                return [i for i in indices if valid_tokens[i]]

            def verb_indices(skip=None):
                starts = {tok.idx for tok in verb}
                return [i for i, tok in enumerate(doc) if tok.idx in starts and i != skip]

            if time_hit:
                tidx = self._token_indices(doc, time_hit)
                mark(tidx)
                head_idx += keep(tidx)

            if 'interrogative' in kind:
                if 'open' in kind:
                    # Raises IndexError when no token is a listed question word.
                    wh_idx = [i for i, tok in enumerate(doc) if tok.text in config.open_question_words][0]
                    if subject:
                        tidx = self._token_indices(doc, subject, skip=wh_idx)
                        mark(tidx)
                        head_idx += keep(tidx)
                    if verb:
                        tidx = verb_indices(skip=wh_idx)
                        head_idx += keep(tidx)
                        mark(tidx)
                else:
                    if subject:
                        tidx = self._token_indices(doc, subject)
                        mark(tidx)
                        tail_idx += keep(tidx)
                    if verb:
                        tidx = verb_indices()
                        # Extend rather than overwrite, so the subject queued
                        # just above is not silently dropped from the output.
                        tail_idx += keep(tidx)
                        mark(tidx)
            else:
                if subject:
                    tidx = self._token_indices(doc, subject)
                    mark(tidx)
                    head_idx += keep(tidx)
                if verb:
                    tidx = verb_indices()
                    tail_idx += keep(tidx)
                    mark(tidx)

            # The negation is a position; 0 is a real hit, hence "is not None".
            if negative is not None:
                tail_idx.append(negative)
                idx_done[negative] = True

            # Everything not placed yet keeps its original relative order.
            for i, done in enumerate(idx_done):
                if not done:
                    if valid_tokens[i]:
                        head_idx.append(i)
                    idx_done[i] = True

            assert sum(idx_done) == len(doc)
            reordered = [doc[i].lemma_.lower() for i in head_idx + tail_idx]

            if negative is not None:
                # The tail ends with the negation token; emit it as 'no'.
                reordered[-1] = 'no'
            elif 'interrogative' in kind:
                if 'open' in kind:
                    # NOTE(review): when the question word is also a valid
                    # leftover token it may already appear earlier in the
                    # output -- confirm whether that duplicate is intended.
                    wh = doc[wh_idx].text.lower()
                    if not reordered or wh != reordered[-1]:
                        reordered.append(wh)
                reordered.append('?')

        elif direction == 'gloss2spoken':
            # SOV -> SVO: subjects, question words, verbs, then the rest.
            # Every pass skips tokens already emitted so that a token matching
            # two passes is not output twice.
            passes = (
                lambda t: t[1] == 'subject',
                lambda t: t[0] in config.open_question_words,
                lambda t: t[1] == 'verb',
                lambda t: True,
            )
            for wanted in passes:
                for i, t in enumerate(doc):
                    if not idx_done[i] and wanted(t):
                        reordered.append(t[0])
                        idx_done[i] = True

        else:
            raise ValueError(f'unknown direction {direction}')

        assert len(doc) == sum(idx_done)
        return reordered

    def translate(self, sentence, direction='spoken2gloss'):
        """Analyse *sentence*, check it against the rules and reorder it.

        Args:
            sentence: Text ('spoken2gloss') or gloss sequence ('gloss2spoken').
            direction: 'spoken2gloss' or 'gloss2spoken'.

        Returns:
            Tuple ``(translated, success, info)``.  On success *translated* is
            the reordered list and *info* is ''.  When a feasibility check
            fails, *translated* is '' and *info* names the failed checks.
        """
        assert direction in ['spoken2gloss', 'gloss2spoken'], direction
        mode = direction.split('2')[0]

        doc = self.analyze(sentence, mode=mode)
        valid_tokens = self.tok_validator(doc, mode=mode)
        kind = self.check_kind(doc, mode)
        feasibility = self.check_feasibility(doc, valid_tokens, mode=mode)

        if feasibility['glob']:
            info = ''
            translated = self.reorder(doc, valid_tokens=valid_tokens, direction=direction, kind=kind)
        else:
            translated = ''
            info = f"failed {[k for k,v in feasibility.items() if k!='glob' and not v]}"

        success = len(info) == 0
        return translated, success, info
|