Skip to content

ssp

trestle.core.commands.author.ssp ¤

Create ssp from catalog and profile.

logger ¤

Classes¤

SSPAssemble (AuthorCommonCommand) ¤

Assemble markdown files of controls into an SSP json file.

Source code in trestle/core/commands/author/ssp.py
class SSPAssemble(AuthorCommonCommand):
    """Assemble markdown files of controls into an SSP json file."""

    name = 'ssp-assemble'

    def _init_arguments(self) -> None:
        name_help_str = (
            'Optional name of the ssp model in the trestle workspace that is being modified.  '
            'If not provided the output name is used.'
        )
        self.add_argument('-n', '--name', help=name_help_str, required=False, type=str)
        file_help_str = 'Name of the input markdown file directory'
        self.add_argument('-m', '--markdown', help=file_help_str, required=True, type=str)
        output_help_str = 'Name of the output generated json SSP'
        self.add_argument('-o', '--output', help=output_help_str, required=True, type=str)
        self.add_argument('-r', '--regenerate', action='store_true', help=const.HELP_REGENERATE)
        self.add_argument('-vn', '--version', help=const.HELP_VERSION, required=False, type=str)

    def _merge_imp_reqs(self, ssp: ossp.SystemSecurityPlan, imp_reqs: List[ossp.ImplementedRequirement]) -> None:
        """
        Merge the new imp_reqs into the ssp and optionally regenerate uuids.

        If a statement has same id and same by_comp uuid as ssp, use the ssp version with new description.
        Otherwise just insert the statement.
        When the statement was loaded it had access to the current components so the uuids should match.
        """
        id_map: Dict[str, Dict[str, ossp.Statement]] = {}
        control_map: Dict[str, ossp.ImplementedRequirement] = {}
        for imp_req in ssp.control_implementation.implemented_requirements:
            control_map[imp_req.control_id] = imp_req
            for statement in imp_req.statements:
                for by_comp in statement.by_components:
                    id_ = statement.statement_id
                    if id_ not in id_map:
                        id_map[id_] = {}
                    id_map[id_][by_comp.component_uuid] = statement

        for imp_req in imp_reqs:
            if imp_req.control_id in control_map:
                imp_req.uuid = control_map[imp_req.control_id].uuid
            for statement in as_list(imp_req.statements):
                id_ = statement.statement_id
                # for each statement id match the statement per component to the original
                if id_ in id_map:
                    comp_dict = id_map[id_]
                    for by_comp in as_list(statement.by_components):
                        if by_comp.component_uuid in comp_dict:
                            statement.uuid = comp_dict[by_comp.component_uuid].uuid
                            for orig_by_comp in as_list(comp_dict[by_comp.component_uuid].by_components):
                                if orig_by_comp.component_uuid == by_comp.component_uuid:
                                    by_comp.uuid = orig_by_comp.uuid
                                    break

        changed = ssp.control_implementation.implemented_requirements != imp_reqs
        ssp.control_implementation.implemented_requirements = imp_reqs
        return changed

    def _generate_roles_in_metadata(self, ssp: ossp.SystemSecurityPlan) -> bool:
        """Find all roles referenced by imp reqs and create role in metadata as needed."""
        metadata = ssp.metadata
        metadata.roles = as_list(metadata.roles)
        known_role_ids = [role.id for role in metadata.roles]
        changed = False
        for imp_req in ssp.control_implementation.implemented_requirements:
            role_ids = [resp_role.role_id for resp_role in as_list(imp_req.responsible_roles)]
            for role_id in role_ids:
                if role_id not in known_role_ids:
                    role = com.Role(id=role_id, title=role_id)
                    metadata.roles.append(role)
                    known_role_ids.append(role_id)
                    changed = True
        metadata.roles = none_if_empty(metadata.roles)
        return changed

    def _run(self, args: argparse.Namespace) -> int:
        try:
            log.set_log_level_from_args(args)
            trestle_root = pathlib.Path(args.trestle_root)

            md_path = trestle_root / args.markdown

            # the original, reference ssp name defaults to same as output if name not specified
            # thus in cyclic editing you are reading and writing same json ssp
            orig_ssp_name = args.output
            if args.name:
                orig_ssp_name = args.name
            new_ssp_name = args.output

            new_file_content_type = FileContentType.JSON

            # if output ssp already exists, load it to see if new one is different
            existing_ssp: Optional[ossp.SystemSecurityPlan] = None
            new_ssp_path = ModelUtils.full_path_for_top_level_model(trestle_root, new_ssp_name, ossp.SystemSecurityPlan)
            if new_ssp_path:
                _, _, existing_ssp = ModelUtils.load_distributed(new_ssp_path, trestle_root)
                new_file_content_type = FileContentType.path_to_content_type(new_ssp_path)

            ssp: ossp.SystemSecurityPlan
            comp_dict: Dict[str, generic.GenericComponent] = {}

            # if orig ssp exists - need to load it rather than instantiate new one
            orig_ssp_path = ModelUtils.full_path_for_top_level_model(
                trestle_root, orig_ssp_name, ossp.SystemSecurityPlan
            )

            context = ControlContext.generate(ContextPurpose.SSP, True, trestle_root, md_path)

            # need to load imp_reqs from markdown but need component first
            if orig_ssp_path:
                # load the existing json ssp
                _, _, ssp = ModelUtils.load_distributed(orig_ssp_path, trestle_root)
                for component in ssp.system_implementation.components:
                    comp_dict[component.title] = generic.GenericComponent.from_system_component(component)
                # read the new imp reqs from markdown and have them reference existing components
                imp_reqs = CatalogInterface.read_catalog_imp_reqs(md_path, comp_dict, context)
                new_imp_reqs = []
                for imp_req in imp_reqs:
                    new_imp_reqs.append(imp_req.as_ssp())
                self._merge_imp_reqs(ssp, new_imp_reqs)
                new_file_content_type = FileContentType.path_to_content_type(orig_ssp_path)
            else:
                # create a sample ssp to hold all the parts
                ssp = gens.generate_sample_model(ossp.SystemSecurityPlan)
                # load the imp_reqs from markdown and create components as needed, referenced by ### headers
                imp_reqs = CatalogInterface.read_catalog_imp_reqs(md_path, comp_dict, context)
                new_imp_reqs = []
                for imp_req in imp_reqs:
                    new_imp_reqs.append(imp_req.as_ssp())

                # create system implementation
                system_imp: ossp.SystemImplementation = gens.generate_sample_model(ossp.SystemImplementation)
                ssp.system_implementation = system_imp

                # create a control implementation to hold the implementated requirements
                control_imp: ossp.ControlImplementation = gens.generate_sample_model(ossp.ControlImplementation)
                control_imp.implemented_requirements = new_imp_reqs
                control_imp.description = const.SSP_SYSTEM_CONTROL_IMPLEMENTATION_TEXT

                # insert the parts into the ssp
                ssp.control_implementation = control_imp
                ssp.system_implementation = system_imp

                # we don't have access to the original profile so we don't know the href
                import_profile: ossp.ImportProfile = gens.generate_sample_model(ossp.ImportProfile)
                import_profile.href = 'REPLACE_ME'
                ssp.import_profile = import_profile

            # now that we know the complete list of needed components, add them to the sys_imp
            # TODO if the ssp already existed then components may need to be removed if not ref'd by imp_reqs
            component_list: List[ossp.SystemComponent] = []
            for comp in comp_dict.values():
                # need to skip the component corresponding to statement level prose
                if comp.title:
                    component_list.append(comp.as_system_component())
            if ssp.system_implementation.components:
                # reconstruct list with same order as existing, but add/remove components as needed
                new_list: List[ossp.SystemComponent] = []
                for comp in ssp.system_implementation.components:
                    if comp in component_list:
                        new_list.append(comp)
                for comp in component_list:
                    if comp not in new_list:
                        new_list.append(comp)
                ssp.system_implementation.components = new_list
            elif component_list:
                ssp.system_implementation.components = component_list
            self._generate_roles_in_metadata(ssp)

            if args.version:
                ssp.metadata.version = com.Version(__root__=args.version)

            if ModelUtils.models_are_equivalent(existing_ssp, ssp):
                logger.info('No changes to assembled ssp so ssp not written out.')
                return CmdReturnCodes.SUCCESS.value

            if args.regenerate:
                ssp, _, _ = ModelUtils.regenerate_uuids(ssp)
            ModelUtils.update_last_modified(ssp)

            # write out the ssp as json
            ModelUtils.save_top_level_model(ssp, trestle_root, new_ssp_name, new_file_content_type)

            return CmdReturnCodes.SUCCESS.value

        except Exception as e:  # pragma: no cover
            return handle_generic_command_exception(e, logger, 'Error while assembling SSP')
name ¤

SSPFilter (AuthorCommonCommand) ¤

Filter the controls in an ssp based on files included by profile and/or list of component names.

Source code in trestle/core/commands/author/ssp.py
class SSPFilter(AuthorCommonCommand):
    """Filter the controls in an ssp based on files included by profile and/or list of component names."""

    name = 'ssp-filter'

    def _init_arguments(self) -> None:
        file_help_str = 'Name of the input ssp'
        self.add_argument('-n', '--name', help=file_help_str, required=True, type=str)
        file_help_str = 'Name of the optional input profile that defines set of controls in filtered ssp'
        self.add_argument('-p', '--profile', help=file_help_str, required=False, type=str)
        output_help_str = 'Name of the output generated SSP'
        self.add_argument('-o', '--output', help=output_help_str, required=True, type=str)
        self.add_argument('-r', '--regenerate', action='store_true', help=const.HELP_REGENERATE)
        self.add_argument('-vn', '--version', help=const.HELP_VERSION, required=False, type=str)
        comp_help_str = 'Colon-delimited list of component names to include in filtered ssp.'
        self.add_argument('-c', '--components', help=comp_help_str, required=False, type=str)

    def _run(self, args: argparse.Namespace) -> int:
        try:
            log.set_log_level_from_args(args)
            trestle_root = pathlib.Path(args.trestle_root)
            comp_names: Optional[List[str]] = None
            if args.components:
                comp_names = args.components.split(':')
            elif not args.profile:
                logger.warning('You must specify either a profile or list of component names for ssp-filter.')
                return 1

            return self.filter_ssp(
                trestle_root, args.name, args.profile, args.output, args.regenerate, args.version, comp_names
            )
        except Exception as e:  # pragma: no cover
            return handle_generic_command_exception(e, logger, 'Error generating the filtered ssp')

    def filter_ssp(
        self,
        trestle_root: pathlib.Path,
        ssp_name: str,
        profile_name: str,
        out_name: str,
        regenerate: bool,
        version: Optional[str],
        components: Optional[List[str]] = None
    ) -> int:
        """
        Filter the ssp based on controls included by the profile and/or components and output new ssp.

        Args:
            trestle_root: root directory of the trestle project
            ssp_name: name of the ssp model
            profile_name: name of the optional profile model used for filtering
            out_name: name of the output ssp model with filtered controls
            regenerate: whether to regenerate the uuid's in the ssp
            version: new version for the model
            components: optional list of component names used for filtering

        Returns:
            0 on success, 1 otherwise
        """
        # load the ssp
        ssp: ossp.SystemSecurityPlan
        ssp, _ = load_validate_model_name(trestle_root, ssp_name, ossp.SystemSecurityPlan, FileContentType.JSON)
        profile_path = ModelUtils.path_for_top_level_model(
            trestle_root, profile_name, prof.Profile, FileContentType.JSON
        )

        if components:
            raw_comp_names = [ControlReader.simplify_name(name) for name in components]
            comp_uuids: List[str] = []
            for component in ssp.system_implementation.components:
                if ControlReader.simplify_name(component.title) in raw_comp_names:
                    comp_uuids.append(component.uuid)
            # imp_reqs can be by comp
            # and imp_reqs can have statements that are by comp
            if comp_uuids:
                new_imp_reqs: List[ossp.ImplementedRequirement] = []
                # these are all required to be present
                for imp_req in ssp.control_implementation.implemented_requirements:
                    new_by_comps: List[ossp.ByComponent] = []
                    # by_comps is optional
                    for by_comp in as_list(imp_req.by_components):
                        if by_comp.component.uuid in comp_uuids:
                            new_by_comps.append(by_comp)
                    imp_req.by_components = none_if_empty(new_by_comps)
                    new_imp_reqs.append(imp_req)
                    new_statements: List[ossp.Statement] = []
                    for statement in as_list(imp_req.statements):
                        new_by_comps: List[ossp.ByComponent] = []
                        for by_comp in as_list(statement.by_components):
                            if by_comp.component_uuid in comp_uuids:
                                new_by_comps.append(by_comp)
                        statement.by_components = none_if_empty(new_by_comps)
                        new_statements.append(statement)
                    imp_req.statements = none_if_empty(new_statements)
                ssp.control_implementation.implemented_requirements = new_imp_reqs
                # now remove any unused components from the ssp
                new_comp_list: List[ossp.SystemComponent] = []
                for comp in ssp.system_implementation.components:
                    if comp.uuid in comp_uuids:
                        new_comp_list.append(comp)
                ssp.system_implementation.components = new_comp_list

        # filter by controls in profile
        if profile_name:
            prof_resolver = ProfileResolver()
            catalog = prof_resolver.get_resolved_profile_catalog(trestle_root, profile_path, show_value_warnings=True)
            catalog_interface = CatalogInterface(catalog)

            # The input ssp should reference a superset of the controls referenced by the profile
            # Need to cull references in the ssp to controls not in the profile
            # Also make sure the output ssp contains imp reqs for all controls in the profile
            control_imp = ssp.control_implementation
            ssp_control_ids: Set[str] = set()

            new_set_params: List[ossp.SetParameter] = []
            for set_param in as_list(control_imp.set_parameters):
                control = catalog_interface.get_control_by_param_id(set_param.param_id)
                if control is not None:
                    new_set_params.append(set_param)
                    ssp_control_ids.add(control.id)
            control_imp.set_parameters = none_if_empty(new_set_params)

            new_imp_requirements: List[ossp.ImplementedRequirement] = []
            for imp_requirement in as_list(control_imp.implemented_requirements):
                control = catalog_interface.get_control(imp_requirement.control_id)
                if control is not None:
                    new_imp_requirements.append(imp_requirement)
                    ssp_control_ids.add(control.id)
            control_imp.implemented_requirements = new_imp_requirements

            # make sure all controls in the profile have implemented reqs in the final ssp
            if not ssp_control_ids.issuperset(catalog_interface.get_control_ids()):
                raise TrestleError('Unable to filter the ssp because the profile references controls not in it.')

            ssp.control_implementation = control_imp

        if version:
            ssp.metadata.version = com.Version(__root__=version)

        existing_ssp_path = ModelUtils.full_path_for_top_level_model(trestle_root, out_name, ossp.SystemSecurityPlan)
        if existing_ssp_path is not None:
            existing_ssp, _ = load_validate_model_name(trestle_root, out_name, ossp.SystemSecurityPlan)
            if ModelUtils.models_are_equivalent(existing_ssp, ssp):
                logger.info('No changes to filtered ssp so ssp not written out.')
                return CmdReturnCodes.SUCCESS.value

        if regenerate:
            ssp, _, _ = ModelUtils.regenerate_uuids(ssp)

        ModelUtils.update_last_modified(ssp)

        ModelUtils.save_top_level_model(ssp, trestle_root, out_name, FileContentType.JSON)

        return CmdReturnCodes.SUCCESS.value
name ¤
Methods¤
filter_ssp(self, trestle_root, ssp_name, profile_name, out_name, regenerate, version, components=None) ¤

Filter the ssp based on controls included by the profile and/or components and output new ssp.

Parameters:

Name Type Description Default
trestle_root Path

root directory of the trestle project

required
ssp_name str

name of the ssp model

required
profile_name str

name of the optional profile model used for filtering

required
out_name str

name of the output ssp model with filtered controls

required
regenerate bool

whether to regenerate the uuid's in the ssp

required
version Optional[str]

new version for the model

required
components Optional[List[str]]

optional list of component names used for filtering

None

Returns:

Type Description
int

0 on success, 1 otherwise

Source code in trestle/core/commands/author/ssp.py
def filter_ssp(
    self,
    trestle_root: pathlib.Path,
    ssp_name: str,
    profile_name: str,
    out_name: str,
    regenerate: bool,
    version: Optional[str],
    components: Optional[List[str]] = None
) -> int:
    """
    Filter the ssp based on controls included by the profile and/or components and output new ssp.

    Args:
        trestle_root: root directory of the trestle project
        ssp_name: name of the ssp model
        profile_name: name of the optional profile model used for filtering
        out_name: name of the output ssp model with filtered controls
        regenerate: whether to regenerate the uuid's in the ssp
        version: new version for the model
        components: optional list of component names used for filtering

    Returns:
        0 on success, 1 otherwise
    """
    # load the ssp
    ssp: ossp.SystemSecurityPlan
    ssp, _ = load_validate_model_name(trestle_root, ssp_name, ossp.SystemSecurityPlan, FileContentType.JSON)
    profile_path = ModelUtils.path_for_top_level_model(
        trestle_root, profile_name, prof.Profile, FileContentType.JSON
    )

    if components:
        raw_comp_names = [ControlReader.simplify_name(name) for name in components]
        comp_uuids: List[str] = []
        for component in ssp.system_implementation.components:
            if ControlReader.simplify_name(component.title) in raw_comp_names:
                comp_uuids.append(component.uuid)
        # imp_reqs can be by comp
        # and imp_reqs can have statements that are by comp
        if comp_uuids:
            new_imp_reqs: List[ossp.ImplementedRequirement] = []
            # these are all required to be present
            for imp_req in ssp.control_implementation.implemented_requirements:
                new_by_comps: List[ossp.ByComponent] = []
                # by_comps is optional
                for by_comp in as_list(imp_req.by_components):
                    if by_comp.component.uuid in comp_uuids:
                        new_by_comps.append(by_comp)
                imp_req.by_components = none_if_empty(new_by_comps)
                new_imp_reqs.append(imp_req)
                new_statements: List[ossp.Statement] = []
                for statement in as_list(imp_req.statements):
                    new_by_comps: List[ossp.ByComponent] = []
                    for by_comp in as_list(statement.by_components):
                        if by_comp.component_uuid in comp_uuids:
                            new_by_comps.append(by_comp)
                    statement.by_components = none_if_empty(new_by_comps)
                    new_statements.append(statement)
                imp_req.statements = none_if_empty(new_statements)
            ssp.control_implementation.implemented_requirements = new_imp_reqs
            # now remove any unused components from the ssp
            new_comp_list: List[ossp.SystemComponent] = []
            for comp in ssp.system_implementation.components:
                if comp.uuid in comp_uuids:
                    new_comp_list.append(comp)
            ssp.system_implementation.components = new_comp_list

    # filter by controls in profile
    if profile_name:
        prof_resolver = ProfileResolver()
        catalog = prof_resolver.get_resolved_profile_catalog(trestle_root, profile_path, show_value_warnings=True)
        catalog_interface = CatalogInterface(catalog)

        # The input ssp should reference a superset of the controls referenced by the profile
        # Need to cull references in the ssp to controls not in the profile
        # Also make sure the output ssp contains imp reqs for all controls in the profile
        control_imp = ssp.control_implementation
        ssp_control_ids: Set[str] = set()

        new_set_params: List[ossp.SetParameter] = []
        for set_param in as_list(control_imp.set_parameters):
            control = catalog_interface.get_control_by_param_id(set_param.param_id)
            if control is not None:
                new_set_params.append(set_param)
                ssp_control_ids.add(control.id)
        control_imp.set_parameters = none_if_empty(new_set_params)

        new_imp_requirements: List[ossp.ImplementedRequirement] = []
        for imp_requirement in as_list(control_imp.implemented_requirements):
            control = catalog_interface.get_control(imp_requirement.control_id)
            if control is not None:
                new_imp_requirements.append(imp_requirement)
                ssp_control_ids.add(control.id)
        control_imp.implemented_requirements = new_imp_requirements

        # make sure all controls in the profile have implemented reqs in the final ssp
        if not ssp_control_ids.issuperset(catalog_interface.get_control_ids()):
            raise TrestleError('Unable to filter the ssp because the profile references controls not in it.')

        ssp.control_implementation = control_imp

    if version:
        ssp.metadata.version = com.Version(__root__=version)

    existing_ssp_path = ModelUtils.full_path_for_top_level_model(trestle_root, out_name, ossp.SystemSecurityPlan)
    if existing_ssp_path is not None:
        existing_ssp, _ = load_validate_model_name(trestle_root, out_name, ossp.SystemSecurityPlan)
        if ModelUtils.models_are_equivalent(existing_ssp, ssp):
            logger.info('No changes to filtered ssp so ssp not written out.')
            return CmdReturnCodes.SUCCESS.value

    if regenerate:
        ssp, _, _ = ModelUtils.regenerate_uuids(ssp)

    ModelUtils.update_last_modified(ssp)

    ModelUtils.save_top_level_model(ssp, trestle_root, out_name, FileContentType.JSON)

    return CmdReturnCodes.SUCCESS.value

SSPGenerate (AuthorCommonCommand) ¤

Generate SSP in markdown form from a Profile.

Source code in trestle/core/commands/author/ssp.py
class SSPGenerate(AuthorCommonCommand):
    """Generate SSP in markdown form from a Profile."""

    name = 'ssp-generate'

    def _init_arguments(self) -> None:
        file_help_str = 'Name of the profile model in the trestle workspace'
        self.add_argument('-p', '--profile', help=file_help_str, required=True, type=str)
        self.add_argument('-o', '--output', help=const.HELP_MARKDOWN_NAME, required=True, type=str)
        self.add_argument('-y', '--yaml-header', help=const.HELP_YAML_PATH, required=False, type=str)
        self.add_argument(
            '-fo', '--force-overwrite', help=const.HELP_FO_OUTPUT, required=False, action='store_true', default=False
        )
        self.add_argument(
            '-ohv',
            '--overwrite-header-values',
            help=const.HELP_OVERWRITE_HEADER_VALUES,
            required=False,
            action='store_true',
            default=False
        )
        sections_help_str = (
            'Comma separated list of section:alias pairs.  Provides mapping of short names to long for markdown.'
        )
        self.add_argument('-s', '--sections', help=sections_help_str, required=False, type=str)
        allowed_sections_help_str = (
            'Comma separated list of section short names to include in the markdown.  Others will not appear.'
        )
        self.add_argument('-as', '--allowed-sections', help=allowed_sections_help_str, required=False, type=str)

    def _run(self, args: argparse.Namespace) -> int:
        try:
            log.set_log_level_from_args(args)
            trestle_root = args.trestle_root
            if not file_utils.is_directory_name_allowed(args.output):
                raise TrestleError(f'{args.output} is not an allowed directory name')

            if args.force_overwrite:
                try:
                    logger.debug(f'Overwriting the content of {args.output}.')
                    clear_folder(pathlib.Path(args.output))
                except TrestleError as e:  # pragma: no cover
                    raise TrestleError(f'Unable to overwrite contents of {args.output}: {e}')

            profile_path = trestle_root / f'profiles/{args.profile}/profile.json'

            yaml_header: dict = {}
            if args.yaml_header:
                try:
                    logging.debug(f'Loading yaml header file {args.yaml_header}')
                    yaml = YAML()
                    yaml_header = yaml.load(pathlib.Path(args.yaml_header).open('r'))
                except YAMLError as e:
                    raise TrestleError(f'YAML error loading yaml header for ssp generation: {e}')

            markdown_path = trestle_root / args.output

            profile_resolver = ProfileResolver()

            # in ssp context we want to see missing value warnings
            resolved_catalog = profile_resolver.get_resolved_profile_catalog(
                trestle_root, profile_path, show_value_warnings=True
            )
            catalog_interface = CatalogInterface(resolved_catalog)

            sections_dict: Dict[str, str] = {}
            if args.sections:
                sections_dict = sections_to_dict(args.sections)
                if const.STATEMENT in sections_dict:
                    raise TrestleError('Statement is not allowed as a section name.')
                # add any existing sections from the controls but only have short names
                control_section_short_names = catalog_interface.get_sections()
                for short_name in control_section_short_names:
                    if short_name not in sections_dict:
                        sections_dict[short_name] = short_name
                logger.debug(f'ssp sections dict: {sections_dict}')

            context = ControlContext.generate(ContextPurpose.SSP, True, trestle_root, markdown_path)
            context.yaml_header = yaml_header
            context.sections_dict = sections_dict
            context.prompt_responses = True
            context.overwrite_header_values = args.overwrite_header_values
            context.allowed_sections = args.allowed_sections

            catalog_interface.write_catalog_as_markdown(context, catalog_interface.get_statement_part_id_map(False))

            return CmdReturnCodes.SUCCESS.value

        except Exception as e:  # pragma: no cover
            return handle_generic_command_exception(e, logger, 'Error while writing markdown from catalog')
name ¤

handler: python