Source code for lnn.symbolic.logic.formula
##
# Copyright 2023 IBM Corp. All Rights Reserved.
#
# SPDX-License-Identifier: Apache-2.0
##
# flake8: noqa: E501
import logging
import itertools
import typing
from typing import Optional, Union, Tuple, Iterator, Set, List, Dict
from abc import ABC
from .variable import Variable
from .. import _trace
from ... import _utils, _exceptions, utils
from ...constants import Fact, World
import copy
import torch
import numpy as np
_utils.logger_setup()
subclasses: typing.Dict[str, object] = {}
def _isinstance(obj, class_str) -> bool:
"""
Returns True if an object is an instance of the class whose name is given as a
string, otherwise False. The check is performed using the subclasses dictionary.
To see what the subclasses dictionary is populated with, refer to the init file of
this module.
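For example, `_isinstance(f, "Not")` is equivalent to
`isinstance(f, subclasses["Not"])`.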
"""
return isinstance(obj, subclasses[class_str])
class Formula(ABC):
r"""Symbolic container for a generic formula."""
def __init__(
self, *formulae: "Formula", name: Optional[str] = "", arity: int = None, **kwds
):
# construct edge and operand list for each formula
self.edge_list = list()
self.operands: List[Formula] = list()
# formula arity
self.arity: int = arity
# inherit propositional, variables, and graphs
self.propositional: bool = kwds.get("propositional")
self.variables: Tuple[Variable, ...] = kwds.get("variables")
self._inherit_from_subformulae(*formulae)
# formula naming
if _isinstance(self, "_LeafFormula"):
self.structure: str = name
self.name: str = name
else:
self.structure: str = self._formula_structure(True)
self.name: str = (
name if name else self._formula_structure(True, lambda x: x.name)
)
# formula grounding table maps grounded objects to table rows
self.grounding_table = None if self.propositional else dict()
# placeholder for neural variables and functions
self.neuron = None
self.func = None
self.func_inv = None
self.formula_number = None
self.operands_by_number = list()
self.congruent_nodes = list()
##
# External function definitions
##
def add_facts(self, *args, **kwds):
raise NameError(f"`add_facts` is deprecated, use `add_data` instead")
def add_data(
self,
data: Union[
Union[bool, Fact, float, Tuple[float, float]],
Dict[
Union[str, Tuple[str, ...]],
Union[bool, Fact, float, Tuple[float, float]],
],
],
):
r"""Add data to the formula in the form of classical facts or belief bounds.
Data given as a Fact or belief bounds assumes a propositional formula.
Data given in a dict assumes a first-order logic formula,
keyed by the grounding, with each value given as a Fact or belief bounds.
Parameters
----------
data : Fact, belief bounds or dict
For propositional formulae, truths are given as either Facts or belief bounds. These beliefs can be given as a bool, a float or a float range, i.e. a tuple of 2 floats. For first-order logic formulae, input truths are given as a dict, keyed by the grounding (a str for unary formulae or a tuple of strings for larger arities), with values also given as Facts or bounds on beliefs.
Examples
--------
```python
# propositional
P, Q = Propositions("P", "Q")
P.add_data(Fact.TRUE)
Q.add_data((.1, .4))
```
```python
# first-order logic
Person = Predicate("Person")
Person.add_data({
"Barack Obama": Fact.TRUE,
"Bo": (.1, .4)
})
# FOL with arity == 2
BD = Predicate("Birthdate", 2)
BD.add_data({
("Barack Obama", "04 August 1961"): Fact.TRUE,
("Bo", "09 October 2008"): (.6, .75)
})
```
"""
if self.propositional: # Propositional facts
if isinstance(data, bool):
data = Fact.TRUE if data else Fact.FALSE
_exceptions.AssertBounds(data)
self.neuron.add_data(data)
return
# FOL facts
if not isinstance(data, dict): # facts given per grounding
raise Exception(
"FOL facts should be from [dict, set], "
f'"{self}" received {type(data)}'
)
# replace fact keys (str -> tuple[str])
groundings = tuple(data.keys())
for g in groundings:
bounds = data.pop(g)
if isinstance(bounds, bool):
bounds = Fact.TRUE if bounds else Fact.FALSE
data[self._ground(g)] = bounds
# add missing groundings to `grounding_table`
groundings = tuple(data.keys())
self._add_groundings(*groundings)
# set facts for all groundings
table_facts = {self.grounding_table[g]: data[g] for g in groundings}
self.neuron.add_data(table_facts)
def add_labels(
self,
labels: Union[
Union[Fact, Tuple[float, float]],
Dict[Union[str, Tuple[str, ...]], Union[Fact, Tuple[float, float]]],
],
):
r"""Store labels as data within the symbolic container.
Labels given as a Fact or belief bounds assume a propositional formula.
Labels given in a dict assume a first-order logic formula,
keyed by the grounding, with each value given as a Fact or belief bounds.
Parameters
----------
labels : Fact, belief bounds or dict
For propositional formulae, labels are given as either Facts or bounds on beliefs (a tuple of 2 floats). For first-order logic formulae, labels are given as a dict, keyed by the grounding (a str for unary formulae or a tuple of strings for larger arities), with values also given as Facts or bounds on beliefs.
Examples
--------
```python
# propositional
P, Q = Propositions("P", "Q")
P.add_labels(Fact.TRUE)
Q.add_labels((.1, .4))
```
```python
# first-order logic
Person = Predicate("Person")
Person.add_labels({
"Barack Obama": Fact.TRUE,
"Bo": (.1, .4)
})
# FOL with arity == 2
BD = Predicate("Birthdate", 2)
BD.add_labels({
("Barack Obama", "04 August 1961"): Fact.TRUE,
("Bo", "09 October 2008"): (.6, .75)
})
```
"""
if self.propositional: # Propositional labels
_exceptions.AssertBounds(labels)
self.labels = _utils.fact_to_bounds(labels, self.propositional)
else: # FOL labels
if not hasattr(self, "labels"):
self.labels = dict()
if isinstance(labels, dict): # labels given per grounding
_labels = copy.copy(labels)
for g in tuple(_labels.keys()):
# set labels for groundings, replace str keys -> Groundings
self.labels[self._ground(g)] = _utils.fact_to_bounds(
_labels.pop(g), self.propositional
)
else:
raise Exception(
"FOL facts should be from [dict, set], "
f'"{self}" received {type(labels)}'
)
def get_facts(self, *args, **kwds):
raise NameError(f"`get_facts` is deprecated, use `get_data` instead")
def get_data(
self, *groundings: Union[str, Tuple[str, ...], tuple[str]]
) -> torch.Tensor:
r"""Returns the current beliefs of the formula.
Uses the given groundings or the entire `grounding_table` to slice out the
current belief bounds from the `bounds_table`.
Parameters
-------
``*groundings``: str or tuple of str
NB - if unspecified, defaults to returning all the facts for the given formula. If specified, the table will be ordered according to the input grounding order.
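Examples
--------
A minimal sketch (illustrative only), assuming the `Person` predicate populated
as in the `add_data` examples above:
```python
Person = Predicate("Person")
Person.add_data({
"Barack Obama": Fact.TRUE,
"Bo": (.1, .4)
})
Person.get_data()                      # bounds for every grounding
Person.get_data("Bo", "Barack Obama")  # bounds in the requested order
```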
"""
if self.propositional or len(groundings) == 0:
return self.neuron.get_data()
table_rows = [self.grounding_table.get(self._ground(g)) for g in groundings]
return self.neuron.get_data(table_rows, default=True)
def get_labels(self, *groundings: str) -> torch.Tensor:
r"""Returns the stored labels from the symbolic container."""
if self.propositional or len(groundings) == 0:
return self.labels
result = torch.stack([self.labels.get(self._ground(g)) for g in groundings])
return result[0] if len(groundings) == 1 else result
@property
def groundings(self) -> Set[Union[str, Tuple[str, ...]]]:
r"""Returns the groundings to the user as str or tuple of str."""
if self.grounding_table:
return set(self.grounding_table.keys())
return set()
@property
def is_classically_resolved(self):
r"""Checks if the query node is in a classical state besides `Fact.UNKNOWN`."""
if self.propositional and _utils.is_classical_proposition(
tuple(self.get_data().tolist())
):
return True
return False
def is_contradiction(
self, bounds: torch.Tensor = None, stacked=False
) -> torch.BoolTensor:
r"""Check if there are any bounds in contradiction."""
return self.contradicting_bounds(bounds, stacked).any()
def contradicting_bounds(
self, bounds: torch.Tensor = None, stacked=False
) -> torch.BoolTensor:
r"""Check which bounds are in contradiction."""
if stacked:
tensor = torch.logical_or(
self.neuron.is_contradiction(bounds[..., 0]),
self.neuron.is_contradiction(bounds[..., 1]),
)
return tensor.type(torch.bool)
return self.neuron.is_contradiction(bounds)
def contradicting_stacked_bounds(
self, bounds: torch.Tensor = None
) -> torch.BoolTensor:
r"""Check if bounds are in contradiction."""
def is_equal(self, other: "Formula") -> bool:
r"""Returns True if two nodes are symbolically equivalent.
Formulae can be symbolically equivalent yet neurally different, example:
```python
f = And(A, B, activation={"type": NeuralActivation.Lukasiewicz})
g = And(A, B, activation={"type": NeuralActivation.Godel})
```
The above will return `True`, since `f` is symbolically equivalent to `g`
despite different neural configurations.
"""
if self is other:
return True
result = list()
for f in self.congruent_nodes:
if _isinstance(f, "Congruent"):
result += [
other.structure == operand.structure for operand in f.operands
]
return any(result)
def is_unweighted(self) -> bool:
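# unweighted when every operand weight equals the neuron's bias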
return all([self.neuron.bias == w for w in self.neuron.weights])
def negation_absorption(
self, store: bool = True
) -> (List[int], Tuple[Tuple["Formula"]]):
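# fold `Not` operands into the neuron's weights (see `set_negative_weights`),
# rewiring each edge to the negated operand's child; returns the replaced edges
# and a per-operand count of absorbed negations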
n_negations = []
edge_replace = []
operands = []
def recurse(formula):
for operand in formula.operands:
if _isinstance(operand, "Not"):
recurse(operand)
root_idx = len(n_negations) - 1
n_negations[root_idx] += 1
condition = [True] * len(self.neuron.weights)
condition[root_idx] = bool(root_idx % 2)
if store:
self.neuron.update_weights_where(
condition, -self.neuron.weights
)
self.operands[root_idx] = self.operands[root_idx].operands[0]
edge_replace.append(
(self.edge_list[root_idx], (self, self.operands[root_idx]))
)
else:
n_negations.append(0)
operands.append(operand)
recurse(self)
if store:
if edge_replace:
logging.info(f"ABSORBED NEGATIONS INTO WEIGHTS FOR: '{self.name}'")
return edge_replace, n_negations
else:
return operands, edge_replace, n_negations
def named_parameters(self) -> Iterator[Tuple[str, torch.Tensor]]:
return self.neuron.named_parameters()
def parameters(self) -> Iterator[torch.Tensor]:
return self.neuron.parameters()
def params(
self, *params: str, detach: bool = False
) -> Union[torch.Tensor, List[torch.Tensor]]:
result = list()
for param in params:
if param in dict(self.neuron.named_parameters()):
result.append(
getattr(self.neuron, param).clone().detach()
if detach
else getattr(self.neuron, param)
)
else:
raise ValueError(f"{self} has no attribute: {param}")
return result[0] if len(params) == 1 else result
def print(
self,
header_len: int = 50,
roundoff: int = 5,
params: bool = False,
grads: bool = False,
numbering: bool = False,
):
r"""Print the states of groundings in a formula.
```
OPEN Proposition: A UNKNOWN (L, U)
CLOSED Proposition: B FALSE (L, U)
OPEN Predicate: P1 (x)
"const_1" TRUE (L, U)
"const_2" CONTRADICTION (L, U)
"const_3" FALSE (L, U)
OPEN Predicate: P2 (x, y)
("const_1", "const_5") TRUE (L, U)
("const_2", "const_6") CONTRADICTION (L, U)
("const_3", "const_7") FALSE (L, U)
OPEN And: And_0 (x, y)
Bindings: P1 (x: "const_1"), P2 (x: 'const_2', y: ['const_3', ...])
('const_1', 'const_3') TRUE (L, U)
('const_2', 'const_3') CONTRADICTION (L, U)
('const_1', 'const_7') FALSE (L, U)
TRUE Forall: Forall_0 (y) UNKNOWN (L, U)
```
"""
def state_wrapper(grounding: Tuple[str]):
return f"{grounding}"
def round_bounds(grounding=None):
data = self.get_data(grounding)
if len(data.shape) > 1:
data = data[0]
return tuple([round(r, roundoff) for r in data.tolist()])
header = (
f"{str(self.world_state(True))} "
f"{self.__class__.__name__}"
f": {self.name}"
)
if params:
params = dict()
for name, symbol in _utils.param_symbols.items():
if hasattr(self.neuron, name):
val = getattr(self.neuron, name)
params[symbol] = f"{np.around(val.tolist(), roundoff)}" + (
f" grads {np.around(val.grad.tolist(), roundoff)}"
if grads and val.grad is not None
else ""
)
params = "params " + ", ".join([f"{k}: {v}" for k, v in params.items()])
else:
params = ""
number = (
f"{self.formula_number} {self.operands_by_number}: "
if self.formula_number is not None and numbering
else ""
)
# print propositional node - single bounds
if self.propositional:
states = (
f"{header:<{header_len}} "
f"{self.state().name:>13} "
f"{round_bounds()}\n"
f"{params}"
)
print(f"{number}{states}")
# print FOL node - table of bounds
else:
facts = (
[""]
if self.grounding_table is None
else (
[
(
f"{state_wrapper(g):{header_len}} "
f"{self.state(g).name:>13} "
f"{round_bounds(g)}\n"
)
for g in self.grounding_table
]
)
)
binder = list()
for op_idx, op in enumerate(self.operands):
if not self.propositional and self._has_bindings(op_idx):
for i in range(len(self.var_remap[op_idx])):
if None not in self.bindings[op_idx][i]:
binder.append(
f"{str(op)} "
"{"
f"{str(self.operand_map[op_idx][i])}: "
f"{list(map(str, self.bindings[op_idx][i]))}"
"}"
)
bind_str = ("\nBindings: " + ", ".join(binder)) if binder else ""
header = f"{header} {bind_str}"
params = f"\n{params}" if params else ""
header = f"{header}{params}"
result = f"{header}\n" + "".join(facts)
print(f"{number}{result}")
def project_params(self):
self.neuron.project_params()
def rename(self, name: str):
self.name = name
def reset_bounds(self):
self.neuron.reset_bounds()
def reset_world(self, world: World):
_exceptions.AssertBoundsLen(world)
self.world = world if isinstance(world, tuple) else world.value
self.neuron.reset_world(world)
def set_formula_number(self, idx) -> int:
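# depth-first numbering of the formula graph: assign `idx` here, recursively
# number any unnumbered operands, and return the last index used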
self.formula_number = idx
for operand in self.operands:
if operand.formula_number is None:
idx = operand.set_formula_number(idx + 1)
self.operands_by_number.append(operand.formula_number)
return idx
def set_negative_weights(
self, is_negative: bool = True, store: bool = True
) -> (List[int], Tuple[Tuple["Formula"]]):
"""absorb `Not` into weights and allow negative computation"""
self.neuron.set_negative_weights(is_negative)
return self.negation_absorption(store=store)
def set_propositional(self, propositional: bool):
r"""Sets the formula's `propositional` flag."""
self.propositional = propositional
@property
def shape(self) -> torch.Size:
return self.neuron.bounds_table.shape
def state(
self,
groundings: Union[
str, Tuple[str, ...], List[str], List[Tuple[str, ...]]
] = None,
to_bool: bool = False,
bounds: torch.Tensor = None,
) -> Union[torch.Tensor, Dict[Union[str, Tuple[str, ...]], torch.Tensor]]:
r"""Returns the state of a single grounded fact.
if to_bool flag is True, will map classical Facts to bool:
i.e. {Fact.True: True, FALSE': False}
The rest of the node states will return as str
Notes
-----
see section [F.2](https://arxiv.org/abs/2006.13155) for more
information on node states
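Examples
--------
A minimal sketch (illustrative only), reusing the propositional `P` and the FOL
`Person` objects from the `add_data` examples above:
```python
P.state()           # propositional: a single state, e.g. Fact.TRUE
Person.state()      # FOL: dict of states keyed by grounding
Person.state("Bo")  # FOL: state of a single grounding
```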
"""
if self.propositional:
result = self.neuron.state()
else:
if bounds is None:
if groundings is None or isinstance(groundings, list):
result = {
g: self.state(g)
for g in (
self.grounding_table if groundings is None else groundings
)
}
return result
groundings = self._ground(groundings)
bounds = self.get_data(groundings)
if len(bounds) == 0:
raise LookupError(f"grounding {groundings} not found in {str(self)}")
result = self.neuron.state(bounds[None, :])[0]
result = _utils.node_state(result)
return utils.fact_to_bool(result) if to_bool else result
@property
def unique_var_map(self) -> Dict:
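# map each unique (free) variable to its positional index; for quantifiers, the
# quantified variables are appended after the unique variables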
result = {u: i for i, u in enumerate(self.unique_vars)}
if _isinstance(self, "_Quantifier"):
for idx, v in enumerate(self.variables):
result[v] = idx + len(self.unique_vars)
return result
@property
def world(self) -> World:
r"""Proxy to get the neuron's world parameter."""
return self.neuron.world
@world.setter
def world(self, new_world: Union[Tuple, World]):
r"""Proxy to set the neuron's world parameter."""
self.neuron.world = new_world
def world_state(self, name=False):
r"""Returns the state of the `world` variable."""
try:
w = World(self.world)
return w.name if name else w
except ValueError:
return self.world
def And(self, *formulae: "Formula", **kwds) -> "And":
return subclasses["And"](*self._formula_vars(self, *formulae), **kwds)
def Or(self, *formulae: "Formula", **kwds) -> "Or":
return subclasses["Or"](*self._formula_vars(self, *formulae), **kwds)
def Implies(self, formula: "Formula", **kwds) -> "Implies":
return subclasses["Implies"](*self._formula_vars(self, formula), **kwds)
def Not(self, **kwds) -> "Not":
return subclasses["Not"](*self._formula_vars(self), **kwds)
def Iff(self, formula: "Formula", **kwds) -> "Iff":
return subclasses["Iff"](*self._formula_vars(self, formula), **kwds)
def Exists(self, **kwds) -> "Exists":
return subclasses["Exists"](self.unique_vars, self, **kwds)
def Forall(self, **kwds) -> "Forall":
return subclasses["Forall"](*self.unique_vars, self, **kwds)
##
# Internal function definitions
##
def __call__(
self,
*variables: Variable,
) -> Tuple[
"Formula",
List[Variable],
Tuple[Variable, ...],
Tuple[Union[List[tuple[str]], List[None]]],
]:
r"""Variable remapping between operator and operand variables.
Examples
--------
FOL formulae that appear in connectives are callable:
Note `A`, `B` in the `And`
```python
x = Variable('x')
model['A'] = Predicate('A')
model['B'] = Predicate('B')
model['AB'] = And(A(x), B(x))
```
Returns
-------
Formula:
reference to the child object
variables:
tuple of child's Variables
var_remap:
tuple of parent-to-child remapping Variables
bindings:
tuple of parent-to-child groundings to bind inference to single groundings
"""
if len(variables) != self.num_unique_vars:
raise Exception(
f"please check if the FOL arity of {self} is correctly set "
"for the number of variables "
f"variables length ({len(variables)}) must be the same "
f"length as `num_unique_vars` ({self.num_unique_vars})"
)
bindings = list()
bind = dict()
variable_objects = list()
for v in variables:
if isinstance(v, str):
bindings.append([self._ground(v, arity_match=False)])
continue
elif isinstance(v, Variable):
variable_objects.append(v)
else:
raise TypeError(f"expected variable, received {type(v)}")
if v in bind:
if isinstance(bind[v], str): # a single binding
bindings.append([self._ground(bind[v], arity_match=False)])
elif isinstance(bind[v], list): # multiple bindings
bindings.append(
[self._ground(g, arity_match=False) for g in bind[v]]
)
elif isinstance(bind[v], Function): # A single function
bindings.append([bind[v]])
elif isinstance(bind[v], tuple): # A bound function
bindings.append([bind[v]])
else:
raise TypeError(
f"bindings expected as str, "
f"Function or list of str or "
f"Function, received {type(bind[v])}"
)
else:
bindings.append([None])
return self, self.variables, tuple(variable_objects), bindings
def __str__(self) -> str:
return self.name
def __eq__(self, other):
eq_condition = (self.structure == other.structure) and (
self.neuron == other.neuron
)
return eq_condition
def __hash__(self):
return hash(self.structure)
def _add_groundings(self, *groundings: tuple[str]):
r"""Adds missing groundings to `grounding_table` for those not yet stored.
Examples
--------
```python
# for formulae with arity == 1:
formula._add_groundings(tuple_1, tuple_2)
```
```python
# for formulae with arity > 1:
groundings = ((tuple_1, tuple_7),
(tuple_2, tuple_8))
formula._add_groundings(*groundings)
```
Warning
-------
groundings must be given in grounded form: `self._ground(grounding)`
"""
missing_groundings = {g for g in groundings if g not in self.grounding_table}
if len(missing_groundings) == 0:
self._has_new_groundings = False
return
self._has_new_groundings = True
table_rows = self.neuron.extend_groundings(len(missing_groundings))
self.grounding_table.update(
{g: table_rows[i] for i, g in enumerate(missing_groundings)}
)
def _formula_structure(self, as_int, get_str=lambda x: x.structure) -> str:
r"""Determine a name for input formula(e)."""
def subformula_structure(
subformula: Formula,
operator: str = None,
subformula_vars: Tuple = None,
root_var_remap: dict = None,
) -> str:
if subformula_vars is None:
root_var_remap = dict(
(k, k)
for k in (
self.expanded_unique_vars
if _isinstance(self, "_Quantifier")
else self.unique_vars
)
)
else:
if _isinstance(subformula, "_Quantifier"):
for v in subformula.expanded_unique_vars:
if v not in root_var_remap:
root_var_remap[v] = v
if subformula_vars != subformula.unique_vars:
root_var_remap = dict(
(
(
subformula.unique_vars[i],
root_var_remap[subformula_vars[i]],
)
if subformula.unique_vars[i] != subformula_vars[i]
else (subformula_vars[i], subformula_vars[i])
for i in range(len(subformula_vars))
)
)
result = [
(
f"{f.name}("
+ _utils.list_to_str(
[
root_var_remap[v]
if not as_int
else subformula.unique_var_map[root_var_remap[v]]
if _isinstance(subformula, "_Quantifier")
else self.unique_var_map[root_var_remap[v]]
for v in [
subformula.expanded_unique_vars[_]
if _isinstance(subformula, "_Quantifier")
else subformula.unique_vars[_]
for _ in subformula.operand_map[i]
]
]
if subformula.operand_map[i]
else []
)
+ ")"
)
if _isinstance(f, "Predicate")
else f"{f.name}"
if _isinstance(f, "Proposition")
else f"{f.structure}"
if _isinstance(f, "_Quantifier")
else (
"("
+ subformula_structure(
f, f.connective_str, subformula.var_remap[i], root_var_remap
)
+ ")"
)
if _isinstance(f, "_ConnectiveNeuron")
else (
f"{f.connective_str}"
+ subformula_structure(
f, None, subformula.var_remap[i], root_var_remap
)
)
if _isinstance(f, "Not")
else ""
for i, f in enumerate(subformula.operands)
]
return f" {operator} ".join(result) if operator else "".join(result)
if _isinstance(self, "_Quantifier"):
quantified_idxs = _utils.list_to_str(
[self.unique_var_map[v] for v in self.variables], ","
)
return (
f"({self.connective_str}{quantified_idxs}, "
f"{subformula_structure(self)})"
)
elif _isinstance(self, "Not"):
return (
f"{self.connective_str}{get_str(self.operands[0])}"
if self.propositional
else f"{self.connective_str}{subformula_structure(self)}"
)
return (
(
"("
+ f" {self.connective_str} ".join([get_str(f) for f in self.operands])
+ ")"
)
if self.propositional
else f"({subformula_structure(self, self.connective_str)})"
)
@staticmethod
def _formula_vars(*formulae: "Formula") -> List[Union["Formula", Tuple]]:
r"""Returns a list of formulae as wither called (when predicates)
or uncalled as connectives.
"""
variables = list(
Variable(f"?{i}") for i in range(max([f.arity for f in formulae]))
)
return [
f(*variables[: f.arity])
if (not f.propositional and _isinstance(f, "Predicate"))
else f
for f in formulae
]
def _ground(
self,
grounding: Union[str, Tuple[str, ...], tuple[str]],
arity_match: bool = True,
) -> tuple[str]:
r"""Returns a single grounded object given a grounding in str form.
If the grounding is already an internal "Grounding" object, it will simply be
returned to the user as is.
Examples
--------
```python
# for arity == 1
self._ground('str_1')
```
```python
# for arity > 1
grounding = ('str_1', 'str_2', ...)
self._ground(grounding)
```
Warning
-------
This function can only ground one object at a time due to tuple confusion
for multiple objects
"""
if isinstance(grounding, tuple):
return grounding
if isinstance(grounding, str):
return (grounding,)
if Formula._is_grounded(grounding):
return grounding
if isinstance(grounding, tuple):
if not all([type(g) in [str, None] for g in grounding]):
raise Exception(
"expected groundings as tuple of str for num_unique_vars "
f" > 1, received {type(grounding)} {grounding}"
)
if len(grounding) != self.num_unique_vars and arity_match:
raise Exception(
"expected grounding length to be of "
f"arity {self.num_unique_vars}, received {grounding}"
)
else:
if self.num_unique_vars != 1 and arity_match:
raise Exception(
f"{self} received str as grounding, expected grounding "
f"({grounding}) as a tuple of len {self.num_unique_vars}"
)
return grounding
@property
def _groundings(self) -> Set[tuple[str]]:
"""Internal usage to extract groundings"""
return set(self.grounding_table.keys())
def _has_bindings(self, slot: int = None) -> bool:
r"""Returns True if Formula has any bindings."""
if isinstance(slot, int):
return (
True
if self.bindings[slot]
and any(
[
self.bindings[slot][i][j]
for i in range(len(self.bindings[slot]))
for j in range(len(self.bindings[slot][i]))
]
)
else False
)
return any([self._has_bindings(i) for i in range(len(self.bindings))])
def _inherit_from_subformulae(self, *subformula: Union["Formula", Tuple]):
r"""Builds the local context for a formula using subformulae.
Provides manual variable remapping if operands are given with variables,
i.e. a Formula used as a function via `__call__`;
alternatively inherits the remapping from operands if the formula is not called.
"""
# inheritance variables
subformulae = list()
_operand_vars: List[Union[Tuple[Variable, ...], None]] = [None] * self.arity
_var_remap: List[Union[Tuple[Variable, ...], None]] = [None] * self.arity
_bindings: List[Union[Tuple[tuple[str], ...], None]] = [None] * self.arity
for slot, f in enumerate(subformula):
# variable remapping from `called` operands:
#
# ```python
# P = Predicate("P", arity=2)
# Q = Predicate("P", arity=2)
# And(P(x, y), Q(y, z))
# ```
if isinstance(f, tuple):
subformulae.append(f[0])
_operand_vars[slot] = f[1]
_var_remap[slot] = f[2]
_bindings[slot] = f[3]
self.edge_list.append((self, f[0]))
self.operands.append(f[0])
# automatically inherit variable remapping from `uncalled` operands
# for higher level connective formulae:
#
# ```python
# Implies(And(...), Or(...))
# ```
else:
if not f.propositional:
_var_remap[slot] = f.unique_vars
_operand_vars[slot] = f.unique_vars
_bindings[slot] = [[None]] * len(f.unique_vars)
subformulae.append(f)
self.edge_list.append((self, f))
self.operands.append(f)
self.operands: Tuple[Formula] = tuple(self.operands)
# set class variables as read-only tuples
self.operand_vars: Tuple[Tuple[Variable, ...], ...] = tuple(_operand_vars)
self.var_remap: Tuple[Tuple[Variable, ...], ...] = tuple(_var_remap)
self.bindings: Tuple[Tuple[tuple[str], ...], ...] = tuple(_bindings)
# inherit propositional flag from children
if self.propositional is None:
if _isinstance(self, "_Quantifier"):
self.set_propositional(
not subclasses["_Quantifier"]._has_free_variables(
self.variables, self.operands[0]
)
)
else:
self.set_propositional(
False if any([not f.propositional for f in subformulae]) else True
)
# inherit variables from children
if _isinstance(self, "_Quantifier") or (
not self.propositional and not _isinstance(self, "_LeafFormula")
):
remap = self.var_remap
if _isinstance(self, "_Quantifier") and hasattr(self, "variables"):
remap = (
subclasses["_Quantifier"]._unique_variables_overlap(
self.variables, self.var_remap[0]
),
)
self._update_variables(*remap)
self._update_map()
# expand formula graph to include all subformulae
if subformulae:
self.edge_list.extend([edge for f in subformulae for edge in f.edge_list])
@staticmethod
def _is_grounded(groundings: Union[tuple[str], str, Tuple[str, ...]]) -> bool:
r"""Returns True if the grounding is given in internal "Grounded" form."""
return isinstance(groundings, tuple)
@staticmethod
def _unique_variables(*variables: Tuple[Variable, ...]) -> Tuple:
r"""Combines all predicate variables into a unique tuple
the tuple is sorted by the order of appearance of variables in
the operands.
"""
result = list()
for op_vars in variables:
if op_vars:
for v in op_vars:
if v not in result:
result.append(v)
return tuple(result)
def _update_map(self) -> None:
r"""Update the 'operand_map' to map parent groundings to operand groundings."""
self.operand_map: List[Tuple] = [None] * len(self.operand_vars)
for op_idx in range(len(self.var_remap)):
if self.var_remap[op_idx]:
op_map = [
self.unique_vars.index(op_var)
if op_var in self.unique_vars
else self.variables.index(op_var) + len(self.unique_vars)
for op_var in self.var_remap[op_idx]
]
self.operand_map[op_idx] = tuple(op_map)
def _update_variables(self, *variables: Tuple[Variable, ...]) -> None:
r"""
**Notes**
If the formula grounding_arity is not specified, it will be inherited
from the variables. If variables are not specified, both the
`grounding_arity` and variables are inherited from groundings
"""
self.unique_vars = Formula._unique_variables(*variables)
self.num_unique_vars = len(self.unique_vars)
def _contradiction_loss(self, coeff: float = None) -> torch.Tensor:
r"""Contradiction loss."""
if coeff is None:
coeff = 1
bounds = self.get_data()
x = bounds[..., 0] - bounds[..., 1]
return (self.contradicting_bounds() * coeff * x).sum()
def _uncertainty_loss(self, coeff: float = None) -> torch.Tensor:
r"""Uncertainty loss."""
if coeff is None:
coeff = 0
bounds = self.get_data()
x = bounds[..., 1] - bounds[..., 0]
return (self.is_contradiction().logical_not() * coeff * x).sum()
def _supervised_loss(self, coeff: float = None) -> Union[None, torch.Tensor]:
r"""Supervised loss."""
if coeff is None:
coeff = 1
loss = torch.nn.MSELoss()
if (
not hasattr(self, "labels")
or (self.propositional and not self.labels.numel())
or (not self.propositional and not self.labels)
):
return
labels = self.get_labels()
if self.propositional:
return coeff * loss(self.get_data(), labels)
groundings = [g for g in labels if g in self.grounding_table]
if len(groundings) == 0:
return
return coeff * loss(self.get_data(*groundings), self.get_labels(*groundings))
increment_param_history = _trace.increment_param_history