#!/usr/bin/python
# Copyright (c) 2005 by Mark T. Holder, Florida State University. (see end of file)
'''routines for making io operations cleaner.'''
import os
import re
import sys

# set by _cipresInitLogger(), invoked by the first call to cipresGetLogger()
# or cipres{Client|Server}Init or cipresDefaultRegistry
_loggerInitialized = False

# Mode used by openInPathAndCall().  Python 2 needs the 'U' flag to get
# universal-newline translation; Python 3 does that by default for text mode
# and recent versions reject the 'U' flag outright.
if sys.version_info[0] < 3:
    _universalReadMode = 'rU'
else:
    _universalReadMode = 'r'


def initLogger():
    '''Configures the logging module.

    Tries to read the "logging_pipres.conf" file located by getCipresFile().
    If that file cannot be found, or cannot be parsed, a default configuration
    is installed instead: a StreamHandler on the root logger, both at level
    NOTSET.  A message is written to stderr only when a file was found but
    could not be parsed.
    '''
    import logging
    import logging.config
    filename = 'logging_pipres.conf'
    try:
        fullPath = getCipresFile(filename)
    except AssertionError:
        # getCipresFile() raises when the file is missing; that must not
        # prevent the default logging configuration below from being used.
        fullPath = None
    specifiedPathFailed = False
    if fullPath and os.path.exists(fullPath):
        try:
            logging.config.fileConfig(fullPath)
            return
        except Exception:
            # Any parsing problem falls back to the default configuration
            # (KeyboardInterrupt/SystemExit are deliberately not swallowed).
            specifiedPathFailed = True
    logger = logging.getLogger()
    logger.setLevel(logging.NOTSET)
    ch = logging.StreamHandler()
    ch.setLevel(logging.NOTSET)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    if specifiedPathFailed:
        sys.stderr.write('Could not parse %s, using default logging\n' % fullPath)


def _cipresInitLogger():
    '''Initializes logger and sets the global flag _loggerInitialized to True'''
    global _loggerInitialized
    if not _loggerInitialized:
        initLogger()
        _loggerInitialized = True


def _findCipresUserDir(assertExists=False, create=False):
    '''Returns full path to the location of the directory that holds cipres files.

    The order of resolution is:
        1. $CIPRES_USER_DIR
        2. ~/cipres

    The first candidate that exists is returned.  If "create" is true, the
    first candidate is returned even when it does not exist (the directory is
    NOT actually created yet -- see the todo below).

    If no candidate qualifies, an AssertionError is raised when "assertExists"
    is true; otherwise None is returned.
    '''
    candidates = []
    envDir = os.environ.get('CIPRES_USER_DIR')
    if envDir is not None:
        candidates.append(expandPath(envDir))
    candidates.append(expandPath(os.path.join('~', 'cipres')))
    for fp in candidates:
        if os.path.exists(fp):
            return fp
        if create:
            # todo: create
            return fp
    if assertExists:
        raise AssertionError(
            'The CIPRES configuration directory was not found. \n'
            'this should be either a cipres directory in your home directory, '
            'or an arbitrary directory that is specified using the '
            'CIPRES_USER_DIR environmental variable.')
    return None


#deprecated
#def _findPathToFileInCipresDir(fileName, assertExists = False):
#    '''Returns full path to the a file in the directory that holds a cipres configuration file.
#(see _findCipresConfigDir).
#If the file or directory are not found the method returns None or raise and AssertionError (depending on "assertExists" arg).
#'''
#    sp = _findCipresConfigDir(assertExists, fileName)
#    if sp is None:
#        return None
#    sp = os.path.join(sp, fileName)
#    if os.path.exists(sp):
#        return sp
#    if assertExists:
#        raise AssertionError, 'The required file %s was not found in the user (CIPRES_USER_DIR or ~/.cipres) or system (CIPRES_SYS_DIR) location for this CIPRES installation' % fileName
#    return None

_pathToCipresDir = None  #these are constant once set (in getCipresConfigFilePath())
_pathToCipresProperties = None  #these are constant once set (in getCipresConfigFilePath())


def getCipresFile(fileName):
    '''Looks for a cipres configuration file by first looking in the Cipres
    user dir, then in $CIPRES_ROOT/resources.

    Returns the full path of the first existing match.  As a side effect the
    module globals _pathToCipresDir (on first use) and _pathToCipresProperties
    (on every successful lookup, whatever "fileName" is) are updated.

    Raises AssertionError if the file is in neither location.
    '''
    global _pathToCipresDir, _pathToCipresProperties
    if _pathToCipresDir is None:
        _pathToCipresDir = _findCipresUserDir(True, True)
    searchDirs = [_pathToCipresDir]
    cipresRoot = os.environ.get('CIPRES_ROOT')
    if cipresRoot is not None:
        searchDirs.append(expandPath(os.path.join(cipresRoot, 'resources')))
    for d in searchDirs:
        fp = os.path.join(d, fileName)
        if os.path.exists(fp):
            _pathToCipresProperties = fp
            return _pathToCipresProperties
    raise AssertionError('The required file %s was not found in the user ($CIPRES_USER_DIR or ~/cipres) or system ($CIPRES_ROOT/resources) location for this CIPRES installation' % fileName)


def getCipresConfigFilePath():
    '''returns a valid file path to the cipres_config.properties file looking
    in User locations first and then system location (or raises an
    AssertionError).'''
    fileName = 'cipres_config.properties'
    return getCipresFile(fileName)


def getCipresUserDir(assertExists=False):
    '''Returns full path to the location of the directory that holds cipres
    user configuration files and tmp files.

    The directory is resolved (once) by looking up the cipres configuration
    file.  If "assertExists" is true, any failure of that lookup -- or a
    directory that is still unknown afterwards -- raises AssertionError.
    If "assertExists" is false such failures are ignored and whatever is
    known (possibly None) is returned.
    '''
    if _pathToCipresDir is None:
        try:
            getCipresConfigFilePath()
        except AssertionError:
            # getCipresFile() records the user dir before it looks for the
            # config file, so the directory may be known even on failure.
            if assertExists:
                raise
    if assertExists and _pathToCipresDir is None:
        # explicit raise: an assert statement would vanish under "python -O"
        raise AssertionError('The CIPRES user directory could not be determined')
    return _pathToCipresDir


def cipresGetLogger(s):
    '''Wrapper around logging.getLogger that make sure that the pipres logging
    configuration file is read.'''
    if not _loggerInitialized:
        _cipresInitLogger()
    import logging
    return logging.getLogger(s)


def prependNumberToRename(fn):
    '''Renames the existing file "fn" by prepending the smallest non-negative
    integer to its base name that yields an unused path in the same directory
    (e.g. "a.txt" -> "0a.txt").

    Returns the new path.  Raises IOError if "fn" does not exist.
    '''
    if not os.path.exists(fn):
        raise IOError('%s does not exist' % fn)
    parentDir = os.path.dirname(fn)
    basename = os.path.basename(fn)
    num = 0
    nextPath = os.path.join(parentDir, str(num) + basename)
    while os.path.exists(nextPath):
        num += 1
        nextPath = os.path.join(parentDir, str(num) + basename)
    os.rename(fn, nextPath)
    return nextPath


def expandPath(s):
    '''Returns "s" with environment variables and a leading ~ expanded.'''
    return os.path.expanduser(os.path.expandvars(s))


def tryRemove(*args):
    '''Silences OSErrors from os.remove().

    The arguments are joined with os.path.join to form the path.  Returns
    True if the file was removed, False if os.remove raised OSError.
    '''
    try:
        os.remove(os.path.join(*args))
        return True
    except OSError:
        return False


def tryRmdir(dirName):
    '''Silences OSErrors from os.rmdir().

    Returns True if the directory was removed, False if os.rmdir raised
    OSError.
    '''
    try:
        os.rmdir(dirName)
        return True
    except OSError:
        return False


class FileNotFoundError(IOError, ValueError):
    '''Thrown when an argument that should be a file path fails os.path.exists().'''
    def __init__(self, filePath):
        self.filePath = filePath

    def __str__(self):
        return repr(self.filePath)


def openPathAndCallMode(openPathAndCallFilePath, openPathAndCallMode, openPathAndCallFunc, *args, **kwds):
    '''Opens filePath in the given mode, calls func(stream, *args, **kwds) on
    the opened file stream, closes the file (even if func raises) and returns
    what func returned.

    The parameter names are deliberately verbose so that they are unlikely to
    collide with keyword arguments intended for func.

    Raises FileNotFoundError if the mode starts with "r" and the (expanded)
    path does not exist, IOError from open(), as well as exceptions from func.

    todo: Need to deal with file locking
    (see http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/65203)
    '''
    openPathAndCallFilePath = expandPath(openPathAndCallFilePath)
    if openPathAndCallMode.startswith('r') and not os.path.exists(openPathAndCallFilePath):
        raise FileNotFoundError(openPathAndCallFilePath)
    # open() raises on failure, so no separate check of the stream is needed
    _f = open(openPathAndCallFilePath, openPathAndCallMode)
    try:
        return openPathAndCallFunc(_f, *args, **kwds)
    finally:
        _f.close()


def openInPathAndCall(openPathAndCallFilePath, openPathAndCallFunc, *args, **kwds):
    '''Calls func on the file opened for reading with universal newlines
    (see openPathAndCallMode).'''
    return openPathAndCallMode(openPathAndCallFilePath, _universalReadMode, openPathAndCallFunc, *args, **kwds)


def openOutPathAndCall(openPathAndCallFilePath, openPathAndCallFunc, *args, **kwds):
    '''Calls func on the file opened for writing (see openPathAndCallMode).'''
    return openPathAndCallMode(openPathAndCallFilePath, 'w', openPathAndCallFunc, *args, **kwds)


def openAppendPathAndCall(openPathAndCallFilePath, openPathAndCallFunc, *args, **kwds):
    '''Calls func on the file opened for appending (see openPathAndCallMode).'''
    return openPathAndCallMode(openPathAndCallFilePath, 'a', openPathAndCallFunc, *args, **kwds)


def toCamelCaseRE(inStr, sep=re.compile(r'\s+')):
    '''Uses a regular expression to split a string and Capitalize every letter
    after the removed characters.

    "sep" is the compiled pattern whose matches are removed.  If the pattern
    does not match, inStr is returned unchanged.  Empty fragments (produced
    by a separator at the very end of the string, or by adjacent separators)
    contribute nothing.
    '''
    words = sep.split(inStr)
    if len(words) < 2:
        return inStr
    # w[:1] rather than w[0]: a fragment may be the empty string
    return str(words[0] + ''.join([w[:1].upper() + w[1:] for w in words[1:]]))


def logException(logObj, asDebug=False):
    '''Logs the trace of the exception currently being handled to logObj, as
    an error by default or as a debug message if asDebug is true.'''
    import traceback
    trace = traceback.format_exc()
    if asDebug:
        logObj.debug(trace)
    else:
        logObj.error(trace)


# This file is part of the PIPRes library
#
# The PIPRes library is free software; you can redistribute it
# and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation;
# either version 2.1 of the License, or (at your option) any later
# version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free
# Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
# MA 02111-1307, USA