Skip to content

csv_helper

trestle.tasks.csv_helper ¤

CSV utilities.

logger ¤

Classes¤

Column ¤

Column.

Source code in trestle/tasks/csv_helper.py
class Column():
    """Column."""

    def __init__(self) -> None:
        """Initialize."""
        self._columns_required = [
            'Rule_Id',
            'Rule_Description',
            'Profile_Reference_URL',
            'Profile_Description',
            'Component_Type',
            'Control_Mappings',
            'Resource',
        ]
        self._columns_optional = [
            'Parameter_Id',
            'Parameter_Description',
            'Parameter_Default_Value',
            'Parameter_Value_Alternatives',
            'Check_Id',
            'Check_Description',
            'Fetcher',
            'Fetcher_Description',
            'Resource_Instance_Type',
        ]
        self.help_list_required = []
        for column in self._columns_required:
            self.help_list_required.append(column)
        self.help_list_optional = []
        for column in self._columns_optional:
            self.help_list_optional.append(column)

    def is_user_column(self, column_name: str) -> bool:
        """Check if user column name."""
        rval = True
        if column_name in self._columns_required + self._columns_optional:
            rval = False
        return rval

    def get_required_column_names(self) -> List[str]:
        """Get required column names."""
        rval = []
        rval += self._columns_required
        return rval

    def get_optional_column_names(self) -> List[str]:
        """Get optional column names."""
        rval = []
        rval += self._columns_optional
        return rval

    def map_head(self, head_row: str) -> None:
        """Keep head row."""
        self.head_row = head_row

    def get_index(self, name: str) -> int:
        """Get index for column name."""
        rval = -1
        index = 0
        for column in self.head_row:
            if column == name:
                rval = index
                break
            index += 1
        return rval
Methods¤
__init__(self) special ¤

Initialize.

Source code in trestle/tasks/csv_helper.py
def __init__(self) -> None:
    """Initialize."""
    self._columns_required = [
        'Rule_Id',
        'Rule_Description',
        'Profile_Reference_URL',
        'Profile_Description',
        'Component_Type',
        'Control_Mappings',
        'Resource',
    ]
    self._columns_optional = [
        'Parameter_Id',
        'Parameter_Description',
        'Parameter_Default_Value',
        'Parameter_Value_Alternatives',
        'Check_Id',
        'Check_Description',
        'Fetcher',
        'Fetcher_Description',
        'Resource_Instance_Type',
    ]
    self.help_list_required = []
    for column in self._columns_required:
        self.help_list_required.append(column)
    self.help_list_optional = []
    for column in self._columns_optional:
        self.help_list_optional.append(column)
get_index(self, name) ¤

Get index for column name.

Source code in trestle/tasks/csv_helper.py
def get_index(self, name: str) -> int:
    """Get index for column name."""
    rval = -1
    index = 0
    for column in self.head_row:
        if column == name:
            rval = index
            break
        index += 1
    return rval
get_optional_column_names(self) ¤

Get optional column names.

Source code in trestle/tasks/csv_helper.py
def get_optional_column_names(self) -> List[str]:
    """Get optional column names."""
    rval = []
    rval += self._columns_optional
    return rval
get_required_column_names(self) ¤

Get required column names.

Source code in trestle/tasks/csv_helper.py
def get_required_column_names(self) -> List[str]:
    """Get required column names."""
    rval = []
    rval += self._columns_required
    return rval
is_user_column(self, column_name) ¤

Check if user column name.

Source code in trestle/tasks/csv_helper.py
def is_user_column(self, column_name: str) -> bool:
    """Check if user column name."""
    rval = True
    if column_name in self._columns_required + self._columns_optional:
        rval = False
    return rval
map_head(self, head_row) ¤

Keep head row.

Source code in trestle/tasks/csv_helper.py
def map_head(self, head_row: str) -> None:
    """Keep head row."""
    self.head_row = head_row

CsvHelper ¤

Csv Helper common functions and assistance.

Source code in trestle/tasks/csv_helper.py
class CsvHelper:
    """Csv Helper common functions and assistance."""

    eg_ns = 'https://ibm.github.io/compliance-trestle/schemas/oscal/cd'
    eg_ns_user = 'https://ibm.github.io/compliance-trestle/schemas/oscal/cd/user-defined'

    def __init__(self) -> None:
        """Initialize."""
        self._csv = []
        self._column = Column()
        self._filtered = [
            'Profile_Reference_URL',
            'Profile_Description',
            'Component_Type',
            'Control_Mappings',
            'Resource',
            'Parameter_Id',
            'Parameter_Description',
            'Parameter_Default_Value',
            'Parameter_Value_Alternatives',
        ]

    def print_info(self, name: str, oscal_name: str) -> None:
        """Print the help string."""
        logger.info(f'Help information for {name} task.')
        logger.info('')
        logger.info(f'Purpose: From csv produce OSCAL {oscal_name} file.')
        logger.info('')
        logger.info('')
        logger.info(f'Configuration flags sit under [task.{name}]:')
        text1 = '  title             = '
        text2 = '(required) the component definition title.'
        logger.info(text1 + text2)
        text1 = '  version           = '
        text2 = '(required) the component definition version.'
        logger.info(text1 + text2)
        text1 = '  csv-file          = '
        text2 = '(required) the path of the csv file.'
        logger.info(text1 + text2)
        text1 = '  required columns:   '
        for text2 in self._column.help_list_required:
            logger.info(text1 + text2)
            text1 = '                      '
        text1 = '  optional columns:   '
        for text2 in self._column.help_list_optional:
            logger.info(text1 + text2)
            text1 = '                      '
        text1 = '  output-dir        = '
        text2 = '(required) the path of the output directory for synthesized OSCAL .json files.'
        logger.info(text1 + text2)
        text1 = '  namespace         = '
        text2 = f'(optional) the namespace for properties, e.g. {self.eg_ns}'
        logger.info(text1 + text2)
        text1 = '  user-namespace    = '
        text2 = f'(optional) the user-namespace for properties, e.g. {self.eg_ns_user}'
        logger.info(text1 + text2)
        text1 = '  class.column-name = '
        text2 = '(optional) the class to associate with the specified column name, e.g. class.Rule_Id = scc_class'
        logger.info(text1 + text2)
        text1 = '  output-overwrite  = '
        text2 = '(optional) true [default] or false; replace existing output when true.'
        logger.info(text1 + text2)

    def configure(self, task: TaskBase) -> bool:
        """Configure."""
        self._config = task._config
        if not self._config:
            logger.warning('config missing')
            return False
        # config verbosity
        quiet = self._config.get('quiet', False)
        self._verbose = not quiet
        # title
        self._title = self._config.get('title')
        if self._title is None:
            logger.warning('title missing')
            return False
        # version
        self._version = self._config.get('version')
        if self._version is None:
            logger.warning('version missing')
            return False
        # config csv
        csv_file = self._config.get('csv-file')
        if csv_file is None:
            logger.warning('config missing "csv-file"')
            return False
        csv_path = pathlib.Path(csv_file)
        if not csv_path.exists():
            logger.warning('"csv-file" not found')
            return False
        # announce csv
        if self._verbose:
            logger.info(f'input: {csv_file}')
        # load spread sheet
        self.load(csv_file)
        rval = self.verify()
        return rval

    def load(self, csv_path: pathlib.Path) -> None:
        """Load."""
        with open(csv_path, 'r', newline='') as f:
            csv_reader = csv.reader(f, delimiter=',', quoting=csv.QUOTE_MINIMAL)
            for row in csv_reader:
                self._csv.append(row)
            if len(self._csv):
                self._column.map_head(self._csv[0])

    def verify(self) -> bool:
        """Verify."""
        rval = True
        required_columns = self._column.get_required_column_names()
        if len(self._csv):
            head_row = self._csv[0]
            for heading in head_row:
                if heading in required_columns:
                    required_columns.remove(heading)
        if len(required_columns):
            logger.warning(f'Missing columns: {required_columns}')
            rval = False
        return rval

    def row_count(self) -> int:
        """Row count."""
        return len(self._csv) - 1

    def row_generator(self) -> Iterator[List[str]]:
        """Generate rows."""
        index = -1
        for row in self._csv:
            index += 1
            if index == 0:
                continue
            logger.debug(f'{index} {row}')
            yield row

    def get_value(self, row: List[str], name: str) -> str:
        """Get value for specified name."""
        rval = ''
        index = self._column.get_index(name)
        if index >= 0:
            rval = row[index]
        return rval

    def get_class(self, name: str) -> str:
        """Get class value for specified name from config."""
        class_name_key = f'class.{name}'
        return self._config.get(class_name_key)

    def get_required_column_names(self) -> List[str]:
        """Get required column names."""
        rval = []
        for column_name in self._column.get_required_column_names():
            rval.append(column_name)
        return rval

    def get_filtered_required_column_names(self) -> List[str]:
        """Get filtered required column names."""
        rval = []
        for column_name in self._column.get_required_column_names():
            if column_name not in self._filtered:
                rval.append(column_name)
        return rval

    def get_filtered_optional_column_names(self) -> List[str]:
        """Get filtered optional column names."""
        rval = []
        for column_name in self._column.get_optional_column_names():
            if column_name not in self._filtered:
                rval.append(column_name)
        return rval

    def get_user_column_names(self) -> List[str]:
        """Get user column names."""
        user_column_names = []
        for column_name in self._csv[0]:
            if self._column.is_user_column(column_name):
                user_column_names.append(column_name)
        return user_column_names

    def get_title(self) -> bool:
        """Get title."""
        return self._title

    def get_version(self) -> bool:
        """Get version."""
        return self._version

    def report_issues(self) -> None:
        """Report issues."""
eg_ns ¤
eg_ns_user ¤
Methods¤
__init__(self) special ¤

Initialize.

Source code in trestle/tasks/csv_helper.py
def __init__(self) -> None:
    """Initialize."""
    self._csv = []
    self._column = Column()
    self._filtered = [
        'Profile_Reference_URL',
        'Profile_Description',
        'Component_Type',
        'Control_Mappings',
        'Resource',
        'Parameter_Id',
        'Parameter_Description',
        'Parameter_Default_Value',
        'Parameter_Value_Alternatives',
    ]
configure(self, task) ¤

Configure.

Source code in trestle/tasks/csv_helper.py
def configure(self, task: TaskBase) -> bool:
    """Configure."""
    self._config = task._config
    if not self._config:
        logger.warning('config missing')
        return False
    # config verbosity
    quiet = self._config.get('quiet', False)
    self._verbose = not quiet
    # title
    self._title = self._config.get('title')
    if self._title is None:
        logger.warning('title missing')
        return False
    # version
    self._version = self._config.get('version')
    if self._version is None:
        logger.warning('version missing')
        return False
    # config csv
    csv_file = self._config.get('csv-file')
    if csv_file is None:
        logger.warning('config missing "csv-file"')
        return False
    csv_path = pathlib.Path(csv_file)
    if not csv_path.exists():
        logger.warning('"csv-file" not found')
        return False
    # announce csv
    if self._verbose:
        logger.info(f'input: {csv_file}')
    # load spread sheet
    self.load(csv_file)
    rval = self.verify()
    return rval
get_class(self, name) ¤

Get class value for specified name from config.

Source code in trestle/tasks/csv_helper.py
def get_class(self, name: str) -> str:
    """Get class value for specified name from config."""
    class_name_key = f'class.{name}'
    return self._config.get(class_name_key)
get_filtered_optional_column_names(self) ¤

Get filtered optional column names.

Source code in trestle/tasks/csv_helper.py
def get_filtered_optional_column_names(self) -> List[str]:
    """Get filtered optional column names."""
    rval = []
    for column_name in self._column.get_optional_column_names():
        if column_name not in self._filtered:
            rval.append(column_name)
    return rval
get_filtered_required_column_names(self) ¤

Get filtered required column names.

Source code in trestle/tasks/csv_helper.py
def get_filtered_required_column_names(self) -> List[str]:
    """Get filtered required column names."""
    rval = []
    for column_name in self._column.get_required_column_names():
        if column_name not in self._filtered:
            rval.append(column_name)
    return rval
get_required_column_names(self) ¤

Get required column names.

Source code in trestle/tasks/csv_helper.py
def get_required_column_names(self) -> List[str]:
    """Get required column names."""
    rval = []
    for column_name in self._column.get_required_column_names():
        rval.append(column_name)
    return rval
get_title(self) ¤

Get title.

Source code in trestle/tasks/csv_helper.py
def get_title(self) -> bool:
    """Get title."""
    return self._title
get_user_column_names(self) ¤

Get user column names.

Source code in trestle/tasks/csv_helper.py
def get_user_column_names(self) -> List[str]:
    """Get user column names."""
    user_column_names = []
    for column_name in self._csv[0]:
        if self._column.is_user_column(column_name):
            user_column_names.append(column_name)
    return user_column_names
get_value(self, row, name) ¤

Get value for specified name.

Source code in trestle/tasks/csv_helper.py
def get_value(self, row: List[str], name: str) -> str:
    """Get value for specified name."""
    rval = ''
    index = self._column.get_index(name)
    if index >= 0:
        rval = row[index]
    return rval
get_version(self) ¤

Get version.

Source code in trestle/tasks/csv_helper.py
def get_version(self) -> bool:
    """Get version."""
    return self._version
load(self, csv_path) ¤

Load.

Source code in trestle/tasks/csv_helper.py
def load(self, csv_path: pathlib.Path) -> None:
    """Load."""
    with open(csv_path, 'r', newline='') as f:
        csv_reader = csv.reader(f, delimiter=',', quoting=csv.QUOTE_MINIMAL)
        for row in csv_reader:
            self._csv.append(row)
        if len(self._csv):
            self._column.map_head(self._csv[0])
print_info(self, name, oscal_name) ¤

Print the help string.

Source code in trestle/tasks/csv_helper.py
def print_info(self, name: str, oscal_name: str) -> None:
    """Print the help string."""
    logger.info(f'Help information for {name} task.')
    logger.info('')
    logger.info(f'Purpose: From csv produce OSCAL {oscal_name} file.')
    logger.info('')
    logger.info('')
    logger.info(f'Configuration flags sit under [task.{name}]:')
    text1 = '  title             = '
    text2 = '(required) the component definition title.'
    logger.info(text1 + text2)
    text1 = '  version           = '
    text2 = '(required) the component definition version.'
    logger.info(text1 + text2)
    text1 = '  csv-file          = '
    text2 = '(required) the path of the csv file.'
    logger.info(text1 + text2)
    text1 = '  required columns:   '
    for text2 in self._column.help_list_required:
        logger.info(text1 + text2)
        text1 = '                      '
    text1 = '  optional columns:   '
    for text2 in self._column.help_list_optional:
        logger.info(text1 + text2)
        text1 = '                      '
    text1 = '  output-dir        = '
    text2 = '(required) the path of the output directory for synthesized OSCAL .json files.'
    logger.info(text1 + text2)
    text1 = '  namespace         = '
    text2 = f'(optional) the namespace for properties, e.g. {self.eg_ns}'
    logger.info(text1 + text2)
    text1 = '  user-namespace    = '
    text2 = f'(optional) the user-namespace for properties, e.g. {self.eg_ns_user}'
    logger.info(text1 + text2)
    text1 = '  class.column-name = '
    text2 = '(optional) the class to associate with the specified column name, e.g. class.Rule_Id = scc_class'
    logger.info(text1 + text2)
    text1 = '  output-overwrite  = '
    text2 = '(optional) true [default] or false; replace existing output when true.'
    logger.info(text1 + text2)
report_issues(self) ¤

Report issues.

Source code in trestle/tasks/csv_helper.py
def report_issues(self) -> None:
    """Report issues."""
row_count(self) ¤

Row count.

Source code in trestle/tasks/csv_helper.py
def row_count(self) -> int:
    """Row count."""
    return len(self._csv) - 1
row_generator(self) ¤

Generate rows.

Source code in trestle/tasks/csv_helper.py
def row_generator(self) -> Iterator[List[str]]:
    """Generate rows."""
    index = -1
    for row in self._csv:
        index += 1
        if index == 0:
            continue
        logger.debug(f'{index} {row}')
        yield row
verify(self) ¤

Verify.

Source code in trestle/tasks/csv_helper.py
def verify(self) -> bool:
    """Verify."""
    rval = True
    required_columns = self._column.get_required_column_names()
    if len(self._csv):
        head_row = self._csv[0]
        for heading in head_row:
            if heading in required_columns:
                required_columns.remove(heading)
    if len(required_columns):
        logger.warning(f'Missing columns: {required_columns}')
        rval = False
    return rval

handler: python