# Source code for lnn.symbolic.logic.unary_operator

##
# Copyright 2023 IBM Corp. All Rights Reserved.
#
# SPDX-License-Identifier: Apache-2.0
##

# flake8: noqa: E501

import logging
from typing import Union, Tuple, Set

import pandas as pd
import torch

from .connective_formula import _ConnectiveFormula
from .formula import Formula
from .neural_activation import _NeuralActivation
from .node_activation import _NodeActivation
from .variable import Variable
from .. import _gm
from ... import _utils
from ...constants import Fact, Direction, Bound
from torch.nn.parameter import Parameter

# Import-time side effect: invoke the package's logger setup helper
# (implemented in ``_utils``; what it configures is not visible from this file).
_utils.logger_setup()


class _UnaryOperator(_ConnectiveFormula):
    r"""Connective base class that accepts exactly one operand formula."""

    def __init__(self, *formula: Formula, **kwds):
        r"""Check that a single operand was given, then initialise the connective.

        Parameters
        ----------
        ``*formula`` : Formula
            the operand(s); exactly one must be supplied.
        kwds : dict
            forwarded unchanged to the parent constructor together with
            ``arity=1``.

        Raises
        ------
        Exception
            if the number of positional operands is not 1.

        """
        operand_count = len(formula)
        if operand_count != 1:
            message = (
                f"Unary operator expect 1 formula as input, received {operand_count}"
            )
            raise Exception(message)
        super().__init__(*formula, arity=1, **kwds)


class _Quantifier(_UnaryOperator):
    r"""Symbolic container for quantifiers.

    Parameters
    ----------
    kwds : dict
        fully_grounded : bool
            specifies if a full upward inference can be done on a
            quantifier due to all the groundings being present inside it.
            This applies to the lower bound of a `Forall` and upper bound
            of an `Exists`

    Attributes
    ----------
    fully_grounded : bool
        taken from ``kwds``; defaults to ``False`` when not given.
    unique_var_slots : tuple of int
        returns the slot index of each unique variable
        NOTE(review): this name is not assigned anywhere in this class -
        confirm whether a parent class provides it or whether the
        ``free_vars`` list built in ``__init__`` is what is meant.

    """

    def __init__(self, *args, **kwds):
        r"""Build the quantifier around its operand formula.

        Parameters
        ----------
        ``*args``
            optional leading variables followed by the operand as the last
            positional argument. When only the operand is given, its
            ``unique_vars`` are used as the variables (taken from the first
            element when the operand is passed as a tuple).
        kwds : dict
            forwarded to the parent constructor and to ``_set_activation``;
            ``fully_grounded`` is additionally read here (default ``False``).

        """
        operand = args[-1]
        if len(args) > 1:
            variables = args[:-1]
        elif isinstance(operand, tuple):
            variables = operand[0].unique_vars
        else:
            variables = operand.unique_vars

        super().__init__(operand, variables=variables, **kwds)
        self.fully_grounded = kwds.get("fully_grounded", False)
        self._grounding_set = set()

        # `Forall` is computed with the "And" activation, `Exists` with "Or";
        # any other subclass is left without an operator.
        self.operator = (
            "Or"
            if isinstance(self, Exists)
            else "And"
            if isinstance(self, Forall)
            else None
        )

        self._set_activation(**kwds)
        self.neurons = []

        # Position, within the operand's variables, of every variable that
        # this formula still exposes through `unique_vars`.
        self.free_vars = [
            self.operands[0].unique_vars.index(var) for var in self.unique_vars
        ]

        if not self.free_vars:
            # Nothing is left free: replace the weights by a single unit
            # weight, keeping the previous `requires_grad` setting.
            self.neuron.weights = Parameter(
                torch.tensor([1.0]), self.neuron.weights.requires_grad
            )

        self._new_groundings = []

    @property
    def expanded_unique_vars(self):
        r"""Tuple made of ``unique_vars`` followed by every entry of ``variables``."""
        return (*self.unique_vars, *self.variables)

    @staticmethod
    def _unique_variables_overlap(
        source: Tuple[Variable, ...], destination: Tuple[Variable, ...]
    ) -> Tuple[Variable, ...]:
        """Return the variables of ``destination`` that do not occur in ``source``.

        The result keeps the order (and any repetitions) of ``destination``.
        Note that, despite the method name, this is the part of
        ``destination`` that does *not* overlap with ``source``.
        """
        return tuple(var for var in destination if var not in source)

    def upward(self, **kwds) -> float:
        r"""Upward inference from the operand's groundings to the quantifier.

        Returns ``0.0`` when the operand has no groundings or no data, and
        delegates to ``_fully_quantified_upward`` when no variable is left
        free. Otherwise the operand's groundings are grouped by the values of
        the free variables and every group is aggregated by its own neuron.

        Returns
        -------
        tightened_bounds : float
            sum of the values returned by ``aggregate_bounds`` over all
            groups.

        """
        operand = self.operands[0]
        if len(operand.grounding_table) == 0:
            return 0.0
        if not self.free_vars:
            return self._fully_quantified_upward()

        operand_bounds = operand.get_data()
        if operand_bounds is None:
            return 0.0

        frame, groups = self._get_groupings()

        # Unless fully grounded, only one side of the bounds is aggregated:
        # the upper bound for `Forall`, the lower bound otherwise.
        if self.fully_grounded:
            bound = None
        elif isinstance(self, Forall):
            bound = Bound.UPPER
        else:
            bound = Bound.LOWER

        tightened = 0
        rows = []
        for members in groups:
            group_size = len(members)
            # The last column of `frame` is the helper "index" column; the
            # remaining values identify the group.
            key = tuple(frame.iloc[members[0], :].tolist()[:-1])

            if self.grounding_table is None:
                self.grounding_table = {}

            if key not in self.grounding_table:
                self._add_grounding(key)
                neuron = self._add_neuron(group_size)
            else:
                slot = self.grounding_table[key]
                neuron = self.neurons[slot]
                if neuron.arity != group_size:
                    # The group changed size: replace the neuron in place.
                    neuron = self._add_neuron(group_size, slot)

            group_bounds = operand_bounds[members].permute([1, 0])[None, :, :]
            tightened += neuron.aggregate_bounds([0], neuron.func(group_bounds), bound)
            rows.append(neuron.get_data())

        # NOTE(review): rows are stacked in group iteration order, which is
        # not necessarily the id order stored in `grounding_table` - confirm.
        self.neuron.bounds_table = torch.vstack(rows)

        return tightened

    def downward(self, **kwds) -> Union[torch.Tensor, None]:
        r"""Downward inference from the quantifier to its operand.

        Returns ``0.0`` when the operand has no groundings or no data, and
        delegates to ``_fully_quantified_downward`` when no variable is left
        free. Otherwise pending groundings are first pushed to the operand,
        then the inverse activation of every group's neuron is applied to a
        detached copy of the operand bounds, and the result is aggregated
        into the operand.

        Each group's key is looked up in ``grounding_table`` without a
        membership check.

        Returns
        -------
        tightened_bounds
            the value(s) returned by the operand's ``aggregate_bounds``.

        """
        operand = self.operands[0]
        if len(operand.grounding_table) == 0:
            return 0.0
        if not self.free_vars:
            return self._fully_quantified_downward()

        self._propagate_groundings()

        operand_bounds = operand.get_data()
        if operand_bounds is None:
            return 0.0

        frame, groups = self._get_groupings()
        updated = operand_bounds.detach().clone()
        for members in groups:
            # Drop the trailing helper "index" column to get the group key.
            key = tuple(frame.iloc[members[0], :].tolist()[:-1])
            group_bounds = updated[members].permute([1, 0])[None, :, :]
            slot = self.grounding_table[key]
            neuron = self.neurons[slot]

            group_size = len(members)
            if neuron.arity != group_size:
                neuron = self._add_neuron(group_size, slot)

            inverted = neuron.func_inv(neuron.get_data(), group_bounds)
            updated[members] = inverted[0].permute([1, 0])

        if not isinstance(operand, _Quantifier):
            return operand.neuron.aggregate_bounds(frame.index.to_list(), updated)

        # A quantifier operand holds one neuron per grounding of its own.
        tightened = 0
        for position, operand_neuron in enumerate(operand.neurons):
            tightened += operand_neuron.aggregate_bounds([0], updated[None, position])

        return tightened

    def _add_grounding(self, grounding: tuple[str]):
        r"""Map ``grounding`` to the current size of ``grounding_table``."""
        next_id = len(self.grounding_table)
        self.grounding_table[grounding] = next_id

    def _add_groundings(self, *groundings: tuple[str]):
        r"""Register every grounding that is not yet in ``grounding_table``.

        For each new grounding a table entry and a neuron are created, the
        quantifier's own neuron is extended by one grounding, and the
        grounding is queued in ``_new_groundings``.
        """
        for grounding in groundings:
            if grounding in self.grounding_table:
                continue
            self._add_grounding(grounding)
            # The neuron arity is initialised to the grounding's length.
            self._add_neuron(len(grounding))
            self.neuron.extend_groundings(1)
            self._new_groundings.append(grounding)

    def _get_groupings(self):
        r"""Group the operand's groundings by the free-variable columns.

        Returns
        -------
        frame : pandas.DataFrame
            the free-variable columns of the operand's grounding keys plus an
            ``"index"`` column holding each row's index.
        grouped : pandas.Series
            for every distinct combination of free-variable values, the list
            of row indices that share it.

        """
        operand_groundings = self.operands[0].grounding_table.keys()
        frame = pd.DataFrame(operand_groundings)[self.free_vars]
        frame["index"] = frame.index
        by_free_vars = frame.groupby(by=self.free_vars)
        return frame, by_free_vars["index"].apply(list)

    def _create_neuron(self, arity):
        r"""Create a non-propositional neuron of the given ``arity``.

        The neuron is built with weight learning turned off, is extended by
        one grounding, and gets ``func`` / ``func_inv`` set to the upward and
        downward activations of this quantifier's ``operator``.
        """
        neuron = _NeuralActivation()(
            activation={"weights_learning": False},
            propositional=False,
            arity=arity,
            world=self.world,
        )
        neuron.extend_groundings(1)

        activation_for = neuron.activation
        neuron.func = activation_for(self.operator, direction=Direction.UPWARD)
        neuron.func_inv = activation_for(self.operator, direction=Direction.DOWNWARD)
        return neuron

    def _add_neuron(self, arity, index=None):
        r"""Create a neuron and store it in ``neurons``.

        Parameters
        ----------
        arity : int
            arity passed on to ``_create_neuron``.
        index : int, optional
            slot to overwrite; when omitted the neuron is appended.

        Returns
        -------
        the newly created neuron.

        """
        neuron = self._create_neuron(arity)
        if index is not None:
            self.neurons[index] = neuron
        else:
            self.neurons.append(neuron)
        return neuron

    def _fully_quantified_upward(self):
        r"""Upward inference when no variable is left free.

        A fresh neuron, sized by the number of operand groundings, replaces
        ``self.neuron`` and aggregates all operand bounds at once. Returns
        ``0.0`` when the operand has no data.
        """
        operand = self.operands[0]
        operand_bounds = operand.get_data()
        if operand_bounds is None:
            return 0.0

        # Unless fully grounded, only one side of the bounds is aggregated:
        # the upper bound for `Forall`, the lower bound otherwise.
        if self.fully_grounded:
            bound = None
        elif isinstance(self, Forall):
            bound = Bound.UPPER
        else:
            bound = Bound.LOWER

        stacked = operand_bounds.permute([1, 0])[None, :, :]

        self.neuron = self._create_neuron(arity=len(operand.grounding_table))
        self.func = self.neuron.func
        tightened = self.neuron.aggregate_bounds([0], self.func(stacked), bound)
        # Keep only the first row of the neuron's bounds table.
        self.neuron.bounds_table = self.neuron.bounds_table[0]
        return tightened

    def _fully_quantified_downward(self):
        r"""Downward inference when no variable is left free.

        The quantifier's own bounds are pushed through ``func_inv`` and
        aggregated into the operand: per neuron when the operand is itself a
        quantifier, otherwise in one call over all operand groundings.
        """
        operand = self.operands[0]

        if not isinstance(operand, _Quantifier):
            grounding_ids = list(operand.grounding_table.values())
            repeated = self.get_data().repeat(len(operand.get_data()), 1)
            inverted = self.func_inv(repeated, operand.get_data()[:, :, None])
            return operand.neuron.aggregate_bounds(grounding_ids, inverted[..., 0])

        tightened = 0
        for operand_neuron in operand.neurons:
            neuron_bounds = operand_neuron.get_data().permute([1, 0])[None, :, :]
            inverted = self.func_inv(self.get_data()[None, :], neuron_bounds)
            inverted = inverted[0].permute([1, 0])
            tightened += operand_neuron.aggregate_bounds([0], inverted[None, 0])

        return tightened

    def _propagate_groundings(self):
        r"""Push queued groundings down to the operand.

        The groundings queued in ``_new_groundings`` (values of the free
        variables) are joined with the remaining columns of the operand's
        grounding keys via ``_gm._full_outer_join``; every resulting row is
        grounded by the operand and added to it. The queue is cleared
        afterwards. Does nothing when the queue is empty.
        """
        if not self._new_groundings:
            return

        operand = self.operands[0]
        operand_keys = operand.grounding_table.keys()
        pending = pd.DataFrame(self._new_groundings)
        operand_frame = pd.DataFrame(operand_keys)

        # Label the pending columns with the free-variable positions so they
        # line up with the operand's columns after the join.
        pending.columns = operand_frame[self.free_vars].columns
        remaining = operand_frame.drop(self.free_vars, axis=1)

        merged = _gm._full_outer_join(pending, remaining)
        merged.sort_index(axis=1, inplace=True)

        for row in merged.itertuples(index=False, name=None):
            operand._add_groundings(operand._ground(row))

        self._new_groundings = []

    def _set_activation(self, **kwds):
        """Updates the neural activation according to grounding dimension size

        The computation of a quantifier is implemented via one of the weighted
            neurons, And/Or for Forall/Exists.
        At present, weighted quantifiers have not been well studied and are
            therefore turned off.
        However the dimension of computation is different, computing over the
            groundings of the input formula instead of multiple formulae, since
            there can only be one formula to quantify over.
        The activation is therefore required to grow according to the number
            of groundings present in the formula, which can grow as groundings
            propagate via inference.

        Unless given in ``kwds``, ``arity`` defaults to the size of
        ``_grounding_set`` and ``propositional`` to ``self.propositional``.

        """
        kwds.setdefault("arity", len(self._grounding_set))
        kwds.setdefault("propositional", self.propositional)

        # Weight learning is switched off for the quantifier's neuron.
        self.neuron = _NeuralActivation()(activation={"weights_learning": False}, **kwds)

        activation_for = self.neuron.activation
        self.func = activation_for(self.operator, direction=Direction.UPWARD)
        self.func_inv = activation_for(self.operator, direction=Direction.DOWNWARD)

    @staticmethod
    def _has_free_variables(variables: Tuple[Variable, ...], operand: Formula) -> bool:
        r"""Returns True if the quantifier contains free variables.

        Compares the number of distinct ``variables`` with the number of
        distinct variables in ``operand.unique_vars``.
        """
        quantified = set(variables)
        available = set(operand.unique_vars)
        return len(quantified) != len(available)

    @property
    def true_groundings(
        self,
    ) -> Set[Union[str, Tuple[str, ...]]]:
        r"""Returns a set of groundings that are True.

        When the operand is itself a quantifier its own ``true_groundings``
        are returned; otherwise the operand's groundings whose state is
        ``Fact.TRUE``.
        """
        operand = self.operands[0]
        if isinstance(operand, _Quantifier):
            return operand.true_groundings

        return {
            grounding
            for grounding in operand.groundings
            if operand.state(grounding) is Fact.TRUE
        }

    def add_data(self, facts: Union[Tuple[float, float], Fact, Set]):
        r"""Store ``facts`` through the parent class, then rebuild the activation.

        The activation is rebuilt with ``_set_activation`` using the current
        ``world``.
        """
        super().add_data(facts)
        self._set_activation(world=self.world)


class Not(_UnaryOperator):
    r"""Symbolic Negation

    Returns a logical negation where inputs can be propositional,
    first-order logic predicates or other connectives.

    Parameters
    ----------
    formula : Formula
        accepts a unary input Formula

    Examples
    --------
    ```python
    # Propositional
    A = Proposition('A')
    Not(A)
    ```
    ```python
    # First-order logic
    x, y = Variables('x', 'y')
    A = Predicate('A', arity=2)
    Not(A(x, y))
    ```

    """

    def __init__(self, formula: Formula, **kwds):
        # The connective symbol is set before the parent constructor runs.
        self.connective_str = "¬"
        super().__init__(formula, **kwds)
        kwds.setdefault("propositional", self.propositional)
        self.neuron = _NodeActivation()(**kwds.get("activation", {}), **kwds)

    def upward(self, **kwds) -> float:
        r"""Upward inference from the operands to the operator.

        In the first-order case, operand groundings that are missing from
        this formula's ``grounding_table`` are added before the negated
        operand bounds are aggregated.

        Returns
        -------
        tightened_bounds : float
            The amount of bounds tightening or new information that is
            learned by the inference step.

        """
        if self.propositional:
            groundings = {None}
        else:
            groundings = tuple(self.operands[0]._groundings)
            for g in groundings:
                if g not in self.grounding_table:
                    self._add_groundings(g)

        bounds = self.neuron.aggregate_bounds(
            None, _utils.negate_bounds(self.operands[0].get_data(*groundings))
        )
        if self.is_contradiction():
            logging.info(
                "↑ CONTRADICTION "
                f"FOR:'{self.name}' "
                f"FORMULA:{self.formula_number} "
            )
        return bounds

    def downward(self, **kwds) -> torch.Tensor:
        r"""Downward inference from the operator to the operands.

        In the first-order case, groundings of this formula that the operand
        does not hold yet are added to the operand before the negated bounds
        of this formula are aggregated into it.

        Returns
        -------
        tightened_bounds : float
            The amount of bounds tightening or new information that is
            learned by the inference step.

        """
        if self.propositional:
            groundings = {None}
        else:
            groundings = tuple(self._groundings)
            for g in groundings:
                if g not in self.operands[0]._groundings:
                    self.operands[0]._add_groundings(g)

        bounds = self.operands[0].neuron.aggregate_bounds(
            None, _utils.negate_bounds(self.get_data(*groundings))
        )
        if self.operands[0].is_contradiction():
            logging.info(
                "↓ CONTRADICTION "
                f"FOR:'{self.operands[0].name}' "
                f"FROM:'{self.name}' "
                f"FORMULA:{self.operands[0].formula_number} "
                f"PARENT:{self.formula_number} "
            )
        return bounds
class Exists(_Quantifier):
    r"""Symbolic existential quantifier.

    When working with belief bounds - existential operators restrict upward
    inference to only work with the given formula's lower bound. Downward
    inference behaves as usual.

    Parameters
    ----------
    ``*variables`` : Variable
    formula : Formula
        The FOL formula to quantify over, may be a connective formula or a
        Predicate.

    Examples
    --------
    No free variables, quantifies over all of the variables in the formula.
    ```python
    Some_1 = Exists(birthdate(p, d))
    Some_2 = Exists(p, d, birthdate(p, d))
    ```

    Free variables, quantifies over a subset of variables in the formula.
    ```python
    Some = Exists(p, birthdate(p, d))
    ```

    Warning
    -------
    Quantifier with free variables, not yet implemented. It is required that
    we quantify over all the variables given in the formula, either by
    specifying all the variables or by not specifying any variables - which
    is equivalent to quantifying over all variables.

    """

    def __init__(self, *args, **kwds):
        variables = (
            args[:-1]
            if len(args) > 1
            else args[-1][0].unique_vars
            if isinstance(args[-1], tuple)
            else args[-1].unique_vars
        )
        if len(variables) > 1:
            # Several variables are quantified one at a time:
            # Exists(x, y, f) is built as Exists(x, Exists(y, f)).
            # `variables` is only indexed inside this guard so that an empty
            # variable tuple is handled the same way as in `Forall`.
            first = variables[0]
            formula = args[-1]
            args = [first, Exists(*variables[1:], formula, **kwds)]
        self.connective_str = "∃"
        super().__init__(*args, **kwds)
class Forall(_Quantifier):
    r"""Symbolic universal quantifier.

    When working with belief bounds - universal operators restrict upward
    inference to only work with the given formula's upper bound. Downward
    inference behaves as usual.

    Parameters
    ----------
    ``*variables`` : Variable
    formula : Formula
        The FOL formula to quantify over, may be a connective formula or a
        Predicate.

    Examples
    --------
    No free variables, quantifies over all of the variables in the formula.
    ```python
    All_1 = Forall(birthdate(p, d))
    All_2 = Forall(p, d, birthdate(p, d))
    ```

    Free variables, quantifies over a subset of variables in the formula.
    ```python
    All = Forall(p, birthdate(p, d))
    ```

    Warning
    -------
    Quantifier with free variables, not yet implemented. It is required that
    we quantify over all the variables given in the formula, either by
    specifying all the variables or by not specifying any variables - which
    is equivalent to quantifying over all variables.

    """

    def __init__(self, *args, **kwds):
        variables = (
            args[:-1]
            if len(args) > 1
            else args[-1][0].unique_vars
            if isinstance(args[-1], tuple)
            else args[-1].unique_vars
        )
        if len(variables) > 1:
            # Several variables are quantified one at a time:
            # Forall(x, y, f) is built as Forall(x, Forall(y, f)).
            first = variables[0]
            formula = args[-1]
            args = [first, Forall(*variables[1:], formula, **kwds)]
        self.connective_str = "∀"
        super().__init__(*args, **kwds)