Kubeflow Pipelines Workflow
############################

The :ref:`api/backends:KFP Workflow` is a backend for configuring a *FunctionFuse* workflow graph as a *Kubeflow Pipeline*. In this implementation the code and requirements for the *FunctionFuse* graph are packaged in a container. The container is used within Kubernetes when ``KFPWorkflow.run()`` is called. The DAG is created within KFP with an identical container and code for each Node, but on graph traversal the specific Node's execution functions are referenced via an environment variable matching the Node name.

The intention is to avoid writing complex code into YAML files. Instead, identical, simpler code is written to the YAML file for each KFP component, and the main code is packaged inside the container. When a component is run, an environment variable indicates the precise function to call.

See the `Kubeflow Pipelines documentation <https://www.kubeflow.org/docs/components/pipelines/>`_ for more details on how to use KFP in general.

After declaring a frontend-defined workflow, e.g.:

.. highlight:: python
.. code-block:: python

    from functionfuse import workflow

    @workflow
    def sum(a, b):
        return a + b

    a, b = 1, 1
    a_plus_b = sum(a, b).set_name("node1")
    a_plus_two_b = sum(a_plus_b, b).set_name("node2")

a ``KFPWorkflow`` can be instantiated. Its ``run()`` function goes through a series of steps to convert the *FunctionFuse* DAG to a KFP-compatible format and immediately initiates a KFP Run.

.. highlight:: python
.. code-block:: python

    from functionfuse.backends.addons.kfpback import KFPWorkflow

    kfp_workflow = KFPWorkflow(a_plus_two_b, workflow_name="sums")

    kfp_workflow.run()

Run Sequence
-------------

``KFPWorkflow.run()`` triggers a sequence of operations:

#. Build a container by copying the folder that python was executed in into a folder ``/ffrun/`` on the baseimage.
#. Push that container to a specified docker registry (see below for setting registry options).
#. A KFP component preparation stage then traverses the graph and, for each Node:

   #. gathers the ``type`` of each input;
   #. for inputs that are Node results, replaces the type with the KFP ``InputBinaryFile(bytes)``;
   #. creates a new function (see `The generated function`_ below for what that function does) and assigns that function's signature so that its arguments carry the names and types gathered above (a sketch of this step follows the list);
   #. creates a KFP component using ``create_component_from_func()`` -- note that this means the types in the KFP ``component_spec`` are actually defined by the inputs passed to a Node at runtime.

#. A ``@kfp.dsl.pipeline()`` is created by traversing the graph again, this time calling each Node's KFP component. An environment variable called ``FFFUNCTION`` is set in the container for each component to indicate which Node is being run.
#. Connect to the KFP Client and call ``client.create_run_from_pipeline_func()`` with the created pipeline function to start a run.
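To make the signature-assignment step concrete, here is a minimal, hypothetical sketch of component preparation for one Node. The helper ``make_component`` and its ``arg_names``/``arg_types`` parameters are illustrative names, not the actual *FunctionFuse* internals; the point is that an explicit ``__signature__`` is attached to a generic function before it is handed to ``create_component_from_func()``.

.. highlight:: python
.. code-block:: python

    import inspect

    from kfp.components import InputBinaryFile, create_component_from_func


    def make_component(node_name, arg_names, arg_types):
        # Generic body: the real generated function loads inputs, re-imports
        # the original script, and pickles the result (see the next section).
        def exec_func(*args, **kwargs):
            ...

        # Name the parameters after the Node's inputs; inputs that are
        # upstream Node results are annotated as InputBinaryFile(bytes).
        params = [
            inspect.Parameter(
                name, inspect.Parameter.POSITIONAL_OR_KEYWORD, annotation=ann
            )
            for name, ann in zip(arg_names, arg_types)
        ]
        exec_func.__signature__ = inspect.Signature(params)
        exec_func.__name__ = node_name

        # create_component_from_func() reads the synthetic signature, so the
        # component_spec types come from the inputs passed at runtime.
        return create_component_from_func(exec_func)


    # e.g. for "node2" in the example above, whose inputs are the result of
    # "node1" and the int b:
    node2_component = make_component(
        "node2", ["node1", "b"], [InputBinaryFile(bytes), int]
    )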
The generated function
-----------------------

During KFP component creation, a new function called ``exec_func`` is created for each Node, and the KFP component is created from this new ``exec_func``. Each KFP component runs standalone in a container, so to get access to global information, and to run arbitrary functions with any signature (to maintain the simplicity of the *FunctionFuse* frontend), we run the entire code again from scratch within ``exec_func``. An environment variable ``FFFUNCTION`` is used to divert processing so that only the function of the Node whose name matches the contents of ``FFFUNCTION`` is run.

The general flow of ``exec_func`` is as follows (a sketch follows the list):

#. Load data from ``InputBinaryFile`` sources, and extract indexed results to pass as the arguments to the ``Node.func``.
#. Arguments are then set as attributes of the ``os`` module in the ``os.ffargs`` and ``os.ffkargs`` variables.
#. Call ``import`` on the original file that was used to set up the workflow. This, of course, calls ``KFPWorkflow.run()`` again, but this time the presence of the ``FFFUNCTION`` environment variable diverts the code flow before creating the KFP components, and instead traverses the graph to find the Node whose name matches ``FFFUNCTION``.
#. The arguments are retrieved by importing ``os``, and the ``Node.func`` is called. Returned values are assigned to ``os.ffresult``.
#. Back in ``exec_func``, ``os.ffresult`` is pickled to an ``OutputBinaryFile``. This can then be retrieved by subsequent KFP components.
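As a concrete (but hypothetical) illustration, the generated function for ``node2`` from the example above might look like the following sketch. The module name ``workflow_script`` and the literal argument values are assumptions for illustration only.

.. highlight:: python
.. code-block:: python

    import importlib
    import os
    import pickle

    from kfp.components import InputBinaryFile, OutputBinaryFile


    # Sketch of the generated function for the Node named "node2". The
    # pipeline has already set FFFUNCTION="node2" on this component's
    # container.
    def exec_func(node1: InputBinaryFile(bytes),
                  output_file: OutputBinaryFile(bytes)):
        # 1. Load the upstream result and assemble the Node's arguments.
        a_plus_b = pickle.load(node1)
        os.ffargs, os.ffkargs = (a_plus_b, 1), {}  # positional and keyword args

        # 2. Re-import the original script (assumed to be workflow_script.py).
        #    KFPWorkflow.run() sees FFFUNCTION, skips component creation,
        #    finds the matching Node, calls Node.func(*os.ffargs, **os.ffkargs),
        #    and stores the return value in os.ffresult.
        importlib.import_module("workflow_script")

        # 3. Pickle the result for downstream components to load.
        pickle.dump(os.ffresult, output_file)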
.. warning::

    Due to the use of ``import`` on the original file that starts the workflow, the workflow run must be triggered in that file without the use of an ``if __name__ == "__main__"`` statement. All code that is not installed on the container (or specified for KFP to install using ``packages_to_install``) must be in the folder that the original python script is executed from.

Kubernetes interactions
------------------------

Before calling ``kfp_workflow.run()``, additional settings may be required. The created container will be pushed to a container registry that the KFP host Kubernetes cluster has access to. If this is a private registry, the information used by a ``docker login`` command should be provided:

.. highlight:: python
.. code-block:: python

    # baseimage defaults to python. This image contains requirements, but is in a private registry:
    baseimage = "docker-na.artifactory.swg-devops.com/res-hcls-mcm-brain-docker-local/particles-py3.10:1.5"

    # Provide credentials used by docker login:
    registry_credentials = {
        "server": "docker-na.artifactory.swg-devops.com/res-hcls-mcm-brain-docker-local",
        "username": "user@us.ibm.com",
        "password": "xxxxx",
    }

    kfp_workflow.set_baseimage(baseimage)
    kfp_workflow.set_registry_credentials(registry_credentials)

    kfp_workflow.run()

The Kubernetes cluster should also have access to this registry via an ``ImagePullSecret`` in the ``kubeflow`` namespace. This can be created using kubectl (this should probably use a service account rather than a user's personal login to the registry):

.. code-block:: bash

    kubectl create secret -n kubeflow docker-registry regcred \
        --docker-server=docker-na.artifactory.swg-devops.com/res-hcls-mcm-brain-docker-local \
        --docker-username=user@us.ibm.com \
        --docker-password=xxxxx

The default address for the KFP host is ``"http://localhost:3000"``. To use a different address, set:

.. highlight:: python
.. code-block:: python

    kfp_host = "http://<host>:<port>"
    kfp_workflow.set_kfp_host(kfp_host)

    kfp_workflow.run()

KFP Dashboard
--------------

Workflow runs are logged in the KFP system:

.. image:: /images/KFPDashboardRuns.png
    :alt: List of Pipeline Runs

An individual Run shows information about the components run, locations of inputs and outputs, logs, and results:

.. image:: /images/KFPDashboardRunResult.png
    :alt: Pipeline Run Result
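Runs can also be inspected programmatically with the KFP SDK client. A minimal sketch, assuming the default host above and the KFP v1 SDK:

.. highlight:: python
.. code-block:: python

    import kfp

    client = kfp.Client(host="http://localhost:3000")

    # List recent runs; each entry carries the same id, name, and status
    # shown on the dashboard.
    for run in client.list_runs(page_size=5).runs or []:
        print(run.id, run.name, run.status)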