Skip to content

osco

trestle.transforms.implementations.osco ¤

Facilitate OSCAL-OSCO transformation.

logger ¤

Classes¤

ComplianceOperatorResult ¤

Represents one result of OSCO data.

Source code in trestle/transforms/implementations/osco.py
class ComplianceOperatorResult():
    """Represents one result of OSCO data."""

    def __init__(self, osco_xml: str) -> None:
        """Initialize given specified args."""
        self.osco_xml = osco_xml

    def _get_version(self, root: Element) -> str:
        """Extract version from the XML."""
        value = None
        for key, val in root.attrib.items():
            if key == 'version':
                value = val
                break
        return value

    def _get_id(self, root: Element) -> str:
        """Extract id from the XML."""
        value = None
        for key, val in root.attrib.items():
            if key == 'id':
                value = val
                break
        return value

    def _get_target(self, root: Element) -> str:
        """Extract target from the XML."""
        value = None
        for lev1 in root:
            tag = _remove_namespace(lev1.tag)
            if tag == 'target':
                value = root.find(lev1.tag).text
                break
        return value

    def _get_target_type(self, root: Element) -> str:
        """Extract target_type from the XML."""
        value = None
        benchmark_href = self._get_benchmark_href(root)
        if benchmark_href is not None and '-' in benchmark_href:
            value = benchmark_href.split('-')[1]
        return value

    def _get_benchmark(self, root: Element, kw) -> str:
        """Extract benchmark from the XML."""
        value = None
        for lev1 in root:
            tag = _remove_namespace(lev1.tag)
            if tag == 'benchmark':
                value = lev1.get(kw)
                break
        return value

    def _get_benchmark_href(self, root: Element) -> str:
        """Extract benchmark.href from the XML."""
        return self._get_benchmark(root, 'href')

    def _get_benchmark_id(self, root: Element) -> str:
        """Extract benchmark.id from the XML."""
        return self._get_benchmark(root, 'id')

    def _get_fact(self, lev1: Element, kw: str) -> str:
        """Extract fact from the XML."""
        value = None
        for lev2 in lev1:
            tag = _remove_namespace(lev2.tag)
            if tag == 'fact':
                name = lev2.get('name')
                if name == kw:
                    value = lev2.text
                    break
        return value

    def _get_scanner_name(self, root: Element) -> str:
        """Extract scanner:name from the XML."""
        value = None
        for lev1 in root:
            tag = _remove_namespace(lev1.tag)
            if tag == 'target-facts':
                value = self._get_fact(lev1, 'urn:xccdf:fact:scanner:name')
                break
        return value

    def _get_scanner_version(self, root: Element) -> str:
        """Extract scanner:version from the XML."""
        value = None
        for lev1 in root:
            tag = _remove_namespace(lev1.tag)
            if tag == 'target-facts':
                value = self._get_fact(lev1, 'urn:xccdf:fact:scanner:version')
                break
        return value

    def _get_host_name(self, root: Element) -> str:
        """Extract asset:identifier:host_name from the XML."""
        value = None
        for lev1 in root:
            tag = _remove_namespace(lev1.tag)
            if tag == 'target-facts':
                value = self._get_fact(lev1, 'urn:xccdf:fact:asset:identifier:host_name')
                break
        return value

    def _get_result(self, lev1: Element) -> str:
        """Extract result from the XML."""
        value = None
        for lev2 in lev1:
            tag = _remove_namespace(lev2.tag)
            if tag == 'result':
                value = lev1.find(lev2.tag).text
                break
        return value

    def _parse_xml(self) -> Iterator[RuleUse]:
        """Parse the stringified XML."""
        results = self.osco_xml
        root = ElementTree.fromstring(results, forbid_dtd=True)
        version = self._get_version(root)
        id_ = self._get_id(root)
        target = self._get_target(root)
        target_type = self._get_target_type(root)
        host_name = self._get_host_name(root)
        benchmark_href = self._get_benchmark_href(root)
        benchmark_id = self._get_benchmark_id(root)
        scanner_name = self._get_scanner_name(root)
        scanner_version = self._get_scanner_version(root)
        for lev1 in root:
            tag = _remove_namespace(lev1.tag)
            if tag == 'rule-result':
                idref = lev1.get('idref')
                time = lev1.get('time')
                severity = lev1.get('severity')
                weight = lev1.get('weight')
                result = self._get_result(lev1)
                args = {
                    'id_': id_,
                    'target': target,
                    'target_type': target_type,
                    'host_name': host_name,
                    'benchmark_href': benchmark_href,
                    'benchmark_id': benchmark_id,
                    'scanner_name': scanner_name,
                    'scanner_version': scanner_version,
                    'idref': idref,
                    'version': version,
                    'time': time,
                    'result': result,
                    'severity': severity,
                    'weight': weight
                }
                rule_use = RuleUse(args)
                yield rule_use

    def rule_use_generator(self) -> Iterator[RuleUse]:
        """Generate RuleUses by way of parsing the embedded XML."""
        return self._parse_xml()
Methods¤
__init__(self, osco_xml) special ¤

Initialize given specified args.

Source code in trestle/transforms/implementations/osco.py
def __init__(self, osco_xml: str) -> None:
    """Initialize given specified args."""
    self.osco_xml = osco_xml
rule_use_generator(self) ¤

Generate RuleUses by way of parsing the embedded XML.

Source code in trestle/transforms/implementations/osco.py
def rule_use_generator(self) -> Iterator[RuleUse]:
    """Generate RuleUses by way of parsing the embedded XML."""
    return self._parse_xml()

OscalProfileToOscoProfileTransformer (FromOscalTransformer) ¤

Interface for Oscal Profile to Osco Profile transformer.

Source code in trestle/transforms/implementations/osco.py
class OscalProfileToOscoProfileTransformer(FromOscalTransformer):
    """Interface for Oscal Profile to Osco Profile transformer."""

    def __init__(
        self,
        extends='ocp4-cis-node',
        api_version='compliance.openshift.io/v1alpha1',
        kind='TailoredProfile',
        name='customized-tailored-profile',
        namespace='openshift-compliance',
    ) -> None:
        """Initialize."""
        self._extends = extends
        self._api_version = api_version
        self._kind = kind
        self._name = name
        self._namespace = namespace

    def transform(self, profile: Profile) -> str:
        """Transform the Profile into a OSCO yaml."""
        self._profile = profile
        self._osco_version = self._get_normalized_version('osco_version', '0.1.46')
        # set values
        set_values = self._get_set_values()
        # spec
        if self._osco_version < (0, 1, 40):
            # for versions prior to 0.1.40, exclude 'description'
            spec = {
                'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
                'title': self._profile.metadata.title,
                'setValues': set_values,
            }
        else:
            # for versions 0.1.40 and beyond, include 'description'
            spec = {
                'description': self._get_metadata_prop_value('profile_mnemonic', self._name),
                'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
                'title': self._profile.metadata.title,
                'setValues': set_values,
            }
        disable_rules = self._get_disable_rules()
        if disable_rules:
            spec['disableRules'] = disable_rules
        # yaml data
        ydata = {
            'apiVersion': self._api_version,
            'kind': self._kind,
            'metadata': {
                'name': self._get_metadata_prop_value('profile_mnemonic', self._name),
                'namespace': self._namespace,
            },
            'spec': spec,
        }
        return json.dumps(ydata)

    def _get_normalized_version(self, prop_name, prop_default) -> Tuple[int, int, int]:
        """Get normalized version.

        Normalize the "x.y.z" string value to an integer: 1,000,000*x + 1,000*y + z.
        """
        try:
            vparts = self._get_metadata_prop_value(prop_name, prop_default).split('.')
            normalized_version = (int(vparts[0]), int(vparts[1]), int(vparts[2]))
        except Exception:
            logger.warning(f'metadata prop name={prop_name} value error')
            vparts = prop_default.split('.')
            normalized_version = (int(vparts[0]), int(vparts[1]), int(vparts[2]))
        return normalized_version

    def _get_set_values(self) -> List[Dict]:
        """Extract set_paramater name/value pairs from profile."""
        set_values = []
        # for check versions prior to 0.1.59 include parameters
        # for later versions parameters should not be specified, caveat emptor
        if self._profile.modify is not None:
            for set_parameter in as_list(self._profile.modify.set_parameters):
                name = self._format_osco_rule_name(set_parameter.param_id)
                parameter_value = set_parameter.values[0]
                value = parameter_value.__root__
                rationale = self._get_rationale_for_set_value()
                set_value = {'name': name, 'value': value, 'rationale': rationale}
                set_values.append(set_value)
        return set_values

    def _format_osco_rule_name(self, name: str) -> str:
        """Format for OSCO.

        1. remove prefix xccdf_org.ssgproject.content_rule_
        2. change underscores to dashes
        3. add prefix ocp4-
        """
        normalized_name = name.replace('xccdf_org.ssgproject.content_rule_', '').replace('_', '-')
        if not normalized_name.startswith('ocp4-'):
            normalized_name = f'ocp4-{normalized_name}'
        return normalized_name

    def _get_metadata_prop_value(self, name: str, default_: str) -> str:
        """Extract metadata prop or else default if not present."""
        for prop in as_list(self._profile.metadata.props):
            if prop.name == name:
                return prop.value
        logger.info(f'using default: {name} = {default_}')
        return default_

    def _get_disable_rules(self) -> List[str]:
        """Extract disabled rules."""
        value = []
        for _import in as_list(self._profile.imports):
            for control in as_list(_import.exclude_controls):
                self._add_disable_rules_for_control(value, control)
        return value

    def _add_disable_rules_for_control(self, value, control):
        """Extract disabled rules for control."""
        for with_id in as_list(control.with_ids):
            name = self._format_osco_rule_name(with_id.__root__)
            rationale = self._get_rationale_for_disable_rule()
            entry = {'name': name, 'rationale': rationale}
            value.append(entry)

    def _get_rationale_for_set_value(self) -> str:
        """Rationale for set value."""
        return 'not determinable from specification'

    def _get_rationale_for_disable_rule(self) -> str:
        """Rationale for disable rule."""
        return 'not determinable from specification'
Methods¤
__init__(self, extends='ocp4-cis-node', api_version='compliance.openshift.io/v1alpha1', kind='TailoredProfile', name='customized-tailored-profile', namespace='openshift-compliance') special ¤

Initialize.

Source code in trestle/transforms/implementations/osco.py
def __init__(
    self,
    extends='ocp4-cis-node',
    api_version='compliance.openshift.io/v1alpha1',
    kind='TailoredProfile',
    name='customized-tailored-profile',
    namespace='openshift-compliance',
) -> None:
    """Initialize."""
    self._extends = extends
    self._api_version = api_version
    self._kind = kind
    self._name = name
    self._namespace = namespace
transform(self, profile) ¤

Transform the Profile into a OSCO yaml.

Source code in trestle/transforms/implementations/osco.py
def transform(self, profile: Profile) -> str:
    """Transform the Profile into a OSCO yaml."""
    self._profile = profile
    self._osco_version = self._get_normalized_version('osco_version', '0.1.46')
    # set values
    set_values = self._get_set_values()
    # spec
    if self._osco_version < (0, 1, 40):
        # for versions prior to 0.1.40, exclude 'description'
        spec = {
            'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
            'title': self._profile.metadata.title,
            'setValues': set_values,
        }
    else:
        # for versions 0.1.40 and beyond, include 'description'
        spec = {
            'description': self._get_metadata_prop_value('profile_mnemonic', self._name),
            'extends': self._get_metadata_prop_value('base_profile_mnemonic', self._extends),
            'title': self._profile.metadata.title,
            'setValues': set_values,
        }
    disable_rules = self._get_disable_rules()
    if disable_rules:
        spec['disableRules'] = disable_rules
    # yaml data
    ydata = {
        'apiVersion': self._api_version,
        'kind': self._kind,
        'metadata': {
            'name': self._get_metadata_prop_value('profile_mnemonic', self._name),
            'namespace': self._namespace,
        },
        'spec': spec,
    }
    return json.dumps(ydata)

OscalResultsFactory ¤

Build OSCO OSCAL entities.

Source code in trestle/transforms/implementations/osco.py
class OscalResultsFactory():
    """Build OSCO OSCAL entities."""

    default_timestamp = ResultsTransformer.get_timestamp()

    def __init__(self, timestamp: str = default_timestamp, checking: bool = False) -> None:
        """Initialize."""
        self._timestamp = timestamp
        self._observation_list: List[Observation] = []
        self._result_properties_list: List[Property] = []
        self._component_map: Dict[str, SystemComponent] = {}
        self._inventory_map: Dict[str, InventoryItem] = {}
        self._ns = 'https://ibm.github.io/compliance-trestle/schemas/oscal/ar/osco'
        self._checking = checking

    @property
    def components(self) -> List[SystemComponent]:
        """OSCAL components."""
        return list(self._component_map.values())

    @property
    def control_selections(self) -> List[ControlSelection]:
        """OSCAL control selections."""
        prop = []
        prop.append(ControlSelection())
        return prop

    @property
    def inventory(self) -> ValuesView[InventoryItem]:
        """OSCAL inventory."""
        return self._inventory_map.values()

    @property
    def local_definitions(self) -> LocalDefinitions1:
        """OSCAL local definitions."""
        prop = LocalDefinitions1()
        prop.components = self.components
        prop.inventory_items = list(self.inventory)
        return prop

    @property
    def observations(self) -> List[Observation]:
        """OSCAL observations."""
        return self._observation_list

    @property
    def result_properties(self) -> List[Property]:
        """OSCAL result properties."""
        return self._result_properties_list

    @property
    def reviewed_controls(self) -> ReviewedControls:
        """OSCAL reviewed controls."""
        prop = ReviewedControls(control_selections=self.control_selections)
        return prop

    @property
    def result(self) -> Result:
        """OSCAL result."""
        # perform result properties aggregation
        if self.observations:
            self._result_properties_list = TransformerHelper().remove_common_observation_properties(self.observations)
        # produce result
        prop = Result(
            uuid=str(uuid.uuid4()),
            title='OpenShift Compliance Operator',
            description='OpenShift Compliance Operator Scan Results',
            start=self._timestamp,
            end=self._timestamp,
            reviewed_controls=self.reviewed_controls,
        )
        if self.result_properties:
            prop.props = self.result_properties
        if self.inventory:
            prop.local_definitions = self.local_definitions
        if self.observations:
            prop.observations = self.observations
        return prop

    @property
    def analysis(self) -> List[str]:
        """OSCAL statistics."""
        analysis = []
        analysis.append(f'inventory: {len(self.inventory)}')
        analysis.append(f'observations: {len(self.observations)}')
        return analysis

    def _component_extract(self, rule_use: RuleUse) -> None:
        """Extract component from RuleUse."""
        _type = 'Service'
        _title = f'Red Hat OpenShift Kubernetes Service Compliance Operator for {rule_use.target_type}'
        _desc = _title
        for component in self._component_map.values():
            if component.type == _type and component.title == _title and component.description == _desc:
                return
        component_ref = str(uuid.uuid4())
        status = Status1(state='operational')
        component = SystemComponent(uuid=component_ref, type=_type, title=_title, description=_desc, status=status)
        self._component_map[component_ref] = component

    def _get_component_ref(self, rule_use: RuleUse) -> str:
        """Get component reference for specified RuleUse."""
        uuid = None
        for component_ref, component in self._component_map.items():
            if component.title.endswith(rule_use.target_type):
                uuid = component_ref
        return uuid

    def _inventory_extract(self, rule_use: RuleUse) -> None:
        """Extract inventory from RuleUse."""
        if rule_use.inventory_key in self._inventory_map:
            return
        inventory = InventoryItem(uuid=str(uuid.uuid4()), description='inventory')
        inventory.props = self._get_inventory_properties(rule_use)
        inventory.implemented_components = [ImplementedComponent(component_uuid=self._get_component_ref(rule_use))]
        self._inventory_map[rule_use.inventory_key] = inventory

    def _get_inventory_properties(self, rule_use):
        """Get inventory properties."""
        if self._checking:
            return self._get_inventory_properties_checked(rule_use)
        else:
            return self._get_inventory_properties_unchecked(rule_use)

    def _get_inventory_properties_checked(self, rule_use):
        """Get inventory properties, with checking."""
        props = []
        if rule_use.host_name is None:
            props.append(Property(name='target', value=rule_use.target, ns=self._ns, class_='scc_inventory_item_id'))
            props.append(Property(name='target_type', value=rule_use.target_type, ns=self._ns))
        else:
            props.append(Property(name='target', value=rule_use.target, ns=self._ns))
            props.append(Property(name='target_type', value=rule_use.target_type, ns=self._ns))
            props.append(
                Property(name='host_name', value=rule_use.host_name, ns=self._ns, class_='scc_inventory_item_id')
            )
        return props

    def _get_inventory_properties_unchecked(self, rule_use):
        """Get observation properties, without checking."""
        props = []
        if rule_use.host_name is None:
            props.append(
                Property.construct(name='target', value=rule_use.target, ns=self._ns, class_='scc_inventory_item_id')
            )
            props.append(Property.construct(name='target_type', value=rule_use.target_type, ns=self._ns))
        else:
            props.append(Property.construct(name='target', value=rule_use.target, ns=self._ns))
            props.append(Property.construct(name='target_type', value=rule_use.target_type, ns=self._ns))
            props.append(
                Property.construct(
                    name='host_name', value=rule_use.host_name, ns=self._ns, class_='scc_inventory_item_id'
                )
            )
        return props

    def _get_inventory_ref(self, rule_use: RuleUse) -> str:
        """Get inventory reference for specified RuleUse."""
        return self._inventory_map[rule_use.inventory_key].uuid

    def _observation_extract(self, rule_use: RuleUse) -> None:
        """Extract observation from RuleUse."""
        observation = Observation(
            uuid=str(uuid.uuid4()), description=rule_use.idref, methods=['TEST-AUTOMATED'], collected=self._timestamp
        )
        subject_reference = SubjectReference(subject_uuid=self._get_inventory_ref(rule_use), type='inventory-item')
        observation.subjects = [subject_reference]
        observation.props = self._get_observation_properties(rule_use)
        self._observation_list.append(observation)
        rule_use.observation = observation

    def _get_observation_properties(self, rule_use):
        """Get observation properties."""
        if self._checking:
            return self._get_observation_properties_checked(rule_use)
        else:
            return self._get_observation_properties_unchecked(rule_use)

    def _get_observation_properties_checked(self, rule_use):
        """Get observation properties, with checking."""
        props = []
        props.append(Property(name='scanner_name', value=rule_use.scanner_name, ns=self._ns))
        props.append(Property(name='scanner_version', value=rule_use.scanner_version, ns=self._ns))
        props.append(Property(name='idref', value=rule_use.idref, ns=self._ns, class_='scc_check_name_id'))
        props.append(Property(name='version', value=rule_use.version, ns=self._ns, class_='scc_check_version'))
        props.append(Property(name='result', value=rule_use.result, ns=self._ns, class_='scc_result'))
        props.append(Property(name='time', value=rule_use.time, ns=self._ns, class_='scc_timestamp'))
        props.append(Property(name='severity', value=rule_use.severity, ns=self._ns, class_='scc_check_severity'))
        props.append(Property(name='weight', value=rule_use.weight, ns=self._ns))
        props.append(Property(name='benchmark_id', value=rule_use.benchmark_id, ns=self._ns))
        props.append(Property(name='benchmark_href', value=rule_use.benchmark_href, ns=self._ns))
        props.append(Property(name='id', value=rule_use.id_, ns=self._ns, class_='scc_predefined_profile'))
        return props

    def _get_observation_properties_unchecked(self, rule_use):
        """Get observation properties, without checking."""
        props = []
        props.append(Property.construct(name='scanner_name', value=rule_use.scanner_name, ns=self._ns))
        props.append(Property.construct(name='scanner_version', value=rule_use.scanner_version, ns=self._ns))
        props.append(Property.construct(name='idref', value=rule_use.idref, ns=self._ns, class_='scc_check_name_id'))
        props.append(
            Property.construct(name='version', value=rule_use.version, ns=self._ns, class_='scc_check_version')
        )
        props.append(Property.construct(name='result', value=rule_use.result, ns=self._ns, class_='scc_result'))
        props.append(Property.construct(name='time', value=rule_use.time, ns=self._ns, class_='scc_timestamp'))
        props.append(
            Property.construct(name='severity', value=rule_use.severity, ns=self._ns, class_='scc_check_severity')
        )
        props.append(Property.construct(name='weight', value=rule_use.weight, ns=self._ns))
        props.append(Property.construct(name='benchmark_id', value=rule_use.benchmark_id, ns=self._ns))
        props.append(Property.construct(name='benchmark_href', value=rule_use.benchmark_href, ns=self._ns))
        props.append(Property.construct(name='id', value=rule_use.id_, ns=self._ns, class_='scc_predefined_profile'))
        return props

    def _process(self, co_result: ComplianceOperatorResult) -> None:
        """Process ingested data."""
        rule_use_generator = co_result.rule_use_generator()
        for rule_use in rule_use_generator:
            self._component_extract(rule_use)
            self._inventory_extract(rule_use)
            self._observation_extract(rule_use)

    def ingest(self, osco_data: Dict[str, Any]) -> None:
        """Process OSCO json."""
        if 'data' not in osco_data.keys():
            return
        if 'results' not in osco_data['data']:
            return
        results = osco_data['data']['results']
        self.ingest_xml(results)

    def ingest_xml(self, osco_xml: str) -> None:
        """Process OSCO xml."""
        if not osco_xml.startswith('<?xml'):
            osco_xml = bz2.decompress(base64.b64decode(osco_xml))
        co_result = ComplianceOperatorResult(osco_xml)
        self._process(co_result)
Attributes¤
analysis: List[str] property readonly ¤

OSCAL statistics.

components: List[trestle.oscal.assessment_results.SystemComponent] property readonly ¤

OSCAL components.

control_selections: List[trestle.oscal.assessment_results.ControlSelection] property readonly ¤

OSCAL control selections.

default_timestamp ¤
inventory: ValuesView[trestle.oscal.common.InventoryItem] property readonly ¤

OSCAL inventory.

local_definitions: LocalDefinitions1 property readonly ¤

OSCAL local definitions.

observations: List[trestle.oscal.assessment_results.Observation] property readonly ¤

OSCAL observations.

result: Result property readonly ¤

OSCAL result.

result_properties: List[trestle.oscal.common.Property] property readonly ¤

OSCAL result properties.

reviewed_controls: ReviewedControls property readonly ¤

OSCAL reviewed controls.

Methods¤
__init__(self, timestamp='2022-11-17T00:56:15+00:00', checking=False) special ¤

Initialize.

Source code in trestle/transforms/implementations/osco.py
def __init__(self, timestamp: str = default_timestamp, checking: bool = False) -> None:
    """Initialize."""
    self._timestamp = timestamp
    self._observation_list: List[Observation] = []
    self._result_properties_list: List[Property] = []
    self._component_map: Dict[str, SystemComponent] = {}
    self._inventory_map: Dict[str, InventoryItem] = {}
    self._ns = 'https://ibm.github.io/compliance-trestle/schemas/oscal/ar/osco'
    self._checking = checking
ingest(self, osco_data) ¤

Process OSCO json.

Source code in trestle/transforms/implementations/osco.py
def ingest(self, osco_data: Dict[str, Any]) -> None:
    """Process OSCO json."""
    if 'data' not in osco_data.keys():
        return
    if 'results' not in osco_data['data']:
        return
    results = osco_data['data']['results']
    self.ingest_xml(results)
ingest_xml(self, osco_xml) ¤

Process OSCO xml.

Source code in trestle/transforms/implementations/osco.py
def ingest_xml(self, osco_xml: str) -> None:
    """Process OSCO xml."""
    if not osco_xml.startswith('<?xml'):
        osco_xml = bz2.decompress(base64.b64decode(osco_xml))
    co_result = ComplianceOperatorResult(osco_xml)
    self._process(co_result)

OscoResultToOscalARTransformer (ResultsTransformer) ¤

Interface for Osco transformer.

Source code in trestle/transforms/implementations/osco.py
class OscoResultToOscalARTransformer(ResultsTransformer):
    """Interface for Osco transformer."""

    def __init__(self) -> None:
        """Initialize."""
        self._modes = {}

    @property
    def analysis(self) -> List[str]:
        """Analysis."""
        return self._results_factory.analysis

    @property
    def checking(self):
        """Return checking."""
        return self._modes.get('checking', False)

    def set_modes(self, modes: Dict[str, Any]) -> None:
        """Keep modes info."""
        if modes is not None:
            self._modes = modes

    def transform(self, blob: str) -> Results:
        """Transform the blob into a Results.

        The expected blob is a string that is one of:
            - data from OpenShift Compliance Operator (json, yaml, xml)
            - data from Auditree OSCO fetcher/check (json)
        """
        results = None
        self._results_factory = OscalResultsFactory(self.get_timestamp(), self.checking)
        if results is None:
            results = self._ingest_xml(blob)
        if results is None:
            results = self._ingest_json(blob)
        if results is None:
            results = self._ingest_yaml(blob)
        return results

    def _ingest_xml(self, blob: str) -> Results:
        """Ingest xml data."""
        # ?xml data
        if blob.startswith('<?xml'):
            resource = blob
            self._results_factory.ingest_xml(resource)
        else:
            return None
        results = Results()
        results.__root__.append(self._results_factory.result)
        return results

    def _ingest_json(self, blob: str) -> Results:
        """Ingest json data."""
        try:
            # ? configmaps or auditree data
            jdata = json.loads(blob)
            # https://docs.openshift.com/container-platform/3.7/rest_api/api/v1.ConfigMap.html#Get-api-v1-namespaces-namespace-configmaps-name
            if 'kind' in jdata.keys() and jdata['kind'] == 'ConfigMapList' and 'items' in jdata.keys():
                items = jdata['items']
                for item in items:
                    if 'data' in item.keys():
                        data = item['data']
                        if 'results' in data:
                            resource = item
                            self._results_factory.ingest(resource)
            # https://github.com/ComplianceAsCode/auditree-arboretum/blob/main/arboretum/kubernetes/fetchers/fetch_cluster_resource.py
            else:
                for key in jdata.keys():
                    for group in jdata[key]:
                        for cluster in jdata[key][group]:
                            if 'resources' in cluster:
                                for resource in cluster['resources']:
                                    self._results_factory.ingest(resource)
        except json.decoder.JSONDecodeError:
            return None
        results = Results()
        results.__root__.append(self._results_factory.result)
        return results

    def _ingest_yaml(self, blob: str) -> Results:
        """Ingest yaml data."""
        try:
            # ? yaml data
            yaml = YAML(typ='safe')
            resource = yaml.load(blob)
            self._results_factory.ingest(resource)
        except Exception as e:
            raise e
        results = Results()
        results.__root__.append(self._results_factory.result)
        return results
Attributes¤
analysis: List[str] property readonly ¤

Analysis.

checking property readonly ¤

Return checking.

Methods¤
__init__(self) special ¤

Initialize.

Source code in trestle/transforms/implementations/osco.py
def __init__(self) -> None:
    """Initialize."""
    self._modes = {}
set_modes(self, modes) ¤

Keep modes info.

Source code in trestle/transforms/implementations/osco.py
def set_modes(self, modes: Dict[str, Any]) -> None:
    """Keep modes info."""
    if modes is not None:
        self._modes = modes
transform(self, blob) ¤

Transform the blob into a Results.

The expected blob is a string that is one of: - data from OpenShift Compliance Operator (json, yaml, xml) - data from Auditree OSCO fetcher/check (json)

Source code in trestle/transforms/implementations/osco.py
def transform(self, blob: str) -> Results:
    """Transform the blob into a Results.

    The expected blob is a string that is one of:
        - data from OpenShift Compliance Operator (json, yaml, xml)
        - data from Auditree OSCO fetcher/check (json)
    """
    results = None
    self._results_factory = OscalResultsFactory(self.get_timestamp(), self.checking)
    if results is None:
        results = self._ingest_xml(blob)
    if results is None:
        results = self._ingest_json(blob)
    if results is None:
        results = self._ingest_yaml(blob)
    return results

OscoTransformer (OscoResultToOscalARTransformer) ¤

Legacy class name.

Source code in trestle/transforms/implementations/osco.py
class OscoTransformer(OscoResultToOscalARTransformer):
    """Legacy class name."""

RuleUse ¤

Represents one rule of OSCO data.

Source code in trestle/transforms/implementations/osco.py
class RuleUse():
    """Represents one rule of OSCO data."""

    def __init__(self, args: Dict[str, str]) -> None:
        """Initialize given specified args."""
        self.id_ = args['id_']
        self.target = args['target']
        self.target_type = args['target_type']
        self.host_name = args['host_name']
        self.benchmark_href = args['benchmark_href']
        self.benchmark_id = args['benchmark_id']
        self.scanner_name = args['scanner_name']
        self.scanner_version = args['scanner_version']
        self.idref = args['idref']
        self.version = args['version']
        self.time = args['time']
        self.result = args['result']
        self.severity = args['severity']
        self.weight = args['weight']

    @property
    def inventory_key(self):
        """Derive inventory key."""
        if self.host_name is None:
            # OpenScap 1.3.3
            rval = self.target + ':' + self.target_type
        else:
            # OpenScap 1.3.5
            rval = self.host_name + ':' + self.target_type
        return rval
Attributes¤
inventory_key property readonly ¤

Derive inventory key.

Methods¤
__init__(self, args) special ¤

Initialize given specified args.

Source code in trestle/transforms/implementations/osco.py
def __init__(self, args: Dict[str, str]) -> None:
    """Initialize given specified args."""
    self.id_ = args['id_']
    self.target = args['target']
    self.target_type = args['target_type']
    self.host_name = args['host_name']
    self.benchmark_href = args['benchmark_href']
    self.benchmark_id = args['benchmark_id']
    self.scanner_name = args['scanner_name']
    self.scanner_version = args['scanner_version']
    self.idref = args['idref']
    self.version = args['version']
    self.time = args['time']
    self.result = args['result']
    self.severity = args['severity']
    self.weight = args['weight']

handler: python