Skip to content

repository

trestle.core.repository ¤

Trestle Repository APIs.

logger ¤

Classes¤

AgileAuthoring (Repository) ¤

AgileAuthoring extends the Repository class for performing authoring specific operations on Trestle repository.

This class provides a set of APIs to perform generate and assemble authoring operations in the trestle repository rather than using the command line.

Source code in trestle/core/repository.py
class AgileAuthoring(Repository):
    """
    AgileAuthoring extends the Repository class for performing authoring specific operations on Trestle repository.

    This class provides a set of APIs to perform generate and assemble authoring operations in the trestle repository
    rather than using the command line.

    Every public method forwards its arguments as the options of one author command, runs that command and
    returns True only when the command returns 0. Any exception raised by the command is re-raised as TrestleError.

    """

    def __init__(self, root_dir: pathlib.Path) -> None:
        """Initialize trestle repository object."""
        super().__init__(root_dir)

    def _run_author_command(
        self, command_class: type, error_context: str, success_message: str, **options: object
    ) -> bool:
        """Run one author command and report whether it returned 0.

        Args:
            command_class: command class to instantiate; its `_run` method is called with the options.
            error_context: prefix of the TrestleError message used when the command raises.
            success_message: debug message logged only when the command returns 0.
            options: command specific options; `trestle_root` and `verbose` are added here.

        Returns:
            True if the command returned 0, False otherwise.

        Raises:
            TrestleError: if creating or running the command raised any exception.
        """
        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(trestle_root=self.root_dir, verbose=verbose, **options)

        try:
            ret = command_class()._run(args)
        except Exception as e:
            # chain explicitly so the original exception is reported as the direct cause
            raise TrestleError(f'{error_context}: {e}') from e

        if ret != 0:
            # the success message must not be logged for a non-zero return code
            logger.debug(f'{command_class.__name__} failed with return code {ret}.')
            return False

        logger.debug(success_message)
        return True

    def assemble_catalog_markdown(
        self,
        name: str,
        output: str,
        markdown_dir: str,
        set_parameters: bool = False,
        regenerate: bool = False,
        version: str = ''
    ) -> bool:
        """Assemble catalog markdown into OSCAL Catalog in JSON.

        The arguments become the options of the CatalogAssemble command (markdown_dir is passed as markdown).
        Returns True if the command returned 0; raises TrestleError if the command raised.
        """
        logger.debug(f'Assembling model {name} of type catalog.')
        return self._run_author_command(
            catalogauthorcmd.CatalogAssemble,
            f'Error assembling catalog {name}',
            f'Model {name} assembled successfully.',
            name=name,
            output=output,
            markdown=markdown_dir,
            set_parameters=set_parameters,
            regenerate=regenerate,
            version=version
        )

    def assemble_profile_markdown(
        self,
        name: str,
        output: str,
        markdown_dir: str,
        set_parameters: bool = False,
        regenerate: bool = False,
        version: str = '',
        sections: str = '',
        required_sections: str = '',
        allowed_sections: str = ''
    ) -> bool:
        """Assemble profile markdown into OSCAL Profile in JSON.

        The arguments become the options of the ProfileAssemble command (markdown_dir is passed as markdown).
        Returns True if the command returned 0; raises TrestleError if the command raised.
        """
        logger.debug(f'Assembling model {name} of type profile.')
        return self._run_author_command(
            profileauthorcmd.ProfileAssemble,
            f'Error assembling profile {name}',
            f'Model {name} assembled successfully.',
            name=name,
            output=output,
            markdown=markdown_dir,
            set_parameters=set_parameters,
            regenerate=regenerate,
            version=version,
            sections=sections,
            required_sections=required_sections,
            allowed_sections=allowed_sections
        )

    def assemble_component_definition_markdown(
        self, name: str, output: str, markdown_dir: str, regenerate: bool = False, version: str = ''
    ) -> bool:
        """Assemble component definition markdown into OSCAL Component Definition in JSON.

        The arguments become the options of the ComponentAssemble command (markdown_dir is passed as markdown).
        Returns True if the command returned 0; raises TrestleError if the command raised.
        """
        logger.debug(f'Assembling model {name} of type component definition.')
        return self._run_author_command(
            componentauthorcmd.ComponentAssemble,
            f'Error assembling component definition {name}',
            f'Model {name} assembled successfully.',
            name=name,
            output=output,
            markdown=markdown_dir,
            regenerate=regenerate,
            version=version
        )

    def assemble_ssp_markdown(
        self,
        name: str,
        output: str,
        markdown_dir: str,
        compdefs: str,
        regenerate: bool = False,
        version: str = ''
    ) -> bool:
        """Assemble ssp markdown into OSCAL SSP in JSON.

        The arguments become the options of the SSPAssemble command (markdown_dir is passed as markdown).
        Returns True if the command returned 0; raises TrestleError if the command raised.
        """
        logger.debug(f'Assembling model {name} of type ssp.')
        return self._run_author_command(
            sspauthorcmd.SSPAssemble,
            f'Error assembling ssp {name}',
            f'Model {name} assembled successfully.',
            name=name,
            output=output,
            markdown=markdown_dir,
            compdefs=compdefs,
            regenerate=regenerate,
            version=version
        )

    def generate_catalog_markdown(
        self,
        name: str,
        output: str,
        force_overwrite: bool = False,
        yaml_header: str = '',
        overwrite_header_values: bool = False
    ) -> bool:
        """Generate catalog markdown from OSCAL Catalog in JSON.

        The arguments become the options of the CatalogGenerate command.
        Returns True if the command returned 0; raises TrestleError if the command raised.
        """
        logger.debug(f'Generating markdown for {name} of type catalog.')
        return self._run_author_command(
            catalogauthorcmd.CatalogGenerate,
            f'Error generate markdown for catalog {name}',
            f'Model {name} markdown generated successfully.',
            name=name,
            output=output,
            force_overwrite=force_overwrite,
            yaml_header=yaml_header,
            overwrite_header_values=overwrite_header_values
        )

    def generate_profile_markdown(
        self,
        name: str,
        output: str,
        force_overwrite: bool = False,
        yaml_header: str = '',
        overwrite_header_values: bool = False,
        sections: str = '',
        required_sections: str = ''
    ) -> bool:
        """Generate profile markdown from OSCAL Profile in JSON.

        The arguments become the options of the ProfileGenerate command.
        Returns True if the command returned 0; raises TrestleError if the command raised.
        """
        logger.debug(f'Generating markdown for {name} of type profile.')
        return self._run_author_command(
            profileauthorcmd.ProfileGenerate,
            f'Error generate markdown for profile {name}',
            f'Model {name} markdown generated successfully.',
            name=name,
            output=output,
            force_overwrite=force_overwrite,
            yaml_header=yaml_header,
            overwrite_header_values=overwrite_header_values,
            sections=sections,
            required_sections=required_sections
        )

    def generate_component_definition_markdown(
        self,
        name: str,
        output: str,
        force_overwrite: bool = False,
    ) -> bool:
        """Generate component definition markdown from OSCAL Component Definition in JSON.

        The arguments become the options of the ComponentGenerate command.
        Returns True if the command returned 0; raises TrestleError if the command raised.
        """
        logger.debug(f'Generating markdown for {name} of type component definition.')
        return self._run_author_command(
            componentauthorcmd.ComponentGenerate,
            f'Error generating markdown for component definition {name}',
            f'Model {name} markdown generated successfully.',
            name=name,
            output=output,
            force_overwrite=force_overwrite
        )

    def generate_ssp_markdown(
        self,
        profile: str,
        output: str,
        compdefs: str,
        force_overwrite: bool = False,
        yaml_header: str = '',
        overwrite_header_values: bool = False
    ) -> bool:
        """Generate ssp markdown from OSCAL Profile and Component Definitions.

        The arguments become the options of the SSPGenerate command.
        Returns True if the command returned 0; raises TrestleError if the command raised.
        """
        logger.debug(f'Generating markdown for {output} of type ssp.')
        return self._run_author_command(
            sspauthorcmd.SSPGenerate,
            f'Error in generating markdown for ssp {output}',
            f'Model {output} markdown generated successfully.',
            profile=profile,
            output=output,
            compdefs=compdefs,
            force_overwrite=force_overwrite,
            yaml_header=yaml_header,
            overwrite_header_values=overwrite_header_values
        )
Methods¤
__init__(self, root_dir) special ¤
Source code in trestle/core/repository.py
def __init__(self, root_dir: pathlib.Path) -> None:
    """Set up the authoring repository by delegating to the parent class initializer with root_dir."""
    super().__init__(root_dir)
assemble_catalog_markdown(self, name, output, markdown_dir, set_parameters=False, regenerate=False, version='') ¤

Assemble catalog markdown into OSCAL Catalog in JSON.

Source code in trestle/core/repository.py
def assemble_catalog_markdown(
    self,
    name: str,
    output: str,
    markdown_dir: str,
    set_parameters: bool = False,
    regenerate: bool = False,
    version: str = ''
) -> bool:
    """Assemble catalog markdown into OSCAL Catalog in JSON.

    The arguments are forwarded as the options of the CatalogAssemble command (markdown_dir is passed
    as markdown), together with this repository's root directory and the logger's current verbosity level.

    Returns:
        True if the command returned 0, False otherwise.

    Raises:
        TrestleError: if creating or running the command raised any exception.
    """
    logger.debug(f'Assembling model {name} of type catalog.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        markdown=markdown_dir,
        trestle_root=self.root_dir,
        set_parameters=set_parameters,
        regenerate=regenerate,
        version=version,
        verbose=verbose
    )

    try:
        ret = catalogauthorcmd.CatalogAssemble()._run(args)
    except Exception as e:
        # chain explicitly so the original exception is reported as the direct cause
        raise TrestleError(f'Error assembling catalog {name}: {e}') from e

    if ret != 0:
        # the success message must not be logged for a non-zero return code
        logger.debug(f'Assembling model {name} failed with return code {ret}.')
        return False

    logger.debug(f'Model {name} assembled successfully.')
    return True
assemble_component_definition_markdown(self, name, output, markdown_dir, regenerate=False, version='') ¤

Assemble component definition markdown into OSCAL Component Definition in JSON.

Source code in trestle/core/repository.py
def assemble_component_definition_markdown(
    self, name: str, output: str, markdown_dir: str, regenerate: bool = False, version: str = ''
) -> bool:
    """Assemble component definition markdown into OSCAL Component Definition in JSON.

    The arguments are forwarded as the options of the ComponentAssemble command (markdown_dir is passed
    as markdown), together with this repository's root directory and the logger's current verbosity level.

    Returns:
        True if the command returned 0, False otherwise.

    Raises:
        TrestleError: if creating or running the command raised any exception.
    """
    logger.debug(f'Assembling model {name} of type component definition.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        markdown=markdown_dir,
        trestle_root=self.root_dir,
        regenerate=regenerate,
        version=version,
        verbose=verbose
    )

    try:
        ret = componentauthorcmd.ComponentAssemble()._run(args)
    except Exception as e:
        # chain explicitly so the original exception is reported as the direct cause
        raise TrestleError(f'Error assembling component definition {name}: {e}') from e

    if ret != 0:
        # the success message must not be logged for a non-zero return code
        logger.debug(f'Assembling model {name} failed with return code {ret}.')
        return False

    logger.debug(f'Model {name} assembled successfully.')
    return True
assemble_profile_markdown(self, name, output, markdown_dir, set_parameters=False, regenerate=False, version='', sections='', required_sections='', allowed_sections='') ¤

Assemble profile markdown into OSCAL Profile in JSON.

Source code in trestle/core/repository.py
def assemble_profile_markdown(
    self,
    name: str,
    output: str,
    markdown_dir: str,
    set_parameters: bool = False,
    regenerate: bool = False,
    version: str = '',
    sections: str = '',
    required_sections: str = '',
    allowed_sections: str = ''
) -> bool:
    """Assemble profile markdown into OSCAL Profile in JSON.

    The arguments are forwarded as the options of the ProfileAssemble command (markdown_dir is passed
    as markdown), together with this repository's root directory and the logger's current verbosity level.

    Returns:
        True if the command returned 0, False otherwise.

    Raises:
        TrestleError: if creating or running the command raised any exception.
    """
    logger.debug(f'Assembling model {name} of type profile.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        markdown=markdown_dir,
        trestle_root=self.root_dir,
        set_parameters=set_parameters,
        regenerate=regenerate,
        version=version,
        sections=sections,
        required_sections=required_sections,
        allowed_sections=allowed_sections,
        verbose=verbose
    )

    try:
        ret = profileauthorcmd.ProfileAssemble()._run(args)
    except Exception as e:
        # chain explicitly so the original exception is reported as the direct cause
        raise TrestleError(f'Error assembling profile {name}: {e}') from e

    if ret != 0:
        # the success message must not be logged for a non-zero return code
        logger.debug(f'Assembling model {name} failed with return code {ret}.')
        return False

    logger.debug(f'Model {name} assembled successfully.')
    return True
assemble_ssp_markdown(self, name, output, markdown_dir, compdefs, regenerate=False, version='') ¤

Assemble ssp markdown into OSCAL SSP in JSON.

Source code in trestle/core/repository.py
def assemble_ssp_markdown(
    self,
    name: str,
    output: str,
    markdown_dir: str,
    compdefs: str,
    regenerate: bool = False,
    version: str = ''
) -> bool:
    """Assemble ssp markdown into OSCAL SSP in JSON.

    The arguments are forwarded as the options of the SSPAssemble command (markdown_dir is passed
    as markdown), together with this repository's root directory and the logger's current verbosity level.

    Returns:
        True if the command returned 0, False otherwise.

    Raises:
        TrestleError: if creating or running the command raised any exception.
    """
    logger.debug(f'Assembling model {name} of type ssp.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        markdown=markdown_dir,
        compdefs=compdefs,
        trestle_root=self.root_dir,
        regenerate=regenerate,
        version=version,
        verbose=verbose
    )

    try:
        ret = sspauthorcmd.SSPAssemble()._run(args)
    except Exception as e:
        # chain explicitly so the original exception is reported as the direct cause
        raise TrestleError(f'Error assembling ssp {name}: {e}') from e

    if ret != 0:
        # the success message must not be logged for a non-zero return code
        logger.debug(f'Assembling model {name} failed with return code {ret}.')
        return False

    logger.debug(f'Model {name} assembled successfully.')
    return True
generate_catalog_markdown(self, name, output, force_overwrite=False, yaml_header='', overwrite_header_values=False) ¤

Generate catalog markdown from OSCAL Catalog in JSON.

Source code in trestle/core/repository.py
def generate_catalog_markdown(
    self,
    name: str,
    output: str,
    force_overwrite: bool = False,
    yaml_header: str = '',
    overwrite_header_values: bool = False
) -> bool:
    """Generate catalog markdown from OSCAL Catalog in JSON.

    The arguments are forwarded as the options of the CatalogGenerate command, together with this
    repository's root directory and the logger's current verbosity level.

    Returns:
        True if the command returned 0, False otherwise.

    Raises:
        TrestleError: if creating or running the command raised any exception.
    """
    logger.debug(f'Generating markdown for {name} of type catalog.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        trestle_root=self.root_dir,
        force_overwrite=force_overwrite,
        yaml_header=yaml_header,
        overwrite_header_values=overwrite_header_values,
        verbose=verbose
    )

    try:
        ret = catalogauthorcmd.CatalogGenerate()._run(args)
    except Exception as e:
        # chain explicitly so the original exception is reported as the direct cause
        raise TrestleError(f'Error generate markdown for catalog {name}: {e}') from e

    if ret != 0:
        # the success message must not be logged for a non-zero return code
        logger.debug(f'Generating markdown for {name} failed with return code {ret}.')
        return False

    logger.debug(f'Model {name} markdown generated successfully.')
    return True
generate_component_definition_markdown(self, name, output, force_overwrite=False) ¤

Generate component definition markdown from OSCAL Component Definition in JSON.

Source code in trestle/core/repository.py
def generate_component_definition_markdown(
    self,
    name: str,
    output: str,
    force_overwrite: bool = False,
) -> bool:
    """Generate component definition markdown from OSCAL Component Definition in JSON.

    The arguments are forwarded as the options of the ComponentGenerate command, together with this
    repository's root directory and the logger's current verbosity level.

    Returns:
        True if the command returned 0, False otherwise.

    Raises:
        TrestleError: if creating or running the command raised any exception.
    """
    logger.debug(f'Generating markdown for {name} of type component definition.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name, output=output, trestle_root=self.root_dir, force_overwrite=force_overwrite, verbose=verbose
    )

    try:
        ret = componentauthorcmd.ComponentGenerate()._run(args)
    except Exception as e:
        # chain explicitly so the original exception is reported as the direct cause
        raise TrestleError(f'Error generating markdown for component definition {name}: {e}') from e

    if ret != 0:
        # the success message must not be logged for a non-zero return code
        logger.debug(f'Generating markdown for {name} failed with return code {ret}.')
        return False

    logger.debug(f'Model {name} markdown generated successfully.')
    return True
generate_profile_markdown(self, name, output, force_overwrite=False, yaml_header='', overwrite_header_values=False, sections='', required_sections='') ¤

Generate profile markdown from OSCAL Profile in JSON.

Source code in trestle/core/repository.py
def generate_profile_markdown(
    self,
    name: str,
    output: str,
    force_overwrite: bool = False,
    yaml_header: str = '',
    overwrite_header_values: bool = False,
    sections: str = '',
    required_sections: str = ''
) -> bool:
    """Generate profile markdown from OSCAL Profile in JSON.

    The arguments are forwarded as the options of the ProfileGenerate command, together with this
    repository's root directory and the logger's current verbosity level.

    Returns:
        True if the command returned 0, False otherwise.

    Raises:
        TrestleError: if creating or running the command raised any exception.
    """
    logger.debug(f'Generating markdown for {name} of type profile.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        name=name,
        output=output,
        trestle_root=self.root_dir,
        force_overwrite=force_overwrite,
        yaml_header=yaml_header,
        overwrite_header_values=overwrite_header_values,
        sections=sections,
        required_sections=required_sections,
        verbose=verbose
    )

    try:
        ret = profileauthorcmd.ProfileGenerate()._run(args)
    except Exception as e:
        # chain explicitly so the original exception is reported as the direct cause
        raise TrestleError(f'Error generate markdown for profile {name}: {e}') from e

    if ret != 0:
        # the success message must not be logged for a non-zero return code
        logger.debug(f'Generating markdown for {name} failed with return code {ret}.')
        return False

    logger.debug(f'Model {name} markdown generated successfully.')
    return True
generate_ssp_markdown(self, profile, output, compdefs, force_overwrite=False, yaml_header='', overwrite_header_values=False) ¤

Generate ssp markdown from OSCAL Profile and Component Definitions.

Source code in trestle/core/repository.py
def generate_ssp_markdown(
    self,
    profile: str,
    output: str,
    compdefs: str,
    force_overwrite: bool = False,
    yaml_header: str = '',
    overwrite_header_values: bool = False
) -> bool:
    """Generate ssp markdown from OSCAL Profile and Component Definitions.

    The arguments are forwarded as the options of the SSPGenerate command, together with this
    repository's root directory and the logger's current verbosity level.

    Returns:
        True if the command returned 0, False otherwise.

    Raises:
        TrestleError: if creating or running the command raised any exception.
    """
    logger.debug(f'Generating markdown for {output} of type ssp.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        profile=profile,
        output=output,
        compdefs=compdefs,
        trestle_root=self.root_dir,
        force_overwrite=force_overwrite,
        yaml_header=yaml_header,
        overwrite_header_values=overwrite_header_values,
        verbose=verbose
    )

    try:
        ret = sspauthorcmd.SSPGenerate()._run(args)
    except Exception as e:
        # chain explicitly so the original exception is reported as the direct cause
        raise TrestleError(f'Error in generating markdown for ssp {output}: {e}') from e

    if ret != 0:
        # the success message must not be logged for a non-zero return code
        logger.debug(f'Generating markdown for {output} failed with return code {ret}.')
        return False

    logger.debug(f'Model {output} markdown generated successfully.')
    return True

ManagedOSCAL ¤

Object representing OSCAL models in repository for programmatic manipulation.

Source code in trestle/core/repository.py
class ManagedOSCAL:
    """Object representing OSCAL models in repository for programmatic manipulation."""

    def __init__(self, root_dir: pathlib.Path, model_type: Type[OscalBaseModel], name: str) -> None:
        """Initialize repository OSCAL model object.

        Args:
            root_dir: trestle root directory that contains the model.
            model_type: class of the model; it must map to a top level model alias.
            name: name of the model, i.e. the name of its directory under the model type directory.

        Raises:
            TrestleError: if root_dir is not a valid trestle root, the model type is not a top level
                model, the model directory does not exist, or no model file with a known content type
                is found in it.
        """
        if not file_utils.is_valid_project_root(root_dir):
            raise TrestleError(f'Provided root directory {str(root_dir)} is not a valid Trestle root directory.')
        self.root_dir = root_dir
        self.model_type = model_type
        self.model_name = name

        # set model alias and dir
        self.model_alias = classname_to_alias(self.model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(self.model_alias) is None:
            raise TrestleError(f'Given model {self.model_alias} is not a top level model.')

        plural_path = ModelUtils.model_type_to_model_dir(self.model_alias)
        self.model_dir = self.root_dir / plural_path / self.model_name

        if not self.model_dir.exists() or not self.model_dir.is_dir():
            raise TrestleError(f'Model dir {self.model_name} does not exist.')

        # the content type is looked up from the extension-less model path inside the model dir
        file_content_type = FileContentType.path_to_content_type(self.model_dir / self.model_alias)
        if file_content_type == FileContentType.UNKNOWN:
            raise TrestleError(f'Model file for model {self.model_name} does not exist.')
        self.file_content_type = file_content_type

        filepath = pathlib.Path(
            self.model_dir,
            self.model_alias + FileContentType.path_to_file_extension(self.model_dir / self.model_alias)
        )

        self.filepath = filepath

    def read(self) -> OscalBaseModel:
        """Read OSCAL model from repository."""
        logger.debug(f'Reading model {self.model_name}.')
        model = load_validate_model_path(self.root_dir, self.filepath)
        return model

    def write(self, model: OscalBaseModel) -> bool:
        """Write OSCAL model to repository.

        Returns:
            True once the write plan has been executed.

        Raises:
            TrestleError: if the class of the given model is not a top level model.
        """
        logger.debug(f'Writing model {self.model_name}.')
        model_alias = classname_to_alias(model.__class__.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')

        # split directory if the model was split
        split_dir = pathlib.Path(self.model_dir, self.model_alias)

        # Prepare actions; delete split model dir if any, recreate model file, and write to filepath
        top_element = Element(model)
        remove_action = RemovePathAction(split_dir)
        create_action = CreatePathAction(self.filepath, True)
        write_action = WriteFileAction(self.filepath, top_element, self.file_content_type)

        # create a plan to create the directory and imported file.
        import_plan = Plan()
        import_plan.add_action(remove_action)
        import_plan.add_action(create_action)
        import_plan.add_action(write_action)

        import_plan.execute()

        logger.debug(f'Model {self.model_name} written to repository')
        return True

    def split(self, model_file: pathlib.Path, elements: List[str]) -> bool:
        """Split the given OSCAL model file in repository.

        Model file path should be relative to the main model directory, e.g., model dir is $TRESTLE_ROOT/catalogs/NIST
        then model file path can be 'catalog/metadata.json' if metadata is to be split.

        Elements should be specified relative to model file, e.g., 'metadata.props.*'

        Returns:
            True if the split command returned 0, False otherwise.

        Raises:
            TrestleError: if the split command raised any exception.
        """
        logger.debug(f'Splitting model {self.model_name}, file {model_file}.')
        # input model_file should be relative to the model dir
        model_file_path = (self.model_dir / model_file).resolve()
        file_parent = model_file_path.parent
        filename = model_file_path.name

        # the elements are handed to the split command as one comma separated string
        elems = ','.join(elements)

        try:
            ret = splitcmd.SplitCmd().perform_split(file_parent, filename, elems, self.root_dir)
        except Exception as e:
            # chain explicitly so the original exception is reported as the direct cause
            raise TrestleError(f'Error in splitting model: {e}') from e

        if ret != 0:
            # the success message must not be logged for a non-zero return code
            logger.debug(f'Splitting model {self.model_name}, file {model_file} failed with return code {ret}.')
            return False

        logger.debug(f'Model {self.model_name}, file {model_file} splitted successfully.')
        return True

    def merge(self, elements: List[str], parent_model_dir: Optional[pathlib.Path] = None) -> bool:
        """Merge OSCAL elements in repository.

        The parent_model_dir specifies the parent model directory in which to merge relative to main model dir.
        For example, if we have to merge 'metadata.*' into 'metadata' then parent_model_dir should be the 'catalog'
        dir that contains the 'metadata.json' file or the 'metadata' directory

        Returns:
            True once the merge plan of every element has been executed.

        Raises:
            TrestleError: if creating or executing any merge plan raised an exception.
        """
        logger.debug(f'Merging model {self.model_name}, parent dir {parent_model_dir}.')
        if parent_model_dir is None:
            effective_cwd = self.model_dir
        else:
            effective_cwd = self.model_dir / parent_model_dir

        try:
            # elements are merged one after the other; the first failure aborts the remaining ones
            for elem in elements:
                plan = mergecmd.MergeCmd.merge(effective_cwd, ElementPath(elem), self.root_dir)
                plan.execute()
        except Exception as e:
            # chain explicitly so the original exception is reported as the direct cause
            raise TrestleError(f'Error in merging model: {e}') from e

        logger.debug(f'Model {self.model_name} merged successfully.')
        return True

    def validate(self) -> bool:
        """Validate OSCAL model in repository."""
        logger.debug(f'Validating model {self.model_name}.')
        repo = Repository(self.root_dir)
        success = repo.validate_model(self.model_type, self.model_name)
        return success
Methods¤
__init__(self, root_dir, model_type, name) special ¤

Initialize repository OSCAL model object.

Source code in trestle/core/repository.py
def __init__(self, root_dir: pathlib.Path, model_type: Type[OscalBaseModel], name: str) -> None:
    """Bind this object to an existing top level OSCAL model stored under the trestle root.

    Raises TrestleError when the root directory is not a valid trestle root, the model type is not
    a top level model, the model directory is missing, or no model file with a known content type
    is found in it.
    """
    if not file_utils.is_valid_project_root(root_dir):
        raise TrestleError(f'Provided root directory {str(root_dir)} is not a valid Trestle root directory.')
    self.root_dir = root_dir
    self.model_type = model_type
    self.model_name = name

    # derive the JSON alias of the model class; only top level models are accepted
    alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    self.model_alias = alias
    if parser.to_full_model_name(alias) is None:
        raise TrestleError(f'Given model {alias} is not a top level model.')

    # <root>/<model type dir>/<model name>
    self.model_dir = root_dir / ModelUtils.model_type_to_model_dir(alias) / name
    if not (self.model_dir.exists() and self.model_dir.is_dir()):
        raise TrestleError(f'Model dir {name} does not exist.')

    # extension-less path of the model file, used to look up content type and extension
    model_stem = self.model_dir / alias
    content_type = FileContentType.path_to_content_type(model_stem)
    if content_type == FileContentType.UNKNOWN:
        raise TrestleError(f'Model file for model {name} does not exist.')
    self.file_content_type = content_type

    self.filepath = pathlib.Path(self.model_dir, alias + FileContentType.path_to_file_extension(model_stem))
merge(self, elements, parent_model_dir=None) ¤

Merge OSCAL elements in repository.

The parent_model_dir specifies the parent model directory in which to merge, relative to the main model dir. For example, if we have to merge 'metadata.*' into 'metadata' then parent_model_dir should be the 'catalog' dir that contains the 'metadata.json' file or the 'metadata' directory.

Source code in trestle/core/repository.py
def merge(self, elements: List[str], parent_model_dir: Optional[pathlib.Path] = None) -> bool:
    """Merge OSCAL elements in repository.

    The parent_model_dir specifies the parent model directory in which to merge relative to main model dir.
    For example, if we have to merge 'metadata.*' into 'metadata' then parent_model_dir should be the 'catalog'
    dir that contains the 'metadata.json' file or the 'metadata' directory

    Returns:
        True once the merge plan of every element has been executed.

    Raises:
        TrestleError: if creating or executing any merge plan raised an exception.
    """
    logger.debug(f'Merging model {self.model_name}, parent dir {parent_model_dir}.')
    if parent_model_dir is None:
        effective_cwd = self.model_dir
    else:
        effective_cwd = self.model_dir / parent_model_dir

    try:
        # elements are merged one after the other; the first failure aborts the remaining ones
        for elem in elements:
            plan = mergecmd.MergeCmd.merge(effective_cwd, ElementPath(elem), self.root_dir)
            plan.execute()
    except Exception as e:
        # chain explicitly so the original exception is reported as the direct cause
        raise TrestleError(f'Error in merging model: {e}') from e

    logger.debug(f'Model {self.model_name} merged successfully.')
    return True
read(self) ¤

Read OSCAL model from repository.

Source code in trestle/core/repository.py
def read(self) -> OscalBaseModel:
    """Load the model stored at this object's model file path and return it."""
    logger.debug(f'Reading model {self.model_name}.')
    return load_validate_model_path(self.root_dir, self.filepath)
split(self, model_file, elements) ¤

Split the given OSCAL model file in repository.

Model file path should be relative to the main model directory, e.g., model dir is $TRESTLE_ROOT/catalogs/NIST then model file path can be 'catalog/metadata.json' if metadata is to be split.

Elements should be specified relative to model file, e.g., 'metadata.props.*'

Source code in trestle/core/repository.py
def split(self, model_file: pathlib.Path, elements: List[str]) -> bool:
    """Split the given OSCAL model file in repository.

    Model file path should be relative to the main model directory, e.g., model dir is $TRESTLE_ROOT/catalogs/NIST
    then model file path can be 'catalog/metadata.json' if metadata is to be split.

    Elements should be specified relative to model file, e.g., 'metadata.props.*'

    Returns:
        True if the split command returned 0, False otherwise.

    Raises:
        TrestleError: if the split command raised any exception.
    """
    logger.debug(f'Splitting model {self.model_name}, file {model_file}.')
    # input model_file should be relative to the model dir
    model_file_path = (self.model_dir / model_file).resolve()
    file_parent = model_file_path.parent
    filename = model_file_path.name

    # the elements are handed to the split command as one comma separated string
    elems = ','.join(elements)

    try:
        ret = splitcmd.SplitCmd().perform_split(file_parent, filename, elems, self.root_dir)
    except Exception as e:
        # chain explicitly so the original exception is reported as the direct cause
        raise TrestleError(f'Error in splitting model: {e}') from e

    if ret != 0:
        # the success message must not be logged for a non-zero return code
        logger.debug(f'Splitting model {self.model_name}, file {model_file} failed with return code {ret}.')
        return False

    logger.debug(f'Model {self.model_name}, file {model_file} splitted successfully.')
    return True
validate(self) ¤

Validate OSCAL model in repository.

Source code in trestle/core/repository.py
def validate(self) -> bool:
    """Run repository level validation for this model and return its result."""
    logger.debug(f'Validating model {self.model_name}.')
    # validation is delegated to a Repository created for the same trestle root
    return Repository(self.root_dir).validate_model(self.model_type, self.model_name)
write(self, model) ¤

Write OSCAL model to repository.

Source code in trestle/core/repository.py
def write(self, model: OscalBaseModel) -> bool:
    """Store the given top level OSCAL model in this object's model file.

    Raises TrestleError if the class of the model is not a top level model; returns True once the
    write plan has been executed.
    """
    logger.debug(f'Writing model {self.model_name}.')
    alias = classname_to_alias(model.__class__.__name__, AliasMode.JSON)
    if parser.to_full_model_name(alias) is None:
        raise TrestleError(f'Given model {alias} is not a top level model.')

    root_element = Element(model)
    # directory that holds the parts of the model if it was split earlier
    split_path = pathlib.Path(self.model_dir, self.model_alias)

    # actions in execution order: drop the split dir, recreate the model file, write the content
    actions = (
        RemovePathAction(split_path),
        CreatePathAction(self.filepath, True),
        WriteFileAction(self.filepath, root_element, self.file_content_type),
    )

    write_plan = Plan()
    for action in actions:
        write_plan.add_action(action)
    write_plan.execute()

    logger.debug(f'Model {self.model_name} written to repository')
    return True

Repository ¤

Repository class for performing operations on Trestle repository.

This class provides a set of APIs to perform operations on trestle repository programmatically rather than using the command line. It takes the trestle root directory as input while creating an instance of this object. Operations such as import and get model return a ManagedOSCAL object representing the specific model that can be used to perform operations on the specific models.

Source code in trestle/core/repository.py
class Repository:
    """Repository class for performing operations on a Trestle repository.

    This class provides a set of APIs to perform operations on a trestle repository programmatically
    rather than using the command line. It takes the trestle root directory as input when an
    instance is created. Operations such as importing or getting a model return a ManagedOSCAL
    object that represents the specific model and can be used to perform operations on it.

    """

    def __init__(self, root_dir: pathlib.Path) -> None:
        """Initialize trestle repository object.

        Args:
            root_dir: Trestle root directory the repository operates on.

        Raises:
            TrestleError: If ``root_dir`` is not a valid Trestle project root.
        """
        if not file_utils.is_valid_project_root(root_dir):
            raise TrestleError(f'Provided root directory {root_dir} is not a valid Trestle root directory.')
        self.root_dir = root_dir

    def import_model(
        self, model: OscalBaseModel, name: str, content_type: FileContentType = FileContentType.JSON
    ) -> ManagedOSCAL:
        """Import OSCAL object into trestle repository.

        The model is written below the repository root, in the model directory for its type,
        as ``<name>/<alias><extension>``. The written file is then validated; if validation
        does not pass or raises, the write plan is rolled back before the error is raised.

        Args:
            model: OSCAL model instance to import; its class must map to a top level model.
            name: Name under which the model is stored in the repository.
            content_type: File content type used to choose the file extension.

        Returns:
            ManagedOSCAL object for the imported model.

        Raises:
            TrestleError: If the model is not a top level model, the target file already
                exists, or validation of the imported file fails.
        """
        logger.debug(f'Importing model {name} of type {model.__class__.__name__}.')
        model_alias = classname_to_alias(model.__class__.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')

        # Work out output directory and file
        plural_path = ModelUtils.model_type_to_model_dir(model_alias)

        desired_model_dir = self.root_dir / plural_path
        desired_model_path = desired_model_dir / name / (model_alias + FileContentType.to_file_extension(content_type))
        desired_model_path = desired_model_path.resolve()

        if desired_model_path.exists():
            raise TrestleError(f'OSCAL file to be created here: {desired_model_path} exists.')

        content_type = FileContentType.to_content_type(pathlib.Path(desired_model_path).suffix)

        # Prepare actions
        top_element = Element(model)
        create_action = CreatePathAction(desired_model_path, True)
        write_action = WriteFileAction(desired_model_path, top_element, content_type)

        # create a plan to create the directory and imported file.
        import_plan = Plan()
        import_plan.add_action(create_action)
        import_plan.add_action(write_action)
        import_plan.execute()

        # Validate the imported file, rollback if unsuccessful
        success = False
        errmsg = ''
        try:
            success = self.validate_model(model.__class__, name)
            if not success:
                errmsg = f'Validation of model {name} did not pass'
                logger.error(errmsg)
        except Exception as err:
            # build the message before logging it, otherwise an empty error line is emitted
            errmsg = f'Import of model {name} failed. Validation failed with error: {err}'
            logger.error(errmsg)

        if not success:
            # rollback in case of validation error or failure
            logger.debug(f'Rolling back import of model {name} to {desired_model_path}')
            try:
                import_plan.rollback()
            except TrestleError as err:
                logger.error(f'Failed to rollback: {err}. Remove {desired_model_path} to resolve state.')
            else:
                logger.debug(f'Successful rollback of import to {desired_model_path}')

            # raise trestle error
            raise TrestleError(errmsg)

        # all well; model was imported and validated successfully
        logger.debug(f'Model {name} of type {model.__class__.__name__} imported successfully.')
        return ManagedOSCAL(self.root_dir, model.__class__, name)

    def load_and_import_model(
        self,
        model_path: pathlib.Path,
        name: str,
        content_type: FileContentType = FileContentType.JSON
    ) -> ManagedOSCAL:
        """Load the model at the specified path into trestle with the specified name.

        Args:
            model_path: Location of the model; passed to the fetcher factory as a string.
            name: Name under which the model is stored in the repository.
            content_type: File content type forwarded to ``import_model``.

        Returns:
            ManagedOSCAL object returned by ``import_model``.
        """
        fetcher = cache.FetcherFactory.get_fetcher(self.root_dir, str(model_path))
        model, _ = fetcher.get_oscal(True)

        return self.import_model(model, name, content_type)

    def list_models(self, model_type: Type[OscalBaseModel]) -> List[str]:
        """List models of a given type in trestle repository.

        Args:
            model_type: OSCAL model class; must map to a top level model.

        Returns:
            The result of ``ModelUtils.get_models_of_type`` for the model alias and repository root.

        Raises:
            TrestleError: If ``model_type`` is not a top level model.
        """
        logger.debug(f'Listing models of type {model_type.__name__}.')
        model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')
        models = ModelUtils.get_models_of_type(model_alias, self.root_dir)

        return models

    def get_model(self, model_type: Type[OscalBaseModel], name: str) -> ManagedOSCAL:
        """Get a specific OSCAL model from repository.

        Args:
            model_type: OSCAL model class; must map to a top level model.
            name: Name of the model in the repository.

        Returns:
            ManagedOSCAL object for the requested model.

        Raises:
            TrestleError: If ``model_type`` is not a top level model or the model
                directory does not exist.
        """
        logger.debug(f'Getting model {name} of type {model_type.__name__}.')
        model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')
        plural_path = ModelUtils.model_type_to_model_dir(model_alias)
        desired_model_dir = self.root_dir / plural_path / name

        if not desired_model_dir.exists() or not desired_model_dir.is_dir():
            raise TrestleError(f'Model {name} does not exist.')

        return ManagedOSCAL(self.root_dir, model_type, name)

    def delete_model(self, model_type: Type[OscalBaseModel], name: str) -> bool:
        """Delete an OSCAL model from repository.

        The model directory is removed recursively. If a file for the model with a
        recognised content type is found in the dist directory, it is removed as well.

        Args:
            model_type: OSCAL model class; must map to a top level model.
            name: Name of the model in the repository.

        Returns:
            True once the model has been deleted.

        Raises:
            TrestleError: If ``model_type`` is not a top level model or the model
                directory does not exist.
        """
        logger.debug(f'Deleting model {name} of type {model_type.__name__}.')
        model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')
        plural_path = ModelUtils.model_type_to_model_dir(model_alias)
        desired_model_dir = self.root_dir / plural_path / name

        if not desired_model_dir.exists() or not desired_model_dir.is_dir():
            raise TrestleError(f'Model {name} does not exist.')
        shutil.rmtree(desired_model_dir)

        # remove model from dist directory if it exists
        dist_model_dir = self.root_dir / const.TRESTLE_DIST_DIR / plural_path
        file_content_type = FileContentType.path_to_content_type(dist_model_dir / name)
        if file_content_type != FileContentType.UNKNOWN:
            file_path = pathlib.Path(
                dist_model_dir, name + FileContentType.path_to_file_extension(dist_model_dir / name)
            )
            logger.debug(f'Deleting model {name} from dist directory.')
            os.remove(file_path)

        logger.debug(f'Model {name} deleted successfully.')
        return True

    def assemble_model(self, model_type: Type[OscalBaseModel], name: str, extension: str = 'json') -> bool:
        """Assemble an OSCAL model in repository and publish it to 'dist' directory.

        Args:
            model_type: OSCAL model class; must map to a top level model.
            name: Name of the model in the repository.
            extension: File extension passed to the assemble command.

        Returns:
            True if the assemble command returned 0, otherwise False.

        Raises:
            TrestleError: If ``model_type`` is not a top level model or the assemble
                command raised an exception.
        """
        logger.debug(f'Assembling model {name} of type {model_type.__name__}.')

        model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(
            type=model_alias, name=name, extension=extension, trestle_root=self.root_dir, verbose=verbose
        )

        try:
            ret = assemblecmd.AssembleCmd().assemble_model(model_alias, args)
        except Exception as e:
            # keep the original exception as the cause so the traceback is not lost
            raise TrestleError(f'Error in assembling model: {e}') from e

        success = ret == 0
        # only report success when the command actually returned 0
        if success:
            logger.debug(f'Model {name} assembled successfully.')
        else:
            logger.debug(f'Assembly of model {name} did not succeed; return code {ret}.')
        return success

    def validate_model(self, model_type: Type[OscalBaseModel], name: str) -> bool:
        """Validate an OSCAL model in repository.

        Args:
            model_type: OSCAL model class; must map to a top level model.
            name: Name of the model in the repository.

        Returns:
            True if the validate command returned 0, otherwise False.

        Raises:
            TrestleError: If ``model_type`` is not a top level model or the validate
                command raised an exception.
        """
        logger.debug(f'Validating model {name} of type {model_type.__name__}.')

        model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
        if parser.to_full_model_name(model_alias) is None:
            raise TrestleError(f'Given model {model_alias} is not a top level model.')

        verbose = log.get_current_verbosity_level(logger)
        args = argparse.Namespace(type=model_alias, name=name, trestle_root=self.root_dir, verbose=verbose, quiet=False)

        try:
            ret = validatecmd.ValidateCmd()._run(args)
        except Exception as e:
            # keep the original exception as the cause so the traceback is not lost
            raise TrestleError(f'Error in validating model: {e}') from e

        success = ret == 0
        # only report success when the command actually returned 0
        if success:
            logger.debug(f'Model {name} validated successfully.')
        else:
            logger.debug(f'Validation of model {name} did not succeed; return code {ret}.')
        return success
Methods¤
__init__(self, root_dir) special ¤

Initialize trestle repository object.

Source code in trestle/core/repository.py
def __init__(self, root_dir: pathlib.Path) -> None:
    """Create a repository object bound to the given trestle root directory.

    Raises:
        TrestleError: If ``root_dir`` is not a valid Trestle project root.
    """
    root_is_valid = file_utils.is_valid_project_root(root_dir)
    if root_is_valid:
        self.root_dir = root_dir
        return
    raise TrestleError(f'Provided root directory {root_dir} is not a valid Trestle root directory.')
assemble_model(self, model_type, name, extension='json') ¤

Assemble an OSCAL model in repository and publish it to 'dist' directory.

Source code in trestle/core/repository.py
def assemble_model(self, model_type: Type[OscalBaseModel], name: str, extension: str = 'json') -> bool:
    """Assemble an OSCAL model in repository and publish it to 'dist' directory.

    Args:
        model_type: OSCAL model class; must map to a top level model.
        name: Name of the model in the repository.
        extension: File extension passed to the assemble command.

    Returns:
        True if the assemble command returned 0, otherwise False.

    Raises:
        TrestleError: If ``model_type`` is not a top level model or the assemble
            command raised an exception.
    """
    logger.debug(f'Assembling model {name} of type {model_type.__name__}.')

    model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(model_alias) is None:
        raise TrestleError(f'Given model {model_alias} is not a top level model.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(
        type=model_alias, name=name, extension=extension, trestle_root=self.root_dir, verbose=verbose
    )

    try:
        ret = assemblecmd.AssembleCmd().assemble_model(model_alias, args)
    except Exception as e:
        # keep the original exception as the cause so the traceback is not lost
        raise TrestleError(f'Error in assembling model: {e}') from e

    success = ret == 0
    # only report success when the command actually returned 0
    if success:
        logger.debug(f'Model {name} assembled successfully.')
    else:
        logger.debug(f'Assembly of model {name} did not succeed; return code {ret}.')
    return success
delete_model(self, model_type, name) ¤

Delete an OSCAL model from repository.

Source code in trestle/core/repository.py
def delete_model(self, model_type: Type[OscalBaseModel], name: str) -> bool:
    """Remove a model from the repository, including its file in the dist directory if present.

    Args:
        model_type: OSCAL model class; must map to a top level model.
        name: Name of the model in the repository.

    Returns:
        True once the model has been deleted.

    Raises:
        TrestleError: If ``model_type`` is not a top level model or the model
            directory does not exist.
    """
    logger.debug(f'Deleting model {name} of type {model_type.__name__}.')
    alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(alias) is None:
        raise TrestleError(f'Given model {alias} is not a top level model.')

    models_subdir = ModelUtils.model_type_to_model_dir(alias)
    model_dir = self.root_dir / models_subdir / name
    if not (model_dir.exists() and model_dir.is_dir()):
        raise TrestleError(f'Model {name} does not exist.')
    shutil.rmtree(model_dir)

    # the dist copy, when there is one, sits next to other models of the same type
    dist_dir = self.root_dir / const.TRESTLE_DIST_DIR / models_subdir
    dist_stem = dist_dir / name
    if FileContentType.path_to_content_type(dist_stem) != FileContentType.UNKNOWN:
        dist_file = dist_dir / (name + FileContentType.path_to_file_extension(dist_stem))
        logger.debug(f'Deleting model {name} from dist directory.')
        os.remove(dist_file)

    logger.debug(f'Model {name} deleted successfully.')
    return True
get_model(self, model_type, name) ¤

Get a specific OSCAL model from repository.

Source code in trestle/core/repository.py
def get_model(self, model_type: Type[OscalBaseModel], name: str) -> ManagedOSCAL:
    """Return a ManagedOSCAL object for an existing model in the repository.

    Args:
        model_type: OSCAL model class; must map to a top level model.
        name: Name of the model in the repository.

    Raises:
        TrestleError: If ``model_type`` is not a top level model or the model
            directory does not exist.
    """
    logger.debug(f'Getting model {name} of type {model_type.__name__}.')
    alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(alias) is None:
        raise TrestleError(f'Given model {alias} is not a top level model.')

    model_dir = self.root_dir / ModelUtils.model_type_to_model_dir(alias) / name
    if not (model_dir.exists() and model_dir.is_dir()):
        raise TrestleError(f'Model {name} does not exist.')

    return ManagedOSCAL(self.root_dir, model_type, name)
import_model(self, model, name, content_type=<FileContentType.JSON: 1>) ¤

Import OSCAL object into trestle repository.

Source code in trestle/core/repository.py
def import_model(
    self, model: OscalBaseModel, name: str, content_type: FileContentType = FileContentType.JSON
) -> ManagedOSCAL:
    """Import OSCAL object into trestle repository.

    The model is written below the repository root, in the model directory for its type,
    as ``<name>/<alias><extension>``. The written file is then validated; if validation
    does not pass or raises, the write plan is rolled back before the error is raised.

    Args:
        model: OSCAL model instance to import; its class must map to a top level model.
        name: Name under which the model is stored in the repository.
        content_type: File content type used to choose the file extension.

    Returns:
        ManagedOSCAL object for the imported model.

    Raises:
        TrestleError: If the model is not a top level model, the target file already
            exists, or validation of the imported file fails.
    """
    logger.debug(f'Importing model {name} of type {model.__class__.__name__}.')
    model_alias = classname_to_alias(model.__class__.__name__, AliasMode.JSON)
    if parser.to_full_model_name(model_alias) is None:
        raise TrestleError(f'Given model {model_alias} is not a top level model.')

    # Work out output directory and file
    plural_path = ModelUtils.model_type_to_model_dir(model_alias)

    desired_model_dir = self.root_dir / plural_path
    desired_model_path = desired_model_dir / name / (model_alias + FileContentType.to_file_extension(content_type))
    desired_model_path = desired_model_path.resolve()

    if desired_model_path.exists():
        raise TrestleError(f'OSCAL file to be created here: {desired_model_path} exists.')

    content_type = FileContentType.to_content_type(pathlib.Path(desired_model_path).suffix)

    # Prepare actions
    top_element = Element(model)
    create_action = CreatePathAction(desired_model_path, True)
    write_action = WriteFileAction(desired_model_path, top_element, content_type)

    # create a plan to create the directory and imported file.
    import_plan = Plan()
    import_plan.add_action(create_action)
    import_plan.add_action(write_action)
    import_plan.execute()

    # Validate the imported file, rollback if unsuccessful
    success = False
    errmsg = ''
    try:
        success = self.validate_model(model.__class__, name)
        if not success:
            errmsg = f'Validation of model {name} did not pass'
            logger.error(errmsg)
    except Exception as err:
        # build the message before logging it, otherwise an empty error line is emitted
        errmsg = f'Import of model {name} failed. Validation failed with error: {err}'
        logger.error(errmsg)

    if not success:
        # rollback in case of validation error or failure
        logger.debug(f'Rolling back import of model {name} to {desired_model_path}')
        try:
            import_plan.rollback()
        except TrestleError as err:
            logger.error(f'Failed to rollback: {err}. Remove {desired_model_path} to resolve state.')
        else:
            logger.debug(f'Successful rollback of import to {desired_model_path}')

        # raise trestle error
        raise TrestleError(errmsg)

    # all well; model was imported and validated successfully
    logger.debug(f'Model {name} of type {model.__class__.__name__} imported successfully.')
    return ManagedOSCAL(self.root_dir, model.__class__, name)
list_models(self, model_type) ¤

List models of a given type in trestle repository.

Source code in trestle/core/repository.py
def list_models(self, model_type: Type[OscalBaseModel]) -> List[str]:
    """Return the models of the given type found in the trestle repository.

    Args:
        model_type: OSCAL model class; must map to a top level model.

    Raises:
        TrestleError: If ``model_type`` is not a top level model.
    """
    logger.debug(f'Listing models of type {model_type.__name__}.')
    alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(alias) is not None:
        return ModelUtils.get_models_of_type(alias, self.root_dir)
    raise TrestleError(f'Given model {alias} is not a top level model.')
load_and_import_model(self, model_path, name, content_type=<FileContentType.JSON: 1>) ¤

Load the model at the specified path into trestle with the specified name.

Source code in trestle/core/repository.py
def load_and_import_model(
    self,
    model_path: pathlib.Path,
    name: str,
    content_type: FileContentType = FileContentType.JSON
) -> ManagedOSCAL:
    """Fetch the model found at ``model_path`` and import it into trestle as ``name``.

    The path is handed to the fetcher factory as a string; the fetched model is then
    passed to ``import_model`` together with ``name`` and ``content_type``.
    """
    source = str(model_path)
    loaded_model, _ = cache.FetcherFactory.get_fetcher(self.root_dir, source).get_oscal(True)
    return self.import_model(loaded_model, name, content_type)
validate_model(self, model_type, name) ¤

Validate an OSCAL model in repository.

Source code in trestle/core/repository.py
def validate_model(self, model_type: Type[OscalBaseModel], name: str) -> bool:
    """Validate an OSCAL model in repository.

    Args:
        model_type: OSCAL model class; must map to a top level model.
        name: Name of the model in the repository.

    Returns:
        True if the validate command returned 0, otherwise False.

    Raises:
        TrestleError: If ``model_type`` is not a top level model or the validate
            command raised an exception.
    """
    logger.debug(f'Validating model {name} of type {model_type.__name__}.')

    model_alias = classname_to_alias(model_type.__name__, AliasMode.JSON)
    if parser.to_full_model_name(model_alias) is None:
        raise TrestleError(f'Given model {model_alias} is not a top level model.')

    verbose = log.get_current_verbosity_level(logger)
    args = argparse.Namespace(type=model_alias, name=name, trestle_root=self.root_dir, verbose=verbose, quiet=False)

    try:
        ret = validatecmd.ValidateCmd()._run(args)
    except Exception as e:
        # keep the original exception as the cause so the traceback is not lost
        raise TrestleError(f'Error in validating model: {e}') from e

    success = ret == 0
    # only report success when the command actually returned 0
    if success:
        logger.debug(f'Model {name} validated successfully.')
    else:
        logger.debug(f'Validation of model {name} did not succeed; return code {ret}.')
    return success

handler: python