### Copyright 2023 IBM Corp. All Rights Reserved.## SPDX-License-Identifier: Apache-2.0### flake8: noqa: E501importloggingfromtypingimportUnion,Tuple,Setimportpandasaspdimporttorchfrom.connective_formulaimport_ConnectiveFormulafrom.formulaimportFormulafrom.neural_activationimport_NeuralActivationfrom.node_activationimport_NodeActivationfrom.variableimportVariablefrom..import_gmfrom...import_utilsfrom...constantsimportFact,Direction,Boundfromtorch.nn.parameterimportParameter_utils.logger_setup()class_UnaryOperator(_ConnectiveFormula):r"""Restrict operators to 1 input."""def__init__(self,*formula:Formula,**kwds):iflen(formula)!=1:raiseException("Unary operator expect 1 formula as input, received "f"{len(formula)}")super().__init__(*formula,arity=1,**kwds)class_Quantifier(_UnaryOperator):r"""Symbolic container for quantifiers. Parameters ------------ kwds : dict fully_grounded : bool specifies if a full upward inference can be done on a quantifier due to all the groundings being present inside it. This applies to the lower bound of a `Forall` and upper bound of an `Exists` Attributes ---------- fully_grounded : bool unique_var_slots : tuple of int returns the slot index of each unique variable """def__init__(self,*args,**kwds):variables=(args[:-1]iflen(args)>1elseargs[-1][0].unique_varsifisinstance(args[-1],tuple)elseargs[-1].unique_vars)super().__init__(args[-1],variables=variables,**kwds)self.fully_grounded=kwds.get("fully_grounded",False)self._grounding_set=set()self.operator=Noneifisinstance(self,Forall):self.operator="And"ifisinstance(self,Exists):self.operator="Or"self._set_activation(**kwds)self.neurons=[]self.free_vars=[self.operands[0].unique_vars.index(v)forvinself.unique_vars]iflen(self.free_vars)==0:self.neuron.weights=Parameter(torch.tensor([1.0]),self.neuron.weights.requires_grad)self._new_groundings=[]@propertydefexpanded_unique_vars(self):result=list(self.unique_vars)foridx,vinenumerate(self.variables):result.append(v)returntuple(result)@staticmethoddef_unique_variables_overlap(source:Tuple[Variable,...],destination:Tuple[Variable,...])->Tuple[Variable,...]:"""combines all predicate variables into a unique tuple the tuple is sorted by the order of appearance of variables in the operands """result=list()fordst_varindestination:ifdst_varnotinsource:result.append(dst_var)returntuple(result)defupward(self,**kwds)->float:operand=self.operands[0]iflen(operand.grounding_table)==0:return0.0iflen(self.free_vars)==0:returnself._fully_quantified_upward()input_bounds=operand.get_data()ifinput_boundsisNone:return0.0df,grouped=self._get_groupings()bound=Noneifnotself.fully_grounded:bound=Bound.UPPERifisinstance(self,Forall)elseBound.LOWERresult=0bounds_table=[]forindicesingrouped:grounding=tuple(df.iloc[indices[0],:].tolist()[:-1])arity=len(indices)ifself.grounding_tableisNone:self.grounding_table={}ifgroundinginself.grounding_table:neuron_id=self.grounding_table[grounding]neuron=self.neurons[neuron_id]ifneuron.arity!=len(indices):neuron=self._add_neuron(arity,neuron_id)else:self._add_grounding(grounding)neuron=self._add_neuron(len(indices))ib=input_bounds[indices].permute([1,0])[None,:,:]result+=neuron.aggregate_bounds([0],neuron.func(ib),bound)bounds_table.append(neuron.get_data())self.neuron.bounds_table=torch.vstack(bounds_table)returnresultdefdownward(self,**kwds)->Union[torch.Tensor,None]:operand=self.operands[0]iflen(operand.grounding_table)==0:return0.0iflen(self.free_vars)==0:returnself._fully_quantified_downward()self._propagate_groundings()input_bounds=operand.get_data()ifinput_boundsisNone:return0.0df,grouped=self._get_groupings()bounds=input_bounds.detach().clone()forindicesingrouped:grounding=tuple(df.iloc[indices[0],:].tolist()[:-1])ib=bounds[indices].permute([1,0])[None,:,:]neuron_id=self.grounding_table[grounding]neuron=self.neurons[neuron_id]ifneuron.arity!=len(indices):neuron=self._add_neuron(len(indices),neuron_id)bounds[indices]=neuron.func_inv(neuron.get_data(),ib)[0].permute([1,0])ifisinstance(operand,_Quantifier):result=0fori,neuroninenumerate(operand.neurons):result+=neuron.aggregate_bounds([0],bounds[None,i])returnresultreturnoperand.neuron.aggregate_bounds(df.index.to_list(),bounds)def_add_grounding(self,grounding:tuple[str]):self.grounding_table[grounding]=len(self.grounding_table)def_add_groundings(self,*groundings:tuple[str]):forgingroundings:ifgnotinself.grounding_table:self._add_grounding(g)self._add_neuron(len(g))self.neuron.extend_groundings(1)self._new_groundings.append(g)def_get_groupings(self):operand=self.operands[0]groundings=operand.grounding_table.keys()df=pd.DataFrame(groundings)df=df[self.free_vars]df["index"]=df.indexgrouped=df.groupby(by=self.free_vars)["index"].apply(list)returndf,groupeddef_create_neuron(self,arity):kwds={"propositional":False,"arity":arity,"world":self.world}neuron=_NeuralActivation()(activation={"weights_learning":False},**kwds)neuron.extend_groundings(1)neuron.func=neuron.activation(self.operator,direction=Direction.UPWARD)neuron.func_inv=neuron.activation(self.operator,direction=Direction.DOWNWARD)returnneurondef_add_neuron(self,arity,index=None):neuron=self._create_neuron(arity)ifindexisNone:self.neurons.append(neuron)else:self.neurons[index]=neuronreturnneurondef_fully_quantified_upward(self):operand=self.operands[0]input_bounds=operand.get_data()ifinput_boundsisNone:return0.0bound=Noneifnotself.fully_grounded:bound=Bound.UPPERifisinstance(self,Forall)elseBound.LOWERinput_bounds=input_bounds.permute([1,0])[None,:,:]self.neuron=self._create_neuron(arity=len(operand.grounding_table))self.func=self.neuron.funcresult=self.neuron.aggregate_bounds([0],self.func(input_bounds),bound)self.neuron.bounds_table=self.neuron.bounds_table[0]returnresultdef_fully_quantified_downward(self):operand=self.operands[0]ifisinstance(operand,_Quantifier):result=0fori,operand_neuroninenumerate(operand.neurons):ib=operand_neuron.get_data().permute([1,0])[None,:,:]bounds=self.func_inv(self.get_data()[None,:],ib)bounds=bounds[0].permute([1,0])result+=operand_neuron.aggregate_bounds([0],bounds[None,0])returnresultgroundings=list(operand.grounding_table.values())bounds=self.func_inv(self.get_data().repeat(len(operand.get_data()),1),operand.get_data()[:,:,None],)returnoperand.neuron.aggregate_bounds(groundings,bounds[...,0])def_propagate_groundings(self):iflen(self._new_groundings):operand=self.operands[0]groundings=operand.grounding_table.keys()new_groundings_df=pd.DataFrame(self._new_groundings)df=pd.DataFrame(groundings)new_groundings_df.columns=df[self.free_vars].columnsdf=df.drop(self.free_vars,axis=1)merged=_gm._full_outer_join(new_groundings_df,df)merged.sort_index(axis=1,inplace=True)forgroundinginmerged.itertuples(index=False,name=None):grounding_object=operand._ground(grounding)operand._add_groundings(grounding_object)self._new_groundings=[]def_set_activation(self,**kwds):"""Updates the neural activation according to grounding dimension size The computation of a quantifier is implemented via one of the weighed neurons, And/Or for Forall/Exists. At present, weighted quantifiers have not been well studied and is therefore turned off However the dimension of computation is different, computing over the groundings of the input formula instead of multiple formulae, since there can only be one formula to quantify over. The activation is therefore required to grow according to number of groundings present in the formula, which can grow as groundings propagate via inference. """kwds.setdefault("arity",len(self._grounding_set))kwds.setdefault("propositional",self.propositional)self.neuron=_NeuralActivation()(activation={"weights_learning":False},**kwds)self.func=self.neuron.activation(self.operator,direction=Direction.UPWARD)self.func_inv=self.neuron.activation(self.operator,direction=Direction.DOWNWARD)@staticmethoddef_has_free_variables(variables:Tuple[Variable,...],operand:Formula)->bool:r"""Returns True if the quantifier contains free variables."""returnlen(set(variables))!=len({*operand.unique_vars})@propertydeftrue_groundings(self,)->Set[Union[str,Tuple[str,...]]]:r"""Returns a set of groundings that are True."""ifisinstance(self.operands[0],_Quantifier):returnself.operands[0].true_groundingsreturn{gforginself.operands[0].groundingsifself.operands[0].state(g)isFact.TRUE}defadd_data(self,facts:Union[Tuple[float,float],Fact,Set]):super().add_data(facts)self._set_activation(world=self.world)
[docs]classNot(_UnaryOperator):r"""Symbolic Negation Returns a logical negation where inputs can be propositional, first-order logic predicates or other connectives. Parameters ------------ formula : Formula accepts a unary input Formula Examples -------- ```python # Propositional A = Proposition('A') Not(A) ``` ```python # First-order logic x, y = Variables('x', 'y') A = Predicate('A', arity=2) Not(A(x, y))) ``` """def__init__(self,formula:Formula,**kwds):self.connective_str="¬"super().__init__(formula,**kwds)kwds.setdefault("propositional",self.propositional)self.neuron=_NodeActivation()(**kwds.get("activation",{}),**kwds)
[docs]defupward(self,**kwds)->float:r"""Upward inference from the operands to the operator. Returns ------- tightened_bounds : float The amount of bounds tightening or new information that is leaned by the inference step. """ifself.propositional:groundings={None}else:groundings=tuple(self.operands[0]._groundings)forgingroundings:ifgnotinself.grounding_table:self._add_groundings(g)bounds=self.neuron.aggregate_bounds(None,_utils.negate_bounds(self.operands[0].get_data(*groundings)))ifself.is_contradiction():logging.info("↑ CONTRADICTION "f"FOR:'{self.name}' "f"FORMULA:{self.formula_number} ")returnbounds
[docs]defdownward(self,**kwds)->torch.Tensor:r"""Downward inference from the operator to the operands. Parameters ---------- Returns ------- tightened_bounds : float The amount of bounds tightening or new information that is leaned by the inference step. """ifself.propositional:groundings={None}else:groundings=tuple(self._groundings)forgingroundings:ifgnotinself.operands[0]._groundings:self.operands[0]._add_groundings(g)bounds=self.operands[0].neuron.aggregate_bounds(None,_utils.negate_bounds(self.get_data(*groundings)))ifself.operands[0].is_contradiction():logging.info("↓ CONTRADICTION "f"FOR:'{self.operands[0].name}' "f"FROM:'{self.name}' "f"FORMULA:{self.operands[0].formula_number} "f"PARENT:{self.formula_number} ")returnbounds
[docs]classExists(_Quantifier):r"""Symbolic existential quantifier. When working with belief bounds - existential operators restrict upward inference to only work with the given formula's lower bound. Downward inference behaves as usual. Parameters ---------- ``*variables`` : Variable formula : Formula The FOL formula to quantify over, may be a connective formula or a Predicate. Examples -------- No free variables, quantifies over all of the variables in the formula. ```python Some_1 = Exists(birthdate(p, d))) Some_2 = Exists(p, d, birthdate(p, d))) ``` Free variables, quantifies over a subset of variables in the formula. ```python Some = Exists(p, birthdate(p, d))) ``` Warning ------- Quantifier with free variables, not yet implemented. It is required that we quantify over all the variables given in the formula, either by specifying all the variables or but not specifying any variables - which is equivalent to quantifying over all variables. """def__init__(self,*args,**kwds):variables=(args[:-1]iflen(args)>1elseargs[-1][0].unique_varsifisinstance(args[-1],tuple)elseargs[-1].unique_vars)a=variables[0]formula=args[-1]iflen(variables)>1:args=[a,Exists(*variables[1:],formula,**kwds)]self.connective_str="∃"super().__init__(*args,**kwds)
[docs]classForall(_Quantifier):r"""Symbolic universal quantifier. When working with belief bounds - universal operators restrict upward inference to only work with the given formula's upper bound. Downward inference behaves as usual. Parameters ---------- ``*variables`` : Variable formula : Formula The FOL formula to quantify over, may be a connective formula or a Predicate. Examples -------- No free variables, quantifies over all of the variables in the formula. ```python All_1 = Forall(birthdate(p, d))) All_2 = Forall(p, d, birthdate(p, d))) ``` Free variables, quantifies over a subset of variables in the formula. ```python All = Forall(p, birthdate(p, d))) ``` Warning ------- Quantifier with free variables, not yet implemented. It is required that we quantify over all the variables given in the formula, either by specifying all the variables or but not specifying any variables - which is equivalent to quantifying over all variables. """def__init__(self,*args,**kwds):variables=(args[:-1]iflen(args)>1elseargs[-1][0].unique_varsifisinstance(args[-1],tuple)elseargs[-1].unique_vars)iflen(variables)>1:a=variables[0]formula=args[-1]args=[a,Forall(*variables[1:],formula,**kwds)]self.connective_str="∀"super().__init__(*args,**kwds)