Skip to content

csv_to_oscal_cd

trestle.tasks.csv_to_oscal_cd ¤

OSCAL transformation tasks.

logger ¤

Classes¤

CsvToOscalComponentDefinition (TaskBase) ¤

Task to create OSCAL ComponentDefinition json.

Attributes:

Name Type Description
name str

Name of the task.

Source code in trestle/tasks/csv_to_oscal_cd.py
class CsvToOscalComponentDefinition(TaskBase):
    """
    Task to create OSCAL ComponentDefinition json.

    Attributes:
        name: Name of the task.
    """

    name = 'csv-to-oscal-cd'

    def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
        """
        Initialize trestle task csv-to-oscal-cd.

        Args:
            config_object: Config section associated with the task.
        """
        super().__init__(config_object)
        self.csv_helper = CsvHelper()
        self._timestamp = datetime.datetime.utcnow().replace(microsecond=0).replace(tzinfo=datetime.timezone.utc
                                                                                    ).isoformat()
        self._verbose = False

    def print_info(self) -> None:
        """Print the help string."""
        self.csv_helper.print_info(self.name, 'component_definition')

    def simulate(self) -> TaskOutcome:
        """Provide a simulated outcome."""
        return TaskOutcome('simulated-success')

    def execute(self) -> TaskOutcome:
        """Provide an executed outcome."""
        try:
            return self._execute()
        except Exception:
            logger.error(traceback.format_exc())
            return TaskOutcome('failure')

    def _execute(self) -> TaskOutcome:
        """Execute path core."""
        if not self.csv_helper.configure(self):
            return TaskOutcome('failure')
        # verbosity
        quiet = self._config.get('quiet', False)
        verbose = not quiet
        # config output
        odir = self._config.get('output-dir')
        opth = pathlib.Path(odir)
        self._overwrite = self._config.getboolean('output-overwrite', True)
        # insure output dir exists
        opth.mkdir(exist_ok=True, parents=True)
        # calculate output file name & check writability
        oname = 'component-definition.json'
        ofile = opth / oname
        if not self._overwrite and pathlib.Path(ofile).exists():
            logger.warning(f'output: {ofile} already exists')
            return TaskOutcome('failure')
        # namespace
        self._ns = self._config.get('namespace')
        # user-namespace
        self._ns_user = self._config.get('user-namespace')
        # process rows
        self._process_rows()
        # create OSCAL ComponentDefinition
        metadata = Metadata(
            title=self.csv_helper.get_title(),
            last_modified=self._timestamp,
            oscal_version=OSCAL_VERSION,
            version=self.csv_helper.get_version(),
        )
        component_definition = ComponentDefinition(
            uuid=str(uuid.uuid4()),
            metadata=metadata,
            components=self._get_components(),
        )
        # write OSCAL ComponentDefinition to file
        if verbose:
            logger.info(f'output: {ofile}')
        component_definition.oscal_write(pathlib.Path(ofile))
        # issues
        self._report_issues()
        return TaskOutcome('success')

    def _get_component_key(self, row: List[str]) -> str:
        """Get component key."""
        resource = self.csv_helper.get_value(row, 'Resource')
        component_type = self.csv_helper.get_value(row, 'Component_Type')
        return f'{resource}:{component_type}'

    def _get_component(self, row: List[str]) -> None:
        """Get component."""
        key = self._get_component_key(row)
        rval = self._components.get(key)
        if rval is None:
            resource = self.csv_helper.get_value(row, 'Resource')
            type_ = self.csv_helper.get_value(row, 'Component_Type')
            description = ''
            rval = DefinedComponent(
                uuid=str(uuid.uuid4()),
                type=type_,
                title=resource,
                description=description,
                control_implementations=[],
            )
            self._components[key] = rval
            logger.debug(f'created component: {key}')
        return rval

    def _get_control_implementation_key(self, row: List[str]) -> str:
        """Get control implementation key."""
        resource = self.csv_helper.get_value(row, 'Resource')
        component_type = self.csv_helper.get_value(row, 'Component_Type')
        profile_description = self.csv_helper.get_value(row, 'Profile_Description')
        return f'{resource}:{component_type}:{profile_description}'

    def _get_control_implementation(self, row: List[str]) -> None:
        """Get control implementation."""
        key = self._get_control_implementation_key(row)
        rval = self._control_implementations.get(key)
        if rval is None:
            source = self.csv_helper.get_value(row, 'Profile_Reference_URL')
            description = self.csv_helper.get_value(row, 'Profile_Description')
            rval = ControlImplementation(
                uuid=str(uuid.uuid4()),
                source=source,
                description=description,
                implemented_requirements=[],
            )
            self._control_implementations[key] = rval
            component = self._get_component(row)
            component.control_implementations.append(rval)
            logger.debug(f'created control implementation: {key}')
        return rval

    def _create_rule_prop(
        self,
        row_num: int,
        control_implementation: ControlImplementation,
        row: List[str],
        col: str,
        ns: str,
        remarks: str
    ) -> None:
        """Create rule property."""
        value = self.csv_helper.get_value(row, col)
        logger.debug(f'row_num: {row_num} col: {col} value: {value}')
        if value is None or value == '':
            return
        else:
            class_ = self.csv_helper.get_class(col)
            prop = Property(
                name=col,
                value=value,
                ns=ns,
                class_=class_,
                remarks=remarks,
            )
            control_implementation.props.append(prop)

    def _get_rule_definition_key(self, row: List[str]) -> str:
        """Get rule definition key."""
        resource = self.csv_helper.get_value(row, 'Resource')
        component_type = self.csv_helper.get_value(row, 'Component_Type')
        rule_id = self.csv_helper.get_value(row, 'Rule_Id')
        return f'{resource}:{component_type}:{rule_id}'

    def _register_rule_set(self, row_num: int, key: str, props: List[Property]):
        """Register rule set."""
        self._rule_definitions[key] = props
        logger.debug(f'{row_num} {key} {props}')

    def _add_parameter(self, row_num: int, row: List[str], control_implementation, ns, remarks) -> None:
        # Parameter, if any
        value = self.csv_helper.get_value(row, 'Parameter_Id')
        if value:
            self._create_rule_prop(row_num, control_implementation, row, 'Parameter_Id', ns, remarks)
            self._create_rule_prop(row_num, control_implementation, row, 'Parameter_Description', ns, remarks)
            self._create_rule_prop(row_num, control_implementation, row, 'Parameter_Value_Alternatives', ns, remarks)
            name = self.csv_helper.get_value(row, 'Parameter_Id')
            value = self.csv_helper.get_value(row, 'Parameter_Default_Value')
            if value == '':
                col = 'Parameter_Default_Value'
                text = f'row: {row} missing expected value for col: {col}'
                raise RuntimeError(text)
            values = value.split(',')
            set_parameter = SetParameter(
                param_id=name,
                values=values,
            )
            if control_implementation.set_parameters is None:
                control_implementation.set_parameters = []
            control_implementation.set_parameters.append(set_parameter)

    def _add_rule_definition(self, row_num: int, row: List[str]) -> None:
        """Add rule definition."""
        # Create rule definition (as properties)
        key = self._get_rule_definition_key(row)
        rule_definition = self._rule_definitions.get(key)
        if rule_definition is None:
            control_implementation = self._get_control_implementation(row)
            ns = self._ns
            ns_user = self._ns_user
            fill_sz = int(log10(self.csv_helper.row_count())) + 1
            remarks = f'rule_set_{str(row_num).zfill(fill_sz)}'
            if control_implementation.props is None:
                control_implementation.props = []
            for column_name in self.csv_helper.get_filtered_required_column_names():
                self._create_rule_prop(row_num, control_implementation, row, column_name, ns, remarks)
            for column_name in self.csv_helper.get_filtered_optional_column_names():
                self._create_rule_prop(row_num, control_implementation, row, column_name, ns, remarks)
            # Parameter, if any
            self._add_parameter(row_num, row, control_implementation, ns, remarks)
            # user props
            for col_name in self.csv_helper.get_user_column_names():
                value = self.csv_helper.get_value(row, col_name)
                if not value:
                    continue
                self._create_rule_prop(row_num, control_implementation, row, col_name, ns_user, remarks)
            # Rule set created
            self._register_rule_set(row_num, key, control_implementation.props)
        else:
            text = f'row: {row_num} rule definition: "{key}" already exists?'
            raise RuntimeError(text)

    def _get_implemented_requirement_key(self, control_id: str, row: List[str]) -> str:
        """Get implemented requirement key."""
        resource = self.csv_helper.get_value(row, 'Resource')
        component_type = self.csv_helper.get_value(row, 'Component_Type')
        return f'{resource}:{component_type}:{control_id}'

    def _get_implemented_requirement(self, control_id: str, row: List[str]) -> None:
        """Get implemented requirement."""
        key = self._get_implemented_requirement_key(control_id, row)
        rval = self._implemented_requirements.get(key)
        if rval is None:
            rval = ImplementedRequirement(
                uuid=str(uuid.uuid4()),
                control_id=control_id,
                description='',
            )
            self._implemented_requirements[key] = rval
            control_implementation = self._get_control_implementation(row)
            control_implementation.implemented_requirements.append(rval)
            logger.debug(f'created implemented requirement: {key}')
        return rval

    def _get_statement(self, implemented_requirement: ImplementedRequirement, part_id: str) -> Statement:
        """Get statement."""
        rval = None
        if implemented_requirement.statements is None:
            implemented_requirement.statements = []
        for statement in implemented_requirement.statements:
            if statement.statement_id == part_id:
                rval = statement
                break
        if rval is None:
            statement = Statement(statement_id=part_id, uuid=str(uuid.uuid4()), description='', props=[])
            implemented_requirement.statements.append(statement)
        return statement

    def _add_rule_implementation(self, control_mapping: str, row: List[str]) -> None:
        """Add rule implementation."""
        # Create rule implementation (as property)
        name = 'Rule_Id'
        prop = Property(
            name=name,
            value=self.csv_helper.get_value(row, name),
            ns=self._ns,
            class_=self.csv_helper.get_class(name),
        )
        # Find or create implementation requirement
        control_id = derive_control_id(control_mapping)
        implemented_requirement = self._get_implemented_requirement(control_id, row)
        part_id = derive_part_id(control_mapping)
        # Add rule to implementation requirement as property or statement
        if part_id is None:
            if implemented_requirement.props is None:
                implemented_requirement.props = []
            implemented_requirement.props.append(prop)
        else:
            statement = self._get_statement(implemented_requirement, part_id)
            statement.props.append(prop)

    def _validate_columns(self, row) -> None:
        """Validate columns."""
        for col in self.csv_helper.get_required_column_names():
            value = self.csv_helper.get_value(row, col)
            if value is None or value == '':
                text = f'row: {row} missing expected value for col: {col}'
                raise RuntimeError(text)

    def _process_rows(self) -> None:
        """Process rows."""
        self._rule_definitions = {}
        self._implemented_requirements = {}
        self._control_implementations = {}
        self._components = {}
        for row_num, row in enumerate(self.csv_helper.row_generator()):
            control_mappings = self.csv_helper.get_value(row, 'Control_Mappings').split()
            if len(control_mappings):
                self._validate_columns(row)
                for control_mapping in control_mappings:
                    self._add_rule_implementation(control_mapping, row)
                self._add_rule_definition(row_num, row)

    def _get_components(self) -> List[DefinedComponent]:
        """Get components."""
        rval = []
        for key in self._components.keys():
            rval.append(self._components[key])
        return rval

    def _report_issues(self) -> None:
        """Report issues."""
        self.csv_helper.report_issues()
name: str ¤
Methods¤
__init__(self, config_object) special ¤

Initialize trestle task csv-to-oscal-cd.

Parameters:

Name Type Description Default
config_object Optional[configparser.SectionProxy]

Config section associated with the task.

required
Source code in trestle/tasks/csv_to_oscal_cd.py
def __init__(self, config_object: Optional[configparser.SectionProxy]) -> None:
    """
    Initialize trestle task csv-to-oscal-cd.

    Args:
        config_object: Config section associated with the task.
    """
    super().__init__(config_object)
    self.csv_helper = CsvHelper()
    self._timestamp = datetime.datetime.utcnow().replace(microsecond=0).replace(tzinfo=datetime.timezone.utc
                                                                                ).isoformat()
    self._verbose = False
execute(self) ¤

Provide an executed outcome.

Source code in trestle/tasks/csv_to_oscal_cd.py
def execute(self) -> TaskOutcome:
    """Provide an executed outcome."""
    try:
        return self._execute()
    except Exception:
        logger.error(traceback.format_exc())
        return TaskOutcome('failure')
print_info(self) ¤

Print the help string.

Source code in trestle/tasks/csv_to_oscal_cd.py
def print_info(self) -> None:
    """Print the help string."""
    self.csv_helper.print_info(self.name, 'component_definition')
simulate(self) ¤

Provide a simulated outcome.

Source code in trestle/tasks/csv_to_oscal_cd.py
def simulate(self) -> TaskOutcome:
    """Provide a simulated outcome."""
    return TaskOutcome('simulated-success')

Functions¤

derive_control_id(control_mapping) ¤

Derive control id.

Source code in trestle/tasks/csv_to_oscal_cd.py
def derive_control_id(control_mapping: str) -> str:
    """Derive control id."""
    rval = control_mapping.split('_smt')[0]
    return rval

derive_part_id(control_mapping) ¤

Derive part id.

Source code in trestle/tasks/csv_to_oscal_cd.py
def derive_part_id(control_mapping: str) -> str:
    """Derive part id."""
    if '_smt.' in control_mapping:
        rval = control_mapping
    else:
        rval = None
    return rval

handler: python