# Source code for oso.framework.plugin.addons.signing_server
#
# (c) Copyright IBM Corp. 2025
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Signing Server Addon."""
from __future__ import annotations
import uuid
import logging
import pathlib
import base64
from typing import TYPE_CHECKING
from typing import Callable
from pydantic import field_validator
from ..main import AddonProtocol, BaseAddonConfig
from ._key import KeyPair, KeyType
from ._grep11_client import Grep11Client
from oso.framework.data.types import V1_3
from oso.framework.core.logging import get_logger
if TYPE_CHECKING:
from typing import Any, Callable, ClassVar, Literal
# Addon name constant; reused below as ``SigningServerAddon.NAME``.
NAME: Literal["SigningServer"] = "SigningServer"
[docs]
class SigningServerConfig(BaseAddonConfig):
    """Signing Server Addon Specific Configuration.

    The three TLS fields (``ca_cert``, ``client_cert``, ``client_key``) are
    supplied base64-encoded; a ``before`` validator decodes them to UTF-8
    text, so the stored values are the decoded strings.

    Attributes
    ----------
    ca_cert: str
        PEM-encoded root certificates for the gRPC channel
    client_cert: str
        PEM-encoded certificate chain for the gRPC channel
    client_key: str
        PEM-encoded private key for the gRPC channel
    grep11_endpoint: str
        Endpoint used to connect to the GREP11 server
    keystore_path: str
        Path of the attached persistent data volume used to store generated
        keys between iterations
    """

    ca_cert: str
    client_cert: str
    client_key: str
    grep11_endpoint: str = "localhost"
    keystore_path: str

    @field_validator("ca_cert", "client_cert", "client_key", mode="before")
    @classmethod
    def _decode_base64_fields(cls, v: str) -> str:
        """Decode a base64-encoded field value into UTF-8 text."""
        return base64.b64decode(v).decode("utf-8")
[docs]
class SigningServerAddon(AddonProtocol):
    """Signing Server.

    Keeps key pairs as ``<key id>.key`` / ``<key id>.pub`` files in one
    directory per key type under the configured keystore path, and delegates
    key generation, PEM conversion, signing and health checks to a GREP11
    client.

    Parameters
    ----------
    framework_config:
        Whole application configuration.
    addon_config:
        Configuration specific to this addon.

    Attributes
    ----------
    config
        NOTE(review): not assigned in this class (the constructor stores
        ``_config``); presumably provided by ``AddonProtocol`` -- confirm.
    """

    # Addon name, taken from the module-level NAME constant.
    NAME: ClassVar[str] = NAME
    # Module-level ``configure`` callable re-exported on the class.
    configure: ClassVar[Callable] = configure
def __init__(self, framework_config: Any, addon_config: SigningServerConfig):
    """Set up the logger, keystore path and GREP11 client.

    ``framework_config`` is accepted but not used here. The last step issues
    a health check against the freshly created GREP11 client; its return
    value is discarded.
    """
    self._config = addon_config
    self._logger = get_logger(name="signing_server")
    # Root directory of the file-based keystore.
    self._keystore = pathlib.Path(addon_config.keystore_path)
    grep11_client = Grep11Client(addon_config)
    self._grep11_client = grep11_client
    grep11_client.health_check()
[docs]
def generate_key_pair(self, key_type: KeyType) -> tuple[str, bytes]:
    """Generate a new key pair and store it in the keystore.

    The pair is created by the GREP11 client, written to the keystore under
    a fresh key ID, and the public key is converted to PEM by the client.

    Parameters
    ----------
    key_type : KeyType
        The type of key to generate.

    Returns
    -------
    tuple[str, bytes]
        - key_id : str
            The unique identifier for the generated key.
        - pub_key_pem : bytes
            The public key in PEM format.
    """
    # Log through the addon logger (not the root ``logging`` module) so this
    # message is handled like every other message emitted by this class.
    self._logger.info(f"Generating new key pair of type {key_type.name}")
    key_pair = self._grep11_client.generate_key_pair(key_type=key_type)
    key_id = self._save_key_pair(key_type, key_pair)
    pub_key_pem = self._grep11_client.serialized_key_to_pem(
        key_type=key_type, pub_key_bytes=key_pair.PublicKey
    )
    self._logger.info("Finished generating a new key pair")
    self._logger.debug(f"New key id: '{key_id}'")
    return key_id, pub_key_pem
[docs]
def list_keys(self, key_type: KeyType) -> list[str]:
    """Find the existing keys of the specified type in the keystore.

    A key is listed only when both its ``.key`` file and the matching
    ``.pub`` file exist in the key type's directory.

    Parameters
    ----------
    key_type : KeyType
        The type of keys to find.

    Returns
    -------
    list[str]
        Key ids of the given key type, ordered by key file name. Empty when
        the key type directory does not exist.

    Raises
    ------
    NotADirectoryError
        If the path for the key type exists but is not a directory.
    """
    key_type_dir = self._keystore / key_type.name
    if not key_type_dir.exists():
        self._logger.debug(f"'{key_type.name}' dir does not exist in the key store")
        return []
    if not key_type_dir.is_dir():
        # Same exception type the key-writing path uses for this condition;
        # still an ``Exception`` subclass for existing callers.
        raise NotADirectoryError(
            f"{key_type_dir} is an existing file, it should be a directory"
        )

    key_id_list = []
    # glob() yields entries in arbitrary order; sort for a stable result.
    for key_file in sorted(key_type_dir.glob("*.key")):
        if key_file.with_suffix(".pub").exists():
            key_id_list.append(key_file.stem)
        else:
            self._logger.info(
                f"Corresponding public key does not exist for {key_file}"
            )
    return key_id_list
[docs]
def get_key_pem(self, key_id: str) -> bytes | None:
    """Return the PEM form of the public key stored under ``key_id``.

    Parameters
    ----------
    key_id : str
        The unique identifier of the key whose public PEM is requested.

    Returns
    -------
    bytes | None
        The value produced by the GREP11 client's PEM conversion of the
        stored public key, or None when no key pair is found for the ID.
    """
    found = self._find_keys(key_id=key_id)
    if found:
        resolved_type, pair = found
        return self._grep11_client.serialized_key_to_pem(
            key_type=resolved_type, pub_key_bytes=pair.PublicKey
        )

    # Nothing stored under this ID: report it and signal absence with None.
    self._logger.info(f"Could not find key pair for key id: '{key_id}'")
    return None
def _find_keys(self, key_id: str) -> tuple[KeyType, KeyPair] | None:
"""Find private and public keys for the given key ID.
Parameters
----------
key_id : str
The ID of the key to find.
Returns
-------
tuple[KeyType, KeyPair] | None
- KeyType: The resolved type of the key.
- KeyPair: A container holding the deserialized private
and public keys.
Returns None if the key is not found or if the type cannot be resolved.
Raises
------
FileNotFoundError
If either the private or public key file exists but is not a valid file.
"""
for priv_key_file in self._keystore.glob("*/*.key"):
file_id = priv_key_file.stem
if key_id == file_id:
pub_key_file = priv_key_file.with_suffix(".pub")
if not priv_key_file.is_file():
raise FileNotFoundError(
f"Private key path '{priv_key_file}' is not a valid file"
)
if not pub_key_file.is_file():
raise FileNotFoundError(
f"Corresponding public key for '{priv_key_file}' does not exist"
)
key_type_name = priv_key_file.parent.name
key_type = self._get_key_type(key_type_name=key_type_name)
if key_type is None:
self._logger.info("Key ID does not match with known key type")
self._logger.debug(f"Key ID: {key_id}")
return None
key_pair = KeyPair(
PrivateKey=priv_key_file.read_bytes(),
PublicKey=pub_key_file.read_bytes(),
)
return key_type, key_pair
return None
def _get_key_type(self, key_type_name: str) -> KeyType | None:
    """Return the ``KeyType`` member named ``key_type_name``, or None."""
    candidates = [kt for kt in KeyType if kt.name == key_type_name]
    # Every member is examined and the last match is kept.
    return candidates[-1] if candidates else None
def _save_key_pair(self, key_type: KeyType, key_pair: KeyPair) -> str:
key_type_dir = self._keystore / key_type.name
if key_type_dir.exists():
if not key_type_dir.is_dir():
raise NotADirectoryError(
f"{key_type_dir} exists but is not a directory."
)
else:
key_type_dir.mkdir(parents=True, exist_ok=True)
key_id = str(uuid.uuid4())
self._logger.info(f"Writing {key_type.name} key with key ID: '{key_id}'")
priv_key_filename = key_type_dir / f"{key_id}.key"
priv_key_filename.write_bytes(key_pair.PrivateKey)
pub_key_filename = key_type_dir / f"{key_id}.pub"
pub_key_filename.write_bytes(key_pair.PublicKey)
self._logger.debug(
f"Wrote priv key to {priv_key_filename} and pub key to {pub_key_filename}"
)
return key_id
[docs]
def sign(self, key_id: str, data: bytes) -> str:
    """Sign data using GREP11 server.

    The stored key pair is looked up by ``key_id`` and its private key
    bytes are handed to the GREP11 client together with ``data``.

    Parameters
    ----------
    key_id : str
        Key ID used to find the stored key pair in the keystore.
    data : bytes
        Data to be signed.

    Returns
    -------
    str
        Signature as a string, as returned by the GREP11 client.

    Raises
    ------
    LookupError
        If no key pair is found for ``key_id``.
    """
    keys = self._find_keys(key_id=key_id)
    if not keys:
        message = f"Could not find key pair for key id: '{key_id}'"
        self._logger.info(message)
        # A specific type lets callers tell an unknown key from other
        # failures; it is still an ``Exception`` subclass for existing ones.
        raise LookupError(message)
    key_type, key_pair = keys
    return self._grep11_client.sign(
        key_type=key_type, priv_key_bytes=key_pair.PrivateKey, data=data
    )
[docs]
def health_check(self) -> V1_3.ComponentStatus:
    """Report the health of the GREP11 server.

    Returns
    -------
    `oso.framework.data.types.ComponentStatus`
        OSO component status, exactly as returned by the GREP11 client's
        own health check.
    """
    status = self._grep11_client.health_check()
    return status