Source code for kif_lib.store.sparql

# Copyright (C) 2023-2024 IBM Corp.
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import functools
import logging

import httpx
from rdflib.plugins.sparql import prepareQuery
from rdflib.plugins.sparql.sparql import Query

from .. import itertools
from .. import namespace as NS
from ..compiler.sparql import SPARQL_FilterCompiler, SPARQL_PatternCompiler
from ..model import (
    AnnotationRecord,
    AnnotationRecordSet,
    ClosedPattern,
    ClosedTerm,
    Datatype,
    DeepDataValue,
    Descriptor,
    Entity,
    ExternalId,
    Filter,
    IRI,
    Item,
    ItemDescriptor,
    Lexeme,
    LexemeDescriptor,
    NoValueSnak,
    OpenPattern,
    Pattern,
    Property,
    PropertyDescriptor,
    Quantity,
    Rank,
    ReferenceRecord,
    Snak,
    SomeValueSnak,
    Statement,
    StatementVariable,
    String,
    T_IRI,
    TemplatePattern,
    Term,
    Text,
    Time,
    Value,
    ValueSnak,
    VariablePattern,
)
from ..rdflib import BNode, URIRef
from ..typing import (
    Any,
    Callable,
    cast,
    ClassVar,
    Collection,
    Hashable,
    Iterable,
    Iterator,
    Mapping,
    MutableSet,
    override,
    Set,
    TypeAlias,
    TypeVar,
    Union,
)
from ..version import __version__
from .abc import Store
from .sparql_builder import SPARQL_Builder
from .sparql_results import SPARQL_Results

LOG = logging.getLogger(__name__)

T = TypeVar('T')

#: Wikidata statement node (a blank node or IRI).
T_WDS = Hashable

#: Opaque RDF term (blank node or IRI).
TOpq: TypeAlias = Union[BNode, URIRef]

#: SPARQL builder term.
TTrm = SPARQL_Builder.TTrm


class SPARQL_Store(
        Store,
        store_name='sparql',
        store_description='SPARQL endpoint'
):
    """SPARQL store.

    Parameters:
       store_name: Store plugin to instantiate.
       iri: SPARQL endpoint IRI.
    """

    _headers = {
        # See <https://meta.wikimedia.org/wiki/User-Agent_policy>.
        'User-Agent': f'KIF/{__version__} (https://github.com/IBM/kif/; '
        'guilherme.lima@ibm.com)',
        'Content-Type': 'application/sparql-query;charset=utf-8',
        'Accept': 'application/sparql-results+json;charset=utf-8',
    }

    __slots__ = (
        '_client',
        '_iri',
    )

    #: HTTP client.
    _client: httpx.Client

    #: IRI of the target SPARQL endpoint.
    _iri: IRI

    def __init__(self, store_name: str, iri: T_IRI, **kwargs: Any) -> None:
        assert store_name == self.store_name
        self._client = httpx.Client(headers=self._headers)
        super().__init__(**kwargs)
        self._iri = IRI.check(iri, type(self), 'iri', 2)

    def __del__(self) -> None:
        self._client.close()

    @override
    def set_timeout(self, timeout: float | None = None) -> None:
        super().set_timeout(timeout)
        self._client.timeout = httpx.Timeout(timeout)
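
    # Illustrative usage note (not part of the module): given a store
    # instance kb, kb.set_timeout(30.) caps each HTTP request at 30
    # seconds, while kb.set_timeout(None) disables the timeout
    # (httpx.Timeout(None) means "no timeout").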

    @property
    def iri(self) -> IRI:
        """The IRI of the target SPARQL endpoint."""
        return self.get_iri()

    def get_iri(self) -> IRI:
        """Gets the IRI of the target SPARQL endpoint.

        Returns:
           IRI.
        """
        return self._iri
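
    # Usage sketch (illustrative; not part of the module).  The store is
    # normally instantiated through the Store plugin factory under the
    # name registered above; the endpoint URL is only an example:
    #
    #   from kif_lib import Store
    #   kb = Store('sparql', 'https://query.wikidata.org/sparql')
    #   print(kb.iri)       # IRI('https://query.wikidata.org/sparql')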

    # -- Caching -----------------------------------------------------------

    def _cache_get_wdss(self, stmt: Statement) -> Set[T_WDS] | None:
        return self._cache.get(stmt, 'wdss')

    def _cache_add_wds(self, stmt: Statement, wds: T_WDS) -> Set[T_WDS]:
        self._cache.set(wds, 'statement', stmt)
        wdss = self._cache.get(stmt, 'wdss')
        if wdss is None:
            wdss = self._cache.set(stmt, 'wdss', set())
        wdss.add(wds)
        return wdss

    # -- Query evaluation (internal) -----------------------------------------

    def _eval_select_query(
            self,
            query: SPARQL_Builder,
            parse_results_fn: Callable[
                [SPARQL_Results], Iterator[T | None]],
            vars: Collection[TTrm | tuple[TTrm, TTrm]] = tuple(),
            order_by: TTrm | None = None,
            limit: int | None = None,
            distinct: bool = False,
            trim: bool = False
    ) -> Iterator[T]:
        def eval_fn(
                eval_limit: int | None = None,
                eval_offset: int | None = None
        ) -> Iterator[T | None]:
            return parse_results_fn(
                self._eval_select_query_string(
                    query.select(
                        *vars, distinct=distinct, order_by=order_by,
                        limit=eval_limit, offset=eval_offset)))
        return self._eval_query(query, eval_fn, limit, trim)

    def _eval_query(
            self,
            query: SPARQL_Builder,
            eval_fn: Callable[
                [int | None, int | None], Iterator[T | None]],
            limit: int | None = None,
            trim: bool = False,
            page_size: int | None = None,
            offset: int | None = None
    ) -> Iterator[T]:
        import sys
        if limit is None:
            limit = sys.maxsize
        assert limit > 0
        page_size = page_size or self.page_size
        offset = offset or 0
        if trim and limit < page_size:
            page_size = limit
        i = 0
        for obj in eval_fn(page_size, offset):
            i += 1
            if obj is not None:
                yield obj
                limit -= 1
                if limit <= 0:
                    assert limit == 0
                    return      # done
        assert i <= page_size
        if i == page_size:
            yield from self._eval_query(
                query, eval_fn, limit=limit, trim=trim,
                page_size=page_size, offset=offset + i)

    def _eval_select_query_string(
            self,
            text: str,
            fake_results: bool = False,
            **kwargs: Any
    ) -> SPARQL_Results:
        # text = self._prepare_query_string_wrapper(text)
        results = self._eval_query_string(text, **kwargs).json()
        if fake_results:
            return cast(SPARQL_Results, results)
        else:
            return SPARQL_Results(results)

    def _prepare_query_string_wrapper(self, text: str) -> str:
        return self._prepare_query_string(text)._original_args[0]

    def _prepare_query_string(self, text: str) -> Query:
        from pyparsing.exceptions import ParseException
        try:
            return prepareQuery(text, initNs=None)
        except ParseException as err:
            raise self._error_bad_query_string(
                err.args[0], err.lineno, err.column,
                err.explain()) from None

    def _error_bad_query_string(
            self,
            query: str,
            line: int,
            column: int,
            details: str
    ) -> SyntaxError:
        return SyntaxError(f'''\
bad query:
{query}
At line {line}, column {column}:
{details}''')

    def _eval_query_string(
            self,
            text: str,
            **kwargs: Any
    ) -> httpx.Response:
        LOG.debug('%s():\n%s', self._eval_query_string.__qualname__, text)
        try:
            res = self._client.post(
                self.iri.content,
                content=text.encode('utf-8'), **kwargs)
            res.raise_for_status()
            return res
        except httpx.RequestError as err:
            raise err

    # -- Match ---------------------------------------------------------------

    @override
    def _match(
            self,
            pattern: Pattern,
            limit: int,
            distinct: bool
    ) -> Iterator[ClosedTerm]:
        assert isinstance(pattern, Pattern)
        assert limit >= 0
        compiler = self._compile_pattern(pattern)
        if isinstance(pattern, ClosedPattern):
            raise NotImplementedError
        elif isinstance(pattern, OpenPattern):
            page_size = min(self.page_size, limit)
            offset, count = 0, 0
            while count <= limit:
                query = compiler.query.select(
                    limit=page_size, offset=offset, distinct=distinct)
                res = self._eval_select_query_string(str(query))
                bindings = res['results']['bindings']
                if not bindings:
                    break       # done
                for binding in bindings:
                    theta = compiler.theta.instantiate(binding)
                    if isinstance(pattern, TemplatePattern):
                        term: Term | None = pattern.template.instantiate(
                            theta)
                    elif isinstance(pattern, VariablePattern):
                        term = pattern.variable.instantiate(theta)
                    else:
                        raise self._should_not_get_here()
                    if isinstance(term, ClosedTerm):
                        yield term
                        count += 1
                if len(bindings) < page_size:
                    break       # done
                if count == limit:
                    break       # done
                offset += page_size
        else:
            raise self._should_not_get_here()

    #: Flags to be passed to pattern compiler.
    _compile_pattern_flags: ClassVar[SPARQL_FilterCompiler.Flags] =\
        SPARQL_PatternCompiler.default_flags

    def _compile_pattern(
            self,
            pattern: Pattern
    ) -> SPARQL_PatternCompiler:
        compiler = SPARQL_PatternCompiler(
            pattern, self._compile_pattern_flags)
        if self.has_flags(self.DEBUG):
            compiler.set_flags(compiler.DEBUG)
        else:
            compiler.unset_flags(compiler.DEBUG)
        if self.has_flags(self.BEST_RANK):
            compiler.set_flags(compiler.BEST_RANK)
        else:
            compiler.unset_flags(compiler.BEST_RANK)
        compiler.compile()
        return compiler

    # -- Statements ----------------------------------------------------------

    @override
    def _contains(self, filter: Filter) -> bool:
        compiler = self._compile_filter(filter)
        res = self._eval_select_query_string(
            str(compiler.query.ask()), fake_results=True)
        return res['boolean']

    @override
    def _count(self, filter: Filter) -> int:
        if not filter.is_full():
            compiler = self._compile_filter(filter)
            q = compiler.query
        else:
            q = SPARQL_FilterCompiler.Query()
            wds = q.fresh_var()
            if self.has_flags(self.BEST_RANK):
                q.triples()((wds, NS.RDF.type, NS.WIKIBASE.BestRank))
            else:
                q.triples()((wds, NS.WIKIBASE.rank, q.var('rank')))
        count = q.fresh_var()
        res = self._eval_select_query_string(
            str(q.select((q.count(), count))))
        assert len(res['results']['bindings']) == 1
        return int(res['results']['bindings'][0][str(count)]['value'])

    @override
    def _filter(
            self,
            filter: Filter,
            limit: int,
            distinct: bool
    ) -> Iterator[Statement]:
        compiler = self._compile_filter(filter)
        assert limit >= 0
        page_size = min(self.page_size, limit)
        offset, count = 0, 0
        while count <= limit:
            query = compiler.query.select(
                limit=page_size, offset=offset, distinct=distinct)
            assert isinstance(compiler.pattern, VariablePattern)
            assert isinstance(compiler.pattern.variable, StatementVariable)
            var = compiler.pattern.variable
            wds = compiler.wds
            res = self._eval_select_query_string(str(query))
            bindings = res['results']['bindings']
            if not bindings:
                break           # done
            for binding in bindings:
                theta = compiler.theta.instantiate(binding)
                stmt = var.instantiate(theta)
                assert isinstance(stmt, Statement)
                if (self.has_flags(self.LATE_FILTER)
                        and not filter.match(stmt)):
                    continue
                self._cache_add_wds(
                    stmt, URIRef(binding[str(wds)]['value']))
                yield stmt
                count += 1
            if len(bindings) < page_size:
                break           # done
            if count == limit:
                break           # done
            offset += page_size
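
    # Paging sketch (illustrative): _eval_query() above implements, via
    # tail recursion, the same strategy as this standalone loop, with
    # limit/trim handling omitted here: fetch pages of page_size results,
    # advancing offset until a short page signals that the endpoint has
    # run out of results.
    #
    #   def paginate(eval_fn, page_size):
    #       offset = 0
    #       while True:
    #           page = list(eval_fn(page_size, offset))
    #           yield from (obj for obj in page if obj is not None)
    #           if len(page) < page_size:
    #               break           # short page: done
    #           offset += page_size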

    #: Flags to be passed to filter compiler.
    _compile_filter_flags: ClassVar[SPARQL_FilterCompiler.Flags] =\
        SPARQL_FilterCompiler.default_flags

    def _compile_filter(
            self,
            filter: Filter
    ) -> SPARQL_FilterCompiler:
        compiler = SPARQL_FilterCompiler(filter, self._compile_filter_flags)
        if self.has_flags(self.DEBUG):
            compiler.set_flags(compiler.DEBUG)
        else:
            compiler.unset_flags(compiler.DEBUG)
        if self.has_flags(self.BEST_RANK):
            compiler.set_flags(compiler.BEST_RANK)
        else:
            compiler.unset_flags(compiler.BEST_RANK)
        compiler.compile()
        return compiler

    def _make_filter_query_vars_dict(
            self,
            q: SPARQL_Builder
    ) -> Mapping[str, TTrm]:
        return q.vars_dict(
            'datatype',
            'i',
            'p',
            'pname',
            'property',
            'ps',
            'psv',
            'qt_amount',
            'qt_lower',
            'qt_unit',
            'qt_upper',
            'subject',
            'tm_calendar',
            'tm_precision',
            'tm_timezone',
            'tm_value',
            'value',
            'wdno',
            'wds',
            'wdv',
        )

    def _push_filters_as_values(
            self,
            q: SPARQL_Builder,
            t: Mapping[str, TTrm],
            filters: Iterable[tuple[int, Filter]]
    ) -> Mapping[str, TTrm]:
        values = q.values(
            t['i'], t['subject'], t['property'], t['pname'], t['p'],
            t['ps'], t['psv'], t['wdno'], t['value'],
            t['qt_amount'], t['qt_unit'], t['qt_lower'], t['qt_upper'],
            t['tm_value'], t['tm_precision'], t['tm_timezone'],
            t['tm_calendar'])
        with values:
            for i, filter in filters:
                self._push_filters_as_values_helper(
                    q, values, filter, i)
        return t

    def _push_filters_as_values_helper(
            self,
            q: SPARQL_Builder,
            values: SPARQL_Builder.Values,
            filter: Filter,
            i: int
    ) -> SPARQL_Builder:
        p: TTrm = q.UNDEF
        pname: TTrm = q.UNDEF
        prop: TTrm = q.UNDEF
        ps: TTrm = q.UNDEF
        psv: TTrm = q.UNDEF
        qt_amount: TTrm = q.UNDEF
        qt_lower: TTrm = q.UNDEF
        qt_unit: TTrm = q.UNDEF
        qt_upper: TTrm = q.UNDEF
        subj: TTrm = q.UNDEF
        tm_calendar: TTrm = q.UNDEF
        tm_precision: TTrm = q.UNDEF
        tm_timezone: TTrm = q.UNDEF
        tm_value: TTrm = q.UNDEF
        val: TTrm = q.UNDEF
        wdno: TTrm = q.UNDEF
        subject, property, value, _ = filter._unpack_legacy()
        # Subject:
        if subject is not None and isinstance(subject, Entity):
            subj = subject
        # Property:
        if property is not None and isinstance(property, Property):
            prop_ = property
            name = NS.Wikidata.get_wikidata_name(prop_.iri.content)
            prop = prop_
            pname = String(name)
            p = NS.P[name]
            ps = NS.PS[name]
            psv = NS.PSV[name]
            wdno = NS.WDNO[name]
        # Value:
        if value is not None and isinstance(value, Value):
            val = value
            if isinstance(value, DeepDataValue):
                if isinstance(value, Quantity):
                    qt = value
                    qt_amount = Quantity(qt.amount)
                    if qt.unit is not None:
                        qt_unit = qt.unit
                    if qt.lower_bound is not None:
                        qt_lower = Quantity(qt.lower_bound)
                    if qt.upper_bound is not None:
                        qt_upper = Quantity(qt.upper_bound)
                elif isinstance(value, Time):
                    tm = value
                    tm_value = tm
                    if tm.precision is not None:
                        tm_precision = tm.precision.value
                    if tm.timezone is not None:
                        tm_timezone = tm.timezone
                    if tm.calendar is not None:
                        tm_calendar = tm.calendar
                else:
                    raise self._should_not_get_here()
        values.push(
            i, subj, prop, pname, p, ps, psv, wdno, val,
            qt_amount, qt_unit, qt_lower, qt_upper,
            tm_value, tm_precision, tm_timezone, tm_calendar)
        return q

    def _push_deep_data_value(
            self,
            q: SPARQL_Builder,
            t: Mapping[str, TTrm],
            value: DeepDataValue | None = None,
            wds: str = 'wds',
            psv: str = 'psv',
            wdv: str = 'wdv',
            qt_amount: str = 'qt_amount',
            qt_unit: str = 'qt_unit',
            qt_lower: str = 'qt_lower',
            qt_upper: str = 'qt_upper',
            tm_value: str = 'tm_value',
            tm_precision: str = 'tm_precision',
            tm_timezone: str = 'tm_timezone',
            tm_calendar: str = 'tm_calendar',
    ) -> SPARQL_Builder:
        assert value is None or isinstance(value, DeepDataValue)
        with q.union(cond=value is None) as cup:
            if value is None or isinstance(value, Quantity):
                cup.branch()
                q.triples(
                    (t[wds], t[psv], t[wdv]),
                    (t[wdv], NS.RDF.type, NS.WIKIBASE.QuantityValue),
                    (t[wdv], NS.WIKIBASE.quantityAmount, t[qt_amount]))
                with q.optional(
                        cond=(value is None or cast(
                            Quantity, value).unit is None)):
                    q.triple(
                        t[wdv], NS.WIKIBASE.quantityUnit, t[qt_unit])
                with q.optional(
                        cond=(value is None or cast(
                            Quantity, value).lower_bound is None)):
                    q.triple(
                        t[wdv], NS.WIKIBASE.quantityLowerBound,
                        t[qt_lower])
                with q.optional(
                        cond=(value is None or cast(
                            Quantity, value).upper_bound is None)):
                    q.triple(
                        t[wdv], NS.WIKIBASE.quantityUpperBound,
                        t[qt_upper])
            if value is None or isinstance(value, Time):
                cup.branch()
                q.triples(
                    (t[wds], t[psv], t[wdv]),
                    (t[wdv], NS.RDF.type, NS.WIKIBASE.TimeValue),
                    (t[wdv], NS.WIKIBASE.timeValue, t[tm_value]))
                with q.optional(
                        cond=(value is None or cast(
                            Time, value).precision is None)):
                    q.triple(
                        t[wdv], NS.WIKIBASE.timePrecision,
                        t[tm_precision])
                with q.optional(
                        cond=(value is None or cast(
                            Time, value).timezone is None)):
                    q.triple(
                        t[wdv], NS.WIKIBASE.timeTimezone,
                        t[tm_timezone])
                with q.optional(
                        cond=(value is None or cast(
                            Time, value).calendar is None)):
                    q.triple(
                        t[wdv], NS.WIKIBASE.timeCalendarModel,
                        t[tm_calendar])
        return q

    def _push_some_value_filter(
            self,
            q: SPARQL_Builder,
            val: TTrm,
            negate: bool = False,
    ) -> SPARQL_Builder:
        cond = q.or_(
            q.isBlank(val),
            q.and_(q.isURI(val), q.strstarts(
                q.str_(val), String(str(NS.WDGENID)))))
        if negate:
            return q.filter(q.not_(cond))
        else:
            return q.filter(cond)

    def _get_wdss(
            self,
            stmts: Iterable[Statement],
            force_cache_update: bool = False,
            retries: int = 1,
            _vars: tuple[str, ...] = (
                '?i',
                '?property',
                '?qt_amount',
                '?qt_lower',
                '?qt_unit',
                '?qt_upper',
                '?subject',
                '?tm_calendar',
                '?tm_precision',
                '?tm_timezone',
                '?tm_value',
                '?value',
                '?wds',
            )
    ) -> Iterator[tuple[Statement, Set[T_WDS] | None]]:
        for batch in self._batched(stmts):
            reduced_batch: list[Statement] = []
            stmt2wdss: dict[Statement, MutableSet[T_WDS]] = {}
            for stmt in batch:
                wdss = self._cache_get_wdss(stmt)
                if not force_cache_update and wdss is not None:
                    stmt2wdss[stmt] = cast(set, wdss)
                else:
                    reduced_batch.append(stmt)
            if reduced_batch:
                q1, q2 = self._make_get_wdss_queries(
                    enumerate(reduced_batch))
                if q1 is not None:
                    it1 = self._eval_select_query(
                        q1, self._parse_get_wdss_results, vars=_vars)
                else:
                    it1 = iter(())
                if q2 is not None:
                    it2 = self._eval_select_query(
                        q2, self._parse_get_wdss_results, vars=_vars)
                else:
                    it2 = iter(())
                seen = set()
                for (stmt, wds, i) in itertools.chain(it1, it2):
                    ###
                    # FIXME: Compatibility with legacy queries.
                    ###
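                    # Note: the legacy query cannot preserve every snak
                    # detail (e.g., the property's datatype, or whether a
                    # string value was an ExternalId).  The branches below
                    # weaken the original statement (stmti) accordingly,
                    # so that it can be compared to the parsed one (stmt).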
                    seen.add(i)
                    stmti = reduced_batch[i]
                    saved_stmti = stmti
                    if isinstance(stmt.snak, (SomeValueSnak, NoValueSnak)):
                        stmti = stmti.replace(stmti.KEEP, type(stmti.snak)(
                            stmti.snak.property.replace(stmti.KEEP, None)))
                    elif (isinstance(stmt.snak, ValueSnak)
                          and isinstance(stmti.snak, ValueSnak)
                          and isinstance(stmt.snak.value, String)
                          and isinstance(stmti.snak.value, ExternalId)):
                        stmti = stmti.replace(
                            stmti.KEEP,
                            stmti.snak.property.replace(stmti.KEEP, None)(
                                stmti.snak.value.content))
                    else:
                        self._should_not_get_here()
                    if stmt != stmti:
                        continue  # nothing to do
                    if stmt not in stmt2wdss:
                        stmt2wdss[saved_stmti] = set()
                    stmt2wdss[saved_stmti].add(wds)
                unseen = set(range(len(reduced_batch))) - seen
                if unseen and retries > 0:
                    f = functools.partial(lambda t, i: t[i], reduced_batch)
                    unseen_stmts = list(map(f, unseen))
                    LOG.debug(
                        '%s(): retrying (%d left)',
                        self._get_wdss.__qualname__, retries - 1)
                    for stmt, wdss in self._get_wdss(
                            unseen_stmts, force_cache_update, retries - 1):
                        if wdss is not None:
                            if stmt not in stmt2wdss:
                                stmt2wdss[stmt] = set()
                            for wds in wdss:
                                stmt2wdss[stmt].add(wds)
            for stmt in batch:
                yield stmt, stmt2wdss.get(stmt, None)

    def _make_get_wdss_queries(
            self,
            stmts: Iterator[tuple[int, Statement]]
    ) -> tuple[SPARQL_Builder | None, SPARQL_Builder | None]:
        values: list[tuple[int, Statement]] = []
        no_values: list[tuple[int, Statement]] = []
        for i, stmt in stmts:
            if ((self.has_flags(self.VALUE_SNAK)
                 and isinstance(stmt.snak, ValueSnak))
                    or (self.has_flags(self.SOME_VALUE_SNAK)
                        and isinstance(stmt.snak, SomeValueSnak))):
                values.append((i, stmt))
            elif (self.has_flags(self.NO_VALUE_SNAK)
                  and isinstance(stmt.snak, NoValueSnak)):
                no_values.append((i, stmt))
        ###
        # FIXME: Find a way to do this using a single query.
        ###
        if values:              # q1 applies to (some) value stmts
            q1, t1 = self._make_get_wdss_query_start()
            with q1.union() as cup:
                if self.has_flags(self.VALUE_SNAK):
                    cup.branch()
                    q1.triple(t1['wds'], t1['ps'], t1['value'])
                    with q1.optional():
                        self._push_deep_data_value(q1, t1)
                if self.has_flags(self.SOME_VALUE_SNAK):
                    cup.branch()
                    q1.triple(t1['wds'], t1['ps'], t1['value'])
                    self._push_some_value_filter(q1, t1['value'])
            self._make_get_wdss_query_end(q1, t1, values)
        else:
            q1 = None
        if no_values:           # q2 applies to no value stmts
            q2, t2 = self._make_get_wdss_query_start()
            q2.triple(t2['wds'], NS.RDF.type, t2['wdno'])
            self._make_get_wdss_query_end(q2, t2, no_values)
        else:
            q2 = None
        return q1, q2

    def _make_get_wdss_query_start(
            self,
    ) -> tuple[SPARQL_Builder, Mapping[str, TTrm]]:
        q = SPARQL_Builder()
        t = self._make_filter_query_vars_dict(q)
        q.where_start()
        q.triples(
            (t['subject'], t['p'], t['wds']),
            (t['wds'], NS.WIKIBASE.rank, q.bnode()))
        if self.has_flags(self.BEST_RANK):
            q.triple(t['wds'], NS.RDF.type, NS.WIKIBASE.BestRank)
        return q, t

    def _make_get_wdss_query_end(
            self,
            q: SPARQL_Builder,
            t: Mapping[str, TTrm],
            stmts: Iterable[tuple[int, Statement]]
    ) -> None:
        self._push_filters_as_values(q, t, map(
            lambda x: (x[0], Filter.from_statement(x[1])), stmts))
        q.where_end()

    def _parse_get_wdss_results(
            self,
            results: SPARQL_Results,
    ) -> Iterator[tuple[Statement, T_WDS, int] | None]:
        for entry in results.bindings:
            i = entry.check_integer('i')
            stmt = entry.check_statement(
                'subject', 'property', 'value',
                'qt_amount', 'qt_unit', 'qt_lower', 'qt_upper',
                'tm_value', 'tm_precision', 'tm_timezone', 'tm_calendar')
            wds = entry.check_bnode_or_uriref('wds')
            yield stmt, wds, i

    # -- Annotations ---------------------------------------------------------

    @override
    def _get_annotations(
            self,
            stmts: Iterable[Statement]
    ) -> Iterator[tuple[Statement, AnnotationRecordSet | None]]:
        for batch in self._batched(stmts):
            wds_batch: list[T_WDS] = []
            stmt2wdss: dict[Statement, Set[T_WDS]] = {}
            wds2stmt: dict[T_WDS, Statement] = {}
            for stmt, wdss in self._get_wdss(batch):
                if wdss is not None:
                    stmt2wdss[stmt] = wdss
                    for wds in wdss:
                        wds2stmt[wds] = stmt
                        wds_batch.append(wds)
            if not wds_batch:
                for stmt in batch:
                    yield stmt, None
                continue        # empty batch, nothing to do
            q = self._make_get_annotations_query(set(wds_batch))
            it = self._eval_select_query(
                q, self._parse_get_annotations_results)
            wds2rank: dict[T_WDS, Rank] = {}
            wds2quals: dict[T_WDS, set[Snak]] = {}
            wds2refs: dict[T_WDS, dict[T_WDS, set[Snak]]] = {}
            for wds, rank, wdref, snak in it:
                wds2rank[wds] = rank
                if snak is None:
                    continue
                assert snak is not None
                if wdref is None:
                    if wds not in wds2quals:
                        wds2quals[wds] = set()
                    if (isinstance(snak, NoValueSnak)
                            and isinstance(wds2stmt[wds].snak, NoValueSnak)
                            and snak.property.iri == wds2stmt[
                                wds].snak.property.iri):
                        ###
                        # IMPORTANT: The representation of NoValueSnak's
                        # (via rdf:type) is ambiguous.  To be considered a
                        # qualifier, a NoValueSnak must be different from
                        # the snak of the statement.
                        ###
                        pass
                    else:
                        wds2quals[wds].add(snak)
                else:
                    if wds not in wds2refs:
                        wds2refs[wds] = {}
                    if wdref not in wds2refs[wds]:
                        wds2refs[wds][wdref] = set()
                    wds2refs[wds][wdref].add(snak)
            for stmt in batch:
                if stmt in stmt2wdss:
                    wdss = stmt2wdss[stmt]
                    annots = []
                    for wds in wdss:
                        if wds not in wds2rank:
                            continue
                        quals = wds2quals.get(wds, set())
                        if wds in wds2refs:
                            refs = set(itertools.starmap(
                                ReferenceRecord, wds2refs[wds].values()))
                        else:
                            refs = set()
                        annots.append(AnnotationRecord(
                            quals, refs, wds2rank[wds]))
                    yield stmt, AnnotationRecordSet(*annots)
                else:
                    yield stmt, None

    def _parse_get_annotations_results(
            self,
            results: SPARQL_Results,
    ) -> Iterator[tuple[T_WDS, Rank, T_WDS | None, Snak | None] | None]:
        for entry in results.bindings:
            wds = entry.check_bnode_or_uriref('wds')
            rank = entry.check_rank('rank')
            if 'pq' in entry and 'qvalue' in entry:
                pq = entry.check_uriref('pq')
                if not NS.Wikidata.is_wikidata_uri(pq):
                    yield None
                    continue    # ignore unexpected prefix
                ns, name = NS.Wikidata.split_wikidata_uri(pq)
                if ns == NS.PQN:
                    yield None
                    continue    # ignore normalized value
                if ns != NS.PQ and ns != NS.PQV:
                    yield None
                    continue    # ignore unexpected prefix
                if 'datatype' in entry['qvalue']:
                    dt = entry['qvalue']['datatype']
                    if (dt == str(NS.XSD.dateTime)
                            or dt == str(NS.XSD.decimal)):
                        yield None
                        continue  # ignore simple value
                prop = Property(NS.WD[name])
                snak = entry.check_snak(
                    prop, 'qvalue',
                    'qt_amount', 'qt_unit', 'qt_lower', 'qt_upper',
                    'tm_value', 'tm_precision', 'tm_timezone',
                    'tm_calendar')
                yield wds, rank, None, snak
            elif 'pq' not in entry and 'qvalue' in entry:
                qvalue = entry.check_uriref('qvalue')
                if not NS.Wikidata.is_wikidata_uri(qvalue):
                    yield None
                    continue    # ignore unexpected prefix
                ns, name = NS.Wikidata.split_wikidata_uri(qvalue)
                if ns != NS.WDNO:
                    yield None
                    continue    # ignore unexpected prefix
                prop = Property(NS.WD[name])
                yield wds, rank, None, NoValueSnak(prop)
            elif 'pr' in entry and 'rvalue' in entry and 'wdref' in entry:
                wdref = entry.check_uriref('wdref')
                pr = entry.check_uriref('pr')
                ns, name = NS.Wikidata.split_wikidata_uri(pr)
                if ns == NS.PRN:
                    yield None
                    continue    # ignore normalized value
                if 'datatype' in entry['rvalue']:
                    dt = entry['rvalue']['datatype']
                    if (dt == str(NS.XSD.dateTime)
                            or dt == str(NS.XSD.decimal)):
                        yield None
                        continue  # ignore simple value
                prop = Property(NS.WD[name])
                snak = entry.check_snak(
                    prop, 'rvalue',
                    'qt_amount', 'qt_unit', 'qt_lower', 'qt_upper',
                    'tm_value', 'tm_precision', 'tm_timezone',
                    'tm_calendar')
                yield wds, rank, wdref, snak
            else:
                yield wds, rank, None, None

    def _make_get_annotations_query(
            self,
            wdss: Collection[T_WDS]
    ) -> SPARQL_Builder:
        q = SPARQL_Builder()
        t = q.vars_dict(
            'pq',
            'pr',
            'psv',
            'qt_amount',
            'qt_lower',
            'qt_unit',
            'qt_upper',
            'qvalue',
            'rank',
            'rvalue',
            'tm_calendar',
            'tm_precision',
            'tm_timezone',
            'tm_value',
            'wdref',
            'wds',
            'wdv',
        )
        q.where_start()
        q.triple(t['wds'], NS.WIKIBASE.rank, t['rank'])
        with q.values(t['wds']) as values:
            for wds in wdss:
                values.push(cast(TTrm, wds))
        if self.has_flags(self.BEST_RANK):
            q.triple(t['wds'], NS.RDF.type, NS.WIKIBASE.BestRank)
        q.optional_start()
        with q.union() as cup:
            # Qualifiers:
            cup.branch()
            q.triple(t['wds'], t['pq'], t['qvalue'])
            if self.has_flags(self.EARLY_FILTER):
                q.filter(q.strstarts(q.str_(t['pq']), String(str(NS.PQ))))
            with q.optional():
                self._push_deep_data_value(
                    q, t, None, 'wds', 'pq', 'qvalue')
            cup.branch()
            q.triple(t['wds'], NS.RDF.type, t['qvalue'])
            if self.has_flags(self.EARLY_FILTER):
                q.filter(q.strstarts(
                    q.str_(t['qvalue']), String(str(NS.WDNO))))
            # References:
            cup.branch()
            q.triple(t['wds'], NS.PROV.wasDerivedFrom, t['wdref'])
            q.triple(t['wdref'], t['pr'], t['rvalue'])
            if self.has_flags(self.EARLY_FILTER):
                q.filter(q.strstarts(q.str_(t['pr']), String(str(NS.PR))))
            with q.optional():
                self._push_deep_data_value(
                    q, t, None, 'wdref', 'pr', 'rvalue')
        q.optional_end()
        q.where_end()
        return q

    # -- Descriptors ---------------------------------------------------------

    @override
    def _get_item_descriptor(
            self,
            items: Iterable[Item],
            language: str,
            mask: Descriptor.AttributeMask
    ) -> Iterator[tuple[Item, ItemDescriptor | None]]:
        return cast(
            Iterator[tuple[Item, Union[ItemDescriptor, None]]],
            self._get_item_or_property_descriptor(
                Item, items, language, mask))

    @override
    def _get_property_descriptor(
            self,
            properties: Iterable[Property],
            language: str,
            mask: Descriptor.AttributeMask
    ) -> Iterator[tuple[Property, PropertyDescriptor | None]]:
        return cast(
            Iterator[tuple[Property, Union[PropertyDescriptor, None]]],
            self._get_item_or_property_descriptor(
                Property, properties, language, mask))

    def _get_item_or_property_descriptor(
            self,
            cls: type[Entity],
            entities: Iterable[Item | Property],
            language: str,
            mask: Descriptor.AttributeMask
    ) -> Iterator[tuple[
            Item | Property,
            ItemDescriptor | PropertyDescriptor | None]]:
        q = self._make_item_or_property_descriptor_query(
            set(entities), cls, language, mask)
        if q.has_variable(q.var('subject')):
            it = self._eval_select_query(
                q, lambda res:
                self._parse_get_item_or_property_descriptor_results(
                    res, cls, language, mask))
        else:
            it = iter(())
            LOG.debug(
                '%s(): nothing to select:\n%s',
                self._get_item_or_property_descriptor.__qualname__,
                q.select())
        desc: dict[IRI | Property, dict[str, Any]] = {}
        ###
        # FIXME: Compatibility with legacy queries.
        ###
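        # Each result row carries at most one descriptor attribute (a
        # label, an alias, or a description); the loop below merges the
        # rows into a per-IRI attribute dictionary.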
        for entity, label, alias, description, datatype in it:
            if entity.iri not in desc:
                desc[entity.iri] = {
                    'label': None,
                    'aliases': [],
                    'description': None,
                    'datatype': None
                }
            if label is not None:
                desc[entity.iri]['label'] = label
            if alias is not None:
                desc[entity.iri]['aliases'].append(alias)
            if description is not None:
                desc[entity.iri]['description'] = description
            if datatype is not None:
                desc[entity.iri]['datatype'] = datatype
        for entity in entities:
            if entity.iri in desc:
                if isinstance(entity, Item) and cls is Item:
                    yield (cast(Item, entity), ItemDescriptor(
                        desc[entity.iri].get('label'),
                        desc[entity.iri].get('aliases'),
                        desc[entity.iri].get('description')))
                elif isinstance(entity, Property) and cls is Property:
                    yield (cast(Property, entity), PropertyDescriptor(
                        desc[entity.iri].get('label'),
                        desc[entity.iri].get('aliases'),
                        desc[entity.iri].get('description'),
                        desc[entity.iri].get('datatype')))
                else:
                    raise self._should_not_get_here()
            else:
                yield entity, None

    def _make_item_or_property_descriptor_query(
            self,
            entities: Collection[Item | Property],
            cls: type[Entity],
            lang: str,
            mask: Descriptor.AttributeMask
    ) -> SPARQL_Builder:
        q = SPARQL_Builder()
        t: Mapping[str, TTrm] = q.vars_dict(
            'alias', 'datatype', 'description', 'label', 'subject')
        language = String(lang)
        if self.has_flags(self.EARLY_FILTER):
            get_label = bool(mask & Descriptor.LABEL)
            get_aliases = bool(mask & Descriptor.ALIASES)
            get_description = bool(mask & Descriptor.DESCRIPTION)
            get_datatype = (
                cls is Property and bool(mask & Descriptor.DATATYPE))
        else:
            get_label = True
            get_aliases = True
            get_description = True
            get_datatype = cls is Property
        with q.where():
            # We use wikibase:sitelinks to check whether ?subject exists.
            with q.union() as cup:
                q.triple(t['subject'], NS.WIKIBASE.sitelinks, q.bnode())
                cup.branch()
                q.triple(t['subject'], NS.RDF.type, NS.WIKIBASE.Property)
                if get_datatype:
                    q.triple(
                        t['subject'], NS.WIKIBASE.propertyType,
                        t['datatype'])
            with q.optional(
                    cond=get_label or get_aliases or get_description):
                with q.union() as cup:
                    if get_label:
                        q.triple(t['subject'], NS.RDFS.label, t['label'])
                        q.filter(q.eq(q.lang(t['label']), language))
                    if get_aliases:
                        cup.branch()
                        q.triple(
                            t['subject'], NS.SKOS.altLabel, t['alias'])
                        q.filter(q.eq(q.lang(t['alias']), language))
                    if get_description:
                        cup.branch()
                        q.triple(
                            t['subject'], NS.SCHEMA.description,
                            t['description'])
                        q.filter(q.eq(q.lang(t['description']), language))
            with q.values(t['subject']) as values:
                for entity in entities:
                    if isinstance(entity, cls):
                        values.push(entity.iri)
        return q

    def _parse_get_item_or_property_descriptor_results(
            self,
            results: SPARQL_Results,
            cls: type[Entity],
            language: str,
            mask: Descriptor.AttributeMask
    ) -> Iterator[tuple[
            Item | Property,
            Text | None,
            Text | None,
            Text | None,
            Datatype | None] | None]:
        if self.has_flags(self.LATE_FILTER):
            late_filter = True
            get_label = bool(mask & Descriptor.LABEL)
            get_aliases = bool(mask & Descriptor.ALIASES)
            get_description = bool(mask & Descriptor.DESCRIPTION)
            get_datatype = (
                cls is Property and bool(mask & Descriptor.DATATYPE))
        else:
            late_filter = False
            get_label = True
            get_aliases = True
            get_description = True
            get_datatype = cls is Property
        for entry in results.bindings:
            if cls is Item:
                entity: Item | Property = entry.check_item('subject')
            elif cls is Property:
                entity = entry.check_property('subject')
            else:
                raise self._should_not_get_here()
            if get_datatype and 'datatype' in entry:
                datatype: Datatype | None = entry.check_datatype(
                    'datatype')
            else:
                datatype = None
            if get_label and 'label' in entry:
                label = entry.check_text('label')
                if label.language == language or not late_filter:
                    yield entity, label, None, None, datatype
                    continue    # found label
            elif get_aliases and 'alias' in entry:
                alias = entry.check_text('alias')
                if alias.language == language or not late_filter:
                    yield entity, None, alias, None, datatype
                    continue    # found alias
            elif get_description and 'description' in entry:
                description = entry.check_text('description')
                if description.language == language or not late_filter:
                    yield entity, None, None, description, datatype
                    continue    # found description
            yield entity, None, None, None, datatype  # fallback

    @override
    def _get_lexeme_descriptor(
            self,
            lexemes: Iterable[Lexeme],
            mask: Descriptor.AttributeMask
    ) -> Iterator[tuple[Lexeme, LexemeDescriptor | None]]:
        for batch in self._batched(lexemes):
            q = self._make_lexeme_descriptor_query(set(batch), mask)
            if q.has_variable(q.var('subject')):
                it = self._eval_select_query(
                    q, lambda res:
                    self._parse_get_lexeme_descriptor_results(res, mask))
            else:
                it = iter(())
                LOG.debug(
                    '%s(): nothing to select:\n%s',
                    self._get_lexeme_descriptor.__qualname__, q.select())
            desc: dict[Lexeme, dict[str, Any]] = {}
            for lexeme, lemma, category, language in it:
                desc[lexeme] = {
                    'lemma': lemma,
                    'category': category,
                    'language': language,
                }
            for lexeme in batch:
                if lexeme in desc:
                    yield (lexeme, LexemeDescriptor(
                        desc[lexeme]['lemma'],
                        desc[lexeme]['category'],
                        desc[lexeme]['language']
                    ))
                else:
                    yield lexeme, None

    def _make_lexeme_descriptor_query(
            self,
            lexemes: Collection[Lexeme],
            mask: Descriptor.AttributeMask
    ) -> SPARQL_Builder:
        q = SPARQL_Builder()
        t: Mapping[str, TTrm] = q.vars_dict(
            'lemma', 'category', 'language', 'subject')
        if self.has_flags(self.EARLY_FILTER):
            get_lemma = bool(mask & Descriptor.LEMMA)
            get_category = bool(mask & Descriptor.CATEGORY)
            get_language = bool(mask & Descriptor.LANGUAGE)
        else:
            get_lemma = True
            get_category = True
            get_language = True
        with q.where():
            q.triple(t['subject'], NS.RDF.type, NS.ONTOLEX.LexicalEntry)
            if get_lemma:
                q.triple(t['subject'], NS.WIKIBASE.lemma, t['lemma'])
            if get_category:
                q.triple(
                    t['subject'], NS.WIKIBASE.lexicalCategory,
                    t['category'])
            if get_language:
                q.triple(t['subject'], NS.DCT.language, t['language'])
            with q.values(t['subject']) as values:
                for lexeme in lexemes:
                    if isinstance(lexeme, Lexeme):
                        values.push(lexeme.iri)
        return q

    def _parse_get_lexeme_descriptor_results(
            self,
            results: SPARQL_Results,
            mask: Descriptor.AttributeMask
    ) -> Iterator[tuple[Lexeme, Text | None, Item | None, Item | None]]:
        if self.has_flags(self.LATE_FILTER):
            get_lemma = bool(mask & Descriptor.LEMMA)
            get_category = bool(mask & Descriptor.CATEGORY)
            get_language = bool(mask & Descriptor.LANGUAGE)
        else:
            get_lemma = True
            get_category = True
            get_language = True
        for entry in results.bindings:
            subject = entry.check_lexeme('subject')
            if get_lemma and 'lemma' in entry:
                lemma = entry.check_text('lemma')
            else:
                lemma = None
            if get_category and 'category' in entry:
                category = entry.check_item('category')
            else:
                category = None
            if get_language and 'language' in entry:
                language = entry.check_item('language')
            else:
                language = None
            yield subject, lemma, category, language
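
# An end-to-end sketch (illustrative; not part of the module): fetching
# a few statements from Wikidata.  It assumes Store and the wd
# vocabulary are exposed by the kif_lib package, as in KIF's examples:
#
#   from kif_lib import Store
#   from kif_lib.vocabulary import wd
#
#   kb = Store('sparql', 'https://query.wikidata.org/sparql')
#   for stmt in kb.filter(subject=wd.benzene, limit=3):
#       print(stmt)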