ssp
trestle.core.commands.author.ssp
¤
Create ssp from catalog and profile.
logger
¤
Classes¤
SSPAssemble (AuthorCommonCommand)
¤
Assemble markdown files of controls into an SSP json file.
Source code in trestle/core/commands/author/ssp.py
class SSPAssemble(AuthorCommonCommand):
"""Assemble markdown files of controls into an SSP json file."""
name = 'ssp-assemble'
def _init_arguments(self) -> None:
name_help_str = (
'Optional name of the ssp model in the trestle workspace that is being modified. '
'If not provided the output name is used.'
)
self.add_argument('-n', '--name', help=name_help_str, required=False, type=str)
file_help_str = 'Name of the input markdown file directory'
self.add_argument('-m', '--markdown', help=file_help_str, required=True, type=str)
output_help_str = 'Name of the output generated json SSP'
self.add_argument('-o', '--output', help=output_help_str, required=True, type=str)
self.add_argument('-r', '--regenerate', action='store_true', help=const.HELP_REGENERATE)
self.add_argument('-vn', '--version', help=const.HELP_VERSION, required=False, type=str)
def _merge_imp_reqs(self, ssp: ossp.SystemSecurityPlan, imp_reqs: List[ossp.ImplementedRequirement]) -> None:
"""
Merge the new imp_reqs into the ssp and optionally regenerate uuids.
If a statement has same id and same by_comp uuid as ssp, use the ssp version with new description.
Otherwise just insert the statement.
When the statement was loaded it had access to the current components so the uuids should match.
"""
id_map: Dict[str, Dict[str, ossp.Statement]] = {}
control_map: Dict[str, ossp.ImplementedRequirement] = {}
for imp_req in ssp.control_implementation.implemented_requirements:
control_map[imp_req.control_id] = imp_req
for statement in imp_req.statements:
for by_comp in statement.by_components:
id_ = statement.statement_id
if id_ not in id_map:
id_map[id_] = {}
id_map[id_][by_comp.component_uuid] = statement
for imp_req in imp_reqs:
if imp_req.control_id in control_map:
imp_req.uuid = control_map[imp_req.control_id].uuid
for statement in as_list(imp_req.statements):
id_ = statement.statement_id
# for each statement id match the statement per component to the original
if id_ in id_map:
comp_dict = id_map[id_]
for by_comp in as_list(statement.by_components):
if by_comp.component_uuid in comp_dict:
statement.uuid = comp_dict[by_comp.component_uuid].uuid
for orig_by_comp in as_list(comp_dict[by_comp.component_uuid].by_components):
if orig_by_comp.component_uuid == by_comp.component_uuid:
by_comp.uuid = orig_by_comp.uuid
break
changed = ssp.control_implementation.implemented_requirements != imp_reqs
ssp.control_implementation.implemented_requirements = imp_reqs
return changed
def _generate_roles_in_metadata(self, ssp: ossp.SystemSecurityPlan) -> bool:
"""Find all roles referenced by imp reqs and create role in metadata as needed."""
metadata = ssp.metadata
metadata.roles = as_list(metadata.roles)
known_role_ids = [role.id for role in metadata.roles]
changed = False
for imp_req in ssp.control_implementation.implemented_requirements:
role_ids = [resp_role.role_id for resp_role in as_list(imp_req.responsible_roles)]
for role_id in role_ids:
if role_id not in known_role_ids:
role = com.Role(id=role_id, title=role_id)
metadata.roles.append(role)
known_role_ids.append(role_id)
changed = True
metadata.roles = none_if_empty(metadata.roles)
return changed
def _run(self, args: argparse.Namespace) -> int:
try:
log.set_log_level_from_args(args)
trestle_root = pathlib.Path(args.trestle_root)
md_path = trestle_root / args.markdown
# the original, reference ssp name defaults to same as output if name not specified
# thus in cyclic editing you are reading and writing same json ssp
orig_ssp_name = args.output
if args.name:
orig_ssp_name = args.name
new_ssp_name = args.output
new_file_content_type = FileContentType.JSON
# if output ssp already exists, load it to see if new one is different
existing_ssp: Optional[ossp.SystemSecurityPlan] = None
new_ssp_path = ModelUtils.full_path_for_top_level_model(trestle_root, new_ssp_name, ossp.SystemSecurityPlan)
if new_ssp_path:
_, _, existing_ssp = ModelUtils.load_distributed(new_ssp_path, trestle_root)
new_file_content_type = FileContentType.path_to_content_type(new_ssp_path)
ssp: ossp.SystemSecurityPlan
comp_dict: Dict[str, generic.GenericComponent] = {}
# if orig ssp exists - need to load it rather than instantiate new one
orig_ssp_path = ModelUtils.full_path_for_top_level_model(
trestle_root, orig_ssp_name, ossp.SystemSecurityPlan
)
context = ControlContext.generate(ContextPurpose.SSP, True, trestle_root, md_path)
# need to load imp_reqs from markdown but need component first
if orig_ssp_path:
# load the existing json ssp
_, _, ssp = ModelUtils.load_distributed(orig_ssp_path, trestle_root)
for component in ssp.system_implementation.components:
comp_dict[component.title] = generic.GenericComponent.from_system_component(component)
# read the new imp reqs from markdown and have them reference existing components
imp_reqs = CatalogInterface.read_catalog_imp_reqs(md_path, comp_dict, context)
new_imp_reqs = []
for imp_req in imp_reqs:
new_imp_reqs.append(imp_req.as_ssp())
self._merge_imp_reqs(ssp, new_imp_reqs)
new_file_content_type = FileContentType.path_to_content_type(orig_ssp_path)
else:
# create a sample ssp to hold all the parts
ssp = gens.generate_sample_model(ossp.SystemSecurityPlan)
# load the imp_reqs from markdown and create components as needed, referenced by ### headers
imp_reqs = CatalogInterface.read_catalog_imp_reqs(md_path, comp_dict, context)
new_imp_reqs = []
for imp_req in imp_reqs:
new_imp_reqs.append(imp_req.as_ssp())
# create system implementation
system_imp: ossp.SystemImplementation = gens.generate_sample_model(ossp.SystemImplementation)
ssp.system_implementation = system_imp
# create a control implementation to hold the implementated requirements
control_imp: ossp.ControlImplementation = gens.generate_sample_model(ossp.ControlImplementation)
control_imp.implemented_requirements = new_imp_reqs
control_imp.description = const.SSP_SYSTEM_CONTROL_IMPLEMENTATION_TEXT
# insert the parts into the ssp
ssp.control_implementation = control_imp
ssp.system_implementation = system_imp
# we don't have access to the original profile so we don't know the href
import_profile: ossp.ImportProfile = gens.generate_sample_model(ossp.ImportProfile)
import_profile.href = 'REPLACE_ME'
ssp.import_profile = import_profile
# now that we know the complete list of needed components, add them to the sys_imp
# TODO if the ssp already existed then components may need to be removed if not ref'd by imp_reqs
component_list: List[ossp.SystemComponent] = []
for comp in comp_dict.values():
# need to skip the component corresponding to statement level prose
if comp.title:
component_list.append(comp.as_system_component())
if ssp.system_implementation.components:
# reconstruct list with same order as existing, but add/remove components as needed
new_list: List[ossp.SystemComponent] = []
for comp in ssp.system_implementation.components:
if comp in component_list:
new_list.append(comp)
for comp in component_list:
if comp not in new_list:
new_list.append(comp)
ssp.system_implementation.components = new_list
elif component_list:
ssp.system_implementation.components = component_list
self._generate_roles_in_metadata(ssp)
if args.version:
ssp.metadata.version = com.Version(__root__=args.version)
if ModelUtils.models_are_equivalent(existing_ssp, ssp):
logger.info('No changes to assembled ssp so ssp not written out.')
return CmdReturnCodes.SUCCESS.value
if args.regenerate:
ssp, _, _ = ModelUtils.regenerate_uuids(ssp)
ModelUtils.update_last_modified(ssp)
# write out the ssp as json
ModelUtils.save_top_level_model(ssp, trestle_root, new_ssp_name, new_file_content_type)
return CmdReturnCodes.SUCCESS.value
except Exception as e: # pragma: no cover
return handle_generic_command_exception(e, logger, 'Error while assembling SSP')
name
¤
SSPFilter (AuthorCommonCommand)
¤
Filter the controls in an ssp based on files included by profile and/or list of component names.
Source code in trestle/core/commands/author/ssp.py
class SSPFilter(AuthorCommonCommand):
"""Filter the controls in an ssp based on files included by profile and/or list of component names."""
name = 'ssp-filter'
def _init_arguments(self) -> None:
file_help_str = 'Name of the input ssp'
self.add_argument('-n', '--name', help=file_help_str, required=True, type=str)
file_help_str = 'Name of the optional input profile that defines set of controls in filtered ssp'
self.add_argument('-p', '--profile', help=file_help_str, required=False, type=str)
output_help_str = 'Name of the output generated SSP'
self.add_argument('-o', '--output', help=output_help_str, required=True, type=str)
self.add_argument('-r', '--regenerate', action='store_true', help=const.HELP_REGENERATE)
self.add_argument('-vn', '--version', help=const.HELP_VERSION, required=False, type=str)
comp_help_str = 'Colon-delimited list of component names to include in filtered ssp.'
self.add_argument('-c', '--components', help=comp_help_str, required=False, type=str)
def _run(self, args: argparse.Namespace) -> int:
try:
log.set_log_level_from_args(args)
trestle_root = pathlib.Path(args.trestle_root)
comp_names: Optional[List[str]] = None
if args.components:
comp_names = args.components.split(':')
elif not args.profile:
logger.warning('You must specify either a profile or list of component names for ssp-filter.')
return 1
return self.filter_ssp(
trestle_root, args.name, args.profile, args.output, args.regenerate, args.version, comp_names
)
except Exception as e: # pragma: no cover
return handle_generic_command_exception(e, logger, 'Error generating the filtered ssp')
def filter_ssp(
self,
trestle_root: pathlib.Path,
ssp_name: str,
profile_name: str,
out_name: str,
regenerate: bool,
version: Optional[str],
components: Optional[List[str]] = None
) -> int:
"""
Filter the ssp based on controls included by the profile and/or components and output new ssp.
Args:
trestle_root: root directory of the trestle project
ssp_name: name of the ssp model
profile_name: name of the optional profile model used for filtering
out_name: name of the output ssp model with filtered controls
regenerate: whether to regenerate the uuid's in the ssp
version: new version for the model
components: optional list of component names used for filtering
Returns:
0 on success, 1 otherwise
"""
# load the ssp
ssp: ossp.SystemSecurityPlan
ssp, _ = load_validate_model_name(trestle_root, ssp_name, ossp.SystemSecurityPlan, FileContentType.JSON)
profile_path = ModelUtils.path_for_top_level_model(
trestle_root, profile_name, prof.Profile, FileContentType.JSON
)
if components:
raw_comp_names = [ControlReader.simplify_name(name) for name in components]
comp_uuids: List[str] = []
for component in ssp.system_implementation.components:
if ControlReader.simplify_name(component.title) in raw_comp_names:
comp_uuids.append(component.uuid)
# imp_reqs can be by comp
# and imp_reqs can have statements that are by comp
if comp_uuids:
new_imp_reqs: List[ossp.ImplementedRequirement] = []
# these are all required to be present
for imp_req in ssp.control_implementation.implemented_requirements:
new_by_comps: List[ossp.ByComponent] = []
# by_comps is optional
for by_comp in as_list(imp_req.by_components):
if by_comp.component.uuid in comp_uuids:
new_by_comps.append(by_comp)
imp_req.by_components = none_if_empty(new_by_comps)
new_imp_reqs.append(imp_req)
new_statements: List[ossp.Statement] = []
for statement in as_list(imp_req.statements):
new_by_comps: List[ossp.ByComponent] = []
for by_comp in as_list(statement.by_components):
if by_comp.component_uuid in comp_uuids:
new_by_comps.append(by_comp)
statement.by_components = none_if_empty(new_by_comps)
new_statements.append(statement)
imp_req.statements = none_if_empty(new_statements)
ssp.control_implementation.implemented_requirements = new_imp_reqs
# now remove any unused components from the ssp
new_comp_list: List[ossp.SystemComponent] = []
for comp in ssp.system_implementation.components:
if comp.uuid in comp_uuids:
new_comp_list.append(comp)
ssp.system_implementation.components = new_comp_list
# filter by controls in profile
if profile_name:
prof_resolver = ProfileResolver()
catalog = prof_resolver.get_resolved_profile_catalog(trestle_root, profile_path, show_value_warnings=True)
catalog_interface = CatalogInterface(catalog)
# The input ssp should reference a superset of the controls referenced by the profile
# Need to cull references in the ssp to controls not in the profile
# Also make sure the output ssp contains imp reqs for all controls in the profile
control_imp = ssp.control_implementation
ssp_control_ids: Set[str] = set()
new_set_params: List[ossp.SetParameter] = []
for set_param in as_list(control_imp.set_parameters):
control = catalog_interface.get_control_by_param_id(set_param.param_id)
if control is not None:
new_set_params.append(set_param)
ssp_control_ids.add(control.id)
control_imp.set_parameters = none_if_empty(new_set_params)
new_imp_requirements: List[ossp.ImplementedRequirement] = []
for imp_requirement in as_list(control_imp.implemented_requirements):
control = catalog_interface.get_control(imp_requirement.control_id)
if control is not None:
new_imp_requirements.append(imp_requirement)
ssp_control_ids.add(control.id)
control_imp.implemented_requirements = new_imp_requirements
# make sure all controls in the profile have implemented reqs in the final ssp
if not ssp_control_ids.issuperset(catalog_interface.get_control_ids()):
raise TrestleError('Unable to filter the ssp because the profile references controls not in it.')
ssp.control_implementation = control_imp
if version:
ssp.metadata.version = com.Version(__root__=version)
existing_ssp_path = ModelUtils.full_path_for_top_level_model(trestle_root, out_name, ossp.SystemSecurityPlan)
if existing_ssp_path is not None:
existing_ssp, _ = load_validate_model_name(trestle_root, out_name, ossp.SystemSecurityPlan)
if ModelUtils.models_are_equivalent(existing_ssp, ssp):
logger.info('No changes to filtered ssp so ssp not written out.')
return CmdReturnCodes.SUCCESS.value
if regenerate:
ssp, _, _ = ModelUtils.regenerate_uuids(ssp)
ModelUtils.update_last_modified(ssp)
ModelUtils.save_top_level_model(ssp, trestle_root, out_name, FileContentType.JSON)
return CmdReturnCodes.SUCCESS.value
name
¤
Methods¤
filter_ssp(self, trestle_root, ssp_name, profile_name, out_name, regenerate, version, components=None)
¤
Filter the ssp based on controls included by the profile and/or components and output new ssp.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
trestle_root |
Path |
root directory of the trestle project |
required |
ssp_name |
str |
name of the ssp model |
required |
profile_name |
str |
name of the optional profile model used for filtering |
required |
out_name |
str |
name of the output ssp model with filtered controls |
required |
regenerate |
bool |
whether to regenerate the uuid's in the ssp |
required |
version |
Optional[str] |
new version for the model |
required |
components |
Optional[List[str]] |
optional list of component names used for filtering |
None |
Returns:
Type | Description |
---|---|
int |
0 on success, 1 otherwise |
Source code in trestle/core/commands/author/ssp.py
def filter_ssp(
self,
trestle_root: pathlib.Path,
ssp_name: str,
profile_name: str,
out_name: str,
regenerate: bool,
version: Optional[str],
components: Optional[List[str]] = None
) -> int:
"""
Filter the ssp based on controls included by the profile and/or components and output new ssp.
Args:
trestle_root: root directory of the trestle project
ssp_name: name of the ssp model
profile_name: name of the optional profile model used for filtering
out_name: name of the output ssp model with filtered controls
regenerate: whether to regenerate the uuid's in the ssp
version: new version for the model
components: optional list of component names used for filtering
Returns:
0 on success, 1 otherwise
"""
# load the ssp
ssp: ossp.SystemSecurityPlan
ssp, _ = load_validate_model_name(trestle_root, ssp_name, ossp.SystemSecurityPlan, FileContentType.JSON)
profile_path = ModelUtils.path_for_top_level_model(
trestle_root, profile_name, prof.Profile, FileContentType.JSON
)
if components:
raw_comp_names = [ControlReader.simplify_name(name) for name in components]
comp_uuids: List[str] = []
for component in ssp.system_implementation.components:
if ControlReader.simplify_name(component.title) in raw_comp_names:
comp_uuids.append(component.uuid)
# imp_reqs can be by comp
# and imp_reqs can have statements that are by comp
if comp_uuids:
new_imp_reqs: List[ossp.ImplementedRequirement] = []
# these are all required to be present
for imp_req in ssp.control_implementation.implemented_requirements:
new_by_comps: List[ossp.ByComponent] = []
# by_comps is optional
for by_comp in as_list(imp_req.by_components):
if by_comp.component.uuid in comp_uuids:
new_by_comps.append(by_comp)
imp_req.by_components = none_if_empty(new_by_comps)
new_imp_reqs.append(imp_req)
new_statements: List[ossp.Statement] = []
for statement in as_list(imp_req.statements):
new_by_comps: List[ossp.ByComponent] = []
for by_comp in as_list(statement.by_components):
if by_comp.component_uuid in comp_uuids:
new_by_comps.append(by_comp)
statement.by_components = none_if_empty(new_by_comps)
new_statements.append(statement)
imp_req.statements = none_if_empty(new_statements)
ssp.control_implementation.implemented_requirements = new_imp_reqs
# now remove any unused components from the ssp
new_comp_list: List[ossp.SystemComponent] = []
for comp in ssp.system_implementation.components:
if comp.uuid in comp_uuids:
new_comp_list.append(comp)
ssp.system_implementation.components = new_comp_list
# filter by controls in profile
if profile_name:
prof_resolver = ProfileResolver()
catalog = prof_resolver.get_resolved_profile_catalog(trestle_root, profile_path, show_value_warnings=True)
catalog_interface = CatalogInterface(catalog)
# The input ssp should reference a superset of the controls referenced by the profile
# Need to cull references in the ssp to controls not in the profile
# Also make sure the output ssp contains imp reqs for all controls in the profile
control_imp = ssp.control_implementation
ssp_control_ids: Set[str] = set()
new_set_params: List[ossp.SetParameter] = []
for set_param in as_list(control_imp.set_parameters):
control = catalog_interface.get_control_by_param_id(set_param.param_id)
if control is not None:
new_set_params.append(set_param)
ssp_control_ids.add(control.id)
control_imp.set_parameters = none_if_empty(new_set_params)
new_imp_requirements: List[ossp.ImplementedRequirement] = []
for imp_requirement in as_list(control_imp.implemented_requirements):
control = catalog_interface.get_control(imp_requirement.control_id)
if control is not None:
new_imp_requirements.append(imp_requirement)
ssp_control_ids.add(control.id)
control_imp.implemented_requirements = new_imp_requirements
# make sure all controls in the profile have implemented reqs in the final ssp
if not ssp_control_ids.issuperset(catalog_interface.get_control_ids()):
raise TrestleError('Unable to filter the ssp because the profile references controls not in it.')
ssp.control_implementation = control_imp
if version:
ssp.metadata.version = com.Version(__root__=version)
existing_ssp_path = ModelUtils.full_path_for_top_level_model(trestle_root, out_name, ossp.SystemSecurityPlan)
if existing_ssp_path is not None:
existing_ssp, _ = load_validate_model_name(trestle_root, out_name, ossp.SystemSecurityPlan)
if ModelUtils.models_are_equivalent(existing_ssp, ssp):
logger.info('No changes to filtered ssp so ssp not written out.')
return CmdReturnCodes.SUCCESS.value
if regenerate:
ssp, _, _ = ModelUtils.regenerate_uuids(ssp)
ModelUtils.update_last_modified(ssp)
ModelUtils.save_top_level_model(ssp, trestle_root, out_name, FileContentType.JSON)
return CmdReturnCodes.SUCCESS.value
SSPGenerate (AuthorCommonCommand)
¤
Generate SSP in markdown form from a Profile.
Source code in trestle/core/commands/author/ssp.py
class SSPGenerate(AuthorCommonCommand):
"""Generate SSP in markdown form from a Profile."""
name = 'ssp-generate'
def _init_arguments(self) -> None:
file_help_str = 'Name of the profile model in the trestle workspace'
self.add_argument('-p', '--profile', help=file_help_str, required=True, type=str)
self.add_argument('-o', '--output', help=const.HELP_MARKDOWN_NAME, required=True, type=str)
self.add_argument('-y', '--yaml-header', help=const.HELP_YAML_PATH, required=False, type=str)
self.add_argument(
'-fo', '--force-overwrite', help=const.HELP_FO_OUTPUT, required=False, action='store_true', default=False
)
self.add_argument(
'-ohv',
'--overwrite-header-values',
help=const.HELP_OVERWRITE_HEADER_VALUES,
required=False,
action='store_true',
default=False
)
sections_help_str = (
'Comma separated list of section:alias pairs. Provides mapping of short names to long for markdown.'
)
self.add_argument('-s', '--sections', help=sections_help_str, required=False, type=str)
allowed_sections_help_str = (
'Comma separated list of section short names to include in the markdown. Others will not appear.'
)
self.add_argument('-as', '--allowed-sections', help=allowed_sections_help_str, required=False, type=str)
def _run(self, args: argparse.Namespace) -> int:
try:
log.set_log_level_from_args(args)
trestle_root = args.trestle_root
if not file_utils.is_directory_name_allowed(args.output):
raise TrestleError(f'{args.output} is not an allowed directory name')
if args.force_overwrite:
try:
logger.debug(f'Overwriting the content of {args.output}.')
clear_folder(pathlib.Path(args.output))
except TrestleError as e: # pragma: no cover
raise TrestleError(f'Unable to overwrite contents of {args.output}: {e}')
profile_path = trestle_root / f'profiles/{args.profile}/profile.json'
yaml_header: dict = {}
if args.yaml_header:
try:
logging.debug(f'Loading yaml header file {args.yaml_header}')
yaml = YAML()
yaml_header = yaml.load(pathlib.Path(args.yaml_header).open('r'))
except YAMLError as e:
raise TrestleError(f'YAML error loading yaml header for ssp generation: {e}')
markdown_path = trestle_root / args.output
profile_resolver = ProfileResolver()
# in ssp context we want to see missing value warnings
resolved_catalog = profile_resolver.get_resolved_profile_catalog(
trestle_root, profile_path, show_value_warnings=True
)
catalog_interface = CatalogInterface(resolved_catalog)
sections_dict: Dict[str, str] = {}
if args.sections:
sections_dict = sections_to_dict(args.sections)
if const.STATEMENT in sections_dict:
raise TrestleError('Statement is not allowed as a section name.')
# add any existing sections from the controls but only have short names
control_section_short_names = catalog_interface.get_sections()
for short_name in control_section_short_names:
if short_name not in sections_dict:
sections_dict[short_name] = short_name
logger.debug(f'ssp sections dict: {sections_dict}')
context = ControlContext.generate(ContextPurpose.SSP, True, trestle_root, markdown_path)
context.yaml_header = yaml_header
context.sections_dict = sections_dict
context.prompt_responses = True
context.overwrite_header_values = args.overwrite_header_values
context.allowed_sections = args.allowed_sections
catalog_interface.write_catalog_as_markdown(context, catalog_interface.get_statement_part_id_map(False))
return CmdReturnCodes.SUCCESS.value
except Exception as e: # pragma: no cover
return handle_generic_command_exception(e, logger, 'Error while writing markdown from catalog')
name
¤
handler: python