# Source code for microprobe.code.benchmark

# Copyright 2011-2021 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""":mod:`microprobe.code.benchmark` module

"""

# Futures
from __future__ import absolute_import, print_function, annotations
from typing import TYPE_CHECKING, Dict, List

# Own modules
from microprobe.code.cfg import Cfg
from microprobe import MICROPROBE_RC
from microprobe.code.address import Address
from microprobe.exceptions import MicroprobeCodeGenerationError
from microprobe.utils.logger import get_logger
from microprobe.utils.misc import OrderedDict
from microprobe.utils.typeguard_decorator import typeguard_testsuite

# Type hinting
if TYPE_CHECKING:
    from microprobe.code.context import Context
    from microprobe.code.ins import Instruction
    from microprobe.code.var import Variable

# Constants
LOG = get_logger(__name__)
__all__ = [
    "BuildingBlock", "Benchmark", "MultiThreadedBenchmark", "benchmark_factory"
]


# Functions
@typeguard_testsuite
def benchmark_factory(threads: int = 1):
    """Build the benchmark object matching the requested thread count.

    :param threads: Number of threads of the benchmark (Default value = 1)
    :type threads: :class:`~.int`
    :return: A :class:`~.Benchmark` when *threads* is exactly one, otherwise
             a :class:`~.MultiThreadedBenchmark` created with
             ``num_threads=threads``
    """
    # Anything other than exactly one thread is handed to the
    # multi-threaded implementation.
    if threads != 1:
        return MultiThreadedBenchmark(num_threads=threads)
    return Benchmark()
# Classes
@typeguard_testsuite
class BuildingBlock(object):
    """Base class for the building blocks of a benchmark.

    The different building blocks of a benchmark (functions, basic blocks,
    loops, etc.) should inherit from this class. It only keeps track of the
    messages attached to the block: warnings, information messages, pass
    information messages and requirements.
    """

    def __init__(self):
        """Create a building block with no messages attached."""
        # Warning text -> number of times that warning has been reported.
        self._warnings: Dict[str, int] = {}
        self._info: List[str] = []
        self._pass_info: List[str] = []
        self._requirements: List[str] = []

    def add_warning(self, message: str):
        """Record a warning message on the building block.

        Repeated messages are not stored twice; their occurrence counter is
        increased instead.

        :param message: Warning message
        :type message: :class:`~.str`
        """
        occurrences = self._warnings.get(message, 0)
        self._warnings[message] = occurrences + 1

    @property
    def warnings(self):
        """Warnings of the building block.

        Mapping from each warning message (:class:`~.str`) to the number of
        times it has been added (:class:`~.int`).
        """
        return self._warnings

    def add_pass_info(self, message: str):
        """Append a pass information message to the building block.

        :param message: Information pass message
        :type message: :class:`~.str`
        """
        self._pass_info.append(message)

    def add_info(self, message: str):
        """Append an information message to the building block.

        :param message: Information message
        :type message: :class:`~.str`
        """
        self._info.append(message)

    @property
    def info(self):
        """Information messages of the building block
        (:class:`~.list` of :class:`~.str`), in insertion order.
        """
        return self._info

    @property
    def pass_info(self):
        """Pass information messages of the building block
        (:class:`~.list` of :class:`~.str`), in insertion order.
        """
        return self._pass_info

    def add_requirement(self, message: str):
        """Append a requirement message to the building block.

        :param message: Requirement message
        :type message: :class:`~.str`
        """
        self._requirements.append(message)

    @property
    def requirements(self):
        """Requirement messages of the building block
        (:class:`~.list` of :class:`~.str`), in insertion order.
        """
        return self._requirements
@typeguard_testsuite
class Benchmark(BuildingBlock):
    """Class to represent a benchmark (highest level building block).

    A benchmark owns a control flow graph, the registered global variables,
    the initialization and finalization instruction lists and the execution
    context.
    """

    def __init__(self):
        """Create an empty, single-threaded benchmark."""
        super(Benchmark, self).__init__()
        self._cfg = Cfg()
        # Variable name -> variable, kept in registration order.
        self._global_vars: Dict[str, Variable] = OrderedDict()
        self._init: List[Instruction] = []
        self._fini: List[Instruction] = []
        # Next free displacement used when assigning addresses to variables
        # that do not have one yet (see register_var).
        self._vardisplacement = 0
        self._context = None
        self._num_threads = 1

    @property
    def init(self):
        """Initialization instructions
        (:class:`~.list` of :class:`~.Instruction`)
        """
        return self._init

    def add_init(self, inits: List[Instruction], prepend: bool = False):
        """Adds the specified list of instructions to the initialization list.

        A new list is built on every call; the previous list object is not
        modified in place.

        :param inits: List of instructions to be added
        :type inits: :class:`~.list` of :class:`~.Instruction`
        :param prepend: If set, the instructions are placed before the
                        current ones instead of after them
                        (Default value = False)
        :type prepend: :class:`~.bool`
        """
        LOG.debug("Add Init")
        for init in inits:
            LOG.debug("NEW INIT INSTRUCTION: %s (prepend: %s)",
                      init.assembly(), prepend)

        if not prepend:
            self._init = self._init + inits
        else:
            self._init = inits + self._init

    def rm_init(self, inits: List[Instruction]):
        """Removes from the initialization list the specified instructions.

        The last ``len(inits)`` entries of the initialization list are
        dropped; the contents of *inits* are not compared against the
        entries being removed.

        :param inits: List of instructions to be removed
        :type inits: :class:`~.list` of :class:`~.Instruction`
        """
        # With an empty request ``-len(inits)`` is 0 and the slice ``[0:0]``
        # would wipe the whole list, so nothing to remove means no change.
        if not inits:
            return
        self._init = self._init[:-len(inits)]

    def add_fini(self, finis: List[Instruction]):
        """Appends the specified list of instructions to the finalization list.

        :param finis: List of instructions to be added
        :type finis: :class:`~.list` of :class:`~.Instruction`
        """
        self._fini = self._fini + finis

    @property
    def fini(self):
        """Finalization instructions
        (:class:`~.list` of :class:`~.Instruction`)
        """
        return self._fini

    @property
    def cfg(self):
        """Returns the benchmark control flow graph."""
        return self._cfg

    def set_cfg(self, cfg: Cfg):
        """Sets the benchmarks control flow graph.

        :param cfg: Control flow graph
        :type cfg: :class:`~.Cfg`
        """
        self._cfg = cfg

    def registered_global_vars(self):
        """Returns the list of registered global variables."""
        return list(self._global_vars.values())

    def set_var_displacement(self, displacement: int):
        """Move forward the displacement used for the next variable address.

        :param displacement: New displacement; must be strictly greater than
                             the current one
        :type displacement: :class:`~.int`
        """
        # Displacement only can increase
        assert displacement > self._vardisplacement
        self._vardisplacement = displacement

    def register_var(self, var: Variable, context: Context):
        """Registers the given variable as a global variable.

        If the variable has no address, one is assigned: a symbolic address
        based on the variable name when the context is symbolic, otherwise
        the next aligned displacement in the ``data`` base address.

        :param var: Variable to register
        :type var: :class:`~.Variable`
        :param context: Execution context, queried for ``symbolic`` and
                        ``data_segment``
        :type context: :class:`~.Context`
        :raise MicroprobeCodeGenerationError: if a variable with the same
            name is already registered (and it is not a tolerated
            duplicate), or if a variable with a pre-defined address is
            reported as overlapping another one
        """
        LOG.debug("Registering global var: '%s'", var.name)

        if var.name in self._global_vars:

            var2 = self._global_vars[var.name]
            # An identical re-registration (same value, same concrete
            # address) is tolerated only when 'safe_bin' is enabled.
            if (var.value == var2.value and var.address == var2.address
                    and var.address is not None
                    and MICROPROBE_RC['safe_bin']):
                LOG.warning("Variable: '%s' registered multiple times!",
                            var.name)
                return

            LOG.critical("Registered variables: %s",
                         list(self._global_vars.keys()))
            raise MicroprobeCodeGenerationError(
                "Variable already registered: %s" % (var.name))

        self._global_vars[var.name] = var

        if context.symbolic and var.address is None:

            LOG.debug("Context symbolic. No need to track base addresses")
            var_address = Address(base_address=var.name, displacement=0)
            var.set_address(var_address)

        elif var.address is None:

            LOG.debug("Context not symbolic and variable address not set")

            # if (self._vardisplacement == 0 and
            #        context.data_segment is not None):
            #    self._vardisplacement = context.data_segment

            address_ok = False

            # Retry from the updated displacement until the candidate
            # address is not reported as overlapping a registered variable.
            while not address_ok:
                var_address = Address(base_address="data",
                                      displacement=self._vardisplacement)

                LOG.debug("Address before alignment: %s", var_address)

                align = var.align
                if align is None:
                    align = 1

                LOG.debug("Variable alignment: %s", align)
                LOG.debug("Current address: %s", var_address)

                if (var_address.displacement +
                        context.data_segment) % align != 0:
                    # alignment needed
                    var_address += align - (
                        (var_address.displacement + context.data_segment) %
                        align)

                LOG.debug("Address after alignment: %s", var_address)
                LOG.debug("Current var displacement: %x",
                          self._vardisplacement)

                var.set_address(var_address)

                over = self._check_variable_overlap(var)

                if over is not None:
                    # Skip past the overlapping variable and try again.
                    self._vardisplacement = max(
                        self._vardisplacement,
                        over.address.displacement + over.size)
                    continue

                address_ok = True

            LOG.debug("Variable registered at address: '%s'", var_address)

            self._vardisplacement = var_address.displacement + var.size

        else:
            LOG.debug("Using pre-defined address: '%s'", var.address)

            over = self._check_variable_overlap(var)
            if over is not None:
                raise MicroprobeCodeGenerationError(
                    "Variable '%s' overlaps with variable '%s'" %
                    (var.name, over.name))

    def _check_variable_overlap(self, var: Variable):
        """Return a registered variable overlapping *var*, or ``None``.

        NOTE: the check is currently short-circuited. The method returns
        ``None`` immediately, so no overlap is ever reported and the code
        after the first ``return`` does not run.
        """
        return None

        vara = var.address
        vara2 = vara + var.size

        for rvar in self._global_vars.values():
            if var.name == rvar.name:
                continue

            rvara = rvar.address
            rvara2 = rvara + rvar.size

            if rvara < vara2 and rvara2 >= vara2:
                return rvar

            if rvara <= vara and rvara2 > vara:
                return rvar

            if rvara >= vara and rvara2 <= vara2:
                return rvar

            if rvara <= vara and rvara2 >= vara2:
                return rvar

        return None

    def set_context(self, context: Context):
        """Set the execution context of the building block.

        :param context: Execution context
        :type context: :class:`~.Context`
        """
        self._context = context

    @property
    def context(self):
        """Return benchmark's context"""
        return self._context

    def add_instructions(self,
                         instrs: List[Instruction],
                         after: Instruction | None = None,
                         before: Instruction | None = None):
        """Adds the given instructions to the building block.

        Adds the given instructions right after the specified instruction
        and before the specified one. When neither is given, the
        instructions are inserted in the last basic block of the control
        flow graph.

        :param instrs: Instructions to add
        :type instrs: :class:`~.list` of :class:`~.Instruction`
        :param after: Instruction after which to add the new instructions
                      (Default value = None)
        :type after: :class:`~.Instruction`
        :param before: Instruction before which to add the new instructions
                       (Default value = None)
        :type before: :class:`~.Instruction`
        :raise MicroprobeCodeGenerationError: if both *after* and *before*
            are given and the index of *before* is greater than the index
            of *after*
        """
        if after is None and before is None:
            bbl = self.cfg.last_bbl()
            bbl.insert_instr(instrs)

        elif after is not None and before is None:
            idx_after = self.cfg.index(after)
            bbl = self.cfg.get_bbl(idx_after)
            bbl.insert_instr(instrs, after=after)

        elif after is None and before is not None:
            idx_before = self.cfg.index(before)
            bbl = self.cfg.get_bbl(idx_before)
            bbl.insert_instr(instrs, before=before)

        elif after is not None and before is not None:
            idx_before = self.cfg.index(before)
            idx_after = self.cfg.index(after)

            if idx_before < idx_after:
                bbl = self.cfg.get_bbl(idx_before)
                bbl.insert_instr(instrs)
            elif idx_before == idx_after:
                # Both anchors share an index: let the basic block place
                # the instructions between them.
                bbl = self.cfg.get_bbl(idx_before)
                bbl.insert_instr(instrs, after=after, before=before)
            else:
                raise MicroprobeCodeGenerationError(
                    "Attempt to inserst instruction in a position that it is"
                    " not possible")

    @property
    def code_size(self):
        """Return benchmark's size

        Sum of ``architecture_type.format.length`` over every instruction
        of every basic block in the control flow graph.
        """
        return sum(instr.architecture_type.format.length
                   for bbl in self.cfg.bbls
                   for instr in bbl.instrs)

    @property
    def labels(self):
        """Return the list of the currently defined labels and symbols.

        All entries are upper-cased. Global variable names come first,
        followed by the labels of the instructions in the control flow
        graph, the finalization list and the initialization list.
        """
        labels = [key.upper() for key in self._global_vars.keys()]

        for bbl in self.cfg.bbls:
            for instr in bbl.instrs:
                if instr.label is not None:
                    labels.append(instr.label.upper())

        labels += [
            instr.label.upper() for instr in self._fini
            if instr.label is not None
        ]
        labels += [
            instr.label.upper() for instr in self._init
            if instr.label is not None
        ]

        return labels

    def set_current_thread(self, idx: int):
        """Select the thread that subsequent operations refer to.

        :param idx: Thread identifier (minimum 1)
        :type idx: :class:`~.int`
        :raise MicroprobeCodeGenerationError: if *idx* is outside the
            accepted range; the current thread is left unchanged then
        """
        # Validate before storing so a rejected id never becomes the
        # current thread.
        # NOTE(review): the accepted maximum is num_threads + 1, one more
        # than the number of threads -- confirm this is intended.
        if not 1 <= idx <= self._num_threads + 1:
            raise MicroprobeCodeGenerationError(
                "Unknown thread id: %d (min: 1, max: %d)" %
                (idx, self._num_threads + 1))
        self._current_thread = idx
@typeguard_testsuite
class MultiThreadedBenchmark(Benchmark):
    """Benchmark composed of one :class:`~.Benchmark` per thread.

    Attributes that are not found on this object are looked up on the
    :class:`~.Benchmark` of the currently selected thread.
    """

    def __init__(self, num_threads: int = 1):
        """Create one :class:`~.Benchmark` per thread and select thread 1.

        :param num_threads: Number of threads (Default value = 1)
        :type num_threads: :class:`~.int`
        """
        # Benchmark.__init__ is not invoked here, so the per-benchmark
        # attributes do not exist on this object and their lookups fall
        # through to __getattr__.
        self._num_threads = num_threads
        # Thread identifier (starting at 1) -> benchmark of that thread.
        self._threads: Dict[int, Benchmark] = {}
        for idx in range(1, num_threads + 1):
            self._threads[idx] = Benchmark()
        self._current_thread = 1

    def __getattr__(self, attribute_name: str):
        """Forward missing attributes to the current thread's benchmark.

        Only called when normal attribute lookup fails.

        :param attribute_name: Name of the attribute being looked up
        :type attribute_name: :class:`~.str`
        :raise AttributeError: if the delegation state itself is missing or
            the current thread's benchmark lacks the attribute
        """
        # The lookups below need these two attributes. If they are missing
        # (the instance was created without running __init__, e.g. while
        # being copied or unpickled), reading them would re-enter this
        # method forever; fail with a regular AttributeError instead.
        if attribute_name in ("_threads", "_current_thread"):
            raise AttributeError(
                "'%s' object has no attribute '%s'" %
                (type(self).__name__, attribute_name))
        return getattr(self._threads[self._current_thread], attribute_name)