"""CORBA push-style event consumers/suppliers and file-tailing helper threads.

The consumers and suppliers implement the COS Event Service push interfaces;
the thread classes tail a (possibly still growing) text file and hand each
complete line to a callback or to an `EventSupplier`.
"""
import sys
import time
import os
from threading import Event, Thread
from omniORB.COS import CosEventChannelAdmin, CosEventChannelAdmin__POA, CosEventComm, CosEventComm__POA
import CORBA
import CosEventChannelAdmin
import CosEventChannelAdmin__POA
import CosEventComm
import CosEventComm__POA
from PIPRes.util.io import cipresGetLogger, logException
from PIPRes.util.cipres import cipresServe, orbIsInitialized, getCipresORB, activatePOAManager
from PIPRes.wrap.idl import getTreeScore
from PIPRes.cipres_types import toIDLTree
from PIPRes.corba.api1 import CipresIDL_api1
from PIPRes.nexus.primitives import NexusToken

_LOG = cipresGetLogger('pipres.event_consumer')


class EventConsumer(CosEventChannelAdmin__POA.ProxyPushConsumer):
    """Minimal ProxyPushConsumer servant that only logs the events it receives.

    Subclasses override `push` to do something useful with each event.
    """

    def __init__(self):
        self.supplier = None
        self.disconnected = False
        # Implicitly activate the servant and keep its object reference.
        self.thisRef = self._this()
        orb = getCipresORB()
        activatePOAManager(orb)

    def toIDL(self):
        """Return the object reference obtained from `_this()`."""
        return self.thisRef

    def remove(self):
        """Disconnect (if still connected) and drop the stored reference.

        Safe to call more than once: later calls are no-ops.
        """
        if self.thisRef is None:
            return
        if not self.disconnected:
            self.disconnect_push_consumer()
        self.thisRef = None

    def connect_push_supplier(self, supplier):
        """Remember `supplier`; raise AlreadyConnected if one is already set."""
        _LOG.debug("supplier connected received")
        if self.supplier is None:
            self.supplier = supplier
        else:
            raise CosEventChannelAdmin.AlreadyConnected()

    def push(self, event):
        """Log receipt of `event`; raise Disconnected after disconnection."""
        if self.disconnected:
            # `Disconnected` is declared in the CosEventComm IDL module, not
            # in CosEventChannelAdmin.
            raise CosEventComm.Disconnected()
        _LOG.debug("event received")

    def disconnect_push_consumer(self):
        """Mark this consumer as disconnected and notify the supplier.

        Raises CORBA.OBJECT_NOT_EXIST if the consumer is already disconnected.
        """
        _LOG.debug("disconnect_push_consumer")
        if self.disconnected:
            raise CORBA.OBJECT_NOT_EXIST()
        s = self.supplier
        # Clear the supplier before calling it so that a re-entrant
        # disconnect from the supplier does not call back into it again.
        self.supplier = None
        if s is not None:
            try:
                s.disconnect_push_supplier()
            except Exception:
                # Best effort: a failure to notify the supplier is ignored.
                pass
        self.disconnected = True


_TreeTypeCode = CORBA.TypeCode(CORBA.id(CipresIDL_api1.Tree))


class ToNexusEventConsumer(EventConsumer):
    "Event Consumer that writes events as NEXUS blocks (strings as comments)."

    def __init__(self, buffered=True):
        """`buffered` False makes every written event flush the stream."""
        EventConsumer.__init__(self)
        self.out_stream = sys.stdout
        # Name of the NEXUS block currently open in the output ("" if none).
        self.open_block = ""
        # True once the "#NEXUS" header has been written.
        self.started = False
        self.buffered = buffered

    def setStream(self, out_stream):
        """Redirect output to `out_stream` (None silences the consumer)."""
        self.out_stream = out_stream

    def push(self, event):
        """Write `event` to the output stream as NEXUS content.

        String events become comments and Tree events become TREES block
        entries; other types are ignored.  Errors are logged, not raised.
        """
        #_LOG.debug("ToNexusEventConsumer event received")
        if self.out_stream is None:
            return
        try:
            if not self.started:
                self.out_stream.write("#NEXUS\n")
                self.started = True
            v = event.value(CORBA.TC_string)
            if v is not None:
                return self.str_event(v)
            v = event.value(_TreeTypeCode)
            if v is not None:
                return self.tree_event(v)
            _LOG.debug("Ignoring pushed event that is of an unrecognized type.")
        except Exception:
            logException(_LOG)

    def str_event(self, event):
        """Write the string `event` as a NEXUS comment of the form [!...]."""
        if self.out_stream is None:
            return
        self.out_stream.write("[!%s]\n" % event)
        if not self.buffered:
            self.out_stream.flush()

    def tree_event(self, event):
        """Write the tree `event` as a line in a TREES block.

        Opens a TREES block first (closing any other open block) if needed.
        """
        if self.out_stream is None:
            return
        if self.open_block != "TREES":
            if self.open_block:
                pref = "END;\nBEGIN TREES;\n"
            else:
                pref = "BEGIN TREES;\n"
            self.open_block = "TREES"
        else:
            pref = ""
        self.out_stream.write("%s\tTree %s [score=%s] = %s\n" % (pref, NexusToken.escapeString(event.m_name), str(getTreeScore(event)), event.m_newick))
        if not self.buffered:
            self.out_stream.flush()

    def disconnect_push_consumer(self):
        """Close any open NEXUS block, then disconnect as the base class does."""
        if (self.out_stream is not None) and self.open_block:
            self.out_stream.write("END;\n")
            self.open_block = ""
        EventConsumer.disconnect_push_consumer(self)


class EventSupplier(CosEventComm__POA.PushSupplier):
    """PushSupplier servant that pushes string and tree events to one consumer."""

    def __init__(self, proxyConsumer):
        """Activate the servant and connect it to `proxyConsumer` (may be None)."""
        # Set first so that remove()/__del__ work even if activation fails.
        self.proxyConsumer = None
        self.thisRef = self._this()
        orb = getCipresORB()
        activatePOAManager(orb)
        self.setConsumer(proxyConsumer)

    def setConsumer(self, proxyConsumer):
        """Disconnect from the current consumer and connect to `proxyConsumer`.

        Failures are logged and leave the supplier disconnected.
        """
        try:
            if self.proxyConsumer is not None:
                self.disconnect()
            if proxyConsumer is not None:
                proxyConsumer.connect_push_supplier(self.thisRef)
            self.proxyConsumer = proxyConsumer
        except Exception:
            logException(_LOG)
            self.remove()

    def remove(self):
        """Disconnect from the consumer if one is connected."""
        if self.proxyConsumer is not None:
            self.disconnect_push_supplier()

    def __del__(self):
        self.remove()

    def disconnect_push_supplier(self):
        """Tell the consumer to disconnect and forget it (errors are ignored)."""
        if self.proxyConsumer is None:
            return
        try:
            self.proxyConsumer.disconnect_push_consumer()
        except Exception:
            # Best effort: the consumer may already be gone or disconnected.
            pass
        self.proxyConsumer = None

    disconnect = disconnect_push_supplier

    def sendEvent(self, event):
        """Push `event` to the consumer.

        Returns the result of the consumer's push(), or None when there is no
        consumer or the push failed (failures are logged).
        """
        if self.proxyConsumer is None:
            return
        try:
            return self.proxyConsumer.push(event)
        except Exception:
            logException(_LOG)

    def send_str(self, str_event):
        """Push `str(str_event)` wrapped in a CORBA string Any."""
        event = CORBA.Any(CORBA.TC_string, str(str_event))
        return self.sendEvent(event)

    def send_tree(self, tree):
        """Convert `tree` with toIDLTree and push it as a Tree Any."""
        idlTree = toIDLTree(tree)
        event = CORBA.Any(_TreeTypeCode, idlTree)
        return self.sendEvent(event)


class FileWatchingThread(Thread):
    def __init__(self, stop_event=None, READ_SLEEP_INTERVAL=0.1, *args, **kwargs):
        """Subclass of Thread that provides methods that block until a file
        exists.

        `stop_event` is an Event, that will kill the thread if it is triggered.
        `READ_SLEEP_INTERVAL` is the interval to sleep between checks for the
            file.
        Any extra positional and keyword arguments are handed to
            Thread.__init__() as its `args` and `kwargs` (the thread target's
            arguments).
        """
        self.stop_event = stop_event
        self.sleep_interval = READ_SLEEP_INTERVAL
        # tuple(args), not tuple(*args): the latter raises TypeError whenever
        # more than one extra positional argument is supplied.
        Thread.__init__(self, group=None, target=None, name=None, args=tuple(args), kwargs=dict(kwargs))

    def wait_for_file_to_appear(self, filename):
        """Blocks until the file `filename` appears or stop_event is triggered.

        Returns True if `filename` exists, False if the stop event fired.
        Checks for the stop_event *before* checking for the file existence.
        (paup_wrap and raxml_wrap threads depend on this behavior).
        """
        while True:
            if (self.stop_event is not None) and self.stop_event.isSet():
                return False
            if os.path.exists(filename):
                return True
            #_LOG.debug("Waiting for %s" %filename)
            time.sleep(self.sleep_interval)

    def open_file_when_exists(self, filename):
        """Blocks until the file `filename` appears and then returns a file
        object opened for reading with universal newlines.

        Returns None if the stop event is triggered.
        """
        if self.wait_for_file_to_appear(filename):
            # "U" is needed for universal newlines on Python 2 only; Python 3
            # text mode already provides them and 3.11+ rejects the "U" flag.
            if sys.version_info[0] < 3:
                mode = "rU"
            else:
                mode = "r"
            return open(filename, mode)
        return None


class LineReadingThread(FileWatchingThread):
    """A thread that will read the input stream - designed to work with a file
    that is being written.

    Note that if the file does not end with a newline and the keep_going()
    method does not return False, then the thread will not terminate until the
    stop event is set.

    self.keep_going() is called with each line (sub classes should override).
    LineReadingThread.__init__ must be called by subclasses.
    """

    def __init__(self, lineCallback=None, stream=None, filename="", stop_event=None, READ_SLEEP_INTERVAL=0.1):
        """`lineCallback` is the callable that takes a string that is each
            line, and returns False to stop reading.  This is a way of using
            the class without sub-classing and overriding keep_going.
        `stream` is an input file-like object.
        `filename` can be sent instead of `stream`, it should be the path to
            the file to read.
        `stop_event` is an Event, that will kill the thread if it is triggered.
        `READ_SLEEP_INTERVAL` is the interval to sleep while waiting for more
            input to appear.
        """
        self.stream = stream
        self.filename = filename
        self.lineCallback = lineCallback
        # Most recent partial (not newline-terminated) read, or None.
        self.unfinished = None
        FileWatchingThread.__init__(self, stop_event=stop_event, READ_SLEEP_INTERVAL=READ_SLEEP_INTERVAL)

    def run(self):
        """Read lines from the stream, opening `filename` first if needed.

        A file opened here is closed when reading ends; a stream supplied by
        the caller is left open.
        """
        opened_here = False
        if self.stream is None:
            if not self.filename:
                _LOG.debug('"stream" and "filename" both None when LineReadingThread.run called')
                return
            self.stream = self.open_file_when_exists(self.filename)
            if self.stream is None:
                return
            opened_here = True
        stream = self.stream
        try:
            self._read_stream()
        finally:
            if opened_here:
                stream.close()

    def keep_going(self, line):
        """Handle `line`; return False to stop reading.

        The default delegates to `lineCallback`, or returns True when no
        callback was supplied.
        """
        if self.lineCallback is None:
            return True
        return self.lineCallback(line)

    def _read_stream(self):
        """Feed each complete line to keep_going() until told to stop."""
        while True:
            if (self.stop_event is not None) and self.stop_event.isSet():
                # when we terminate because of an event setting, we pass on
                # any non-empty unfinished line that we have.
                if self.unfinished:
                    self.keep_going(self.unfinished)
                return
            prev_pos = self.stream.tell()
            line = self.stream.readline()
            if not line.endswith("\n"):
                # Partial line (or nothing new): rewind and retry later so the
                # whole line is read once the writer has finished it.
                self.unfinished = line
                self.stream.seek(prev_pos)
                time.sleep(self.sleep_interval)
            else:
                self.unfinished = None
                if not self.keep_going(line):
                    return


class LinesToEventsThread(LineReadingThread):
    """LineReadingThread that sends every line as a string event."""

    def __init__(self, event_supplier, stream=None, filename="", stop_event=None, READ_SLEEP_INTERVAL=0.5, event_supplier_lock=None):
        """`event_supplier` is the object whose send_str() receives each line.
        `event_supplier_lock` is an optional lock held around each send_str()
            call.
        The remaining arguments are as for LineReadingThread.
        """
        self.lock = event_supplier_lock
        self.event_supplier = event_supplier
        LineReadingThread.__init__(self, lineCallback=None, stream=stream, filename=filename, stop_event=stop_event, READ_SLEEP_INTERVAL=READ_SLEEP_INTERVAL)

    def keep_going(self, line):
        """Send `line` as a string event; always returns True."""
        if self.lock is not None:
            self.lock.acquire()
        try:
            self.event_supplier.send_str(line)
        finally:
            if self.lock is not None:
                self.lock.release()
        # _read_stream() stops on a falsy return value, so report success
        # explicitly to keep forwarding later lines.
        return True