# Copyright 2011-2021 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""":mod:`microprobe.utils.misc` module
"""
# Futures
from __future__ import absolute_import, division, print_function
# Built-in modules
import bz2
import gzip
import itertools
import os
import random
import re
import sys
import timeit
# Third party modules
# Own modules
from microprobe.exceptions import MicroprobeDuplicatedValueError
from microprobe.utils.logger import get_logger
from collections import OrderedDict
# Constants
# Module-level logger, named after this module.
LOG = get_logger(__name__)
# Public names exported on a star-import.
# NOTE(review): shift_with_sign and range_to_sequence_float are defined in
# this module but not listed here -- confirm whether that is intentional.
__all__ = [
"natural_sort", "dict2OrderedDict", "RejectingDict",
"RejectingOrderedDict", "Pickable", "primes", "closest_divisor",
"smart_copy_dict", "findfiles", "RNDINT", "RNDFP", "twocs_to_int",
"int_to_twocs", "iter_flatten", "which", "Progress",
"range_to_sequence", "longest_common_substr", "open_generic_fd",
"getnextf", "move_file", "compress_file"
]
# Both private generators below are seeded with this constant at import
# time, so they start from the same state on every run.
_RND_SEED = 13 # My favorite number ;)
# Generator used by RNDINT().
_RNDINT = random.Random()
_RNDINT.seed(_RND_SEED)
# Generator used by RNDFP(); independent from the integer generator.
_RNDFP = random.Random()
_RNDFP.seed(_RND_SEED)
# Functions
[docs]
def getnextf(itr):
    """Return a zero-argument callable producing successive items of *itr*.

    :param itr: Iterator to consume.
    :return: Callable that returns ``next(itr)`` each time it is invoked;
             the ``StopIteration`` of an exhausted iterator propagates to
             the caller.
    """
    return lambda: next(itr)
[docs]
def natural_sort(input_list):
    """Return the strings of *input_list* sorted in natural order.

    Runs of ASCII digits are compared numerically and the remaining text is
    compared case-insensitively, so ``"a2"`` sorts before ``"a10"``.

    :param input_list: Iterable of strings to sort.
    :return: A new sorted list; the input is not modified.
    :rtype: list
    """
    digit_run = re.compile('([0-9]+)')

    def alphanum_key(key):
        """Build the sort key for *key*.

        :param key: String to split.
        :return: List alternating lower-cased text and integer values.
        """
        # Splitting on a capturing group always yields text at even
        # positions and the captured digit runs at odd positions.
        # Converting by position (instead of asking str.isdigit) keeps all
        # keys the same shape: characters such as SUPERSCRIPT TWO satisfy
        # isdigit() but are rejected by int(), which used to abort the sort.
        return [
            int(chunk) if index % 2 else chunk.lower()
            for index, chunk in enumerate(digit_run.split(key))
        ]

    return sorted(input_list, key=alphanum_key)
[docs]
def dict2OrderedDict(my_dict):  # pylint: disable-msg=C0103
    """Build an ``OrderedDict`` from *my_dict* with naturally sorted keys.

    :param my_dict: Mapping whose keys can be ordered by
                    :func:`natural_sort`.
    :return: Ordered copy of *my_dict*; values are shared, not copied.
    """
    ordered_keys = natural_sort(list(my_dict.keys()))
    return OrderedDict((key, my_dict[key]) for key in ordered_keys)
[docs]
def twocs_to_int(val, bits):
    """Interpret *val* as a *bits*-wide two's complement number.

    :param val: Raw (unsigned) encoding.
    :param bits: Width of the encoding in bits.
    :return: Signed integer value represented by *val*.
    :raises AssertionError: if *val* needs more than *bits* binary digits.
    """
    # Number of characters bin() needs after the '0b' prefix.
    width = len(bin(val)) - 2
    assert width <= bits, (val, bits)

    sign_bit = 1 << (bits - 1)
    if val & sign_bit:
        # Sign bit set: wrap into the negative half of the range.
        return val - (1 << bits)
    return val
[docs]
def int_to_twocs(val, bits):
    """Encode the integer *val* as a *bits*-wide two's complement value.

    :param val: Signed integer to encode.
    :param bits: Width of the encoding in bits.
    :return: Non-negative integer holding the low *bits* bits of *val*.
    :raises AssertionError: if the magnitude of *val* needs more than
                            *bits* binary digits.
    """
    # bin() prefixes non-negative values with '0b' and negative ones with
    # '-0b'; strip the prefix to count the binary digits.
    if val >= 0:
        assert len(bin(val)) - 2 <= bits, "%s - %s" % (bin(val), bits)
    else:
        assert len(bin(val)) - 3 <= bits

    low_bits_mask = (1 << bits) - 1
    return val & low_bits_mask
[docs]
def shift_with_sign(val, bits, shift):
    """Shift *val* right by *shift* and fill the vacated upper bits.

    The top *shift* bits of the *bits*-wide result are always set to one,
    i.e. the value is treated as if its sign bit were set.

    NOTE(review): values whose sign bit is clear also get those bits set --
    confirm that callers only pass negative values.

    :param val: Value to shift (signed integer or raw encoding).
    :param bits: Width of the encoding in bits.
    :param shift: Number of positions to shift right.
    :return: Non-negative integer limited to *bits* bits.
    :raises AssertionError: if the magnitude of *val* needs more than
                            *bits* binary digits.
    """
    # Same fit check as int_to_twocs: count digits after bin()'s prefix.
    if val >= 0:
        assert len(bin(val)) - 2 <= bits
    else:
        assert len(bin(val)) - 3 <= bits

    width_mask = (1 << bits) - 1
    # *shift* ones followed by (bits - shift) zeros; when shift exceeds
    # bits there are no trailing zeros.
    fill_mask = ((1 << shift) - 1) << max(bits - shift, 0)
    return ((val >> shift) | fill_mask) & width_mask
[docs]
def primes(number):
    """Return the prime factors of *number* in ascending order.

    A factor that divides *number* several times is repeated accordingly,
    e.g. ``primes(12) == [2, 2, 3]``.

    :param number: Integer to factorise.
    :return: List of integer prime factors (empty for values below 2).
    :rtype: list
    """
    primfac = []
    divisor = 2
    while divisor * divisor <= number:
        while number % divisor == 0:
            primfac.append(divisor)
            # Floor division keeps the quotient an exact integer. The
            # module enables true division, under which '/=' turned the
            # remaining value (and the last factor) into a float.
            number //= divisor
        divisor += 1
    # Whatever is left above 1 is itself prime.
    if number > 1:
        primfac.append(number)
    return primfac
[docs]
def closest_divisor(target, closer):
    """Pick a divisor of *target* in the neighbourhood of *closer*.

    The prime factors of ``int(target)`` are multiplied together greedily,
    largest first, skipping any factor that would push the product above
    ``int(closer * 1.5)``. If the resulting product is smaller than
    ``closer * 0.1``, *closer* itself is returned instead.

    :param target: Number whose divisors are considered.
    :param closer: Desired magnitude of the divisor.
    :return: The selected product of prime factors, or *closer*.
    """
    product = 1
    for factor in primes(int(target))[::-1]:
        candidate = product * factor
        if candidate <= int(closer * 1.5):
            product = candidate
    return closer if product < (closer * 0.1) else product
[docs]
def smart_copy_dict(olddict):
    """Return a copy of *olddict* whose list and dict values are copied too.

    List values are shallow-copied, dictionary values are copied
    recursively, and every other value is shared with the original.

    :param olddict: Dictionary to copy.
    :return: A ``RejectingDict`` when *olddict* is one, otherwise a plain
             ``dict``.
    """
    new_dict = RejectingDict() if isinstance(olddict, RejectingDict) else {}
    for key, value in olddict.items():
        # Keys are stored unchanged: built-in lists and dicts are
        # unhashable and therefore can never be keys, and turning a
        # hashable subclass into a plain list/dict would only make the
        # insertion below fail.
        if isinstance(value, list):
            value = value[:]
        elif isinstance(value, dict):  # includes RejectingDict instances
            value = smart_copy_dict(value)
        new_dict[key] = value
    return new_dict
[docs]
def _walk_files(paths):
    """Yield ``(directory, filename)`` for every file below each of *paths*.

    A path that appears more than once in *paths* is only walked once.
    """
    walked = set()
    for path in paths:
        if path in walked:
            continue
        walked.add(path)
        for base_path, _dirnames, filenames in os.walk(path):
            for filename in filenames:
                yield base_path, filename


def findfiles(paths, regexp, full=False, maxcount=10000):
    """Search directory trees for files whose name matches *regexp*.

    :param paths: Iterable of directories to walk recursively.
    :type paths: iterable of str
    :param regexp: Regular expression applied with ``re.search``.
    :type regexp: str
    :param full: Match against the full path instead of the bare file name.
    :type full: bool
    :param maxcount: Maximum number of files to examine; once exceeded a
                     warning is logged and the matches found so far are
                     returned.
    :type maxcount: int
    :return: Full paths of the matching files, without duplicates, in
             discovery order.
    :rtype: list
    """
    LOG.debug("Start find files")
    LOG.debug("Paths: %s", paths)
    LOG.debug("Regexp: '%s'", regexp)

    re_obj = re.compile(regexp)
    results = []
    # Set mirror of *results* so the duplicate check stays constant time.
    reported = set()

    for count, (base_path, filename) in enumerate(_walk_files(paths), 1):
        if count > maxcount:
            LOG.warning(
                "Maximum number of files checked."
                " Stopping the search."
            )
            break

        fullname = os.path.join(base_path, filename)
        if fullname in reported:
            continue

        if re_obj.search(fullname if full else filename):
            reported.add(fullname)
            results.append(fullname)
            LOG.debug("File match: %s", fullname)

    LOG.debug("End find files")
    return results
[docs]
def RNDINT(maxmin=None):  # pylint: disable-msg=invalid-name
    """Return a pseudo-random integer from the module's seeded generator.

    :param maxmin: Optional ``(low, high)`` pair forwarded to ``randint``;
                   when omitted the range is ``0`` to ``2**64``.
    :return: Integer between the two bounds, both included.
    """
    bounds = (0, (2**64)) if maxmin is None else maxmin
    return _RNDINT.randint(*bounds)
[docs]
def RNDFP(x=0, y=1):  # pylint: disable-msg=invalid-name
    """Return a pseudo-random float between *x* and *y*.

    The value is drawn with ``uniform`` from the module's seeded
    floating-point generator.

    :param x: One end of the interval.
    :param y: Other end of the interval.
    """
    draw = _RNDFP.uniform
    return draw(x, y)
[docs]
def iter_flatten(iterable):
    """Lazily yield the leaves of arbitrarily nested lists and tuples.

    Only ``list`` and ``tuple`` elements are descended into; every other
    element (strings included) is yielded unchanged, in depth-first order.

    :param iterable: Iterable to flatten.
    :type iterable: iterable
    """
    # Stack of iterators; the innermost container being walked is on top.
    pending = [iter(iterable)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, (list, tuple)):
                # Descend; the parent iterator keeps its position.
                pending.append(iter(item))
                break
            yield item
        else:
            # Top iterator exhausted: resume its parent.
            pending.pop()
[docs]
def range_to_sequence(start, *args):
    """Expand ``start[, end[, step]]`` into an inclusive list of integers.

    Any argument may be given as a string, in which case it is parsed with
    ``int(value, 0)`` (so prefixes such as ``0x`` are accepted). *end* is
    always the last element of the result, even when *step* does not land
    on it.

    :param start: First value of the sequence.
    :param args: Optional *end* (inclusive) and *step* (default 1; may be
                 negative to count down).
    :return: List of integers from *start* to *end*.
    :rtype: list
    :raises NotImplementedError: if more than three arguments are given.
    """
    if len(args) > 2:
        raise NotImplementedError(
            "This function does not support more than 3 arguments")

    def to_int(value):
        """Parse string arguments; pass anything else through."""
        return int(value, 0) if isinstance(value, str) else value

    start = to_int(start)
    if not args:
        return [start]

    end = to_int(args[0])
    step = to_int(args[1]) if len(args) > 1 else 1

    # Exclusive bound one position past *end* in the direction of travel.
    # Always using ``end + 1`` dropped elements when counting down, e.g.
    # (3, 1, -1) produced [3, 1].
    stop = end + 1 if step > 0 else end - 1
    length = (stop - start + step - 1 + 2 * (step < 0)) // step

    retval = list(itertools.islice(itertools.count(start, step), length))

    # The values are monotonic, so *end* is present only if it is last.
    if not retval or retval[-1] != end:
        retval.append(end)

    return retval
[docs]
def range_to_sequence_float(start, *args):
    """Expand ``start[, end[, step]]`` into a list of evenly spaced values.

    String arguments are converted with ``float()``. Values are produced by
    repeatedly adding *step* to *start* for as long as the next value stays
    below *end* plus a small tolerance, so *end* itself is included when
    the steps land on it.

    :param start: First value of the sequence.
    :param args: Optional *end* and *step* (default 1).
    :return: List of generated values, starting with *start*.
    :rtype: list
    :raises NotImplementedError: if more than three arguments are given.
    :raises ValueError: if *step* is not positive while *end* lies above
                        *start*, which could never terminate.
    """
    if len(args) > 2:
        raise NotImplementedError(
            "This function does not support more than 3 arguments")

    def to_float(value):
        """Parse string arguments; pass anything else through."""
        # float() accepts a single argument; there is no base parameter.
        return float(value) if isinstance(value, str) else value

    start = to_float(start)
    if not args:
        return [start]

    # Tolerance so that an *end* reached by accumulated steps is kept.
    end = to_float(args[0]) + 0.0000000000001
    step = to_float(args[1]) if len(args) > 1 else 1

    if step <= 0 and start + step < end:
        raise ValueError(
            "step must be positive to reach end from start")

    retval = [start]
    cval = start
    while (cval + step) < end:
        retval.append(cval + step)
        cval += step

    return retval
[docs]
def which(program):
    """Locate the executable *program*.

    When *program* contains a directory component it is checked as given.
    Otherwise every directory of the ``PATH`` environment variable (or
    ``os.defpath`` when ``PATH`` is unset) is searched, also trying each
    suffix listed in ``PATHEXT``.

    :param program: Program name or path.
    :type program: str
    :return: Path of the first matching executable file, or ``None``.
    """

    def is_exe(fpath):
        """Tell whether *fpath* is a regular file that may be executed."""
        # isfile() rather than exists(): directories usually carry the
        # execute bit too and must not be reported as programs.
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    def ext_candidates(fpath):
        """Yield *fpath* followed by *fpath* plus each PATHEXT suffix."""
        yield fpath
        for ext in os.environ.get("PATHEXT", "").split(os.pathsep):
            if ext:
                yield fpath + ext

    dirname, _basename = os.path.split(program)
    if dirname:
        return program if is_exe(program) else None

    search_path = os.environ.get("PATH", os.defpath)
    for path in search_path.split(os.pathsep):
        exe_file = os.path.join(path, program)
        for candidate in ext_candidates(exe_file):
            if is_exe(candidate):
                return candidate

    return None
[docs]
def longest_common_substr(str1, str2):
    """Return the longest sequence of characters common to both strings.

    Matching characters must appear in the same order in both inputs but do
    not have to be adjacent, e.g. ``("abc", "axc")`` gives ``"ac"``. When
    several answers have the same length, the one obtained by skipping
    characters of *str2* first is returned.

    :param str1: First string.
    :type str1: str
    :param str2: Second string.
    :type str2: str
    :return: The common sequence, or ``""`` if either input is empty.
    :rtype: str
    """
    if not str1 or not str2:
        return ""

    # Bottom-up table over suffixes replaces the exponential recursion:
    # below[j] holds the answer for (str1[i + 1:], str2[j:]) and current[j]
    # the answer for (str1[i:], str2[j:]). Only two rows are kept.
    columns = len(str2)
    below = [""] * (columns + 1)
    for i in range(len(str1) - 1, -1, -1):
        current = [""] * (columns + 1)
        for j in range(columns - 1, -1, -1):
            if str1[i] == str2[j]:
                current[j] = str1[i] + below[j + 1]
            else:
                # max() keeps its first argument on ties, preserving the
                # preference for dropping the head of str2.
                current[j] = max(current[j + 1], below[j], key=len)
        below = current

    return below[0]
# Classes
[docs]
class RejectingDict(dict):
    """Dictionary that refuses to overwrite an existing key.

    Only item assignment (``d[key] = value``) is checked; every other
    ``dict`` method is inherited unchanged.
    """

    def __setitem__(self, key, value):
        """Store *value* under *key*, which must not be present yet.

        :param key: Key to insert.
        :param value: Value to associate with *key*.
        :raises MicroprobeDuplicatedValueError: if *key* already exists.
        """
        if key not in self:
            return super(RejectingDict, self).__setitem__(key, value)

        raise MicroprobeDuplicatedValueError(
            "Key '%s' is already present" % str(key)
        )
[docs]
class RejectingOrderedDict(OrderedDict):
    """Ordered dictionary that refuses to overwrite an existing key."""

    def __setitem__(self, key, value):  # pylint: disable=arguments-differ
        """Store *value* under *key*, which must not be present yet.

        :param key: Key to insert.
        :param value: Value to associate with *key*.
        :raises MicroprobeDuplicatedValueError: if *key* already exists.
        """
        if key not in self:
            return super(RejectingOrderedDict, self).__setitem__(key, value)

        raise MicroprobeDuplicatedValueError(
            "Key '%s' is already present" % str(key)
        )
[docs]
class Pickable(object):  # pylint: disable-msg=R0903
    """Mixin providing the ``__getstate__`` / ``__setstate__`` pickle hooks.

    The pickled state is simply the instance attribute dictionary; see the
    documentation of the standard ``pickle`` module for how these hooks are
    used.
    """

    def __getstate__(self):
        """Return the instance attribute dictionary as the state."""
        return vars(self)

    def __setstate__(self, state):
        """Merge *state* into the instance attributes.

        :param state: Mapping of attribute names to values.
        """
        vars(self).update(state)
[docs]
class Progress(object):  # pylint: disable-msg=too-few-public-methods
    """A counting progress indicator."""

    def __init__(self, total, msg="", out=sys.stderr):
        """
        :arg total: Objective progress count.
        :type total: int
        :arg msg: Message to prefix to the progress indicator.
        :type msg: str
        :arg out: Output file.
        :type out: file
        """
        self._total = total
        self._msg = msg
        self._count = 0
        self._fd = out

        # The message becomes part of a template that is %-formatted again
        # on every update, so literal '%' characters in it must be escaped
        # or they would be read as conversion specifiers.
        safe_msg = ("%s" % (msg,)).replace("%", "%%")
        # Resulting template: " <msg> %<width>d / <total> (%2.1f/100) ETA: %s \r"
        self._fmt = " %s %%%dd / %d (%%2.1f/100) ETA: %%s \r" % (
            safe_msg, len(str(self._total)), self._total
        )

        self._start = timeit.default_timer()
        self._eta = "???"
        self._skip_delete = False
        self._module = 0

    def __call__(self, increment=1):
        """Increment the progress indicator by *increment*."""
        self._count += increment

        # Guard the modulo below against a zero divisor.
        if (self._total / 100) == 0:
            return

        module = self._count % (self._total / 100)
        required_time = 0

        # The remainder dropping below its previous value means another
        # hundredth of the total has been passed; only then is the ETA
        # recomputed from the average time per increment so far.
        if module < self._module:
            time_per_increment = (
                timeit.default_timer() - self._start
            ) / self._count
            required_time = time_per_increment * (self._total - self._count)

            minut, sec = divmod(required_time, 60)
            hour, minut = divmod(minut, 60)
            self._eta = "%dh:%02dm:%02ds" % (hour, minut, sec)

        self._module = module

        # Refresh the line only when more than a second of work remains.
        if required_time > 1:
            self._fd.write(
                self._fmt % (
                    self._count,
                    (float(self._count) / self._total) * 100,
                    self._eta
                )
            )

    def __del__(self):
        """Delete the progress indicator from screen."""
        if self._total < 100:
            return

        # Overwrite the indicator line with blanks.
        length = len(self._fmt % (0, 0, 0))
        self._fd.write((" " * length) + "\r")

        total_time = timeit.default_timer() - self._start
        minut, sec = divmod(total_time, 60)
        hour, minut = divmod(minut, 60)

        # *minut* is the minute component within the hour, so the hour has
        # to be checked as well; otherwise a run of e.g. 1h:00m:30s would
        # not be reported.
        if hour > 0 or minut > 1:
            time_str = "%s Elapsed time: %dh:%02dm:%02ds\n" % \
                (self._msg, hour, minut, sec)
            self._fd.write(time_str)
[docs]
def open_generic_fd(filename, mode):
    """Open *filename*, transparently handling gzip and bzip2 files.

    Names ending in ``.gz`` or ``.bz2`` are opened with the matching
    compression module at compression level 9, always in binary mode
    (``'b'`` is appended to *mode* when missing). Any other name is opened
    with the built-in ``open`` and *mode* unchanged.

    :param filename: Path of the file to open.
    :param mode: Mode string as accepted by ``open``.
    :return: The opened file object.
    """
    binary_mode = mode if 'b' in mode else mode + 'b'

    if filename.endswith(".gz"):
        return gzip.open(filename, binary_mode, compresslevel=9)

    if filename.endswith(".bz2"):
        return bz2.BZ2File(filename, binary_mode, compresslevel=9)

    return open(filename, mode)
[docs]
def compress_file(source):
    """Move *source* to a file of the same name with ``.gz`` appended.

    :param source: Path of the file to compress; it is handed to
                   :func:`move_file` together with the ``.gz`` target name.
    """
    target = source + ".gz"
    move_file(source, target)
[docs]
def move_file(source, target):
    """Copy the contents of *source* into *target*, then delete *source*.

    Both files are opened through :func:`open_generic_fd`, so a ``.gz`` or
    ``.bz2`` suffix on either name selects the matching (de)compression.

    :param source: Path of the file to move.
    :param target: Path of the file to create.
    """
    # Context managers close both handles even if the copy fails midway.
    with open_generic_fd(source, 'rb') as sfd:
        with open_generic_fd(target, 'wb') as tfd:
            tfd.writelines(sfd)

    # Only reached after a successful copy, so a failed move never loses
    # the source file.
    os.remove(source)