Skip to content

control_reader

trestle.core.control_reader ¤

Handle reading of writing controls from markdown.

logger ¤

Classes¤

ControlReader ¤

Class to read controls from markdown.

Source code in trestle/core/control_reader.py
class ControlReader():
    """Class to read controls from markdown."""

    @staticmethod
    def _parse_control_title_line(line: str) -> Tuple[int, str, str]:
        """Process the title line and extract the control id, group title (in brackets) and control title."""
        if line.count('-') == 0:
            raise TrestleError(f'Markdown control title format error, missing - after control id: {line}')
        split_line = line.split()
        if len(split_line) < 3 or split_line[2] != '-':
            raise TrestleError(f'Cannot parse control markdown title for control_id group and title: {line}')
        # first token after the #
        control_id = split_line[1]
        group_title_start = line.find('\[')
        group_title_end = line.find('\]')
        if group_title_start < 0 or group_title_end < 0 or group_title_start > group_title_end:
            raise TrestleError(f'unable to read group title for control {control_id}')
        group_title = line[group_title_start + 2:group_title_end].strip()
        control_title = line[group_title_end + 2:].strip()
        return control_id, group_title, control_title

    @staticmethod
    def _indent(line: str) -> int:
        """Measure indent of non-empty line."""
        if not line:
            raise TrestleError('Empty line queried for indent.')
        if line[0] not in [' ', '-']:
            return -1
        for ii in range(len(line)):
            if line[ii] == '-':
                return ii
            # if line is indented it must start with -
            if line[ii] != ' ':
                break
        raise TrestleError(f'List elements must start with -: {line}')

    @staticmethod
    def _get_next_line(ii: int, lines: List[str]) -> Tuple[int, str]:
        while ii < len(lines):
            line = lines[ii]
            if line:
                return ii, line
            ii += 1
        return -1, ''

    @staticmethod
    def _get_next_indent(ii: int, lines: List[str]) -> Tuple[int, int, str]:
        """Seek to next content line.  ii remains at line read."""
        while 0 <= ii < len(lines):
            line = lines[ii]
            if line:
                if line[0] == '#':
                    return ii, -1, line
                indent = ControlReader._indent(line)
                if indent >= 0:
                    # extract text after -
                    start = indent + 1
                    while start < len(line) and line[start] == ' ':
                        start += 1
                    if start >= len(line):
                        raise TrestleError(f'Invalid line {line}')
                    return ii, indent, line[start:]
                return ii, indent, line
            ii += 1
        return ii, -1, ''

    @staticmethod
    def _read_part_id_prose(line: str) -> Tuple[str, str]:
        """Extract the part id letter or number and prose from line."""
        start = line.find('\\[')
        end = line.find('\\]')
        prose = line.strip() if start < 0 else line[end + 2:].strip()
        id_ = '' if start < 0 or end < 0 else line[start + 2:end]
        return id_, prose

    @staticmethod
    def _bump_label(label: str) -> str:
        """
        Find next label given a string of 1 or more pure letters or digits.

        The input must be either a string of digits or a string of ascii letters - or empty string.
        """
        if not label:
            return 'a'
        if label[0] in string.digits:
            return str(int(label) + 1)
        if len(label) == 1 and label[0].lower() < 'z':
            return chr(ord(label[0]) + 1)
        # if this happens to be a string of letters, force it lowercase and bump
        label = label.lower()
        factor = 1
        value = 0
        # delta is needed because a counts as 0 when first value on right, but 1 for all others
        delta = 0
        for letter in label[::-1]:
            value += (ord(letter) - ord('a') + delta) * factor
            factor *= 26
            delta = 1

        value += 1

        new_label = ''
        delta = 0
        while value > 0:
            new_label += chr(ord('a') + value % 26 - delta)
            value = value // 26
            delta = 1
        return new_label[::-1]

    @staticmethod
    def _create_next_label(prev_label: str, indent: int) -> str:
        """
        Create new label at indent level based on previous label if available.

        If previous label is available, make this the next one in the sequence.
        Otherwise start with a or 1 on alternate levels of indentation.
        If alphabetic label reaches z, next one is aa.
        Numeric ranges from 1 to 9, then 10 etc.
        """
        if not prev_label:
            # assume indent goes in steps of 2
            return ['a', '1'][(indent // 2) % 2]
        label_prefix = ''
        label_suffix = prev_label
        is_char = prev_label[-1] in string.ascii_letters
        # if it isn't ending in letter or digit just append 'a' to end
        if not is_char and prev_label[-1] not in string.digits:
            return prev_label + 'a'
        # break in middle of string if mixed types
        if len(prev_label) > 1:
            ii = len(prev_label) - 1
            while ii >= 0:
                if prev_label[ii] not in string.ascii_letters + string.digits:
                    break
                if (prev_label[ii] in string.ascii_letters) != is_char:
                    break
                ii -= 1
            if ii >= 0:
                label_prefix = prev_label[:(ii + 1)]
                label_suffix = prev_label[(ii + 1):]

        return label_prefix + ControlReader._bump_label(label_suffix)

    @staticmethod
    def _read_parts(indent: int, ii: int, lines: List[str], parent_id: str,
                    parts: List[common.Part]) -> Tuple[int, List[common.Part]]:
        """If indentation level goes up or down, create new list or close current one."""
        while True:
            ii, new_indent, line = ControlReader._get_next_indent(ii, lines)
            if new_indent < 0:
                # we are done reading control statement
                return ii, parts
            if new_indent == indent:
                # create new item part and add to current list of parts
                id_text, prose = ControlReader._read_part_id_prose(line)
                # id_text is the part id and needs to be as a label property value
                # if none is there then create one from previous part, or use default
                if not id_text:
                    prev_label = ControlInterface.get_label(parts[-1]) if parts else ''
                    id_text = ControlReader._create_next_label(prev_label, indent)
                id_ = ControlInterface.strip_to_make_ncname(parent_id.rstrip('.') + '.' + id_text.strip('.'))
                name = 'objective' if id_.find('_obj') > 0 else 'item'
                prop = common.Property(name='label', value=id_text)
                part = common.Part(name=name, id=id_, prose=prose, props=[prop])
                parts.append(part)
                ii += 1
            elif new_indent > indent:
                # add new list of parts to last part and continue
                if len(parts) == 0:
                    raise TrestleError(f'Improper indentation structure: {line}')
                ii, new_parts = ControlReader._read_parts(new_indent, ii, lines, parts[-1].id, [])
                if new_parts:
                    parts[-1].parts = new_parts
            else:
                # return list of sub-parts
                return ii, parts

    @staticmethod
    def _read_control_statement(ii: int, lines: List[str], control_id: str) -> Tuple[int, common.Part]:
        """Search for the Control statement and read until next ## Control."""
        while 0 <= ii < len(lines) and not lines[ii].startswith(const.CONTROL_HEADER):
            ii += 1
        if ii >= len(lines):
            raise TrestleError(f'Control statement not found for control {control_id}')
        ii += 1

        ii, line = ControlReader._get_next_line(ii, lines)
        if ii < 0:
            # This means no statement and control withdrawn (this happens in NIST catalog)
            return ii, None
        if line and line[0] == ' ' and line.lstrip()[0] != '-':
            # prose that appears indented but has no - : treat it as the normal statement prose
            line = line.lstrip()
            indent = -1
            ii += 1
        else:
            ii, indent, line = ControlReader._get_next_indent(ii, lines)

        statement_id = ControlInterface.create_statement_id(control_id)
        statement_part = common.Part(name=const.STATEMENT, id=statement_id)
        # first line is either statement prose or start of statement parts
        if indent < 0:
            statement_part.prose = line
            ii += 1
        # we have absorbed possible statement prose.
        # now just read parts recursively
        # if there was no statement prose, this will re-read the line just read
        # as the start of the statement's parts
        ii, parts = ControlReader._read_parts(0, ii, lines, statement_part.id, [])
        statement_part.parts = none_if_empty(parts)
        return ii, statement_part

    @staticmethod
    def _read_control_objective(ii: int, lines: List[str], control_id: str) -> Tuple[int, Optional[common.Part]]:
        ii_orig = ii
        while 0 <= ii < len(lines) and not lines[ii].startswith(const.CONTROL_OBJECTIVE_HEADER):
            ii += 1

        if ii >= len(lines):
            return ii_orig, None
        ii += 1

        ii, line = ControlReader._get_next_line(ii, lines)
        if ii < 0:
            raise TrestleError(f'Unable to parse objective from control markdown {control_id}')
        if line and line[0] == ' ' and line.lstrip()[0] != '-':
            # prose that appears indented but has no - : treat it as the normal objective prose
            line = line.lstrip()
            indent = -1
            ii += 1
        else:
            ii, indent, line = ControlReader._get_next_indent(ii, lines)

        objective_part = common.Part(name='objective', id=f'{control_id}_obj')
        # first line is either objective prose or start of objective parts
        if indent < 0:
            objective_part.prose = line
            ii += 1
        # we have absorbed possible objective prose.
        # now just read parts recursively
        # if there was no objective prose, this will re-read the line just read
        # as the start of the objective's parts
        ii, parts = ControlReader._read_parts(0, ii, lines, objective_part.id, [])
        objective_part.parts = parts if parts else None
        return ii, objective_part

    @staticmethod
    def _read_sections(ii: int, lines: List[str], control_id: str,
                       control_parts: List[common.Part]) -> Tuple[int, Optional[List[common.Part]]]:
        """Read all sections following the section separated by ## Control."""
        new_parts = []
        prefix = const.CONTROL_HEADER + ' '
        while 0 <= ii < len(lines):
            line = lines[ii]
            if line.startswith('## What is the solution') or line.startswith(f'# {const.EDITABLE_CONTENT}'):
                ii += 1
                continue
            if not line:
                ii += 1
                continue
            if line and not line.startswith(prefix):
                # the control has no sections to read, so exit the loop
                break
            label = line[len(prefix):].strip()
            prose = ''
            ii += 1
            while 0 <= ii < len(lines) and not lines[ii].startswith(prefix) and not lines[ii].startswith(
                    f'# {const.EDITABLE_CONTENT}'):
                prose = '\n'.join([prose, lines[ii]])
                ii += 1
            if prose:
                if label.lower() == 'guidance':
                    id_ = ControlInterface.strip_to_make_ncname(control_id + '_gdn')
                else:
                    id_ = ControlInterface.strip_to_make_ncname(control_id + '_' + label)
                label = ControlInterface.strip_to_make_ncname(label)
                new_parts.append(common.Part(id=id_, name=label, prose=prose.strip('\n')))
        if new_parts:
            control_parts = [] if not control_parts else control_parts
            control_parts.extend(new_parts)
        control_parts = none_if_empty(control_parts)
        return ii, control_parts

    @staticmethod
    def _clean_prose(prose: List[str]) -> List[str]:
        # remove empty and horizontal rule lines at start and end of list of prose lines
        forward_index = 0
        for line in prose:
            if line.strip() and not line.startswith('____'):
                break
            forward_index += 1
        new_prose = prose[forward_index:]
        reverse_index = 0
        for line in reversed(new_prose):
            if line.strip() and not line.startswith('____'):
                break
            reverse_index += 1
        clean_prose = new_prose[:len(new_prose) - reverse_index]
        clean_prose = clean_prose if clean_prose else ['']
        # if there is no useful prose this will return [''] and allow generation of a statement with empty prose
        return clean_prose

    @staticmethod
    def _comp_name_in_dict(comp_name: str, comp_dict: CompDict) -> str:
        """If the name is already in the dict in a similar form, stick to that form."""
        simple_name = ControlReader.simplify_name(comp_name)
        for name in comp_dict.keys():
            if simple_name == ControlReader.simplify_name(name):
                return name
        return comp_name

    @staticmethod
    def _add_node_to_dict(
        comp_name: str,
        label: str,
        comp_dict: CompDict,
        node: MarkdownNode,
        control_id: str,
        comp_list: List[str],
        context: ControlContext
    ) -> None:
        """Extract the label, prose, possible component name - along with implementation status."""
        component_mode = context.purpose == ContextPurpose.COMPONENT
        # for ssp, ### marks component name but for component it is ##
        # if it is a header, make sure it has correct format
        if node.key and node.key[0] == '#' and ControlInterface.bad_header(node.key):
            raise TrestleError(f'Improper header format for control {control_id}: {node.key}')
        if not component_mode:
            # look for component name heading if present
            prefix = '### '
            if node.key.startswith(prefix):
                if len(node.key.split()) <= 1:
                    raise TrestleError(
                        f'Header line in control {control_id} markdown starts with {prefix} but has no content.'
                    )
                comp_name = node.key.split(' ', 1)[1].strip()
                simp_comp_name = ControlReader.simplify_name(comp_name)
                if simp_comp_name == ControlReader.simplify_name(const.SSP_MAIN_COMP_NAME) and not component_mode:
                    raise TrestleError(
                        f'Response in control {control_id} has {const.SSP_MAIN_COMP_NAME} as a component heading.  '
                        'Instead, place all response prose for the default component at the top of the section, '
                        'with no ### component_name heading.  It will be entered as prose for the default system '
                        'component.'
                    )
                if simp_comp_name in comp_list:
                    raise TrestleError(
                        f'Control {control_id} has a section with two component headings for {comp_name}.  '
                        'Please combine the sections so there is only one heading for each component in a '
                        'statement.'
                    )
                comp_list.append(simp_comp_name)
                comp_name = ControlReader._comp_name_in_dict(comp_name, comp_dict)
            elif node.key.startswith('## What is the solution'):
                comp_name = const.SSP_MAIN_COMP_NAME
                simp_comp_name = ControlReader.simplify_name(comp_name)
                comp_list.append(simp_comp_name)
                comp_name = ControlReader._comp_name_in_dict(comp_name, comp_dict)

        # prose may be empty in md and we want to capture that so put it in the comp_dict
        prose = '\n'.join(ControlReader._clean_prose(node.content.text))
        # add the prose to the comp_dict, creating new entry as needed
        if comp_name in comp_dict:
            if label in comp_dict[comp_name]:
                comp_dict[comp_name][label].prose = prose
            else:
                # create new entry with prose
                comp_dict[comp_name][label] = ComponentImpInfo(prose=prose, rules=[])
        else:
            comp_dict[comp_name] = {label: ComponentImpInfo(prose=prose, rules=[])}

        # build list of subnodes that get handled specially so they aren't processed here
        subnode_kill: List[int] = []
        status_str = None
        remarks_str = None
        rules_list: List[str] = []
        for ii, subnode in enumerate(node.subnodes):
            if subnode.key.find(const.IMPLEMENTATION_STATUS_REMARKS_HEADER) >= 0:
                remarks_str = subnode.key.split(maxsplit=4)[-1]
                subnode_kill.append(ii)
            elif subnode.key.find(const.IMPLEMENTATION_STATUS_HEADER) >= 0:
                status_str = subnode.key.split(maxsplit=3)[-1]
                subnode_kill.append(ii)
            elif subnode.key.find('Rules:') >= 0:
                rules_list = [text[2:] for text in subnode.content.text if text.startswith('- ')]
                subnode_kill.append(ii)
        if status_str:
            new_status = common.ImplementationStatus(state=status_str, remarks=remarks_str)
            if comp_name not in comp_dict:
                comp_dict[comp_name] = {}
            if label not in comp_dict[comp_name]:
                comp_dict[comp_name][label] = ComponentImpInfo(prose='', rules=[])
            comp_dict[comp_name][label].status = new_status
        if rules_list:
            comp_dict[comp_name][label].rules = rules_list
        delete_list_from_list(node.subnodes, subnode_kill)
        for subnode in as_list(node.subnodes):
            ControlReader._add_node_to_dict(comp_name, label, comp_dict, subnode, control_id, comp_list, context)

    @staticmethod
    def _get_statement_label(control: Optional[cat.Control], statement_id: str) -> str:
        if control:
            for part in as_list(control.parts):
                if part.name == const.STATEMENT:
                    for sub_part in as_list(part.parts):
                        if sub_part.name == 'item' and sub_part.id == statement_id:
                            return ControlInterface.get_label(sub_part)
        return ''

    @staticmethod
    def _add_component_to_dict(context: ControlContext, control: Optional[cat.Control],
                               comp_dict: CompDict) -> Tuple[Dict[str, Dict[str, str]], List[str]]:
        """Add imp_reqs for this control and this component to the component dictionary."""
        params_dict: Dict[str, Dict[str, str]] = {}
        all_rules: Set[str] = set()
        sub_comp_dict: Dict[str, ComponentImpInfo] = {}
        if control:
            component = ControlInterface.get_component_by_name(context.comp_def, context.comp_name)
            for control_imp in as_list(component.control_implementations):
                for imp_req in ControlInterface.get_control_imp_reqs(control_imp, control.id):
                    # if description is same as control id regard it as not having prose
                    # add top level control guidance with no statement id
                    prose = ControlReader._handle_empty_prose(imp_req.description, control.id)
                    params_dict.update(ControlInterface.get_params_dict_from_item(imp_req))
                    rules_list = ControlInterface.get_rule_list_for_item(imp_req)
                    all_rules.update(rules_list)
                    status = ControlInterface.get_status_from_props(imp_req)
                    sub_comp_dict[''] = ComponentImpInfo(prose=prose, status=status, rules=rules_list)
                    for statement in as_list(imp_req.statements):
                        rules_list = ControlInterface.get_rule_list_for_item(statement)
                        all_rules.update(rules_list)
                        status = ControlInterface.get_status_from_props(statement)
                        label = ControlReader._get_statement_label(control, statement.statement_id)
                        prose = ControlReader._handle_empty_prose(statement.description, statement.statement_id)
                        sub_comp_dict[label] = ComponentImpInfo(prose=prose, status=status, rules=rules_list)
            if sub_comp_dict:
                comp_dict[context.comp_name] = sub_comp_dict
        return params_dict, sorted(all_rules)

    @staticmethod
    def _insert_header_content(
        imp_req: generic.GenericImplementedRequirement, header: Dict[str, Any], control_id: str
    ) -> None:
        """Insert yaml header content into the imp_req and its by_comps."""
        dict_ = header.get(const.TRESTLE_PROPS_TAG, {})
        # if an attribute is in the dict but it is None, need to make sure we get empty list anyway
        control_orig = as_list(dict_.get(const.CONTROL_ORIGINATION, []))
        imp_status = as_list(dict_.get(const.IMPLEMENTATION_STATUS, []))
        roles = as_list(dict_.get(const.RESPONSIBLE_ROLES, []))
        props = []
        responsible_roles = []
        for co in control_orig:
            if isinstance(co, str):
                props.append(common.Property(ns=const.NAMESPACE_NIST, name=const.CONTROL_ORIGINATION, value=co))
            elif isinstance(co, dict):
                if const.STATUS_INHERITED in co:
                    uuid = co[const.STATUS_INHERITED]
                    props.append(common.Property(name=const.LEV_AUTH_UUID, value=uuid))
                    props.append(
                        common.Property(
                            ns=const.NAMESPACE_NIST, name=const.CONTROL_ORIGINATION, value=const.STATUS_INHERITED
                        )
                    )
                else:
                    raise TrestleError(f'The yaml header for control {control_id} has unexpected content: {co}')
            else:
                raise TrestleError(f'The yaml header for control {control_id} has unexpected content: {co}')
        # FIXME this needs reworking
        for status in imp_status:
            if isinstance(status, str):
                props.append(
                    common.Property(ns=const.NAMESPACE_FEDRAMP, name=const.IMPLEMENTATION_STATUS, value=status)
                )
            elif isinstance(status, dict):
                if const.STATUS_PLANNED in status:
                    if const.STATUS_COMPLETION_DATE not in status:
                        raise TrestleError(
                            f'Planned status in the control {control_id} yaml header must '
                            f'specify completion date: {status}'
                        )
                    props.append(
                        common.Property(
                            ns=const.NAMESPACE_FEDRAMP, name=const.STATUS_PLANNED, value=status[const.STATUS_PLANNED]
                        )
                    )
                    datestr = status[const.STATUS_COMPLETION_DATE]
                    datestr = datestr.strftime('%Y-%m-%d') if isinstance(datestr, datetime) else str(datestr)
                    props.append(
                        common.Property(
                            ns=const.NAMESPACE_FEDRAMP, name=const.STATUS_PLANNED_COMPLETION_DATE, value=datestr
                        )
                    )
                else:
                    if len(status) != 1:
                        raise TrestleError(f'Unexpected content in control {control_id} yaml header: {status}')
                    value = list(status.keys())[0]
                    remark = list(status.values())[0]
                    props.append(
                        common.Property(
                            ns=const.NAMESPACE_FEDRAMP,
                            name=const.IMPLEMENTATION_STATUS,
                            value=value,
                            remarks=common.Remarks(__root__=remark)
                        )
                    )
            else:
                raise TrestleError(f'Unexpected content in control {control_id} yaml header: {status}')
        for role in roles:
            if isinstance(role, str):
                # role_id must conform to NCNAME regex
                role = role.strip().replace(' ', '_')
                if role:
                    responsible_roles.append(common.ResponsibleRole(role_id=role))
            else:
                logger.warning(f'Role in header for control {control_id} not recognized: {role}')
        if props:
            imp_req.props = as_list(imp_req.props)
            imp_req.props.extend(props)
        if responsible_roles:
            imp_req.responsible_roles = as_list(imp_req.responsible_roles)
            imp_req.responsible_roles.extend(responsible_roles)
            imp_req.responsible_roles = none_if_empty(imp_req.responsible_roles)
            # enforce single list of resp. roles for control and each by_comp
            for by_comp in as_list(imp_req.by_components):
                by_comp.responsible_roles = imp_req.responsible_roles

    @staticmethod
    def simplify_name(name: str) -> str:
        """Simplify the name to ignore variations in case, space, hyphen, underscore, slash."""
        return name.lower().replace(' ', '').replace('-', '').replace('_', '').replace('/', '')

    @staticmethod
    def _get_label_from_implementation_header(imp_header: str):
        # assumed to be of form: Implementation for part a.
        split_header = imp_header.split(' ', 4)
        if len(split_header) != 5:
            raise TrestleError(f'Implementation header cannot be parsed for statement part: {imp_header}')
        return split_header[4].strip()

    @staticmethod
    def read_all_implementation_prose_and_header(
        control: Optional[cat.Control], control_file: pathlib.Path, context: ControlContext
    ) -> Tuple[CompDict, Dict[str, List[str]]]:
        """
        Find all labels and associated implementation prose in the markdown for this control.

        Args:
            control: optional control used for finding statement labels
            control_file: path to the control markdown file
            context: context of the control usage

        Returns:
            Dictionary by comp_name of Dictionaries of part labels and corresponding prose read from the markdown file.
            Also returns the yaml header as dict in second part of tuple.
            This does not generate components - it only tracks component names and associated responses.

        Notes:
            If a component is provided, any implementation prose for a control will be added.
            In addition, the implemented requirement will be queried for a
            property corresponding to implementation status and included if available.
        """
        # this level only adds for known component but add_node_to_dict can add for other components
        comp_name = context.comp_name if context.comp_name else const.SSP_MAIN_COMP_NAME
        control_id = control_file.stem

        comp_dict: CompDict = {}
        yaml_header = {}
        # use context.rules_dict and params_dict to map rules
        if context.purpose == ContextPurpose.COMPONENT:
            # find rule info needed by this control
            params_dict, rules_list = ControlReader._add_component_to_dict(context, control, comp_dict)
            all_params = []
            if params_dict:
                if not set(params_dict.keys()).issuperset(rules_list):
                    raise TrestleError(f'Control {control_id} has a parameter assigned to a rule that is not defined.')
                if context.rules_dict:
                    all_params.extend(
                        [
                            {
                                context.rules_dict[id_]['name']: context.rules_params_dict[id_]
                                for id_ in context.rules_params_dict.keys()
                            }
                        ]
                    )
            if context.rules_dict:
                rule_ids = [id_ for id_ in context.rules_dict.keys() if context.rules_dict[id_]['name'] in rules_list]
                control_rules = [context.rules_dict[id_] for id_ in rule_ids]
                if control_rules:
                    yaml_header[const.COMP_DEF_RULES_TAG] = control_rules
                all_params.extend(
                    [context.rules_params_dict[id_] for id_ in rule_ids if id_ in context.rules_params_dict]
                )
            if all_params:
                yaml_header[const.RULES_PARAMS_TAG] = all_params
            if context.rules_param_vals:
                yaml_header[const.COMP_DEF_RULES_PARAM_VALS_TAG] = context.rules_param_vals

        if not control_file.exists():
            return comp_dict, yaml_header
        # if the file exists, load the contents and do not use prose from comp_dict
        try:
            md_api = MarkdownAPI()
            new_yaml_header, control_md = md_api.processor.process_markdown(control_file)
            yaml_header = new_yaml_header

            # first get the header strings, including statement labels, for statement imp reqs
            imp_string = '## Implementation '
            headers = control_md.get_all_headers_for_level(2)
            # get e.g. ## Implementation a.  ## Implementation b. etc
            imp_header_list = [header for header in headers if header.startswith(imp_string)]

            # now get the (one) header for the main solution
            main_headers = list(control_md.get_all_headers_for_key(const.SSP_MD_IMPLEMENTATION_QUESTION, False))
            # should be only one header, so warn if others found
            if main_headers:
                if len(main_headers) > 1:
                    logger.warning(
                        f'Control {control_id} has {len(main_headers)} main header responses.  Will use first one only.'
                    )
                main_header = main_headers[0]
                node = control_md.get_all_nodes_for_keys([main_header], False)[0]
                # this node is top level so it will have empty label
                # it may have subnodes of Rules, Implementation Status, Implementaton Remarks
                ControlReader._add_node_to_dict(comp_name, '', comp_dict, node, control_id, [], context)
            for imp_header in imp_header_list:
                label = ControlReader._get_label_from_implementation_header(imp_header)
                node = control_md.get_node_for_key(imp_header)
                ControlReader._add_node_to_dict(comp_name, label, comp_dict, node, control_id, [], context)

        except TrestleError as e:
            raise TrestleError(f'Error occurred reading {control_file}: {e}')
        return comp_dict, yaml_header

    @staticmethod
    def _handle_empty_prose(prose: str, id_: str) -> str:
        """Regard prompt text or id_ as no prose and return blank string."""
        if prose.startswith(const.SSP_ADD_IMPLEMENTATION_PREFIX) or prose == id_:
            return ''
        return prose

    @staticmethod
    def read_implemented_requirement(
        control_file: pathlib.Path, avail_comps: Dict[str, generic.GenericComponent], context: ControlContext
    ) -> Tuple[str, generic.GenericImplementedRequirement]:
        """
        Get the implementated requirement associated with given control and link to existing components or new ones.

        Args:
            control_file: path of the control markdown file
            avail_comps: dictionary of known components keyed by component name
            context: context of the control usage

        Returns:
            Tuple: The control sort-id and the one implemented requirement for this control.

        Notes:
            Each statement may have several responses, with each response in a by_component for a specific component.
            statement_map keeps track of statements that may have several by_component responses.
        """
        control_id = control_file.stem
        comp_dict, header = ControlReader.read_all_implementation_prose_and_header(None, control_file, context)

        statement_map: Dict[str, generic.GenericStatement] = {}
        # create a new implemented requirement linked to the control id to hold the statements
        imp_req: generic.GenericImplementedRequirement = generic.GenericImplementedRequirement.generate()
        imp_req.control_id = control_id

        raw_comp_dict = {ControlReader.simplify_name(key): value for key, value in comp_dict.items()}
        raw_avail_comps = {ControlReader.simplify_name(key): value for key, value in avail_comps.items()}

        # the comp_dict captures all component names referenced by the control
        # need to create new components if not already in dict by looping over comps referenced by this control
        for comp_name in comp_dict.keys():
            component: Optional[generic.GenericComponent] = None
            raw_comp_name = ControlReader.simplify_name(comp_name)
            if raw_comp_name == ControlReader.simplify_name(const.SSP_MD_IMPLEMENTATION_QUESTION):
                comp_info: ComponentImpInfo = list(raw_comp_dict[raw_comp_name].items())[0][1]
                if context.purpose == ContextPurpose.COMPONENT and not comp_info.rules:
                    logger.debug(f'Control {control_id} not written to md because it has no rules associated.')
                    continue
                imp_req.description = ControlReader._handle_empty_prose(comp_info.prose, control_id)
                if comp_info.status:
                    ControlInterface.insert_status_in_props(imp_req, comp_info.status)
                continue
            if raw_comp_name in raw_avail_comps:
                component = raw_avail_comps[raw_comp_name]
            else:
                # here is where we create a new component on the fly as needed
                component = generic.GenericComponent.generate()
                component.title = comp_name
                avail_comps[comp_name] = component
                raw_avail_comps[raw_comp_name] = component
            # now create statements to hold the by-components and assign the statement id
            for label, comp_info in raw_comp_dict[raw_comp_name].items():
                if context.purpose == ContextPurpose.COMPONENT:
                    # only assemble responses with associated rules
                    if not comp_info.rules:
                        continue
                    # if there is a statement label create by_comp - otherwise assign status and prose to imp_req
                    # create a new by-component to add to this statement
                    if not label:
                        imp_req.description = ControlReader._handle_empty_prose(comp_info.prose, control_id)
                        ControlInterface.insert_status_in_props(imp_req, comp_info.status)
                        continue
                by_comp: generic.GenericByComponent = generic.GenericByComponent.generate()
                # link it to the component uuid
                by_comp.component_uuid = component.uuid
                by_comp.implementation_status = comp_info.status
                # add the response prose to the description
                by_comp.description = ControlReader._handle_empty_prose(comp_info.prose, control_id)
                statement_id = ControlInterface.create_statement_id(control_id)
                # control level response has '' as label
                if label in ['', const.STATEMENT]:
                    statement_part_id = statement_id
                else:
                    clean_label = label.strip('.')
                    statement_part_id = ControlInterface.strip_to_make_ncname(f'{statement_id}.{clean_label}')
                if statement_part_id in statement_map:
                    statement = statement_map[statement_part_id]
                else:
                    statement: generic.GenericStatement = generic.GenericStatement.generate()
                    statement.statement_id = statement_part_id
                    statement.by_components = []
                    statement_map[statement_part_id] = statement
                statement.by_components.append(by_comp)

        imp_req.statements = list(statement_map.values())
        ControlReader._insert_header_content(imp_req, header, control_id)
        sort_id = header.get(const.SORT_ID, control_id)
        return sort_id, imp_req

    @staticmethod
    def _add_control_part(
        control_id: str,
        subnode: MarkdownNode,
        required_sections_list: List[str],
        sections_dict: Dict[str, str],
        snake_dict: Dict[str, str],
        control_parts: List[common.Part],
        found_sections: List[str],
        write_mode: bool
    ) -> bool:
        match = re.match(const.CONTROL_REGEX, subnode.key)
        if match:
            part_name_raw = match.groups(0)[0]
            prose = ControlReader._clean_prose(subnode.content.text)
            prose = '\n'.join(prose)
            # prose may be empty but make part anyway if it was in markdown
            # it also may contain sub-parts
            part_name_snake = spaces_and_caps_to_snake(part_name_raw)
            part_name = snake_dict.get(part_name_snake, part_name_snake)
            # if section is required and it hasn't been edited with prose raise error
            if not write_mode and part_name in required_sections_list and prose.startswith(
                    const.PROFILE_ADD_REQUIRED_SECTION_FOR_CONTROL_TEXT):
                missing_section = sections_dict.get(part_name, part_name)
                raise TrestleError(f'Control {control_id} is missing prose for required section {missing_section}')
            id_ = f'{control_id}_{part_name}'
            # use sections dict to find correct title otherwise leave it None
            part_title = sections_dict.get(part_name, None)
            part = common.Part(id=id_, name=part_name, prose=prose, title=part_title)
            part.parts = ControlReader._add_sub_parts(part.id, subnode)
            control_parts.append(part)
            found_sections.append(part_name)
            return True
        return False

    @staticmethod
    def _add_sub_parts(part_id: str,
                       node: MarkdownNode,
                       fixed_part_name: Optional[str] = None) -> Optional[List[common.Part]]:
        if not node.subnodes:
            return None
        parts = []
        for subnode in node.subnodes:
            # the count of hashes should be correct based on parsing already down by the markdown parser
            match = re.match(const.AFTER_HASHES_REGEX, subnode.key)
            if not match:
                raise TrestleError(f'Unexpected editable header {subnode.key} found in part {part_id}')
            part_name = match.groups(0)[0]
            part_name_snake = spaces_and_caps_to_snake(part_name)
            id_ = part_id + '.' + part_name_snake
            prose_lines = ControlReader._clean_prose(subnode.content.text)
            prose = '\n'.join(prose_lines)
            final_part_name = fixed_part_name if fixed_part_name else part_name_snake
            part = common.Part(id=id_, name=final_part_name, prose=prose)
            part.parts = ControlReader._add_sub_parts(part.id, subnode, fixed_part_name)
            parts.append(part)
        return parts

    @staticmethod
    def _add_sub_part(
        control_id: str,
        subnode: MarkdownNode,
        label_map: Dict[str, str],
        by_id_parts: Dict[str, List[common.Part]],
        sections_dict: Dict[str, str]
    ) -> None:
        """Add subnode contents to the list of by_id statement parts for the top level of the control."""
        match = re.match(const.PART_REGEX, subnode.key)
        if not match:
            raise TrestleError(f'Unexpected editable header {subnode.key} found in control {control_id}')
        by_part_label = match.groups(0)[0]
        control_label_map = label_map.get(control_id, None)
        if control_label_map is None:
            raise TrestleError(f'No label map found for control {control_id}')
        by_part_id = control_label_map.get(by_part_label, None)
        if by_part_id is None:
            raise TrestleError(f'No part id found for label {by_part_label} in control {control_id}')
        inv_map = {v: k for k, v in sections_dict.items()} if sections_dict else {}
        for node2 in as_list(subnode.subnodes):
            hash_pattern = '### '
            if node2.key.startswith(hash_pattern):
                part_name = spaces_and_caps_to_snake(node2.key.replace(hash_pattern, '', 1).strip())
                part_name = inv_map.get(part_name, part_name)
                prose = ControlReader._clean_prose(node2.content.text)
                prose = '\n'.join(prose)
                id_ = f'{by_part_id}.{part_name}'
                part = common.Part(id=id_, name=part_name, prose=prose)
                part.parts = ControlReader._add_sub_parts(part.id, node2)
            else:
                raise TrestleError(f'Unexpected header {node2.key} found in control {control_id}')
            if by_part_id not in by_id_parts:
                by_id_parts[by_part_id] = []
            by_id_parts[by_part_id].append(part)

    @staticmethod
    def _get_props_list(control_id: str, label_map: Dict[str, str],
                        yaml_header: Dict[str, Any]) -> Tuple[List[common.Property], Dict[str, List[common.Property]]]:
        """Get the list of props in the yaml header of this control as separate lists with and without by_id."""
        prop_list = yaml_header.get(const.TRESTLE_ADD_PROPS_TAG, [])
        props = []
        props_by_id = {}
        for prop_d in prop_list:
            by_id = prop_d.get('smt-part', None)
            if by_id and control_id in label_map:
                by_id = label_map[control_id].get(by_id, by_id)
            prop = common.Property(name=prop_d['name'], value=prop_d['value'], ns=prop_d.get('ns', None))
            if by_id:
                if by_id not in props_by_id:
                    props_by_id[by_id] = []
                props_by_id[by_id].append(prop)
            else:
                props.append(prop)
        return props, props_by_id

    @staticmethod
    def read_editable_content(
        control_path: pathlib.Path,
        required_sections_list: List[str],
        label_map: Dict[str, Dict[str, str]],
        sections_dict: Dict[str, str],
        write_mode: bool
    ) -> Tuple[str, List[prof.Alter], Dict[str, Any]]:
        """Get parts for the markdown control corresponding to Editable Content - along with the set-parameter dict."""
        control_id = control_path.stem
        new_alters: List[prof.Alter] = []
        snake_dict: Dict[str, str] = {}

        md_api = MarkdownAPI()
        yaml_header, control_tree = md_api.processor.process_markdown(control_path)
        # extract the sort_id if present in header
        sort_id = yaml_header.get(const.SORT_ID, control_id)
        # merge the incoming sections_dict with the one in the header, with priority to incoming
        header_sections_dict: Dict[str, str] = yaml_header.get(const.SECTIONS_TAG, {})
        merged_sections_dict = merge_dicts(header_sections_dict, sections_dict)
        # query header for mapping of short to long section names
        # create reverse lookup of long snake name to short name needed for part
        for key, value in merged_sections_dict.items():
            snake_dict[spaces_and_caps_to_snake(value)] = key
        found_sections: List[str] = []

        editable_node = None
        for header in list(control_tree.get_all_headers_for_level(1)):
            if header.startswith('# Editable'):
                editable_node = control_tree.get_node_for_key(header)
                break
        if not editable_node:
            return sort_id, [], {}

        control_parts = []
        by_id_parts = {}
        for subnode in editable_node.subnodes:
            # check if it is a part added directly to the list of parts for the control
            if not ControlReader._add_control_part(control_id,
                                                   subnode,
                                                   required_sections_list,
                                                   merged_sections_dict,
                                                   snake_dict,
                                                   control_parts,
                                                   found_sections,
                                                   write_mode):
                # otherwise add it to the list of new parts to be added to the sub-parts of a part based on by-id
                ControlReader._add_sub_part(control_id, subnode, label_map, by_id_parts, merged_sections_dict)

        missing_sections = set(required_sections_list) - set(found_sections)
        if missing_sections:
            raise TrestleError(f'Control {control_id} is missing required sections {missing_sections}')
        param_dict: Dict[str, Any] = {}
        # get set_params from the header and add to parm_dict
        header_params = yaml_header.get(const.SET_PARAMS_TAG, {})
        if header_params:
            param_dict.update(header_params)

        props, props_by_id = ControlReader._get_props_list(control_id, label_map, yaml_header)

        # When adding props without by_id it can either be starting or ending and we default to ending
        # This is the default behavior as described for implicit binding in
        # https://pages.nist.gov/OSCAL/concepts/processing/profile-resolution/
        # When adding props to a part using by_id, it is the same situation because it cannot be before or after since
        # props are not in the same list as parts

        adds: List[prof.Add] = []

        # add the parts and props at control level
        if control_parts or props:
            adds.append(prof.Add(parts=none_if_empty(control_parts), props=none_if_empty(props), position='ending'))

        # add the parts and props at the part level, by-id
        by_ids = set(by_id_parts.keys()).union(props_by_id.keys())
        for by_id in sorted(by_ids):
            parts = by_id_parts.get(by_id, None)
            props = props_by_id.get(by_id, None)
            adds.append(prof.Add(parts=parts, props=props, position='ending', by_id=by_id))

        new_alters = []
        if adds:
            new_alters = [prof.Alter(control_id=control_id, adds=adds)]
        return sort_id, new_alters, param_dict

    @staticmethod
    def _update_display_prop_namespace(item: TypeWithProps):
        """Set namespace for special property display_name."""
        for prop in as_list(item.props):
            if prop.name == const.DISPLAY_NAME:
                prop.ns = const.TRESTLE_GENERIC_NS

    @staticmethod
    def read_control(control_path: pathlib.Path, set_parameters_flag: bool) -> Tuple[cat.Control, str]:
        """Read the control and group title from the markdown file."""
        control = gens.generate_sample_model(cat.Control)
        md_api = MarkdownAPI()
        yaml_header, control_tree = md_api.processor.process_markdown(control_path)
        control_titles = list(control_tree.get_all_headers_for_level(1))
        if len(control_titles) == 0:
            raise TrestleError(f'Control markdown: {control_path} contains no control title.')

        control.id, group_title, control.title = ControlReader._parse_control_title_line(control_titles[0])

        control_headers = list(control_tree.get_all_headers_for_level(2))
        if len(control_headers) == 0:
            raise TrestleError(f'Control markdown: {control_path} contains no control statements.')

        control_statement = control_tree.get_node_for_key(control_headers[0])
        rc, statement_part = ControlReader._read_control_statement(
            0, control_statement.content.raw_text.split('\n'), control.id
        )
        if rc < 0:
            return control, group_title
        control.parts = [statement_part] if statement_part else None
        control_objective = control_tree.get_node_for_key(const.CONTROL_OBJECTIVE_HEADER)
        if control_objective is not None:
            _, objective_part = ControlReader._read_control_objective(
                0, control_objective.content.raw_text.split('\n'), control.id
            )
            if objective_part:
                if control.parts:
                    control.parts.append(objective_part)
                else:
                    control.parts = [objective_part]
        for header_key in control_tree.get_all_headers_for_key(const.CONTROL_HEADER, False):
            if header_key not in {control_headers[0], const.CONTROL_OBJECTIVE_HEADER, control_titles[0]}:
                section_node = control_tree.get_node_for_key(header_key)
                _, control.parts = ControlReader._read_sections(
                    0, section_node.content.raw_text.split('\n'), control.id, control.parts
                )
        if set_parameters_flag:
            params: Dict[str, str] = yaml_header.get(const.SET_PARAMS_TAG, [])
            if params:
                control.params = []
                for id_, param_dict in params.items():
                    param_dict['id'] = id_
                    param = ModelUtils.dict_to_parameter(param_dict)
                    # if display_name is in list of properties, set its namespace
                    ControlReader._update_display_prop_namespace(param)
                    control.params.append(param)
        if const.SORT_ID in yaml_header:
            control.props = control.props if control.props else []
            control.props.append(common.Property(name=const.SORT_ID, value=yaml_header[const.SORT_ID]))
        return control, group_title
Methods¤
read_all_implementation_prose_and_header(control, control_file, context) staticmethod ¤

Find all labels and associated implementation prose in the markdown for this control.

Parameters:

Name Type Description Default
control Optional[trestle.oscal.catalog.Control]

optional control used for finding statement labels

required
control_file Path

path to the control markdown file

required
context ControlContext

context of the control usage

required

Returns:

Type Description
Tuple[Dict[str, Dict[str, trestle.core.control_interface.ComponentImpInfo]], Dict[str, List[str]]]

Dictionary by comp_name of Dictionaries of part labels and corresponding prose read from the markdown file. Also returns the yaml header as dict in second part of tuple. This does not generate components - it only tracks component names and associated responses.

Notes

If a component is provided, any implementation prose for a control will be added. In addition, the implemented requirement will be queried for a property corresponding to implementation status and included if available.

Source code in trestle/core/control_reader.py
@staticmethod
def read_all_implementation_prose_and_header(
    control: Optional[cat.Control], control_file: pathlib.Path, context: ControlContext
) -> Tuple[CompDict, Dict[str, List[str]]]:
    """
    Find all labels and associated implementation prose in the markdown for this control.

    Args:
        control: optional control used for finding statement labels
        control_file: path to the control markdown file
        context: context of the control usage

    Returns:
        Dictionary by comp_name of Dictionaries of part labels and corresponding prose read from the markdown file.
        Also returns the yaml header as dict in second part of tuple.
        This does not generate components - it only tracks component names and associated responses.

    Notes:
        If a component is provided, any implementation prose for a control will be added.
        In addition, the implemented requirement will be queried for a
        property corresponding to implementation status and included if available.
    """
    # this level only adds for known component but add_node_to_dict can add for other components
    comp_name = context.comp_name if context.comp_name else const.SSP_MAIN_COMP_NAME
    control_id = control_file.stem

    comp_dict: CompDict = {}
    yaml_header = {}
    # use context.rules_dict and params_dict to map rules
    if context.purpose == ContextPurpose.COMPONENT:
        # find rule info needed by this control
        params_dict, rules_list = ControlReader._add_component_to_dict(context, control, comp_dict)
        all_params = []
        if params_dict:
            if not set(params_dict.keys()).issuperset(rules_list):
                raise TrestleError(f'Control {control_id} has a parameter assigned to a rule that is not defined.')
            if context.rules_dict:
                all_params.extend(
                    [
                        {
                            context.rules_dict[id_]['name']: context.rules_params_dict[id_]
                            for id_ in context.rules_params_dict.keys()
                        }
                    ]
                )
        if context.rules_dict:
            rule_ids = [id_ for id_ in context.rules_dict.keys() if context.rules_dict[id_]['name'] in rules_list]
            control_rules = [context.rules_dict[id_] for id_ in rule_ids]
            if control_rules:
                yaml_header[const.COMP_DEF_RULES_TAG] = control_rules
            all_params.extend(
                [context.rules_params_dict[id_] for id_ in rule_ids if id_ in context.rules_params_dict]
            )
        if all_params:
            yaml_header[const.RULES_PARAMS_TAG] = all_params
        if context.rules_param_vals:
            yaml_header[const.COMP_DEF_RULES_PARAM_VALS_TAG] = context.rules_param_vals

    if not control_file.exists():
        return comp_dict, yaml_header
    # if the file exists, load the contents and do not use prose from comp_dict
    try:
        md_api = MarkdownAPI()
        new_yaml_header, control_md = md_api.processor.process_markdown(control_file)
        yaml_header = new_yaml_header

        # first get the header strings, including statement labels, for statement imp reqs
        imp_string = '## Implementation '
        headers = control_md.get_all_headers_for_level(2)
        # get e.g. ## Implementation a.  ## Implementation b. etc
        imp_header_list = [header for header in headers if header.startswith(imp_string)]

        # now get the (one) header for the main solution
        main_headers = list(control_md.get_all_headers_for_key(const.SSP_MD_IMPLEMENTATION_QUESTION, False))
        # should be only one header, so warn if others found
        if main_headers:
            if len(main_headers) > 1:
                logger.warning(
                    f'Control {control_id} has {len(main_headers)} main header responses.  Will use first one only.'
                )
            main_header = main_headers[0]
            node = control_md.get_all_nodes_for_keys([main_header], False)[0]
            # this node is top level so it will have empty label
            # it may have subnodes of Rules, Implementation Status, Implementaton Remarks
            ControlReader._add_node_to_dict(comp_name, '', comp_dict, node, control_id, [], context)
        for imp_header in imp_header_list:
            label = ControlReader._get_label_from_implementation_header(imp_header)
            node = control_md.get_node_for_key(imp_header)
            ControlReader._add_node_to_dict(comp_name, label, comp_dict, node, control_id, [], context)

    except TrestleError as e:
        raise TrestleError(f'Error occurred reading {control_file}: {e}')
    return comp_dict, yaml_header
read_control(control_path, set_parameters_flag) staticmethod ¤

Read the control and group title from the markdown file.

Source code in trestle/core/control_reader.py
@staticmethod
def read_control(control_path: pathlib.Path, set_parameters_flag: bool) -> Tuple[cat.Control, str]:
    """Read the control and group title from the markdown file."""
    control = gens.generate_sample_model(cat.Control)
    md_api = MarkdownAPI()
    yaml_header, control_tree = md_api.processor.process_markdown(control_path)
    control_titles = list(control_tree.get_all_headers_for_level(1))
    if len(control_titles) == 0:
        raise TrestleError(f'Control markdown: {control_path} contains no control title.')

    control.id, group_title, control.title = ControlReader._parse_control_title_line(control_titles[0])

    control_headers = list(control_tree.get_all_headers_for_level(2))
    if len(control_headers) == 0:
        raise TrestleError(f'Control markdown: {control_path} contains no control statements.')

    control_statement = control_tree.get_node_for_key(control_headers[0])
    rc, statement_part = ControlReader._read_control_statement(
        0, control_statement.content.raw_text.split('\n'), control.id
    )
    if rc < 0:
        return control, group_title
    control.parts = [statement_part] if statement_part else None
    control_objective = control_tree.get_node_for_key(const.CONTROL_OBJECTIVE_HEADER)
    if control_objective is not None:
        _, objective_part = ControlReader._read_control_objective(
            0, control_objective.content.raw_text.split('\n'), control.id
        )
        if objective_part:
            if control.parts:
                control.parts.append(objective_part)
            else:
                control.parts = [objective_part]
    for header_key in control_tree.get_all_headers_for_key(const.CONTROL_HEADER, False):
        if header_key not in {control_headers[0], const.CONTROL_OBJECTIVE_HEADER, control_titles[0]}:
            section_node = control_tree.get_node_for_key(header_key)
            _, control.parts = ControlReader._read_sections(
                0, section_node.content.raw_text.split('\n'), control.id, control.parts
            )
    if set_parameters_flag:
        params: Dict[str, str] = yaml_header.get(const.SET_PARAMS_TAG, [])
        if params:
            control.params = []
            for id_, param_dict in params.items():
                param_dict['id'] = id_
                param = ModelUtils.dict_to_parameter(param_dict)
                # if display_name is in list of properties, set its namespace
                ControlReader._update_display_prop_namespace(param)
                control.params.append(param)
    if const.SORT_ID in yaml_header:
        control.props = control.props if control.props else []
        control.props.append(common.Property(name=const.SORT_ID, value=yaml_header[const.SORT_ID]))
    return control, group_title
read_editable_content(control_path, required_sections_list, label_map, sections_dict, write_mode) staticmethod ¤

Get parts for the markdown control corresponding to Editable Content - along with the set-parameter dict.

Source code in trestle/core/control_reader.py
@staticmethod
def read_editable_content(
    control_path: pathlib.Path,
    required_sections_list: List[str],
    label_map: Dict[str, Dict[str, str]],
    sections_dict: Dict[str, str],
    write_mode: bool
) -> Tuple[str, List[prof.Alter], Dict[str, Any]]:
    """Get parts for the markdown control corresponding to Editable Content - along with the set-parameter dict."""
    control_id = control_path.stem
    new_alters: List[prof.Alter] = []
    snake_dict: Dict[str, str] = {}

    md_api = MarkdownAPI()
    yaml_header, control_tree = md_api.processor.process_markdown(control_path)
    # extract the sort_id if present in header
    sort_id = yaml_header.get(const.SORT_ID, control_id)
    # merge the incoming sections_dict with the one in the header, with priority to incoming
    header_sections_dict: Dict[str, str] = yaml_header.get(const.SECTIONS_TAG, {})
    merged_sections_dict = merge_dicts(header_sections_dict, sections_dict)
    # query header for mapping of short to long section names
    # create reverse lookup of long snake name to short name needed for part
    for key, value in merged_sections_dict.items():
        snake_dict[spaces_and_caps_to_snake(value)] = key
    found_sections: List[str] = []

    editable_node = None
    for header in list(control_tree.get_all_headers_for_level(1)):
        if header.startswith('# Editable'):
            editable_node = control_tree.get_node_for_key(header)
            break
    if not editable_node:
        return sort_id, [], {}

    control_parts = []
    by_id_parts = {}
    for subnode in editable_node.subnodes:
        # check if it is a part added directly to the list of parts for the control
        if not ControlReader._add_control_part(control_id,
                                               subnode,
                                               required_sections_list,
                                               merged_sections_dict,
                                               snake_dict,
                                               control_parts,
                                               found_sections,
                                               write_mode):
            # otherwise add it to the list of new parts to be added to the sub-parts of a part based on by-id
            ControlReader._add_sub_part(control_id, subnode, label_map, by_id_parts, merged_sections_dict)

    missing_sections = set(required_sections_list) - set(found_sections)
    if missing_sections:
        raise TrestleError(f'Control {control_id} is missing required sections {missing_sections}')
    param_dict: Dict[str, Any] = {}
    # get set_params from the header and add to parm_dict
    header_params = yaml_header.get(const.SET_PARAMS_TAG, {})
    if header_params:
        param_dict.update(header_params)

    props, props_by_id = ControlReader._get_props_list(control_id, label_map, yaml_header)

    # When adding props without by_id it can either be starting or ending and we default to ending
    # This is the default behavior as described for implicit binding in
    # https://pages.nist.gov/OSCAL/concepts/processing/profile-resolution/
    # When adding props to a part using by_id, it is the same situation because it cannot be before or after since
    # props are not in the same list as parts

    adds: List[prof.Add] = []

    # add the parts and props at control level
    if control_parts or props:
        adds.append(prof.Add(parts=none_if_empty(control_parts), props=none_if_empty(props), position='ending'))

    # add the parts and props at the part level, by-id
    by_ids = set(by_id_parts.keys()).union(props_by_id.keys())
    for by_id in sorted(by_ids):
        parts = by_id_parts.get(by_id, None)
        props = props_by_id.get(by_id, None)
        adds.append(prof.Add(parts=parts, props=props, position='ending', by_id=by_id))

    new_alters = []
    if adds:
        new_alters = [prof.Alter(control_id=control_id, adds=adds)]
    return sort_id, new_alters, param_dict
read_implemented_requirement(control_file, avail_comps, context) staticmethod ¤

Get the implementated requirement associated with given control and link to existing components or new ones.

Parameters:

Name Type Description Default
control_file Path

path of the control markdown file

required
avail_comps Dict[str, trestle.core.generic_oscal.GenericComponent]

dictionary of known components keyed by component name

required
context ControlContext

context of the control usage

required

Returns:

Type Description
Tuple

The control sort-id and the one implemented requirement for this control.

Notes

Each statement may have several responses, with each response in a by_component for a specific component. statement_map keeps track of statements that may have several by_component responses.

Source code in trestle/core/control_reader.py
@staticmethod
def read_implemented_requirement(
    control_file: pathlib.Path, avail_comps: Dict[str, generic.GenericComponent], context: ControlContext
) -> Tuple[str, generic.GenericImplementedRequirement]:
    """
    Get the implementated requirement associated with given control and link to existing components or new ones.

    Args:
        control_file: path of the control markdown file
        avail_comps: dictionary of known components keyed by component name
        context: context of the control usage

    Returns:
        Tuple: The control sort-id and the one implemented requirement for this control.

    Notes:
        Each statement may have several responses, with each response in a by_component for a specific component.
        statement_map keeps track of statements that may have several by_component responses.
    """
    control_id = control_file.stem
    comp_dict, header = ControlReader.read_all_implementation_prose_and_header(None, control_file, context)

    statement_map: Dict[str, generic.GenericStatement] = {}
    # create a new implemented requirement linked to the control id to hold the statements
    imp_req: generic.GenericImplementedRequirement = generic.GenericImplementedRequirement.generate()
    imp_req.control_id = control_id

    raw_comp_dict = {ControlReader.simplify_name(key): value for key, value in comp_dict.items()}
    raw_avail_comps = {ControlReader.simplify_name(key): value for key, value in avail_comps.items()}

    # the comp_dict captures all component names referenced by the control
    # need to create new components if not already in dict by looping over comps referenced by this control
    for comp_name in comp_dict.keys():
        component: Optional[generic.GenericComponent] = None
        raw_comp_name = ControlReader.simplify_name(comp_name)
        if raw_comp_name == ControlReader.simplify_name(const.SSP_MD_IMPLEMENTATION_QUESTION):
            comp_info: ComponentImpInfo = list(raw_comp_dict[raw_comp_name].items())[0][1]
            if context.purpose == ContextPurpose.COMPONENT and not comp_info.rules:
                logger.debug(f'Control {control_id} not written to md because it has no rules associated.')
                continue
            imp_req.description = ControlReader._handle_empty_prose(comp_info.prose, control_id)
            if comp_info.status:
                ControlInterface.insert_status_in_props(imp_req, comp_info.status)
            continue
        if raw_comp_name in raw_avail_comps:
            component = raw_avail_comps[raw_comp_name]
        else:
            # here is where we create a new component on the fly as needed
            component = generic.GenericComponent.generate()
            component.title = comp_name
            avail_comps[comp_name] = component
            raw_avail_comps[raw_comp_name] = component
        # now create statements to hold the by-components and assign the statement id
        for label, comp_info in raw_comp_dict[raw_comp_name].items():
            if context.purpose == ContextPurpose.COMPONENT:
                # only assemble responses with associated rules
                if not comp_info.rules:
                    continue
                # if there is a statement label create by_comp - otherwise assign status and prose to imp_req
                # create a new by-component to add to this statement
                if not label:
                    imp_req.description = ControlReader._handle_empty_prose(comp_info.prose, control_id)
                    ControlInterface.insert_status_in_props(imp_req, comp_info.status)
                    continue
            by_comp: generic.GenericByComponent = generic.GenericByComponent.generate()
            # link it to the component uuid
            by_comp.component_uuid = component.uuid
            by_comp.implementation_status = comp_info.status
            # add the response prose to the description
            by_comp.description = ControlReader._handle_empty_prose(comp_info.prose, control_id)
            statement_id = ControlInterface.create_statement_id(control_id)
            # control level response has '' as label
            if label in ['', const.STATEMENT]:
                statement_part_id = statement_id
            else:
                clean_label = label.strip('.')
                statement_part_id = ControlInterface.strip_to_make_ncname(f'{statement_id}.{clean_label}')
            if statement_part_id in statement_map:
                statement = statement_map[statement_part_id]
            else:
                statement: generic.GenericStatement = generic.GenericStatement.generate()
                statement.statement_id = statement_part_id
                statement.by_components = []
                statement_map[statement_part_id] = statement
            statement.by_components.append(by_comp)

    imp_req.statements = list(statement_map.values())
    ControlReader._insert_header_content(imp_req, header, control_id)
    sort_id = header.get(const.SORT_ID, control_id)
    return sort_id, imp_req
simplify_name(name) staticmethod ¤

Simplify the name to ignore variations in case, space, hyphen, underscore, slash.

Source code in trestle/core/control_reader.py
@staticmethod
def simplify_name(name: str) -> str:
    """Simplify the name to ignore variations in case, space, hyphen, underscore, slash."""
    return name.lower().replace(' ', '').replace('-', '').replace('_', '').replace('/', '')

handler: python