Source code for microprobe.utils.distrib

# Copyright 2011-2021 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""":mod:`microprobe.utils.distrib` module

"""

# Futures
from __future__ import absolute_import, division

# Built-in modules
import bisect
import itertools
from random import Random

# Third party modules

# Own modules
from microprobe.exceptions import MicroprobeValueError
from microprobe.utils.logger import get_logger
from microprobe.utils.misc import getnextf


# Constants
LOG = get_logger(__name__)
__all__ = [
    "Choice", "weighted_choice", "discrete_average", "shuffle",
    "locality",
    "sort_by_distance", "sort_by_usage", "generate_plain_profile",
    "generate_weighted_profile", "compute_weighted_profile_average",
    "regular_seq", "average", "pstdev"
]

# Functions


# Classes
[docs] class Choice(object): # pylint: disable-msg=too-few-public-methods """Choice """
[docs] def __init__(self, items): """ :param items: """ self._items = items[:] self._oitems = items[:] self._random = Random() self._random.seed(10)
def __call__(self, rnd=None, bis=bisect.bisect): """ :param rnd: (Default value = None) :param bis: (Default value = bisect.bisect) """ added_weights = [] last_sum = 0 if rnd is None: rnd = self._random.random for dummy, weight in self._items: last_sum += weight added_weights.append(last_sum) index = bis(added_weights, rnd() * last_sum) item = self._items[index][0] return item
[docs] def weighted_choice(items): """Returns a function that makes a weighted random choice from items. :param items: """ if isinstance(items, dict): items = list(items.items()) else: items = list(items) return Choice(items)
[docs] def discrete_average(average_val): """ :param average_val: """ if average_val == int(average_val): return lambda: int(average_val) low = int(average_val) high = low + 1 array = [low, high] weights = [1, 1] caverage = average(array, weights=weights) while abs(caverage - average_val) > 0.01: if caverage > average_val: weights[0] = weights[0] + 1 else: weights[1] = weights[1] + 1 caverage = average(array, weights=weights) return weighted_choice((list(zip(array, weights))))
[docs] def average(array, weights=None): """ :param array: :type array: :param weights: :type weights: """ if weights is None: weights = [1.0] * len(array) if len(weights) != len(array): raise MicroprobeValueError( "Length of weights not compatible with length of the array" ) ave_val = (sum( [ array[idx] * weights[idx] for idx in range(0, len(array)) ] ) * 1.0) / sum(weights) return ave_val
[docs] def pstdev(data): """Calculates the population standard deviation""" ldata = len(data) if ldata < 2: raise MicroprobeValueError( "variance requires at least two data points" ) data_ave = average(data) squared_sum = sum((x - data_ave)**2 for x in data) pvar = squared_sum / (ldata) return pvar**0.5
[docs] def regular_seq(items): """ :param items: :type items: """ total = 0 for item, weight in items.items(): total = total + weight sequence = [] for idx in range(0, total): for item, weight in items.items(): if weight == 0: continue every = (total // weight) - 1 if every == 0: sequence.append(item) elif idx % every == 0: sequence.append(item) return getnextf(itertools.cycle(sequence))
[docs] def shuffle(slist, threshold): """ :param slist: :param threshold: """ rlist = [] slist = sorted(slist) random = Random() random.seed(10) if threshold == -1: random.shuffle(slist) return slist if threshold == 0: return slist if len(slist) == 0: return [] init_value = slist[0] temp_list = [init_value] for val in slist[1:]: if (val - init_value) >= threshold: random.shuffle(temp_list) rlist = rlist + temp_list init_value = val temp_list = [val] else: temp_list.append(val) random.shuffle(temp_list) rlist = rlist + temp_list return rlist
[docs] def sort_by_distance( regs, distdict, useddict, distance, dummy_instr, dummy_idx ): """ :param regs: :param distdict: :param useddict: :param distance: :param dummy_instr: :param dummy_idx: """ req = 0 while req < 1: # LOG.debug(": %s", req) for key, value in distdict.items(): LOG.debug("%s : %s ", key, value) LOG.debug("Valid: %s, %s", key in regs, value == distance) if key in regs and value == distance: return key req = req + 1 LOG.warning( "Unable to get a used register in the requested distance." " Failback to usage sorting." ) return sort_by_usage(regs, useddict, distdict)
[docs] def sort_by_usage(regs, lastdict, dummy_defdict): """ :param regs: :param lastdict: :param dummy_defdict: """ assert len(regs) > 0 for reg in lastdict: LOG.debug("Dict key: %s", reg) if reg in regs: LOG.debug("Last used register: %s", reg) return reg # None of the regs has ben used yet # Return the first one return regs[0]
# LOG.critical("Regs: %s", regs) # LOG.critical("Dictionary keys: %s", lastdict.keys()) # raise MicroprobeException("Sort error")
[docs] def generate_plain_profile(elements): """ :param elements: """ return [(elem, 1) for elem in elements]
[docs] def generate_weighted_profile( elements, attribute, targetvalue, maxvalue=None, minvalue=None ): """ :param elements: :param attribute: :param targetvalue: :param maxvalue: (Default value = None) :param minvalue: (Default value = None) """ profile = generate_plain_profile(elements) if maxvalue is not None: profile = [ (entry, weight) for entry, weight in profile if getattr(entry, attribute) <= maxvalue ] if minvalue is not None: profile = [ (entry, weight) for entry, weight in profile if getattr(entry, attribute) >= minvalue ] aver = compute_weighted_profile_average(profile, attribute) count = 0 step = 1 random = Random() random.seed(10) if aver < targetvalue: above = [ idx for idx, entry in enumerate(profile) if getattr(entry[0], attribute) > targetvalue ] if len(above) > 0: while aver < targetvalue: index = random.choice(above) above.remove(index) profile[index] = (profile[index][0], profile[index][1] + step) aver = compute_weighted_profile_average(profile, attribute) if len(above) == 0: above = [ idx for idx, entry in enumerate(profile) if getattr(entry[0], attribute) > targetvalue ] count = count + 1 if count % 1000 == 0: step = step * 10 else: maximum = max( [ getattr(entry, attribute) for entry, weight in profile ] ) profile = [ (entry, weight) for entry, weight in profile if getattr(entry, attribute) == maximum ] elif aver > targetvalue: below = [ idx for idx, entry in enumerate(profile) if getattr(entry[0], attribute) < targetvalue ] if len(below) > 0: while aver > targetvalue: index = random.choice(below) below.remove(index) profile[index] = (profile[index][0], profile[index][1] + step) aver = compute_weighted_profile_average(profile, attribute) if len(below) == 0: below = [ idx for idx, entry in enumerate(profile) if getattr(entry[0], attribute) < targetvalue ] count = count + 1 if count % 1000 == 0: step = step * 10 else: minimum = min( [ getattr(entry, attribute) for entry, weight in profile ] ) profile = [ (entry, weight) for entry, weight in profile if getattr(entry, attribute) == minimum ] return profile
[docs] def compute_weighted_profile_average(profile, attribute): """ :param profile: :param attribute: """ return sum([getattr(entry, attribute) * weight for entry, weight in profile]) / \ sum([weight for entry, weight in profile])
[docs] def locality(values, locdef): """ """ length = locdef[0] repeat = locdef[1] if repeat <= 1: return values values = [ values[i:i + length] * repeat for i in range(0, len(values), length) ] values = [item for sublist in values for item in sublist] return values
[docs] def probability(value): """ """ assert value >= 0 and value <= 1, "Invalid probability" crandom = Random() crandom.seed(10) def func(): return crandom.uniform(0, 1) <= value return func
[docs] def regular_probability(value): """ Returns callable that returns True every value calls """ def func(): count = 0 while True: count = count + 1 ret = (count % value) == 0 yield ret generator = func() def func2(): return next(generator) return func2