# Copyright (C) 2023-2025 IBM Corp.
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations
from ..context import Context
from ..error import Error as KIF_Error
from ..error import ShouldNotGetHere
from ..model import (
AnnotatedStatement,
Filter,
Fingerprint,
KIF_Object,
Quantity,
ReferenceRecordSet,
Snak,
Statement,
String,
TFingerprint,
TReferenceRecordSet,
TTextLanguage,
)
from ..model.flags import Flags as KIF_Flags
from ..typing import (
Any,
Callable,
cast,
ClassVar,
Final,
Iterator,
Location,
Set,
TypeAlias,
TypeVar,
)
# Alias for the builtin ``property``.  Inside :class:`Store` the name
# ``property`` is rebound to the store's own "property" accessor, so the
# builtin decorator has to stay reachable under a different name.
at_property = property
# Type variables for generic helper signatures (``T`` is used by
# ``Store._do_check_optional``).
T = TypeVar('T')
S = TypeVar('S')
[docs]
class Store(Set):
"""Abstract base class for stores."""
#: The store plugin registry.
registry: Final[dict[str, type[Store]]] = {}
#: The name of this store plugin.
store_name: ClassVar[str]
#: The description of this store plugin.
store_description: ClassVar[str]
@classmethod
def _register(
        cls,
        store: type[Store],
        store_name: str,
        store_description: str
) -> None:
    """Registers a store plugin class.

    Stamps the plugin name and description on `store` and records
    `store` in :attr:`Store.registry` under its plugin name.

    Parameters:
       store: Store class to register.
       store_name: Name of the store plugin.
       store_description: Description of the store plugin.
    """
    store.store_description = store_description
    store.store_name = store_name
    key = store.store_name
    cls.registry[key] = store
@classmethod
def __init_subclass__(
        cls,
        store_name: str,
        store_description: str
) -> None:
    """Registers every subclass of :class:`Store` as a store plugin.

    Parameters:
       store_name: Name of the store plugin.
       store_description: Description of the store plugin.
    """
    Store._register(
        store=cls,
        store_name=store_name,
        store_description=store_description)
def __new__(cls, store_name: str, *args: Any, **kwargs: Any):
    """Creates an instance of the plugin registered as `store_name`.

    The name is validated against :attr:`Store.registry` through
    ``KIF_Object._check_arg`` before the registered class is
    instantiated.

    Parameters:
       store_name: Name of the store plugin to instantiate.
       args: Remaining positional arguments (not used here).
       kwargs: Keyword arguments (not used here).

    Returns:
       A new, uninitialized instance of the registered plugin class.
    """
    plugins = cls.registry
    KIF_Object._check_arg(
        store_name, store_name in plugins,
        f"no such store plugin '{store_name}'",
        Store, 'store_name', 1, ValueError)
    plugin = plugins[store_name]
    return super().__new__(plugin)  # pyright: ignore
[docs]
class Error(KIF_Error):
    """Base class for store errors.

    :meth:`Store._error` builds instances of this class.
    """
@classmethod
def _error(cls, details: str) -> Error:
    """Makes a store error.

    The error is built from the ``Error`` class reachable through
    `cls`; it is returned, not raised.

    Parameters:
       details: Details.

    Returns:
       :class:`Error`.
    """
    error_class = cls.Error
    return error_class(details)
@classmethod
def _should_not_get_here(
        cls,
        details: str | None = None
) -> ShouldNotGetHere:
    """Makes a "should not get here" error.

    Delegates to ``KIF_Object._should_not_get_here``; the error is
    returned, not raised.

    Parameters:
       details: Details.

    Returns:
       :class:`ShouldNotGetHere`.
    """
    make_error = KIF_Object._should_not_get_here
    return make_error(details)
# Per-instance storage for the store options.  ``__init__`` resets every
# option slot except ``_context`` to ``None`` and then applies the
# constructor arguments through the corresponding setters.
__slots__ = (
'_context',
'_base_filter',
'_distinct',
'_extra_references',
'_flags',
'_limit',
'_page_size',
'_timeout',
)
[docs]
def __init__(
        self,
        *args: Any,
        base_filter: Filter | None = None,
        distinct: bool | None = None,
        extra_references: TReferenceRecordSet | None = None,
        flags: TFlags | None = None,
        limit: int | None = None,
        page_size: int | None = None,
        timeout: float | None = None,
        **kwargs: Any
) -> None:
    """
    Initializes :class:`Store`.

    An option given as ``None`` is left unset; its getter then falls
    back to the corresponding default.

    Parameters:
       store_name: Name of the store plugin to instantiate
         (consumed by ``__new__``).
       args: Arguments.
       base_filter: Base filter.
       distinct: Whether to suppress duplicates.
       extra_references: Extra references to attach to statements.
       flags: Store flags.
       limit: Limit (maximum number) of responses.
       page_size: Page size of paginated responses.
       timeout: Timeout of responses (in seconds).
       kwargs: Other keyword arguments.
    """
    # Give every option slot a value before any setter runs, so that a
    # ``_set_*`` hook may read any other option during construction
    # without hitting an uninitialized slot.
    self._flags = None
    self._base_filter = None
    self._distinct = None
    self._extra_references = None
    self._limit = None
    self._page_size = None
    self._timeout = None
    # Flags are assigned through the property: set_flags() would OR the
    # given flags into the current (default) flags instead of replacing
    # them.
    self.flags = flags  # type: ignore
    self.set_base_filter(base_filter)
    self.set_distinct(distinct)
    self.set_extra_references(extra_references)
    self.set_limit(limit)
    self.set_page_size(page_size)
    self.set_timeout(timeout)
@at_property
def context(self) -> Context:
    """The current KIF context."""
    current = self.get_context()
    return current

def get_context(self, context: Context | None = None) -> Context:
    """Gets the current KIF context.

    The lookup is delegated to ``Context.top``, which receives
    `context` unchanged.

    If `context` is not ``None``, returns `context`.

    Parameters:
       context: Context.

    Returns:
       Context.
    """
    resolve = Context.top
    return resolve(context)
# -- Base filter -----------------------------------------------------------
@at_property
def default_base_filter(self) -> Filter:
    """The default value for :attr:`Store.base_filter`."""
    fallback = self.get_default_base_filter()
    return fallback

def get_default_base_filter(self) -> Filter:
    """Gets the default value for :attr:`Store.base_filter`.

    The value is read from the store options of the current context.

    Returns:
       Filter.
    """
    store_options = self.context.options.store
    return store_options.base_filter
#: Base filter.
_base_filter: Filter | None
@at_property
def base_filter(self) -> Filter:
    """The base filter of store."""
    effective = self.get_base_filter()
    return effective

@base_filter.setter
def base_filter(self, base_filter: Filter | None = None) -> None:
    # Plain assignment goes through the validating setter method.
    self.set_base_filter(base_filter)
[docs]
def get_base_filter(
        self,
        default: Filter | None = None
) -> Filter:
    """Gets the base filter of store.

    Resolution order: the filter stored in the store, then `default`,
    then :attr:`Store.default_base_filter`.

    Parameters:
       default: Default base filter.

    Returns:
       Filter.
    """
    stored = self._base_filter
    if stored is not None:
        return stored
    if default is not None:
        return default
    return self.default_base_filter
[docs]
def set_base_filter(self, base_filter: Filter | None = None) -> None:
    """Sets the base filter of store.

    If `base_filter` is ``None``, the stored base filter is cleared and
    the store falls back to :attr:`Store.default_base_filter`.

    The validated value is stored only if the ``_set_base_filter`` hook
    accepts it.

    Parameters:
       base_filter: Filter.
    """
    checked = Filter.check_optional(
        base_filter, None, self.set_base_filter, 'base_filter', 1)
    accepted = self._set_base_filter(self._base_filter, checked)
    if accepted:
        self._base_filter = checked

def _set_base_filter(
        self,
        old: Filter | None,
        new: Filter | None
) -> bool:
    """Decides whether the base filter may change from `old` to `new`.

    The base implementation accepts every change.

    Parameters:
       old: Currently stored base filter.
       new: Validated candidate base filter.

    Returns:
       ``True`` if `new` should be stored; ``False`` otherwise.
    """
    return True
@at_property
def subject(self) -> Fingerprint:
    """The subject fingerprint of the base filter of store."""
    return self.get_subject()

@subject.setter
def subject(self, subject: TFingerprint) -> None:
    self.set_subject(subject)

def get_subject(self) -> Fingerprint:
    """Gets the subject fingerprint of the base filter of store.

    Returns:
       Fingerprint.
    """
    current = self.base_filter
    return current.subject

def set_subject(self, subject: TFingerprint | None = None) -> None:
    """Sets the subject fingerprint of the base filter of store.

    If `subject` is ``None``, assumes the full fingerprint.

    The effective base filter is copied with the new subject and the
    copy becomes the store's base filter.

    Parameters:
       subject: Fingerprint.
    """
    updated = self.base_filter.replace(subject=subject)
    self.base_filter = updated
@at_property
def property(self) -> Fingerprint:
    """The property fingerprint of the base filter of store."""
    return self.get_property()

@property.setter
def property(self, property: TFingerprint) -> None:
    self.set_property(property)

def get_property(self) -> Fingerprint:
    """Gets the property fingerprint of the base filter of store.

    Returns:
       Fingerprint.
    """
    return self.base_filter.property

def set_property(self, property: TFingerprint | None = None) -> None:
    """Sets the property fingerprint of the base filter of store.

    If `property` is ``None``, assumes the full fingerprint.

    The signature accepts (and defaults to) ``None`` so that it matches
    both this contract and :meth:`Store.set_subject`.

    Parameters:
       property: Fingerprint.
    """
    self.base_filter = self.base_filter.replace(property=property)
@at_property
def value(self) -> Fingerprint:
    """The value fingerprint of the base filter of store."""
    return self.get_value()

@value.setter
def value(self, value: TFingerprint) -> None:
    self.set_value(value)

def get_value(self) -> Fingerprint:
    """Gets the value fingerprint of the base filter of store.

    Returns:
       Fingerprint.
    """
    return self.base_filter.value

def set_value(self, value: TFingerprint | None = None) -> None:
    """Sets the value fingerprint of the base filter of store.

    If `value` is ``None``, assumes the full fingerprint.

    The signature accepts (and defaults to) ``None`` so that it matches
    both this contract and :meth:`Store.set_subject`.

    Parameters:
       value: Fingerprint.
    """
    self.base_filter = self.base_filter.replace(value=value)
@at_property
def snak_mask(self) -> Filter.SnakMask:
    """The snak mask of the base filter of store."""
    return self.get_snak_mask()

@snak_mask.setter
def snak_mask(self, snak_mask: Filter.SnakMask) -> None:
    self.set_snak_mask(snak_mask)

def get_snak_mask(self) -> Filter.SnakMask:
    """Gets the snak mask of the base filter of store.

    Returns:
       Snak mask.
    """
    current = self.base_filter
    return current.snak_mask

def set_snak_mask(self, snak_mask: Filter.TSnakMask) -> None:
    """Sets the snak mask of the base filter of store.

    Parameters:
       snak_mask: Snak mask.
    """
    updated = self.base_filter.replace(snak_mask=snak_mask)
    self.base_filter = updated
@at_property
def subject_mask(self) -> Filter.DatatypeMask:
    """The subject mask of the base filter of store."""
    return self.get_subject_mask()

@subject_mask.setter
def subject_mask(self, subject_mask: Filter.DatatypeMask) -> None:
    self.set_subject_mask(subject_mask)

def get_subject_mask(self) -> Filter.DatatypeMask:
    """Gets the subject mask of the base filter of store.

    Returns:
       Datatype mask.
    """
    current = self.base_filter
    return current.subject_mask

def set_subject_mask(self, subject_mask: Filter.TDatatypeMask) -> None:
    """Sets the subject mask of the base filter of store.

    Parameters:
       subject_mask: Datatype mask.
    """
    updated = self.base_filter.replace(subject_mask=subject_mask)
    self.base_filter = updated
@at_property
def property_mask(self) -> Filter.DatatypeMask:
    """The property mask of the base filter of store."""
    return self.get_property_mask()

@property_mask.setter
def property_mask(self, property_mask: Filter.DatatypeMask) -> None:
    self.set_property_mask(property_mask)

def get_property_mask(self) -> Filter.DatatypeMask:
    """Gets the property mask of the base filter of store.

    Returns:
       Datatype mask.
    """
    current = self.base_filter
    return current.property_mask

def set_property_mask(self, property_mask: Filter.TDatatypeMask) -> None:
    """Sets the property mask of the base filter of store.

    Parameters:
       property_mask: Datatype mask.
    """
    updated = self.base_filter.replace(property_mask=property_mask)
    self.base_filter = updated
@at_property
def value_mask(self) -> Filter.DatatypeMask:
    """The value mask of the base filter of store."""
    return self.get_value_mask()

@value_mask.setter
def value_mask(self, value_mask: Filter.DatatypeMask) -> None:
    self.set_value_mask(value_mask)

def get_value_mask(self) -> Filter.DatatypeMask:
    """Gets the value mask of the base filter of store.

    Returns:
       Datatype mask.
    """
    current = self.base_filter
    return current.value_mask

def set_value_mask(self, value_mask: Filter.TDatatypeMask) -> None:
    """Sets the value mask of the base filter of store.

    Parameters:
       value_mask: Datatype mask.
    """
    updated = self.base_filter.replace(value_mask=value_mask)
    self.base_filter = updated
@at_property
def rank_mask(self) -> Filter.RankMask:
    """The rank mask of the base filter of store."""
    return self.get_rank_mask()

@rank_mask.setter
def rank_mask(self, rank_mask: Filter.RankMask) -> None:
    self.set_rank_mask(rank_mask)

def get_rank_mask(self) -> Filter.RankMask:
    """Gets the rank mask of the base filter of store.

    Returns:
       Rank mask.
    """
    return self.base_filter.rank_mask

def set_rank_mask(self, rank_mask: Filter.TRankMask) -> None:
    """Sets the rank mask of the base filter of store.

    Parameters:
       rank_mask: Rank mask.
    """
    self.base_filter = self.base_filter.replace(rank_mask=rank_mask)
@at_property
def language(self) -> str | None:
    """The language of the base filter of store."""
    return self.get_language()

@language.setter
def language(self, language: TTextLanguage | None) -> None:
    self.set_language(language)

def get_language(self) -> str | None:
    """Gets the language of the base filter of store.

    Returns:
       Language.
    """
    current = self.base_filter
    return current.language

def set_language(self, language: TTextLanguage | None) -> None:
    """Sets the language of the base filter of store.

    Parameters:
       language: Language.
    """
    updated = self.base_filter.replace(language=language)
    self.base_filter = updated
@at_property
def annotated(self) -> bool:
    """The annotated flag of the base filter of store."""
    return self.get_annotated()

@annotated.setter
def annotated(self, annotated: bool) -> None:
    self.set_annotated(annotated)

def get_annotated(self) -> bool:
    """Gets the annotated flag of the base filter of store.

    Returns:
       Annotated flag.
    """
    current = self.base_filter
    return current.annotated

def set_annotated(self, annotated: bool) -> None:
    """Sets the annotated flag of the base filter of store.

    Parameters:
       annotated: Annotated flag.
    """
    updated = self.base_filter.replace(annotated=annotated)
    self.base_filter = updated
# -- Distinct --------------------------------------------------------------
@at_property
def default_distinct(self) -> bool:
    """The default value for :attr:`Store.distinct`."""
    fallback = self.get_default_distinct()
    return fallback

def get_default_distinct(self) -> bool:
    """Gets the default value for :attr:`Store.distinct`.

    The value is read from the store options of the current context.

    Returns:
       Default distinct flag.
    """
    store_options = self.context.options.store
    return store_options.distinct
#: Distinct flag.
_distinct: bool | None
@at_property
def distinct(self) -> bool:
    """The distinct flag of store (whether to suppress duplicates)."""
    effective = self.get_distinct()
    return effective

@distinct.setter
def distinct(self, distinct: bool | None = None) -> None:
    # Plain assignment goes through the normalizing setter method.
    self.set_distinct(distinct)
def get_distinct(
        self,
        default: bool | None = None
) -> bool:
    """Gets the distinct flag of store.

    Resolution order: the flag stored in the store, then `default`,
    then :attr:`Store.default_distinct`.

    Parameters:
       default: Default distinct flag.

    Returns:
       Distinct flag.
    """
    stored = self._distinct
    if stored is not None:
        return stored
    if default is not None:
        return default
    return self.default_distinct
def set_distinct(
self,
distinct: bool | None = None
) -> None:
"""Sets distinct flag of store.
If `distinct` is ``None``, assumes :attr:`Store.default_distinct`.
Parameters:
distinct: Distinct flag.
"""
distinct = bool(distinct) if distinct is not None else None
if self._set_distinct(self._distinct, distinct):
self._distinct = distinct
def _set_distinct(self, old: bool | None, new: bool | None) -> bool:
return True
# -- Extra references ------------------------------------------------------
# NOTE(review): this section was truncated in the source (the bodies of
# ``default_extra_references`` and the ``extra_references`` setter were
# missing, as were ``get_default_extra_references``,
# ``get_extra_references`` and ``set_extra_references``, all of which are
# called elsewhere in the class).  It is reconstructed here after the
# pattern of the base-filter section -- confirm against the upstream file.
@at_property
def default_extra_references(self) -> ReferenceRecordSet:
    """The default value for :attr:`Store.extra_references`."""
    return self.get_default_extra_references()

def get_default_extra_references(self) -> ReferenceRecordSet:
    """Gets the default value for :attr:`Store.extra_references`.

    Returns:
       Reference record set.
    """
    return self.context.options.store.extra_references

#: Extra references.
_extra_references: ReferenceRecordSet | None

@at_property
def extra_references(self) -> ReferenceRecordSet:
    """The extra references of store."""
    return self.get_extra_references()

@extra_references.setter
def extra_references(
        self,
        references: TReferenceRecordSet | None = None
) -> None:
    self.set_extra_references(references)

def get_extra_references(
        self,
        default: ReferenceRecordSet | None = None
) -> ReferenceRecordSet:
    """Gets the extra references of store.

    Resolution order: the references stored in the store, then
    `default`, then :attr:`Store.default_extra_references`.

    Parameters:
       default: Default extra references.

    Returns:
       Reference record set.
    """
    if self._extra_references is not None:
        extra_references: ReferenceRecordSet = self._extra_references
    elif default is not None:
        extra_references = default
    else:
        extra_references = self.default_extra_references
    return extra_references

def set_extra_references(
        self,
        extra_references: TReferenceRecordSet | None = None
) -> None:
    """Sets the extra references of store.

    If `extra_references` is ``None``, the stored value is cleared and
    the store falls back to :attr:`Store.default_extra_references`.

    The validated value is stored only if the
    ``_set_extra_references`` hook accepts it.

    Parameters:
       extra_references: Reference record set.
    """
    # Presumably ReferenceRecordSet offers ``check_optional`` like
    # Filter and Fingerprint do -- TODO confirm.
    extra_references = ReferenceRecordSet.check_optional(
        extra_references, None,
        self.set_extra_references, 'extra_references', 1)
    if self._set_extra_references(
            self._extra_references, extra_references):
        self._extra_references = extra_references

def _set_extra_references(
        self,
        old: ReferenceRecordSet | None,
        new: ReferenceRecordSet | None
) -> bool:
    """Decides whether the extra references may change to `new`.

    The base implementation accepts every change.

    Parameters:
       old: Currently stored extra references.
       new: Validated candidate extra references.

    Returns:
       ``True`` if `new` should be stored; ``False`` otherwise.
    """
    return True
[docs]
# -- Flags -----------------------------------------------------------------
class Flags(KIF_Flags):
    """Store flags."""

    # The declaration order below fixes the automatically assigned
    # values; keep it stable.

    #: Enables debugging.
    DEBUG = KIF_Flags.auto()

    #: Fetches only the best ranked statements.
    BEST_RANK = KIF_Flags.auto()

    #: Fetches value snaks.
    VALUE_SNAK = KIF_Flags.auto()

    #: Fetches some-value snaks.
    SOME_VALUE_SNAK = KIF_Flags.auto()

    #: Fetches no-value snaks.
    NO_VALUE_SNAK = KIF_Flags.auto()

    #: Union of all the flags above.
    ALL = (
        DEBUG | BEST_RANK
        | VALUE_SNAK | SOME_VALUE_SNAK | NO_VALUE_SNAK)
# The constants below re-export the members of :class:`Store.Flags` as
# attributes of the store class itself.
#: Whether to enable debugging.
DEBUG: Final[Flags] = Flags.DEBUG
#: Whether to fetch only the best ranked statements.
BEST_RANK: Final[Flags] = Flags.BEST_RANK
#: Whether to fetch value snaks.
VALUE_SNAK: Final[Flags] = Flags.VALUE_SNAK
#: Whether to fetch some-value snaks.
SOME_VALUE_SNAK: Final[Flags] = Flags.SOME_VALUE_SNAK
#: Whether to fetch no-value snaks.
NO_VALUE_SNAK: Final[Flags] = Flags.NO_VALUE_SNAK
#: Type alias for store flags (a flags value or a plain integer).
TFlags: TypeAlias = Flags | int
@at_property
def default_flags(self) -> Flags:
    """The default value for :attr:`Store.flags`."""
    fallback = self.get_default_flags()
    return fallback

def get_default_flags(self) -> Flags:
    """Gets the default value for :attr:`Store.flags`.

    The value is read from the store options of the current context.

    Returns:
       Default store flags.
    """
    store_options = self.context.options.store
    return store_options.flags
#: Store flags.
_flags: Flags | None
@at_property
def flags(self) -> Flags:
    """The store flags."""
    effective = self.get_flags()
    return effective

@flags.setter
def flags(self, flags: TFlags | None) -> None:
    # Assignment replaces the stored flags (``None`` clears them), in
    # contrast to set_flags(), which ORs new flags into the current
    # ones.  The value is stored only if the hook accepts it.
    checked = self.Flags.check_optional(
        flags, None, self.set_flags, 'flags', 1)
    accepted = self._set_flags(self._flags, checked)
    if accepted:
        self._flags = checked

def _set_flags(self, old: Flags | None, new: Flags | None) -> bool:
    """Decides whether the store flags may change from `old` to `new`.

    The base implementation accepts every change.

    Parameters:
       old: Currently stored flags.
       new: Validated candidate flags.

    Returns:
       ``True`` if `new` should be stored; ``False`` otherwise.
    """
    return True
def get_flags(self, default: Flags | None = None) -> Flags:
    """Gets the store flags.

    Resolution order: the flags stored in the store, then `default`,
    then :attr:`Store.default_flags`.

    Parameters:
       default: Default flags.

    Returns:
       Store flags.
    """
    stored = self._flags
    if stored is not None:
        return stored
    if default is not None:
        return default
    return self.default_flags
def has_flags(self, flags: TFlags) -> bool:
    """Tests whether `flags` are set in store.

    The test succeeds as soon as the store flags and `flags` share at
    least one set flag.

    Parameters:
       flags: Store flags.

    Returns:
       ``True`` if successful; ``False`` otherwise.
    """
    wanted = self.Flags.check(flags, self.has_flags, 'flags', 1)
    overlap = self.flags & wanted
    return bool(overlap)
def set_flags(self, flags: TFlags | None = None) -> None:
    """Sets `flags` in store.

    The given flags are added (OR-ed) to the current store flags.  If
    `flags` is ``None``, the stored flags are cleared instead.

    Parameters:
       flags: Store flags.
    """
    if flags is not None:
        self.flags |= self.Flags.check(
            flags, self.set_flags, 'flags', 1)
    else:
        self.flags = None       # type: ignore
def unset_flags(self, flags: TFlags) -> None:
    """Unsets `flags` in store.

    The given flags are removed from the current store flags; all
    other flags are kept.

    Parameters:
       flags: Store flags.
    """
    to_clear = self.Flags.check(flags, self.unset_flags, 'flags', 1)
    self.flags &= ~to_clear
# -- Limit -----------------------------------------------------------------
@classmethod
def _check_limit(
        cls,
        arg: Any,
        function: Location | None = None,
        name: str | None = None,
        position: int | None = None
) -> int:
    """Validates `arg` as a limit.

    `arg` is checked as a quantity, its amount is converted with
    ``int()``, and negative results are raised to zero.

    Parameters:
       arg: Value to check.
       function: Location reported on failure.
       name: Argument name reported on failure.
       position: Argument position reported on failure.

    Returns:
       Non-negative limit.
    """
    quantity = Quantity.check(arg, function, name, position)
    amount = int(quantity.amount)
    return max(amount, 0)
@classmethod
def _do_check_optional(
cls,
check: Callable[
[Any, Location | None, str | None, int | None], T],
arg: Any | None,
default: Any | None = None,
function: Location | None = None,
name: str | None = None,
position: int | None = None
) -> T | None:
if arg is None:
arg = default
if arg is None:
return default
else:
return check(arg, function, name, position)
@classmethod
def _check_optional_limit(
        cls,
        arg: Any | None,
        default: Any | None = None,
        function: Location | None = None,
        name: str | None = None,
        position: int | None = None
) -> int | None:
    """Validates `arg` as an optional limit.

    Combines ``_do_check_optional`` with ``_check_limit``.

    Parameters:
       arg: Value to check, or ``None``.
       default: Value assumed when `arg` is ``None``.
       function: Location reported on failure.
       name: Argument name reported on failure.
       position: Argument position reported on failure.

    Returns:
       Non-negative limit or ``None``.
    """
    return cls._do_check_optional(
        check=cls._check_limit,
        arg=arg,
        default=default,
        function=function,
        name=name,
        position=position)
[docs]
@at_property
def max_limit(self) -> int:
    """The maximum value for :attr:`Store.limit`."""
    ceiling = self.get_max_limit()
    return ceiling

def get_max_limit(self) -> int:
    """Gets the maximum value for :attr:`Store.limit`.

    The value is read from the store options of the current context.

    Returns:
       Maximum limit.
    """
    store_options = self.context.options.store
    return store_options.max_limit
[docs]
@at_property
def default_limit(self) -> int | None:
    """The default value for :attr:`Store.limit`."""
    fallback = self.get_default_limit()
    return fallback

def get_default_limit(self) -> int | None:
    """Gets the default value for :attr:`Store.limit`.

    The value is read from the store options of the current context.

    Returns:
       Default limit or ``None``.
    """
    store_options = self.context.options.store
    return store_options.limit
#: Limit.
_limit: int | None
@at_property
def limit(self) -> int | None:
    """The limit of store (maximum number of responses)."""
    effective = self.get_limit()
    return effective

@limit.setter
def limit(self, limit: int | None = None) -> None:
    # Plain assignment goes through the validating setter method.
    self.set_limit(limit)
def get_limit(
        self,
        default: int | None = None
) -> int | None:
    """Gets the limit of store.

    Resolution order: the limit stored in the store, then `default`,
    then :attr:`Store.default_limit`.  A resolved limit is capped at
    :attr:`Store.max_limit`.

    Parameters:
       default: Default limit.

    Returns:
       Limit or ``None``.
    """
    effective = self._limit
    if effective is None:
        effective = default
    if effective is None:
        effective = self.default_limit
    if effective is None:
        return None
    return min(effective, self.max_limit)
def set_limit(
        self,
        limit: int | None = None
) -> None:
    """Sets the limit of store.

    If `limit` is negative, assumes zero.

    If `limit` is ``None``, the stored limit is cleared and the store
    falls back to :attr:`Store.default_limit`.

    The validated value is stored only if the ``_set_limit`` hook
    accepts it.

    Parameters:
       limit: Limit.
    """
    checked = self._check_optional_limit(
        limit, None, self.set_limit, 'limit', 1)
    accepted = self._set_limit(self._limit, checked)
    if accepted:
        self._limit = checked

def _set_limit(self, old: int | None, new: int | None) -> bool:
    """Decides whether the limit may change from `old` to `new`.

    The base implementation accepts every change.

    Parameters:
       old: Currently stored limit.
       new: Validated candidate limit.

    Returns:
       ``True`` if `new` should be stored; ``False`` otherwise.
    """
    return True
# -- Page size -------------------------------------------------------------
@classmethod
def _check_page_size(
        cls,
        arg: Any,
        function: Location | None = None,
        name: str | None = None,
        position: int | None = None
) -> int:
    """Validates `arg` as a page size.

    `arg` is checked as a quantity, its amount is converted with
    ``int()``, and negative results are raised to zero.

    Parameters:
       arg: Value to check.
       function: Location reported on failure.
       name: Argument name reported on failure.
       position: Argument position reported on failure.

    Returns:
       Non-negative page size.
    """
    quantity = Quantity.check(arg, function, name, position)
    amount = int(quantity.amount)
    return max(amount, 0)
@classmethod
def _check_optional_page_size(
        cls,
        arg: Any | None,
        default: Any | None = None,
        function: Location | None = None,
        name: str | None = None,
        position: int | None = None
) -> int | None:
    """Validates `arg` as an optional page size.

    Combines ``_do_check_optional`` with ``_check_page_size``.

    Parameters:
       arg: Value to check, or ``None``.
       default: Value assumed when `arg` is ``None``.
       function: Location reported on failure.
       name: Argument name reported on failure.
       position: Argument position reported on failure.

    Returns:
       Non-negative page size or ``None``.
    """
    return cls._do_check_optional(
        check=cls._check_page_size,
        arg=arg,
        default=default,
        function=function,
        name=name,
        position=position)
[docs]
@at_property
def max_page_size(self) -> int:
    """The maximum value for :attr:`Store.page_size`."""
    ceiling = self.get_max_page_size()
    return ceiling

def get_max_page_size(self) -> int:
    """Gets the maximum value for :attr:`Store.page_size`.

    The value is read from the store options of the current context.

    Returns:
       Maximum page size.
    """
    store_options = self.context.options.store
    return store_options.max_page_size
[docs]
@at_property
def default_page_size(self) -> int:
    """The default value for :attr:`Store.page_size`."""
    fallback = self.get_default_page_size()
    return fallback

def get_default_page_size(self) -> int:
    """Gets the default value for :attr:`Store.page_size`.

    The value is read from the store options of the current context.

    Returns:
       Default page size.
    """
    store_options = self.context.options.store
    return store_options.page_size
#: Page size.
_page_size: int | None
@at_property
def page_size(self) -> int:
    """The page size of store (size of response pages)."""
    effective = self.get_page_size()
    return effective

@page_size.setter
def page_size(self, page_size: int | None = None) -> None:
    # Plain assignment goes through the validating setter method.
    self.set_page_size(page_size)
def get_page_size(
        self,
        default: int | None = None
) -> int:
    """Gets the page size of store.

    Resolution order: the page size stored in the store, then
    `default`, then :attr:`Store.default_page_size`.  The result is
    capped at :attr:`Store.max_page_size`.

    Parameters:
       default: Default page size.

    Returns:
       Page size.
    """
    effective = self._page_size
    if effective is None:
        effective = default
    if effective is None:
        effective = self.default_page_size
    return min(effective, self.max_page_size)
def set_page_size(
        self,
        page_size: int | None = None
) -> None:
    """Sets page size of store.

    If `page_size` is negative, assumes zero.

    If `page_size` is ``None``, the stored page size is cleared and the
    store falls back to :attr:`Store.default_page_size`.

    The validated value is stored only if the ``_set_page_size`` hook
    accepts it.

    Parameters:
       page_size: Page size.
    """
    checked = self._check_optional_page_size(
        page_size, None, self.set_page_size, 'page_size', 1)
    accepted = self._set_page_size(self._page_size, checked)
    if accepted:
        self._page_size = checked

def _set_page_size(self, old: int | None, new: int | None) -> bool:
    """Decides whether the page size may change from `old` to `new`.

    The base implementation accepts every change.

    Parameters:
       old: Currently stored page size.
       new: Validated candidate page size.

    Returns:
       ``True`` if `new` should be stored; ``False`` otherwise.
    """
    return True
# -- Timeout ---------------------------------------------------------------
@classmethod
def _check_timeout(
        cls,
        arg: Any,
        function: Location | None = None,
        name: str | None = None,
        position: int | None = None
) -> float:
    """Validates `arg` as a timeout.

    `arg` is checked as a quantity, its amount is converted with
    ``float()``, and negative results are raised to zero.

    Parameters:
       arg: Value to check.
       function: Location reported on failure.
       name: Argument name reported on failure.
       position: Argument position reported on failure.

    Returns:
       Non-negative timeout.
    """
    quantity = Quantity.check(arg, function, name, position)
    seconds = float(quantity.amount)
    return max(seconds, 0.)
@classmethod
def _check_optional_timeout(
        cls,
        arg: Any | None,
        default: Any | None = None,
        function: Location | None = None,
        name: str | None = None,
        position: int | None = None
) -> float | None:
    """Validates `arg` as an optional timeout.

    Combines ``_do_check_optional`` with ``_check_timeout``.

    Parameters:
       arg: Value to check, or ``None``.
       default: Value assumed when `arg` is ``None``.
       function: Location reported on failure.
       name: Argument name reported on failure.
       position: Argument position reported on failure.

    Returns:
       Non-negative timeout or ``None``.
    """
    return cls._do_check_optional(
        check=cls._check_timeout,
        arg=arg,
        default=default,
        function=function,
        name=name,
        position=position)
@at_property
def max_timeout(self) -> float:
    """The maximum value for :attr:`Store.timeout`."""
    ceiling = self.get_max_timeout()
    return ceiling

def get_max_timeout(self) -> float:
    """Gets the maximum value for :attr:`Store.timeout`.

    The value is read from the store options of the current context.

    Returns:
       Maximum timeout (in seconds).
    """
    store_options = self.context.options.store
    return store_options.max_timeout
@at_property
def default_timeout(self) -> float | None:
    """The default value for :attr:`Store.timeout`."""
    fallback = self.get_default_timeout()
    return fallback

def get_default_timeout(self) -> float | None:
    """Gets the default value for :attr:`Store.timeout`.

    The value is read from the store options of the current context.

    Returns:
       Timeout or ``None``.
    """
    store_options = self.context.options.store
    return store_options.timeout
#: Timeout (in seconds).
_timeout: float | None
@at_property
def timeout(self) -> float | None:
    """The timeout of store (in seconds)."""
    effective = self.get_timeout()
    return effective

@timeout.setter
def timeout(self, timeout: float | None = None) -> None:
    # Plain assignment goes through the validating setter method.
    self.set_timeout(timeout)
def get_timeout(
        self,
        default: float | None = None
) -> float | None:
    """Gets the timeout of store.

    Resolution order: the timeout stored in the store, then `default`,
    then :attr:`Store.default_timeout`.  A resolved timeout is capped
    at :attr:`Store.max_timeout`.

    Parameters:
       default: Default timeout.

    Returns:
       Timeout or ``None``.
    """
    effective = self._timeout
    if effective is None:
        effective = default
    if effective is None:
        effective = self.default_timeout
    if effective is None:
        return None
    return min(effective, self.max_timeout)
def set_timeout(
        self,
        timeout: float | None = None
) -> None:
    """Sets the timeout of store.

    If `timeout` is negative, assumes zero.

    If `timeout` is ``None``, the stored timeout is cleared and the
    store falls back to :attr:`Store.default_timeout`.

    The validated value is stored only if the ``_set_timeout`` hook
    accepts it.

    Parameters:
       timeout: Timeout.
    """
    checked = self._check_optional_timeout(
        timeout, None, self.set_timeout, 'timeout', 1)
    accepted = self._set_timeout(self._timeout, checked)
    if accepted:
        self._timeout = checked

def _set_timeout(self, old: float | None, new: float | None) -> bool:
    """Decides whether the timeout may change from `old` to `new`.

    The base implementation accepts every change.

    Parameters:
       old: Currently stored timeout.
       new: Validated candidate timeout.

    Returns:
       ``True`` if `new` should be stored; ``False`` otherwise.
    """
    return True
# -- Set interface ---------------------------------------------------------
def __eq__(self, other: Any) -> bool:
if isinstance(other, Store):
###
# Prevent __eq__ from triggering content equality.
###
return id(self) == id(other)
else:
return NotImplemented
def __contains__(self, v: Any) -> bool:
    """Tests whether `v` is a statement occurring in store.

    Anything that is not a :class:`Statement` is reported as absent
    without querying the store.
    """
    if not isinstance(v, Statement):
        return False
    return self._contains(v)
[docs]
def __iter__(self) -> Iterator[Statement]:
    """Iterates over the statements returned by :meth:`Store.filter`
    called with no arguments."""
    statements = self.filter()
    return statements
def __len__(self) -> int:
    """Returns the result of :meth:`Store.count` called with no
    arguments."""
    total = self.count()
    return total
# -- Statements ------------------------------------------------------------
def ask(
        self,
        subject: TFingerprint | None = None,
        property: TFingerprint | None = None,
        value: TFingerprint | None = None,
        snak_mask: Filter.TSnakMask | None = None,
        subject_mask: Filter.TDatatypeMask | None = None,
        property_mask: Filter.TDatatypeMask | None = None,
        value_mask: Filter.TDatatypeMask | None = None,
        rank_mask: Filter.TRankMask | None = None,
        language: str | None = None,
        annotated: bool | None = None,
        snak: Snak | None = None,
        filter: Filter | None = None
) -> bool:
    """Tests whether some statement matches filter.

    The arguments are validated and merged with the base filter of
    store before the store is queried.

    Parameters:
       subject: Entity.
       property: Property.
       value: Value.
       snak_mask: Snak mask.
       subject_mask: Datatype mask.
       property_mask: Datatype mask.
       value_mask: Datatype mask.
       rank_mask: Rank mask.
       language: Language.
       annotated: Annotated flag.
       snak: Snak.
       filter: Filter.

    Returns:
       ``True`` if successful; ``False`` otherwise.
    """
    effective_filter = self._check_filter(
        subject=subject,
        property=property,
        value=value,
        snak_mask=snak_mask,
        subject_mask=subject_mask,
        property_mask=property_mask,
        value_mask=value_mask,
        rank_mask=rank_mask,
        language=language,
        annotated=annotated,
        snak=snak,
        filter=filter,
        function=self.ask)
    return self._ask_tail(effective_filter)
[docs]
def _ask_tail(self, filter: Filter) -> bool:
if filter.is_nonempty():
return self._ask(filter)
else:
return False
def _ask(self, filter: Filter) -> bool:
return bool(next(self._filter(filter, 1, False), False))
def contains(self, stmt: Statement) -> bool:
    """Tests whether statement occurs in store.

    `stmt` is validated first; the validated result is not used, the
    lookup is done with `stmt` as given.

    Parameters:
       stmt: Statement.

    Returns:
       ``True`` if successful; ``False`` otherwise.
    """
    Statement.check(stmt, self.contains, 'stmt', 1)
    found = self._contains(stmt)
    return found
[docs]
def _contains(self, stmt: Statement) -> bool:
    """Tests whether `stmt` is among the statements matching it.

    A filter is derived from `stmt` and normalized against the store
    flags.  If nothing can match it, the answer is ``False``;
    otherwise `stmt` is searched for in the results of
    :meth:`Store.filter`, with annotations requested only when `stmt`
    itself is annotated.
    """
    stmt_filter = self._normalize_filter(Filter.from_statement(stmt))
    if not stmt_filter.is_nonempty():
        return False
    wants_annotations = isinstance(stmt, AnnotatedStatement)
    candidates = self.filter(
        filter=stmt_filter, annotated=wants_annotations)
    return stmt in candidates
def count(
        self,
        subject: TFingerprint | None = None,
        property: TFingerprint | None = None,
        value: TFingerprint | None = None,
        snak_mask: Filter.TSnakMask | None = None,
        subject_mask: Filter.TDatatypeMask | None = None,
        property_mask: Filter.TDatatypeMask | None = None,
        value_mask: Filter.TDatatypeMask | None = None,
        rank_mask: Filter.TRankMask | None = None,
        language: str | None = None,
        annotated: bool | None = None,
        snak: Snak | None = None,
        filter: Filter | None = None
) -> int:
    """Counts statements matching filter.

    The arguments are validated and merged with the base filter of
    store before the store is queried.

    Parameters:
       subject: Entity.
       property: Property.
       value: Value.
       snak_mask: Snak mask.
       subject_mask: Datatype mask.
       property_mask: Datatype mask.
       value_mask: Datatype mask.
       rank_mask: Rank mask.
       language: Language.
       annotated: Annotated flag.
       snak: Snak.
       filter: Filter.

    Returns:
       The number of statements matching filter.
    """
    effective_filter = self._check_filter(
        subject=subject,
        property=property,
        value=value,
        snak_mask=snak_mask,
        subject_mask=subject_mask,
        property_mask=property_mask,
        value_mask=value_mask,
        rank_mask=rank_mask,
        language=language,
        annotated=annotated,
        snak=snak,
        filter=filter,
        function=self.count)
    return self._count_tail(effective_filter)
[docs]
def _count_tail(self, filter: Filter) -> int:
if filter.is_nonempty():
return self._count(filter)
else:
return 0
def _count(self, filter: Filter) -> int:
return sum(1 for _ in self._filter(filter, self.max_limit, True))
def filter(
        self,
        subject: TFingerprint | None = None,
        property: TFingerprint | None = None,
        value: TFingerprint | None = None,
        snak_mask: Filter.TSnakMask | None = None,
        subject_mask: Filter.TDatatypeMask | None = None,
        property_mask: Filter.TDatatypeMask | None = None,
        value_mask: Filter.TDatatypeMask | None = None,
        rank_mask: Filter.TRankMask | None = None,
        language: str | None = None,
        annotated: bool | None = None,
        snak: Snak | None = None,
        filter: Filter | None = None,
        limit: int | None = None,
        distinct: bool | None = None
) -> Iterator[Statement]:
    """Searches for statements matching filter.

    If `distinct` is ``None``, assumes :attr:`Store.distinct`.

    If `limit` is ``None``, assumes :attr:`Store.limit` and, if that is
    also unset, :attr:`Store.max_limit`.

    Parameters:
       subject: Entity.
       property: Property.
       value: Value.
       snak_mask: Snak mask.
       subject_mask: Datatype mask.
       property_mask: Datatype mask.
       value_mask: Datatype mask.
       rank_mask: Rank mask.
       language: Language.
       annotated: Annotated flag.
       snak: Snak.
       filter: Filter.
       limit: Limit (maximum number) of statements to return.
       distinct: Whether to skip duplicated matches.

    Returns:
       An iterator of statements matching filter.
    """
    if distinct is None:
        use_distinct = self.distinct
    else:
        use_distinct = bool(distinct)
    assert use_distinct is not None
    max_results = self._check_optional_limit(
        limit, self.limit, self.filter, 'limit', 13)
    if max_results is None:
        # Neither the call nor the store fixes a limit.
        max_results = self.get_limit(self.max_limit)
    assert max_results is not None
    effective_filter = self._check_filter(
        subject=subject,
        property=property,
        value=value,
        snak_mask=snak_mask,
        subject_mask=subject_mask,
        property_mask=property_mask,
        value_mask=value_mask,
        rank_mask=rank_mask,
        language=language,
        annotated=annotated,
        snak=snak,
        filter=filter,
        function=self.filter)
    return self._filter_tail(effective_filter, max_results, use_distinct)
def _filter_tail(
self,
filter: Filter,
limit: int,
distinct: bool
) -> Iterator[Statement]:
if limit > 0 and filter.is_nonempty():
stmts = self._filter(filter, limit, distinct)
if filter.annotated and self.extra_references:
return map(lambda s: s.annotate(
references=self.extra_references), stmts)
else:
return stmts
else:
return iter(())
[docs]
def _filter(
self,
filter: Filter,
limit: int,
distinct: bool
) -> Iterator[Statement]:
return iter(())
def filter_annotated(
        self,
        subject: TFingerprint | None = None,
        property: TFingerprint | None = None,
        value: TFingerprint | None = None,
        snak_mask: Filter.TSnakMask | None = None,
        subject_mask: Filter.TDatatypeMask | None = None,
        property_mask: Filter.TDatatypeMask | None = None,
        value_mask: Filter.TDatatypeMask | None = None,
        rank_mask: Filter.TRankMask | None = None,
        language: str | None = None,
        annotated: bool | None = None,
        snak: Snak | None = None,
        filter: Filter | None = None,
        limit: int | None = None,
        distinct: bool | None = None
) -> Iterator[AnnotatedStatement]:
    """:meth:`Store.filter` with annotations.

    All arguments are forwarded to :meth:`Store.filter`, except that
    the annotated flag is always passed as ``True``.

    Parameters:
       subject: Entity.
       property: Property.
       value: Value.
       snak_mask: Snak mask.
       subject_mask: Datatype mask.
       property_mask: Datatype mask.
       value_mask: Datatype mask.
       rank_mask: Rank mask.
       language: Language.
       annotated: Annotated flag (ignored).
       snak: Snak.
       filter: Filter.
       limit: Limit (maximum number) of statements to return.
       distinct: Whether to skip duplicated matches.

    Returns:
       An iterator of annotated statements matching filter.
    """
    statements = self.filter(
        subject=subject,
        property=property,
        value=value,
        snak_mask=snak_mask,
        subject_mask=subject_mask,
        property_mask=property_mask,
        value_mask=value_mask,
        rank_mask=rank_mask,
        language=language,
        annotated=True,         # force
        snak=snak,
        filter=filter,
        limit=limit,
        distinct=distinct)
    # The cast only informs the type checker; nothing is converted.
    return cast(Iterator[AnnotatedStatement], statements)
def _check_filter(
        self,
        subject: TFingerprint | None = None,
        property: TFingerprint | None = None,
        value: TFingerprint | None = None,
        snak_mask: Filter.TSnakMask | None = None,
        subject_mask: Filter.TDatatypeMask | None = None,
        property_mask: Filter.TDatatypeMask | None = None,
        value_mask: Filter.TDatatypeMask | None = None,
        rank_mask: Filter.TRankMask | None = None,
        language: str | None = None,
        annotated: bool | None = None,
        snak: Snak | None = None,
        filter: Filter | None = None,
        function: Location | None = None
) -> Filter:
    """Validates filter arguments and builds the effective filter.

    The result combines, in this order: the base filter of store, the
    explicit `filter` (if any), a filter made of the individual
    arguments, and a filter derived from `snak` (if any).  It is then
    normalized against the store flags.

    The annotated flag of the result is `annotated` (defaulting to that
    of the base filter), or the flag of the explicit `filter` when that
    one is set.

    Parameters:
       subject: Entity.
       property: Property.
       value: Value.
       snak_mask: Snak mask.
       subject_mask: Datatype mask.
       property_mask: Datatype mask.
       value_mask: Datatype mask.
       rank_mask: Rank mask.
       language: Language.
       annotated: Annotated flag.
       snak: Snak.
       filter: Filter.
       function: Location reported on validation failure.

    Returns:
       Filter.
    """
    # The positions reported below are those of the public query
    # methods (ask, count, filter), which share this parameter order.
    subject = Fingerprint.check_optional(
        subject, None, function, 'subject', 1)
    property = Fingerprint.check_optional(
        property, None, function, 'property', 2)
    value = Fingerprint.check_optional(
        value, None, function, 'value', 3)
    snak_mask = Filter.SnakMask.check_optional(
        snak_mask, Filter.SnakMask.ALL, function, 'snak_mask', 4)
    subject_mask = Filter.DatatypeMask.check_optional(
        subject_mask, Filter.ENTITY, function, 'subject_mask', 5)
    property_mask = Filter.DatatypeMask.check_optional(
        property_mask, Filter.PROPERTY, function, 'property_mask', 6)
    value_mask = Filter.DatatypeMask.check_optional(
        value_mask, Filter.VALUE, function, 'value_mask', 7)
    rank_mask = Filter.RankMask.check_optional(
        rank_mask, Filter.RankMask.ALL, function, 'rank_mask', 8)
    if language is not None:
        language = String.check(
            language, function, 'language', 9).content
    if annotated is None:
        annotated = self.base_filter.annotated
    else:
        annotated = bool(annotated)
    if filter is None:
        explicit = ()
        wants_annotated = annotated
    else:
        filter = Filter.check(filter, function, 'filter', 12)
        explicit = (filter,)
        wants_annotated = filter.annotated or annotated
    base = self.base_filter
    from_arguments = Filter(
        subject=subject,
        property=property,
        value=value,
        snak_mask=snak_mask,
        subject_mask=subject_mask,
        property_mask=property_mask,
        value_mask=value_mask,
        rank_mask=rank_mask,
        language=language)
    filter = base.combine(*explicit, from_arguments).replace(
        annotated=wants_annotated)
    if snak is not None:
        snak_filter = Filter.from_snak(None, Snak.check(
            snak, function, 'snak', 11))
        filter = filter.combine(snak_filter).replace(
            annotated=filter.annotated or annotated)
    return self._normalize_filter(filter)
def _normalize_filter(
        self,
        filter: Filter
) -> Filter:
    """Normalizes `filter` and restricts it to the store flags.

    The snak mask of `filter` is intersected with the snak kinds
    enabled by the store flags (``VALUE_SNAK``, ``SOME_VALUE_SNAK``,
    ``NO_VALUE_SNAK``); the first three positional fields of the
    normalized filter are kept as they are.

    Parameters:
       filter: Filter.

    Returns:
       Filter.
    """
    flag_to_mask = (
        (self.VALUE_SNAK, Filter.VALUE_SNAK),
        (self.SOME_VALUE_SNAK, Filter.SOME_VALUE_SNAK),
        (self.NO_VALUE_SNAK, Filter.NO_VALUE_SNAK),
    )
    store_snak_mask = Filter.SnakMask(0)
    for flag, mask in flag_to_mask:
        if self.has_flags(flag):
            store_snak_mask |= mask
    return filter.normalize().replace(
        filter.KEEP, filter.KEEP, filter.KEEP,
        filter.snak_mask & store_snak_mask)