Coverage for sm/core/util : 19%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""Fake util module"""
"""Execute a subprocess, then return its return code, stdout and stderr"""
proc = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) (stdout, stderr) = proc.communicate(inputtext) # Workaround for a pylint bug, can be removed after upgrade to # python 3.x or maybe a newer version of pylint in the future stdout = str(stdout) stderr = str(stderr) ret = proc.returncode return (ret, stdout, stderr)
"Follow symlinks to the actual file" absPath = path directory = '' while os.path.islink(absPath): directory = os.path.dirname(absPath) absPath = os.readlink(absPath) absPath = os.path.join(directory, absPath) return absPath
for _ in range(timeout): if len(glob.glob(path)): return True time.sleep(1) return False
for i in range(0,timeout): if not os.path.exists(path): return True time.sleep(1) return False
"""scsi_id_sanitise""" text = re.sub("^\s+", "", str_) # pylint: disable=W1401 return re.sub("\s+", "_", text) # pylint: disable=W1401
"""trivial""" return isinstance(value, basestring)
"""ported from SM util""" cmdlist_for_exec = [] cmdlist_for_log = [] for item in cmdlist: if is_string(item): cmdlist_for_exec.append(item) if scramble: if item.find(scramble) != -1: cmdlist_for_log.append("<filtered out>") else: cmdlist_for_log.append(item) else: cmdlist_for_log.append(item) else: cmdlist_for_exec.append(item[0]) cmdlist_for_log.append(item[1])
(ret, stdout, stderr) = doexec(cmdlist_for_exec) if ret != expect_rc: if stderr == '': stderr = stdout raise f_exceptions.XenError("Command", stderr.strip()) return stdout
"""Ditto""" return pread(cmdlist)
"""Get the SCSI id of a block device
Input: path -- (str) path to block device; can be symlink
Return: scsi_id -- (str) the device's SCSI id
Raise: f_exceptions.XenError """
if not path.startswith('/dev/'): path = '/dev/' + path.lstrip('/')
stdout = pread2([SCSI_ID_BIN, '-g', '--device', path])
return scsi_id_sanitise(stdout[:-1])
regex = re.compile("^[0-9a-f]{8}-(([0-9a-f]{4})-){3}[0-9a-f]{12}") return regex.search(s, 0)
# Run function(*arguments) under a SIGALRM deadline of `timeoutseconds`;
# TimeoutException is raised from the signal handler if the alarm fires.
def _on_alarm(signum, frame):
    raise TimeoutException()

signal.signal(signal.SIGALRM, _on_alarm)
signal.alarm(timeoutseconds)
try:
    # NOTE(review): the callee's return value is discarded here.
    function(*arguments)
finally:
    # Cancel the alarm signal so that it isn't fired later on
    signal.alarm(0)
if not os.path.exists(ISCSI_REFDIR): os.mkdir(ISCSI_REFDIR) filename = os.path.join(ISCSI_REFDIR, targetIQN) try: f = open(filename, 'a+') except: raise f_exceptions.XenError('LVMRefCount', message='file %s' % filename)
found = False refcount = 0 for line in filter(match_uuid, f.readlines()): refcount += 1 if line.find(uuid) != -1: found = True if not found: f.write("%s\n" % uuid) refcount += 1 f.close() return refcount
filename = os.path.join(ISCSI_REFDIR, targetIQN) if not os.path.exists(filename): return 0 try: f = open(filename, 'a+') except: raise f_exceptions.XenError('LVMRefCount', message='file %s' % filename) output = [] refcount = 0 for line in filter(match_uuid, f.readlines()): if line.find(uuid) == -1: output.append(line[:-1]) refcount += 1 if not refcount: os.unlink(filename) return refcount
# Re-open file and truncate f.close() f = open(filename, 'w') for i in range(0,refcount): f.write("%s\n" % output[i]) f.close() return refcount
# Emit one syslog record, prefixed with this process's pid, using the given
# ident/facility/priority; the log is opened and closed around the call.
syslog.openlog(ident, 0, facility)
record = "[%d] %s" % (os.getpid(), message)
syslog.syslog(priority, record)
syslog.closelog()
if LOGGING: for message_line in str(message).split('\n'): _logToSyslog(ident, _SM_SYSLOG_FACILITY, priority, message_line)
retries = 0 while True: try: return f() except Exception, e: SMlog("Got exception: %s. Retry number: %s" % (str(e),retries))
retries += 1 if retries >= maxretry: break
time.sleep(period)
return f()
""" Modified from _testHost in sm.util """ SMlog("testHost: Testing host/port: %s,%d" % (hostname,port)) try: sockinfo = socket.getaddrinfo(hostname, int(port))[0] except: SMlog('Exception occured getting IP for %s' % hostname) return False
timeout = 10
sock = socket.socket(sockinfo[0], socket.SOCK_STREAM) # Only allow the connect to block for up to timeout seconds sock.settimeout(timeout) try: sock.connect(sockinfo[4]) # Fix for MS storage server bug sock.send('\n') sock.close() return True except socket.error, reason: SMlog("testHost: Connect failed after %d seconds (%s) - %s" \ % (timeout, hostname, reason)) return False |