import sys import time import os from threading import Event, Thread from omniORB.COS import CosEventChannelAdmin, CosEventChannelAdmin__POA, CosEventComm, CosEventComm__POA import CORBA import CosEventChannelAdmin import CosEventChannelAdmin__POA import CosEventComm import CosEventComm__POA from PIPRes.util.io import cipresGetLogger, logException from PIPRes.util.cipres import cipresServe, orbIsInitialized, getCipresORB, activatePOAManager from PIPRes.wrap.idl import getTreeScore from PIPRes.cipres_types import toIDLTree from PIPRes.corba.api1 import CipresIDL_api1 from PIPRes.nexus.primitives import NexusToken _LOG = cipresGetLogger('pipres.event_consumer') class EventConsumer(CosEventChannelAdmin__POA.ProxyPushConsumer): def __init__(self): self.supplier = None self.disconnected = False self.thisRef = self._this() orb = getCipresORB() activatePOAManager(orb) def toIDL(self): return self.thisRef def remove(self): if self.thisRef is None: return if not self.disconnected: self.disconnect_push_consumer() self.thisRef = None def connect_push_supplier(self, supplier): _LOG.debug("supplier connected received") if self.supplier is None: self.supplier = supplier else: raise CosEventChannelAdmin.AlreadyConnected() def push(self, event): if self.disconnected: raise CosEventChannelAdmin.Disconnected() _LOG.debug("event received") def disconnect_push_consumer(self): _LOG.debug("disconnect_push_consumer") if self.disconnected: raise CORBA.OBJECT_NOT_EXIST() s = self.supplier self.supplier = None if s is not None: try: s.disconnect_push_supplier(); except: pass self.disconnected = True _TreeTypeCode = CORBA.TypeCode(CORBA.id(CipresIDL_api1.Tree)) class ToNexusEventConsumer(EventConsumer): "Event Consumer that writes events as NEXUS blocks (strings as comments)." def __init__(self, buffered=True): EventConsumer.__init__(self) self.out_stream = sys.stdout self.open_block = "" self.started = False self.buffered = buffered def setStream(self, out_stream): self.out_stream = out_stream def push(self, event): #_LOG.debug("ToNexusEventConsumer event received") if self.out_stream is None: return try: if not self.started: self.out_stream.write("#NEXUS\n") self.started = True v = event.value(CORBA.TC_string) if v is not None: return self.str_event(v) v = event.value(_TreeTypeCode) if v is not None: return self.tree_event(v) _LOG.debug("Ignoring pushed event that is of an unrecognized type.") except: logException(_LOG) def str_event(self, event): if self.out_stream is None: return self.out_stream.write("[!%s]\n" % event) if not self.buffered: self.out_stream.flush() def tree_event(self, event): if self.out_stream is None: return if self.open_block != "TREES": if self.open_block: pref = "END;\nBEGIN TREES;\n" else: pref = "BEGIN TREES;\n" self.open_block = "TREES" else: pref = "" self.out_stream.write("%s\tTree %s [score=%s] = %s\n" % (pref, NexusToken.escapeString(event.m_name), str(getTreeScore(event)), event.m_newick)) if not self.buffered: self.out_stream.flush() def disconnect_push_consumer(self): if (self.out_stream is not None) and self.open_block: self.out_stream.write("END;\n") self.open_block = "" EventConsumer.disconnect_push_consumer(self) class EventSupplier(CosEventComm__POA.PushSupplier): def __init__(self, proxyConsumer): self.proxyConsumer = None self.thisRef = self._this() orb = getCipresORB() activatePOAManager(orb) self.setConsumer(proxyConsumer) def setConsumer(self, proxyConsumer): try: if self.proxyConsumer is not None: self.disconnect() if proxyConsumer is not None: proxyConsumer.connect_push_supplier(self.thisRef); self.proxyConsumer = proxyConsumer except: logException(_LOG) self.remove() def remove(self): if self.proxyConsumer is not None: self.disconnect_push_supplier() def __del__(self): self.remove() def disconnect_push_supplier(self): if self.proxyConsumer is None: return try: self.proxyConsumer.disconnect_push_consumer(); except: pass self.proxyConsumer = None disconnect = disconnect_push_supplier def sendEvent(self, event): if self.proxyConsumer is None: return try: return self.proxyConsumer.push(event) except: logException(_LOG) def send_str(self, str_event): event = CORBA.Any(CORBA.TC_string, str(str_event)) return self.sendEvent(event) def send_tree(self, tree): idlTree = toIDLTree(tree) event = CORBA.Any(_TreeTypeCode, idlTree) return self.sendEvent(event) class LineReadingThread(Thread): """A thread that will read the input stream - designed to work with a file thas is being written. Note that if the file does not end with a newline and the keep_going() method does not return False, then the thread will not terminate self.keep_going() is called with each line. (sub classes should override). LineReadingThread.__init__ must be called by subclasses. """ def __init__(self, lineCallback=None, stream=None, filename="", stopEvent=None, READ_SLEEP_INTERVAL=0.1): """`lineCallback` is the callable that takes a string that is each line, and returns False to stop reading. This is a way of using the class without sub-classing and overriding keep_going `stream` is in input file-like object `filename` can be sent instead of `stream`, it should be the path to the file to read. `stopEvent` is an Event, that will kill the thread if it is triggered. `READ_SLEEP_INTERVAL` is the interval to sleep while waiting for a new tree to appear. other arguments are passed to the Thread.__init__() """ self.stopEvent = stopEvent self.stream = stream self.filename = filename self.lineCallback = lineCallback self.sleepInterval = READ_SLEEP_INTERVAL self.unfinished = None Thread.__init__(self, group=None, target=None, name=None, args=(), kwargs={}) def run(self): if self.stream is None: if not self.filename: _LOG.debug('"stream" and "filename" both None when LineReadingThread.run called') return if not self._wait_for_file(): return self._read_file() def keep_going(self, line): if self.lineCallback is None: return True return self.lineCallback(line) def _wait_for_file(self): if not self.filename: return False while True: if os.path.exists(self.filename): self.stream = open(self.filename, "rU") return True if (self.stopEvent is not None) and self.stopEvent.isSet(): return False time.sleep(self.sleepInterval) _LOG.debug("Waiting for %s" % self.filename) def _read_file(self): while True: if (self.stopEvent is not None) and self.stopEvent.isSet(): # when we terminate because of an event setting, # we pass any unfinished line that we have to if not self.unfinished is None: self.keep_going(self.unfinished) return prev_pos = self.stream.tell() line = self.stream.readline() if not line.endswith("\n"): self.unfinished = line self.stream.seek(prev_pos) time.sleep(self.sleepInterval) else: self.unfinished = None if not self.keep_going(line): return class LinesToEventsThread(LineReadingThread): def __init__(self, event_supplier, stream=None, filename="", stopEvent=None, READ_SLEEP_INTERVAL=0.5, event_supplier_lock=None): self.lock = event_supplier_lock self.event_supplier = event_supplier LineReadingThread.__init__(self, lineCallback=None, stream=stream, filename=filename, stopEvent=stopEvent, READ_SLEEP_INTERVAL=READ_SLEEP_INTERVAL) def keep_going(self, line): if self.lock is not None: self.lock.acquire() try: self.event_supplier.send_str(line) finally: if self.lock is not None: self.lock.release()