# Source code for microprobe.passes.branch

# Copyright 2011-2021 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""":mod:`microprobe.passes.branch` module

"""
# Futures
from __future__ import absolute_import

# Built-in modules
import collections
import itertools
import copy
import re

# Third party modules

# Own modules
from microprobe.code.address import InstructionAddress
from microprobe.code.ins import Instruction, \
    instruction_set_def_properties
from microprobe.exceptions import MicroprobeCodeGenerationError, \
    MicroprobeUncheckableEnvironmentWarning
from microprobe.passes import Pass
from microprobe.utils.asm import interpret_asm
from microprobe.utils.logger import get_logger
from microprobe.utils.misc import RejectingDict, RNDINT
from microprobe.utils.distrib import probability, regular_probability


# Local modules


# Constants
# Module-level logger shared by all the passes defined below.
LOG = get_logger(__name__)

# Public API of the module: every pass class defined in this file.
__all__ = [
    'BranchNextPass',
    'BranchBraidNextPass',
    'FixIndirectBranchPass',
    'InitializeBranchDecorator',
    'NormalizeBranchTargetsPass',
    'RandomizeByTypePass',
    "LinkBbls",
]

# Functions


# Classes
class BranchNextPass(Pass):
    """BranchNextPass pass.

    Walks the instructions of a building block in order and, for every
    branch found, sets its branch-target memory operand to the
    instruction that follows it.
    """

    def __init__(self, branch=None, force=False):
        """Create the pass.

        :param branch: Name of the only branch model to process. When
            ``None`` every branch is processed. (Default value = None)
        :param force: When true, branch targets that are already set are
            overwritten. (Default value = False)
        """
        super(BranchNextPass, self).__init__()
        self._description = "Set the branch target of each branch to the " \
            "next instruction."
        self._branch = branch
        self._force = force

        if branch is not None:
            # Leading space keeps the two sentences apart.
            self._description += " Only model '%s'" % branch

    def __call__(self, building_block, target):
        """Set the target of each branch to its following instruction.

        :param building_block: Building block whose instructions are
            updated in place.
        :param target: Target used to obtain registers and the
            instructions that initialize them.
        """
        if not building_block.context.register_has_value(0):
            # Make sure the context holds a reserved register set to zero.
            reg = target.get_register_for_address_arithmetic(
                building_block.context
            )
            building_block.add_init(
                target.set_register(reg, 0, building_block.context)
            )
            building_block.context.set_register_value(reg, 0)
            building_block.context.add_reserved_registers([reg])

        counter = 0      # used to name "abs_branch_<n>" labels
        all_counter = 0  # used to name "rel_branch_<n>" labels
        labels = []
        label_instruction_map = {}

        # 'instr_ant' is the previous instruction; it is NOT reset between
        # basic blocks, so pairs span basic block boundaries.
        instr_ant = None

        for bbl in building_block.cfg.bbls:
            for instr in bbl.instrs:

                if instr_ant is None:
                    # Very first instruction: remember (or create) its
                    # label, it is the fallback target of the last branch.
                    instr_ant = instr
                    if instr.label is None:
                        first_label = "first"
                        instr.set_label("first")
                        label_instruction_map[first_label] = instr
                    else:
                        first_label = instr.label
                        label_instruction_map[first_label] = instr
                    continue

                if not instr_ant.branch:
                    instr_ant = instr
                    continue

                if self._branch is not None and \
                        instr_ant.name != self._branch:
                    instr_ant = instr
                    continue

                LOG.debug("Setting target for branch: '%s'", instr_ant)
                target_set = False

                for memoperand in instr_ant.memory_operands():

                    if not memoperand.descriptor.is_branch_target:
                        continue

                    if memoperand.address is not None and not self._force:
                        continue

                    # Only the first eligible operand is updated.
                    if target_set:
                        continue

                    LOG.debug("Computing label")

                    if instr_ant.branch_relative:
                        if instr.label is None:
                            label = "rel_branch_%d" % all_counter
                            instr.set_label(label)
                            LOG.debug(
                                "Branch relative, new label: '%s'", label
                            )
                        else:
                            label = instr.label
                            LOG.debug(
                                "Branch relative, existing label: '%s'",
                                label
                            )
                    elif len(labels) == 0:
                        if instr.label is None:
                            label = "abs_branch_%d" % counter
                            labels.append(label)
                            instr.set_label(label)
                            LOG.debug(
                                "Branch absolute, new label: '%s'", label
                            )
                        else:
                            label = instr.label
                            LOG.debug(
                                "Branch absolute, exiting label: '%s'",
                                label
                            )
                        label_instruction_map[label] = instr
                    else:
                        # Once an absolute label has been created, all the
                        # remaining absolute branches reuse it.
                        label = labels[0]

                    assert label is not None

                    taddress = InstructionAddress(base_address=label)
                    if instr_ant.branch_relative:
                        taddress.set_target_instruction(instr)
                    else:
                        taddress.set_target_instruction(
                            label_instruction_map[label]
                        )

                    try:
                        memoperand.set_address(
                            taddress, building_block.context
                        )
                        target_set = True
                        instr_ant.add_comment(
                            "Branch fixed to: %s" % taddress
                        )
                    except MicroprobeCodeGenerationError:
                        # Setting the address failed: load it into a newly
                        # reserved register, record that in the context
                        # and retry.
                        reg = \
                            target.get_register_for_address_arithmetic(
                                building_block.context)
                        building_block.add_init(
                            target.set_register_to_address(
                                reg, taddress, building_block.context
                            )
                        )
                        building_block.context.set_register_value(
                            reg, taddress
                        )
                        building_block.context.add_reserved_registers([reg])
                        memoperand.set_address(
                            taddress, building_block.context
                        )
                        target_set = True
                        instr_ant.add_comment(
                            "Branch fixed to: %s" % taddress
                        )

                    if not instr_ant.branch_relative:
                        counter += 1
                    else:
                        all_counter += 1

                instr_ant = instr

        # The last instruction has no successor inside the code; if it is
        # a branch it needs a dedicated target.
        bbl = building_block.cfg.bbls[-1]
        if bbl.instrs[-1].branch:

            LOG.debug("Last instruction is a BRANCH: %s", bbl.instrs[-1])

            for memoperand in bbl.instrs[-1].memory_operands():

                if not memoperand.descriptor.is_branch_target:
                    continue

                if memoperand.address is not None and not self._force:
                    continue

                # Skip operands that already carry a value unless forced.
                target_set = False
                for operand in memoperand.operands:
                    if operand.value is not None and not self._force:
                        target_set = True
                        break

                if target_set:
                    continue

                if (len(building_block.fini) > 0 and
                        bbl.instrs[-1].branch_relative):

                    # Relative branch with finalization code available:
                    # address it through the first 'fini' instruction label.
                    if building_block.fini[0].label is None:
                        label = "rel_branch_%d" % all_counter
                        building_block.fini[0].set_label(label)
                    else:
                        label = building_block.fini[0].label

                    taddress = InstructionAddress(base_address=label)
                    # NOTE(review): the label belongs to fini[0] but the
                    # target instruction registered is the first one of the
                    # code -- confirm this is intended.
                    taddress.set_target_instruction(
                        label_instruction_map[first_label]
                    )
                else:
                    assert first_label is not None

                    if bbl.instrs[-1].label is not None:
                        # Own label plus own length: the address right
                        # after the branch.
                        taddress = InstructionAddress(
                            base_address=bbl.instrs[-1].label
                        ) + bbl.instrs[-1].architecture_type.format.length
                    else:
                        # Otherwise jump back to the first instruction.
                        taddress = InstructionAddress(
                            base_address=first_label
                        )
                        taddress.set_target_instruction(
                            label_instruction_map[first_label]
                        )

                try:
                    memoperand.set_address(
                        taddress, building_block.context
                    )
                except MicroprobeCodeGenerationError:
                    # Same register-based fallback used in the main loop.
                    reg = \
                        target.get_register_for_address_arithmetic(
                            building_block.context)
                    building_block.add_init(
                        target.set_register_to_address(
                            reg, taddress, building_block.context
                        )
                    )
                    building_block.context.set_register_value(
                        reg, taddress
                    )
                    building_block.context.add_reserved_registers([reg])
                    memoperand.set_address(
                        taddress, building_block.context
                    )

    def check(self, building_block, dummy_target):
        """Check that each branch targets the instruction after it.

        Unlike :meth:`__call__`, pairs are evaluated per basic block: the
        previous instruction is reset at the start of every block.

        :param building_block: Building block to check.
        :param dummy_target: Unused.
        :return: ``True`` when every checked branch target matches.
        """
        pass_ok = True

        for bbl in building_block.cfg.bbls:
            instr_ant = None

            for instr in bbl.instrs:

                if instr_ant is None:
                    instr_ant = instr
                    continue

                if not instr_ant.branch:
                    instr_ant = instr
                    continue

                pass_ok = pass_ok and instr.label != ""

                for memoperand in instr_ant.memory_operands():
                    if not memoperand.descriptor.is_branch_target:
                        continue

                    taddress = InstructionAddress(base_address=instr.label)
                    pass_ok = pass_ok and memoperand.address == taddress

                instr_ant = instr

        return pass_ok
class BranchBraidNextPass(Pass):
    """BranchBraidNextPass pass.

    Like a branch-to-next pass, but the instructions found between
    branches are also replicated and the relative branches of the two
    copies are cross-linked, creating a braid in execution.
    """

    def __init__(self, branch=None, force=False, bbl=20):
        """Create the pass.

        :param branch: Name of the only branch model to process. When
            ``None`` every branch is processed. (Default value = None)
        :param force: When true, branch targets that are already set are
            overwritten. (Default value = False)
        :param bbl: Number of inter-branch segments grouped together
            before a synchronization jump and the replicated code are
            inserted. (Default value = 20)
        """
        super(BranchBraidNextPass, self).__init__()
        self._description = "Set the branch target of each branch to the " \
                            "next instruction but replicating the code " \
                            "creating a braid in execution."
        self._branch = branch
        self._force = force
        self._bbl = bbl

        if branch is not None:
            # Leading space keeps the two sentences apart.
            self._description += " Only model '%s'" % branch

    def __call__(self, building_block, target):
        """Fix the branch targets and braid the code.

        :param building_block: Building block updated in place.
        :param target: Target used to obtain registers, initialization
            instructions and unconditional relative branches.
        """
        if not building_block.context.register_has_value(0):
            # Make sure the context holds a reserved register set to zero.
            reg = target.get_register_for_address_arithmetic(
                building_block.context
            )
            building_block.add_init(
                target.set_register(reg, 0, building_block.context)
            )
            building_block.context.set_register_value(reg, 0)
            building_block.context.add_reserved_registers([reg])

        counter = 0      # used to name "abs_branch_<n>" labels
        all_counter = 0  # used to name "rel_branch_<n>" labels
        rregs = 0        # registers spent on the set-address fallback
        labels = []
        label_instruction_map = {}
        instr_ant = None  # previous instruction, kept across basic blocks

        first_branch_hit = False
        new_instructions = []   # segments found between processed branches
        current_bbl = []        # segment currently being collected
        last_bbl_head = None    # instruction following the last branch
        first_braid = None      # first branch processed

        for bbl in building_block.cfg.bbls:
            for instr in bbl.instrs:

                if instr_ant is None:
                    # Very first instruction: remember (or create) its
                    # label, it is the fallback target of the last branch.
                    instr_ant = instr
                    if instr.label is None:
                        first_label = "first"
                        instr.set_label("first")
                        label_instruction_map[first_label] = instr
                    else:
                        first_label = instr.label
                        label_instruction_map[first_label] = instr
                    continue

                if not instr_ant.branch:
                    if first_branch_hit:
                        current_bbl.append(instr_ant)
                    instr_ant = instr
                    continue

                if self._branch is not None and \
                        instr_ant.name != self._branch:
                    if first_branch_hit:
                        current_bbl.append(instr_ant)
                    instr_ant = instr
                    continue

                LOG.debug("Setting target for branch: '%s'", instr_ant)
                target_set = False

                for memoperand in instr_ant.memory_operands():

                    if not memoperand.descriptor.is_branch_target:
                        continue

                    if memoperand.address is not None and not self._force:
                        continue

                    # Only the first eligible operand is updated.
                    if target_set:
                        continue

                    LOG.debug("Computing label")

                    if instr_ant.branch_relative:
                        if instr.label is None:
                            label = "rel_branch_%d" % all_counter
                            instr.set_label(label)
                            LOG.debug(
                                "Branch relative, new label: '%s'", label
                            )
                        else:
                            label = instr.label
                            LOG.debug(
                                "Branch relative, existing label: '%s'",
                                label
                            )
                    elif rregs < 5:
                        if instr.label is None:
                            label = "abs_branch_%d" % counter
                            labels.append(label)
                            instr.set_label(label)
                            LOG.debug(
                                "Branch absolute, new label: '%s'", label
                            )
                        else:
                            label = instr.label
                            LOG.debug(
                                "Branch absolute, exiting label: '%s'",
                                label
                            )
                        label_instruction_map[label] = instr
                    else:
                        # Five fallback registers already spent: reuse one
                        # of the labels created so far.
                        # NOTE(review): 'labels' only grows when a new
                        # absolute label is created, so it may hold fewer
                        # than 5 entries here -- confirm this cannot raise
                        # IndexError.
                        label = labels[counter % 5]

                    assert label is not None

                    taddress = InstructionAddress(base_address=label)
                    if instr_ant.branch_relative:
                        taddress.set_target_instruction(instr)
                    else:
                        taddress.set_target_instruction(
                            label_instruction_map[label]
                        )

                    try:
                        memoperand.set_address(
                            taddress, building_block.context
                        )
                        target_set = True
                        instr_ant.add_comment(
                            "Branch fixed to: %s" % taddress
                        )
                    except MicroprobeCodeGenerationError:
                        # Setting the address failed: load it into a newly
                        # reserved register, record that in the context
                        # and retry.
                        reg = \
                            target.get_register_for_address_arithmetic(
                                building_block.context)
                        building_block.add_init(
                            target.set_register_to_address(
                                reg, taddress, building_block.context
                            )
                        )
                        building_block.context.set_register_value(
                            reg, taddress
                        )
                        building_block.context.add_reserved_registers([reg])
                        memoperand.set_address(
                            taddress, building_block.context
                        )
                        rregs += 1
                        target_set = True
                        instr_ant.add_comment(
                            "Branch fixed to: %s" % taddress
                        )

                    if not instr_ant.branch_relative:
                        counter += 1
                    else:
                        all_counter += 1

                # Close the segment that ends at this branch. The first
                # branch is kept apart; later branches belong to their
                # segment.
                if first_branch_hit:
                    current_bbl.append(instr_ant)

                if not first_branch_hit:
                    first_braid = instr_ant

                new_instructions.append(current_bbl)
                last_bbl_head = instr
                current_bbl = []
                first_branch_hit = True
                instr_ant = instr

        # The last instruction has no successor inside the code; if it is
        # a branch it needs a dedicated target.
        bbl = building_block.cfg.bbls[-1]
        if bbl.instrs[-1].branch:

            LOG.debug("Last instruction is a BRANCH: %s", bbl.instrs[-1])

            for memoperand in bbl.instrs[-1].memory_operands():

                if not memoperand.descriptor.is_branch_target:
                    continue

                if memoperand.address is not None and not self._force:
                    continue

                # Skip operands that already carry a value unless forced.
                target_set = False
                for operand in memoperand.operands:
                    if operand.value is not None and not self._force:
                        target_set = True
                        break

                if target_set:
                    continue

                if (len(building_block.fini) > 0 and
                        bbl.instrs[-1].branch_relative):

                    if building_block.fini[0].label is None:
                        label = "rel_branch_%d" % all_counter
                        building_block.fini[0].set_label(label)
                    else:
                        label = building_block.fini[0].label

                    taddress = InstructionAddress(base_address=label)
                    # NOTE(review): the label belongs to fini[0] but the
                    # target instruction registered is the first one of the
                    # code -- confirm this is intended.
                    taddress.set_target_instruction(
                        label_instruction_map[first_label]
                    )
                else:
                    assert first_label is not None
                    taddress = InstructionAddress(base_address=first_label)
                    taddress.set_target_instruction(
                        label_instruction_map[first_label]
                    )

                try:
                    memoperand.set_address(
                        taddress, building_block.context
                    )
                except MicroprobeCodeGenerationError:
                    # Same register-based fallback used in the main loop.
                    reg = \
                        target.get_register_for_address_arithmetic(
                            building_block.context)
                    building_block.add_init(
                        target.set_register_to_address(
                            reg, taddress, building_block.context
                        )
                    )
                    building_block.context.set_register_value(
                        reg, taddress
                    )
                    building_block.context.add_reserved_registers([reg])
                    memoperand.set_address(
                        taddress, building_block.context
                    )

        # Keep the non-empty segments (shallow copies of the lists) and
        # build a parallel list holding copies of their instructions.
        base_instructions = []
        for elem in new_instructions:
            if len(elem) == 0:
                continue
            base_instructions.append(elem[:])

        new_instructions = [
            elem for elem in new_instructions if len(elem) > 0
        ]

        for idx, elem in enumerate(new_instructions):
            if len(elem) == 0:
                continue
            new_instructions[idx] = [instr.copy() for instr in elem]

        assert len(base_instructions) == len(new_instructions)

        for elem in new_instructions:
            for elem2 in elem:
                elem2.add_comment("Braid instruction")

        def flatten(lst):
            """Return the items of a list of lists as a single list."""
            return [item for sublist in lst for item in sublist]

        # 'basel' and 'newl' are position-aligned: originals and copies.
        # The first branch and the last segment head are the same object
        # in both lists.
        basel = []
        newl = []

        if first_braid is not None:
            basel.append(first_braid)
            newl.append(first_braid)

        basel.extend(flatten(base_instructions))
        newl.extend(flatten(new_instructions))

        if last_bbl_head is not None:
            basel.append(last_bbl_head)
            newl.append(last_bbl_head)

        # Fix labels and braid the branches
        translated_labels = {}
        for idx, (base, new) in enumerate(zip(basel, newl)):

            if base.label is None:
                continue

            if not base.label.startswith("rel_branch"):
                # This has a label, need to be fixed on new
                new.set_label("%s_braid" % base.label)
                translated_labels[base.label] = "%s_braid" % base.label
                continue

            # Braid the branches: the instruction right before a
            # "rel_branch" label is the branch that targets it.
            jbase = basel[idx-1]
            jnew = newl[idx-1]

            new.set_label("%s_2" % new.label)

            # The original branch now jumps to the copy ...
            for memoperand in jbase.memory_operands():
                if not memoperand.descriptor.is_branch_target:
                    continue
                if not jbase.branch_relative:
                    continue
                taddress = InstructionAddress(base_address=new.label)
                taddress.set_target_instruction(new)
                memoperand.set_address(
                    taddress, building_block.context
                )

            # ... and the copied branch jumps back to the original.
            if jbase is not jnew:
                if base is not new:
                    base.set_label("%s_1" % base.label)

                for memoperand in jnew.memory_operands():
                    if not memoperand.descriptor.is_branch_target:
                        continue
                    if not jnew.branch_relative:
                        continue
                    taddress = InstructionAddress(base_address=base.label)
                    taddress.set_target_instruction(base)
                    memoperand.set_address(
                        taddress, building_block.context
                    )

        for nins in newl:
            # Some operands of new instructions might refer to
            # translated labels. Update them.
            values = [oper.value for oper in nins.operands()]
            for idx, val in enumerate(values):

                if not isinstance(val, str):
                    continue

                translated = False
                for label in translated_labels:
                    # Match the label either as a trailing "(label)" or as
                    # the whole operand value.
                    for pattern in ["\\(%s\\)$", "^%s$"]:
                        bpattern = pattern % label
                        # The replacement is the same pattern for the new
                        # label with the regex metacharacters removed.
                        npattern = pattern % translated_labels[label]
                        npattern = npattern.replace("\\", "")
                        npattern = npattern.replace("^", "")
                        npattern = npattern.replace("$", "")

                        if re.search(bpattern, val):
                            val = re.sub(bpattern, npattern, val)
                            nins.operands()[idx].set_value(val, check=False)
                            translated = True
                            break

                    if translated:
                        break

        # Insert the copies in groups of 'self._bbl' segments. Each group
        # of copies goes right after its originals, preceded by a jump so
        # that the original path skips over them.
        chunks = list(enumerate(zip(base_instructions, new_instructions)))

        for chunk in [chunks[i:i + self._bbl]
                      for i in range(0, len(chunks), self._bbl)]:

            bins = flatten([chk[1][0] for chk in chunk])
            nins = flatten([chk[1][1] for chk in chunk])
            assert len(bins) == len(nins)

            after_instr = bins[-1]
            lidx = chunk[-1][0]

            if lidx+1 >= len(chunks):
                before_instr = last_bbl_head
            else:
                # First original instruction of the next segment.
                before_instr = chunks[lidx+1][1][0][0]

            taddress = InstructionAddress(base_address=before_instr.label)
            taddress.set_target_instruction(before_instr)

            new_branch = target.branch_unconditional_relative(
                after_instr.architecture_type.format.length +
                after_instr.address, taddress
            )
            new_branch.add_comment("Jump to synch braid")

            building_block.add_instructions(
                [new_branch] + nins, after=after_instr, before=before_instr
            )

    def check(self, building_block, dummy_target):
        """Check that each branch targets the instruction after it.

        :param building_block: Building block to check.
        :param dummy_target: Unused.
        :return: ``True`` when every checked branch target matches.
        """
        pass_ok = True

        for bbl in building_block.cfg.bbls:
            instr_ant = None

            for instr in bbl.instrs:

                if instr_ant is None:
                    instr_ant = instr
                    continue

                if not instr_ant.branch:
                    instr_ant = instr
                    continue

                pass_ok = pass_ok and instr.label != ""

                for memoperand in instr_ant.memory_operands():
                    if not memoperand.descriptor.is_branch_target:
                        continue

                    taddress = InstructionAddress(base_address=instr.label)
                    pass_ok = pass_ok and memoperand.address == taddress

                instr_ant = instr

        return pass_ok
class FixIndirectBranchPass(Pass):
    """FixIndirectBranchPass pass.

    Replaces every branch whose target is not set by an unconditional
    relative branch to the following instruction. A trailing branch
    without target is turned into a nop.
    """

    def __init__(self):
        """Create the pass."""
        super(FixIndirectBranchPass, self).__init__()
        self._description = "Fix indirect branches or replace them by "\
            "regular branch to next"

    def __call__(self, building_block, target):
        """Fix the branches of the building block without a target.

        :param building_block: Building block updated in place.
        :param target: Target used to build the replacement branch and
            nop instructions.
        """
        for bbl in building_block.cfg.bbls:
            instr_ant = None

            for instr in bbl.instrs:

                if instr_ant is None:
                    # Prime the (previous, current) pair. Without this
                    # 'continue' the first instruction was paired with
                    # itself and an unresolved leading branch was rewritten
                    # as a branch to its own address.
                    instr_ant = instr
                    continue

                if not instr_ant.branch:
                    instr_ant = instr
                    continue

                target_set, dummy_branch_operand = \
                    self.check_branch(instr_ant)

                if target_set:
                    instr_ant = instr
                    continue

                self._fix_branch(instr_ant, instr, target)
                instr_ant = instr

        last_ins = building_block.cfg.bbls[-1].instrs[-1]

        if not last_ins.branch:
            return

        # Check 'last_ins' explicitly rather than the variable left over
        # from the loop above.
        target_set, dummy_branch_operand = self.check_branch(last_ins)

        if target_set:
            return

        # Last instruction is a branch without a target set.
        # Change to nop
        first_ins = building_block.cfg.bbls[0].instrs[0]
        # NOTE(review): the branch built here is overwritten right below
        # by the nop; kept so that any error raised while building it is
        # still raised.
        self._fix_branch(last_ins, first_ins, target)

        nop = target.nop()
        last_ins.set_arch_type(nop.architecture_type)
        last_ins.set_operands([op.value for op in nop.operands()])

    @staticmethod
    def check_branch(local_instr):
        """Inspect the branch-target memory operands of an instruction.

        :param local_instr: Instruction to inspect.
        :return: Tuple ``(target_set, branch_operand)``. ``target_set`` is
            true when any branch-target memory operand has an address.
            ``branch_operand`` tells whether the last branch-target memory
            operand inspected uses address base or index operands.
        """
        target_set = False
        branch_operand = False

        for memoperand in local_instr.memory_operands():

            if not memoperand.descriptor.is_branch_target:
                continue

            operands = [operand for operand in memoperand.operands
                        if operand.type.address_base or
                        operand.type.address_index]

            branch_operand = len(operands) > 0

            if memoperand.address is not None:
                target_set = True

        return target_set, branch_operand

    @staticmethod
    def _fix_branch(instr_ant, instr, target):
        """Rewrite a branch as an unconditional relative branch.

        :param instr_ant: Branch instruction rewritten in place.
        :param instr: Instruction whose address becomes the destination.
        :param target: Target used to build the replacement branch.
        """
        new_branch = target.branch_unconditional_relative(
            instr_ant.address, instr.address
        )
        instr_ant.set_arch_type(new_branch.architecture_type)
        instr_ant.set_operands([op.value for op in new_branch.operands()])
class LinkBbls(Pass):
    """LinkBbls pass.

    Inserts an unconditional relative branch between two consecutive
    instructions of a basic block when the first one is not a branch and
    the second one does not start right where the first one ends.
    """

    def __init__(self):
        """Create the pass."""
        super(LinkBbls, self).__init__()
        self._description = "Add unconditional branch between consecutive" \
            " instructions without consecutive addresses if first " \
            "instruction is not a branch"

    def __call__(self, building_block, target):
        """Link the address gaps found in the building block.

        :param building_block: Building block updated in place.
        :param target: Target used to build the linking branches.
        """
        for block in building_block.cfg.bbls:
            previous = None

            for current in block.instrs:
                # Advance the pair first: every path below ends with the
                # current instruction becoming the previous one.
                predecessor, previous = previous, current

                if predecessor is None or predecessor.branch:
                    continue

                # Both addresses are required to detect a gap.
                if predecessor.address is None or current.address is None:
                    continue

                fallthrough = (
                    predecessor.architecture_type.format.length +
                    predecessor.address
                )

                if fallthrough == current.address:
                    continue

                link = target.branch_unconditional_relative(
                    fallthrough, current
                )
                link.add_comment("LINKBBL ADDED")
                building_block.add_instructions(
                    [link], after=predecessor, before=current
                )
class InitializeBranchDecorator(Pass):
    """InitializeBranchDecoratorPass pass.

    Attaches a "NI" decorator to every branch, trap and syscall
    instruction. Its value is an iterator yielding the address that
    follows the instruction on each execution, built from the "BP"
    (pattern) and "BT" (target) decorators and from the target found in
    the instruction itself.
    """

    # Stand-in for the "NI" iterator whose 'next' field raises the error
    # only when it is called.
    _error_class = collections.namedtuple("LateErrorClass", ['next'])

    def __init__(self, default=None, indirect=None):
        """Create the pass.

        :param default: "BP" value given to conditional branches that do
            not define one. (Default value = None)
        :param indirect: Target used for branches that provide neither a
            target in their operands nor a "BT" decorator.
            (Default value = None)
        """
        # Initialize the base class as all the other passes in this
        # module do.
        super(InitializeBranchDecorator, self).__init__()
        self._default = default
        self._indirect = indirect
        self._description = "Initialize branch decorator. Default: %s . " \
                            "Indirect: %s" % (self._default, self._indirect)

    @staticmethod
    def _deferred_error(template, instr):
        """Return a callable raising a code generation error for *instr*.

        The instruction is bound when this helper is called, so the error
        names the right instruction even if it is raised after the caller
        loop has moved on. The message is still formatted lazily.
        """
        def raiser():
            raise MicroprobeCodeGenerationError(
                template % instr.assembly()
            )
        return raiser

    @staticmethod
    def _pattern_gen(lnext_address, lpattern, ltarget_addr):
        """Yield next-instruction addresses following the branch pattern.

        An 'N' entry of the pattern yields the fall-through address; any
        other entry yields the next taken target. Pattern and targets are
        cycled, so the generator never ends.
        """
        target_iter = itertools.cycle(list(lpattern))
        taken_target_address = itertools.cycle(ltarget_addr)

        while True:
            if next(target_iter) == 'N':
                yield lnext_address
            else:
                yield next(taken_target_address)

    def __call__(self, building_block, dummy_target):
        """Decorate the control-flow instructions of the building block.

        :param building_block: Building block updated in place.
        :param dummy_target: Unused.
        :raises MicroprobeCodeGenerationError: If the "BT" decorator
            conflicts with the target in the instruction, or a not-taken
            pattern is given for an unconditional branch.
        """
        for bbl in building_block.cfg.bbls:
            for instr in bbl.instrs:

                if not (instr.branch or instr.trap or instr.syscall):
                    continue

                # Default pattern: 'self._default' for conditional
                # branches, always taken for everything else.
                if "BP" not in instr.decorators and instr.branch_conditional:
                    instr.add_decorator("BP", self._default)
                elif "BP" not in instr.decorators:
                    instr.add_decorator("BP", "T")

                pattern = instr.decorators["BP"]["value"]

                # Look for a target in the instruction: memory operands
                # first, then regular operands.
                target_addr = None
                for memoper in instr.memory_operands():
                    if isinstance(memoper.address, InstructionAddress):
                        target_addr = memoper.address
                        break

                if target_addr is None:
                    for oper in instr.operands():
                        if isinstance(oper.value, InstructionAddress):
                            target_addr = oper.value
                            break

                if target_addr is None:
                    if "BT" in instr.decorators:
                        target_addr = instr.decorators["BT"]['value']
                    else:
                        target_addr = self._indirect
                else:
                    if "BT" in instr.decorators:
                        # The instruction already provides the target:
                        # "BT" can only confirm it.
                        bt_addr = [
                            int(elem, 16)
                            for elem in instr.decorators["BT"]['value']
                        ]

                        if building_block.context.code_segment is not None:
                            bt_addr = [
                                elem - building_block.context.code_segment
                                for elem in bt_addr
                            ]

                        if instr.branch_conditional:
                            # Remove not taken branch addresses
                            bt_addr = [
                                elem for elem in bt_addr
                                if elem != instr.address.displacement +
                                instr.architecture_type.format.length
                            ]

                        # TODO: This code assumes branch there are
                        # no conditional branch indirect branches
                        if len(set(bt_addr)) > 1:
                            raise MicroprobeCodeGenerationError(
                                "BT decorator with multiple targets "
                                "defined but instruction '%s' already"
                                " defines the target in its codification."
                                % (instr.assembly())
                            )
                        elif len(set(bt_addr)) == 1:
                            bt_addr = bt_addr[0]
                            if "EA" in instr.decorators:
                                # if each in decorator, we are dealing
                                # with real addresses. No check for
                                # mismatch.
                                #
                                # TODO: Check that 'translated' addresses
                                # mismatch or ensure we always deal with
                                # logic addresses
                                #
                                LOG.warning(
                                    "Not checking for address missmatches"
                                )
                            elif bt_addr != target_addr.displacement:
                                raise MicroprobeCodeGenerationError(
                                    "BT decorator specified for "
                                    "instruction '%s' "
                                    "but it already provides "
                                    "the target in its "
                                    "codification and they miss-match!"
                                    % (instr.assembly())
                                )

                        instr.decorators.pop("BT")

                # 'pattern' may be None (reported through the deferred
                # error below), so guard the membership test.
                if (not instr.branch_conditional and
                        pattern is not None and 'N' in pattern):
                    if (target_addr - instr.address ==
                            instr.architecture_type.format.length and
                            "T" not in pattern):
                        # This is taken branch to next instruction
                        pass
                    else:
                        raise MicroprobeCodeGenerationError(
                            "Not taken branch pattern provided for an "
                            "unconditional branch '%s'" % instr.assembly()
                        )

                if pattern is None:
                    # Missing information is not fatal here: the error is
                    # raised only when the 'next' field of the decorator
                    # value is called.
                    pattern_error = self._error_class(
                        self._deferred_error(
                            "No branch pattern provided for branch '%s'",
                            instr
                        )
                    )
                    instr.add_decorator("NI", pattern_error)
                    continue

                if target_addr is None:
                    target_addr_error = self._error_class(
                        self._deferred_error(
                            "No target address provided for branch '%s'",
                            instr
                        )
                    )
                    instr.add_decorator("NI", target_addr_error)
                    continue

                if not isinstance(target_addr, list):
                    target_addr = [target_addr]

                # Turn raw targets (hexadecimal strings or integers) into
                # addresses relative to the code segment.
                for idx, address in enumerate(target_addr):
                    if isinstance(address, str):
                        address = int(address, 16)
                    if not isinstance(address, InstructionAddress):
                        code = 0
                        if building_block.context.code_segment is not None:
                            code = building_block.context.code_segment
                        target_addr[idx] = InstructionAddress(
                            base_address="code",
                            displacement=address - code)

                next_address = instr.address + \
                    instr.architecture_type.format.length

                # NOTE(review): the generator is already endless; the
                # itertools.cycle wrapper is kept so the decorator value
                # type does not change, but cycle stores every item it
                # yields.
                instr.add_decorator(
                    "NI",
                    itertools.cycle(
                        self._pattern_gen(next_address, pattern, target_addr)
                    )
                )
class NormalizeBranchTargetsPass(Pass):
    """NormalizeBranchTargetsPass pass.

    Replaces the label based addresses found in operands and memory
    operands by the address of the instruction they refer to.
    """

    def __init__(self):
        """Create the pass."""
        super(NormalizeBranchTargetsPass, self).__init__()
        # Trailing space keeps "usage" and "of" apart.
        self._description = "Normalize branch targets to remove the usage " \
                            "of labels"
        self._labels = RejectingDict()

    def __call__(self, building_block, dummy_target):
        """Normalize all the instructions of the building block.

        :param building_block: Building block updated in place.
        :param dummy_target: Unused.
        :return: An empty list.
        """
        # Start from an empty table on every call so labels registered
        # for a previous building block neither resolve to stale
        # instructions nor clash with the ones registered now.
        self._labels = RejectingDict()

        for instr in self._all_instructions(building_block):
            self._register_label(instr)

        for instr in self._all_instructions(building_block):
            self._normalize_label(instr)

        return []

    @staticmethod
    def _all_instructions(building_block):
        """Iterate over init, code and fini instructions, in that order."""
        return itertools.chain(
            building_block.init,
            (instr for bbl in building_block.cfg.bbls
             for instr in bbl.instrs),
            building_block.fini,
        )

    def _register_label(self, instruction):
        """Record the instruction under its upper-cased label, if any.

        :param instruction: Instruction to register.
        """
        if instruction.label is None:
            return

        self._labels[instruction.label.upper()] = instruction

    def _resolve_address(self, address):
        """Return the normalized address an instruction address points to.

        The explicit target instruction is used when present. Otherwise
        the base address is looked up, case-insensitively, in the label
        table and the displacement is added.

        :param address: Instruction address to resolve.
        :raises NotImplementedError: If the address can not be resolved.
        """
        if address.target_instruction is not None:
            return address.target_instruction.address

        if (isinstance(address.base_address, str) and
                address.base_address.upper() in self._labels.keys()):
            return self._labels[address.base_address.upper()].address + \
                address.displacement

        LOG.critical(address.base_address)
        LOG.critical(address)
        LOG.critical(list(self._labels.keys()))
        raise NotImplementedError

    def _normalize_label(self, instruction):
        """Normalize the operands and memory operands of an instruction.

        :param instruction: Instruction updated in place.
        :raises NotImplementedError: If an address can not be resolved.
        """
        for operand in instruction.operands():

            if operand.value is None:
                continue

            if not isinstance(operand.value, InstructionAddress):
                continue

            address = operand.value
            LOG.debug(
                "Normalizing label: '%s' of instruction '%s'", address,
                instruction.name
            )
            operand.set_value(self._resolve_address(address))

        for moperand in instruction.memory_operands():

            if moperand.address is None:
                continue

            if not isinstance(moperand.address, InstructionAddress):
                continue

            address = moperand.address
            LOG.debug(
                "Normalizing label: '%s' of instruction '%s'", address,
                instruction.name
            )
            moperand.update_address(self._resolve_address(address))
class RandomizeByTypePass(Pass):
    """RandomizeByTypePass pass.

    Replaces selected branches by another instruction type whose input
    register is driven by user provided code, and adds the register
    initialization and reset code needed to keep the values changing.
    """

    def __init__(self, instr1, instr2, every, code, distance=1,
                 musage=None, reset=None):
        """Create the pass.

        :param instr1: Instruction type, or list of types, to replace.
        :param instr2: Instruction type used as replacement.
        :param every: Replacement rate. Values ``<= 0`` disable the pass,
            values ``>= 1`` are truncated to an integer and passed to
            ``regular_probability``, values in between are passed to
            ``probability``.
        :param code: Assembly inserted before each replaced branch. The
            ``@@REG@@``, ``@@COUNT@@`` and ``@@BRREG@@`` markers are
            replaced by the current register name, the usage count and
            the branch register name. ``None`` inserts nothing.
        :param distance: Distance, at least 1, before the branch at which
            the code is inserted. (Default value = 1)
        :param musage: Maximum number of uses of a register; ``None`` is
            stored as ``-1``. (Default value = None)
        :param reset: ``(mode, range)`` pair, where mode is ``"add"`` or
            ``"rnd"``, selecting the code added to the finalization
            section for each new register. (Default value = None)
        :raises MicroprobeCodeGenerationError: If *distance* is below 1.
        """
        super(RandomizeByTypePass, self).__init__()

        if not isinstance(instr1, list):
            instr1 = [instr1]

        self._instr1 = instr1
        self._instr2 = instr2

        if every <= 0:
            fevery = None
        elif every >= 1:
            fevery = regular_probability(int(every))
        else:
            fevery = probability(every)
        self._every = fevery

        if distance < 1:
            raise MicroprobeCodeGenerationError(
                "Dependency distance should be 1 or larger"
            )
        self._distance = int(distance)

        self._code = code

        if musage is None:
            musage = -1
        self._musage = musage
        self._reset = reset

        # 'every' is formatted with '%s': '%d' truncated fractional
        # probabilities to 0.
        self._description = "Randomize '%s' by '%s' every '%s' "\
            "by adding '%s' code at distance '%d', with max reg "\
            "usage of '%s' and reset mode: '%s'" \
            % (
                [instr.name for instr in instr1], instr2.name,
                every, code, distance, musage, reset
            )

    @staticmethod
    def _is_register_input(operand):
        """Tell whether *operand* is an input that is none of the
        constant, immediate, float or address operand kinds."""
        if not operand.is_input:
            return False

        otype = operand.type
        return not (
            otype.constant or otype.immediate or otype.float or
            otype.address_relative or otype.address_absolute or
            otype.address_immediate or otype.address_base or
            otype.address_index
        )

    @staticmethod
    def _add_comment(instructions, comment):
        """Attach *comment* to every instruction of the list."""
        for ins in instructions:
            ins.add_comment(comment)

    def __call__(self, building_block, target):
        """Randomize the selected branches of the building block.

        NOTE(review): the nesting of this method was rebuilt from a
        whitespace-mangled listing; check it against the upstream file.

        :param building_block: Building block updated in place.
        :param target: Target used to build the register manipulation
            instructions.
        :raises MicroprobeCodeGenerationError: If there are not enough
            free registers.
        :raises NotImplementedError: If a new register is needed and the
            reset mode is neither "add" nor "rnd".
        """
        if self._every is None:
            return

        creg = None       # register currently used by the inserted code
        usereg = []       # registers already exhausted
        cusage = 0        # uses of the current register
        last_branch = None
        register = None
        zero_register = None
        tregister = None  # register placed in the branch operand
        # Only assigned in "rnd" reset mode but read in every mode by the
        # zero-register filter below; it must always be bound.
        random_reg = None

        for bbl in building_block.cfg.bbls:
            for instr in bbl.instrs:

                if not instr.branch:
                    continue

                if instr.architecture_type in self._instr1:

                    mrandom = self._every()

                    if not mrandom:
                        if instr.branch:
                            last_branch = instr
                        continue

                    instr.set_arch_type(self._instr2)
                    instr.add_comment("Randomized branch")

                    # take on operand input
                    soperand = None
                    for operand in instr.operands():
                        if self._is_register_input(operand):
                            soperand = operand
                            break

                    if soperand is None:
                        LOG.debug(
                            "Unable to randomize branch "
                            "without input register operands"
                        )
                        if instr.branch:
                            last_branch = instr
                        continue

                    values = soperand.type.values()
                    values = [
                        val for val in values
                        if val not in
                        building_block.context.reserved_registers and
                        val not in target.control_registers
                    ]

                    if (creg in values and creg not in usereg and
                            (cusage < self._musage or self._musage == -1)):
                        # Current register is in valid values and has not
                        # been used much, just need to set operand value
                        # and random code
                        register = creg
                    else:
                        # New register, need to reset according to settings
                        registers = [
                            val for val in values if val not in usereg
                        ]

                        if len(registers) == 0:
                            raise MicroprobeCodeGenerationError(
                                "No free registers available. Change the "
                                "generation policy settings."
                            )

                        register = registers[0]
                        creg = register

                        if len(registers) < 2 and tregister is None:
                            raise MicroprobeCodeGenerationError(
                                "No free registers available. Change the "
                                "generation policy settings."
                            )

                        if tregister is None:
                            tregister = registers[1]
                            building_block.context.add_reserved_registers(
                                [tregister]
                            )

                        if self._reset is not None and \
                                self._reset[0] == "add":

                            # A negative lower bound is subtracted from
                            # the random value drawn.
                            vrange = sorted(self._reset[1])
                            diff = 0
                            if vrange[0] < 0:
                                diff = vrange[0]
                            value = RNDINT(maxmin=(vrange[0], vrange[1]))
                            value = value - diff

                            nins = target.add_to_register(register, value)
                            self._add_comment(
                                nins,
                                "Instrunction added by "
                                "braid pass to avoid "
                                "convergence to zero."
                            )
                            building_block.add_fini(nins)

                        elif (self._reset is not None and
                              self._reset[0] == "rnd"):

                            vrange = sorted(self._reset[1])
                            diff = 0
                            if vrange[0] < 0:
                                diff = vrange[0]
                            value = RNDINT(maxmin=(vrange[0], vrange[1]))
                            value = value - diff

                            if len(usereg) == 0:
                                # First time: set up the register used as
                                # seed for the randomization.
                                regs = [
                                    val for val in values
                                    if val not in usereg and val not in
                                    building_block.context.
                                    reserved_registers
                                ]

                                if len(regs) < 2:
                                    raise MicroprobeCodeGenerationError(
                                        "No free registers available. "
                                        "Change the "
                                        "generation policy settings."
                                    )

                                reg = regs[1]

                                nins = target.set_register(
                                    reg, RNDINT(), building_block.context
                                )
                                self._add_comment(
                                    nins,
                                    "Instrunction added by "
                                    "randomize pass to avoid "
                                    "convergence to zero."
                                )
                                building_block.add_init(nins)

                                nins = target.add_to_register(reg, value)
                                self._add_comment(
                                    nins,
                                    "Instrunction added by "
                                    "randomize pass to avoid "
                                    "convergence to zero."
                                )
                                building_block.add_fini(nins)

                                building_block.context.\
                                    add_reserved_registers([reg])
                                random_reg = reg

                            nins = target.randomize_register(
                                register, seed=random_reg
                            )
                            self._add_comment(
                                nins,
                                "Instrunction added by "
                                "randomize pass to avoid "
                                "convergence to zero."
                            )
                            building_block.add_fini(nins)
                        else:
                            raise NotImplementedError

                    # Set operand
                    soperand.set_value(tregister)
                    cusage = cusage + 1

                    if self._code is not None:
                        code = self._code.replace('@@REG@@', register.name)
                        code = code.replace('@@COUNT@@', str(cusage))
                        code = code.replace('@@BRREG@@', tregister.name)

                        instructions_def = interpret_asm(
                            code, target, building_block.labels
                        )

                        instructions = []
                        for definition in instructions_def:
                            instruction = Instruction()
                            instruction_set_def_properties(
                                instruction, definition,
                                building_block=building_block,
                                target=target,
                                allowed_registers=[
                                    register.name, tregister.name
                                ]
                            )
                            instructions.append(instruction)

                        # Insert the code 'self._distance' instructions
                        # before the branch, but never before the
                        # previous branch.
                        bdistance = bbl.distance(last_branch, instr)
                        if bdistance + 1 <= self._distance:
                            after_ins = last_branch
                        else:
                            after_ins = bbl.get_instruction_by_distance(
                                instr, self._distance * (-1)
                            )

                        building_block.add_instructions(
                            instructions, after=after_ins, before=instr
                        )

                    for operand in instr.operands():
                        if not self._is_register_input(operand):
                            continue

                        if operand.value is not None:
                            continue

                        if zero_register is None:
                            # Set the remaining operands to zero
                            candidates = [
                                val for val in operand.type.values()
                                if val not in
                                building_block.context.reserved_registers
                                and val not in target.control_registers
                                and val not in usereg
                                and val not in [register, random_reg]
                            ]

                            if len(candidates) == 0:
                                raise MicroprobeCodeGenerationError(
                                    "No free registers available. Change "
                                    "the generation policy settings."
                                )

                            zero_register = candidates[0]
                            building_block.context.add_reserved_registers(
                                [zero_register]
                            )
                            nins = target.set_register(
                                zero_register, 0, building_block.context
                            )
                            self._add_comment(
                                nins,
                                "Instrunction added by "
                                "randomize pass to avoid "
                                "convergence to zero."
                            )
                            building_block.add_init(nins)

                        operand.set_value(zero_register)

                    # Use new register once one is exhausted (we can reset
                    # its value too)
                    # NOTE(review): with musage == -1 this is always true,
                    # while the register selection above treats -1 as "no
                    # limit" -- confirm which one is intended.
                    if cusage >= self._musage:
                        usereg.append(register)
                        creg = None
                        cusage = 0

                    # reserve implicit operands (if they are not already
                    # reserved) and add them as allowed
                    # NOTE(review): this loop rebinds 'register', which is
                    # read again after the main loops -- confirm the
                    # shadowing is intended.
                    for operand_descriptor in instr.implicit_operands:
                        register = list(operand_descriptor.type.values())[0]
                        instr.add_allow_register(register)

                        if operand_descriptor.is_output and register not \
                                in building_block.context.reserved_registers \
                                and register not in target.control_registers:
                            building_block.context.add_reserved_registers(
                                [register]
                            )

                    context_fix_functions = instr.check_context(
                        building_block.context
                    )
                    for context_fix_function in context_fix_functions:
                        try:
                            context_fix_function(target, building_block)
                        except MicroprobeUncheckableEnvironmentWarning \
                                as wue:
                            building_block.add_warning(str(wue))

                if instr.branch:
                    last_branch = instr

        # The register in use when the loops end is initialized too.
        if register is not None and register not in usereg:
            usereg.append(register)

        # Reserve each used register and set it to a random value in the
        # initialization code.
        for register in usereg:
            building_block.context.add_reserved_registers([register])
            nins = target.set_register(
                register, RNDINT(), building_block.context
            )
            self._add_comment(
                nins,
                "Instrunction added by randomize pass to avoid "
                "convergence to zero."
            )
            building_block.add_init(nins)