#!/usr/bin/python
# Copyright (c) 2005 by Mark T. Holder, Florida State University. (see end of file)
'''Classes and functions for reading an instance document of a cipres_registry
into python classes. Uses xml_to_obj to make the sax code easier.'''
from xml_to_obj import *
import os, sys, time, re, getpass
from PIPRes.util.cipres import getCipresORB, getCipresProperties, getCipresRegistry
from PIPRes.util.io import logException, expandPath, cipresGetLogger, makeCipresTempDir
from PIPRes.exceptions import CipresException
from PIPRes.wrap.idl import CipresIDL_api1, CorbaError, CipresInterfaceEnum, cipresInterfaceStrToEnum, entryInfo, briefEntryInfo, UnknownIDLInterfaceError, getClassFromRepoID
from PIPRes.util._cipres_properties import getCipresRegistryIOR
from cStringIO import StringIO
_LOG = cipresGetLogger('pipres.util.cipres_registry_from_xml')
from omniORB import CORBA

# When True, getObject() calls on the external registry also pass this
# process's group_id.
USING_GROUP_IDS = True


def callRemoteRegistry(callableObj, *args):
    '''Calls callableObj(*args) and returns its result.

    A CORBA.TRANSIENT raised by the call is translated into a CorbaError;
    every other exception propagates unchanged.'''
    try:
        return callableObj(*args)
    except CORBA.TRANSIENT:
        raise CorbaError('Call to CIPRES registry failed')


# Module-wide configuration of the xml_to_obj helper classes.
TextOnlyElement.automaticallyConvertToString = True
SAXConstructible.purgeParsingVars = True


def CipresRegistry():
    '''Returns a CipresRegistryInterface that is configured to talk to the
    external (non-colocated) CIPRES registry.

    The registry IOR comes from getCipresRegistryIOR(), the properties from
    getCipresProperties() and the ORB from getCipresORB().  The registry
    retains a reference to the properties because other fields affect the
    behavior of the registry.  This function acts like a constructor; the
    colocated, XML-file based registry path is currently disabled (see the
    commented-out code below).'''
    # Disabled colocated-registry code, kept for reference:
    # if cipProperties.useColocatedRegistry:
    #     raise AssertionError, 'python (pipres) colocated registry is not supported.'
    #     assert(bool(cipProperties.registryFile))
    #     registry = CipresRegistryFromXML('registry-collection', {})
    #     registry.isColocated = True
    #     if not registry.parseFile(cipProperties.registryFile):
    #         raise ValueError, '%s not parsed' % cipProperties.registryFile
    # else:
    registry = CipresRegistryInterface()
    registry.group_id = 0 # adding group ID for this process (need to figure out how the registry tells us our group ID)
    registry.isColocated = False
    cipProperties = getCipresProperties()
    # hasRegIOR, registry.cipRegistryIOR = cipProperties.get('cipres.registry.ior')
    registry.cipRegistryIOR = getCipresRegistryIOR()
    registry._cipRegistryObjRef = None
    # if not hasRegIOR:
    #     raise ValueError, 'cipres.registry.ior property must be used if cipres.applicaton.use_colocated_registry is not 1'
    registry.properties = cipProperties
    # The orb has to be initialized in code that has access to the command
    # line arguments; the cipres*Init*() functions in PIPRes.util.cipres.py
    # should be used.
    registry.orb = getCipresORB()
    return registry


def registryEntryInfoFromGUICmd(s):
    '''Parses the info describing a CipresIDL_api1.RegistryEntryInfo from the
    value of a command as the CIPRES GUIGEN sends it.

    s is "registryInstanceID,repositoryID,applicationName,description"; the
    description is everything after the third comma, so it may itself contain
    commas.
    Returns a CipresIDL_api1.RegistryEntryInfo corresponding to the object.
    Raises ValueError if fewer than four comma-separated fields are present.'''
    # Count the fields, not the characters of s: the original len(s) test let
    # inputs with too few fields through.
    sp = s.split(',', 3)
    if len(sp) < 4:
        raise ValueError('Expecting "registryInstanceID,repositoryID,applicationName,description"')
    return CipresIDL_api1.RegistryEntryInfo(sp[0], sp[1], sp[2], sp[3])


class PythonRegistryElementFromXML(SAXConstructible):
    '''SAX-constructed holder for the "python" element of a registry entry.'''
    pass


class PythonRegistryInfo:
    '''Describes how to create a service implementation in-process: the
    module to import, the factory to call, whether the factory needs the
    registry, and any extra args.'''
    requiredAttributes = ['module', 'factory', 'needsRegistry', 'args']

    def __init__(self, **kwargs):
        '''Requires the keywords module and factory (KeyError if missing).
        needsRegistry defaults to False and may be a bool or a string
        ("true" in any case means True); args defaults to an empty list.'''
        self.module = kwargs['module']
        self.factory = kwargs['factory']
        nr = kwargs.get('needsRegistry')
        if nr is None:
            self.needsRegistry = False
        elif hasattr(nr, 'upper'):
            # duck-typed so that unicode values are read as text too
            self.needsRegistry = nr.upper() == 'TRUE'
        else:
            self.needsRegistry = bool(nr)
        self.args = kwargs.get('args', [])

    def __eq__(self, other):
        '''Compares the requiredAttributes against a dict or against another
        object's attributes.  An operand that lacks one of the attributes (or
        has no __dict__) compares unequal instead of raising.'''
        if isinstance(other, dict):
            d = other
        else:
            d = getattr(other, '__dict__', None)
            if d is None:
                return False
        for k in PythonRegistryInfo.requiredAttributes:
            if k not in d or self.__dict__[k] != d[k]:
                return False
        return True

    def __ne__(self, other):
        '''Python 2 does not derive != from ==, so keep the two consistent.'''
        return not self.__eq__(other)


def toCipresIDLRegistryEntryInfo(regE, registry):
    '''Returns regE as a CipresIDL_api1.RegistryEntryInfo.

    A RegistryEntryInfo is returned unchanged; any other object is asked to
    convert itself, with registry passed as the argument.'''
    if isinstance(regE, CipresIDL_api1.RegistryEntryInfo):
        return regE
    # PipresRegistryEntry names its converter toCipresIDLRegistryEntryInfo;
    # the previously used name (toCipresIDLRegistryEntry) is kept as a
    # fallback for objects that may define it.
    converter = getattr(regE, 'toCipresIDLRegistryEntryInfo', None)
    if converter is None:
        converter = regE.toCipresIDLRegistryEntry
    return converter(registry)


# Matches the string form of a generated IDL class, e.g.
# "PIPRes.corba.api1.CipresIDL_api1.SomeInterface".
_stringifiedIDLClassPattern = re.compile(r'PIPRes\.corba\.api\d+\.(CipresIDL_api\d+)\.(.+)')


def interfaceNameToRepositoryID(s):
    '''Returns the repository ID ("IDL:<module>/<interface>:1.0") for s.

    s is either a stringified IDL class (module and interface are taken from
    it) or a bare interface name, which is placed in CipresIDL_api1.'''
    m = _stringifiedIDLClassPattern.match(s)
    if m:
        return 'IDL:%s/%s:1.0' % m.groups()
    return 'IDL:CipresIDL_api1/%s:1.0' % s


def composeRegistryQuery(**kwargs):
    '''Builds a CipresIDL_api1.RegistryEntryInfo to be used as a query.

    Recognized keywords: registryInstanceID, repositoryID, applicationName,
    description (each defaults to '') and interface.  interface is only
    consulted when repositoryID is absent; it may be an enum code (int) or
    anything whose str() names the interface.'''
    ri = kwargs.get('repositoryID')
    if ri is None:
        i = kwargs.get('interface')
        if i is None:
            ri = ''
        else:
            if isinstance(i, int):
                i = CipresInterfaceEnum.getName(i)
            if not isinstance(i, str):
                i = str(i)
            ri = interfaceNameToRepositoryID(i)
    rei = CipresIDL_api1.RegistryEntryInfo(
        kwargs.get('registryInstanceID', ''),
        ri,
        kwargs.get('applicationName', ''),
        kwargs.get('description', ''))
    return rei


def getFirstCipresService(interface, **kwargs):
    '''Composes a registry query for interface (refined by kwargs, see
    composeRegistryQuery) and returns the result of getServiceOrThrow() on the
    registry obtained from getCipresRegistry().'''
    kwargs['interface'] = interface
    regEntry = composeRegistryQuery(**kwargs)
    registry = getCipresRegistry()
    return registry.getServiceOrThrow(interface, registryEntry=regEntry)
class PipresRegistryEntry(object):
    '''A registry entry in the form used by the python registry: the
    attributes listed in _entryAttributes plus optional debug-launch
    settings, with methods for writing the entry as XML and for launching
    the service it describes.'''
    debugAttributes = ['debugLaunchCommand', 'debugLaunchArgs', 'debugLaunchEnv']
    extensionsToXML = debugAttributes + ['debugMode']
    entryInfo = entryInfo
    briefEntryInfo = briefEntryInfo

    def __init__(self, entryDict):
        '''entryDict must contain every key in _entryAttributes (KeyError
        otherwise); keys in extensionsToXML are optional.  A command that
        expandPath() changes is stored expanded, and the unexpanded form is
        kept as genericCommand.'''
        # entry specific debug flags are not supported now
        self.debugMode = False
        self.debugLaunchCommand = ''
        self.debugLaunchArgs = []
        self.debugLaunchEnv = {}
        for d in PipresRegistryEntry.extensionsToXML:
            if d in entryDict:
                self.__dict__[d] = entryDict[d]
        for key in _entryAttributes:
            self.__dict__[key] = entryDict[key]
        if self.command:
            expanded = expandPath(self.command)
            if expanded != self.command:
                self.genericCommand = self.command
                self.command = expanded

    def toCipresIDLRegistryEntryInfo(self, registry):
        '''Returns a CipresIDL_api1.RegistryEntryInfo for this entry, using
        the registryInstanceID of registry.'''
        return CipresIDL_api1.RegistryEntryInfo(registry.registryInstanceID, self.repositoryID, self.applicationName, self.description)

    def writeXML(self, out, indentation = ''):
        '''Writes this entry as an "entries" element to the file-like object
        out; every line is prefixed with indentation.'''
        # NOTE(review): the XML markup inside these format strings was missing
        # from the reviewed copy of this file (which made every '%' expression
        # here raise TypeError).  It has been reconstructed from the element
        # and attribute names declared in RegistryEntryFromXML._CTH and
        # PythonRegistryElementFromXML._CTH -- confirm the exact layout
        # against the registry schema.
        sep = '\n' + indentation
        cle = self.getCurrLaunchEnv()
        invocation = self.getCurrentLaunchInvocation()
        #if self.debugMode and self.debugLaunchCommand and (len(invocation[-1].split()) > 1):
        #    invocation[-1] = "'%s'" % invocation[-1]
        pyAtt = self.python
        if pyAtt is None:
            pyElString = []
        else:
            pyElString = ['\t<python module="%s" factory="%s" needsRegistry="%s" args="%s" />' % (pyAtt.module, pyAtt.factory, str(pyAtt.needsRegistry), ' '.join(pyAtt.args)) ]
        out.write(sep.join([
            indentation + '<entries entry-type="%s">' % self.entryType,
            '\t<repository-iD>%s</repository-iD>' % self.repositoryID,
            '\t<application-name>%s</application-name>' % self.applicationName
            ] +
            ['\t<commandenv>%s=%s</commandenv>' % (k, v) for k, v in cle.iteritems() ] +
            pyElString +
            ['\t<commandline>%s</commandline>' % i for i in invocation] +
            ['\t<ior>%s</ior>' % self.ior,
             '\t<description>%s</description>' % self.description,
             '</entries>\n'
            ]))

    def __str__(self):
        '''Multi-line, human readable summary (name, interface, path, args,
        environment).'''
        # command and args are None for entries without a command line
        return '\n\t'.join([
            self.applicationName,
            'interface: ' + CipresInterfaceEnum.getName(self.interface),
            'path:\t' + (self.command or ''),
            'args:\t' + ' '.join(self.args or []),
            'env:\t' + '\n\t\t'.join(['%s = %s' % i for i in self.launchEnv.iteritems()])
            ])

    def getCurrLaunchEnv(self):
        '''Returns a new dict holding the environment for the launched
        process: os.environ, overridden by launchEnv and, in debug mode, by
        debugLaunchEnv.'''
        # A plain dict snapshot: updating a copied environ object would still
        # go through its putenv-calling item assignment and leak the entry's
        # variables into this process.
        e = dict(os.environ)
        e.update(self.launchEnv)
        if self.debugMode and self.debugLaunchEnv:
            e.update(self.debugLaunchEnv)
        return e

    def getCurrentLaunchInvocation(self):
        '''Returns the command line (a list of strings) used to launch the
        service, or an empty list if the entry has no command.

        In debug mode with a debugLaunchCommand, the service's own command
        and args are passed to the debug command as one space-joined
        argument.'''
        if not self.command:
            return []
        args = self.args or []
        if self.debugMode and self.debugLaunchCommand:
            debugArgs = self.__dict__.get('debugLaunchArgs')
            if debugArgs is None:
                debugArgs = []
            if len(args):
                return [self.debugLaunchCommand] + debugArgs + [self.command + ' ' + ' '.join(args)]
            return [self.debugLaunchCommand] + debugArgs + [self.command]
        return [self.command] + args

    def launchService(self, registry):
        '''Launches a service and returns an IOR string for it.

        The command line is the registry's prefix list, this entry's
        invocation and the registry's suffix list.  Each %%IORFILE
        placeholder is replaced by "-CipresIOR <temp file>"; the launched
        process is expected to write its IOR to that file, which is polled
        registryRetryCount + 1 times, registryRetryInterval seconds apart.
        Raises ValueError if there is nothing to launch or the executable
        does not exist, RuntimeError if the IOR file never appears.'''
        currInvocation = registry.getCommandPrefixList() + self.getCurrentLaunchInvocation() + registry.getCommandSuffixList()
        if not currInvocation:
            raise ValueError('No command is registered for launching %s' % self.applicationName)
        currCmd = currInvocation[0]
        if not os.path.exists(currCmd):
            raise ValueError('The path to the executable (%s) does not exist' % currCmd)
        tempdir = makeCipresTempDir(prefix='pytmp')
        tempPath = os.path.join(tempdir, 'iorfile.txt')
        subPair = '-CipresIOR ' + tempPath
        iorExpansion = ['-CipresIOR', tempPath]
        args = [currInvocation[0]]
        for i in currInvocation[1:]:
            # An argument containing whitespace is treated as a quoted
            # sub-command line: the placeholder is expanded inside the string.
            quoted = len(i.split()) > 1
            if quoted:
                args.append(i.replace('%%IORFILE', subPair))
            else:
                # Otherwise the flag and the path become separate arguments.
                # NOTE(review): an argument that is exactly '%%IORFILE' also
                # yields empty-string arguments before and after the pair --
                # confirm that the launched programs tolerate that.
                sp = i.split('%%IORFILE')
                args.append(sp[0])
                for w in sp[1:]:
                    args.extend(iorExpansion)
                    args.append(w)
        _LOG.debug('Launching service:%s\nargs verbatim = :%s' % (str(self), ' '.join(args)))
        pid = os.spawnve(os.P_NOWAIT, currCmd, args, self.getCurrLaunchEnv())
        _LOG.debug('Launched process %d' % pid)
        for i in range(registry.properties.registryRetryCount + 1):
            if not os.path.exists(tempPath):
                _LOG.debug('%s not found, sleeping...\n' %tempPath)
                time.sleep(registry.properties.registryRetryInterval)
            else:
                f = open(tempPath, 'rU')
                try:
                    ior = f.read().strip()
                finally:
                    # close the handle even if the read fails
                    f.close()
                os.remove(tempPath)
                os.rmdir(tempdir)
                return ior
        raise RuntimeError('Launch of service (%s) timed out' % self.applicationName)

    def getRepositoryID(self):
        '''Returns the repository ID derived from the entry's interface code.'''
        return interfaceNameToRepositoryID(CipresInterfaceEnum.getName(self.interface))
    repositoryID = property(getRepositoryID)


def parseEntryCommandline(cmdLine):
    '''Splits the sequence cmdLine into (command, args): its first element
    and the remaining elements.'''
    return cmdLine[0], cmdLine[1:]
class RegistryEntryFromXML(SAXConstructible): '''Encapsulates an entry in the cipres registry which is a description of an implementation including the commands needed to launch it.''' def endSelfElement(self, name): '''Maps repositoryID => interface (just the interface name), commandenv => launchEnv (a dict), commandline => command, args''' self.entryType = int(self.entryType) try: self.interface = cipresInterfaceStrToEnum(str(self.repositoryID)) except UnknownIDLInterfaceError, uiie: if uiie.interfaceName: _LOG.warn('Adding unknown interface %s at runtime' % uiie.interfaceName) ind = len(CipresInterfaceEnum.names) CipresInterfaceEnum.__dict__[uiie.interfaceName] = ind CipresInterfaceEnum.names.append(uiie.interfaceName) self.interface = ind del self.repositoryID self.launchEnv = {} for e in self.commandenv: envArgSplit = e.split('=') if len(envArgSplit) < 2: raise ValueError, 'Expecting "=" in environment string %s' % e k = envArgSplit[0].strip() v = '='.join(envArgSplit[1:]).strip() self.launchEnv[k] = v if self.commandline: self.command, self.args = parseEntryCommandline(self.commandline) else: self.command = None self.args = None del self.commandenv del self.commandline class CipresRegistryInterface(object): def getRegistryInstanceID(self): try: return getattr(self, _registryInstanceID) except: if self.isColocated: self._registryInstanceID = os.environ.get('HOST', 'local python registry') return self._registryInstanceID return self.cipRegistryObjRef._get_registryInstanceID() def setRegistryInstanceID(self, s): self._registryInstanceID = s registryInstanceID = property(getRegistryInstanceID, setRegistryInstanceID) def disambiguate(self, entries, disambigMode, narrowToClass): assert(len(entries) > 1) disambiguatedList = disambigMode is None and entries or disambigMode.chooseEntry(entries, narrowToClass) if len(disambiguatedList) > 1: _LOG.debug('arbitrarily choosing the first entry:\n%s\nfor a %s implementation.' 
%(disambiguatedList[0].applicationName, narrowToClass.__name__)) return disambiguatedList[0] def makeQuery(self, disambigMode, narrowToClass = None): initialQuery = CipresIDL_api1.RegistryEntryInfo('', '' , '', '') if narrowToClass is not None: initialQuery.repositoryID = narrowToClass._NP_RepositoryId return disambigMode is None and initialQuery or disambigMode.restrictQuery(initialQuery, narrowToClass) def getUiXmlFilename(self, entry): if self.isColocated: assert(entry.repositoryID and entry.applicationName and entry.description and entry.applicationName) interfaceCode = CipresInterfaceEnum.getCode(entry.repositoryID) matching = None for i in self.byInterface.get(interfaceCode): if entry.applicationName == i.applicationName and entry.description == i.description and entry.registryInstanceID == i.registryInstanceID: if matching is None: matching = i else: raise CipresIDL_api1.NoRegistryMatchException, 'Multiple matches for %s' % str(i) if matching is None: raise CipresIDL_api1.NoRegistryMatchException, 'Registry Entry %s was not found' % str(entry) return matching.uiXmlFilename else: return self.cipRegistryObjRef.getUiXmlFilename(entry) def getUIXML(self, entry): fn = self.getUiXmlFilename(entry) if not fn: return None if not os.path.isabs(fn): fn = os.path.join(self.properties.applicationRoot, fn) fn = expandPath(fn) if not os.path.exists(fn): raise CipresException(CipresException.MAJOR_CONFIGURATION, message = '%s (the UI XML file for %s) does not exist.' 
%(fn, str(entry))) return open(fn, 'rU') def find(self, query): if self.isColocated: if query.repositoryID != '': interfaceCode = CipresInterfaceEnum.getCode(query.repositoryID) entry = self.byInterface.get(interfaceCode) if entry is None: raise CipresIDL_api1.NoRegistryMatchException, 'CipresRegistryFromXML does not know how to create an implementation of %s' % CipresInterfaceEnum.getName(interfaceCode) return entry raise CipresIDL_api1.NoRegistryMatchException, 'find without repositoryID is not implemented in the python cipres registry' else: return self.cipRegistryObjRef.find(query) def findFirst(self, query): disambigMode = self.__dict__.get('disambigMode') possib = self.find(query) if len(possib) == 0: raise CipresIDL_api1.NoRegistryMatchException, 'CipresRegistryFromXML does not know how to create an implementation of %s' % narrowToClass.__name__ return possib[0] def getRegistryEntry(self, disambigMode, narrowToClass): query = self.makeQuery(disambigMode, narrowToClass) possib = self.find(query) if len(possib) == 0: raise CipresIDL_api1.NoRegistryMatchException, 'CipresRegistryFromXML does not know how to create an implementation of %s' % narrowToClass.__name__ if len(possib) == 1: if disambigMode is not None: disambigMode.validateEntry(possib[0], narrowToClass) return possib[0] return self.disambiguate(possib, disambigMode, narrowToClass) def getServiceIOR(self, narrowToClass, disambigMode, registryEntry = None): regEntry = registryEntry is not None and registryEntry or self.getRegistryEntry(disambigMode, narrowToClass) return regEntry.launchService(self) def getServiceObjectRef(self, narrowToClass, disambigMode, pyRegInterface = None, registryEntry = None): regEntry = registryEntry is not None and registryEntry or self.getRegistryEntry(disambigMode, narrowToClass) #try to use the python natively if possible pythonElement = regEntry.__dict__.get('python') if pythonElement is not None: try: _LOG.debug('importing implementation of %s' % narrowToClass.__name__) 
modName = pythonElement.module factFunc = pythonElement.factory needsRegistry = pythonElement.needsRegistry.upper() == 'TRUE' args = pythonElement.args != '' and pythonElement.args.split() or None mod = __import__(modName, {}, {}, [factFunc]) funcObj = getattr(mod, factFunc) allFactoryArgs = [] if needsRegistry: allFactoryArgs.append(pyRegInterface) if args is not None: allFactoryArgs.append(args) return funcObj(*allFactoryArgs) except: _LOG.debug('servant creation via import failed:') logException(_LOG, True) #these exceptions are not fatal, log them as debug output. if self.isColocated: # launch the ior = regEntry.launchService(self) obj = self.orb.string_to_object(ior) else: try: if USING_GROUP_IDS: obj = self.cipRegistryObjRef.getObject(regEntry, self.group_id) else: obj = self.cipRegistryObjRef.getObject(regEntry) except CORBA.TRANSIENT: raise CorbaError, 'Call to CIPRES registry failed' return obj._narrow(narrowToClass) def getExternalRegistryRef(self): if self._cipRegistryObjRef is None: _LOG.debug('Registry IOR is %s' % self.cipRegistryIOR) obj = self.orb.string_to_object(self.cipRegistryIOR) try: self._cipRegistryObjRef = obj._narrow(CipresIDL_api1.Registry) except CORBA.TRANSIENT: raise CorbaError, 'Call to CIPRES registry failed' return self._cipRegistryObjRef cipRegistryObjRef = property(getExternalRegistryRef) def __str__(self): return '\n'.join(['%s\n\t%s'% (CipresInterfaceEnum.getName(i), '\n\t'.join([str(e) for e in k])) for i, k in self.byInterface.iteritems()]) def getCommandPrefixList(self): return self.__dict__.get('commandPrefix', []) def getCommandSuffixList(self): return self.__dict__.get('commandSuffix', []) def _classifyRawEntries(self): self.byID = {} self.byInterface = {} self.entries = [isinstance(i, PipresRegistryEntry) and i or PipresRegistryEntry(i.__dict__) for i in self.entries] for e in self.entries: e.registryInstanceID = self.registryInstanceID interface = e.interface if self.byInterface.has_key(interface): 
self.byInterface[interface].append(e) else: self.byInterface[interface] = [e] return True def writeXML(self, out): out.write('\n') for e in self.entries: e.writeXML(out, ' ') out.write(''' %s '''% self.registryInstanceID) def addEntries(self, newEntries): self.entries.extend(list(newEntries)) class CipresRegistryExtension: def getObjectFromIOR(self, narrowToClass, ior): o = self.orb.string_to_object(ior) return o._narrow(narrowToClass) def getService(self, narrowToClass=None, disambigMode=None, registryEntry=None): '''Only accessible if RichCipresRegistry.__init__() was called on this object!''' # get a treeDrawer reference # previously had "Object" as the second arg, but this seemed to inhibit finding of the NameService if narrowToClass is None: if registryEntry is None: raise TypeError("Either narrowToClass or registryEntry must be specified") repoID = registryEntry.repositoryID if not repoID or repoID == "*": raise TypeError("Either narrowToClass or a registryEntry (with a specific repositoryID) must be specified") narrowToClass = getClassFromRepoID(repoID) name = narrowToClass.__name__ if self.nsLookup: nameComponent = [CosNaming.NameComponent(name, "")] try: obj = self.namingContext.resolve(nameComponent) except CosNaming.NamingContext.NotFound, ex: return None objRef= obj._narrow(narrowToClass) else: disambigMode = (disambigMode is not None) and disambigMode or self.duplicateServiceChooser objRef = self.getServiceObjectRef(narrowToClass, disambigMode, self, registryEntry=registryEntry) if not objRef: raise CorbaError, ("Object reference is not an %s (%s based on __class__.__name__)" % (name, narrowToClass.__class__.__name__)) if name in self.objRefTranslator: return self.objRefTranslator[name](objRef) return objRef def getServiceOrExit(self, narrowToClass=None, disambigMode = None, registryEntry = None): '''Only accessible if RichCipresRegistry.__init__() was called on this object!''' try: return self.getServiceOrThrow(narrowToClass, disambigMode, 
registryEntry=registryEntry) except CorbaError, e: sys.exit(e.msg) def getServiceOrThrow(self, narrowToClass=None, disambigMode = None, registryEntry = None): '''Only accessible if RichCipresRegistry.__init__() was called on this object!''' objRef = self.getService(narrowToClass, registryEntry=registryEntry) if objRef is None: raise CorbaError('could not get a %s object from the CipresRegistryFromXML' % (narrowToClass.__name__)) return objRef def bindOrRebind(self, serviceRef, serviceName = ''): '''Only accessible if RichCipresRegistry.__init__() was called on this object!''' if not serviceName: serviceName = self.implName if self.nsRegister: if not serviceName: raise ValueError, 'no implementation in bindOrRebind()' nameToBind = [CosNaming.NameComponent(serviceName, "")] # Note that is should be sufficient to just call rebind() without # calling bind() first. Some Naming service implementations # incorrectly raise NotFound if rebind() is called for an unknown # name, so we use the two-stage approach above try: self.namingContext.bind(nameToBind, serviceRef) except: self.namingContext.rebind(nameToBind, serviceRef) if self.iorFileName: f = open(self.iorFileName, 'w') s = self.orb.object_to_string(serviceRef) p = os.getpid() f.write('%s\n%d\n' % (s, p)) f.close() _LOG.info('%s written to %s' % (s, self.iorFileName)) elif not self.nsRegister: raise ValueError, 'nsRegister or iorFileName must be specified to bind' def unbind(self, serviceName): '''Only accessible if RichCipresRegistry.__init__() was called on this object!''' if self.nsRegister: nameToBind = [CosNaming.NameComponent(serviceName, "")] self.namingContext.unbind(nameToBind) elif self.iorFileName: os.remove(self.iorFileName) def listing(self): '''Only accessible if RichCipresRegistry.__init__() was called on this object!''' bindingList, bindingIterator = self.namingContext.list(10000) return [str(i.binding_name[0].id) for i in bindingList] class CipresRegistryFromXML(SAXConstructible, 
CipresRegistryInterface): def parseFileObj(self, fileObj): if not SAXConstructible.parseFileObj(self, fileObj): return False return self._classifyRawEntries() #statics CipresRegistryFromXML._CTH = ChildrenToHandle( singleEl = { 'registry-instance-iD' : TextOnlyElement, 'version' : TextOnlyElement, }, multiEl = { 'command-prefix' : TextOnlyElement, 'command-suffix' : TextOnlyElement, 'entries' : RegistryEntryFromXML }) _entryAttributes = ['entryType', 'args', 'command', 'applicationName', 'description', 'ior', 'python', 'interface', 'uiXmlFilename', 'launchEnv'] RegistryEntryFromXML._CTH = ChildrenToHandle( attr = [ 'entry-type', ], singleEl = { 'application-name': TextOnlyElement, 'description': TextOnlyElement, # 'host-name': TextOnlyElement, #deprecated 'ior': TextOnlyElement, 'python' : PythonRegistryElementFromXML, 'repository-iD': TextOnlyElement, 'ui-xml-filename': TextOnlyElement, }, multiEl = { 'commandline': TextOnlyElement, 'commandenv' : TextOnlyElement, }) PythonRegistryElementFromXML._CTH = ChildrenToHandle( attr = [ 'module', 'factory', 'needsRegistry', 'args' ]) if __name__ == '__main__': from PIPRes.util.io import initLogger from PIPRes.util.cipres import getCipresRegistry registry = getCipresRegistry() print 'registryInstanceID = ', registry.registryInstanceID # This file is part of the PIPRes library # The PIPRes library is free software; you can redistribute it # and/or modify it under the terms of the GNU Lesser General # Public License as published by the Free Software Foundation; # either version 2.1 of the License, or (at your option) any later # version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. 
# # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free # Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, # MA 02111-1307, USA