#!/usr/bin/python
# Copyright (c) 2005 by Mark T. Holder, Florida State University. (see end of file)
'''routines for making io operations cleaner.'''
import os
import re
import sys
import tempfile

# Set by _cipresInitLogger(), invoked by the first call to cipresGetLogger()
# or cipres{Client|Server}Init or cipresDefaultRegistry.
_loggerInitialized = False

# Mode used by openInPathAndCall().  Python 2 needs the 'U' flag to get
# universal-newline translation; Python 3 text mode does that by default and
# rejects the 'U' flag outright from 3.11 on.
_universalReadMode = 'r' if sys.version_info[0] >= 3 else 'rU'


def initLogger():
    '''Configures the logging module.

    Tries to load 'logging_pipres.conf' (located with getCipresFile()) via
    logging.config.fileConfig().  If the file is not found, or cannot be
    parsed, a StreamHandler with a timestamped format is attached to the root
    logger instead; in the "cannot be parsed" case a note is also written to
    stderr.
    '''
    import logging
    import logging.config
    filename = 'logging_pipres.conf'
    fullPath = getCipresFile(filename)
    specifiedPathFailed = False
    if fullPath and os.path.exists(fullPath):
        try:
            logging.config.fileConfig(fullPath)
            return
        except Exception:
            # Fall back to the default configuration below, but remember to
            # tell the user that their config file was ignored.
            specifiedPathFailed = True
    logger = logging.getLogger()
    logger.setLevel(logging.NOTSET)
    ch = logging.StreamHandler()
    ch.setLevel(logging.NOTSET)
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    if specifiedPathFailed:
        sys.stderr.write('Could not parse %s, using default logging\n' % fullPath)


def _cipresInitLogger():
    '''Initializes logger and sets the global flag _loggerInitialized to True'''
    global _loggerInitialized
    if not _loggerInitialized:
        initLogger()
        _loggerInitialized = True


def _findCipresUserDir(assertExists = False, create = False):
    '''Returns full path to the location of the directory that holds cipres files.

    The order of resolution is:
        1. $CIPRES_USER_DIR
        2. ~/cipres
    Returns None if neither exists, unless "assertExists" is true, in which
    case an AssertionError is raised.  Creating the directory ("create") is
    not implemented yet and trips an assert.
    '''
    p = os.environ.get('CIPRES_USER_DIR')
    if p is not None:
        fp = expandPath(p)
        if os.path.exists(fp):
            return fp
        if create:
            assert(False) # todo
            return fp
    fp = expandPath(os.path.join('$HOME', 'cipres'))
    if os.path.exists(fp):
        return fp
    if create:
        assert(False) # todo: create
        return fp
    if assertExists:
        raise AssertionError(
            'The CIPRES user configuration directory was not found. \n'
            'this should be either a cipres directory in your home directory, '
            'or an arbitrary directory that is specified using the '
            'CIPRES_USER_DIR environmental variable.')
    return None


def _findPathFromCascade(*fromEnvPaths):
    '''Returns the absolute path of the first existing candidate, or None.

    Each argument is either a string, or a tuple of (top, dir, subdir, ...)
    where "top" is a string or a callable taking no arguments and returning a
    path (or None).  Strings are subject to env variable substitution (and ~
    expansion) before checking for a path's existence.  For tuples, "top" must
    exist before the remaining elements are joined onto it and checked.
    '''
    for i in fromEnvPaths:
        if isinstance(i, str):
            p = expandPath(i)
            if os.path.exists(p):
                return os.path.abspath(p)
        else:
            var = i[0]
            if isinstance(var, str):
                p = expandPath(var)
            else:
                p = var()
            if p is not None and os.path.exists(p):
                p = os.path.join(p, *i[1:])
                if os.path.exists(p):
                    return os.path.abspath(p)
    return None


# Cached lookup results; each is filled in by the matching getCipres...Dir()
# function the first time that lookup returns something other than None.
_cipresUserPath = None      # $CIPRES_USER_DIR or ~/cipres
_cipresTmpPath = None       # by default: this is _cipresUserPath/tmp
_cipresRootPath = None      # CIPRES_ROOT
_cipresDataPath = None      # by default: this is CIPRES_ROOT/share/cipres
_cipresHelpPath = None      # by default: the "help" subdir of the data dir
_cipresResourcesPath = None # by default: the "resources" subdir of the data dir
_cipresTestPath = None      # by default: the "sampleFiles" subdir of the help dir


def makeCipresTempDir(suffix='', prefix='tmp'):
    '''Uses tempfile.mkdtemp to create a temporary subdirectory in the user's CIPRES tmp directory'''
    return tempfile.mkdtemp(suffix, prefix, dir = getCipresTmpDir())


def _findCipresRootDir(assertExists = False):
    '''$CIPRES_ROOT or None ("assertExists" is currently ignored).'''
    return _findPathFromCascade('$CIPRES_ROOT')


def _findCipresDataDir(assertExists = False):
    '''Returns the first directory found at $CIPRES_DATA_PATH, $CIPRES_ROOT/share/cipres'''
    return _findPathFromCascade('$CIPRES_DATA_PATH', (getCipresRootDir, 'share', 'cipres'))


def _findCipresHelpDir(assertExists = False):
    '''Returns the first directory found at $CIPRES_HELP_PATH, <data dir>/help.'''
    return _findPathFromCascade('$CIPRES_HELP_PATH', (getCipresDataDir, 'help'))


def _findCipresTestFileDir(assertExists = False):
    '''Returns the first directory found at $CIPRES_TEST_FILES_PATH, <help dir>/sampleFiles'''
    return _findPathFromCascade('$CIPRES_TEST_FILES_PATH', (getCipresHelpDir, 'sampleFiles'))


def _findCipresResourcesDir(assertExists = False):
    '''Returns the first directory found at $CIPRES_RESOURCES_PATH, <data dir>/resources'''
    return _findPathFromCascade('$CIPRES_RESOURCES_PATH', (getCipresDataDir, 'resources'))


def _findCipresTmpDir(assertExists = False):
    '''Returns the first directory found at $CIPRES_TMP_PATH or <user dir>/tmp'''
    return _findPathFromCascade('$CIPRES_TMP_PATH', (getCipresUserDir, 'tmp'))


def getCipresUserDir(assertExists = False):
    '''Returns full path to the location of the directory that holds cipres
    user configuration files and tmp files.

    If it cannot be found: returns None or raises an AssertionError
    (depending on "assertExists" arg).
    '''
    global _cipresUserPath
    if _cipresUserPath is None:
        _cipresUserPath = _findCipresUserDir(assertExists=assertExists, create=False)
    return _cipresUserPath


def getCipresTmpDir(assertExists = False):
    '''Returns the (cached) CIPRES tmp directory or None; "assertExists" is currently ignored.'''
    global _cipresTmpPath
    if _cipresTmpPath is None:
        _cipresTmpPath = _findCipresTmpDir()
    return _cipresTmpPath


def getCipresRootDir(assertExists = False):
    '''Returns the (cached) CIPRES root directory or None; "assertExists" is currently ignored.'''
    global _cipresRootPath
    if _cipresRootPath is None:
        _cipresRootPath = _findCipresRootDir()
    return _cipresRootPath


def getCipresDataDir(assertExists = False):
    '''Returns the (cached) CIPRES data directory or None; "assertExists" is currently ignored.'''
    global _cipresDataPath
    if _cipresDataPath is None:
        _cipresDataPath = _findCipresDataDir()
    return _cipresDataPath


def getCipresHelpDir(assertExists = False):
    '''Returns the (cached) CIPRES help directory or None; "assertExists" is currently ignored.'''
    global _cipresHelpPath
    if _cipresHelpPath is None:
        _cipresHelpPath = _findCipresHelpDir()
    return _cipresHelpPath


def getCipresTestFileDir(assertExists = False):
    '''Returns the (cached) CIPRES test-file directory or None; "assertExists" is currently ignored.'''
    global _cipresTestPath
    if _cipresTestPath is None:
        _cipresTestPath = _findCipresTestFileDir()
    return _cipresTestPath


def getCipresResourcesDir(assertExists = False):
    '''Returns the (cached) CIPRES resources directory or None; "assertExists" is currently ignored.'''
    global _cipresResourcesPath
    if _cipresResourcesPath is None:
        _cipresResourcesPath = _findCipresResourcesDir()
    return _cipresResourcesPath


def _getExistingFileTestNonePaths(top, f):
    '''Returns os.path.join(top, f) if "top" is not None and the joined path exists, else None.'''
    if top is not None:
        fp = os.path.join(top, f)
        if os.path.exists(fp):
            return fp
    return None


def getCipresResourcesFile(fileName):
    '''Returns the path to "fileName" in the resources directory, or None if it is not there.'''
    return _getExistingFileTestNonePaths(getCipresResourcesDir(), fileName)


def getCipresTestFile(fileName):
    '''Returns the path to "fileName" in the test-file directory, or None if it is not there.'''
    return _getExistingFileTestNonePaths(getCipresTestFileDir(), fileName)


def getCipresUserFile(fileName):
    '''Returns the path to "fileName" in the user directory, or None if it is not there.'''
    return _getExistingFileTestNonePaths(getCipresUserDir(), fileName)


def getCipresFile(fileName):
    '''Looks in CIPRES user directory and then cipres system directory for the file.'''
    fp = getCipresUserFile(fileName)
    if fp is None:
        return getCipresResourcesFile(fileName)
    return fp


def getCipresConfigFilePath():
    '''Returns a valid file path to the system.properties file, looking in
    User locations first and then system location (or raises an
    AssertionError).'''
    fileName = 'system.properties'
    p = getCipresFile(fileName)
    if p is None:
        raise AssertionError('The required file %s was not found in the user ($CIPRES_USER_DIR or ~/cipres) or system ($CIPRES_ROOT/share/cipres/resources) location for this CIPRES installation' % fileName)
    return p


def cipresGetLogger(s):
    '''Wrapper around logging.getLogger that make sure that the pipres logging
    configuration file is read.'''
    if not _loggerInitialized:
        _cipresInitLogger()
    import logging
    return logging.getLogger(s)


def prependNumberToRename(fn):
    '''Renames the existing file "fn" by prepending the smallest non-negative
    integer that yields an unused name in the same directory.

    Returns the new path.  Raises IOError if "fn" does not exist.
    '''
    if not os.path.exists(fn):
        raise IOError('%s does not exist' % fn)
    parent = os.path.dirname(fn)
    basename = os.path.basename(fn)
    num = -1
    nextPath = os.path.join(parent, basename)
    # The first candidate is fn itself (which exists), so at least one
    # number is always prepended.
    while os.path.exists(nextPath):
        num += 1
        nextPath = os.path.join(parent, str(num) + basename)
    os.rename(fn, nextPath)
    return nextPath


def expandPath(s):
    '''Expands environment variables and then a leading ~ in "s".'''
    return os.path.expanduser(os.path.expandvars(s))


def tryRemove(*args):
    '''Silences OSErrors from os.remove()

    The arguments are joined with os.path.join().  Returns True if the file
    was removed, False if os.remove() raised OSError.
    '''
    try:
        os.remove(os.path.join(*args))
        return True
    except OSError:
        return False


def tryRmdir(dirName):
    '''Silences OSErrors from os.rmdir()

    Returns True if the directory was removed, False if os.rmdir() raised
    OSError.
    '''
    try:
        os.rmdir(dirName)
        return True
    except OSError:
        return False


# NOTE: on Python 3 this name shadows the builtin FileNotFoundError inside
# this module (and in any module that star-imports it).  The name is kept
# because callers catch it.
class FileNotFoundError(IOError, ValueError):
    '''Thrown when an argument that should be a file path fails os.path.exists().'''
    def __init__(self, filePath):
        self.filePath = filePath
    def __str__(self):
        return repr(self.filePath)


def openPathAndCallMode(openPathAndCallFilePath, openPathAndCallMode, openPathAndCallFunc , *args, **kwds):
    '''Opens filePath and calls func on the opened file stream then closes the file.

    The path is passed through expandPath() first.  "func" is called as
    func(stream, *args, **kwds) and its return value is returned.  The
    unusually long parameter names keep them from colliding with keyword
    arguments meant for "func".

    Raises FileNotFoundError (if the mode starts with 'r' and the path does
    not exist) and IOError, as well as exceptions from func.

    todo: Need to deal with file locking
    (see http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/65203)
    '''
    openPathAndCallFilePath = expandPath(openPathAndCallFilePath)
    if openPathAndCallMode.startswith('r') and not os.path.exists(openPathAndCallFilePath):
        raise FileNotFoundError(openPathAndCallFilePath)
    # open() raises on failure, so no check of the returned object is needed.
    _f = open(openPathAndCallFilePath, openPathAndCallMode)
    try:
        _ret = openPathAndCallFunc(_f, *args, **kwds)
    finally:
        _f.close()
    return _ret


def openInPathAndCall(openPathAndCallFilePath, openPathAndCallFunc, *args, **kwds):
    '''openPathAndCallMode() for reading text with universal newlines.'''
    return openPathAndCallMode(openPathAndCallFilePath, _universalReadMode, openPathAndCallFunc, *args, **kwds)


def openOutPathAndCall(openPathAndCallFilePath, openPathAndCallFunc, *args, **kwds):
    '''openPathAndCallMode() with mode 'w'.'''
    return openPathAndCallMode(openPathAndCallFilePath, 'w', openPathAndCallFunc, *args, **kwds)


def openAppendPathAndCall(openPathAndCallFilePath, openPathAndCallFunc, *args, **kwds):
    '''openPathAndCallMode() with mode 'a'.'''
    return openPathAndCallMode(openPathAndCallFilePath, 'a', openPathAndCallFunc, *args, **kwds)


def toCamelCaseRE(inStr, sep = re.compile(r'\s+')):
    '''Uses a regular expression to split a string and Capitalize every letter
    after the removed characters.

    "sep" is a compiled pattern (whitespace by default).  If "sep" does not
    occur in "inStr" the string is returned unchanged.
    '''
    temp = sep.split(inStr)
    if len(temp) < 2:
        return inStr
    # i[:1] rather than i[0]: a separator at the end of inStr (or two adjacent
    # separators) yields an empty fragment, which must not raise IndexError.
    return str(temp[0] + ''.join([i[:1].upper() + i[1:] for i in temp[1:]]))


def logException(logObj, asDebug = False):
    '''Logs the trace of the exception currently being handled to logObj, as
    an error (or as a debug message if "asDebug" is true).'''
    import traceback
    trace = traceback.format_exc()
    if asDebug:
        logObj.debug(trace)
    else:
        logObj.error(trace)


def writeWrapped(out, strToWrite, length, firstOffset = 0, leftMargin='', noCountLeftMargin=''):
    '''Writes to stream out, wrapping at length.  NO trailing newline is added.

    firstOffset is the number of characters already written to the first line.
    leftMargin is a string to add to every newline (counts toward length).
    noCountLeftMargin is the same as leftMargin except that it does not count
        toward the length.
    Tabs are NOT expanded (and count as one character)

    Returns "out".  Raises ValueError if length is not positive, or if
    leftMargin is not shorter than length.
    '''
    if length < 1:
        raise ValueError('length must be a positive integer')
    currInd = 0
    ls = len(strToWrite)
    marginLen = len(leftMargin)
    leftMargin = noCountLeftMargin + leftMargin
    if marginLen >= length:
        raise ValueError('margin must be shorter than line length')
    nextInd = currInd + length - firstOffset
    offset = firstOffset
    # NOTE(review): with a non-zero firstOffset the first test below compares
    # "length" (not "length - firstOffset") against the string length, so a
    # short string is never wrapped on the first line -- confirm this is the
    # intended behavior.
    while nextInd + offset < ls:
        out.write('%s\n%s' % (strToWrite[currInd:nextInd], leftMargin))
        currInd, nextInd = nextInd, nextInd + length - marginLen
        offset = 0
    out.write(strToWrite[currInd:])
    return out

# This file is part of the PIPRes library
#
# The PIPRes library is free software; you can redistribute it
# and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation;
# either version 2.1 of the License, or (at your option) any later
# version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free
# Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
# MA 02111-1307, USA