# Source code for commons_api.wikidata.tasks.wikidata_item

import datetime

import celery
import itertools

from commons_api.wikidata import utils
from commons_api.wikidata.tasks.base import with_periodic_queuing_task, get_wikidata_model_by_name

__all__ = ['refresh_labels']


@with_periodic_queuing_task
@celery.shared_task
def refresh_labels(app_label, model, ids=None, queued_at=None):
    """Refresh the stored labels of the given model's items from Wikidata.

    The model is looked up with ``get_wikidata_model_by_name(app_label,
    model)``. Its items are processed in chunks produced by
    ``utils.split_every(queryset, 250)``; for each chunk the
    ``wikidata/query/labels.rq`` query template is run with the sorted item
    ids, and every item that appears in the result bindings gets its
    ``labels`` attribute replaced by a ``{language: label}`` dict and is
    saved. Items that do not appear in the bindings are left untouched.

    :param app_label: App label passed to ``get_wikidata_model_by_name``.
    :param model: Model name passed to ``get_wikidata_model_by_name``.
    :param ids: Optional iterable of item ids; when given, only items whose
        ``id`` is in it are refreshed.
    :param queued_at: Optional value; when given, only items whose
        ``refresh_labels_last_queued`` equals it are refreshed.
    """
    queryset = get_wikidata_model_by_name(app_label, model).objects.all()
    if queued_at is not None:
        queryset = queryset.filter(refresh_labels_last_queued=queued_at)
    if ids is not None:
        # Filter the queryset directly: ``.objects`` is a model manager
        # attribute and does not exist on a queryset.
        queryset = queryset.filter(id__in=ids)

    for batch in utils.split_every(queryset, 250):
        items = {item.id: item for item in batch}
        results = utils.templated_wikidata_query(
            'wikidata/query/labels.rq', {'ids': sorted(items)})

        # Collect labels per item with a dict rather than itertools.groupby:
        # groupby only merges *adjacent* rows, so bindings that are not
        # ordered by id would overwrite an item's labels with a partial set.
        labels_by_id = {}
        for row in results['results']['bindings']:
            item_id = utils.item_uri_to_id(row['id']['value'])
            label = row['label']
            labels_by_id.setdefault(item_id, {})[label['xml:lang']] = label['value']

        for item_id, labels in labels_by_id.items():
            item = items[item_id]
            item.labels = labels
            item.save()