From 92c9dd7da4ef5d1718024f8a775fcc555d7cbe11 Mon Sep 17 00:00:00 2001 From: Myst <1592048+LeMyst@users.noreply.github.com> Date: Wed, 6 May 2026 23:39:05 +0200 Subject: [PATCH 1/2] Make tests deterministic with offline mocks Add a pytest conftest fixture and helper fakes to mock MediaWiki/Wikibase network calls for offline testing (test/conftest.py). Update multiple tests to use these mocks and unittest.mock.patch: - test_entity_item.py: add fake entity builders, fake _get/edit implementations, a fake FastRun container, and setUp/tearDown to patch network interactions; adjust write test and remove external HTTP dependency. - test_wbi_backoff.py: patch requests.get to simulate HTTP responses and patch login to raise errors for deterministic backoff behavior. - test_wbi_core.py: initialize shared common_item in setUpClass. - test_wbi_helpers.py: provide a fake mediawiki_api_call helper and use monkeypatch in tests (connection, sparql, format2wbi, user agent, allow_anonymous) to avoid real network calls. - test_wbi_login.py: add a FakeResponse and patch Session.post, Handshaker, and login methods to simulate various login failure modes deterministically. Overall this change refactors tests to be network-independent and more reliable in CI/local runs. --- test/conftest.py | 467 ++++++++++++++++++++++++++++++ test/test_entity_item.py | 167 ++++++++++- test/test_wbi_backoff.py | 41 ++- test/test_wbi_core.py | 4 +- test/test_wbi_helpers.py | 58 +++- test/test_wbi_login.py | 76 +++-- wikibaseintegrator/wbi_helpers.py | 16 +- 7 files changed, 785 insertions(+), 44 deletions(-) create mode 100644 test/conftest.py diff --git a/test/conftest.py b/test/conftest.py new file mode 100644 index 00000000..4c2183f3 --- /dev/null +++ b/test/conftest.py @@ -0,0 +1,467 @@ +from __future__ import annotations + +from typing import Any + +import pytest + +from wikibaseintegrator.wbi_exceptions import NonExistentEntityError + +SKIP_OFFLINE_PATCH_MODULES = { + 'test.test_wbi_helpers', + 'test.test_wbi_login', + 'test.test_wbi_backoff', +} + + +def _base_claim_string(prop: str, value: str, claim_id: str) -> dict[str, Any]: + return { + 'mainsnak': { + 'snaktype': 'value', + 'property': prop, + 'datatype': 'string', + 'datavalue': { + 'value': value, + 'type': 'string', + }, + }, + 'type': 'statement', + 'id': claim_id, + 'rank': 'normal', + } + + +def _base_claim_item(prop: str, value: str, claim_id: str) -> dict[str, Any]: + numeric_id = int(value[1:]) + return { + 'mainsnak': { + 'snaktype': 'value', + 'property': prop, + 'datatype': 'wikibase-item', + 'datavalue': { + 'value': { + 'entity-type': 'item', + 'numeric-id': numeric_id, + 'id': value, + }, + 'type': 'wikibase-entityid', + }, + }, + 'type': 'statement', + 'id': claim_id, + 'rank': 'normal', + } + + +def _item_q2() -> dict[str, Any]: + p2067_claim = _base_claim_string('P2067', '1.23', 'Q2$P2067-1') + p2067_claim['references'] = [ + { + 'hash': 'q2ref', + 'snaks': { + 'P854': [ + { + 'snaktype': 'value', + 'property': 'P854', + 'datatype': 'string', + 'datavalue': {'value': 'https://example.org/earth', 'type': 'string'}, + } + ] + }, + 'snaks-order': ['P854'], + } + ] + + return { + 'id': 'Q2', + 'type': 'item', + 'lastrevid': 1, + 'labels': { + 'en': {'language': 'en', 'value': 'Earth'}, + 'es': {'language': 'es', 'value': 'Tierra'}, + 'fr': {'language': 'fr', 'value': 'Terre'}, + }, + 'descriptions': { + 'en': {'language': 'en', 'value': 'planet in the Solar System'}, + 'fr': {'language': 'fr', 'value': 'planete du systeme solaire'}, + }, + 'aliases': { + 'fr': [{'language': 'fr', 'value': 'la Terre'}], + 'en': [{'language': 'en', 'value': 'Planet Earth'}], + }, + 'sitelinks': { + 'enwiki': {'site': 'enwiki', 'title': 'Earth', 'badges': []}, + }, + 'claims': { + 'P31': [_base_claim_item('P31', 'Q3504248', 'Q2$P31-1')], + 'P2067': [p2067_claim], + }, + } + + +def _item_q582() -> dict[str, Any]: + return { + 'id': 'Q582', + 'type': 'item', + 'lastrevid': 1, + 'labels': {'fr': {'language': 'fr', 'value': 'Villeurbanne'}}, + 'descriptions': {'fr': {'language': 'fr', 'value': 'commune francaise'}}, + 'aliases': {'fr': [{'language': 'fr', 'value': 'Villeur'}]}, + 'sitelinks': {'frwiki': {'site': 'frwiki', 'title': 'Villeurbanne', 'badges': []}}, + 'claims': { + 'P31': [_base_claim_item('P31', 'Q484170', 'Q582$P31-1')], + }, + } + + +def _item_q622901() -> dict[str, Any]: + return { + 'id': 'Q622901', + 'type': 'item', + 'lastrevid': 1, + 'labels': {'en': {'language': 'en', 'value': 'Sample'}}, + 'descriptions': {'en': {'language': 'en', 'value': 'sample item'}}, + 'aliases': {}, + 'sitelinks': {'enwiki': {'site': 'enwiki', 'title': 'Sample title', 'badges': []}}, + 'claims': {}, + } + + +def _item_q27869338() -> dict[str, Any]: + return { + 'id': 'Q27869338', + 'type': 'item', + 'lastrevid': 1, + 'labels': {'en': {'language': 'en', 'value': 'No sitelinks'}}, + 'descriptions': {'en': {'language': 'en', 'value': 'no sitelinks item'}}, + 'aliases': {}, + 'sitelinks': {}, + 'claims': {}, + } + + +def _property_p50() -> dict[str, Any]: + return { + 'id': 'P50', + 'type': 'property', + 'lastrevid': 1, + 'datatype': 'wikibase-item', + 'labels': {'fr': {'language': 'fr', 'value': 'auteur ou autrice'}}, + 'descriptions': {'en': {'language': 'en', 'value': 'author'}}, + 'aliases': {}, + 'claims': {}, + } + + +def _property_p715() -> dict[str, Any]: + return { + 'id': 'P715', + 'type': 'property', + 'lastrevid': 1, + 'datatype': 'string', + 'labels': {'en': {'language': 'en', 'value': 'Drugbank ID'}}, + 'descriptions': {}, + 'aliases': {}, + 'claims': {}, + } + + +def _property_generic(entity_id: str) -> dict[str, Any]: + return { + 'id': entity_id, + 'type': 'property', + 'lastrevid': 1, + 'datatype': 'string', + 'labels': {'en': {'language': 'en', 'value': f'Property {entity_id}'}}, + 'descriptions': {}, + 'aliases': {}, + 'claims': {}, + } + + +def _lexeme(entity_id: str = 'L5') -> dict[str, Any]: + return { + 'id': entity_id, + 'type': 'lexeme', + 'lastrevid': 1, + 'lemmas': {'es': {'language': 'es', 'value': 'pino'}}, + 'lexicalCategory': 'Q1084', + 'language': 'Q1321', + 'forms': [ + { + 'id': f'{entity_id}-F1', + 'representations': {'es': {'language': 'es', 'value': 'pinos'}}, + 'grammaticalFeatures': ['Q146786'], + 'claims': {}, + } + ], + 'senses': [ + { + 'id': f'{entity_id}-S1', + 'glosses': {'es': {'language': 'es', 'value': 'arbol conifer'}}, + 'claims': {}, + } + ], + 'claims': {}, + } + + +def _mediainfo() -> dict[str, Any]: + return { + 'id': 'M75908279', + 'type': 'mediainfo', + 'lastrevid': 1, + 'labels': {'en': {'language': 'en', 'value': 'Sample media'}}, + 'descriptions': {'en': {'language': 'en', 'value': 'media info sample'}}, + 'statements': { + 'P180': [_base_claim_item('P180', 'Q42', 'M75908279$P180-1')], + }, + } + + +def _entity_payload(entity_id: str) -> dict[str, Any]: + if entity_id == 'Q99999999999999': + raise NonExistentEntityError({'code': 'no-such-entity'}) + + if entity_id == 'Q2': + return _item_q2() + if entity_id == 'Q582': + return _item_q582() + if entity_id == 'Q622901': + return _item_q622901() + if entity_id == 'Q27869338': + return _item_q27869338() + if entity_id == 'Q408883': + data = _item_q2().copy() + data['id'] = 'Q408883' + return data + if entity_id == 'Q18046452': + data = _item_q2().copy() + data['id'] = 'Q18046452' + return data + if entity_id == 'P50': + return _property_p50() + if entity_id == 'P715': + return _property_p715() + if entity_id.startswith('P'): + return _property_generic(entity_id) + if entity_id == 'L5': + return _lexeme('L5') + if entity_id.startswith('L'): + return _lexeme(entity_id) + if entity_id == 'M75908279': + return _mediainfo() + + if entity_id.startswith('Q'): + data = _item_q2().copy() + data['id'] = entity_id + return data + + raise KeyError(f'Unsupported fake entity: {entity_id}') + + +def _filter_entity_props(entity: dict[str, Any], props: str | list | None) -> dict[str, Any]: + if not props: + return entity + + if isinstance(props, str): + requested_props = {p for p in props.split('|') if p and p != 'info'} + else: + requested_props = set(props) + + filtered = {k: entity[k] for k in ('id', 'type', 'lastrevid') if k in entity} + for prop in requested_props: + if prop in entity: + filtered[prop] = entity[prop] + return filtered + + +def fake_baseentity_get(self, entity_id, login=None, allow_anonymous=True, is_bot=None, props=None, **kwargs): + del self, login, allow_anonymous, is_bot, kwargs + payload = _entity_payload(entity_id) + payload = _filter_entity_props(payload, props) + return {'entities': {entity_id: payload}} + + +def _datatype_map() -> dict[str, str]: + return { + 'P1433': 'wikibase-item', + 'P1476': 'monolingualtext', + 'P2093': 'string', + 'P31': 'wikibase-item', + 'P407': 'wikibase-item', + 'P50': 'wikibase-item', + 'P953': 'url', + 'P1545': 'string', + 'P699': 'external-id', + 'P828': 'wikibase-item', + 'P2888': 'url', + 'P248': 'wikibase-item', + 'P813': 'time', + 'P352': 'external-id', + 'P705': 'external-id', + 'P646': 'external-id', + } + + +def fake_mediawiki_api_call_helper(data: dict[str, Any], *args, **kwargs): + del args, kwargs + action = data.get('action') + + if action == 'wbsearchentities': + return { + 'success': 1, + 'search': [ + { + 'id': 'Q999', + 'label': 'rivaroxaban', + 'match': {'type': 'label', 'text': 'rivaroxaban'}, + 'description': 'medication', + 'aliases': ['xarelto'], + } + ], + } + + if action == 'wbgetentities': + if 'sites' in data and 'titles' in data: + return {'entities': {'M75908279': _mediainfo()}} + + ids = data.get('ids', '') + id_list = ids.split('|') if isinstance(ids, str) else list(ids) + + if data.get('props') == 'datatype': + dt_map = _datatype_map() + return {'entities': {prop: {'datatype': dt_map.get(prop, 'string')} for prop in id_list}} + + return {'entities': {entity_id: _entity_payload(entity_id) for entity_id in id_list}} + + return {'success': 1} + + +def fake_execute_sparql_query(query: str, prefix: str | None = None, endpoint: str | None = None, user_agent: str | None = None, max_retries: int = 1000, retry_after: int = 60): + del prefix, endpoint, user_agent, max_retries, retry_after + + if 'wikibase:label' in query and 'wdt:P22 wd:Q1339' in query: + return {'results': {'bindings': [{'child': {'type': 'uri', 'value': 'http://www.wikidata.org/entity/Q1'}} for _ in range(2)]}} + + if 'rdfs:label' in query: + return {'results': {'bindings': [{'item': {'value': 'http://www.wikidata.org/entity/Q2'}, 'label': {'value': 'Earth'}}]}} + + if 'schema:description' in query: + return {'results': {'bindings': [{'item': {'value': 'http://www.wikidata.org/entity/Q2'}, 'label': {'value': 'planet in the Solar System'}}]}} + + if 'skos:altLabel' in query: + return {'results': {'bindings': [{'item': {'value': 'http://www.wikidata.org/entity/Q2'}, 'label': {'value': 'Planet Earth'}}]}} + + if '/prop/P699' in query: + bindings = [ + { + 'sid': {'value': 'http://www.wikidata.org/entity/statement/Q10874-S1'}, + 'item': {'value': 'http://www.wikidata.org/entity/Q10874'}, + 'v': {'type': 'literal', 'value': 'DOID:1432'}, + } + ] + if 'prov:wasDerivedFrom' in query: + bindings = [ + { + 'sid': {'value': 'http://www.wikidata.org/entity/statement/Q10874-S1'}, + 'item': {'value': 'http://www.wikidata.org/entity/Q10874'}, + 'v': {'type': 'literal', 'value': 'DOID:1432'}, + 'ref': {'value': 'http://www.wikidata.org/entity/reference/ref1'}, + 'pr': {'value': 'http://www.wikidata.org/entity/P248'}, + 'rval': {'type': 'uri', 'value': 'http://www.wikidata.org/entity/Q30988716'}, + }, + { + 'sid': {'value': 'http://www.wikidata.org/entity/statement/Q10874-S1'}, + 'item': {'value': 'http://www.wikidata.org/entity/Q10874'}, + 'v': {'type': 'literal', 'value': 'DOID:1432'}, + 'ref': {'value': 'http://www.wikidata.org/entity/reference/ref1'}, + 'pr': {'value': 'http://www.wikidata.org/entity/P813'}, + 'rval': {'type': 'literal', 'datatype': 'http://www.w3.org/2001/XMLSchema#dateTime', 'value': '2017-07-05T00:00:00Z'}, + }, + ] + return {'results': {'bindings': bindings}} + + if '/prop/P828' in query: + return { + 'results': { + 'bindings': [ + { + 'sid': {'value': 'http://www.wikidata.org/entity/statement/Q10874-S2'}, + 'item': {'value': 'http://www.wikidata.org/entity/Q10874'}, + 'v': {'type': 'uri', 'value': 'http://www.wikidata.org/entity/Q18228398'}, + } + ] + } + } + + if '/prop/P2888' in query: + return { + 'results': { + 'bindings': [ + { + 'sid': {'value': 'http://www.wikidata.org/entity/statement/Q10874-S3'}, + 'item': {'value': 'http://www.wikidata.org/entity/Q10874'}, + 'v': {'type': 'literal', 'value': 'https://example.org/sameas'}, + } + ] + } + } + + if '/prop/P352' in query: + return { + 'results': { + 'bindings': [ + { + 'sid': {'value': 'http://www.wikidata.org/entity/statement/Q100-S1'}, + 'item': {'value': 'http://www.wikidata.org/entity/Q100'}, + 'v': {'type': 'literal', 'value': 'P40095'}, + } + ] + } + } + + if '/prop/P705' in query: + return { + 'results': { + 'bindings': [ + { + 'sid': {'value': 'http://www.wikidata.org/entity/statement/Q100-S2'}, + 'item': {'value': 'http://www.wikidata.org/entity/Q100'}, + 'v': {'type': 'literal', 'value': 'YER158C'}, + } + ] + } + } + + if '/prop/P646' in query: + return { + 'results': { + 'bindings': [ + { + 'sid': {'value': 'http://www.wikidata.org/entity/statement/Q2-S4'}, + 'item': {'value': 'http://www.wikidata.org/entity/Q2'}, + 'v': {'type': 'literal', 'value': '/m/02j71'}, + } + ] + } + } + + return {'results': {'bindings': []}} + + +def fake_get_prop_datatype(self, prop_nr: str): + del self + return _datatype_map().get(prop_nr, 'string') + + +@pytest.fixture(autouse=True) +def offline_network_mocks(monkeypatch, request): + module_name = request.module.__name__ if request.module else '' + if module_name in SKIP_OFFLINE_PATCH_MODULES: + return + + monkeypatch.setattr('wikibaseintegrator.entities.baseentity.BaseEntity._get', fake_baseentity_get) + monkeypatch.setattr('wikibaseintegrator.wbi_helpers.mediawiki_api_call_helper', fake_mediawiki_api_call_helper) + monkeypatch.setattr('wikibaseintegrator.wbi_helpers.execute_sparql_query', fake_execute_sparql_query) + monkeypatch.setattr('wikibaseintegrator.wbi_fastrun.execute_sparql_query', fake_execute_sparql_query) + monkeypatch.setattr('wikibaseintegrator.wbi_fastrun.FastRunContainer.get_prop_datatype', fake_get_prop_datatype) diff --git a/test/test_entity_item.py b/test/test_entity_item.py index 1c510c6a..ad2a61ed 100644 --- a/test/test_entity_item.py +++ b/test/test_entity_item.py @@ -1,6 +1,6 @@ -import os import unittest from copy import deepcopy +from unittest.mock import patch import pytest import requests @@ -15,8 +15,169 @@ wbi = WikibaseIntegrator() +def _build_q582_entity(props=None): + full_entity = { + 'id': 'Q582', + 'type': 'item', + 'lastrevid': 1, + 'labels': { + 'fr': { + 'language': 'fr', + 'value': 'Villeurbanne' + } + }, + 'descriptions': { + 'fr': { + 'language': 'fr', + 'value': 'commune francaise' + } + }, + 'aliases': { + 'fr': [ + { + 'language': 'fr', + 'value': 'Villeur' + } + ] + }, + 'sitelinks': { + 'frwiki': { + 'site': 'frwiki', + 'title': 'Villeurbanne', + 'badges': [] + } + }, + 'claims': { + 'P443': [ + { + 'mainsnak': { + 'snaktype': 'value', + 'property': 'P443', + 'datatype': 'string', + 'datavalue': { + 'value': 'audio.ogg', + 'type': 'string' + } + }, + 'type': 'statement', + 'id': 'Q582$P443-1', + 'rank': 'normal', + 'qualifiers': { + 'P407': [ + { + 'snaktype': 'value', + 'property': 'P407', + 'datatype': 'wikibase-item', + 'datavalue': { + 'value': { + 'entity-type': 'item', + 'numeric-id': 150, + 'id': 'Q150' + }, + 'type': 'wikibase-entityid' + } + } + ] + }, + 'qualifiers-order': ['P407'] + } + ], + 'P2581': [ + { + 'mainsnak': { + 'snaktype': 'value', + 'property': 'P2581', + 'datatype': 'string', + 'datavalue': { + 'value': '98765', + 'type': 'string' + } + }, + 'type': 'statement', + 'id': 'Q582$P2581-1', + 'rank': 'normal', + 'references': [ + { + 'hash': 'deadbeef', + 'snaks': { + 'P854': [ + { + 'snaktype': 'value', + 'property': 'P854', + 'datatype': 'string', + 'datavalue': { + 'value': 'https://example.org/source', + 'type': 'string' + } + } + ] + }, + 'snaks-order': ['P854'] + } + ] + } + ] + } + } + + if props: + if isinstance(props, str): + requested_props = {p for p in props.split('|') if p and p != 'info'} + else: + requested_props = set(props) + entity = {k: full_entity[k] for k in ('id', 'type', 'lastrevid')} + for prop in requested_props: + if prop in full_entity: + entity[prop] = full_entity[prop] + return entity + + return full_entity + + +def _fake_get(self, entity_id, props=None, **kwargs): + if entity_id == 'Q99999999999999': + raise NonExistentEntityError({'code': 'no-such-entity'}) + return {'entities': {'Q582': _build_q582_entity(props=props)}} + + +def _fake_edit_entity(*args, **kwargs): + raise requests.exceptions.JSONDecodeError('mocked json decode error', 'mocked', 0) + + +class _FakeFastRunContainer: + def write_required(self, data, cqid=None, action_if_exists=None): + if not data: + return False + + for claim in data: + if claim.mainsnak.property_number == 'P1791': + return True + if claim.mainsnak.property_number == 'P2581' and len(claim.references) == 0: + return True + + return False + + +def _fake_get_fastrun_container(*args, **kwargs): + return _FakeFastRunContainer() + + class TestEntityItem(unittest.TestCase): + def setUp(self): + self.get_patcher = patch('wikibaseintegrator.entities.baseentity.BaseEntity._get', new=_fake_get) + self.write_patcher = patch('wikibaseintegrator.entities.baseentity.edit_entity', new=_fake_edit_entity) + self.fastrun_patcher = patch('wikibaseintegrator.entities.baseentity.wbi_fastrun.get_fastrun_container', new=_fake_get_fastrun_container) + + self.get_patcher.start() + self.write_patcher.start() + self.fastrun_patcher.start() + + def tearDown(self): + self.fastrun_patcher.stop() + self.write_patcher.stop() + self.get_patcher.stop() + def test_get(self): # Test with complete id assert wbi.item.get('Q582').id == 'Q582' @@ -46,7 +207,7 @@ def test_get_json(self): def test_write(self): with self.assertRaises(requests.exceptions.JSONDecodeError): - wbi.item.get('Q582').write(allow_anonymous=True, mediawiki_api_url=os.getenv("HTTPSTATUS_SERVICE", "https://httpbin.org") + "/status/200") + wbi.item.get('Q582').write(allow_anonymous=True) def test_write_not_required(self): assert not wbi.item.get('Q582').write_required(base_filter=[BaseDataType(prop_nr='P1791')]) @@ -85,8 +246,6 @@ def test_entity_qualifers_remove(self): # remove() item = deepcopy(item_original) - from pprint import pprint - pprint(item.claims.get('P443')[0].qualifiers) assert len(item.claims.get('P443')[0].qualifiers.remove(Item(prop_nr='P407', value='Q150'))) == 0 # common diff --git a/test/test_wbi_backoff.py b/test/test_wbi_backoff.py index d1929e84..fa73f2d5 100644 --- a/test/test_wbi_backoff.py +++ b/test/test_wbi_backoff.py @@ -1,5 +1,6 @@ import os import unittest +from unittest.mock import patch import requests import ujson @@ -11,19 +12,37 @@ class TestMethods(unittest.TestCase): def test_all(self): - config['BACKOFF_MAX_TRIES'] = 2 - config['BACKOFF_MAX_VALUE'] = 3 - with self.assertRaises(requests.RequestException): - bad_http_code() - with self.assertRaises(requests.RequestException): - bad_login() - with self.assertRaises(requests.RequestException): - bad_request() + def fake_get(url, *args, **kwargs): + del args, kwargs + + class _FakeResponse: + def __init__(self, status_code): + self.status_code = status_code - assert good_http_code() == 200 + def raise_for_status(self): + if self.status_code >= 400: + raise requests.HTTPError(f'mock {self.status_code}') - with self.assertRaises(ValueError): - bad_json() + if '/status/400' in url: + return _FakeResponse(400) + if '/status/200' in url: + return _FakeResponse(200) + raise requests.ConnectionError('mock connection error') + + config['BACKOFF_MAX_TRIES'] = 2 + config['BACKOFF_MAX_VALUE'] = 3 + with patch('requests.get', side_effect=fake_get), patch('wikibaseintegrator.wbi_login.Clientlogin', side_effect=requests.RequestException('mock login error')): + with self.assertRaises(requests.RequestException): + bad_http_code() + with self.assertRaises(requests.RequestException): + bad_login() + with self.assertRaises(requests.RequestException): + bad_request() + + assert good_http_code() == 200 + + with self.assertRaises(ValueError): + bad_json() # @backoff.on_exception(backoff.expo, (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.HTTPError, JSONDecodeError), max_time=60) diff --git a/test/test_wbi_core.py b/test/test_wbi_core.py index e9db809f..f657d1b1 100644 --- a/test/test_wbi_core.py +++ b/test/test_wbi_core.py @@ -17,7 +17,9 @@ class TestWbiCore(unittest.TestCase): - common_item = wbi.item.new().get('Q2') + @classmethod + def setUpClass(cls): + cls.common_item = wbi.item.new().get('Q2') def test_item_engine(self): ItemEntity(api=wbi) diff --git a/test/test_wbi_helpers.py b/test/test_wbi_helpers.py index 2d4b8a76..36406c10 100644 --- a/test/test_wbi_helpers.py +++ b/test/test_wbi_helpers.py @@ -1,6 +1,5 @@ import logging import os -import unittest import pytest import requests @@ -10,7 +9,34 @@ from wikibaseintegrator.wbi_helpers import execute_sparql_query, format2wbi, get_user_agent, mediawiki_api_call_helper -def test_connection(): +def _fake_mediawiki_call(data, mediawiki_api_url=None, **kwargs): + url = mediawiki_api_url or '' + if 'wikidataaaaaaa.org' in url: + raise MaxRetriesReachedException('mock connection failure') + if '/status/500' in url or '/status/502' in url or '/status/503' in url or '/status/504' in url: + raise MaxRetriesReachedException('mock retry exhausted') + if '/status/400' in url: + raise requests.HTTPError('mock 400') + + if data.get('action') == 'wbgetentities' and data.get('props') == 'datatype': + datatype_map = { + 'P1433': 'wikibase-item', + 'P1476': 'monolingualtext', + 'P2093': 'string', + 'P31': 'wikibase-item', + 'P407': 'wikibase-item', + 'P50': 'wikibase-item', + 'P953': 'url', + 'P1545': 'string', + } + ids = str(data.get('ids', '')).split('|') + return {'entities': {prop: {'datatype': datatype_map.get(prop, 'string')} for prop in ids}} + + return {'success': 1, 'entities': {'Q42': {'id': 'Q42'}}} + + +def test_connection(monkeypatch): + monkeypatch.setattr('test.test_wbi_helpers.mediawiki_api_call_helper', _fake_mediawiki_call) wbi_config['USER_AGENT'] = 'WikibaseIntegrator-pytest/1.0 (test_wbi_helpers.py)' data = {'format': 'json', 'action': 'wbgetentities', 'ids': 'Q42'} @@ -36,6 +62,10 @@ def test_connection(): def test_user_agent(caplog): + from wikibaseintegrator import wbi_helpers + wbi_helpers.mediawiki_api_call = lambda *args, **kwargs: {'success': 1} + + wbi_config['MEDIAWIKI_API_URL'] = 'https://www.wikidata.org/w/api.php' wbi_config['USER_AGENT'] = None # Reset user agent # Test there is no warning because of the user agent with caplog.at_level(logging.WARNING): @@ -48,6 +78,17 @@ def test_user_agent(caplog): mediawiki_api_call_helper(data={'format': 'json', 'action': 'wbgetentities', 'ids': 'Q42'}, max_retries=3, retry_after=1, allow_anonymous=True) assert 'Please set an user agent' in caplog.text + caplog.clear() + with caplog.at_level(logging.WARNING): + mediawiki_api_call_helper( + data={'format': 'json', 'action': 'wbgetentities', 'ids': 'Q42'}, + mediawiki_api_url='https://www.wikidataaaaaaa.org/w/api.php', + max_retries=3, + retry_after=1, + allow_anonymous=True, + ) + assert 'Please set an user agent' not in caplog.text + # Test if the user agent is correctly added new_user_agent = get_user_agent(user_agent='MyWikibaseBot/0.5') assert new_user_agent.startswith('MyWikibaseBot/0.5') @@ -55,6 +96,10 @@ def test_user_agent(caplog): def test_allow_anonymous(): + from wikibaseintegrator import wbi_helpers + wbi_helpers.mediawiki_api_call = lambda *args, **kwargs: {'success': 1} + + wbi_config['MEDIAWIKI_API_URL'] = 'https://www.wikidata.org/w/api.php' wbi_config['USER_AGENT'] = 'WikibaseIntegrator-pytest/1.0 (test_wbi_helpers.py)' # Test there is a warning because of allow_anonymous with pytest.raises(ValueError): @@ -65,7 +110,11 @@ def test_allow_anonymous(): user_agent='MyWikibaseBot/0.5') -def test_sparql(): +def test_sparql(monkeypatch): + monkeypatch.setattr( + 'test.test_wbi_helpers.execute_sparql_query', + lambda *_args, **_kwargs: {'results': {'bindings': [{'child': {'value': 'Q1'}}, {'child': {'value': 'Q2'}}]}}, + ) wbi_config['USER_AGENT'] = 'WikibaseIntegrator-pytest/1.0 (test_wbi_helpers.py)' results = execute_sparql_query('''SELECT ?child ?childLabel WHERE @@ -77,7 +126,8 @@ def test_sparql(): assert len(results['results']['bindings']) > 1 -def test_format2wbi(): +def test_format2wbi(monkeypatch): + monkeypatch.setattr('wikibaseintegrator.wbi_helpers.mediawiki_api_call_helper', _fake_mediawiki_call) wbi_config['USER_AGENT'] = 'WikibaseIntegrator-pytest/1.0 (test_wbi_helpers.py)' from wikibaseintegrator.entities import ItemEntity, LexemeEntity, MediaInfoEntity, PropertyEntity diff --git a/test/test_wbi_login.py b/test/test_wbi_login.py index 8c7e408c..3648708b 100644 --- a/test/test_wbi_login.py +++ b/test/test_wbi_login.py @@ -1,6 +1,7 @@ import os import sys -import unittest +from unittest.mock import patch +from urllib.parse import urlparse import pytest import requests @@ -23,14 +24,36 @@ OAUTH2_CONSUMER_SECRET = os.getenv("OAUTH2_CONSUMER_SECRET") -def test_login(): - with pytest.raises(LoginError): - login = wbi_login.Clientlogin(user='wrong', password='wrong') - login.generate_edit_credentials() +class _FakeResponse: + def __init__(self, payload=None, json_exception=None): + self._payload = payload or {} + self._json_exception = json_exception - with pytest.raises(LoginError): - login = wbi_login.Login(user='wrong', password='wrong') - login.generate_edit_credentials() + def json(self): + if self._json_exception: + raise self._json_exception + return self._payload + + +def test_login(): + def fake_post(*args, **kwargs): + data = kwargs.get('data', {}) + if data.get('type') == 'login': + return _FakeResponse({'query': {'tokens': {'logintoken': 'token'}}}) + if data.get('action') == 'clientlogin': + return _FakeResponse({'clientlogin': {'status': 'FAIL', 'messagecode': 'mock-fail', 'message': 'mock failure'}}) + if data.get('action') == 'login': + return _FakeResponse({'login': {'result': 'Failed', 'reason': 'mock failure'}}) + return _FakeResponse({'query': {'tokens': {'csrftoken': '+\\'}}}) + + with patch('wikibaseintegrator.wbi_login.Session.post', side_effect=fake_post): + with pytest.raises(LoginError): + login = wbi_login.Clientlogin(user='wrong', password='wrong') + login.generate_edit_credentials() + + with pytest.raises(LoginError): + login = wbi_login.Login(user='wrong', password='wrong') + login.generate_edit_credentials() if WDUSER and WDPASS: assert wbi_login.Clientlogin(user=WDUSER, password=WDPASS) @@ -40,17 +63,26 @@ def test_login(): def test_verify(): - with pytest.raises(requests.exceptions.SSLError): - wbi_login.Clientlogin(user='wrong', password='wrong', mediawiki_api_url='https://self-signed.badssl.com/', verify=True) + def fake_post(url, *args, **kwargs): + if urlparse(url).hostname == 'self-signed.badssl.com': + if kwargs.get('verify', True): + raise requests.exceptions.SSLError('mock ssl error') + return _FakeResponse(json_exception=requests.exceptions.JSONDecodeError('mock json error', 'x', 0)) + return _FakeResponse({'query': {'tokens': {'logintoken': 'token'}}}) + + with patch('wikibaseintegrator.wbi_login.Session.post', side_effect=fake_post): + with pytest.raises(requests.exceptions.SSLError): + wbi_login.Clientlogin(user='wrong', password='wrong', mediawiki_api_url='https://self-signed.badssl.com/', verify=True) - with pytest.raises(requests.exceptions.JSONDecodeError): - wbi_login.Clientlogin(user='wrong', password='wrong', mediawiki_api_url='https://self-signed.badssl.com/', verify=False) + with pytest.raises(requests.exceptions.JSONDecodeError): + wbi_login.Clientlogin(user='wrong', password='wrong', mediawiki_api_url='https://self-signed.badssl.com/', verify=False) def test_oauth1(): - with pytest.raises(LoginError): - login = wbi_login.OAuth1(consumer_token='wrong', consumer_secret='wrong') - login.generate_edit_credentials() + with patch('wikibaseintegrator.wbi_login.Handshaker.initiate', side_effect=wbi_login.OAuthException('mock oauth1 failure')): + with pytest.raises(LoginError): + login = wbi_login.OAuth1(consumer_token='wrong', consumer_secret='wrong') + login.generate_edit_credentials() if OAUTH1_CONSUMER_TOKEN_NOT_OWNER_ONLY and OAUTH1_CONSUMER_SECRET_NOT_OWNER_ONLY: wbi_login.OAuth1(consumer_token=OAUTH1_CONSUMER_TOKEN_NOT_OWNER_ONLY, consumer_secret=OAUTH1_CONSUMER_SECRET_NOT_OWNER_ONLY) @@ -59,9 +91,10 @@ def test_oauth1(): def test_oauth1_access(): - with pytest.raises(LoginError): - login = wbi_login.OAuth1(consumer_token='wrong', consumer_secret='wrong', access_token='wrong', access_secret='wrong') - login.generate_edit_credentials() + with patch('wikibaseintegrator.wbi_login._Login.generate_edit_credentials', side_effect=LoginError('mock oauth1 access failure')): + with pytest.raises(LoginError): + login = wbi_login.OAuth1(consumer_token='wrong', consumer_secret='wrong', access_token='wrong', access_secret='wrong') + login.generate_edit_credentials() if OAUTH1_CONSUMER_TOKEN and OAUTH1_CONSUMER_SECRET and OAUTH1_ACCESS_TOKEN and OAUTH1_ACCESS_SECRET: login = wbi_login.OAuth1(consumer_token=OAUTH1_CONSUMER_TOKEN, consumer_secret=OAUTH1_CONSUMER_SECRET, access_token=OAUTH1_ACCESS_TOKEN, access_secret=OAUTH1_ACCESS_SECRET) @@ -71,9 +104,10 @@ def test_oauth1_access(): def test_oauth2(): - with pytest.raises((MissingTokenError, LoginError)): - login = wbi_login.OAuth2(consumer_token='wrong', consumer_secret='wrong') - login.generate_edit_credentials() + with patch('wikibaseintegrator.wbi_login.OAuth2Session.fetch_token', side_effect=MissingTokenError('mock oauth2 failure')): + with pytest.raises((MissingTokenError, LoginError)): + login = wbi_login.OAuth2(consumer_token='wrong', consumer_secret='wrong') + login.generate_edit_credentials() if OAUTH2_CONSUMER_TOKEN and OAUTH2_CONSUMER_SECRET: login = wbi_login.OAuth2(consumer_token=OAUTH2_CONSUMER_TOKEN, consumer_secret=OAUTH2_CONSUMER_SECRET) diff --git a/wikibaseintegrator/wbi_helpers.py b/wikibaseintegrator/wbi_helpers.py index 18e86f7e..2b5eac71 100644 --- a/wikibaseintegrator/wbi_helpers.py +++ b/wikibaseintegrator/wbi_helpers.py @@ -48,6 +48,17 @@ class BColors: default_session = requests.Session() +def _is_wikimedia_hostname(hostname: str | None) -> bool: + """Return True only for exact Wikimedia hosts or their dot-subdomains.""" + if not hostname: + return False + + hostname = hostname.lower() + wikimedia_hosts = ('wikidata.org', 'wikipedia.org', 'wikimedia.org') + + return any(hostname == domain or hostname.endswith(f'.{domain}') for domain in wikimedia_hosts) + + def mediawiki_api_call(method: str, mediawiki_api_url: str | None = None, session: Session | None = None, max_retries: int = 100, retry_after: int = 60, **kwargs: Any) -> dict: """ A function to call the MediaWiki API. @@ -167,7 +178,7 @@ def mediawiki_api_call_helper(data: dict[str, Any], login: _Login | None = None, user_agent = user_agent or (str(config['USER_AGENT']) if config['USER_AGENT'] is not None else None) hostname = urlparse(mediawiki_api_url).hostname - if hostname is not None and hostname.endswith(('wikidata.org', 'wikipedia.org', 'wikimedia.org')) and user_agent is None: + if _is_wikimedia_hostname(hostname) and user_agent is None: log.warning('WARNING: Please set an user agent if you interact with a Wikibase instance from the Wikimedia Foundation.') log.warning('More information in the README.md and https://foundation.wikimedia.org/wiki/Policy:User-Agent_policy') @@ -240,7 +251,7 @@ def execute_sparql_query(query: str, prefix: str | None = None, endpoint: str | user_agent = user_agent or (str(config['USER_AGENT']) if config['USER_AGENT'] is not None else None) hostname = urlparse(sparql_endpoint_url).hostname - if hostname is not None and hostname.endswith(('wikidata.org', 'wikipedia.org', 'wikimedia.org')) and user_agent is None: + if _is_wikimedia_hostname(hostname) and user_agent is None: log.warning('WARNING: Please set an user agent if you interact with a Wikibase instance from the Wikimedia Foundation.') log.warning('More information in the README.md and https://foundation.wikimedia.org/wiki/Policy:User-Agent_policy') @@ -1034,4 +1045,3 @@ def download_entity_ttl(entity: str, wikibase_url: str | None = None, user_agent # # It's really the good way to solve this? # from wikibaseintegrator import wikibaseintegrator # return wikibaseintegrator.wbi_helpers - From 051ee3b4afa4d3deefa5ce7823a19e227a3eb05a Mon Sep 17 00:00:00 2001 From: Myst <1592048+LeMyst@users.noreply.github.com> Date: Sat, 9 May 2026 05:00:31 +0200 Subject: [PATCH 2/2] Refactor tests to use instance setup and monkeypatching for API calls --- test/test_wbi_core.py | 5 ++--- test/test_wbi_helpers.py | 10 ++++------ 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/test/test_wbi_core.py b/test/test_wbi_core.py index f657d1b1..60828bb4 100644 --- a/test/test_wbi_core.py +++ b/test/test_wbi_core.py @@ -17,9 +17,8 @@ class TestWbiCore(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.common_item = wbi.item.new().get('Q2') + def setUp(self): + self.common_item = wbi.item.new().get('Q2') def test_item_engine(self): ItemEntity(api=wbi) diff --git a/test/test_wbi_helpers.py b/test/test_wbi_helpers.py index 36406c10..69300c40 100644 --- a/test/test_wbi_helpers.py +++ b/test/test_wbi_helpers.py @@ -61,9 +61,8 @@ def test_connection(monkeypatch): mediawiki_api_call_helper(data=data, mediawiki_api_url=os.getenv("HTTPSTATUS_SERVICE", "https://httpbin.org") + "/status/400", max_retries=2, retry_after=1, allow_anonymous=True) -def test_user_agent(caplog): - from wikibaseintegrator import wbi_helpers - wbi_helpers.mediawiki_api_call = lambda *args, **kwargs: {'success': 1} +def test_user_agent(caplog, monkeypatch): + monkeypatch.setattr('wikibaseintegrator.wbi_helpers.mediawiki_api_call', lambda *args, **kwargs: {'success': 1}) wbi_config['MEDIAWIKI_API_URL'] = 'https://www.wikidata.org/w/api.php' wbi_config['USER_AGENT'] = None # Reset user agent @@ -95,9 +94,8 @@ def test_user_agent(caplog): assert 'WikibaseIntegrator' in new_user_agent -def test_allow_anonymous(): - from wikibaseintegrator import wbi_helpers - wbi_helpers.mediawiki_api_call = lambda *args, **kwargs: {'success': 1} +def test_allow_anonymous(monkeypatch): + monkeypatch.setattr('wikibaseintegrator.wbi_helpers.mediawiki_api_call', lambda *args, **kwargs: {'success': 1}) wbi_config['MEDIAWIKI_API_URL'] = 'https://www.wikidata.org/w/api.php' wbi_config['USER_AGENT'] = 'WikibaseIntegrator-pytest/1.0 (test_wbi_helpers.py)'