Source code for commons_api.wikidata.utils

import itertools
from typing import Mapping

import re
import SPARQLWrapper
from django.conf import settings
from django.template.loader import get_template
from django.utils import translation

from . import models
from .namespaces import WD, WDS


def lang_dict(terms):
    return {term.language: str(term) for term in terms}

ITEM_URI_RE = re.compile('^' + re.escape(WD) + 'Q[1-9][0-9]*$')
STATEMENT_URI_RE = re.compile('^' + re.escape(WDS) + '[qQ][0-9A-Fa-f-]+$')
DATE_RE = re.compile('^[0-9]{4}-[01][0-9]-[0-3][0-9]$')

def item_uri_to_id(uri):
    if isinstance(uri, dict):
        uri = uri['value']
    if not ITEM_URI_RE.match(uri):
        raise ValueError("Not a Wikidata item URI: {!r}".format(uri))
    return uri[len(WD):]


def get_date(dt):
    if not dt:
        return None
    if isinstance(dt, dict):
        dt = dt['value']
    if not isinstance(dt, str):
        return None
    dt = dt[:10]
    if DATE_RE.match(dt):
        return dt


def statement_uri_to_id(uri):
    if isinstance(uri, dict):
        uri = uri['value']
    if not STATEMENT_URI_RE.match(uri):
        raise ValueError("Not a Wikidata statement URI: {!r}".format(uri))
    return uri[len(WDS):].upper()


def select_multilingual(data: Mapping[str,object],
                        default=None):
    """Selects a value from data based on user's language

    `data` should be keyed with language code `str`s. This function first tries the user's proferred language, falling
    back to `settings.DEFAULT_LANGUAGE` and then "en". If no appropriate value is found in `data`, `default` is
    returned."""
    languages_to_try = (translation.get_language(),
                        getattr(settings, 'DEFAULT_LANGUAGE', 'en'),
                        'en')
    for language in languages_to_try:
        try:
            return data[language]
        except KeyError:
            continue
    return default


def split_every(it, n):
    """Splits an iterable into sub-iterators each of maximum length n

    Each returned iterator needs consuming before the main iterator is
    iterated over again, otherwise the returned groups will be shorter
    than expected, or returned items returned out of order.
    """
    def group(first):
        yield first
        yield from itertools.islice(it, n-1)
    it = iter(it)
    while True:
        try:
            first = next(it)
        except StopIteration:
            break
        yield group(first)


[docs]def templated_wikidata_query(query_name, context): sparql = SPARQLWrapper.SPARQLWrapper(settings.WDQS_URL) sparql.setMethod(SPARQLWrapper.POST) sparql.setQuery(get_template(query_name).render(context)) sparql.setReturnFormat(SPARQLWrapper.JSON) return sparql.query().convert()