You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

289 lines
11 KiB
Python

#!/home/agropunx/anaconda3/envs/geografia/bin/python
import time, subprocess, copy,os, sys, numpy as np, pandas as pd
import config
import spacy
def spoken_kind(doc):
interrogative = '?' in doc.text.strip()[-4:]
if interrogative:
sub_kind = [ 'closed','open'][np.sum([tok.text in config.open_question_words for tok in doc]) >= 1]
out = f"interrogative_{sub_kind}"
else:
out = ['affirmative', 'negative'][np.sum([tok.text in config.negative_words for tok in doc]) >= 1]
return out
def spoken_subject(doc):
for token in doc:
if "subj" in token.dep_:
subtree = list(token.subtree)
start = subtree[0].i
end = subtree[-1].i + 1
return doc[start:end]
def spoken_dobject(doc):
for token in doc:
if "dobj" in token.dep_:
subtree = list(token.subtree)
start = subtree[0].i
end = subtree[-1].i + 1
return doc[start:end]
def spoken_verb(doc):
verb_related=[]
for i,token in enumerate(doc):
if token.dep_ in ['ROOT','cop', 'advmod','aux']:
if token.text not in config.time_terms+config.negative_words:
verb_related.append(i)
return [doc[i] for i in verb_related]
def spoken_time(doc):
for ent in doc.ents:
if ent.label_ in ['DATE', 'TIME']:
return doc[ent.start:ent.end]
for i,tok in enumerate(doc):
if tok.text in config.time_terms:
return doc[i]
def spoken_negative(doc):
for i,tok in enumerate(doc):
if tok.text in config.negative_words:
return i
spacy.tokens.Doc.set_extension("kind", getter=lambda doc: spoken_kind(doc), force=True)
spacy.tokens.Doc.set_extension("subject", getter=lambda doc: spoken_subject(doc), force=True)
spacy.tokens.Doc.set_extension("dobject", getter=lambda doc: spoken_dobject(doc), force=True)
spacy.tokens.Doc.set_extension("verb", getter=lambda doc: spoken_verb(doc), force=True)
spacy.tokens.Doc.set_extension("time", getter=lambda doc: spoken_time(doc), force=True)
spacy.tokens.Doc.set_extension("negative", getter=lambda doc: spoken_negative(doc), force=True)
def bash_command(cmd):
subprocess.Popen(cmd, shell=True, executable='/bin/bash')
class CCQ:
def __init__(self, model_name ="it_core_news_lg", ner_model_name="it_nerIta_trf"):
if ner_model_name:
self.nlp = spacy.load(model_name, exclude=['ner'])
self.ner = spacy.load(ner_model_name)
else:
self.nlp = spacy.load(model_name)
self.ner = None
def analyze(self,sentence,mode='spoken'):
if mode=='spoken':
doc = self.nlp(sentence)
if self.ner:
nerdoc= self.ner(doc)
doc.ents = nerdoc.ents
doc._.time = nerdoc._.time
elif mode=='gloss':
doc=sentence
else:
raise Exception(f'unknown mode {mode}')
return doc
def check_feasibility(self, doc,valid_tokens, mode='spoken', rules= config.rules):
rules = rules[mode]
if mode=='spoken':
if isinstance(doc, str):
doc = self.nlp(doc)
status = {
'max_tokens': len(doc)<rules['max_tokens'],
'valid_tokens' : sum(valid_tokens)<rules['valid_tokens'],
'punct':sum([t.is_punct for t in doc])<=1
}
if doc._.negative:
status['negative_interrogative'] = 'interrogative' not in doc._.kind
if doc._.verb:
status['verb'] =len(doc._.verb)<rules['max_verb_terms']
if doc._.time:
status['time'] = len(doc._.time.text.split())<rules['max_time_terms']
if doc._.subject:
status['subject'] = len(doc._.subject.text.split())<rules['max_subject_terms']
elif mode=='gloss':
status = {
'subject' : sum([t[1]=='subject' for t in doc]) < rules['subject'],
'verb': sum([t[1]=='verb' for t in doc]) < rules['verb'],
}
else:
raise Exception(f'unknown mode {mode}')
status['glob'] = sum(list(status.values())) == len(status)
return status
def check_kind(self, doc,mode='spoken'):
if mode=='spoken':
if isinstance(doc,str):
doc=self.nlp(doc)
out = doc._.kind
elif mode=='gloss':
if sum([True for t in doc if '?' in t[0]])>=1:
if sum([True for t in doc if t[0] in config.open_question_words])>=1:
out = 'interrogative_open'
else:
out = 'interrogative_closed'
else:
if sum([True for t in doc if t[0] in config.negative_words]) >= 1:
out = 'negative'
else:
out = 'affirmative'
else:
raise Exception(f'unknown mode {mode}')
return out
def tok_validator(self, doc, mode='spoken'):
if mode=='spoken':
if isinstance(doc,str):
doc=self.nlp(doc)
valid_tokens = [not((tok.pos_ in ['DET','ADP']) or (tok.text in config.spoken_stopwords) or (tok.is_punct) or (tok.text=='?')) for tok in doc]#'AUX'
elif mode=='gloss':
valid_tokens = [True for _ in doc]
else:
raise Exception(f'unknown mode {mode}')
return valid_tokens
def reorder(self,doc, valid_tokens=[], direction='gloss2spoken', kind='affirmative'):
reordered = []
tail_idx = []
idx_done = np.array([False for _ in range(len(doc))])
if direction=='spoken2gloss':
# SOV -> SVO
assert isinstance(doc,spacy.tokens.doc.Doc), type(doc)
assert doc._.kind in ['affirmative', 'negative', 'interrogative_open', 'interrogative_closed'], doc._.kind
if len(valid_tokens)==0:
valid_tokens = [True for _ in doc]
if doc._.time:
if isinstance(doc._.time,spacy.tokens.span.Span):
tidx = [t for t in range(doc._.time.start, doc._.time.end)]
idx_done[tidx] = True
else:
tidx = [i for i,t in enumerate(doc) if t.idx==doc._.time.idx]
idx_done[tidx[0]] = True
reordered += [t for t in tidx if valid_tokens[t]]
if 'interrogative' in kind:
if 'open' in kind:
open_question_term_idx = [i for i,tok in enumerate(doc) if tok.text in config.open_question_words][0]
if doc._.subject:
if isinstance(doc._.subject, spacy.tokens.span.Span):
tidx = [t for t in range(doc._.subject.start, doc._.subject.end) if t!=open_question_term_idx]
idx_done[tidx] = True
else:
tidx = [i for i, t in enumerate(doc) if t.idx == doc._.subject.idx and i!=open_question_term_idx]
idx_done[tidx[0]] = True
reordered += [t for t in tidx if valid_tokens[t]]
if doc._.verb:
substart = [t.idx for t in doc._.verb]
tidx = [i for i, tok in enumerate(doc) if tok.idx in substart and i!=open_question_term_idx]
reordered += [t for t in tidx if valid_tokens[t]]
idx_done[tidx] = True
else:
if doc._.subject:
if isinstance(doc._.subject, spacy.tokens.span.Span):
tidx = [t for t in range(doc._.subject.start, doc._.subject.end)]
idx_done[tidx] = True
else:
tidx = [i for i, t in enumerate(doc) if t.idx == doc._.subject.idx]
idx_done[tidx[0]] = True
tail_idx += [t for t in tidx if valid_tokens[t]]
if doc._.verb:
substart = [t.idx for t in doc._.verb]
tidx = [i for i, tok in enumerate(doc) if tok.idx in substart]
tail_idx = [t for t in tidx if valid_tokens[t]]
idx_done[tidx] = True
else:
if doc._.subject:
if isinstance(doc._.subject, spacy.tokens.span.Span):
tidx = [t for t in range(doc._.subject.start, doc._.subject.end)]
idx_done[tidx] = True
else:
tidx = [i for i, t in enumerate(doc) if t.idx == doc._.subject.idx]
idx_done[tidx[0]] = True
reordered += [t for t in tidx if valid_tokens[t]]
if doc._.verb:
substart = [t.idx for t in doc._.verb]
tidx = [i for i, tok in enumerate(doc) if tok.idx in substart]
tail_idx += [t for t in tidx if valid_tokens[t]]
idx_done[tidx] = True
if doc._.negative:
tidx = doc._.negative
tail_idx += [tidx]
idx_done[tidx] = True
for i, done in enumerate(idx_done):
if not done:
if valid_tokens[i]:
reordered.append(i)
idx_done[i] = True
reordered += tail_idx
assert sum(idx_done)==len(doc)
reordered = [doc[r].lemma_.lower() for r in reordered]
if doc._.negative:
reordered[-1]='no'
else:
if 'interrogative' in kind:
if 'open' in kind:
wh = doc[open_question_term_idx].text.lower()
if wh!=reordered[-1]:
reordered.append(wh)
reordered.append('?')
elif direction=='gloss2spoken':
# SVO -> SOV
for i,t in enumerate(doc):
if t[1]=='subject':
reordered.append(t[0])
idx_done[i]=True
for i,t in enumerate(doc):
if t[0] in config.open_question_words:
reordered.append(t[0])
idx_done[i]=True
for i,t in enumerate(doc):
if t[1]=='verb':
reordered.append(t[0])
idx_done[i]=True
for i,t in enumerate(doc):
if not idx_done[i]:
reordered.append(t[0])
idx_done[i]=True
else:
raise Exception(f'unknown direction {direction}')
assert len(doc)==sum(idx_done)
return reordered
def translate(self,sentence, direction='spoken2gloss'):
assert direction in ['spoken2gloss','gloss2spoken'], direction
mode = direction.split('2')[0]
doc = self.analyze(sentence, mode=mode)
valid_tokens = self.tok_validator(doc, mode=mode)
kind = self.check_kind(doc, mode)
feasibility = self.check_feasibility(doc,valid_tokens,mode=mode)
if not feasibility['glob']:
translated = ''
info = f"failed {[k for k,v in feasibility.items() if k!='glob' and not v]}"
else:
info=''
translated = self.reorder(doc, valid_tokens=valid_tokens, direction=direction, kind=kind)
success = len(info)==0
return translated, success, info