# Copyright 2011-2021 IBM Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""":mod:`microprobe.passes.branch` module
"""
# Futures
from __future__ import absolute_import
# Built-in modules
import collections
import itertools
import copy
import re
# Third party modules
# Own modules
from microprobe.code.address import InstructionAddress
from microprobe.code.ins import Instruction, \
instruction_set_def_properties
from microprobe.exceptions import MicroprobeCodeGenerationError, \
MicroprobeUncheckableEnvironmentWarning
from microprobe.passes import Pass
from microprobe.utils.asm import interpret_asm
from microprobe.utils.logger import get_logger
from microprobe.utils.misc import RejectingDict, RNDINT
from microprobe.utils.distrib import probability, regular_probability
# Local modules
# Constants
# Module-level logger, obtained from get_logger() with this module's name.
LOG = get_logger(__name__)
# Names exported by a star-import of this module.
# NOTE(review): NormalizeBranchTargetsPass is defined in this module but is
# not listed here -- confirm whether the omission is intentional.
__all__ = ['BranchNextPass',
           'BranchBraidNextPass',
           'FixIndirectBranchPass',
           'InitializeBranchDecorator',
           'RandomizeByTypePass',
           "LinkBbls"]
# Functions
# Classes
[docs]
class BranchNextPass(Pass):
"""BranchNextPass pass.
"""
[docs]
def __init__(self, branch=None, force=False):
    """Create a BranchNextPass.

    :param branch: Name of the only branch instruction to process, or
                   ``None`` to process every branch (Default value = None)
    :param force: When true, branch targets that already have an address
                  are overwritten as well (Default value = False)

    """
    super(BranchNextPass, self).__init__()
    self._branch = branch
    self._force = force

    # Build the textual description first, then store it in one go.
    summary = "Set the branch target of each branch to the " + \
        "next instruction."
    if branch is not None:
        summary = summary + "Only model '%s'" % branch
    self._description = summary
def __call__(self, building_block, target):
    """Point every selected branch at the instruction that follows it.

    All instructions of ``building_block.cfg.bbls`` are visited as
    consecutive (previous, current) pairs. When the previous instruction
    is a branch (and matches the model given at construction time, if
    any), its branch-target memory operand receives an
    :class:`InstructionAddress` built from a label: the label of the
    current instruction for relative branches, or one shared label for
    the remaining branches. Missing labels are created on the fly.
    If setting the address raises
    :class:`MicroprobeCodeGenerationError`, a register is loaded with the
    address in the init code, reserved, and the address is set again.
    A branch in the very last position is handled after the main loop.

    :param building_block: Building block to modify in place.
    :param target: Target used to obtain registers and to generate the
                   register initialization code.

    """
    # Make sure some register holds the value 0: if the context does not
    # know one, pick an address-arithmetic register, zero it in the init
    # code and reserve it.
    if not building_block.context.register_has_value(0):
        reg = target.get_register_for_address_arithmetic(
            building_block.context
        )
        building_block.add_init(
            target.set_register(
                reg, 0, building_block.context
            )
        )
        building_block.context.set_register_value(reg, 0)
        building_block.context.add_reserved_registers([reg])
    counter = 0      # used to name labels of non-relative branch targets
    all_counter = 0  # used to name labels of relative branch targets
    rregs = 0        # counts registers set up by the fallback path
    labels = []      # labels created for non-relative branches
    label_instruction_map = {}
    instr_ant = None  # previously visited instruction
    for bbl in building_block.cfg.bbls:
        for instr in bbl.instrs:
            if instr_ant is None:
                # Very first instruction: remember it (labelling it
                # "first" when it has no label) and move on.
                instr_ant = instr
                if instr.label is None:
                    first_label = "first"
                    instr.set_label("first")
                    label_instruction_map[first_label] = instr
                else:
                    first_label = instr.label
                    label_instruction_map[first_label] = instr
                continue
            if not instr_ant.branch:
                instr_ant = instr
                continue
            if self._branch is not None and \
                    instr_ant.name != self._branch:
                # Only the requested branch model is processed.
                instr_ant = instr
                continue
            LOG.debug("Setting target for branch: '%s'", instr_ant)
            target_set = False
            for memoperand in instr_ant.memory_operands():
                if not memoperand.descriptor.is_branch_target:
                    continue
                if memoperand.address is not None and not self._force:
                    # Already has a target and we are not forcing.
                    continue
                if target_set:
                    continue
                LOG.debug("Computing label")
                if instr_ant.branch_relative:
                    # Relative branch: jump to the current instruction.
                    if instr.label is None:
                        label = "rel_branch_%d" % all_counter
                        instr.set_label(label)
                        LOG.debug(
                            "Branch relative, new label: '%s'", label
                        )
                    else:
                        label = instr.label
                        LOG.debug(
                            "Branch relative, existing label: '%s'", label
                        )
                elif len(labels) == 0:
                    # First non-relative branch: its successor becomes
                    # the target that later non-relative branches reuse.
                    if instr.label is None:
                        label = "abs_branch_%d" % counter
                        labels.append(label)
                        instr.set_label(label)
                        LOG.debug(
                            "Branch absolute, new label: '%s'", label
                        )
                    else:
                        label = instr.label
                        LOG.debug(
                            "Branch absolute, exiting label: '%s'", label
                        )
                    label_instruction_map[label] = instr
                else:
                    label = labels[0]
                assert label is not None
                taddress = InstructionAddress(base_address=label)
                if instr_ant.branch_relative:
                    taddress.set_target_instruction(instr)
                else:
                    taddress.set_target_instruction(
                        label_instruction_map[label]
                    )
                try:
                    memoperand.set_address(
                        taddress, building_block.context
                    )
                    target_set = True
                    instr_ant.add_comment("Branch fixed to: %s" % taddress)
                except MicroprobeCodeGenerationError:
                    # Fallback: load the address into a reserved register
                    # during init, then set the address again.
                    reg = \
                        target.get_register_for_address_arithmetic(
                            building_block.context)
                    building_block.add_init(
                        target.set_register_to_address(
                            reg, taddress, building_block.context
                        )
                    )
                    building_block.context.set_register_value(
                        reg, taddress
                    )
                    building_block.context.add_reserved_registers([reg])
                    memoperand.set_address(
                        taddress, building_block.context
                    )
                    rregs += 1
                    target_set = True
                    instr_ant.add_comment("Branch fixed to: %s" % taddress)
            if not instr_ant.branch_relative:
                counter += 1
            else:
                all_counter += 1
            instr_ant = instr
    # The last instruction has no successor inside the loop above, so a
    # trailing branch is handled here.
    bbl = building_block.cfg.bbls[-1]
    if bbl.instrs[-1].branch:
        LOG.debug("Last instruction is a BRANCH: %s", bbl.instrs[-1])
        for memoperand in bbl.instrs[-1].memory_operands():
            if not memoperand.descriptor.is_branch_target:
                continue
            if memoperand.address is not None and not self._force:
                continue
            # Skip operands that already carry a value (unless forcing).
            target_set = False
            for operand in memoperand.operands:
                if operand.value is not None and not self._force:
                    target_set = True
                    break
            if target_set:
                continue
            if (len(building_block.fini) >
                    0 and bbl.instrs[-1].branch_relative):
                # Relative branch with fini code available: use the label
                # of the first fini instruction (creating it if needed).
                if building_block.fini[0].label is None:
                    label = "rel_branch_%d" % all_counter
                    building_block.fini[0].set_label(label)
                else:
                    label = building_block.fini[0].label
                taddress = InstructionAddress(base_address=label)
                # NOTE(review): the address is based on the fini label
                # but the recorded target instruction is the first one
                # of the building block -- confirm this is intended.
                taddress.set_target_instruction(
                    label_instruction_map[first_label]
                )
            else:
                assert first_label is not None
                if bbl.instrs[-1].label is not None:
                    # Address right after the labelled last instruction.
                    taddress = InstructionAddress(
                        base_address=bbl.instrs[-1].label
                    ) + bbl.instrs[-1].architecture_type.format.length
                else:
                    # Otherwise branch back to the first instruction.
                    taddress = InstructionAddress(base_address=first_label)
                    taddress.set_target_instruction(
                        label_instruction_map[first_label]
                    )
            try:
                memoperand.set_address(
                    taddress, building_block.context
                )
            except MicroprobeCodeGenerationError:
                # Same register-based fallback as in the main loop.
                reg = \
                    target.get_register_for_address_arithmetic(
                        building_block.context)
                building_block.add_init(
                    target.set_register_to_address(
                        reg, taddress, building_block.context
                    )
                )
                building_block.context.set_register_value(
                    reg, taddress
                )
                building_block.context.add_reserved_registers([reg])
                memoperand.set_address(
                    taddress, building_block.context
                )
[docs]
def check(self, building_block, dummy_target):
    """Verify that branches target the instruction that follows them.

    Within each basic block, every branch that has a successor is
    inspected: the successor's label must differ from the empty string
    and each branch-target memory operand must compare equal to an
    :class:`InstructionAddress` based on that label.

    :param building_block: Building block to inspect.
    :param dummy_target: Unused.
    :return: A truthy value when all inspected branches pass.

    """
    pass_ok = True

    for bbl in building_block.cfg.bbls:
        remaining = iter(bbl.instrs)
        previous = next(remaining, None)

        for current in remaining:
            if previous.branch:
                pass_ok = pass_ok and current.label != ""

                branch_targets = (
                    moperand for moperand in previous.memory_operands()
                    if moperand.descriptor.is_branch_target
                )
                for moperand in branch_targets:
                    expected = InstructionAddress(
                        base_address=current.label
                    )
                    pass_ok = pass_ok and moperand.address == expected

            previous = current

    return pass_ok
[docs]
class BranchBraidNextPass(Pass):
"""BranchBraidNextPass pass.
"""
[docs]
def __init__(self, branch=None, force=False, bbl=20):
    """Create a BranchBraidNextPass.

    :param branch: Name of the only branch instruction to process, or
                   ``None`` to process every branch (Default value = None)
    :param force: When true, branch targets that already have an address
                  are overwritten as well (Default value = False)
    :param bbl: Number of replicated instruction groups placed together
                when the braid copies are inserted (Default value = 20)

    """
    super(BranchBraidNextPass, self).__init__()
    self._branch = branch
    self._force = force
    self._bbl = bbl

    # Assemble the description before storing it on the instance.
    pieces = [
        "Set the branch target of each branch to the ",
        "next instruction but replicating the code ",
        "creating a braid in execution.",
    ]
    if branch is not None:
        pieces.append("Only model '%s'" % branch)
    self._description = "".join(pieces)
def __call__(self, building_block, target):
    """Set branches to the next instruction and braid a code replica.

    The first part mirrors a plain "branch to next" fix-up: instructions
    are visited as (previous, current) pairs and each selected branch is
    pointed at a label on its successor. While doing so, the
    instructions found after the first processed branch are collected in
    groups that end at each processed branch.

    The second part copies those groups, renames the labels of the
    copies, and retargets relative branches so that an original branch
    jumps into the copy and the copied branch jumps back into the
    original code. The copies are finally inserted in chunks of
    ``self._bbl`` groups, each preceded by an unconditional relative
    branch that skips over the inserted chunk.

    :param building_block: Building block to modify in place.
    :param target: Target used to obtain registers and to generate
                   branches and register initialization code.

    """
    # Make sure some register holds the value 0 (zero it in the init
    # code and reserve it when the context does not know one).
    if not building_block.context.register_has_value(0):
        reg = target.get_register_for_address_arithmetic(
            building_block.context
        )
        building_block.add_init(
            target.set_register(
                reg, 0, building_block.context
            )
        )
        building_block.context.set_register_value(reg, 0)
        building_block.context.add_reserved_registers([reg])
    counter = 0      # used to name labels of non-relative branch targets
    all_counter = 0  # used to name labels of relative branch targets
    rregs = 0        # counts registers set up by the fallback path
    labels = []      # labels created for non-relative branches
    label_instruction_map = {}
    instr_ant = None  # previously visited instruction
    first_branch_hit = False
    new_instructions = []  # groups of instructions to replicate
    current_bbl = []       # group currently being collected
    # NOTE(review): last_braid is assigned below but only referenced by
    # the commented-out code at the end of this method.
    last_braid = None
    last_bbl_head = None   # instruction following the last processed branch
    first_braid = None     # first processed branch
    for bbl in building_block.cfg.bbls:
        for instr in bbl.instrs:
            if instr_ant is None:
                # Very first instruction: remember it (labelling it
                # "first" when it has no label) and move on.
                instr_ant = instr
                if instr.label is None:
                    first_label = "first"
                    instr.set_label("first")
                    label_instruction_map[first_label] = instr
                else:
                    first_label = instr.label
                    label_instruction_map[first_label] = instr
                continue
            if not instr_ant.branch:
                # Non-branches join the current group once the first
                # branch has been processed.
                if first_branch_hit:
                    current_bbl.append(instr_ant)
                instr_ant = instr
                continue
            if self._branch is not None and \
                    instr_ant.name != self._branch:
                # Branches of other models are treated like regular
                # instructions.
                if first_branch_hit:
                    current_bbl.append(instr_ant)
                instr_ant = instr
                continue
            LOG.debug("Setting target for branch: '%s'", instr_ant)
            target_set = False
            for memoperand in instr_ant.memory_operands():
                if not memoperand.descriptor.is_branch_target:
                    continue
                if memoperand.address is not None and not self._force:
                    continue
                if target_set:
                    continue
                LOG.debug("Computing label")
                if instr_ant.branch_relative:
                    # Relative branch: jump to the current instruction.
                    if instr.label is None:
                        label = "rel_branch_%d" % all_counter
                        instr.set_label(label)
                        LOG.debug(
                            "Branch relative, new label: '%s'", label
                        )
                    else:
                        label = instr.label
                        LOG.debug(
                            "Branch relative, existing label: '%s'", label
                        )
                elif rregs < 5:
                    # Non-relative branch while fewer than five fallback
                    # registers have been used: target the successor.
                    if instr.label is None:
                        label = "abs_branch_%d" % counter
                        labels.append(label)
                        instr.set_label(label)
                        LOG.debug(
                            "Branch absolute, new label: '%s'", label
                        )
                    else:
                        label = instr.label
                        LOG.debug(
                            "Branch absolute, exiting label: '%s'", label
                        )
                    label_instruction_map[label] = instr
                else:
                    # NOTE(review): ``labels`` only grows when a new
                    # "abs_branch" label is created, so this index may
                    # be out of range -- confirm.
                    label = labels[counter % 5]
                assert label is not None
                taddress = InstructionAddress(base_address=label)
                if instr_ant.branch_relative:
                    taddress.set_target_instruction(instr)
                else:
                    taddress.set_target_instruction(
                        label_instruction_map[label]
                    )
                try:
                    memoperand.set_address(
                        taddress, building_block.context
                    )
                    target_set = True
                    instr_ant.add_comment("Branch fixed to: %s" % taddress)
                except MicroprobeCodeGenerationError:
                    # Fallback: load the address into a reserved register
                    # during init, then set the address again.
                    reg = \
                        target.get_register_for_address_arithmetic(
                            building_block.context)
                    building_block.add_init(
                        target.set_register_to_address(
                            reg, taddress, building_block.context
                        )
                    )
                    building_block.context.set_register_value(
                        reg, taddress
                    )
                    building_block.context.add_reserved_registers([reg])
                    memoperand.set_address(
                        taddress, building_block.context
                    )
                    rregs += 1
                    target_set = True
                    instr_ant.add_comment("Branch fixed to: %s" % taddress)
            if not instr_ant.branch_relative:
                counter += 1
            else:
                all_counter += 1
            # The processed branch closes the current group (the first
            # processed branch itself is kept apart in ``first_braid``).
            if first_branch_hit:
                current_bbl.append(instr_ant)
            if not first_branch_hit:
                first_braid = instr_ant
            new_instructions.append(current_bbl)
            last_braid = instr_ant
            last_bbl_head = instr
            current_bbl = []
            first_branch_hit = True
            instr_ant = instr
    # The last instruction has no successor inside the loop above, so a
    # trailing branch is handled here.
    bbl = building_block.cfg.bbls[-1]
    if bbl.instrs[-1].branch:
        LOG.debug("Last instruction is a BRANCH: %s", bbl.instrs[-1])
        for memoperand in bbl.instrs[-1].memory_operands():
            if not memoperand.descriptor.is_branch_target:
                continue
            if memoperand.address is not None and not self._force:
                continue
            # Skip operands that already carry a value (unless forcing).
            target_set = False
            for operand in memoperand.operands:
                if operand.value is not None and not self._force:
                    target_set = True
                    break
            if target_set:
                continue
            if (len(building_block.fini) >
                    0 and bbl.instrs[-1].branch_relative):
                # Relative branch with fini code available: use the label
                # of the first fini instruction (creating it if needed).
                if building_block.fini[0].label is None:
                    label = "rel_branch_%d" % all_counter
                    building_block.fini[0].set_label(label)
                else:
                    label = building_block.fini[0].label
                taddress = InstructionAddress(base_address=label)
                # NOTE(review): the address is based on the fini label
                # but the recorded target instruction is the first one
                # of the building block -- confirm this is intended.
                taddress.set_target_instruction(
                    label_instruction_map[first_label]
                )
            else:
                # Otherwise branch back to the first instruction.
                assert first_label is not None
                taddress = InstructionAddress(base_address=first_label)
                taddress.set_target_instruction(
                    label_instruction_map[first_label]
                )
            try:
                memoperand.set_address(
                    taddress, building_block.context
                )
            except MicroprobeCodeGenerationError:
                # Same register-based fallback as in the main loop.
                reg = \
                    target.get_register_for_address_arithmetic(
                        building_block.context)
                building_block.add_init(
                    target.set_register_to_address(
                        reg, taddress, building_block.context
                    )
                )
                building_block.context.set_register_value(
                    reg, taddress
                )
                building_block.context.add_reserved_registers([reg])
                memoperand.set_address(
                    taddress, building_block.context
                )
    # Keep the non-empty groups twice: ``base_instructions`` refers to
    # the original instructions, ``new_instructions`` to their copies.
    base_instructions = []
    for elem in new_instructions:
        if len(elem) == 0:
            continue
        base_instructions.append(elem[:])
    new_instructions = [
        elem for elem in new_instructions if len(elem) > 0
    ]
    for idx, elem in enumerate(new_instructions):
        if len(elem) == 0:
            continue
        new_instructions[idx] = [instr.copy() for instr in elem]
    assert len(base_instructions) == len(new_instructions)
    for elem in new_instructions:
        for elem2 in elem:
            elem2.add_comment("Braid instruction")

    def flatten(lst):
        # Concatenate a list of lists into a single flat list.
        return [item for sublist in lst for item in sublist]
    # Parallel flat lists of originals and copies. The first processed
    # branch and the instruction after the last processed branch are
    # shared by both lists (they are not copied).
    basel = []
    newl = []
    if first_braid is not None:
        basel.append(first_braid)
        newl.append(first_braid)
    basel.extend(flatten(base_instructions))
    newl.extend(flatten(new_instructions))
    if last_bbl_head is not None:
        basel.append(last_bbl_head)
        newl.append(last_bbl_head)
    # Fix labels and braid the branches
    translated_labels = {}
    for idx, (base, new) in enumerate(zip(basel, newl)):
        if base.label is None:
            continue
        if not base.label.startswith("rel_branch"):
            # This has a label, need to be fixed on new
            new.set_label("%s_braid" % base.label)
            translated_labels[base.label] = "%s_braid" % base.label
            continue
        # Braid the branches
        # The element before a "rel_branch" label is taken as the branch
        # that jumps to it (in the original and in the copy).
        jbase = basel[idx-1]
        jnew = newl[idx-1]
        new.set_label("%s_2" % new.label)
        # The original branch now jumps to the copy of its successor.
        for memoperand in jbase.memory_operands():
            if not memoperand.descriptor.is_branch_target:
                continue
            if not jbase.branch_relative:
                continue
            taddress = InstructionAddress(base_address=new.label)
            taddress.set_target_instruction(new)
            memoperand.set_address(
                taddress, building_block.context
            )
        if jbase is not jnew:
            # The copied branch jumps back to the original successor.
            if base is not new:
                base.set_label("%s_1" % base.label)
            for memoperand in jnew.memory_operands():
                if not memoperand.descriptor.is_branch_target:
                    continue
                if not jnew.branch_relative:
                    continue
                taddress = InstructionAddress(base_address=base.label)
                taddress.set_target_instruction(base)
                memoperand.set_address(
                    taddress, building_block.context
                )
    for nins in newl:
        # Some operands of new instructions might refer to
        # translated labels. Update them.
        values = [oper.value for oper in nins.operands()]
        for idx, val in enumerate(values):
            if not isinstance(val, str):
                continue
            translated = False
            for label in translated_labels:
                # The label is matched either as a trailing "(label)" or
                # as the whole operand value.
                for pattern in ["\\(%s\\)$", "^%s$"]:
                    bpattern = pattern % label
                    npattern = pattern % translated_labels[label]
                    # Strip the regex syntax to obtain the replacement.
                    npattern = npattern.replace("\\", "")
                    npattern = npattern.replace("^", "")
                    npattern = npattern.replace("$", "")
                    if re.search(bpattern, val):
                        val = re.sub(bpattern, npattern, val)
                        nins.operands()[idx].set_value(val, check=False)
                        translated = True
                        break
                if translated:
                    break
    # Insert the copies in chunks of ``self._bbl`` groups. Each chunk is
    # placed after the last original instruction of the chunk and is
    # preceded by a jump to the instruction that follows it.
    chunks = list(enumerate(zip(base_instructions, new_instructions)))
    for chunk in [chunks[i:i + self._bbl]
                  for i in range(0, len(chunks), self._bbl)]:
        bins = flatten([chk[1][0] for chk in chunk])
        nins = flatten([chk[1][1] for chk in chunk])
        assert len(bins) == len(nins)
        after_instr = bins[-1]
        lidx = chunk[-1][0]
        if lidx+1 >= len(chunks):
            before_instr = last_bbl_head
        else:
            # First original instruction of the next group.
            before_instr = chunks[lidx+1][1][0][0]
        taddress = InstructionAddress(base_address=before_instr.label)
        taddress.set_target_instruction(before_instr)
        new_branch = target.branch_unconditional_relative(
            after_instr.architecture_type.format.length +
            after_instr.address, taddress
        )
        new_branch.add_comment("Jump to synch braid")
        building_block.add_instructions(
            [new_branch] + nins,
            after=after_instr,
            before=before_instr
        )
    # taddress = InstructionAddress(base_address=last_bbl_head.label)
    # taddress.set_target_instruction(last_bbl_head)
    # new_branch = target.branch_unconditional_relative(
    # last_braid.architecture_type.format.length +
    # last_braid.address, taddress
    # )
    # new_branch.add_comment("Branch to latest braid BBL")
    # new_instructions = [new_branch] # + new_instructions
    # building_block.add_instructions(new_instructions,
    # after=last_braid,
    # before=last_bbl_head)
[docs]
def check(self, building_block, dummy_target):
    """Verify that branches target the instruction that follows them.

    Within each basic block, every branch that has a successor is
    inspected: the successor's label must differ from the empty string
    and each branch-target memory operand must compare equal to an
    :class:`InstructionAddress` based on that label.

    :param building_block: Building block to inspect.
    :param dummy_target: Unused.
    :return: A truthy value when all inspected branches pass.

    """
    pass_ok = True

    for bbl in building_block.cfg.bbls:
        pending = iter(bbl.instrs)
        branch_candidate = next(pending, None)

        for follower in pending:
            if branch_candidate.branch:
                pass_ok = pass_ok and follower.label != ""

                for moperand in branch_candidate.memory_operands():
                    if moperand.descriptor.is_branch_target:
                        wanted = InstructionAddress(
                            base_address=follower.label
                        )
                        pass_ok = pass_ok and moperand.address == wanted

            branch_candidate = follower

    return pass_ok
[docs]
class FixIndirectBranchPass(Pass):
    """FixIndirectBranchPass pass.

    Replaces every branch whose target address is not set by an
    unconditional relative branch to the instruction that follows it
    within the same basic block. A trailing branch without a target is
    turned into a no-operation instruction.
    """

    def __init__(self):
        """Create a FixIndirectBranchPass."""
        super(FixIndirectBranchPass, self).__init__()
        self._description = "Fix indirect branches or replace them by "\
            "regular branch to next"

    def __call__(self, building_block, target):
        """Fix the branches of *building_block* that have no target set.

        :param building_block: Building block to modify in place.
        :param target: Target used to generate the replacement branch
                       and the no-operation instruction.

        """
        for bbl in building_block.cfg.bbls:
            instr_ant = None

            for instr in bbl.instrs:
                if instr_ant is None:
                    # A branch can only be fixed once its successor is
                    # known. Without this ``continue`` a branch in first
                    # position would be paired with itself and rewritten
                    # into a jump to its own address.
                    instr_ant = instr
                    continue

                if instr_ant.branch:
                    target_set, _ = self.check_branch(instr_ant)
                    if not target_set:
                        self._fix_branch(instr_ant, instr, target)

                instr_ant = instr

        last_ins = building_block.cfg.bbls[-1].instrs[-1]
        if not last_ins.branch:
            return

        target_set, _ = self.check_branch(last_ins)
        if target_set:
            return

        # Last instruction is a branch without a target set.
        # Change to nop
        first_ins = building_block.cfg.bbls[0].instrs[0]
        self._fix_branch(last_ins, first_ins, target)
        last_ins.set_arch_type(target.nop().architecture_type)
        last_ins.set_operands([op.value for op in target.nop().operands()])

    @staticmethod
    def check_branch(local_instr):
        """Inspect the branch-target memory operands of an instruction.

        :param local_instr: Instruction to inspect.
        :return: Tuple ``(target_set, branch_operand)``. ``target_set``
                 is true when some branch-target memory operand has an
                 address. ``branch_operand`` tells whether the last
                 branch-target memory operand visited uses an address
                 base or address index operand.
        :rtype: :class:`tuple` of two :class:`bool`

        """
        target_set = False
        branch_operand = False

        for memoperand in local_instr.memory_operands():
            if not memoperand.descriptor.is_branch_target:
                continue

            branch_operand = any(
                operand.type.address_base or operand.type.address_index
                for operand in memoperand.operands
            )

            if memoperand.address is not None:
                target_set = True

        return target_set, branch_operand

    @staticmethod
    def _fix_branch(instr_ant, instr, target):
        """Turn *instr_ant* into an unconditional relative branch.

        :param instr_ant: Branch instruction to rewrite in place.
        :param instr: Instruction whose address becomes the destination.
        :param target: Target used to generate the replacement branch.

        """
        new_branch = target.branch_unconditional_relative(
            instr_ant.address, instr.address
        )
        instr_ant.set_arch_type(new_branch.architecture_type)
        instr_ant.set_operands([op.value for op in new_branch.operands()])
[docs]
class LinkBbls(Pass):
    """LinkBbls pass.

    Links consecutive instructions whose addresses are not contiguous by
    inserting an unconditional relative branch between them.
    """

    def __init__(self):
        """Create a LinkBbls pass."""
        super(LinkBbls, self).__init__()
        self._description = (
            "Add unconditional branch between consecutive instructions "
            "without consecutive addresses if first instruction is not "
            "a branch"
        )

    def __call__(self, building_block, target):
        """Insert the linking branches into *building_block*.

        A pair of consecutive instructions is left alone when the first
        one is a branch, when either address is unknown, or when the
        second one starts right where the first one ends.

        :param building_block: Building block to modify in place.
        :param target: Target used to generate the linking branch.

        """
        for bbl in building_block.cfg.bbls:
            walker = iter(bbl.instrs)
            previous = next(walker, None)

            for current in walker:
                skip = (
                    previous.branch or
                    previous.address is None or
                    current.address is None
                )

                if not skip:
                    fallthrough = \
                        previous.architecture_type.format.length + \
                        previous.address

                    if not (fallthrough == current.address):
                        link = target.branch_unconditional_relative(
                            fallthrough, current
                        )
                        link.add_comment("LINKBBL ADDED")
                        building_block.add_instructions(
                            [link], after=previous, before=current
                        )

                previous = current
[docs]
class InitializeBranchDecorator(Pass):
"""InitializeBranchDecoratorPass pass.
"""
_error_class = collections.namedtuple("LateErrorClass", ['next'])
[docs]
def __init__(self, default=None, indirect=None):
    """Create an InitializeBranchDecorator pass.

    :param default: Branch pattern given to conditional branches that
                    have no "BP" decorator (Default value = None)
    :param indirect: Target address(es) used for branches whose target
                     cannot be obtained from their operands nor from a
                     "BT" decorator (Default value = None)

    """
    # Run the base class initialization, like every other pass here.
    super(InitializeBranchDecorator, self).__init__()
    self._default = default
    self._indirect = indirect
    self._description = "Initialize branch decorator. Default: %s . " \
                        "Indirect: %s" % (self._default, self._indirect)
def __call__(self, building_block, dummy_target):
    """Attach a next-instruction ("NI") decorator to control transfers.

    For every branch, trap or syscall instruction the taken/not-taken
    pattern is read from its "BP" decorator (created when missing) and
    the target is taken from its operands, from its "BT" decorator or
    from the ``indirect`` value given at construction time. The "NI"
    decorator then holds an iterator producing the address of the next
    instruction to execute. When the pattern or the target is missing,
    "NI" holds an object whose ``next`` callable raises
    :class:`MicroprobeCodeGenerationError` when called.

    :param building_block: Building block to modify in place.
    :param dummy_target: Unused.
    :raises MicroprobeCodeGenerationError: If a "BT" decorator
        conflicts with the target encoded in the instruction, or a
        not-taken pattern is given for an unconditional branch.

    """
    context = building_block.context

    for bbl in building_block.cfg.bbls:
        for instr in bbl.instrs:

            if not (instr.branch or instr.trap or instr.syscall):
                continue

            # Default pattern: the configured one for conditional
            # branches, always taken for everything else.
            if "BP" not in instr.decorators and instr.branch_conditional:
                instr.add_decorator("BP", self._default)
            elif "BP" not in instr.decorators:
                instr.add_decorator("BP", "T")

            pattern = instr.decorators["BP"]["value"]

            # Look for a target encoded in the instruction itself.
            target_addr = None
            for memoper in instr.memory_operands():
                if isinstance(memoper.address, InstructionAddress):
                    target_addr = memoper.address
                    break

            if target_addr is None:
                for oper in instr.operands():
                    if isinstance(oper.value, InstructionAddress):
                        target_addr = oper.value
                        break

            if target_addr is None:
                if "BT" in instr.decorators:
                    target_addr = instr.decorators["BT"]['value']
                else:
                    target_addr = self._indirect
            else:
                if "BT" in instr.decorators:
                    # The instruction encodes its target: the decorator
                    # is only validated against it and then removed.
                    bt_addr = [
                        int(elem, 16) for elem
                        in instr.decorators["BT"]['value']
                    ]

                    if context.code_segment is not None:
                        bt_addr = [
                            elem - context.code_segment
                            for elem in bt_addr
                        ]

                    if instr.branch_conditional:
                        # Remove not taken branch addresses
                        bt_addr = [
                            elem for elem in bt_addr
                            if elem !=
                            instr.address.displacement +
                            instr.architecture_type.format.length
                        ]

                    # TODO: This code assumes that there are no
                    # conditional indirect branches
                    if len(set(bt_addr)) > 1:
                        raise MicroprobeCodeGenerationError(
                            "BT decorator with multiple targets "
                            "defined but instruction '%s' already"
                            " defines the target in its codification."
                            % (instr.assembly())
                        )
                    elif len(set(bt_addr)) == 1:
                        bt_addr = bt_addr[0]
                        if "EA" in instr.decorators:
                            # With an "EA" decorator we are dealing
                            # with real addresses. No check for
                            # mismatch.
                            #
                            # TODO: Check that 'translated' addresses
                            # mismatch or ensure we always deal with
                            # logic addresses
                            #
                            LOG.warning(
                                "Not checking for address missmatches"
                            )
                        elif bt_addr != target_addr.displacement:
                            raise MicroprobeCodeGenerationError(
                                "BT decorator specified for "
                                "instruction '%s' "
                                "but it already provides "
                                "the target in its "
                                "codification and they miss-match!"
                                % (instr.assembly())
                            )
                    instr.decorators.pop("BT")

            # A missing pattern is reported lazily further below, so it
            # must not reach the membership test here.
            if (pattern is not None and
                    not instr.branch_conditional and 'N' in pattern):
                if (target_addr - instr.address ==
                        instr.architecture_type.format.length and
                        "T" not in pattern):
                    # This is taken branch to next instruction
                    pass
                else:
                    raise MicroprobeCodeGenerationError(
                        "Not taken branch pattern provided for an "
                        "unconditional branch '%s'" % instr.assembly()
                    )

            if pattern is None:
                # The instruction is bound as a default argument: the
                # function is called later, when the loop variable
                # already refers to another instruction.
                def function_pattern(linstr=instr):
                    raise MicroprobeCodeGenerationError(
                        "No branch pattern provided for branch '%s'" %
                        linstr.assembly()
                    )
                pattern_error = self._error_class(function_pattern)
                instr.add_decorator("NI", pattern_error)
                continue

            if target_addr is None:
                # Same early binding as above.
                def function_target_addr(linstr=instr):
                    raise MicroprobeCodeGenerationError(
                        "No target address provided for branch '%s'" %
                        linstr.assembly()
                    )
                target_addr_error = self._error_class(function_target_addr)
                instr.add_decorator("NI", target_addr_error)
                continue

            if not isinstance(target_addr, list):
                target_addr = [target_addr]

            # Normalize every target into an InstructionAddress relative
            # to the "code" base (hexadecimal strings are parsed first).
            for idx, address in enumerate(target_addr):
                if isinstance(address, str):
                    address = int(address, 16)
                if not isinstance(address, InstructionAddress):
                    code = 0
                    if context.code_segment is not None:
                        code = context.code_segment
                    target_addr[idx] = InstructionAddress(
                        base_address="code",
                        displacement=address - code)

            next_address = instr.address + \
                instr.architecture_type.format.length

            def pattern_gen(lnext_address, lpattern, ltarget_addr):
                # Endless generator: 'N' yields the fall-through
                # address, anything else the next taken target.
                target_iter = itertools.cycle(list(lpattern))
                taken_target_address = itertools.cycle(ltarget_addr)
                while True:
                    if next(target_iter) == 'N':
                        yield lnext_address
                    else:
                        yield next(taken_target_address)

            # NOTE(review): itertools.cycle keeps a copy of every item
            # it yields and pattern_gen never ends, so memory grows with
            # use. Kept as is because consumers may rely on the type.
            instr.add_decorator(
                "NI", itertools.cycle(
                    pattern_gen(next_address,
                                pattern, target_addr)))
[docs]
class NormalizeBranchTargetsPass(Pass):
    """NormalizeBranchTargetsPass pass.

    Replaces the :class:`InstructionAddress` values found in operands
    and memory operands by the address of the instruction they refer
    to, so that labels are no longer needed.
    """

    def __init__(self):
        """Create a NormalizeBranchTargetsPass."""
        super(NormalizeBranchTargetsPass, self).__init__()
        self._description = "Normalize branch targets to remove the usage" \
                            "of labels"
        self._labels = RejectingDict()

    def __call__(self, building_block, dummy_target):
        """Normalize the branch targets of *building_block*.

        :param building_block: Building block to modify in place.
        :param dummy_target: Unused.
        :return: An empty list.

        """
        # Start from an empty label table on every invocation, so that
        # labels registered for a previous building block are neither
        # rejected as duplicates nor resolved by mistake.
        self._labels = RejectingDict()

        # All labels have to be known before any address is resolved.
        for instr in self._iter_instructions(building_block):
            self._register_label(instr)

        for instr in self._iter_instructions(building_block):
            self._normalize_label(instr)

        return []

    @staticmethod
    def _iter_instructions(building_block):
        """Yield the init, body and fini instructions, in that order."""
        for instr in building_block.init:
            yield instr

        for bbl in building_block.cfg.bbls:
            for instr in bbl.instrs:
                yield instr

        for instr in building_block.fini:
            yield instr

    def _register_label(self, instruction):
        """Record the (upper-cased) label of *instruction*, if any.

        :param instruction: Instruction to register.

        """
        if instruction.label is not None:
            self._labels[instruction.label.upper()] = instruction

    def _resolve_address(self, address, instruction):
        """Return the address designated by *address*.

        The target instruction attached to the address is preferred.
        Otherwise the base address is looked up, case-insensitively,
        among the registered labels and the displacement is added.

        :param address: Address to resolve.
        :param instruction: Instruction using the address (logging only).
        :raises NotImplementedError: If the address cannot be resolved.

        """
        LOG.debug(
            "Normalizing label: '%s' of instruction '%s'", address,
            instruction.name
        )

        if address.target_instruction is not None:
            return address.target_instruction.address

        base = address.base_address
        if isinstance(base, str) and base.upper() in self._labels:
            return self._labels[base.upper()].address + \
                address.displacement

        LOG.critical(address.base_address)
        LOG.critical(address)
        LOG.critical(list(self._labels.keys()))
        raise NotImplementedError(
            "Unable to normalize address '%s': unknown label" % address
        )

    def _normalize_label(self, instruction):
        """Resolve the instruction addresses used by *instruction*.

        :param instruction: Instruction to update in place.

        """
        for operand in instruction.operands():
            if isinstance(operand.value, InstructionAddress):
                operand.set_value(
                    self._resolve_address(operand.value, instruction)
                )

        for moperand in instruction.memory_operands():
            if isinstance(moperand.address, InstructionAddress):
                moperand.update_address(
                    self._resolve_address(moperand.address, instruction)
                )
[docs]
class RandomizeByTypePass(Pass):
"""RandomizeByTypePass pass.
"""
[docs]
def __init__(self, instr1, instr2,
             every, code, distance=1,
             musage=None, reset=None):
    """Create a RandomizeByTypePass.

    :param instr1: Instruction type, or list of instruction types, of
                   the branches to replace.
    :param instr2: Instruction type that replaces them.
    :param every: How often a matching branch is replaced. A value of
                  zero or less disables the pass, a value of one or more
                  is truncated to an integer and handed to
                  ``regular_probability``, and any other value is
                  handed to ``probability``.
    :param code: Assembly template inserted before each replaced
                 branch, or ``None``. The markers ``@@REG@@``,
                 ``@@COUNT@@`` and ``@@BRREG@@`` are substituted.
    :param distance: Distance, in instructions, between the inserted
                     code and the branch (Default value = 1)
    :param musage: Maximum usage of a register before moving to another
                   one; ``None`` is stored as -1 (Default value = None)
    :param reset: Reset mode as a ``(mode, range)`` pair, where mode is
                  ``"add"`` or ``"rnd"`` (Default value = None)
    :raises MicroprobeCodeGenerationError: If *distance* is below one.

    """
    super(RandomizeByTypePass, self).__init__()

    if not isinstance(instr1, list):
        instr1 = [instr1]

    self._instr1 = instr1
    self._instr2 = instr2

    if every <= 0:
        fevery = None
    elif every >= 1:
        fevery = regular_probability(int(every))
    else:
        fevery = probability(every)

    self._every = fevery

    if distance < 1:
        raise MicroprobeCodeGenerationError(
            "Dependency distance should be 1 or larger"
        )

    self._distance = int(distance)
    self._code = code

    if musage is None:
        musage = -1

    self._musage = musage
    self._reset = reset

    # Report the value that is actually in effect: the truncated integer
    # for regular intervals, the raw value otherwise. Formatting a
    # fractional probability with '%d' would always print 0.
    shown_every = int(every) if every >= 1 else every

    self._description = "Randomize '%s' by '%s' every '%s' "\
        "by adding '%s' code at distance '%d', with max reg "\
        "usage of '%s' and reset mode: '%s'" \
        % (
            [instr.name for instr in instr1],
            instr2.name, shown_every, code, distance,
            musage, reset
        )
def __call__(self, building_block, target):
    """Replace selected branches and randomize the register they test.

    Branches whose type is one of the configured source types are,
    with the configured frequency, changed to the replacement type.
    Their first register input is bound to a dedicated branch register
    and, when a code template was given, the instantiated template is
    inserted before the branch. Registers used for randomization are
    initialized with random values in the init code and updated in the
    fini code according to the reset mode.

    :param building_block: Building block to modify in place.
    :param target: Target used to generate the helper instructions.
    :raises MicroprobeCodeGenerationError: If no free register is left.
    :raises NotImplementedError: If a new register is needed and the
        reset mode is neither ``"add"`` nor ``"rnd"``.

    """
    if self._every is None:
        return

    braid_comment = "Instrunction added by " \
        "braid pass to avoid " \
        "convergence to zero."
    randomize_comment = "Instrunction added by " \
        "randomize pass to avoid " \
        "convergence to zero."
    no_registers_msg = "No free registers available. Change the " \
        "generation policy settings."

    def is_register_input(oper):
        # Inputs that are not constants, immediates, floats nor any
        # kind of address component.
        if not oper.is_input:
            return False
        otype = oper.type
        return not (
            otype.constant or otype.immediate or otype.float or
            otype.address_relative or otype.address_absolute or
            otype.address_immediate or otype.address_base or
            otype.address_index
        )

    def annotate(instructions, text):
        # Tag generated instructions with an explanatory comment.
        for ins in instructions:
            ins.add_comment(text)

    context = building_block.context

    creg = None            # register currently being randomized
    usereg = []            # registers already exhausted
    cusage = 0             # uses of the current register
    last_branch = None
    register = None
    zero_register = None   # register holding zero for spare operands
    tregister = None       # register tested by the replaced branches
    # Only the "rnd" reset mode assigns a seed register. It is read
    # further below in every mode, so it needs a defined value.
    random_reg = None

    for bbl in building_block.cfg.bbls:
        for instr in bbl.instrs:

            if not instr.branch:
                continue

            if instr.architecture_type in self._instr1:

                mrandom = self._every()
                if not mrandom:
                    if instr.branch:
                        last_branch = instr
                    continue

                instr.set_arch_type(self._instr2)
                instr.add_comment("Randomized branch")

                # take on operand input
                soperand = None
                for operand in instr.operands():
                    if is_register_input(operand):
                        soperand = operand
                        break

                if soperand is None:
                    LOG.debug(
                        "Unable to randomize branch "
                        "without input register operands"
                    )
                    if instr.branch:
                        last_branch = instr
                    continue

                values = soperand.type.values()
                values = [val for val in values if val not in
                          context.reserved_registers
                          and val not in target.control_registers]

                if (creg in values and creg not in usereg and
                        (cusage < self._musage or self._musage == -1)):
                    # Current register is in valid values and has not
                    # been used much, just need to set operand value and
                    # random code
                    register = creg
                else:
                    # New register, need to reset according to settings
                    registers = [
                        val for val in values
                        if val not in usereg
                    ]

                    if len(registers) == 0:
                        raise MicroprobeCodeGenerationError(
                            no_registers_msg
                        )

                    register = registers[0]
                    creg = register

                    if len(registers) < 2 and tregister is None:
                        raise MicroprobeCodeGenerationError(
                            no_registers_msg
                        )

                    if tregister is None:
                        tregister = registers[1]
                        context.add_reserved_registers([tregister])

                    if self._reset is not None and self._reset[0] == "add":
                        vrange = sorted(list(self._reset[1]))
                        diff = 0
                        if vrange[0] < 0:
                            diff = vrange[0]
                        value = RNDINT(maxmin=(vrange[0], vrange[1]))
                        value = value - diff

                        nins = target.add_to_register(register, value)
                        annotate(nins, braid_comment)
                        building_block.add_fini(nins)
                    elif (self._reset is not None and
                          self._reset[0] == "rnd"):
                        vrange = sorted(list(self._reset[1]))
                        diff = 0
                        if vrange[0] < 0:
                            diff = vrange[0]
                        value = RNDINT(maxmin=(vrange[0], vrange[1]))
                        value = value - diff

                        if len(usereg) == 0:
                            # First time
                            regs = [
                                val for val in values
                                if val not in usereg and val not in
                                context.reserved_registers
                            ]
                            if len(regs) < 2:
                                raise MicroprobeCodeGenerationError(
                                    "No free registers available. "
                                    "Change the "
                                    "generation policy settings."
                                )
                            reg = regs[1]

                            nins = target.set_register(
                                reg, RNDINT(), context
                            )
                            annotate(nins, randomize_comment)
                            building_block.add_init(nins)

                            nins = target.add_to_register(reg, value)
                            annotate(nins, randomize_comment)
                            building_block.add_fini(nins)

                            context.add_reserved_registers([reg])
                            random_reg = reg

                        nins = target.randomize_register(
                            register, seed=random_reg
                        )
                        annotate(nins, randomize_comment)
                        building_block.add_fini(nins)
                    else:
                        raise NotImplementedError

                # Set operand
                soperand.set_value(tregister)
                cusage = cusage + 1

                if self._code is not None:
                    code = self._code.replace('@@REG@@', register.name)
                    code = code.replace('@@COUNT@@', str(cusage))
                    code = code.replace('@@BRREG@@', tregister.name)

                    instructions_def = interpret_asm(
                        code, target, building_block.labels
                    )

                    instructions = []
                    for definition in instructions_def:
                        instruction = Instruction()
                        instruction_set_def_properties(
                            instruction,
                            definition,
                            building_block=building_block,
                            target=target,
                            allowed_registers=[
                                register.name, tregister.name
                            ]
                        )
                        instructions.append(instruction)

                    # Place the code at the requested distance, but
                    # never before the previous branch.
                    bdistance = bbl.distance(last_branch, instr)
                    if bdistance + 1 <= self._distance:
                        after_ins = last_branch
                    else:
                        after_ins = bbl.get_instruction_by_distance(
                            instr, self._distance * (-1)
                        )

                    building_block.add_instructions(
                        instructions,
                        after=after_ins,
                        before=instr
                    )

                for operand in instr.operands():
                    if not is_register_input(operand):
                        continue
                    if operand.value is not None:
                        continue

                    if zero_register is None:
                        # Set the remaining operands to zero
                        candidates = operand.type.values()
                        candidates = [
                            val for val in candidates if val not in
                            context.reserved_registers
                            and val not in target.control_registers
                            and val not in usereg
                            and val not in [register, random_reg]
                        ]
                        if len(candidates) == 0:
                            raise MicroprobeCodeGenerationError(
                                no_registers_msg
                            )
                        zero_register = candidates[0]
                        context.add_reserved_registers([zero_register])
                        nins = target.set_register(
                            zero_register, 0, context
                        )
                        annotate(nins, randomize_comment)
                        building_block.add_init(nins)

                    operand.set_value(zero_register)

                # Use new register once one is exhausted (we can reset
                # its value too)
                # NOTE(review): a max usage of -1 is treated as
                # "unlimited" when a register is selected above, but
                # always satisfies this test -- confirm the intent.
                if cusage >= self._musage:
                    usereg.append(register)
                    creg = None
                    cusage = 0

                # reserve implicit operands (if they are not already
                # reserved) and add them as allowed
                # A dedicated name is used here so that ``register``,
                # read again after the loops, is not overwritten.
                for operand_descriptor in instr.implicit_operands:
                    implicit_reg = list(operand_descriptor.type.values())[0]
                    instr.add_allow_register(implicit_reg)

                    if operand_descriptor.is_output and implicit_reg not \
                            in context.reserved_registers \
                            and implicit_reg not in target.control_registers:
                        context.add_reserved_registers([implicit_reg])

                context_fix_functions = instr.check_context(context)
                for context_fix_function in context_fix_functions:
                    try:
                        context_fix_function(target, building_block)
                    except MicroprobeUncheckableEnvironmentWarning as wue:
                        building_block.add_warning(str(wue))

            if instr.branch:
                last_branch = instr

    if register is not None and register not in usereg:
        usereg.append(register)

    # Every register used for randomization starts with a random value.
    for register in usereg:
        context.add_reserved_registers([register])
        nins = target.set_register(
            register, RNDINT(), context
        )
        annotate(nins, randomize_comment)
        building_block.add_init(nins)