#!/usr/bin/env python3
#
from .source import Source
from .connector import *
import logging
import psycopg2
logger = logging.getLogger(__name__)
import psycopg2.extras
from typing import Iterator, Dict, Any, Optional
import io
#source from https://hakibenita.com/fast-load-data-python-postgresql
def clean_csv_value(value: Optional[Any]) -> str:
    """Render *value* as one field of the PostgreSQL COPY text format.

    ``None`` becomes the COPY null marker ``\\N``.  Backslashes, newlines
    and carriage returns are escaped so that a multi-line value cannot
    break the one-record-per-line framing that COPY relies on.

    :param value: any value (or None) to serialize as a COPY field
    :returns: the escaped string representation
    """
    if value is None:
        return r'\N'
    # Escape the backslash first so the escapes added below are not doubled.
    return (
        str(value)
        .replace('\\', '\\\\')
        .replace('\n', '\\n')
        .replace('\r', '\\r')
    )
class StringIteratorIO(io.TextIOBase):
    """A read-only text stream backed by an iterator of string chunks.

    Lets ``cursor.copy_from`` consume generated CSV rows lazily, without
    materializing the whole payload in memory.
    (Adapted from https://hakibenita.com/fast-load-data-python-postgresql)
    """

    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        """Return up to *n* characters, pulling from the iterator as needed.

        With ``n=None`` the whole current buffer (one chunk) is returned.
        An empty string signals exhaustion of the underlying iterator.
        """
        # Refill the buffer, skipping any empty chunks the iterator yields.
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        taken = self._buff[:n]
        self._buff = self._buff[len(taken):]
        return taken

    def read(self, n: Optional[int] = None) -> str:
        """Read *n* characters, or everything when ``n`` is None/negative."""
        parts = []
        if n is None or n < 0:
            # Drain the iterator completely.
            while True:
                chunk = self._read1()
                if not chunk:
                    break
                parts.append(chunk)
        else:
            remaining = n
            while remaining > 0:
                chunk = self._read1(remaining)
                if not chunk:
                    break
                remaining -= len(chunk)
                parts.append(chunk)
        return ''.join(parts)
class OmopSource(Source, PostGresConnector):
    """Connection to a PostgreSQL OMOP CDM source database.

    Writes extraction results into the OMOP ``person``, ``note`` and
    ``note_nlp`` tables.  Table data is passed around as dicts mapping a
    column name to a list of values (one entry per row).
    """

    # Column order must match the physical column order of the OMOP tables,
    # since the INSERT statements below do not name their columns.
    _PERSON_COLUMNS = (
        "person_id", "gender_concept_id", "year_of_birth", "month_of_birth",
        "day_of_birth", "birth_datetime", "death_datetime", "race_concept_id",
        "ethnicity_concept_id", "location_id", "provider_id", "care_site_id",
        "person_source_value", "gender_source_value",
        "gender_source_concept_id", "race_source_value",
        "race_source_concept_id", "ethnicity_source_value",
        "ethnicity_source_concept_id",
    )
    _NOTE_COLUMNS = (
        "note_id", "person_id", "note_event_id", "note_event_field_concept_id",
        "note_date", "note_datetime", "note_type_concept_id",
        "note_class_concept_id", "note_title", "note_text",
        "encoding_concept_id", "language_concept_id", "provider_id",
        "visit_occurrence_id", "visit_detail_id", "note_source_value",
    )

    def __init__(self, DB_host, DB_name, DB_port, DB_user, DB_password):
        """Initialize a connection to a PostgreSQL DB via the super() constructor.

        :param DB_host: database server hostname
        :param DB_name: database name
        :param DB_port: database server port
        :param DB_user: database user
        :param DB_password: database password
        """
        super().__init__(DB_host, DB_name, DB_port, DB_user, DB_password)
        logger.info("Initialize Omop connection")

    def __insert_execute_values_iterator_person(self, dict_person_local: Dict[str, Any]) -> None:
        """Bulk-insert rows into the OMOP ``person`` table.

        :param dict_person_local: column-name -> list-of-values mapping; all
            lists are assumed to be the same length as ``person_id``'s.
        """
        n_rows = len(dict_person_local["person_id"])
        with self.conn.cursor() as cursor:
            psycopg2.extras.execute_values(cursor, """
                INSERT INTO person VALUES %s;
            """, (
                tuple(dict_person_local[col][position] for col in self._PERSON_COLUMNS)
                for position in range(n_rows)
            ))

    def __insert_execute_values_iterator_note(self, dict_note: Dict[str, Any]) -> None:
        """Bulk-insert rows into the OMOP ``note`` table.

        :param dict_note: column-name -> list-of-values mapping; all lists
            are assumed to be the same length as ``note_id``'s.
        """
        n_rows = len(dict_note["note_id"])
        with self.conn.cursor() as cursor:
            psycopg2.extras.execute_values(cursor, """
                INSERT INTO note VALUES %s;
            """, (
                tuple(dict_note[col][position] for col in self._NOTE_COLUMNS)
                for position in range(n_rows)
            ))

    def __insert_execute_values_iterator_note_nlp(self, dict_nlp: Dict[str, Any]) -> None:
        """Bulk-insert rows into the OMOP ``note_nlp`` table.

        BUG FIX: this previously inserted into ``note`` instead of
        ``note_nlp``, so NLP annotations were written to the wrong table.

        :param dict_nlp: column-name -> list-of-values mapping; all lists
            are assumed to be the same length as ``note_id``'s.
        """
        with self.conn.cursor() as cursor:
            psycopg2.extras.execute_values(cursor, """
                INSERT INTO note_nlp VALUES %s;
            """, ((
                dict_nlp["note_nlp_id"][position],
                dict_nlp["note_id"][position],
                dict_nlp["section_concept_id"][position],
                # '|' is replaced so these free-text fields stay consistent
                # with the COPY-based loader, which uses '|' as separator.
                dict_nlp["snippet"][position].replace("|", "##"),
                dict_nlp["offset"][position].replace("|", "##"),
                dict_nlp["lexical_variant"][position].replace("|", "##"),
                dict_nlp["note_nlp_concept_id"][position],
                dict_nlp["nlp_system"][position],
                dict_nlp["nlp_date"][position],
                dict_nlp["nlp_datetime"][position],
                dict_nlp["term_exists"][position].replace("|", "##"),
                dict_nlp["term_temporal"][position].replace("|", "##"),
                dict_nlp["term_modifiers"][position].replace("|", "##"),
                dict_nlp["note_nlp_source_concept_id"][position],
                dict_nlp["term_negation"][position].replace("|", "##"),
                dict_nlp["term_experiencer"][position].replace("|", "##"),
                dict_nlp["term_hypothesis"][position].replace("|", "##"),
                dict_nlp["term_type"][position].replace("|", "##"),
                dict_nlp["nlp_workflow"][position],
                dict_nlp["offset_start"][position],
                dict_nlp["offset_end"][position],
            ) for position in range(len(dict_nlp["note_id"]))))

    def __copy_string_iterator_note_nlp(self, dict_nlp_local: Dict[str, Any], size: int = 8192) -> None:
        """Bulk-load ``note_nlp`` rows with COPY through a lazy string stream.

        Faster than ``execute_values``, but the data is escaped for the COPY
        text format ('|' separator), which makes it harder to reload as-is.

        :param dict_nlp_local: column-name -> list-of-values mapping
        :param size: buffer size handed to ``cursor.copy_from``
        """
        def _row(position):
            # '|' would break the COPY separator, so it is rewritten as '##'.
            # The "\\ " -> "\\" substitution mirrors the historical cleanup;
            # presumably it collapses escaped spaces from the NLP producer
            # -- TODO(review): confirm against the upstream annotator output.
            return (
                dict_nlp_local["note_nlp_id"][position],
                dict_nlp_local["note_id"][position],
                dict_nlp_local["section_concept_id"][position],
                dict_nlp_local["snippet"][position].replace("|", "##").replace("\\ ", "\\"),
                dict_nlp_local["offset"][position].replace("|", "##").replace("\\ ", "\\"),
                dict_nlp_local["lexical_variant"][position].replace("|", "##").replace("\\ ", "\\"),
                dict_nlp_local["note_nlp_concept_id"][position],
                dict_nlp_local["nlp_system"][position],
                dict_nlp_local["nlp_date"][position],
                dict_nlp_local["nlp_datetime"][position],
                dict_nlp_local["term_exists"][position],
                dict_nlp_local["term_temporal"][position],
                # term_modifiers is wrapped in triple quotes (historical format).
                '"""' + dict_nlp_local["term_modifiers"][position] + '"""',
                dict_nlp_local["note_nlp_source_concept_id"][position],
                dict_nlp_local["term_negation"][position],
                dict_nlp_local["term_experiencer"][position],
                dict_nlp_local["term_hypothesis"][position],
                dict_nlp_local["term_type"][position],
                dict_nlp_local["nlp_workflow"][position],
                dict_nlp_local["offset_start"][position],
                dict_nlp_local["offset_end"][position],
            )

        note_nlp_string_iterator = StringIteratorIO(
            '|'.join(map(clean_csv_value, _row(position))) + '\n'
            for position in range(len(dict_nlp_local["note_id"]))
        )
        with self.conn.cursor() as cursor:
            cursor.copy_from(note_nlp_string_iterator, 'note_nlp', sep='|', size=size)

    def __getInfo(self, getcolumn, fromTable):
        """Return every value of column *getcolumn* from table *fromTable*.

        NOTE(review): identifiers are concatenated straight into the SQL
        string (identifiers cannot be bound as parameters).  Only call this
        with trusted, hard-coded column/table names — never user input.
        """
        self.cur.execute("select " + getcolumn + " from " + fromTable)
        return [row[0] for row in self.cur.fetchall()]

    # @staticmethod
    def getLastNotenlpid(self):
        """Return the highest ``note_nlp_id`` currently stored in ``note_nlp``.

        :raises IndexError: if the ``note_nlp`` table is empty
        """
        self.cur.execute("select note_nlp_id from note_nlp order by note_nlp_id desc limit 1")
        lastIDs = [row[0] for row in self.cur.fetchall()]
        return lastIDs[-1]

    def saveToSource(self, table_person, table_note, table_note_nlp):
        """Persist one extraction result (person, note, note_nlp rows) to OMOP.

        A person or note whose first id already exists in the database is
        skipped, so re-running the same document does not duplicate rows.

        :param table_person: column dict for the ``person`` table
        :param table_note: column dict for the ``note`` table
        :param table_note_nlp: column dict for the ``note_nlp`` table
        :returns: 0 on completion
        """
        patient_ids = self.__getInfo("person_id", "person")
        print(patient_ids)
        if table_person["person_id"][0] not in patient_ids:
            self.__insert_execute_values_iterator_person(table_person)
        else:
            print("patient already exist")
        note_ids = self.__getInfo("note_id", "note")
        if table_note["note_id"][0] not in note_ids:
            self.__insert_execute_values_iterator_note(table_note)
            # COPY path: faster, but data is escaped for the COPY text
            # format, so it is more difficult to load back easily.
            self.__copy_string_iterator_note_nlp(table_note_nlp)
            # self.__insert_execute_values_iterator_note_nlp(table_note_nlp)
        else:
            print("note already exist, skip it")
        return 0