# Copyright 2011-2021 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""":mod:`microprobe.passes.instruction` module
"""
# Futures
from __future__ import absolute_import, division, print_function, annotations
# Built-in modules
import copy
import itertools
import multiprocessing as mp
import pickle
from typing import TYPE_CHECKING, List
import random
# Own modules
import microprobe.code.ins
import microprobe.passes
import microprobe.utils.distrib
from microprobe import MICROPROBE_RC
from microprobe.code.address import Address, InstructionAddress
from microprobe.code.ins import instruction_set_def_properties
from microprobe.code.var import Variable
from microprobe.exceptions import MicroprobeCodeGenerationError, \
MicroprobeUncheckableEnvironmentWarning
from microprobe.target.isa.instruction import InstructionType
from microprobe.target.isa.register import Register
from microprobe.utils.asm import interpret_asm
from microprobe.utils.cmdline import print_info
from microprobe.utils.distrib import generate_weighted_profile, \
weighted_choice
from microprobe.utils.logger import get_logger
from microprobe.utils.misc import RejectingDict, closest_divisor, \
Progress, iter_flatten, getnextf
# Type hinting
if TYPE_CHECKING:
from microprobe.code.ins import Instruction
# Constants
LOG = get_logger(__name__)
__all__ = [
'SetInstructionTypePass',
'ReplaceInstructionByTypePass',
'SetInstructionTypeByProfilePass',
'SetRandomInstructionTypePass',
'SetInstructionTypeBySequencePass',
'ReproduceSequencePass',
'SetInstructionTypeByPropertyPass',
'InsertInstructionSequencePass',
'DIDTSimplePass',
'SetInstructionTypeByAlternatingSequencesPass',
'SetInstructionOperandsByOpcodePass',
'SetInstructionTypeByElementPass',
'AddAssemblyByIndexPass',
'ReplaceLoadInstructionsPass',
'AddOnePass',
'DisableAsmByOpcodePass',
]
# Functions
# Classes
[docs]
class AddAssemblyByIndexPass(microprobe.passes.Pass):
"""AddAssemblyByIndexPass pass.
"""
[docs]
def __init__(self, assembly, allow_registers=None, index=-1):
"""
:param assembly:
:type assembly:
"""
super(AddAssemblyByIndexPass, self).__init__()
self._asm = assembly
self._aregs = []
if allow_registers is not None:
self._aregs = allow_registers
self._index = index
self._description = "Append the '%s' instructions at index of the"\
"building block." % ";".join(assembly)
def __call__(self, building_block, target):
"""
:param building_block:
:param target:
"""
instructions_def = interpret_asm(
self._asm, target, building_block.labels
)
instructions = []
for definition in instructions_def:
instruction = microprobe.code.ins.Instruction()
instruction_set_def_properties(
instruction,
definition,
building_block=building_block,
target=target,
allowed_registers=self._aregs
)
instructions.append(instruction)
if self._index < 0:
building_block.add_instructions(
instructions,
after=building_block.cfg.bbls[-1].instrs[-1]
)
elif self._index == 0:
building_block.add_instructions(
instructions,
before=building_block.cfg.bbls[0].instrs[0]
)
else:
cindex = 0
ains = None
instr = None
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
cindex = cindex + 1
if instr is None:
raise MicroprobeCodeGenerationError(
"Empty basic block found"
)
if cindex == self._index:
ains = instr
building_block.add_instructions(instructions, after=ains)
return
building_block.add_instructions(
instructions,
after=building_block.cfg.bbls[-1].instrs[-1]
)
[docs]
class SetInstructionTypePass(microprobe.passes.Pass):
"""SetInstructionTypePass pass.
"""
[docs]
def __init__(self, instr, allow_registers=None):
"""
:param instr:
"""
super(SetInstructionTypePass, self).__init__()
self._instr = instr
self._aregs = []
if allow_registers is not None:
self._aregs = allow_registers
def __call__(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
instr.set_arch_type(self._instr)
for reg in self._aregs:
instr.add_allow_register(reg)
return []
[docs]
class ReplaceInstructionByTypePass(microprobe.passes.Pass):
"""ReplaceInstructionByTypePass pass.
"""
[docs]
def __init__(self, instr1, instr2, every):
"""
:param instr1:
:param instr2:
:param every:
"""
super(ReplaceInstructionByTypePass, self).__init__()
if not isinstance(instr1, list):
instr1 = [instr1]
self._instr1 = instr1
self._instr2 = instr2
self._every = every
self._description = "Replace '%s' by '%s' every '%d'" % (
[instr.name for instr in instr1],
instr2.name, every
)
def __call__(self, building_block, target):
"""
:param building_block:
:param target:
"""
replaced = False
if self._every == 0:
return
count = 0
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
if instr.architecture_type in self._instr1:
count = count + 1
if (count % self._every) != 0:
continue
replaced = True
instr.set_arch_type(self._instr2)
# reserve implicit operands (if they are not already
# reserved) and add them as allowed
for operand_descriptor in instr.implicit_operands:
register = list(operand_descriptor.type.values())[0]
instr.add_allow_register(register)
if operand_descriptor.is_output and register not \
in building_block.context.reserved_registers \
and register not in target.control_registers:
building_block.context.add_reserved_registers(
[register]
)
context_fix_functions = instr.check_context(
building_block.context
)
for context_fix_function in context_fix_functions:
try:
context_fix_function(target, building_block)
except MicroprobeUncheckableEnvironmentWarning as wue:
building_block.add_warning(str(wue))
# if not replaced:
# raise MicroprobeCodeGenerationError(
# "Unable to replace %s by %s every %d times" %
# ([instr.name for instr in self._instr1],
# self._instr2.name, self._every))
[docs]
class SetInstructionTypeByElementPass(microprobe.passes.Pass):
"""SetInstructionTypeByElementPass pass.
"""
[docs]
def __init__(
self,
target,
components,
profile,
block=None,
maxlatency=None,
minlatency=None,
avelatency=None,
any_comp=False
):
if block is None:
block = []
super(SetInstructionTypeByElementPass, self).__init__()
profile = _get_profile(
target,
components,
profile,
block,
maxlatency=maxlatency,
minlatency=minlatency,
avelatency=avelatency,
any_comp=any_comp
)
self._func = microprobe.utils.distrib.weighted_choice(profile)
self._description = "Profile by component"
def __call__(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
instr.set_arch_type(self._func())
return []
[docs]
class SetInstructionTypeByProfilePass(microprobe.passes.Pass):
"""SetInstructionTypeByProfilePass pass.
"""
[docs]
def __init__(self, profile):
"""
:param profile:
"""
super(SetInstructionTypeByProfilePass, self).__init__()
self._func = microprobe.utils.distrib.weighted_choice(profile)
def __call__(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
instr.set_arch_type(self._func())
return []
[docs]
class SetRandomInstructionTypePass(microprobe.passes.Pass):
"""SetRandomInstructionTypePass pass.
"""
[docs]
def __init__(self, instructions: List[Instruction], rand: random.Random):
"""
:param instructions:
"""
super(SetRandomInstructionTypePass, self).__init__()
self._instrs = instructions
if rand is None:
rand = random.Random()
rand.seed(13)
self._func = rand.choice
def __call__(self, building_block, target):
"""
:param building_block:
:param target:
"""
for idx, instr in enumerate(self._instrs):
if not isinstance(instr, str):
continue
self._instrs[idx] = target.isa.instructions[instr]
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
instr.set_arch_type(self._func(list(self._instrs)))
# reserve implicit operands (if they are not already
# reserved) and add them as allowed
for operand_descriptor in instr.implicit_operands:
register = list(operand_descriptor.type.values())[0]
instr.add_allow_register(register)
if operand_descriptor.is_output and register not \
in building_block.context.reserved_registers \
and register not in target.control_registers:
building_block.context.add_reserved_registers(
[register]
)
context_fix_functions = instr.check_context(
building_block.context
)
for context_fix_function in context_fix_functions:
try:
context_fix_function(target, building_block)
except MicroprobeUncheckableEnvironmentWarning as wcheck:
building_block.add_warning(str(wcheck))
[docs]
def check(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
instrs = {}
instrs_per = {}
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
if instr.architecture_type not in list(instrs.keys()):
instrs[instr.architecture_type] = 1
else:
instrs[instr.architecture_type] += 1
total = sum(instrs.values())
for key, value in instrs.items():
instrs_per[key] = (value) / total
total_per = sum(instrs_per.values())
assert abs(total_per - 1.0) < 0.0001, total_per
average_value = total_per / len(list(instrs_per.values()))
for key in sorted(instrs_per):
value = instrs_per[key]
if abs(value - average_value) > 0.05:
LOG.debug("Deviation: %.5f > 0.05", abs(value - average_value))
LOG.debug("Instruction: %s", key)
LOG.debug("Count: %d", instrs[key])
return False
return True
[docs]
class SetInstructionTypeBySequencePass(microprobe.passes.Pass):
"""SetInstructionTypeBySequencePass pass.
"""
[docs]
def __init__(self, instrs, prepend=None,
allow_registers=None):
"""
:param instrs:
"""
super(SetInstructionTypeBySequencePass, self).__init__()
for instr in instrs:
if not isinstance(instr, InstructionType):
raise MicroprobeCodeGenerationError(
"Invalid sequence definition. '%s' is not an instruction"
" type " % instr
)
self._sequence = instrs
self._func = getnextf(itertools.cycle(instrs))
self._description = "Repeat the instruction sequence '%s'" % \
([instruction.name for instruction in self._sequence])
if prepend is None:
prepend = []
self._prepend = prepend
self._aregs = []
if allow_registers is not None:
self._aregs = allow_registers
def __call__(self, building_block, target):
"""
:param building_block:
:param target:
"""
count = 0
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
if count < len(self._prepend):
instruction = self._prepend[count]
else:
instruction = self._func()
count += 1
LOG.debug("Set instruction type: %s", instruction)
instr.set_arch_type(instruction)
for reg in self._aregs:
instr.add_allow_register(reg)
# reserve implicit operands (if they are not already
# reserved) and add them as allowed
for operand_descriptor in instr.implicit_operands:
register = list(operand_descriptor.type.values())[0]
instr.add_allow_register(register)
if (
operand_descriptor.is_output and register not in
building_block.context.reserved_registers and
register not in target.control_registers
):
building_block.context.add_reserved_registers(
[register]
)
context_fix_functions = instr.check_context(
building_block.context
)
for context_fix_function in context_fix_functions:
try:
context_fix_function(target, building_block)
except MicroprobeUncheckableEnvironmentWarning as wcheck:
building_block.add_warning(str(wcheck))
[docs]
def check(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
pass_ok = True
func = getnextf(itertools.cycle(self._sequence))
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
pass_ok = pass_ok and (instr.architecture_type == func())
return pass_ok
[docs]
class ReproduceSequencePass(microprobe.passes.Pass):
"""ReproduceSequencePass pass.
"""
[docs]
def __init__(self, seq):
"""
:param seq:
"""
super(ReproduceSequencePass, self).__init__()
self._sequence = seq[:]
self._description = "Reproduce instruction sequence"
def __call__(self, building_block, target):
"""
:param building_block:
:param dummy_target:
"""
if len(self._sequence) > MICROPROBE_RC['parallel_threshold']:
if MICROPROBE_RC['verbose']:
print_info("Enabling parallel processing")
return self._parallel(building_block, target)
sequence = iter(self._sequence)
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
definition = next(sequence)
instruction_set_def_properties(
instr,
definition,
building_block=building_block,
target=target
)
return []
def _parallel(self, building_block, target):
def _iter_zip(list1, list2, extra=None):
idx = 0
try:
while True:
item1 = next(list1)
item2 = next(list2)
if extra is None:
yield (item1, item2, idx)
else:
yield tuple([item1, item2] + extra + [idx])
idx = idx + 1
except StopIteration:
return
props = iter(self._sequence)
for bbl in building_block.cfg.bbls:
instrs = iter(bbl.instrs)
instrs_props = list(_iter_zip(instrs, props,
extra=[building_block, target]))
csize = max(len(bbl.instrs) // MICROPROBE_RC['cpus'], 1)
instrs_propsl = (instrs_props[idx:idx + csize]
for idx in range(
0, len(self._sequence), csize
)
)
if MICROPROBE_RC['verbose']:
print_info("Start mapping to %s threads ... " %
MICROPROBE_RC['cpus'])
extra_args = {}
extra_args['show_progress'] = True
processes = []
queues = []
for chunk in instrs_propsl:
queue = mp.Queue()
extra_args['queue'] = queue
proc = mp.Process(target=_set_props,
args=(chunk,),
kwargs=extra_args)
processes.append(proc)
queues.append(queue)
proc.start()
extra_args['show_progress'] = False
new_instrs = []
for queue in queues:
new_instrs += queue.get()
progress = Progress(
len(bbl.instrs),
msg="Setting instruction properties: "
)
instrs = iter(bbl.instrs)
for instr, new_instr in zip(instrs, new_instrs):
bbl.reset_instruction(instr, new_instr)
progress()
for queue in queues:
queue.close()
queue.join_thread()
for process in processes:
process.join()
process.terminate()
return []
[docs]
def check(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
pass_ok = True
func = getnextf(itertools.cycle(self._sequence))
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
pass_ok = pass_ok and (instr.architecture_type == func())
return pass_ok
[docs]
class SetInstructionTypeByPropertyPass(SetInstructionTypeBySequencePass):
"""SetInstructionTypeByPropertyPass pass.
"""
[docs]
def __init__(
self,
instructions,
attribute,
targetvalue,
maxvalue=None,
minvalue=None
):
"""
:param instructions:
:param attribute:
:param targetvalue:
:param maxvalue: (Default value = None)
:param minvalue: (Default value = None)
"""
super(SetInstructionTypeByPropertyPass, self).__init__(instructions)
self._targetvalue = targetvalue
self._attribute = attribute
self._profile = generate_weighted_profile(
instructions,
attribute,
targetvalue,
maxvalue=maxvalue,
minvalue=minvalue
)
self._func = weighted_choice(self._profile)
self._description = "Generate instruction profile with average" \
" attribute '%s' value of %d." % (attribute,
targetvalue)
if maxvalue is not None:
self._description += "Filter out values above %d." % maxvalue
if minvalue is not None:
self._description += "Filter out values below %d" % maxvalue
[docs]
def check(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
values = []
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
values.append(getattr(instr, self._attribute))
finalvalue = sum(values) // len(values)
diff = abs((finalvalue // self._targetvalue) - 1)
if diff > 0.05:
LOG.debug(
"finalvalue %f != expected %f (diff %d%%)", finalvalue,
self._targetvalue, diff * 100
)
return False
return True
[docs]
class InsertInstructionSequencePass(microprobe.passes.Pass):
"""InsertInstructionSequencePass pass.
"""
[docs]
def __init__(self, instrs, every=None, pad=0, operands=None):
"""
:param instrs:
:param every: (Default value = None)
:param pad: (Default value = 0)
:param operands: (Default value = None)
"""
super(InsertInstructionSequencePass, self).__init__()
self._instrs = instrs
self._every = every
self._pad = pad
self._operands = operands
self._description = "Insert instruction: %s , every '%s', with " \
"pad '%s' and operands: '%s'" % (instrs, every,
pad, operands)
def __call__(self, building_block, target):
"""
:param building_block:
:param target:
"""
for bbl in building_block.cfg.bbls:
indexes = list(range(self._pad, bbl.size, self._every))
instructions = bbl.instrs[:]
for index in indexes:
newins = []
for instr in self._instrs:
nins = target.new_instruction(instr.name)
if self._operands is not None:
nins.set_operands(self._operands)
newins.append(nins)
bbl.insert_instr(newins, after=instructions[index])
[docs]
class DIDTSimplePass(microprobe.passes.Pass):
"""DIDTSimplePass pass.
"""
[docs]
def __init__(
self, size, instrs1, ipc1, dep1, instrs2, ipc2, dep2, freqhz,
system_freq_mhz
):
"""
:param size:
:param instrs1:
:param ipc1:
:param dep1:
:param instrs2:
:param ipc2:
:param dep2:
:param freqhz:
:param system_freq_mhz:
"""
super(DIDTSimplePass, self).__init__()
LOG.debug("IPC: %s, %s ", ipc1, ipc2)
LOG.debug("FREQ: %s, %s", freqhz, system_freq_mhz)
cycletime = 1.0 / (system_freq_mhz * 1000000)
period = 1.0 / (freqhz)
cycles = (period / 2) / cycletime
LOG.debug("TIMES: %s %s %s", cycletime, period, cycles)
nins1 = cycles * ipc1
nins2 = cycles * ipc2
self._nins1 = int(nins1)
self._nins2 = int(nins2)
if self._nins1 < 1:
self._nins1 = 1
LOG.warning("Warning 1 a single instruction more than the period")
if self._nins2 < 1:
self._nins2 = 1
LOG.warning("Warning 2 a single instruction more than the period")
LOG.debug("INS %s %s", nins1, nins2)
# seq1len = len(instrs1)
# seq2len = len(instrs2)
# repeat1 = (size/2)/seq1len
# repeat2 = (size/2)/seq2len
# Compute iterations in each sequence
self._loop1 = int(nins1 / (size / 2))
self._loop2 = int(nins2 / (size / 2))
LOG.debug(
" %f%% deviation in loop1 size ", (
(
abs(
self._loop1 - (nins1 / (size / 2))
) / (nins1 / (size / 2))
) * 100
)
)
LOG.debug(
" %f%% deviation in loop2 size ", (
(
abs(
self._loop2 - (nins2 / (size / 2))
) / (nins2 / (size / 2))
) * 100
)
)
LOG.debug("LOOP %s %s", self._loop1, self._loop2)
self._size = size / 2
self._func1 = getnextf(itertools.cycle(instrs1))
self._func2 = getnextf(itertools.cycle(instrs2))
self._dep1 = dep1
self._dep2 = dep2
def __call__(self, building_block, target):
"""
:param building_block:
:param target:
"""
rregs = building_block.context.reserved_registers
LOG.debug(self._loop1, self._loop2)
if not (self._loop1 < 2 and self._loop2 < 2):
controlreg = target.get_address_reg(rregs)
init_loop1 = target.set_register(controlreg, self._loop1)
instr = microprobe.code.ins.Instruction()
instr.set_arch_type(target.isa["MTSPR"])
instr.set_operands([target.registers["CTR"], controlreg])
init_loop1.append(instr)
init_loop2 = target.set_register(controlreg, self._loop2)
instr = microprobe.code.ins.Instruction()
instr.set_arch_type(target.isa["MTSPR"])
instr.set_operands([target.registers["CTR"], controlreg])
init_loop2.append(instr)
branch_loop1 = microprobe.code.ins.Instruction()
branch_loop1.set_arch_type(target.isa["BC"])
branch_loop1.set_operands([16, 0, "loop1"])
branch_loop2 = microprobe.code.ins.Instruction()
branch_loop2.set_arch_type(target.isa["BC"])
branch_loop2.set_operands([16, 0, "loop2+24"])
for instr in init_loop1 + init_loop2 + [branch_loop1] + \
[branch_loop2]:
instr.add_allow_register(controlreg)
instr.add_allow_register(target.registers["CTR"])
building_block.add_init(init_loop1)
building_block.add_fini(init_loop1)
bbl1 = building_block.cfg.add_bbl(size=self._size)
bbl2 = building_block.cfg.add_bbl(size=self._size)
first = True
for instr in bbl1.instrs:
instr.set_arch_type(self._func1())
instr.set_dependency_distance(self._dep1)
if first:
first = False
instr.set_label("loop1")
bbl1.insert_instr([branch_loop1], after=instr)
first = True
for instr in bbl2.instrs:
instr.set_arch_type(self._func2())
instr.set_dependency_distance(self._dep2)
if first:
bbl2.insert_instr(init_loop2, before=instr)
first = False
init_loop2[0].set_label("loop2")
bbl2.insert_instr([branch_loop2], after=instr)
rregs.append(controlreg)
else:
size = self._nins1 + self._nins2
while size < (self._size * 2):
bbl = building_block.cfg.add_bbl(size=self._nins1)
for instr in bbl.instrs:
instr.set_arch_type(self._func1())
instr.set_dependency_distance(self._dep1)
bbl = building_block.cfg.add_bbl(size=self._nins2)
for instr in bbl.instrs:
instr.set_arch_type(self._func2())
instr.set_dependency_distance(self._dep2)
size = size + self._nins1 + self._nins2
if size == self._nins1 + self._nins2:
raise MicroprobeCodeGenerationError("Increase benchmark size")
return rregs
[docs]
class SetInstructionTypeByAlternatingSequencesPass(microprobe.passes.Pass):
"""DIDTPass pass.
"""
[docs]
def __init__(self, sequences, max_size=None):
"""
:param sequences:
:type sequences:
:param max_size:
:type max_size:
"""
super(SetInstructionTypeByAlternatingSequencesPass, self).__init__()
self._sequences = []
self._max_size = max_size
total_size = sum([elem[1] for elem in sequences])
max_segment_size = max_size // (2 * len(sequences))
if max_size is None:
max_size = total_size * (len(sequences) + 1)
for idx, sequence in enumerate(sequences):
extra = 0
iterations = 1
modulo = len(sequence[0])
length = min(sequence[1], max_segment_size)
length = (length // modulo) * modulo
if length == 0:
length = modulo
if (length * iterations) + extra != sequence[1]:
iterations = sequence[1] // length
extra = sequence[1] % length
assert (length * iterations) + extra == sequence[1]
self._sequences.append(
(
getnextf(itertools.cycle(
sequence[
0
]
)), length, iterations, extra
)
)
LOG.debug(
"Sequence: %d - %s, Length: %d, "
"Iterations: %d, Extra: %d ", idx,
",".join([elem.name for elem in sequence[0]]), length,
iterations, extra
)
def __call__(self, building_block, target):
"""
:param building_block:
:param target:
"""
def fill_block(block, function):
for instr in block.instrs:
instr.set_arch_type(function())
context_fix_functions = instr.check_context(
building_block.context
)
for context_fix_function in context_fix_functions:
try:
context_fix_function(target, building_block)
except MicroprobeUncheckableEnvironmentWarning as \
warning_check:
building_block.add_warning(str(warning_check))
controlreg = target.get_register_for_address_arithmetic(
building_block.context
)
for idx, sequence in enumerate(self._sequences):
next_instruction, block_size, block_iterations, extra = sequence
if block_iterations > 1:
init_loop = target.set_register(
controlreg, block_iterations, building_block.context
)
if controlreg not in building_block.context.reserved_registers:
building_block.context.add_reserved_registers([controlreg])
init_bbl = building_block.cfg.add_bbl(instructions=init_loop)
for instr in init_bbl.instrs:
instr.add_allow_register(controlreg)
block_bbl = building_block.cfg.add_bbl(size=block_size)
fill_block(block_bbl, next_instruction)
if block_iterations > 1:
label = "sequence_%d" % idx
block_bbl.instrs[0].set_label(label)
add_cmp_branch = target.add_to_register(controlreg, -1)
for instr in add_cmp_branch:
instr.add_allow_register(controlreg)
add_cmp_branch += target.compare_and_branch(
controlreg,
0,
">",
InstructionAddress(base_address=label),
building_block.context
)
block_bbl.insert_instr(
add_cmp_branch, after=block_bbl.instrs[-1]
)
if extra > 0:
extra_bbl = building_block.cfg.add_bbl(size=(extra))
fill_block(extra_bbl, next_instruction)
return []
[docs]
class SetInstructionOperandsByOpcodePass(microprobe.passes.Pass):
"""SetInstructionOperandsByOpcodePass pass.
"""
[docs]
def __init__(self, opcodes, operand_pos, value, force=False, ifval=None):
"""
:param opcodes:
:param operand_pos:
:param value:
"""
super(SetInstructionOperandsByOpcodePass, self).__init__()
self._description = "Set operand %d of instructions with opcode " \
"'%s' to value: '%s'" % (operand_pos, opcodes,
value)
if isinstance(opcodes, str):
self._opcodes = [opcodes]
else:
self._opcodes = opcodes
self._pos = operand_pos
self._base_value = value
self._force = force
self._ifval = ifval
if not isinstance(value, list):
def idem():
"""Return a constant."""
return value
self._value = idem
else:
self._value = getnextf(itertools.cycle(value))
def __call__(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
if instr.name in self._opcodes:
if (self._ifval is not None and
instr.operands()[self._pos].value != self._ifval):
continue
if instr.operands()[
self._pos].value is None or self._force:
instr.operands()[self._pos].set_value(self._value())
if (instr.operands()[self._pos].is_output and
isinstance(
instr.operands()[self._pos].value,
Register
)):
instr.add_allow_register(
instr.operands()[self._pos].value
)
[docs]
def check(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
if isinstance(self._base_value, list):
self._value = getnextf(itertools.cycle(self._base_value))
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
if instr.opcode in self._opcodes:
if instr.operands()[self._pos].value != self._value():
return False
return True
[docs]
class DisableAsmByOpcodePass(microprobe.passes.Pass):
"""DisableAsmByOpcodePass pass.
"""
[docs]
def __init__(self, opcodes, operand_pos, ifval=None):
"""
:param opcodes:
:param operand_pos:
:param value:
"""
super(DisableAsmByOpcodePass, self).__init__()
self._description = "Disable ASM %d of instructions with opcode " \
"'%s' if value: '%s'" % (operand_pos, opcodes,
ifval)
if isinstance(opcodes, str):
self._opcodes = [opcodes]
else:
self._opcodes = opcodes
self._pos = operand_pos
self._ifval = ifval
def __call__(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
if instr.name in self._opcodes:
if (self._ifval is not None and
instr.operands()[self._pos].value != self._ifval):
continue
instr.disable_asm = True
[docs]
def check(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
if instr.opcode in self._opcodes:
if self._ifval is None and not instr.disable_asm:
return False
elif (instr.operands()[self._pos].value == self._ifval and
not instr.disable_asm):
return False
return True
[docs]
class ReplaceLoadInstructionsPass(microprobe.passes.Pass):
"""ReplaceLoadInstructionsPass pass.
"""
[docs]
def __init__(self, instr, every):
"""
:param instr:
:type instr:
:param every:
:type every:
"""
super(ReplaceLoadInstructionsPass, self).__init__()
self._description = "Replace every load for a '%s' instruction every "\
"'%d' number of loads" % (instr.name, every)
self._instr = instr
self._every = every
def __call__(self, building_block, dummy_target):
"""
:param building_block:
:param dummy_target:
"""
if self._every == 0:
return []
count = 0
for bbl in building_block.cfg.bbls:
for instr in bbl.instrs:
is_load = False
for moperand in instr.memory_operands():
if moperand.is_load:
is_load = True
break
if is_load:
count = count + 1
if (count % self._every) == 0:
instr.set_arch_type(self._instr)
# TODO: This get_profile function is copied from the first prototype of the
# code, back in 2011. It need an importand re-engineering pass.
def _get_profile(
target,
components,
profile,
block,
maxlatency=None,
minlatency=None,
avelatency=None,
any_comp=False
):
if avelatency is not None:
if maxlatency is not None and avelatency > maxlatency:
raise MicroprobeCodeGenerationError(
"I can not fulfill the conditions"
)
if minlatency is not None and avelatency < minlatency:
raise MicroprobeCodeGenerationError(
"I can not fulfill the conditions"
)
isa_map = target.property_isa_map("execution_units")
if any_comp:
setm = set()
for comp in components:
setm = setm | isa_map[comp.name]
else:
setm = None
for comp in components:
if setm is None:
setm = isa_map[comp.name]
else:
setm = setm & isa_map[comp.name]
for comp in block:
setm = setm - isa_map[comp.name]
values = []
for key, value in profile.items():
if key not in setm:
continue
if maxlatency is not None:
if key.latency > maxlatency and key.latency > 0:
continue
if minlatency is not None:
if key.latency < minlatency and key.latency > 0:
continue
values.append((key, value))
if len(values) == 0:
for value in setm:
if value.privileged or value.hypervisor or value.trap:
continue
if maxlatency is not None:
if value.latency > maxlatency and value.latency > 0:
continue
if minlatency is not None:
if value.latency < minlatency and value.latency > 0:
continue
values.append((value, 1))
if len(values) < 1:
raise MicroprobeCodeGenerationError(
"No instruction found in the ISA stressing %s" % (
[comp.name for comp in components]
)
)
if avelatency is not None:
def calc_avelatency(values):
"""Computer average latency by weight."""
return sum(
[
v.latency * weight for v, weight in values
]
) / sum(
[
weight for v, weight in values
]
)
cavelatency = calc_avelatency(values)
if cavelatency < avelatency:
abovelist = [
idx for idx, v in enumerate(values)
if v[0].latency > avelatency
]
if len(abovelist) > 0:
while cavelatency < avelatency:
index = random.choice(abovelist)
abovelist.remove(index)
values[index] = (values[index][0], values[index][1] + 1)
cavelatency = calc_avelatency(values)
if len(abovelist) == 0:
abovelist = [
idx
for idx, v in enumerate(values)
if v[0].latency > avelatency
]
else:
maxlat = max([v.latency for v, dummy_weight in values])
values = [(v, w) for v, w in values if v.latency == maxlat]
elif cavelatency > avelatency:
belowlist = [
idx for idx, v in enumerate(values)
if v[0].latency < avelatency
]
if len(belowlist) > 0:
while cavelatency > avelatency:
index = random.choice(belowlist)
belowlist.remove(index)
values[index] = (values[index][0], values[index][1] + 1)
cavelatency = calc_avelatency(values)
if len(belowlist) == 0:
belowlist = [
idx
for idx, v in enumerate(values)
if v[0].latency < avelatency
]
assert len(belowlist) > 0, "Something wrong"
else:
minlat = min([v.latency for v, dummy_weight in values])
values = [(v, w) for v, w in values if v.latency == minlat]
return dict(values)
def _set_props(largs, queue=None, show_progress=False):
rlist = []
if show_progress:
progress = Progress(
len(largs),
msg="Setting instruction properties: "
)
for args in largs:
instr, definition, building_block, target, displ = args
instruction_set_def_properties(
instr,
definition,
building_block=building_block,
target=target,
label_displ=displ
)
if show_progress:
progress()
rlist.append(instr)
if queue is not None:
queue.put(rlist)
return rlist
[docs]
class AddOnePass(microprobe.passes.Pass):
"""AddOne pass.
"""
[docs]
def __init__(self, varname):
"""
:param instr:
"""
super(AddOnePass, self).__init__()
self._varname = varname
def __call__(self, building_block, target):
"""
:param building_block:
:param dummy_target:
"""
loopvar = None
for var in building_block.registered_global_vars():
if var.name == self._varname:
loopvar = var
break
if loopvar is None:
raise MicroprobeCodeGenerationError(
"AddOne pass var '%s' not declared" % self._varname
)
context = building_block.context
areg = target.get_register_for_address_arithmetic(
context
)
context.add_reserved_registers([areg])
# Reset register to zero
instrs = target.set_register(areg, 0, context)
for instr in instrs:
instr.add_comment("Loop counter init")
building_block.add_init(instrs)
instrs = target.add_to_register(areg, 1)
try:
instrs += target.store_integer(
areg, loopvar.address, 64, context
)
except MicroprobeCodeGenerationError:
areg = target.scratch_registers[0]
instrs += target.set_register_to_address(
areg,
loopvar.address,
context,
)
context.set_register_value(
areg, loopvar.address
)
instrs += target.store_integer(
areg, loopvar.address, 64, context
)
for instr in instrs:
instr.add_comment("Loop update")
building_block.add_fini(instrs)
return []