Source code for microprobe.code

# Copyright 2011-2021 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""":mod:`~.code` generation package

A package for driving the code generation process within microprobe.
The sub-packages of this package are:

- :mod:`~.wrapper`: Code generation wrapper package.

and the modules in this package are the following:

- :mod:`~.address`: Address object module.
- :mod:`~.bbl`: Building block object module.
- :mod:`~.benchmark`: Benchmark object module.
- :mod:`~.cfg`: Control flow graph object module.
- :mod:`~.context`: Context object module.
- :mod:`~.ins`: Instruction object module.
- :mod:`~.var`: Variable objects module.

Visit their respective documentation for further details.

This package defines the benchmark synthesizer (:class:`~.Synthesizer`),
which is the main object driving the code generation process in the
microprobe framework. This object provides a simple interface to define
the set of passes (:class:`~.Pass`) to apply to generate a benchmark
(:class:`~.Benchmark`).
"""

# Futures
from __future__ import absolute_import, annotations

# Built-in modules
import copy
import datetime
import os
from time import time
from typing import TYPE_CHECKING, Dict, List, Type

# Third party modules


# Own modules
import microprobe.code.wrapper
import microprobe.target
import microprobe.utils as cmd
from microprobe import MICROPROBE_RC
from microprobe.code.address import InstructionAddress
from microprobe.code.benchmark import benchmark_factory
from microprobe.exceptions import MicroprobeCodeGenerationError, \
    MicroprobeError, MicroprobeValueError
from microprobe.utils.imp import get_all_subclasses, load_source
from microprobe.utils.logger import DEBUG, get_logger, set_log_level
from microprobe.utils.misc import Progress, findfiles, open_generic_fd

# Type hinting
if TYPE_CHECKING:
    from microprobe.code.address import Address
    from microprobe.code.benchmark import Benchmark
    from microprobe.code.ins import Instruction
    from microprobe.code.wrapper import Wrapper
    from microprobe.passes import Pass
    from microprobe.target import Target

# Constants

#: Package logger (:class:`~.logging.Logger`).
LOG = get_logger(__name__)

_INIT = True
__all__ = ['get_wrapper', 'Synthesizer']


def _wrapper_subclasses() -> List[Type[microprobe.code.wrapper.Wrapper]]:
    return get_all_subclasses(microprobe.code.wrapper.Wrapper)


# Functions
[docs] def get_wrapper(name: str): """Return a wrapper object with name *name*. Look for the registered :class:`~.Wrapper` objects and return and instance of the one with name equal *name*. :param name: Wrapper name :type name: :class:`~.str` :return: A wrapper instance :rtype: :class:`~.Wrapper` """ global _INIT # pylint: disable=global-statement if _INIT: _INIT = False _import_default_wrappers() if MICROPROBE_RC['debugwrapper']: name = "DebugBinaryDouble" for elem in _wrapper_subclasses(): if elem.__name__ == name: return elem raise MicroprobeValueError( "Unknown wrapper '%s'. Available wrappers are: %s. " % (name, [elem.__name__ for elem in _wrapper_subclasses()]))
def _import_default_wrappers(): modules: List[str] = [] LOG.debug('Wrapper paths: %s', MICROPROBE_RC['wrapper_paths']) for path in MICROPROBE_RC['wrapper_paths']: for module in findfiles([path], r"wrappers/.+\.py$", full=True): module = os.path.realpath(module) if module not in modules: modules.append(module) lmodules = -1 while lmodules != len(modules): lmodules = len(modules) for module_file in modules[:]: name = (os.path.basename(module_file).replace(".py", "")) if name == "__init__": continue if name in microprobe.code.wrapper.__dict__: raise MicroprobeError( "Wrapper module name '%s' in '%s' already loaded. " % (name, module_file)) try: module = load_source(name, module_file) except MicroprobeValueError: continue microprobe.code.wrapper.__dict__[name] = module modules.remove(module_file) current_wrappers = \ [elem.__name__ for elem in _wrapper_subclasses() ] if len(current_wrappers) != len(set(current_wrappers)): for elem in set(current_wrappers): current_wrappers.remove(elem) overwrapper = list(set(current_wrappers))[0] raise MicroprobeError( "Module name '%s' in '%s' overrides an existing wrapper " "with name '%s'" % (name, module_file, overwrapper)) # Classes
[docs] class Synthesizer: """Benchmark synthesizer. The Synthesizer objects are in charge of creating :class:`~.Benchmark` objects based on a set of passes that have been previously defined. The typical workflow will be as follow. User instantiates the synthesizer, specifying also the :class:`~.Target` and the :class:`~.Wrapper`, which are required to know the target properties as well as how the code should be translated to the final representation. Then a set of :class:`~.Pass` are registered using the :meth:`add_pass` method. This passes will be applied in the provided order on a empty :class:`~.Benchmark` object when the :meth:`~.synthesize` method is called. Finally, the generated benchmark can be saved to disk by using the :meth:`~.save` method. A snippet of code of this process would be like: .. code:: python synthesizer = Synthesizer(...) # Instantiate object synthesizer.add_pass(...) # Add transformation passes ... synthesizer.add_pass(...) synthesizer.synthesize(...) # Apply the passes and generate a # benchmark synthesizer.save(...) # Save the benchmark The default structure of the benchmarks being synthesized is as follows: #. ``extra_raw['FILE_HEADER']`` contents #. ``wrapper.headers()`` contents #. **<global variable declarations>** #. ``wrapper.start_main()`` contents #. ``extra_raw['CODE_HEADER']`` contents #. **<global variable initializations>** #. ``wrapper.post_var()`` contents #. **<benchmark initialization code>** #. ``wrapper.start_loop()`` contents #. **<benchmark building blocks>** #. **<benchmark finalization code>** #. ``wrapper.end_loop()`` contents #. ``extra_raw['CODE_FOOTER']`` contents #. ``wrapper.end_main()`` contents #. ``wrapper.footer()`` contents #. ``extra_raw['FILE_FOOTER']`` contents where: - ``extra_raw`` contents are provided at initialization (see below) - wrapper object methods provide the decoupling between output format and the benchmark synthesizer - **variables** and **building block** contents are populated by the passes being applied .. note:: - This default code layout can be changed by sub-classing this class. - The :meth:`~.synthesize` method and the :meth:`~.save` method can be called multiple times to generate and save multiple benchmarks in case that some of the passes have some random behavior. Otherwise, it does not make sense ;). """
[docs] def __init__(self, target: Target, wrapper: Wrapper | List[Wrapper], no_scratch: bool = False, extra_raw: Dict[str, str] = {}, value="random", threads: int = 1): """Create a Synthesizer object. :param target: Benchmark target :type target: :class:`~.Target` :param wrapper: Wrapper object defining the output format :type wrapper: :class:`~.Wrapper` :param value: Default immediate value used for non-initialized immediates (Default: random) :type value: :class:`~.int` :param no_scratch: Disable automatic declaration of scratch variables required for code generation support (Default: False) :type no_scratch: :class:`~.bool` :param extra_raw: List of extra raw strings to be embedded in the final output :type extra_raw: :class:`~.list` of elements containing a ``name`` and a ``value`` attributes (Default: []) :return: A Synthesizer instance :rtype: :class:`~.Synthesizer` """ self._target = target # Extra arguments self._no_scratch = no_scratch self._raw = extra_raw self._immediate = value self._threads = threads self._passes: Dict[int, List[Pass]] = {} for idx in range(1, self._threads + 1): self._passes[idx] = [] self._current_thread = 1 if isinstance(wrapper, list): if len(wrapper) != self._threads: raise MicroprobeCodeGenerationError( "Number of wrappers provided (%d) is different from " "number of threads (%d) specified in the Synthesizer" % (len(wrapper), self._threads)) self._wrappers = wrapper else: self._wrappers = [wrapper] for _ in range(1, self._threads): new_wrapper = copy.deepcopy(wrapper) self._wrappers.append(new_wrapper) for wrapper in self._wrappers: wrapper.set_target(target)
@property def target(self): """Target attribute (:class:`~.Target`).""" return self._target @property def wrapper(self): """Wrapper attribute (:class:`~.Wrapper`).""" return self._wrappers[self._current_thread - 1]
[docs] def add_pass(self, synth_pass: Pass, thread_idx: int | None = None): """Add a pass to the benchmark synthesizer. :param synth_pass: New pass to add. :type synth_pass: :class:`~.Pass` """ if thread_idx is None: self._passes[self._current_thread].append(synth_pass) else: if not 1 <= thread_idx <= self._threads + 1: raise MicroprobeCodeGenerationError( "Unknown thread id: %d (min: 1, max: %d)" % (thread_idx, self._threads + 1)) self._passes[thread_idx].append(synth_pass)
[docs] def save(self, name: str, bench: Benchmark | None = None, pad: int | None = None): """Save a benchmark to disk. Save a synthesized benchmark to disk. If bench is not specified a benchmark is automatically synthesized using the :meth:`~.synthesize` method. :param name: Filename to save :type name: :class:`~.str` :param bench: Benchmark to save (Default value = None) :type bench: :class:`~.Benchmark` """ if bench is None: bench = self.synthesize() starttime = time() program_str = self._wrap(bench) endtime = time() LOG.info("Pass wrap: %s", (datetime.timedelta(seconds=endtime - starttime))) outputname = self._wrappers[0].outputname(name) fdx = open_generic_fd(outputname, 'wb') for elem in program_str: if elem == []: continue if isinstance(elem, str): elem = elem.encode() fdx.write(elem) fdx.flush() if pad is not None: file_size = os.path.getsize(outputname) while (file_size % pad) != 0: fdx.write(bytes(pad - (file_size % pad))) fdx.flush() file_size = os.path.getsize(outputname) fdx.close()
[docs] def synthesize(self): """Synthesize a benchmark. Synthesize a benchmark based on the set of passes that have been added using the :meth:`add_pass` method. :return: A new synthesized benchmark :rtype: :class:`~.Benchmark` """ LOG.info("Start synthesizing benchmark") # Create benchmark object bench = benchmark_factory(threads=self._threads) for thread_id in range(1, self._threads + 1): LOG.info("Start synthesizing benchmark thread %d" % thread_id) self.set_current_thread(thread_id) self._target.set_wrapper(self._wrappers[thread_id - 1]) bench.set_current_thread(thread_id) # Set default context -- environment context bench.set_context(self.wrapper.context()) # bench.set_context(Context()) for var in self.wrapper.required_global_vars(): bench.register_var(var, bench.context) if not self._no_scratch: # self._target.scratch_var.set_address(None) bench.register_var(self._target.scratch_var, bench.context) # Basic context reserved_registers = self._target.reserved_registers reserved_registers += self.wrapper.reserved_registers( reserved_registers, self._target) bench.context.add_reserved_registers(reserved_registers) if MICROPROBE_RC['debugpasses']: previous_level = LOG.getEffectiveLevel() set_log_level(DEBUG) passes = self._passes[thread_id] starttime = time() for idx, step in enumerate(passes): LOG.info("Applying pass %03d: %s", idx, step.__class__.__name__) if MICROPROBE_RC['verbose']: cmd.cmdline.print_info("Applying pass %03d: %s" % (idx, step.__class__.__name__)) step(bench, self.target) bench.add_pass_info(step.info()) endtime = time() LOG.debug("Applying pass %03d: %s : Execution time: %s", idx, step.__class__.__name__, datetime.timedelta(seconds=endtime - starttime)) starttime = endtime starttime = time() for idx, step in enumerate(passes): LOG.info("Checking pass %03d: %s", idx, step.__class__.__name__) try: pass_ok = step.check(bench, self.target) except NotImplementedError: LOG.warning("Checking pass %03d: %s. NOT IMPLEMENTED", idx, step.__class__.__name__) pass_ok = False endtime = time() if not pass_ok: LOG.warning("Checking pass %03d: %s. Test result: FAIL", idx, step.__class__.__name__) bench.add_warning( "Pass %03d: %s did not pass the check test" % (idx, step.__class__.__name__)) else: LOG.debug("Checking pass %03d: %s. Test result: OK", idx, step.__class__.__name__) LOG.debug("Checking pass %03d: %s : Execution time: %s", idx, step.__class__.__name__, datetime.timedelta(seconds=endtime - starttime)) starttime = endtime if MICROPROBE_RC['debugpasses']: set_log_level(previous_level) return bench
def _wrap_thread(self, bench: Benchmark, thread_id: int): """Wrap a thread in benchmark. This function wraps a thread using the synthesizer wrapper. The wrapping process is the process of converting the internal representation of the benchmark to the actual string that is written to a file, adding the necessary prologue and epilogue bytes of data. :param bench: Benchmark to wrap. :type bench: :class:`~.Benchmark` :param thread_id: Thread to wrap :type thread_id : :class:`~.int` :return: A string representation of the benchmark :rtype: :class:`~.str` """ bench.set_current_thread(thread_id) self.set_current_thread(thread_id) thread_str: List[str] = [] thread_str.append(self.wrapper.start_main()) if 'CODE_HEADER' in self._raw: thread_str.append("\n" + self._raw['CODE_HEADER'] + "\n") for var in bench.registered_global_vars(): if var.value is None: thread_str.append( self.wrapper.init_global_var(var, self._immediate)) thread_str.append(self.wrapper.post_var()) # TODO: This is hardcoded and assumes a loop always. Needs to be more # generic: pass a building block to a wrapper and it automatically # returns the required string for instr in bench.init: thread_str.append(self.wrapper.wrap_ins(instr)) code_str: List[str] = [] first = True instr = None for bbl in bench.cfg.bbls: for instr in bbl.instrs: if first is True: first = False if bench.init: code_str.append( self.wrapper.start_loop(instr, bench.init[0])) else: code_str.append(self.wrapper.start_loop(instr, instr)) code_str.append(self.wrapper.wrap_ins(instr)) if instr is None: raise MicroprobeCodeGenerationError( "No instructions found in benchmark") thread_str.extend(code_str) for instr in bench.fini: thread_str.append(self.wrapper.wrap_ins(instr)) last_instr = instr thread_str.append(self.wrapper.end_loop(last_instr)) if 'CODE_FOOTER' in self._raw: thread_str.append("\n" + self._raw['CODE_FOOTER'] + "\n") thread_str.append(self.wrapper.end_main()) return thread_str def _wrap(self, bench: Benchmark): """Wrap a benchmark. This function wraps a benchmark using the synthesizer wrapper. The wrapping process is the process of converting the internal representation of the benchmark to the actual string that is written to a file, adding the necessary prologue and epilogue bytes of data. :param bench: Benchmark to wrap. :type bench: :class:`~.Benchmark` :return: A string representation of the benchmark :rtype: :class:`~.str` """ for wrapper in self._wrappers: wrapper.set_benchmark(bench) self.set_current_thread(1) bench_str: List[str] = [] if 'FILE_HEADER' in self._raw: bench_str.append(self._raw['FILE_HEADER'] + "\n") bench_str.append(self.wrapper.headers()) for thread_id in range(1, self._threads + 1): self.set_current_thread(thread_id) bench.set_current_thread(thread_id) for var in sorted(bench.registered_global_vars(), key=lambda x: x.address): bench_str.append(self.wrapper.declare_global_var(var)) for thread_id in range(1, self._threads + 1): bench_str.extend(self._wrap_thread(bench, thread_id)) self.set_current_thread(1) bench_str.append(self.wrapper.footer()) if 'FILE_FOOTER' in self._raw: bench_str.append("\n" + self._raw['FILE_FOOTER'] + "\n") bench_str = [elem for elem in bench_str if elem != ""] return bench_str
[docs] def set_current_thread(self, idx: int): """ """ self._current_thread = idx if not 1 <= idx <= self._threads + 1: raise MicroprobeCodeGenerationError( "Unknown thread id: %d (min: 1, max: %d)" % (idx, self._threads + 1))
[docs] class TraceSynthesizer(Synthesizer): """Trace synthesizer. The Trace Synthesizer objects are in charge of creating :class:`~.Benchmark` objects based on a set of passes that have been previously defined. They operate in a similar fashion as :class:`~.Synthesizer` objects but differ on how the benchmark object is dumped. In this case a dynamic execution trace is dumped (i.e. an execution trace). Required dynamic information should be provided by the registered passes. The default structure of the benchmarks being synthesized is as follows: #. ``wrapper.headers()`` contents #. Dynamic execution trace from: - **<benchmark initialization code>** - **<benchmark building blocks>** - **<benchmark finalization code>** """
[docs] def __init__(self, target: Target, wrapper: Wrapper, show_trace: bool = False, maxins: int = 10000, start_addr: Address | None = None, no_scratch: bool = False, extra_raw: Dict[str, str] = {}, value="random", threads: int = 1): super(TraceSynthesizer, self).__init__(target, wrapper, no_scratch, extra_raw, value, threads) self._show_trace = show_trace self._maxins = maxins self._start_addr = start_addr
def _wrap(self, bench: Benchmark): """Wrap a benchmark. This function wraps a benchmark using the synthesizer wrapper. The wrapping process is the process of converting the internal representation of the benchmark to the actual string that is written to a file, adding the necessary prologue and epilogue bytes of data. :param bench: Benchmark to wrap. :type bench: :class:`~.Benchmark` :return: A string representation of the benchmark :rtype: :class:`~.str` """ self.wrapper.set_benchmark(bench) bench_str: List[str] = [] bench_str.append(self.wrapper.headers()) instructions: List[Instruction] = [] instructions_dict: Dict[InstructionAddress, Instruction] = {} instructions_next_dict: Dict[InstructionAddress, InstructionAddress] = {} for instr in bench.init: instructions.append(instr) for bbl in bench.cfg.bbls: for instr in bbl.instrs: instructions.append(instr) for instr in bench.fini: instructions.append(instr) for instr in instructions: instructions_dict[instr.address] = instr if (instr.branch or instr.syscall or instr.trap): instructions_next_dict[instr.address] = \ instr.decorators['NI']['value'] else: instructions_next_dict[instr.address] = \ instr.address + instr.architecture_type.format.length instr = instructions[0] if self._start_addr is not None: if "EA" in instr.decorators: instr = [ ins for ins in instructions if int(ins.decorators["EA"]['value'][0], 16) == self._start_addr ][0] else: instr = instructions_dict[InstructionAddress( base_address="code", displacement=(self._start_addr - bench.context.code_segment))] count = 0 cmd.cmdline.print_info("Maximum trace size: %s instructions " % self._maxins) progress = Progress(self._maxins, msg="Instructions generated:") while True: count = count + 1 if count > self._maxins: cmd.cmdline.print_info( "Max number of instructions (%d) reached. " "Stoping trace generation." % self._maxins) break try: instr_address = instructions_next_dict[instr.address] if not isinstance(instr_address, InstructionAddress): instr_address = next(instr_address) next_instr = instructions_dict[instr_address] except KeyError: if instr.mnemonic.upper() == "ATTN": cmd.cmdline.print_warning( "Processor ATTN instruction found. Stopping trace " "generation. ") break cmd.cmdline.print_error( "Jump from 0x%X to an unknown instruction in address " "0x%X found. Stoping trace generation." % (instr.address.displacement + bench.context.code_segment, instr_address.displacement + bench.context.code_segment)) exit(-1) progress() wrap_ins = self.wrapper.wrap_ins(instr, next_instr=next_instr, show=self._show_trace) bench_str.append(wrap_ins) instr = next_instr bench_str = [elem for elem in bench_str if elem != ""] return bench_str