Skip to content

model_utils

trestle.common.model_utils ¤

Common utilities for the OSCAL models and directories.

logger ¤

Classes¤

ModelUtils ¤

Utilities for the OSCAL models input and output.

Source code in trestle/common/model_utils.py
class ModelUtils:
    """Utilities for the OSCAL models input and output."""

    @staticmethod
    def load_distributed(
        abs_path: Path,
        abs_trestle_root: Path,
        collection_type: Optional[Type[Any]] = None
    ) -> Tuple[Type[OscalBaseModel], str, Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]]:
        """
        Load the model found at a path, reassembling it when it has been split.

        If the model is decomposed/split/distributed, the decomposed models are loaded recursively
        and merged into the returned instance.

        Args:
            abs_path: The path to the file/directory to be loaded.
            abs_trestle_root: The trestle project root directory.
            collection_type: The type of collection model, if it is a collection model.
                list is the only collection type handled or expected.
                Defaults to None.

        Returns:
            A tuple of Model Type (e.g. class 'trestle.oscal.catalog.Catalog'),
            Model Alias (e.g. 'catalog.metadata') and Instance of the Model.
            If the model is decomposed/split/distributed, the instance of the model contains
            the decomposed models loaded recursively.

        Raises:
            TrestleNotFoundError: If neither abs_path nor abs_path with its suffix removed exists.
            TrestleError: If collection_type is given and is not list.

        Note:
            This does not validate the model.  You must either validate the model separately or use the load_validate
            utilities.
        """
        # when the requested file is absent, fall back to the same path with its suffix removed
        if not abs_path.exists():
            abs_path = abs_path.with_name(abs_path.stem)
            if not abs_path.exists():
                raise TrestleNotFoundError(f'File {abs_path} not found for load.')

        if collection_type:
            # list is the only collection handled here; the other OSCAL collection type is dict, which only
            # applies to include_all and is too granular ever to be loaded by this routine
            if collection_type is not list:
                raise TrestleError(f'Collection type {collection_type} not recognized for distributed load.')
            return ModelUtils._load_list(abs_path, abs_trestle_root)

        primary_model_type, primary_model_alias = ModelUtils.get_stripped_model_type(abs_path, abs_trestle_root)

        # read the file itself only when the path names an actual json or yaml file that exists
        primary_model_instance: Optional[OscalBaseModel] = None
        content_type = FileContentType.path_to_content_type(abs_path)
        if FileContentType.is_readable_file(content_type) and abs_path.exists():
            primary_model_instance = primary_model_type.oscal_read(abs_path)

        # a directory named after the file stem holds the split-off parts, if there are any
        decomposed_dir = abs_path.with_name(abs_path.stem)
        if not decomposed_dir.exists():
            return primary_model_type, primary_model_alias, primary_model_instance

        kept_aliases = []
        parts: List[OscalBaseModel] = []
        for entry in sorted(trestle.common.file_utils.iterdir_without_hidden_files(decomposed_dir)):
            if entry.is_file():
                _, part_alias, part_instance = ModelUtils.load_distributed(entry, abs_trestle_root)
            elif entry.is_dir():
                # Only a directory that is a collection container (a model wrapping a list or dict in
                # __root__) is loaded here.  Any other directory is skipped because it gets loaded
                # when iterating over its model file.
                entry_type, _ = ModelUtils.get_stripped_model_type(entry, abs_trestle_root)
                if not entry_type.is_collection_container():
                    continue
                _, part_alias, part_instance = ModelUtils.load_distributed(
                    entry, abs_trestle_root, entry_type.get_collection_type()
                )
            else:
                continue
            kept_aliases.append(part_alias.split('.')[-1])
            parts.append(part_instance)

        # start from the fields of the primary instance (its own __dict__) when one was read
        merged_fields = {} if primary_model_instance is None else primary_model_instance.__dict__

        merged_model_type, merged_model_alias = ModelUtils.get_stripped_model_type(
            abs_path, abs_trestle_root, kept_aliases
        )

        # The following use of top_level is to allow loading of a top level model by name only, e.g. MyCatalog
        # There may be a better overall way to approach this.
        top_level = '.' not in merged_model_alias

        for alias, part in zip(kept_aliases, parts):
            wraps_root = (
                hasattr(part, '__dict__') and '__root__' in part.__dict__ and isinstance(part, OscalBaseModel)
            )
            if wraps_root:
                # unwrap container models so the bare collection is merged
                part = part.__dict__['__root__']
            if top_level and not merged_fields:
                merged_fields = part.__dict__
            else:
                merged_fields[alias] = part

        merged_model_instance = merged_model_type(**merged_fields)  # type: ignore
        return merged_model_type, merged_model_alias, merged_model_instance

    @staticmethod
    def load_top_level_model(
        trestle_root: pathlib.Path,
        model_name: str,
        model_class: TG,
        file_content_type: Optional[FileContentType] = None
    ) -> Tuple[TG, pathlib.Path]:
        """Load a top level model from its name and class, inferring the file content type when not given.

        Use this when an existing model must be loaded but its content type may not be known.
        If the content type is known it should be passed in.

        Returns:
            A tuple of the loaded model and the full path it was loaded from.

        Raises:
            TrestleError: If the content type is neither given nor inferred as a readable file type.

        Note:  This does not validate the model.  If you want to validate the model use the load_validate utilities.
        """
        model_root = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
        content_type = file_content_type
        if content_type is None:
            content_type = FileContentType.path_to_content_type(model_root)
        if not FileContentType.is_readable_file(content_type):
            raise TrestleError(f'Unable to load model {model_name} without specifying json or yaml.')
        extension = FileContentType.to_file_extension(content_type)
        model_path = model_root.with_suffix(extension)
        _model_type, _model_alias, loaded_model = ModelUtils.load_distributed(model_path, trestle_root)
        return loaded_model, model_path

    @staticmethod
    def save_top_level_model(
        model: TopLevelOscalModel, trestle_root: pathlib.Path, model_name: str, file_content_type: FileContentType
    ) -> None:
        """Write a top level model to its standard location, inferring the model type from the model itself.

        The model type (catalog, profile, etc.) is not needed, but the file content type must be specified.
        The model directory is created if it does not already exist.
        """
        model_root = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model)
        extension = FileContentType.to_file_extension(file_content_type)
        target_path = model_root.with_suffix(extension)
        target_dir = target_path.parent
        if not target_dir.exists():
            target_dir.mkdir(parents=True, exist_ok=True)
        model.oscal_write(target_path)

    @staticmethod
    def get_relative_model_type(relative_path: pathlib.Path) -> Tuple[Type[OscalBaseModel], str]:
        """
        Determine the oscal model type for a file given its path relative to 'trestle_root'.

        Args:
            relative_path: Relative path of the model with respect to the root directory of the trestle project.
        Returns:
            Type of Oscal Model for the provided model
            Alias of that oscal model.
        Raises:
            TrestleError: If the path has fewer than two parts or does not start with a known model directory.
        """
        path_parts = relative_path.parts
        if len(path_parts) < 2:
            raise TrestleError(
                'Insufficient path length to be a valid relative path w.r.t Trestle project root directory.'
            )

        # the first part is the model directory: catalogs, profiles, etc
        model_dir = path_parts[0]
        if model_dir not in const.MODEL_DIR_LIST:
            raise TrestleError(f'No valid trestle model type directory (e.g. catalogs) found for {model_dir}.')
        module_name = const.MODEL_DIR_TO_MODEL_MODULE[model_dir]

        model_type, model_alias = ModelUtils.get_root_model(module_name)
        alias_chain = [model_alias]

        # walk the parts that follow the first two, descending one field (or collection item) per part
        inner_parts = pathlib.Path(*path_parts[2:]).parts
        for index, part in enumerate(inner_parts):
            alias = ModelUtils._extract_alias(part)
            # a first part that merely repeats the root alias adds nothing to the chain
            if index == 0 and alias == model_alias:
                continue
            model_alias = alias
            alias_chain.append(model_alias)
            if utils.is_collection_field_type(model_type):
                model_type = utils.get_inner_type(model_type)
            else:
                model_type = model_type.alias_to_field_map()[alias].outer_type_

        return model_type, '.'.join(alias_chain)

    @staticmethod
    def get_stripped_model_type(
        absolute_path: pathlib.Path,
        absolute_trestle_root: pathlib.Path,
        aliases_not_to_be_stripped: List[str] = None
    ) -> Tuple[Type[OscalBaseModel], str]:
        """
        Get the stripped contextual model class and alias based on the contextual path.

        The files and folders that exist next to the model on disk decide which fields are stripped
        from the model type represented by the path: every entry found in the model's split
        directory names a field to strip, unless it is listed in aliases_not_to_be_stripped.

        Args:
            absolute_path: Absolute path of the model file or directory.
            absolute_trestle_root: Absolute path of the trestle project root.
            aliases_not_to_be_stripped: Aliases that must stay in the model type.  Defaults to none.
        Returns:
            The (possibly stripped or wrapped) model type and the full alias of the model.
        """
        keep_aliases = [] if aliases_not_to_be_stripped is None else aliases_not_to_be_stripped
        relative_path = absolute_path.relative_to(absolute_trestle_root)
        singular_model_type, model_alias = ModelUtils.get_relative_model_type(relative_path)
        logger.debug(f'singular model type {singular_model_type} model alias {model_alias}')

        malias = model_alias.split('.')[-1]

        # Stripped models do not apply to collection types such as List[] and Dict{}
        # so a list or dict type gets a new wrapping model generated for it instead
        if utils.is_collection_field_type(singular_model_type):
            class_name = alias_to_classname(malias, AliasMode.JSON)
            logger.debug(f'collection field type class name {class_name} and alias {malias}')
            wrapper_type = create_model(class_name, __base__=OscalBaseModel, __root__=(singular_model_type, ...))
            logger.debug(f'model_type created: {wrapper_type}')
            return cast(Type[OscalBaseModel], wrapper_type), model_alias

        logger.debug(f'not collection field type, malias: {malias}')
        # locate the directory that would hold the split-off fields of this model
        if absolute_path.is_dir() and malias != ModelUtils._extract_alias(absolute_path.name):
            split_subdir = absolute_path / malias
        else:
            split_subdir = absolute_path.parent / absolute_path.with_suffix('').name

        strip_aliases = set()
        if split_subdir.exists():
            found_aliases = (ModelUtils._extract_alias(f.name) for f in iterdir_without_hidden_files(split_subdir))
            strip_aliases = {alias for alias in found_aliases if alias not in keep_aliases}

        logger.debug(f'aliases to be stripped: {strip_aliases}')
        if not strip_aliases:
            return singular_model_type, model_alias

        stripped_type = singular_model_type.create_stripped_model_type(stripped_fields_aliases=list(strip_aliases))
        logger.debug(f'model_type: {stripped_type}')
        return stripped_type, model_alias

    @staticmethod
    def model_type_to_model_dir(model_type: str) -> str:
        """Translate a model type into its plural model directory name, rejecting unknown types."""
        if model_type in const.MODEL_TYPE_LIST:
            return const.MODEL_TYPE_TO_MODEL_DIR[model_type]
        raise err.TrestleError(f'Not a valid model type: {model_type}.')

    @staticmethod
    def get_models_of_type(model_type: str, root: pathlib.Path) -> List[str]:
        """Collect the names of all models of the requested type in the trestle directory.

        Raises:
            TrestleError: If the model type is not supported or root is not within a trestle project.
        """
        if model_type not in const.MODEL_TYPE_LIST:
            raise err.TrestleError(f'Model type {model_type} is not supported')

        # the search is made relative to the project root
        trestle_root = extract_trestle_project_root(root)
        if not trestle_root:
            logger.error(f'Given directory {root} is not within a trestle project.')
            raise err.TrestleError('Given directory is not within a trestle project.')

        # each model lives in its own subdirectory of the plural model directory
        model_dir_name = ModelUtils.model_type_to_model_dir(model_type)
        model_names = []
        for candidate in (trestle_root / model_dir_name).glob('*/'):
            if ModelUtils._should_ignore(candidate.stem):
                continue
            if candidate.is_dir():
                model_names.append(candidate.stem)
                continue
            logger.warning(
                f'Ignoring validation of misplaced file {candidate.name} '
                f'found in the model directory, {model_dir_name}.'
            )
        return model_names

    @staticmethod
    def get_all_models(root: pathlib.Path) -> List[Tuple[str, str]]:
        """Collect every model in the trestle directory as (model_type, model_name) tuples."""
        return [
            (model_type, model_name)
            for model_type in const.MODEL_TYPE_LIST
            for model_name in ModelUtils.get_models_of_type(model_type, root)
        ]

    @staticmethod
    def full_path_for_top_level_model(
        trestle_root: pathlib.Path,
        model_name: str,
        model_class: Type[TopLevelOscalModel],
    ) -> Optional[pathlib.Path]:
        """
        Find the full path of an existing model given its name and model type but no file content type.

        Use this method when the path of a model is needed but its file content type is not known.
        If the file content type is known, use path_for_top_level_model instead.

        Returns:
            The model path with the inferred file extension, or None when no readable content type
            (json or yaml) can be inferred for the model.
        """
        model_root = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
        content_type = FileContentType.path_to_content_type(model_root)
        if FileContentType.is_readable_file(content_type):
            return model_root.with_suffix(FileContentType.to_file_extension(content_type))
        return None

    @staticmethod
    def path_for_top_level_model(
        trestle_root: pathlib.Path,
        model_name: str,
        model_class: Type[TopLevelOscalModel],
        file_content_type: Optional[FileContentType]
    ) -> pathlib.Path:
        """
        Find the full path of a model given its name, model type and file content type.

        When file_content_type is given the path is built without this method itself checking that
        the path or file exists.  When it is None the lookup is delegated to
        full_path_for_top_level_model, whose result (possibly None) is returned unchanged.
        """
        if file_content_type is not None:
            model_root = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
            extension = FileContentType.to_file_extension(file_content_type)
            return model_root.with_suffix(extension)
        return ModelUtils.full_path_for_top_level_model(trestle_root, model_name, model_class)

    @staticmethod
    def get_singular_alias(alias_path: str, relative_path: Optional[pathlib.Path] = None) -> str:
        """
        Get the alias in the singular form from a jsonpath.

        If relative_path is given, alias_path is treated as relative to the model found at that path:
        the full alias of that model is prepended (without repeating a shared leading alias) before
        the path is resolved.

        Args:
            alias_path: The current alias element path as a string
            relative_path: Optional relative path (w.r.t. trestle_root) to cater for relative element paths.
        Returns:
            Alias as a string
        Raises:
            TrestleError: If alias_path is blank, its root alias matches no known root model, or the
                singular alias cannot be derived from the final collection field.
        """
        if len(alias_path.strip()) == 0:
            raise err.TrestleError(f'Invalid jsonpath {alias_path}')

        full_alias_path = alias_path
        if relative_path:
            logger.debug(f'get_singular_alias contextual mode: {relative_path}')
            _, full_model_alias = ModelUtils.get_relative_model_type(relative_path)
            first_alias_a = full_model_alias.split('.')[-1]
            first_alias_b = alias_path.split('.')[0]
            # avoid duplicating the alias when the element path starts where the model alias ends
            if first_alias_a == first_alias_b:
                full_model_alias = '.'.join(full_model_alias.split('.')[:-1])
            full_alias_path = '.'.join([full_model_alias, alias_path]).strip('.')

        path_parts = full_alias_path.split(const.ALIAS_PATH_SEPARATOR)
        logger.debug(f'path parts: {path_parts}')

        # find the root model whose alias matches the first path part
        root_model_alias = path_parts[0]
        model_types = []
        for module_name in const.MODEL_TYPE_TO_MODEL_MODULE.values():
            model_type, model_alias = ModelUtils.get_root_model(module_name)
            if root_model_alias == model_alias:
                model_types.append(model_type)
                break
        else:
            raise err.TrestleError(f'{root_model_alias} is an invalid root model alias.')

        if len(path_parts) == 1:
            return root_model_alias

        model_type = model_types[0]
        last_index = len(path_parts) - 1
        # go through path parts skipping first one
        for index, path_part in enumerate(path_parts[1:], start=1):
            if utils.is_collection_field_type(model_type):
                # if it is a collection type and last part is * then stop at the collection itself
                if index == last_index and path_part == '*':
                    break
                # otherwise this part addresses an item, so move to the inner type of the collection
                model_type = utils.get_inner_type(model_type)
            else:
                field_map = model_type.alias_to_field_map()
                # parts that are not fields of the current model are skipped
                if path_part not in field_map:
                    continue
                model_type = field_map[path_part].outer_type_
            model_types.append(model_type)

        last_alias = path_parts[-1]
        if last_alias == '*':
            last_alias = path_parts[-2]

        # a non-collection model is already singular, so its own alias is returned
        if not utils.is_collection_field_type(model_type):
            return last_alias

        # for a collection, the singular alias comes from the class name of its item type
        parent_model_type = model_types[-2]
        try:
            field = parent_model_type.alias_to_field_map()[last_alias]
            inner_type = utils.get_inner_type(field.outer_type_)
            return str_utils.classname_to_alias(inner_type.__name__, AliasMode.JSON)
        except Exception as e:
            raise err.TrestleError(f'Error in json path {alias_path}: {e}') from e

    @staticmethod
    def get_root_model(module_name: str) -> Tuple[Type[Any], str]:
        """Get the root model class and alias based on the module."""
        try:
            module = importlib.import_module(module_name)
        except ModuleNotFoundError as e:
            raise err.TrestleError(str(e))

        if hasattr(module, 'Model'):
            model_metadata = next(iter(module.Model.__fields__.values()))
            return model_metadata.type_, model_metadata.alias
        raise err.TrestleError('Invalid module')

    @staticmethod
    def _root_path_for_top_level_model(
        trestle_root: pathlib.Path, model_name: str, model_class: Union[TopLevelOscalModel, Type[TopLevelOscalModel]]
    ) -> pathlib.Path:
        """
        Build the suffix-less root path of a model from its name and class.

        This is a private method used only to construct the root filepath based on model name and type.
        It does not check for existence or content type and it does not create the directory if it does not exist.

        Raises:
            TrestleError: If the module of model_class is not one of the known model modules.
        """
        module_is_known = hasattr(model_class, '__module__') and model_class.__module__ in const.MODEL_MODULE_LIST
        if not module_is_known:
            raise TrestleError(f'Unable to determine model type for model {model_name} with class {model_class}')
        model_alias = const.MODEL_MODULE_TO_MODEL_TYPE[model_class.__module__]
        # layout is <trestle_root>/<plural model dir>/<model name>/<model alias>
        model_dir = trestle_root / f'{const.MODEL_TYPE_TO_MODEL_DIR[model_alias]}/{model_name}'
        return model_dir / model_alias

    @staticmethod
    def _extract_alias(string_dir: str) -> str:
        """
        Reduce a file or directory name to its bare alias.

        Everything from the first '.' onwards is dropped, then only the text after the last
        const.IDX_SEP is kept, which removes the prefix carried by list or dict item names.
        Working on strings keeps this simple when it must be applied to many parts of a path.
        """
        name_without_extension = string_dir.partition('.')[0]
        # the suffix after the last separator is the alias of a list or dict item
        return name_without_extension.rpartition(const.IDX_SEP)[2]

    @staticmethod
    def _should_ignore(name: str) -> bool:
        """Check if the file or directory should be ignored or not."""
        return name[0] == '.' or name[0] == '_'

    @staticmethod
    def _load_list(abs_path: Path, abs_trestle_root: Path) -> Tuple[Type[OscalBaseModel], str, List[OscalBaseModel]]:
        """
        Given path to a directory of list(array) models, load the distributed models.

        Args:
            abs_path: Directory holding one file per list item.
            abs_trestle_root: The trestle project root directory.
        Returns:
            A tuple of the collection model type, the collection model alias and the list of loaded
            item instances, in sorted path order.
        """
        collection_model_type, collection_model_alias = ModelUtils.get_stripped_model_type(abs_path, abs_trestle_root)
        instances_to_be_merged: List[OscalBaseModel] = []
        for path in sorted(trestle.common.file_utils.iterdir_without_hidden_files(abs_path)):
            # ASSUMPTION HERE: a directory is the decomposed form of a sibling item file, so it is
            # skipped here and picked up when load_distributed loads that file.
            if path.is_dir():
                continue
            _, _, model_instance = ModelUtils.load_distributed(path, abs_trestle_root)
            instances_to_be_merged.append(model_instance)

        return collection_model_type, collection_model_alias, instances_to_be_merged

    @staticmethod
    def _parameter_to_dict_recurse(obj: Union[OscalBaseModel, str], partial: bool) -> Union[str, Dict[str, Any]]:
        """
        Recursively convert obj into a dict that holds only string values.

        Args:
            obj: The parameter or its constituent parts in recursive calls
            partial: Whether to convert the entire param or just the parts needed for markdown header

        Returns:
            The converted parameter as dictionary, or a plain value when obj is a HowMany, a Remarks,
            a ParameterValue or anything without a fields set
        """
        if isinstance(obj, common.HowMany):
            return obj.name
        if isinstance(obj, (common.Remarks, common.ParameterValue)):
            return obj.__root__
        # anything without a fields set is either a string already or is cast to one
        if not hasattr(obj, const.FIELDS_SET):
            return str(obj)

        # it is an oscal object, so recurse within the attributes that were explicitly set
        header_fields = ('id', 'label', 'values', 'select', 'choice', 'how_many')
        converted = {}
        for field in obj.__fields_set__:
            if partial and field not in header_fields:
                continue
            attr = getattr(obj, field)
            if not attr:
                continue
            if isinstance(attr, str):
                converted[field] = attr
            elif not isinstance(attr, list):
                converted[field] = ModelUtils._parameter_to_dict_recurse(attr, partial)
            elif field == 'values' and len(attr) == 1:
                # special handling when only one value present - convert to single string
                converted[field] = str(attr[0].__root__)
            else:
                converted[field] = [ModelUtils._parameter_to_dict_recurse(item, partial) for item in attr]
        return converted

    @staticmethod
    def parameter_to_dict(obj: Union[OscalBaseModel, str], partial: bool) -> Union[str, Dict[str, Any]]:
        """
        Convert obj to dict containing only string values, storing only the fields that have values set.

        Args:
            obj: The parameter to convert
            partial: Whether to convert the entire param or just the parts needed for markdown header

        Returns:
            The converted parameter as dictionary, with values as None if not present.
            When the conversion yields a plain string instead of a dictionary, it is returned unchanged.
        """
        res = ModelUtils._parameter_to_dict_recurse(obj, partial)
        # the recursion returns a plain string for non-model input, and a string cannot take a
        # 'values' key, so only a dictionary result is given the default entry
        if isinstance(res, dict):
            res.setdefault('values', None)
        return res

    @staticmethod
    def _string_to_howmany(count_str: str) -> Optional[common.HowMany]:
        clean_str = count_str.lower().strip().replace('-', ' ').replace('_', ' ')
        if clean_str == 'one or more':
            return common.HowMany.one_or_more
        elif clean_str == 'one':
            return common.HowMany.one
        return None

    @staticmethod
    def dict_to_parameter(param_dict: Dict[str, Any]) -> common.Parameter:
        """
        Convert dict with only string values to Parameter with handling for HowMany and with validity checks.

        Args:
            param_dict: Dictionary of pure string values representing Parameter contents.
                It is modified in place: a single string under 'values' is wrapped in a list, the
                how_many string is replaced by its HowMany member, and the display name and 'ns'
                entries are removed.

        Returns:
            A valid OSCAL Parameter

        Raises:
            TrestleError: If select.how_many is present but is not one-or-more or one.

        Notes:
            This handles both partial and full parameter dictionaries
            It checks for validity of the values if a select and HowMany is specified
            There is special handling for values: If it is a single string it is converted to list of one ParameterValue
            But if it is a list of strings is regarded as a list of values and is converted to a list of ParameterValues
            A 'values', 'props' or 'select' entry that is None is treated the same as a missing entry
        """
        # parameter_to_dict stores values as None when there are none, so None must not reach len()
        values = param_dict.get('values')
        if values is None:
            values = []
        # special handling when only one value present - convert to list of 1
        if isinstance(values, str):
            values = [values]
            param_dict['values'] = values

        select = param_dict.get('select')
        if select and 'how_many' in select:
            count_str = select['how_many']
            how_many = ModelUtils._string_to_howmany(count_str)
            if how_many is None:
                # report the text that failed to parse, not the None it parsed to
                raise TrestleError(
                    f'Unrecognized HowMany value {count_str} in Parameter: should be one-or-more or one.'
                )
            select['how_many'] = how_many
            if how_many == common.HowMany.one and len(values) > 1:
                logger.warning(f'Parameter specifies HowMany=1 but has {len(values)} values given.')
            choices = select.get('choice', [])
            if choices and values:
                for value in values:
                    if value not in choices:
                        logger.warning(
                            f"Parameter {param_dict.get('id')} has value \"{value}\" not in choices: {choices}."
                        )

        props = param_dict.get('props')
        if props is None:
            props = []
        # the display name is carried as a property rather than as a Parameter field
        if const.DISPLAY_NAME in param_dict:
            display_name = param_dict.pop(const.DISPLAY_NAME)
            props.append(common.Property(name=const.DISPLAY_NAME, value=display_name, ns=const.TRESTLE_GENERIC_NS))

        param_dict.pop('ns', None)
        param = common.Parameter(**param_dict)
        param.props = none_if_empty(props)
        return param

    @staticmethod
    def update_last_modified(model: TopLevelOscalModel, timestamp: Optional[datetime] = None) -> None:
        """Set the LastModified timestamp of a top level model to timestamp, or to now when none is given."""
        new_stamp = timestamp or datetime.now().astimezone()
        model.metadata.last_modified = common.LastModified(__root__=new_stamp)

    @staticmethod
    def model_age(model: TopLevelOscalModel) -> int:
        """Find time in seconds since LastModified timestamp."""
        # default to one year if no last_modified
        age_seconds = const.DAY_SECONDS * 365
        if model.metadata.last_modified:
            dt = datetime.now().astimezone() - model.metadata.last_modified.__root__
            age_seconds = dt.seconds
        return age_seconds

    @staticmethod
    def find_values_by_name(object_of_interest: Any, name_of_interest: str) -> List[Any]:
        """Walk models, lists and dicts recursively and gather every value stored under the given name.

        A model contributes its attribute of that name when it is not None, and a dict contributes the
        entry under that key.  Objects of any other type contribute nothing.
        """
        found = []
        if isinstance(object_of_interest, BaseModel):
            own_value = getattr(object_of_interest, name_of_interest, None)
            if own_value is not None:
                found.append(own_value)
            # descend only into the fields recorded in the model's fields set
            field_names = getattr(object_of_interest, const.FIELDS_SET, None)
            if field_names is None:
                children = ()
            else:
                children = (getattr(object_of_interest, field_name, None) for field_name in field_names)
        elif type(object_of_interest) is list:
            children = object_of_interest
        elif type(object_of_interest) is dict:
            if name_of_interest in object_of_interest:
                found.append(object_of_interest[name_of_interest])
            children = object_of_interest.values()
        else:
            children = ()

        for child in children:
            found.extend(ModelUtils.find_values_by_name(child, name_of_interest))
        return found

    @staticmethod
    def has_no_duplicate_values_by_name(object_of_interest: BaseModel, name_of_interest: str) -> bool:
        """Report whether all values found under the given name are distinct, warning about each duplicate."""
        values = ModelUtils.find_values_by_name(object_of_interest, name_of_interest)
        if len(set(values)) == len(values):
            return True

        # count the occurrences so that every repeated value can be reported
        occurrences = {}
        for value in values:
            occurrences[value] = occurrences.get(value, 0) + 1
        repeated = ((value, count) for value, count in occurrences.items() if count > 1)
        for item, instances in repeated:
            logger.warning(f'Duplicate detected of item {item} with {instances} instances.')
        return False

    @staticmethod
    def find_uuid_refs(object_of_interest: BaseModel) -> Set[str]:
        """Collect the uuids referenced from href values and from markdown links in prose."""
        # hrefs have form #foo or #uuid
        candidates = ModelUtils.find_values_by_name(object_of_interest, 'href')

        # prose has uuid refs in markdown form: [foo](#bar) or [foo](#uuid)
        for prose in ModelUtils.find_values_by_name(object_of_interest, 'prose'):
            # the [1] is to extract the inner of 3 capture patterns
            candidates.extend(match[1] for match in re.findall(const.MARKDOWN_URL_REGEX, prose))

        # only strings that start with # are potential uuid refs; keep the parts that are uuids
        uuid_set = set()
        for candidate in candidates:
            if candidate and candidate[0] == '#':
                uuid_set.update(re.findall(const.UUID_REGEX, candidate[1:]))
        return uuid_set

    @staticmethod
    def _regenerate_uuids_in_place(object_of_interest: Any, uuid_lut: Dict[str, str]) -> Tuple[Any, Dict[str, str]]:
        """Give every uuid in the object a new value and record the changes.

        Walk the object and replace each value held under the name 'uuid' with a new uuid4,
        adding an old:new entry to the lookup table for each replacement.
        References to the replaced uuid's are not touched here.  That is done by _update_new_uuid_refs.
        The outermost call must pass an empty dict as uuid_lut; the recursion grows it from there.

        Args:
            object_of_interest: pydantic.BaseModel, list, dict or str to be updated
            uuid_lut: the lut of old:new uuid's built so far.  First call must be made with value {}

        Returns:
            The object with new uuid's (refs to them are not updated)
            The lookup table of old:new uuid's

        """
        uuid_key = 'uuid'
        regenerate = ModelUtils._regenerate_uuids_in_place

        # Resources are identified by uuid and referenced by hrefs of the form #uuid,
        # so neither should change and the object is returned as it is.
        # Similar types that must keep their uuid would need the same treatment.
        if isinstance(object_of_interest, common.Resource):
            return object_of_interest, uuid_lut

        if isinstance(object_of_interest, BaseModel):
            # walk the field names recorded under const.FIELDS_SET
            for field in getattr(object_of_interest, const.FIELDS_SET, None):
                if field == uuid_key:
                    replacement = None
                    old_uuid = getattr(object_of_interest, field)
                    # a falsy uuid gets no lut entry and the field ends up as None
                    if old_uuid:
                        replacement = str(uuid.uuid4())
                        uuid_lut[old_uuid] = replacement
                else:
                    replacement, uuid_lut = regenerate(object_of_interest.__dict__[field], uuid_lut)
                object_of_interest.__dict__[field] = replacement
            return object_of_interest, uuid_lut

        if type(object_of_interest) is list:
            regenerated_items = []
            for item in object_of_interest:
                regenerated_item, uuid_lut = regenerate(item, uuid_lut)
                regenerated_items.append(regenerated_item)
            return regenerated_items, uuid_lut

        if type(object_of_interest) is dict:
            regenerated_dict = {}
            for key, value in object_of_interest.items():
                if key == uuid_key:
                    # the value under 'uuid' is always replaced, whatever it holds
                    fresh_uuid = str(uuid.uuid4())
                    regenerated_dict[uuid_key] = fresh_uuid
                    uuid_lut[value] = fresh_uuid
                else:
                    regenerated_dict[key], uuid_lut = regenerate(value, uuid_lut)
            return regenerated_dict, uuid_lut

        # anything else is returned unchanged
        return object_of_interest, uuid_lut

    @staticmethod
    def _update_new_uuid_refs(object_of_interest: Any, uuid_lut: Dict[str, str]) -> Tuple[Any, int]:
        """
        Replace every string that is a key of the lookup table with its new uuid.

        Args:
            object_of_interest: pydantic.BaseModel, list, dict or str to be updated
            uuid_lut: lookup table of old:new uuid's

        Returns:
            The updated object
            A count of the number of strings that were replaced
        """
        update_refs = ModelUtils._update_new_uuid_refs

        if isinstance(object_of_interest, BaseModel):
            replaced = 0
            # fields of a model are updated in place through its __dict__
            for field in getattr(object_of_interest, const.FIELDS_SET, None):
                updated_value, n_replaced = update_refs(object_of_interest.__dict__[field], uuid_lut)
                replaced += n_replaced
                object_of_interest.__dict__[field] = updated_value
            return object_of_interest, replaced

        if type(object_of_interest) is list:
            replaced = 0
            updated_items = []
            for item in object_of_interest:
                updated_item, n_replaced = update_refs(item, uuid_lut)
                replaced += n_replaced
                updated_items.append(updated_item)
            return updated_items, replaced

        if type(object_of_interest) is dict:
            replaced = 0
            updated_dict = {}
            for key, value in object_of_interest.items():
                # string values are resolved by the string case of the recursive call
                updated_dict[key], n_replaced = update_refs(value, uuid_lut)
                replaced += n_replaced
            return updated_dict, replaced

        if isinstance(object_of_interest, str) and object_of_interest in uuid_lut:
            return uuid_lut[object_of_interest], 1

        # not a reference to a regenerated uuid, so nothing to change
        return object_of_interest, 0

    @staticmethod
    def regenerate_uuids(object_of_interest: Any) -> Tuple[Any, Dict[str, str], int]:
        """Assign new uuids throughout the object and repoint the references to them.

        The work is done in two passes.  The first replaces each value held under the
        name 'uuid' with a new uuid4 and builds a lookup table of old:new uuid values.
        The second replaces all string values present in that table with the new value.

        Args:
            object_of_interest: pydantic.BaseModel, list, dict or str will be updated

        Returns:
            The updated object with new uuid's and refs
            The final lookup table of old:new uuid's
            A count of the number of refs that were updated
        """
        # first pass must start from an empty lookup table
        regenerated, old_to_new = ModelUtils._regenerate_uuids_in_place(object_of_interest, {})
        # second pass rewrites the references using the completed table
        updated, ref_count = ModelUtils._update_new_uuid_refs(regenerated, old_to_new)
        return updated, old_to_new, ref_count

    @staticmethod
    def fields_set_non_none(obj: BaseModel) -> Set[str]:
        """
        Return the names of the fields set on the model, with Nones and empty items removed.

        Args:
            obj: The model whose __fields_set__ is filtered.

        Returns:
            The set of field names kept by as_filtered_list when filtering on the field's value.
        """

        def field_value(field_name):
            # the value of the field itself serves as the filter result
            return getattr(obj, field_name)

        set_field_names = list(obj.__fields_set__)
        return set(as_filtered_list(set_field_names, field_value))

    @staticmethod
    def _objects_differ(
        obj_a: Any, obj_b: Any, ignore_type_list: List[Any], ignore_name_list: List[str], ignore_all_uuid: bool
    ) -> bool:
        """
        Report whether two objects differ, with the option to ignore given types and names.

        Tuples and other structures that won't be found in JSON are not given special handling.

        Args:
            obj_a: First object to compare.
            obj_b: Second object to compare.
            ignore_type_list: Objects whose type is in this list are regarded as equal.
            ignore_name_list: Model fields and dict keys with these names are not compared.
            ignore_all_uuid: If True, model fields and dict keys containing 'uuid' are not compared.

        Returns:
            True if a difference is found, otherwise False.
        """
        type_a = type(obj_a)
        # differing truthiness or differing type is a difference in itself
        if bool(obj_a) != bool(obj_b) or type_a != type(obj_b):
            return True
        # both are falsy and of the same type
        if not bool(obj_a):
            return False
        if type_a in ignore_type_list:
            return False
        if type_a is str:
            return obj_a != obj_b

        def differ(sub_a: Any, sub_b: Any) -> bool:
            # compare nested items with the same ignore settings
            return ModelUtils._objects_differ(sub_a, sub_b, ignore_type_list, ignore_name_list, ignore_all_uuid)

        if isinstance(obj_a, BaseModel):
            fields_a = ModelUtils.fields_set_non_none(obj_a)
            if fields_a != ModelUtils.fields_set_non_none(obj_b):
                return True
            compared_fields = list_utils.as_filtered_list(fields_a, lambda f: f not in ignore_name_list)
            return any(
                differ(getattr(obj_a, field), getattr(obj_b, field))
                for field in compared_fields
                if not (ignore_all_uuid and 'uuid' in field)
            )

        if type_a is list:
            if len(obj_a) != len(obj_b):
                return True
            # lists are compared item by item in order
            return any(differ(item_a, item_b) for item_a, item_b in zip(obj_a, obj_b))

        if type_a is dict:
            if obj_a.keys() != obj_b.keys():
                return True
            for key, val in obj_a.items():
                skipped = (ignore_all_uuid and 'uuid' in key) or key in ignore_name_list
                if not skipped and differ(val, obj_b[key]):
                    return True
            return False

        # any other type is compared directly
        return bool(obj_a != obj_b)

    @staticmethod
    def models_are_equivalent(
        model_a: Optional[TopLevelOscalModel],
        model_b: Optional[TopLevelOscalModel],
        ignore_all_uuid: bool = False
    ) -> bool:
        """
        Test if models are equivalent except for last modified and possibly uuid.

        If a model has had uuids regenerated, then all uuids *and references to them* are updated.  This means
        that a model with regenerated uuids needs ignore_all_uuid set in order to be found equivalent.

        Args:
            model_a: First model to compare.
            model_b: Second model to compare.
            ignore_all_uuid: If True, also ignore the uuid-bearing types listed below and any name containing 'uuid'.

        Returns:
            True if no difference is found apart from the ignored items.
        """
        # types ignored when uuids are to be disregarded
        uuid_type_list = [
            common.LastModified,
            common.LocationUuid,
            common.MemberOfOrganization,
            common.PartyUuid,
            common.RelatedRisk,
            common.RelatedObservation1,
            common.Source
        ]
        if ignore_all_uuid:
            ignored_types = uuid_type_list
        else:
            ignored_types = [common.LastModified]
        models_differ = ModelUtils._objects_differ(model_a, model_b, ignored_types, ['last-modified'], ignore_all_uuid)
        return not models_differ

    @staticmethod
    def get_title_from_model_uri(trestle_root: pathlib.Path, uri: str) -> str:
        """
        Fetch the OSCAL model at the uri and return the title from its metadata.

        Args:
            trestle_root: Passed to the fetcher factory together with the uri.
            uri: Location of the model.

        Returns:
            The metadata title of the fetched model.

        Raises:
            TrestleError: Re-raised unchanged after a warning has been logged.
        """
        try:
            uri_fetcher = cache.FetcherFactory.get_fetcher(trestle_root, uri)
            fetched_model, _ = uri_fetcher.get_oscal()
            title = fetched_model.metadata.title
        except TrestleError as e:
            logger.warning(f'Error finding title for model at uri {uri}: {e}')
            raise
        return title
Methods¤
dict_to_parameter(param_dict) staticmethod ¤

Convert dict with only string values to Parameter with handling for HowMany and with validity checks.

Parameters:

Name Type Description Default
param_dict Dict[str, Any]

Dictionary of pure string values representing Parameter contents

required

Returns:

Type Description
Parameter

A valid OSCAL Parameter

Notes

This handles both partial and full parameter dictionaries. It checks for validity of the values if a select and HowMany is specified. There is special handling for values: if it is a single string, it is converted to a list of one ParameterValue; but if it is a list of strings, it is regarded as a list of values and is converted to a list of ParameterValues.

Source code in trestle/common/model_utils.py
@staticmethod
def dict_to_parameter(param_dict: Dict[str, Any]) -> common.Parameter:
    """
    Convert dict with only string values to Parameter with handling for HowMany and with validity checks.

    Args:
        param_dict: Dictionary of pure string values representing Parameter contents.
            Note that it is modified in place: a single string under 'values' is wrapped in a list,
            the how_many string is replaced by its HowMany value, and the display name and 'ns'
            entries are removed.

    Returns:
        A valid OSCAL Parameter

    Raises:
        TrestleError: If the how_many string of the select is not recognized.

    Notes:
        This handles both partial and full parameter dictionaries.
        It checks for validity of the values if a select and HowMany is specified.
        There is special handling for values: If it is a single string it is converted to list of one ParameterValue.
        But if it is a list of strings it is regarded as a list of values and is converted to a list of
        ParameterValues.
    """
    values = param_dict.get('values', [])
    # special handling when only one value present - convert to list of 1
    if isinstance(values, str):
        values = [values]
        param_dict['values'] = values
    if 'select' in param_dict and 'how_many' in param_dict['select']:
        count_str = param_dict['select']['how_many']
        how_many = ModelUtils._string_to_howmany(count_str)
        if how_many is None:
            # report the string that failed to convert: how_many itself is always None here
            raise TrestleError(f'Unrecognized HowMany value {count_str} in Parameter: should be one-or-more or one.')
        param_dict['select']['how_many'] = how_many
        if how_many == common.HowMany.one and len(values) > 1:
            logger.warning(f'Parameter specifies HowMany=1 but has {len(values)} values given.')
        choices = param_dict['select'].get('choice', [])
        if choices and values:
            # partial dictionaries may have no id, so it must not be required just to log a warning
            param_id = param_dict.get('id')
            for value in values:
                if value not in choices:
                    logger.warning(f"Parameter {param_id} has value \"{value}\" not in choices: {choices}.")
    props = param_dict.get('props', [])
    if const.DISPLAY_NAME in param_dict:
        # the display name is carried as a property rather than as a Parameter field
        display_name = param_dict.pop(const.DISPLAY_NAME)
        props.append(common.Property(name=const.DISPLAY_NAME, value=display_name, ns=const.TRESTLE_GENERIC_NS))

    # 'ns' is dropped before the Parameter is constructed
    param_dict.pop('ns', None)
    param = common.Parameter(**param_dict)
    param.props = none_if_empty(props)
    return param
fields_set_non_none(obj) staticmethod ¤

Find the fields set with Nones and empty items removed.

Source code in trestle/common/model_utils.py
@staticmethod
def fields_set_non_none(obj: BaseModel) -> Set[str]:
    """
    Return the names of the fields set on the model, with Nones and empty items removed.

    Args:
        obj: The model whose __fields_set__ is filtered.

    Returns:
        The set of field names kept by as_filtered_list when filtering on the field's value.
    """

    def field_value(field_name):
        # the value of the field itself serves as the filter result
        return getattr(obj, field_name)

    set_field_names = list(obj.__fields_set__)
    return set(as_filtered_list(set_field_names, field_value))
find_uuid_refs(object_of_interest) staticmethod ¤

Find uuid references made in prose and links.

Source code in trestle/common/model_utils.py
@staticmethod
def find_uuid_refs(object_of_interest: BaseModel) -> Set[str]:
    """
    Collect the uuids referenced by href values and by markdown links in prose.

    Args:
        object_of_interest: The model searched for 'href' and 'prose' values.

    Returns:
        The set of matches of const.UUID_REGEX found in candidates that start with '#'.
    """
    # href values are candidates as they are: they have the form #foo or #uuid
    candidates = ModelUtils.find_values_by_name(object_of_interest, 'href')

    # prose holds its candidates inside markdown links: [foo](#bar) or [foo](#uuid)
    for prose in ModelUtils.find_values_by_name(object_of_interest, 'prose'):
        for match in re.findall(const.MARKDOWN_URL_REGEX, prose):
            # index 1 selects the inner of the 3 capture patterns
            candidates.append(match[1])

    uuid_set = set()
    for candidate in candidates:
        # only non-empty candidates starting with # are potential uuid references
        if candidate and candidate[0] == '#':
            uuid_set.update(re.findall(const.UUID_REGEX, candidate[1:]))
    return uuid_set
find_values_by_name(object_of_interest, name_of_interest) staticmethod ¤

Traverse object and return list of values of specified name.

Source code in trestle/common/model_utils.py
@staticmethod
def find_values_by_name(object_of_interest: Any, name_of_interest: str) -> List[Any]:
    """
    Walk the object recursively and gather every value held under the given name.

    Args:
        object_of_interest: A pydantic BaseModel, list or dict.  Anything else yields no values.
        name_of_interest: The attribute name (for models) or key (for dicts) to look for.

    Returns:
        List of the values found, with the value of an object listed before those of its children.
    """
    found = []
    if isinstance(object_of_interest, BaseModel):
        own_value = getattr(object_of_interest, name_of_interest, None)
        if own_value is not None:
            found.append(own_value)
        # children of a model are the values of the fields named under const.FIELDS_SET
        field_names = getattr(object_of_interest, const.FIELDS_SET, None)
        if field_names is None:
            children = []
        else:
            children = [getattr(object_of_interest, field_name, None) for field_name in field_names]
    elif type(object_of_interest) is list:
        children = object_of_interest
    elif type(object_of_interest) is dict:
        if name_of_interest in object_of_interest:
            found.append(object_of_interest[name_of_interest])
        children = object_of_interest.values()
    else:
        # no other types are searched
        children = []
    for child in children:
        found.extend(ModelUtils.find_values_by_name(child, name_of_interest))
    return found
full_path_for_top_level_model(trestle_root, model_name, model_class) staticmethod ¤

Find the full path of an existing model given its name and model type but no file content type.

Use this method when you need the path of a model but you don't know the file content type. Returns None if neither json nor yaml file can be found. If you do know the file content type, use path_for_top_level_model instead.

Source code in trestle/common/model_utils.py
@staticmethod
def full_path_for_top_level_model(
    trestle_root: pathlib.Path,
    model_name: str,
    model_class: Type[TopLevelOscalModel],
) -> Optional[pathlib.Path]:
    """
    Find the full path of an existing model given its name and model type but no file content type.

    This is the method to use when the path of a model is needed but its file content type is not known.
    If the file content type is known, use path_for_top_level_model instead.

    Args:
        trestle_root: The trestle project root directory.
        model_name: Name of the model.
        model_class: Class of the top level model.

    Returns:
        The model path with the extension of the content type found, or None if no readable
        file content type is found for the model.
    """
    model_root = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
    content_type = FileContentType.path_to_content_type(model_root)
    if FileContentType.is_readable_file(content_type):
        extension = FileContentType.to_file_extension(content_type)
        return model_root.with_suffix(extension)
    return None
get_all_models(root) staticmethod ¤

Get list of all models in trestle directory as tuples (model_type, model_name).

Source code in trestle/common/model_utils.py
@staticmethod
def get_all_models(root: pathlib.Path) -> List[Tuple[str, str]]:
    """
    Get list of all models in trestle directory as tuples (model_type, model_name).

    Args:
        root: Directory passed on to get_models_of_type for each model type.

    Returns:
        One (model_type, model_name) tuple per model, ordered by const.MODEL_TYPE_LIST.
    """
    return [
        (model_type, model_name)
        for model_type in const.MODEL_TYPE_LIST
        for model_name in ModelUtils.get_models_of_type(model_type, root)
    ]
get_models_of_type(model_type, root) staticmethod ¤

Get list of model names for requested type in trestle directory.

Source code in trestle/common/model_utils.py
@staticmethod
def get_models_of_type(model_type: str, root: pathlib.Path) -> List[str]:
    """
    Get list of model names for requested type in trestle directory.

    Args:
        model_type: One of the types in const.MODEL_TYPE_LIST.
        root: A directory within a trestle project.

    Returns:
        The names of the model directories found in the directory for the model type.

    Raises:
        err.TrestleError: If the model type is not supported or root is not within a trestle project.
    """
    if model_type not in const.MODEL_TYPE_LIST:
        raise err.TrestleError(f'Model type {model_type} is not supported')
    # the search is made relative to the project root
    project_root = extract_trestle_project_root(root)
    if not project_root:
        logger.error(f'Given directory {root} is not within a trestle project.')
        raise err.TrestleError('Given directory is not within a trestle project.')

    # construct the path to the directory holding models of this type
    model_dir_name = ModelUtils.model_type_to_model_dir(model_type)
    model_names = []
    for entry in (project_root / model_dir_name).glob('*/'):
        if ModelUtils._should_ignore(entry.stem):
            continue
        if entry.is_dir():
            # each model lives in a directory named after it
            model_names.append(entry.stem)
            continue
        logger.warning(
            f'Ignoring validation of misplaced file {entry.name} '
            + f'found in the model directory, {model_dir_name}.'
        )
    return model_names
get_relative_model_type(relative_path) staticmethod ¤

Given the relative path of a file with respect to 'trestle_root' return the oscal model type.

Parameters:

Name Type Description Default
relative_path Path

Relative path of the model with respect to the root directory of the trestle project.

required

Returns:

Type Description
Tuple[Type[trestle.core.base_model.OscalBaseModel], str]

Type of Oscal Model for the provided model, and the alias of that oscal model.

Source code in trestle/common/model_utils.py
@staticmethod
def get_relative_model_type(relative_path: pathlib.Path) -> Tuple[Type[OscalBaseModel], str]:
    """
    Given the relative path of a file with respect to 'trestle_root' return the oscal model type.

    Args:
        relative_path: Relative path of the model with respect to the root directory of the trestle project.

    Returns:
        Type of Oscal Model for the provided model
        Alias of that oscal model.

    Raises:
        TrestleError: If the path has fewer than two parts or does not start with a model type directory.
    """
    path_parts = relative_path.parts
    if len(path_parts) < 2:
        raise TrestleError(
            'Insufficient path length to be a valid relative path w.r.t Trestle project root directory.'
        )
    # the first part is the model type directory: catalogs, profiles, etc
    model_dir = path_parts[0]
    if model_dir not in const.MODEL_DIR_LIST:
        raise TrestleError(f'No valid trestle model type directory (e.g. catalogs) found for {model_dir}.')
    module_name = const.MODEL_DIR_TO_MODEL_MODULE[model_dir]

    model_type, model_alias = ModelUtils.get_root_model(module_name)
    full_alias = model_alias

    # the first two parts are skipped; the remaining parts are walked to refine type and alias
    model_relative_path = pathlib.Path(*path_parts[2:])
    for index, part in enumerate(model_relative_path.parts):
        alias = ModelUtils._extract_alias(part)
        # a first part that just repeats the root alias adds nothing
        if index == 0 and model_alias == alias:
            continue
        model_alias = alias
        full_alias = f'{full_alias}.{model_alias}'
        if utils.is_collection_field_type(model_type):
            model_type = utils.get_inner_type(model_type)
        else:
            model_type = model_type.alias_to_field_map()[alias].outer_type_

    return model_type, full_alias
get_root_model(module_name) staticmethod ¤

Get the root model class and alias based on the module.

Source code in trestle/common/model_utils.py
@staticmethod
def get_root_model(module_name: str) -> Tuple[Type[Any], str]:
    """
    Get the root model class and alias based on the module.

    Args:
        module_name: Name of the module to import.

    Returns:
        The type and the alias of the first field of the module's Model class.

    Raises:
        err.TrestleError: If the module cannot be found or has no Model attribute.
    """
    try:
        module = importlib.import_module(module_name)
    except ModuleNotFoundError as e:
        raise err.TrestleError(str(e))

    if not hasattr(module, 'Model'):
        raise err.TrestleError('Invalid module')
    # the first field of Model is taken to describe the root model
    root_field = next(iter(module.Model.__fields__.values()))
    return root_field.type_, root_field.alias
get_singular_alias(alias_path, relative_path=None) staticmethod ¤

Get the alias in the singular form from a jsonpath.

If contextual_mode is True and contextual_path is None, it assumes alias_path is relative to the directory the user is running trestle from.

Parameters:

Name Type Description Default
alias_path str

The current alias element path as a string

required
relative_path Optional[pathlib.Path]

Optional relative path (w.r.t. trestle_root) to cater for relative element paths.

None

Returns:

Type Description
str

Alias as a string

Source code in trestle/common/model_utils.py
@staticmethod
def get_singular_alias(alias_path: str, relative_path: Optional[pathlib.Path] = None) -> str:
    """
    Get the alias in the singular form from a jsonpath.

    If relative_path is given, alias_path is taken to be relative to the model at that path: the full alias
    of that model, as found by get_relative_model_type, is prepended to alias_path.  If the last element of
    the model alias equals the first element of alias_path, it is not repeated.

    Args:
        alias_path: The current alias element path as a string
        relative_path: Optional relative path (w.r.t. trestle_root) to cater for relative element paths.

    Returns:
        Alias as a string

    Raises:
        err.TrestleError: If alias_path is blank, its root is not a root model alias, or the singular
            alias cannot be determined from the parent model type.
    """
    if len(alias_path.strip()) == 0:
        raise err.TrestleError(f'Invalid jsonpath {alias_path}')

    full_alias_path = alias_path
    if relative_path:
        logger.debug(f'get_singular_alias contextual mode: {relative_path}')
        _, full_model_alias = ModelUtils.get_relative_model_type(relative_path)
        first_alias_a = full_model_alias.split('.')[-1]
        first_alias_b = alias_path.split('.')[0]
        # avoid repeating the element where the model alias and the alias path join
        if first_alias_a == first_alias_b:
            full_model_alias = '.'.join(full_model_alias.split('.')[:-1])
        full_alias_path = '.'.join([full_model_alias, alias_path]).strip('.')

    path_parts = full_alias_path.split(const.ALIAS_PATH_SEPARATOR)
    logger.debug(f'path parts: {path_parts}')

    # the model type found for each path part that could be resolved, starting with the root
    model_types = []

    root_model_alias = path_parts[0]
    found = False
    for module_name in const.MODEL_TYPE_TO_MODEL_MODULE.values():
        model_type, model_alias = ModelUtils.get_root_model(module_name)
        if root_model_alias == model_alias:
            found = True
            model_types.append(model_type)
            break

    if not found:
        raise err.TrestleError(f'{root_model_alias} is an invalid root model alias.')

    if len(path_parts) == 1:
        return root_model_alias

    model_type = model_types[0]
    # go through path parts skipping first one
    for i in range(1, len(path_parts)):
        if utils.is_collection_field_type(model_type):
            # if it is a collection type and last part is * then break
            if i == len(path_parts) - 1 and path_parts[i] == '*':
                break
            # otherwise get the inner type of items in the collection
            # NOTE: the loop index is not advanced here - rebinding it inside the loop has no effect
            model_type = utils.get_inner_type(model_type)
        else:
            path_part = path_parts[i]
            field_map = model_type.alias_to_field_map()
            # parts that are not fields of the current model type are skipped
            if path_part not in field_map:
                continue
            field = field_map[path_part]
            model_type = field.outer_type_
        model_types.append(model_type)

    last_alias = path_parts[-1]
    if last_alias == '*':
        last_alias = path_parts[-2]

    # a model that is not a collection is its own singular form
    if not utils.is_collection_field_type(model_type):
        return last_alias

    # for a collection, the singular alias comes from the class name of its items
    parent_model_type = model_types[-2]
    try:
        field_map = parent_model_type.alias_to_field_map()
        field = field_map[last_alias]
        outer_type = field.outer_type_
        inner_type = utils.get_inner_type(outer_type)
        inner_type_name = inner_type.__name__
        singular_alias = str_utils.classname_to_alias(inner_type_name, AliasMode.JSON)
    except Exception as e:
        raise err.TrestleError(f'Error in json path {alias_path}: {e}') from e

    return singular_alias
get_stripped_model_type(absolute_path, absolute_trestle_root, aliases_not_to_be_stripped=None) staticmethod ¤

Get the stripped contextual model class and alias based on the contextual path.

This function relies on the directory structure of the trestle model being edited to determine, based on the existing files and folder, which fields should be stripped from the model type represented by the path passed in as a parameter.

Source code in trestle/common/model_utils.py
@staticmethod
def get_stripped_model_type(
    absolute_path: pathlib.Path,
    absolute_trestle_root: pathlib.Path,
    aliases_not_to_be_stripped: List[str] = None
) -> Tuple[Type[OscalBaseModel], str]:
    """
    Get the stripped contextual model class and alias based on the contextual path.

    The directory structure of the trestle model being edited determines which fields are stripped:
    each non-hidden entry in the split subdirectory of the model names a field to strip from the
    model type represented by the path.

    Args:
        absolute_path: Absolute path of the model file or directory.
        absolute_trestle_root: Absolute path of the trestle project root.
        aliases_not_to_be_stripped: Aliases to keep even if found in the split subdirectory.

    Returns:
        The model type, stripped of the fields found, or a wrapping model if it is a collection
        The model alias
    """
    kept_aliases = [] if aliases_not_to_be_stripped is None else aliases_not_to_be_stripped
    relative_path = absolute_path.relative_to(absolute_trestle_root)
    singular_model_type, model_alias = ModelUtils.get_relative_model_type(relative_path)
    logger.debug(f'singular model type {singular_model_type} model alias {model_alias}')

    # the last element of the full alias is the alias of the model itself
    malias = model_alias.split('.')[-1]

    # Stripped models do not apply to collection types such as List[] and Dict{}
    # so a collection type gets a new wrapping model instead
    if utils.is_collection_field_type(singular_model_type):
        class_name = alias_to_classname(malias, AliasMode.JSON)
        logger.debug(f'collection field type class name {class_name} and alias {malias}')
        wrapper_type = create_model(class_name, __base__=OscalBaseModel, __root__=(singular_model_type, ...))
        logger.debug(f'model_type created: {wrapper_type}')
        return cast(Type[OscalBaseModel], wrapper_type), model_alias

    logger.debug(f'not collection field type, malias: {malias}')
    # locate the directory that would hold the split-off parts of this model
    path_is_other_dir = absolute_path.is_dir() and malias != ModelUtils._extract_alias(absolute_path.name)
    if path_is_other_dir:
        split_subdir = absolute_path / malias
    else:
        split_subdir = absolute_path.parent / absolute_path.with_suffix('').name

    aliases_to_be_stripped = set()
    if split_subdir.exists():
        for entry in iterdir_without_hidden_files(split_subdir):
            alias = ModelUtils._extract_alias(entry.name)
            if alias in kept_aliases:
                continue
            aliases_to_be_stripped.add(alias)

    logger.debug(f'aliases to be stripped: {aliases_to_be_stripped}')
    if not aliases_to_be_stripped:
        # nothing has been split off, so the full model type applies
        return singular_model_type, model_alias
    model_type = singular_model_type.create_stripped_model_type(
        stripped_fields_aliases=list(aliases_to_be_stripped)
    )
    logger.debug(f'model_type: {model_type}')
    return model_type, model_alias
get_title_from_model_uri(trestle_root, uri) staticmethod ¤

Get title from model at uri.

Source code in trestle/common/model_utils.py
@staticmethod
def get_title_from_model_uri(trestle_root: pathlib.Path, uri: str) -> str:
    """
    Fetch the OSCAL model at the uri and return the title from its metadata.

    Args:
        trestle_root: Passed to the fetcher factory together with the uri.
        uri: Location of the model.

    Returns:
        The metadata title of the fetched model.

    Raises:
        TrestleError: Re-raised unchanged after a warning has been logged.
    """
    try:
        uri_fetcher = cache.FetcherFactory.get_fetcher(trestle_root, uri)
        fetched_model, _ = uri_fetcher.get_oscal()
        title = fetched_model.metadata.title
    except TrestleError as e:
        logger.warning(f'Error finding title for model at uri {uri}: {e}')
        raise
    return title
has_no_duplicate_values_by_name(object_of_interest, name_of_interest) staticmethod ¤

Determine if duplicate values of type exist in object.

Source code in trestle/common/model_utils.py
@staticmethod
def has_no_duplicate_values_by_name(object_of_interest: BaseModel, name_of_interest: str) -> bool:
    """
    Check that every value stored under the given name in the object is unique.

    Args:
        object_of_interest: The model that is searched with find_values_by_name.
        name_of_interest: The name whose values are collected and compared.

    Returns:
        True when no value occurs more than once.  Otherwise a warning is logged
        for each repeated value and False is returned.
    """
    found_values = ModelUtils.find_values_by_name(object_of_interest, name_of_interest)
    # a set drops repeats, so equal sizes mean there were none
    if len(found_values) == len(set(found_values)):
        return True
    # tally the occurrences so each repeated value can be reported with its count
    tally = {}
    for found in found_values:
        tally[found] = tally.get(found, 0) + 1
    for item, instances in tally.items():
        if instances > 1:
            logger.warning(f'Duplicate detected of item {item} with {instances} instances.')
    return False
load_distributed(abs_path, abs_trestle_root, collection_type=None) staticmethod ¤

Given path to a model, load the model.

If the model is decomposed/split/distributed, the decomposed models are loaded recursively.

Parameters:

Name Type Description Default
abs_path Path

The path to the file/directory to be loaded.

required
abs_trestle_root Path

The trestle project root directory.

required
collection_type Optional[Type[Any]]

The type of collection model, if it is a collection model. typing.List is the only collection type handled or expected. Defaults to None.

None

Returns:

Type Description
Tuple[Type[trestle.core.base_model.OscalBaseModel], str, Union[trestle.core.base_model.OscalBaseModel, List[trestle.core.base_model.OscalBaseModel], Dict[str, trestle.core.base_model.OscalBaseModel]]]

Return a tuple of Model Type (e.g. class 'trestle.oscal.catalog.Catalog'), Model Alias (e.g. 'catalog.metadata') and Instance of the Model. If the model is decomposed/split/distributed, the instance of the model contains the decomposed models loaded recursively.

Note

This does not validate the model. You must either validate the model separately or use the load_validate utilities.

Source code in trestle/common/model_utils.py
@staticmethod
def load_distributed(
    abs_path: Path,
    abs_trestle_root: Path,
    collection_type: Optional[Type[Any]] = None
) -> Tuple[Type[OscalBaseModel], str, Union[OscalBaseModel, List[OscalBaseModel], Dict[str, OscalBaseModel]]]:
    """
    Given path to a model, load the model.

    If the model is decomposed/split/distributed, the decomposed models are loaded recursively.

    Args:
        abs_path: The path to the file/directory to be loaded.
        abs_trestle_root: The trestle project root directory.
        collection_type: The type of collection model, if it is a collection model.
            typing.List is the only collection type handled or expected.
            Defaults to None.

    Returns:
        Return a tuple of Model Type (e.g. class 'trestle.oscal.catalog.Catalog'),
        Model Alias (e.g. 'catalog.metadata') and Instance of the Model.
        If the model is decomposed/split/distributed, the instance of the model contains
        the decomposed models loaded recursively.

    Raises:
        TrestleNotFoundError: If neither abs_path nor the same path without its suffix exists.
        TrestleError: If collection_type is given and is not list.

    Note:
        This does not validate the model.  You must either validate the model separately or use the load_validate
        utilities.
    """
    # if trying to load file that does not exist, load path instead
    # (i.e. retry with the suffix stripped from the final path component)
    if not abs_path.exists():
        abs_path = abs_path.with_name(abs_path.stem)

    if not abs_path.exists():
        raise TrestleNotFoundError(f'File {abs_path} not found for load.')

    if collection_type:
        # If the path contains a list type model
        if collection_type is list:
            return ModelUtils._load_list(abs_path, abs_trestle_root)
        # the only other collection type in OSCAL is dict, and it only applies to include_all,
        # which is too granular ever to be loaded by this routine
        else:
            raise TrestleError(f'Collection type {collection_type} not recognized for distributed load.')

    # Get current model
    primary_model_type, primary_model_alias = ModelUtils.get_stripped_model_type(abs_path, abs_trestle_root)
    primary_model_instance: Optional[OscalBaseModel] = None

    # is this an attempt to load an actual json or yaml file?
    content_type = FileContentType.path_to_content_type(abs_path)
    # if file is sought but it doesn't exist, ignore and load as decomposed model
    # NOTE(review): abs_path is already known to exist here (see the raise above), so the
    # exists() test below appears redundant.
    if FileContentType.is_readable_file(content_type) and abs_path.exists():
        primary_model_instance = primary_model_type.oscal_read(abs_path)
    # Is model decomposed?
    # The decomposed directory shares the file's name minus its suffix; for a path that has
    # no suffix this is abs_path itself.
    decomposed_dir = abs_path.with_name(abs_path.stem)

    if decomposed_dir.exists():
        # Parallel lists: the i-th alias names the field that receives the i-th instance.
        aliases_not_to_be_stripped = []
        instances_to_be_merged: List[OscalBaseModel] = []

        # sorted() makes the traversal order deterministic
        for local_path in sorted(trestle.common.file_utils.iterdir_without_hidden_files(decomposed_dir)):
            if local_path.is_file():
                # A file is loaded recursively, which also picks up its own decomposed directory if any.
                model_type, model_alias, model_instance = ModelUtils.load_distributed(local_path, abs_trestle_root)
                aliases_not_to_be_stripped.append(model_alias.split('.')[-1])
                instances_to_be_merged.append(model_instance)

            elif local_path.is_dir():
                model_type, model_alias = ModelUtils.get_stripped_model_type(local_path, abs_trestle_root)
                # Only load the directory if it is a collection model. Otherwise do nothing - it gets loaded when
                # iterating over the model file

                # If a model is just a container for a list e.g.
                # class Foo(OscalBaseModel):  noqa: E800
                #      __root__: List[Bar]    noqa: E800
                # You need to test whether first a root key exists
                # then whether the outer_type of root is a collection.
                # Alternative is to do a try except to avoid the error for an unknown key.

                if model_type.is_collection_container():
                    # This directory is a decomposed List or Dict
                    # Note: this rebinds the collection_type parameter, which is no longer needed
                    # once the early collection check above has passed.
                    collection_type = model_type.get_collection_type()
                    model_type, model_alias, model_instance = ModelUtils.load_distributed(local_path,
                                                                                          abs_trestle_root,
                                                                                          collection_type)
                    aliases_not_to_be_stripped.append(model_alias.split('.')[-1])
                    instances_to_be_merged.append(model_instance)
        # Start from the fields of the primary file (if one was read) and overlay the loaded parts.
        primary_model_dict = {}
        if primary_model_instance is not None:
            primary_model_dict = primary_model_instance.__dict__

        # Recompute the model type, this time passing the aliases collected above.
        merged_model_type, merged_model_alias = ModelUtils.get_stripped_model_type(abs_path,
                                                                                   abs_trestle_root,
                                                                                   aliases_not_to_be_stripped)

        # The following use of top_level is to allow loading of a top level model by name only, e.g. MyCatalog
        # There may be a better overall way to approach this.
        top_level = len(merged_model_alias.split('.')) == 1

        for i in range(len(aliases_not_to_be_stripped)):
            alias = aliases_not_to_be_stripped[i]
            instance = instances_to_be_merged[i]
            # Unwrap models that merely wrap a value in __root__ so the bare value is merged.
            if hasattr(instance, '__dict__') and '__root__' in instance.__dict__ and isinstance(instance,
                                                                                                OscalBaseModel):
                instance = instance.__dict__['__root__']
            # For a top level model with nothing loaded yet, adopt the instance's fields wholesale;
            # otherwise store the instance under its alias.
            if top_level and not primary_model_dict:
                primary_model_dict = instance.__dict__
            else:
                primary_model_dict[alias] = instance

        merged_model_instance = merged_model_type(**primary_model_dict)  # type: ignore
        return merged_model_type, merged_model_alias, merged_model_instance
    # Not decomposed: return whatever was read from the file itself (None if nothing was read).
    return primary_model_type, primary_model_alias, primary_model_instance
load_top_level_model(trestle_root, model_name, model_class, file_content_type=None) staticmethod ¤

Load a model by name and model class and infer file content type if not specified.

If you need to load an existing model but its content type may not be known, use this method. But the file content type should be specified if it is somehow known.

Note: This does not validate the model. If you want to validate the model use the load_validate utilities.

Source code in trestle/common/model_utils.py
@staticmethod
def load_top_level_model(
    trestle_root: pathlib.Path,
    model_name: str,
    model_class: TG,
    file_content_type: Optional[FileContentType] = None
) -> Tuple[TG, pathlib.Path]:
    """Load a top level model given its name and class, inferring the content type when not supplied.

    Use this when an existing model must be loaded but its file content type may be unknown.
    If the content type is known it should be passed in.

    Returns the loaded model together with the full path it was loaded from.

    Raises TrestleError when the content type (given or inferred) is not a readable file type.

    Note:  This does not validate the model.  If you want to validate the model use the load_validate utilities.
    """
    base_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
    content_type = file_content_type
    if content_type is None:
        # nothing specified, so infer the content type from the root path
        content_type = FileContentType.path_to_content_type(base_path)
    if FileContentType.is_readable_file(content_type):
        extension = FileContentType.to_file_extension(content_type)
        model_path = base_path.with_suffix(extension)
        _model_type, _model_alias, instance = ModelUtils.load_distributed(model_path, trestle_root)
        return instance, model_path
    raise TrestleError(f'Unable to load model {model_name} without specifying json or yaml.')
model_age(model) staticmethod ¤

Find time in seconds since LastModified timestamp.

Source code in trestle/common/model_utils.py
@staticmethod
def model_age(model: TopLevelOscalModel) -> int:
    """
    Find time in whole seconds since the model's LastModified timestamp.

    Args:
        model: Top level model whose metadata.last_modified is inspected.

    Returns:
        The elapsed time in seconds, truncated to an int.  If the model has no
        last_modified value, one year (365 days) worth of seconds is returned.
    """
    last_modified = model.metadata.last_modified
    if not last_modified:
        # default to one year if no last_modified
        return const.DAY_SECONDS * 365
    elapsed = datetime.now().astimezone() - last_modified.__root__
    # timedelta.seconds is only the 0..86399 remainder and ignores whole days;
    # total_seconds() gives the full elapsed duration.
    return int(elapsed.total_seconds())
model_type_to_model_dir(model_type) staticmethod ¤

Get plural model directory from model type.

Source code in trestle/common/model_utils.py
@staticmethod
def model_type_to_model_dir(model_type: str) -> str:
    """Map a model type name to its plural model directory name.

    Raises TrestleError if model_type is not in the list of known model types.
    """
    if model_type in const.MODEL_TYPE_LIST:
        return const.MODEL_TYPE_TO_MODEL_DIR[model_type]
    raise err.TrestleError(f'Not a valid model type: {model_type}.')
models_are_equivalent(model_a, model_b, ignore_all_uuid=False) staticmethod ¤

Test if models are equivalent except for last modified and possibly uuid.

If a model has had uuids regenerated, then all uuids and references to them are updated. This means that special handling is required if a model has had uuids regenerated - when checking equivalence.

Source code in trestle/common/model_utils.py
@staticmethod
def models_are_equivalent(
    model_a: Optional[TopLevelOscalModel],
    model_b: Optional[TopLevelOscalModel],
    ignore_all_uuid: bool = False
) -> bool:
    """
    Check whether two models match, disregarding last modified and optionally uuids.

    When uuids are regenerated in a model, every uuid *and every reference to one* changes.
    Comparing such a model therefore needs the uuid-bearing types to be ignored as well,
    which is what ignore_all_uuid requests.
    """
    # types that hold uuids or references to them, plus LastModified
    uuid_related_types = [
        common.LastModified,
        common.LocationUuid,
        common.MemberOfOrganization,
        common.PartyUuid,
        common.RelatedRisk,
        common.RelatedObservation1,
        common.Source
    ]
    if ignore_all_uuid:
        ignored_types = uuid_related_types
    else:
        ignored_types = [common.LastModified]
    differ = ModelUtils._objects_differ(model_a, model_b, ignored_types, ['last-modified'], ignore_all_uuid)
    return not differ
parameter_to_dict(obj, partial) staticmethod ¤

Convert obj to dict containing only string values, storing only the fields that have values set.

Parameters:

Name Type Description Default
obj Union[trestle.core.base_model.OscalBaseModel, str]

The parameter or its constituent parts in recursive calls

required
partial bool

Whether to convert the entire param or just the parts needed for markdown header

required

Returns:

Type Description
Union[str, Dict[str, Any]]

The converted parameter as dictionary, with values as None if not present

Source code in trestle/common/model_utils.py
@staticmethod
def parameter_to_dict(obj: Union[OscalBaseModel, str], partial: bool) -> Union[str, Dict[str, Any]]:
    """
    Convert obj to a dict of string values, keeping only the fields that have values set.

    Args:
        obj: The parameter or its constituent parts in recursive calls
        partial: Whether to convert the entire param or just the parts needed for markdown header

    Returns:
        The converted parameter as dictionary; a 'values' key is always present and is
        None when the conversion did not produce one
    """
    converted = ModelUtils._parameter_to_dict_recurse(obj, partial)
    if 'values' in converted:
        return converted
    # guarantee the 'values' key exists for callers
    converted['values'] = None
    return converted
path_for_top_level_model(trestle_root, model_name, model_class, file_content_type) staticmethod ¤

Find the full path of a model given its name, model type and file content type.

If file_content_type is given it will not inspect the file system or confirm the needed path and file exists.

Source code in trestle/common/model_utils.py
@staticmethod
def path_for_top_level_model(
    trestle_root: pathlib.Path,
    model_name: str,
    model_class: Type[TopLevelOscalModel],
    file_content_type: Optional[FileContentType]
) -> pathlib.Path:
    """
    Build the full path of a model from its name, model type and file content type.

    When file_content_type is provided, the path is formed from the root path plus the
    matching file extension.  When it is None, the lookup is delegated to
    full_path_for_top_level_model.
    """
    if file_content_type is not None:
        base_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model_class)
        extension = FileContentType.to_file_extension(file_content_type)
        return base_path.with_suffix(extension)
    return ModelUtils.full_path_for_top_level_model(trestle_root, model_name, model_class)
regenerate_uuids(object_of_interest) staticmethod ¤

Regenerate all uuids in object and update corresponding references.

Find all dicts with key == 'uuid' and replace the value with a new uuid4. Build a corresponding lookup table as you go, of old:new uuid values. Then make a second pass through the object and replace all string values present in the lookup table with the new value.

Parameters:

Name Type Description Default
object_of_interest Any

pydantic.BaseModel, list, dict or str will be updated

required

Returns:

Type Description
Tuple[Any, Dict[str, str], int]

The updated object with new uuids and refs, the final lookup table of old:new uuids, and a count of the number of refs that were updated.

Source code in trestle/common/model_utils.py
@staticmethod
def regenerate_uuids(object_of_interest: Any) -> Tuple[Any, Dict[str, str], int]:
    """Give the object fresh uuids and fix up every reference to the old ones.

    This works in two passes.  The first replaces each value stored under a 'uuid' key
    with a new uuid4 and records the old:new pairs in a lookup table.  The second
    walks the object again and swaps any string found in that table for its new value.

    Args:
        object_of_interest: pydantic.BaseModel, list, dict or str will be updated

    Returns:
        The updated object with new uuid's and refs
        The final lookup table of old:new uuid's
        A count of the number of refs that were updated
    """
    # first pass: new uuids, starting from an empty lookup table
    regenerated, old_to_new = ModelUtils._regenerate_uuids_in_place(object_of_interest, {})
    # second pass: repoint references using the table built above
    second_pass = ModelUtils._update_new_uuid_refs(regenerated, old_to_new)
    updated, ref_count = second_pass
    return updated, old_to_new, ref_count
save_top_level_model(model, trestle_root, model_name, file_content_type) staticmethod ¤

Save a model by name and infer model type by inspection.

You don't need to specify the model type (catalog, profile, etc.) but you must specify the file content type. If the model directory does not exist, it is created.

Source code in trestle/common/model_utils.py
@staticmethod
def save_top_level_model(
    model: TopLevelOscalModel, trestle_root: pathlib.Path, model_name: str, file_content_type: FileContentType
) -> None:
    """Write a model to its place in the trestle directory, determining the model type from the model itself.

    The model type (catalog, profile, etc.) need not be given, but the file content type must be.
    The model's directory is created if it is missing.
    """
    extension = FileContentType.to_file_extension(file_content_type)
    target_path = ModelUtils._root_path_for_top_level_model(trestle_root, model_name, model).with_suffix(extension)
    target_dir = target_path.parent
    if not target_dir.exists():
        target_dir.mkdir(parents=True, exist_ok=True)
    model.oscal_write(target_path)
update_last_modified(model, timestamp=None) staticmethod ¤

Update the LastModified timestamp in the top level model to the given timestamp, or to now if none is given.

Source code in trestle/common/model_utils.py
@staticmethod
def update_last_modified(model: TopLevelOscalModel, timestamp: Optional[datetime] = None) -> None:
    """Set the LastModified timestamp of a top level model, using the current local time if none is given."""
    stamp = timestamp or datetime.now().astimezone()
    model.metadata.last_modified = common.LastModified(__root__=stamp)

handler: python