Skip to content

import_

trestle.core.commands.import_ ¤

Trestle Import Command.

logger ¤

Classes¤

ImportCmd (CommandPlusDocs) ¤

Import an existing full OSCAL model into the trestle project.

Source code in trestle/core/commands/import_.py
class ImportCmd(CommandPlusDocs):
    """Import an existing full OSCAL model into the trestle project."""

    name = 'import'

    def _init_arguments(self) -> None:
        logger.debug('Init arguments')
        self.add_argument(
            '-f', '--file', help='OSCAL file to import - either file path or url.', type=str, required=True
        )
        self.add_argument('-o', '--output', help='Name of output element.', type=str, required=True)
        self.add_argument('-r', '--regenerate', action='store_true', help=const.HELP_REGENERATE)

    def _run(self, args: argparse.Namespace) -> int:
        """Top level import run command."""
        try:
            log.set_log_level_from_args(args)
            trestle_root = args.trestle_root
            if not file_utils.is_valid_project_root(trestle_root):
                raise TrestleRootError(f'Attempt to import from non-valid trestle project root {trestle_root}')

            input_uri = args.file
            if cache.FetcherFactory.in_trestle_directory(trestle_root, input_uri):
                raise TrestleError(
                    f'Imported file {input_uri} cannot be from current trestle project. Use duplicate instead.'
                )

            content_type = FileContentType.to_content_type('.' + input_uri.split('.')[-1])

            fetcher = cache.FetcherFactory.get_fetcher(trestle_root, str(input_uri))

            model_read, parent_alias = fetcher.get_oscal(True)

            # validate the loaded model in memory before writing out
            # this will do any needed fixes to the file, such as assign missing catalog group ids
            args_validate = argparse.Namespace(mode=const.VAL_MODE_ALL)
            validator: Validator = validator_factory.get(args_validate)
            if not validator.model_is_valid(model_read, True):
                logger.warning(f'Validation of file to be imported {input_uri} did not pass.  Import failed.')
                return CmdReturnCodes.COMMAND_ERROR.value

            plural_path = ModelUtils.model_type_to_model_dir(parent_alias)

            output_name = args.output

            desired_model_dir = trestle_root / plural_path
            desired_model_path: pathlib.Path = desired_model_dir / output_name / parent_alias
            desired_model_path = desired_model_path.with_suffix(FileContentType.to_file_extension(content_type)
                                                                ).resolve()

            if desired_model_path.exists():
                logger.warning(f'Cannot import because file to be imported here: {desired_model_path} already exists.')
                return CmdReturnCodes.COMMAND_ERROR.value

            if args.regenerate:
                logger.debug(f'regenerating uuids in imported file {input_uri}')
                model_read, lut, nchanged = ModelUtils.regenerate_uuids(model_read)
                logger.debug(f'uuid lut has {len(lut.items())} entries and {nchanged} refs were updated')

            top_element = Element(model_read)
            create_action = CreatePathAction(desired_model_path, True)
            write_action = WriteFileAction(desired_model_path, top_element, content_type)

            # create a plan to create the directory and write the imported file.
            import_plan = Plan()
            import_plan.add_action(create_action)
            import_plan.add_action(write_action)

            import_plan.execute()

            args = argparse.Namespace(
                file=desired_model_path,
                verbose=args.verbose,
                trestle_root=args.trestle_root,
                type=None,
                all=None,
                quiet=True
            )
            return CmdReturnCodes.SUCCESS.value

        except Exception as e:  # pragma: no cover
            return handle_generic_command_exception(e, logger, 'Error while importing OSCAL file')
name ¤

handler: python