Source code for lnn.symbolic.logic.formula

##
# Copyright 2023 IBM Corp. All Rights Reserved.
#
# SPDX-License-Identifier: Apache-2.0
##

# flake8: noqa: E501

import logging
import itertools
import typing
from typing import Optional, Union, Tuple, Iterator, Set, List, Dict

from abc import ABC

from .variable import Variable
from .. import _trace
from ... import _utils, _exceptions, utils
from ...constants import Fact, World

import copy
import torch
import numpy as np

_utils.logger_setup()
subclasses: typing.Dict[str, object] = {}


def _isinstance(obj, class_str) -> bool:
    """
    Returns True if an object is an instance of a class give that class name as a
    string, otherwise False. The check is performed using the subclasses dictionary.
    To see what the subclasses dictionary is populated with, refer to the init file of
    this module.
    """
    return isinstance(obj, subclasses[class_str])


[docs]class Formula(ABC): r"""Symbolic container for a generic formula.""" def __init__( self, *formulae: "Formula", name: Optional[str] = "", arity: int = None, **kwds ): # construct edge and operand list for each formula self.edge_list = list() self.operands: List[Formula] = list() # formula arity self.arity: int = arity # inherit propositional, variables, and graphs self.propositional: bool = kwds.get("propositional") self.variables: Tuple[Variable, ...] = kwds.get("variables") self._inherit_from_subformulae(*formulae) # formula naming if _isinstance(self, "_LeafFormula"): self.structure: str = name self.name: str = name else: self.structure: str = self._formula_structure(True) self.name: str = ( name if name else self._formula_structure(True, lambda x: x.name) ) # formula grounding table maps grounded objects to table rows self.grounding_table = None if self.propositional else dict() # placeholder for neural variables and functions self.neuron = None self.func = None self.func_inv = None self.formula_number = None self.operands_by_number = list() self.congruent_nodes = list() ## # External function definitions ## def add_facts(self, *args, **kwds): raise NameError(f"`add_facts` is deprecated, use `add_data` instead")
[docs] def add_data( self, data: Union[ Union[bool, Fact, float, Tuple[float, float]], Dict[ Union[str, Tuple[str, ...]], Union[bool, Fact, float, Tuple[float, float]], ], ], ): r"""Add data to the formula in the form of classical facts or belief bounds. Data given is a Fact or belief bounds assumes a propositional formula. Data given in a dict assumes a first-order logic formula, keyed by the grounding and a value given as a Fact or belief bounds. Parameters ---------- data : Fact, belief bounds or dict For propositional formulae, truths is given as either Facts or belief bounds. These beliefs can be given as a bool, float or a float-range, i.e. a tuple of 2 floats. For first-order logic formula, inputs truths are given as a dict. It is keyed by the grounding (a str for unary formlae or tuple of strings of larger arities), with values also as Facts or bounds on beliefs. Examples -------- ```python # propositional P, Q = Propositions("P", "Q") P.add_data(Fact.TRUE) Q.add_data((.1, .4)) ``` ```python # first-order logic Person = Predicate("Person") Person.add_data({ "Barack Obama": Fact.TRUE, "Bo": (.1, .4) }) # FOL with arity > 2 BD = Predicate("Birthdate", 2) BD.add_data({ ("Barack Obama", "04 August 1961"): Fact.TRUE, ("Bo", "09 October 2008"): (.6, .75) }) ``` """ if self.propositional: # Propositional facts if isinstance(data, bool): data = Fact.TRUE if data else Fact.FALSE _exceptions.AssertBounds(data) self.neuron.add_data(data) return # FOL facts if not isinstance(data, dict): # facts given per grounding raise Exception( "FOL facts should be from [dict, set], " f'"{self}" received {type(data)}' ) # replace fact keys (str -> tuple[str]) groundings = tuple(data.keys()) for g in groundings: bounds = data.pop(g) if isinstance(bounds, bool): bounds = Fact.TRUE if bounds else Fact.FALSE data[self._ground(g)] = bounds # add missing groundings to `grounding_table` groundings = tuple(data.keys()) self._add_groundings(*groundings) # set facts for all groundings table_facts = {self.grounding_table[g]: data[g] for g in groundings} self.neuron.add_data(table_facts)
[docs] def add_labels( self, labels: Union[ Union[Fact, Tuple[float, float]], Dict[Union[str, Tuple[str, ...]], Union[Fact, Tuple[float, float]]], ], ): r"""Store labels as data within the symbolic container. Labels given is a Fact or belief bounds assumes a propositional formula. Labels given in a dict assumes a first-order logic formula, keyed by the grounding and a value given as a Fact or belief bounds. Parameters ---------- labels : Fact, belief bounds or dict For propositional formulae, facts are given as either Facts or bounds on beliefs (a tuple of 2 floats). For first-order logic formula, inputs are given as a dict. It is keyed by the grounding (a str for unary formlae or tuple of strings of larger arities), with values also as Facts or bounds on beliefs. Examples -------- ```python # propositional P, Q = Propositions("P", "Q") P.add_labels(Fact.TRUE) Q.add_labels((.1, .4)) ``` ```python # first-order logic Person = Predicate("Person") Person.add_labels({ "Barack Obama": Fact.TRUE, "Bo": (.1, .4) }) # FOL with arity > 2 BD = Predicate("Birthdate", 2) BD.add_labels({ ("Barack Obama", "04 August 1961"): Fact.TRUE, ("Bo", "09 October 2008"): (.6, .75) }) ``` """ if self.propositional: # Propositional labels _exceptions.AssertBounds(labels) self.labels = _utils.fact_to_bounds(labels, self.propositional) else: # FOL labels if not hasattr(self, "labels"): self.labels = dict() if isinstance(labels, dict): # labels given per grounding _labels = copy.copy(labels) for g in tuple(_labels.keys()): # set labels for groundings, replace str keys -> Groundings self.labels[self._ground(g)] = _utils.fact_to_bounds( _labels.pop(g), self.propositional ) else: raise Exception( "FOL facts should be from [dict, set], " f'"{self}" received {type(labels)}' )
[docs] def flush(self): r"""Set all facts in formula to `Fact.UNKNOWN`.""" self.neuron.flush()
def get_facts(self, *args, **kwds): raise NameError(f"`get_facts` is deprecated, use `get_data` instead")
[docs] def get_data( self, *groundings: Union[str, Tuple[str, ...], tuple[str]] ) -> torch.Tensor: r"""Returns the current beliefs of the formula. Uses the given groundings or the entire `groundind_table` to slice out the current belief bounds from the `bounds_table`. Parameters ------- ``*groundings``: str or tuple of str NB - if unspecified, defaults to returning all the facts for the given formula. If specified, the table will be ordered according to the input grounding order. """ if self.propositional or len(groundings) == 0: return self.neuron.get_data() table_rows = [self.grounding_table.get(self._ground(g)) for g in groundings] return self.neuron.get_data(table_rows, default=True)
[docs] def get_labels(self, *groundings: str) -> torch.Tensor: r"""Returns the stored labels from the symbolic container.""" if self.propositional or len(groundings) == 0: return self.labels result = torch.stack([self.labels.get(self._ground(g)) for g in groundings]) return result[0] if len(groundings) == 1 else result
@property def groundings(self) -> Set[Union[str, Tuple[str, ...]]]: r"""Returns the groundings to the user as str or tuple of str.""" if self.grounding_table: return set(self.grounding_table.keys()) return set() @property def is_classically_resolved(self): r"""Checks if the query node is in a classical state besides `Fact.UNKNOWN`.""" if self.propositional and _utils.is_classical_proposition( tuple(self.get_data().tolist()) ): return True return False
[docs] def is_contradiction( self, bounds: torch.Tensor = None, stacked=False ) -> torch.BoolTensor: r"""Check if there are any bounds in contradiction.""" return self.contradicting_bounds(bounds, stacked).any()
[docs] def contradicting_bounds( self, bounds: torch.Tensor = None, stacked=False ) -> torch.BoolTensor: r"""Check which bounds are in contradiction.""" if stacked: tensor = torch.logical_or( self.neuron.is_contradiction(bounds[..., 0]), self.neuron.is_contradiction(bounds[..., 1]), ) return tensor.type(torch.bool) return self.neuron.is_contradiction(bounds)
[docs] def contradicting_stacked_bounds( self, bounds: torch.Tensor = None ) -> torch.BoolTensor: r"""Check if bounds are in contradiction."""
[docs] def is_equal(self, other: "Formula") -> bool: r"""Returns True if two nodes are symbolically equivalent. Formulae can be symbolically equivalent yet neurally different, example: ```python f = And(A, B, activation={"type": NeuralActivation.Lukasiewicz}) g = And(A, B, activation={"type": NeuralActivation.Godel}) ``` The above will return `True`, since `f` is symbolically equivalent to `g` despite different neural configurations. """ if self is other: return True result = list() for f in self.congruent_nodes: if _isinstance(f, "Congruent"): result += [ other.structure == operand.structure for operand in f.operands ] return any(result)
def is_unweighted(self) -> bool: return all([self.neuron.bias == w for w in self.neuron.weights]) def negation_absorption( self, store: bool = True ) -> (List[int], Tuple[Tuple["Formula"]]): n_negations = [] edge_replace = [] operands = [] def recurse(formula): for operand in formula.operands: if _isinstance(operand, "Not"): recurse(operand) root_idx = len(n_negations) - 1 n_negations[root_idx] += 1 condition = [True] * len(self.neuron.weights) condition[root_idx] = bool(root_idx % 2) if store: self.neuron.update_weights_where( condition, -self.neuron.weights ) self.operands[root_idx] = self.operands[root_idx].operands[0] edge_replace.append( (self.edge_list[root_idx], (self, self.operands[root_idx])) ) else: n_negations.append(0) operands.append(operand) recurse(self) if store: if edge_replace: logging.info(f"ABSORBED NEGATIONS INTO WEIGHTS FOR: '{self.name}'") return edge_replace, n_negations else: return operands, edge_replace, n_negations def named_parameters(self) -> Iterator[Tuple[str, torch.Tensor]]: return self.neuron.named_parameters() def parameters(self) -> Iterator[torch.Tensor]: return self.neuron.parameters() def params( self, *params: str, detach: bool = False ) -> Union[torch.Tensor, List[torch.Tensor]]: result = list() for param in params: if param in dict(self.neuron.named_parameters()): result.append( getattr(self.neuron, param).clone().detach() if detach else getattr(self.neuron, param) ) else: raise ValueError(f"{self} has no attribute: {param}") return result[0] if len(params) == 1 else result
[docs] def print( self, header_len: int = 50, roundoff: int = 5, params: bool = False, grads: bool = False, numbering: bool = False, ): r"""Print the states of groundings in a formula. ``` OPEN Proposition: A UNKNOWN (L, U) CLOSED Proposition: B FALSE (L, U) OPEN Predicate: P1 (x) "const_1" TRUE (L, U) "const_2" CONTRADICTION (L, U) "const_3" FALSE (L, U) OPEN Predicate: P2 (x, y) ("const_1", "const_5") TRUE (L, U) ("const_2", "const_6") CONTRADICTION (L, U) ("const_3", "const_7") FALSE (L, U) OPEN And: And_0 (x, y) Bindings: P1 (x: "const_1"), P2 (x: 'const_2', y: ['const_3', ...]) ('const_1', 'const_3') TRUE (L, U) ('const_2', 'const_3') CONTRADICTION (L, U) ('const_1', 'const_7') FALSE (L, U) TRUE Forall: Forall_0 (y) UNKNOWN (L, U) ``` """ def state_wrapper(grounding: Tuple[str]): return f"{grounding}" def round_bounds(grounding=None): data = self.get_data(grounding) if len(data.shape) > 1: data = data[0] return tuple([round(r, roundoff) for r in data.tolist()]) header = ( f"{str(self.world_state(True))} " f"{self.__class__.__name__}" f": {self.name}" ) if params: params = dict() for name, symbol in _utils.param_symbols.items(): if hasattr(self.neuron, name): val = getattr(self.neuron, name) params[symbol] = f"{np.around(val.tolist(), roundoff)}" + ( f" grads {np.around(val.grad.tolist(), roundoff)}" if grads and val.grad is not None else "" ) params = "params " + ", ".join([f"{k}: {v}" for k, v in params.items()]) else: params = "" number = ( f"{self.formula_number} {self.operands_by_number}: " if self.formula_number is not None and numbering else "" ) # print propositional node - single bounds if self.propositional: states = ( f"{header:<{header_len}} " f"{self.state().name:>13} " f"{round_bounds()}\n" f"{params}" ) print(f"{number}{states}") # print FOL node - table of bounds else: facts = ( [""] if self.grounding_table is None else ( [ ( f"{state_wrapper(g):{header_len}} " f"{self.state(g).name:>13} " f"{round_bounds(g)}\n" ) for g in self.grounding_table ] ) ) binder = list() for op_idx, op in enumerate(self.operands): if not self.propositional and self._has_bindings(op_idx): for i in range(len(self.var_remap[op_idx])): if None not in self.bindings[op_idx][i]: binder.append( f"{str(op)} " "{" f"{str(self.operand_map[op_idx][i])}: " f"{list(map(str, self.bindings[op_idx][i]))}" "}" ) bind_str = ("\nBindings: " + ", ".join(binder)) if binder else "" header = f"{header} {bind_str}" params = f"\n{params}" if params else "" header = f"{header}{params}" result = f"{header}\n" + "".join(facts) print(f"{number}{result}")
def project_params(self): self.neuron.project_params() def rename(self, name: str): self.name = name def reset_bounds(self): self.neuron.reset_bounds() def reset_world(self, world: World): _exceptions.AssertBoundsLen(world) self.world = world if isinstance(world, tuple) else world.value self.neuron.reset_world(world) def set_formula_number(self, idx) -> int: self.formula_number = idx for operand in self.operands: if operand.formula_number is None: idx = operand.set_formula_number(idx + 1) self.operands_by_number.append(operand.formula_number) return idx
[docs] def set_negative_weights( self, is_negative: bool = True, store: bool = True ) -> (List[int], Tuple[Tuple["Formula"]]): """absorb `Not` into weights and allow negative computation""" self.neuron.set_negative_weights(is_negative) return self.negation_absorption(store=store)
[docs] def set_propositional(self, propositional: bool): r"""Set's the neuron's world parameter.""" self.propositional = propositional
@property def shape(self) -> torch.Size: return self.neuron.bounds_table.shape
[docs] def state( self, groundings: Union[ str, Tuple[str, ...], List[str], List[Tuple[str, ...]] ] = None, to_bool: bool = False, bounds: torch.Tensor = None, ) -> Union[torch.Tensor, Dict[Union[str, Tuple[str, ...]], torch.Tensor]]: r"""Returns the state of a single grounded fact. if to_bool flag is True, will map classical Facts to bool: i.e. {Fact.True: True, FALSE': False} The rest of the node states will return as str Notes ----- see section [F.2](https://arxiv.org/abs/2006.13155) for more information on node states """ if self.propositional: result = self.neuron.state() else: if bounds is None: if groundings is None or isinstance(groundings, list): result = { g: self.state(g) for g in ( self.grounding_table if groundings is None else groundings ) } return result groundings = self._ground(groundings) bounds = self.get_data(groundings) if len(bounds) == 0: raise LookupError(f"grounding {groundings} not found in {str(self)}") result = self.neuron.state(bounds[None, :])[0] result = _utils.node_state(result) return utils.fact_to_bool(result) if to_bool else result
@property def unique_var_map(self) -> Dict: result = {u: i for i, u in enumerate(self.unique_vars)} if _isinstance(self, "_Quantifier"): for idx, v in enumerate(self.variables): result[v] = idx + len(self.unique_vars) return result @property def world(self) -> World: r"""Proxy to get the neuron's world parameter.""" return self.neuron.world @world.setter def world(self, new_world: Union[Tuple, World]): r"""Proxy to set the neuron's world parameter.""" self.neuron.world = new_world
[docs] def world_state(self, name=False): r"""Returns the state of the `world` variable.""" try: w = World(self.world) return w.name if name else w except ValueError: return self.world
def And(self, *formulae: "Formula", **kwds) -> "And": return subclasses["And"](*self._formula_vars(self, *formulae), **kwds) def Or(self, *formulae: "Formula", **kwds) -> "Or": return subclasses["Or"](*self._formula_vars(self, *formulae), **kwds) def Implies(self, formula: "Formula", **kwds) -> "Implies": return subclasses["Implies"](*self._formula_vars(self, formula), **kwds) def Not(self, **kwds) -> "Not": return subclasses["Not"](*self._formula_vars(self), **kwds) def Iff(self, formula: "Formula", **kwds) -> "Iff": return subclasses["Iff"](*self._formula_vars(self, formula), **kwds) def Exists(self, **kwds) -> "Exists": return subclasses["Exists"](self.unique_vars, self, **kwds) def Forall(self, **kwds) -> "Forall": return subclasses["Forall"](*self.unique_vars, self, **kwds) ## # Internal function definitions ## def __call__( self, *variables: Variable, ) -> Tuple[ "Formula", List[Variable], Tuple[Variable, ...], Tuple[Union[List[tuple[str]], List[None]]], ]: r"""Variable remapping between operator and operand variables. Examples -------- FOL formulae that appear in connectives are callable: Note `A`, `B` in the `And` ```python x = Variable('x') model['A'] = Predicate('A') model['B'] = Predicate('B') model['AB'] = And(A(x), B(x)) ``` Returns ------- Formula: reference to the child object variables: tuple of child's Variables var_remap: tuple of parent-to-child remapping Variables bindings: tuple of parent-to-child groundings to bind inference to single groundings """ if len(variables) != self.num_unique_vars: raise Exception( f"please check if the FOL arity of {self} is correctly set " "for the number of variables " f"variables length ({len(variables)}) must be the same " f"length as `num_unique_vars` ({self.num_unique_vars})" ) bindings = list() bind = dict() variable_objects = list() for v in variables: if isinstance(v, str): bindings.append([self._ground(v, arity_match=False)]) continue elif isinstance(v, Variable): variable_objects.append(v) else: raise TypeError(f"expected variable, received {type(v)}") if v in bind: if isinstance(bind[v], str): # a single binding bindings.append([self._ground(bind[v], arity_match=False)]) elif isinstance(bind[v], list): # multiple bindings bindings.append( [self._ground(g, arity_match=False) for g in bind[v]] ) elif isinstance(bind[v], Function): # A single function bindings.append([bind[v]]) elif isinstance(bind[v], tuple): # A bound function bindings.append([bind[v]]) else: raise TypeError( f"bindings expected as str, " f"Function or list of str or " f"Function, received {type(bind[v])}" ) else: bindings.append([None]) return self, self.variables, tuple(variable_objects), bindings def __str__(self) -> str: return self.name def __eq__(self, other): eq_condition = (self.structure == other.structure) and ( self.neuron == other.neuron ) return eq_condition def __hash__(self): return hash(self.structure) def _add_groundings(self, *groundings: tuple[str]): r"""Adds missing groundings to `grounding_table` for those not yet stored. Examples -------- ```python # for formulae with arity == 1: formula._add_groundings(tuple_1, tuple_2) ``` ```python # for formulae with arity > 1: groundings = ((tuple_1, tuple_7), (tuple_2, tuple_8)) formula._add_groundings(*groundings) ``` Warning ------- groundings must be given in grounded form: `self._ground(grounding)` """ missing_groundings = {g for g in groundings if g not in self.grounding_table} if len(missing_groundings) == 0: self._has_new_groundings = False return self._has_new_groundings = True table_rows = self.neuron.extend_groundings(len(missing_groundings)) self.grounding_table.update( {g: table_rows[i] for i, g in enumerate(missing_groundings)} ) def _formula_structure(self, as_int, get_str=lambda x: x.structure) -> str: r"""Determine a name for input formula(e).""" def subformula_structure( subformula: Formula, operator: str = None, subformula_vars: Tuple = None, root_var_remap: dict = None, ) -> str: if subformula_vars is None: root_var_remap = dict( (k, k) for k in ( self.expanded_unique_vars if _isinstance(self, "_Quantifier") else self.unique_vars ) ) else: if _isinstance(subformula, "_Quantifier"): for v in subformula.expanded_unique_vars: if v not in root_var_remap: root_var_remap[v] = v if subformula_vars != subformula.unique_vars: root_var_remap = dict( ( ( subformula.unique_vars[i], root_var_remap[subformula_vars[i]], ) if subformula.unique_vars[i] != subformula_vars[i] else (subformula_vars[i], subformula_vars[i]) for i in range(len(subformula_vars)) ) ) result = [ ( f"{f.name}(" + _utils.list_to_str( [ root_var_remap[v] if not as_int else subformula.unique_var_map[root_var_remap[v]] if _isinstance(subformula, "_Quantifier") else self.unique_var_map[root_var_remap[v]] for v in [ subformula.expanded_unique_vars[_] if _isinstance(subformula, "_Quantifier") else subformula.unique_vars[_] for _ in subformula.operand_map[i] ] ] if subformula.operand_map[i] else [] ) + ")" ) if _isinstance(f, "Predicate") else f"{f.name}" if _isinstance(f, "Proposition") else f"{f.structure}" if _isinstance(f, "_Quantifier") else ( "(" + subformula_structure( f, f.connective_str, subformula.var_remap[i], root_var_remap ) + ")" ) if _isinstance(f, "_ConnectiveNeuron") else ( f"{f.connective_str}" + subformula_structure( f, None, subformula.var_remap[i], root_var_remap ) ) if _isinstance(f, "Not") else "" for i, f in enumerate(subformula.operands) ] return f" {operator} ".join(result) if operator else "".join(result) if _isinstance(self, "_Quantifier"): quantified_idxs = _utils.list_to_str( [self.unique_var_map[v] for v in self.variables], "," ) return ( f"({self.connective_str}{quantified_idxs}, " f"{subformula_structure(self)})" ) elif _isinstance(self, "Not"): return ( f"{self.connective_str}{get_str(self.operands[0])}" if self.propositional else f"{self.connective_str}{subformula_structure(self)}" ) return ( ( "(" + f" {self.connective_str} ".join([get_str(f) for f in self.operands]) + ")" ) if self.propositional else f"({subformula_structure(self, self.connective_str)})" ) @staticmethod def _formula_vars(*formulae: "Formula") -> List[Union["Formula", Tuple]]: r"""Returns a list of formulae as wither called (when predicates) or uncalled as connectives. """ variables = list( Variable(f"?{i}") for i in range(max([f.arity for f in formulae])) ) return [ f(*variables[: f.arity]) if (not f.propositional and _isinstance(f, "Predicate")) else f for f in formulae ] def _ground( self, grounding: Union[str, Tuple[str, ...], tuple[str]], arity_match: bool = True, ) -> tuple[str]: r"""Returns a single grounded object given a grounding in str form. If the grounding is already an internal "Grounding" object, it will simply be returned to the user as is. Examples -------- ```python # for arity == 1 self._ground('str_1') ``` ```python # for arity > 1 grounding = ('str_1', 'str_2', ...) self._ground(grounding) ``` Warning ------- This function can only ground one object at a time due to tuple confusion for multiple objects """ if isinstance(grounding, tuple): return grounding if isinstance(grounding, str): return (grounding,) if Formula._is_grounded(grounding): return grounding if isinstance(grounding, tuple): if not all([type(g) in [str, None] for g in grounding]): raise Exception( "expected groundings as tuple of str for num_unique_vars " f" > 1, received {type(grounding)} {grounding}" ) if len(grounding) != self.num_unique_vars and arity_match: raise Exception( "expected grounding length to be of " f"arity {self.num_unique_vars}, received {grounding}" ) else: if self.num_unique_vars != 1 and arity_match: raise Exception( f"{self} received str as grounding, expected grounding " f"({grounding}) as a tuple of len {self.num_unique_vars}" ) return grounding @property def _groundings(self) -> Set[tuple[str]]: """Internal usage to extract groundings""" return set(self.grounding_table.keys()) def _has_bindings(self, slot: int = None) -> bool: r"""Returns True if Formula has any bindings.""" if isinstance(slot, int): return ( True if self.bindings[slot] and any( [ self.bindings[slot][i][j] for i in range(len(self.bindings[slot])) for j in range(len(self.bindings[slot][i])) ] ) else False ) return any([self._has_bindings(i) for i in range(len(self.bindings))]) def _inherit_from_subformulae(self, *subformula: Union["Formula", Tuple]): r"""Builds the local context for a formula using subformulae. provides manual variable remapping if given with variables: i.e. Formula used as a function via `__call__` alternatively inherits remapping from operands if formula is not called """ # inheritance variables subformulae = list() _operand_vars: List[Union[Tuple[Variable, ...], None]] = [None] * self.arity _var_remap: List[Union[Tuple[Variable, ...], None]] = [None] * self.arity _bindings: List[Union[Tuple[tuple[str], ...], None]] = [None] * self.arity for slot, f in enumerate(subformula): # variable remapping from `called` operands: # # ```python # P = Predicate("P", arity=2) # Q = Predicate("P", arity=2) # And(P(x, y), Q(y, z)) # ``` if isinstance(f, tuple): subformulae.append(f[0]) _operand_vars[slot] = f[1] _var_remap[slot] = f[2] _bindings[slot] = f[3] self.edge_list.append((self, f[0])) self.operands.append(f[0]) # automatically inherit variable remapping from `uncalled` operands # for higher level connective formulae: # # ```python # Implies(And(...), Or(...)) # ``` else: if not f.propositional: _var_remap[slot] = f.unique_vars _operand_vars[slot] = f.unique_vars _bindings[slot] = [[None]] * len(f.unique_vars) subformulae.append(f) self.edge_list.append((self, f)) self.operands.append(f) self.operands: Tuple[Formula] = tuple(self.operands) # set class variables as read-only tuples self.operand_vars: Tuple[Tuple[Variable, ...], ...] = tuple(_operand_vars) self.var_remap: Tuple[Tuple[Variable, ...], ...] = tuple(_var_remap) self.bindings: Tuple[Tuple[tuple[str], ...], ...] = tuple(_bindings) # inherit propositional flag from children if self.propositional is None: if _isinstance(self, "_Quantifier"): self.set_propositional( not subclasses["_Quantifier"]._has_free_variables( self.variables, self.operands[0] ) ) else: self.set_propositional( False if any([not f.propositional for f in subformulae]) else True ) # inherit variables from children if _isinstance(self, "_Quantifier") or ( not self.propositional and not _isinstance(self, "_LeafFormula") ): remap = self.var_remap if _isinstance(self, "_Quantifier") and hasattr(self, "variables"): remap = ( subclasses["_Quantifier"]._unique_variables_overlap( self.variables, self.var_remap[0] ), ) self._update_variables(*remap) self._update_map() # expand formula graph to include all subformulae if subformulae: self.edge_list.extend([edge for f in subformulae for edge in f.edge_list]) @staticmethod def _is_grounded(groundings: Union[tuple[str], str, Tuple[str, ...]]) -> bool: r"""Returns True if the grounding is given in internal "Grounded" form.""" return isinstance(groundings, tuple) @staticmethod def _unique_variables(*variables: Tuple[Variable, ...]) -> Tuple: r"""Combines all predicate variables into a unique tuple the tuple is sorted by the order of appearance of variables in the operands. """ result = list() for op_vars in variables: if op_vars: for v in op_vars: if v not in result: result.append(v) return tuple(result) def _update_map(self) -> None: r"""Update the 'operand_map' to map parent groundings to operand groundings.""" self.operand_map: List[Tuple] = [None] * len(self.operand_vars) for op_idx in range(len(self.var_remap)): if self.var_remap[op_idx]: op_map = [ self.unique_vars.index(op_var) if op_var in self.unique_vars else self.variables.index(op_var) + len(self.unique_vars) for op_var in self.var_remap[op_idx] ] self.operand_map[op_idx] = tuple(op_map) def _update_variables(self, *variables: Tuple[Variable, ...]) -> None: r""" **Notes** If the formula grounding_arity is not specified, it will be inherited from the variables. If variables are not specified, both the `grounding_arity` and variables are inherited from groundings """ self.unique_vars = Formula._unique_variables(*variables) self.num_unique_vars = len(self.unique_vars) def _contradiction_loss(self, coeff: float = None) -> torch.Tensor: r"""Contradiction loss.""" if coeff is None: coeff = 1 bounds = self.get_data() x = bounds[..., 0] - bounds[..., 1] return (self.contradicting_bounds() * coeff * x).sum() def _uncertainty_loss(self, coeff: float = None) -> torch.Tensor: r"""Uncertainty loss.""" if coeff is None: coeff = 0 bounds = self.get_data() x = bounds[..., 1] - bounds[..., 0] return (self.is_contradiction().logical_not() * coeff * x).sum() def _supervised_loss(self, coeff: float = None) -> Union[None, torch.Tensor]: r"""Supervised loss.""" if coeff is None: coeff = 1 loss = torch.nn.MSELoss() if ( not hasattr(self, "labels") or (self.propositional and not self.labels.numel()) or (not self.propositional and not self.labels) ): return labels = self.get_labels() if self.propositional: return coeff * loss(self.get_data(), labels) groundings = [g for g in labels if g in self.grounding_table] if len(groundings) == 0: return return coeff * loss(self.get_data(*groundings), self.get_labels(*groundings)) increment_param_history = _trace.increment_param_history