diff --git a/.github/workflows/test-pypy3.11.yml b/.github/workflows/test-pypy3.11.yml index 5963819..1719c1c 100644 --- a/.github/workflows/test-pypy3.11.yml +++ b/.github/workflows/test-pypy3.11.yml @@ -1,4 +1,4 @@ -name: Unit Tests (PyPy 3.11) +name: PyPy 3.11 on: push: diff --git a/.github/workflows/test-python3.10.yml b/.github/workflows/test-python3.10.yml index ee865a9..7aedf47 100644 --- a/.github/workflows/test-python3.10.yml +++ b/.github/workflows/test-python3.10.yml @@ -1,4 +1,4 @@ -name: Unit Tests (Python 3.10) +name: 3.10 on: push: diff --git a/.github/workflows/test-python3.11.yml b/.github/workflows/test-python3.11.yml index c75adfc..bdbf9e3 100644 --- a/.github/workflows/test-python3.11.yml +++ b/.github/workflows/test-python3.11.yml @@ -1,4 +1,4 @@ -name: Unit Tests (Python 3.11) +name: 3.11 on: push: diff --git a/.github/workflows/test-python3.12.yml b/.github/workflows/test-python3.12.yml index 833aa48..1a7a6d3 100644 --- a/.github/workflows/test-python3.12.yml +++ b/.github/workflows/test-python3.12.yml @@ -1,4 +1,4 @@ -name: Unit Tests (Python 3.12) +name: 3.12 on: push: diff --git a/.github/workflows/test-python3.13.yml b/.github/workflows/test-python3.13.yml index e7d275d..5a28ee6 100644 --- a/.github/workflows/test-python3.13.yml +++ b/.github/workflows/test-python3.13.yml @@ -1,4 +1,4 @@ -name: Unit Tests (Python 3.13) +name: 3.13 on: push: diff --git a/.github/workflows/test-python3.14.yml b/.github/workflows/test-python3.14.yml index 95fdba6..6c6a8db 100644 --- a/.github/workflows/test-python3.14.yml +++ b/.github/workflows/test-python3.14.yml @@ -1,4 +1,4 @@ -name: Unit Tests (Python 3.14) +name: 3.14 on: push: diff --git a/.github/workflows/test-python3.8.yml b/.github/workflows/test-python3.8.yml index 75145d3..def86ef 100644 --- a/.github/workflows/test-python3.8.yml +++ b/.github/workflows/test-python3.8.yml @@ -1,4 +1,4 @@ -name: Unit Tests (Python 3.8) +name: 3.8 on: push: diff --git a/.github/workflows/test-python3.9.yml b/.github/workflows/test-python3.9.yml index 5ec8314..04cba3d 100644 --- a/.github/workflows/test-python3.9.yml +++ b/.github/workflows/test-python3.9.yml @@ -1,4 +1,4 @@ -name: Unit Tests (Python 3.9) +name: 3.9 on: push: diff --git a/README.md b/README.md index 032505d..6402ef9 100644 --- a/README.md +++ b/README.md @@ -2,15 +2,16 @@ i2CAT Logo -![Python versions](https://img.shields.io/badge/python-3.8%20|%203.9%20|%203.10%20|%203.11%20|%203.12%20|%203.13%20|%203.14-blue) +![Python versions](https://img.shields.io/badge/python-3.8%20%7C%203.9%20%7C%203.10%20%7C%203.11%20%7C%203.12%20%7C%203.13%20%7C%203.14%20%7C%20PyPy3.11-blue) -[![3.8](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?label=3.8&style=flat-square&job=Unit%20Tests%20(Python%203.8))](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/general.yml) -[![3.9](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?label=3.9&style=flat-square&job=Unit%20Tests%20(Python%203.9))](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/general.yml) -[![3.10](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?label=3.10&style=flat-square&job=Unit%20Tests%20(Python%203.10))](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/general.yml) -[![3.11](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?label=3.11&style=flat-square&job=Unit%20Tests%20(Python%203.11))](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/general.yml) -[![3.12](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?label=3.12&style=flat-square&job=Unit%20Tests%20(Python%203.12))](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/general.yml) -[![3.13](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?label=3.13&style=flat-square&job=Unit%20Tests%20(Python%203.13))](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/general.yml) -[![3.14](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?label=3.14&style=flat-square&job=Unit%20Tests%20(Python%203.14))](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/general.yml) +[![3.8](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.8.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.8.yml) +[![3.9](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.9.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.9.yml) +[![3.10](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.10.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.10.yml) +[![3.11](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.11.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.11.yml) +[![3.12](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.12.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.12.yml) +[![3.13](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.13.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.13.yml) +[![3.14](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.14.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-python3.14.yml) +[![PyPy 3.11](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-pypy3.11.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/test-pypy3.11.yml) ![Coverage](https://img.shields.io/codecov/c/github/YOUR_USER/YOUR_REPO) ![Flake8](https://img.shields.io/badge/code%20style-flake8-blue) @@ -18,7 +19,7 @@ ![Pyright](https://img.shields.io/badge/type--checker-pyright-blue) -![Test Coverage](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?job=Test%20Coverage&label=Coverage) ![Run Flake8](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?job=Run%20Flake8&label=Flake8) ![Run Pylint](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?job=Run%20Pylint&label=Pylint) ![Run Pyright](https://img.shields.io/github/actions/workflow/status/Fundacio-i2CAT/FlexStack/general.yml?job=Run%20Pyright&label=Pyright) +[![Test Coverage](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/coverage.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/coverage.yml) [![Flake8](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/flake8.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/flake8.yml) [![Flake8](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/flake8.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/flake8.yml) [![Pyright](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/pyright.yml/badge.svg)](https://github.com/Fundacio-i2CAT/FlexStack/actions/workflows/pyright.yml) # Short description diff --git a/tests/flexstack/facilities/local_dynamic_map/test_ldm_maintenance_reactive.py b/tests/flexstack/facilities/local_dynamic_map/test_ldm_maintenance_reactive.py index 39c2fe2..ecc553a 100644 --- a/tests/flexstack/facilities/local_dynamic_map/test_ldm_maintenance_reactive.py +++ b/tests/flexstack/facilities/local_dynamic_map/test_ldm_maintenance_reactive.py @@ -17,13 +17,13 @@ def setUp(self, mock_monotonic): ) @patch("time.monotonic") - @patch("builtins.super") - def test_add_provider_data(self, mock_ldm_maintenace, mock_monotonic): + @patch("flexstack.facilities.local_dynamic_map.ldm_maintenance.LDMMaintenance.add_provider_data") + def test_add_provider_data(self, mock_parent_add_provider_data, mock_monotonic): mock_monotonic.return_value = 1000.0 - mock_ldm_maintenace().add_provider_data = MagicMock(return_value=1) + mock_parent_add_provider_data.return_value = 1 self.ldm_maintenance_reactive.collect_trash = MagicMock() test_add_data_provider_req = MagicMock() self.ldm_maintenance_reactive.add_provider_data(test_add_data_provider_req) - mock_ldm_maintenace().add_provider_data.assert_called_once_with(test_add_data_provider_req) + mock_parent_add_provider_data.assert_called_once_with(test_add_data_provider_req) self.ldm_maintenance_reactive.collect_trash.assert_called_once() diff --git a/tests/flexstack/facilities/local_dynamic_map/test_ldm_maintenance_thread.py b/tests/flexstack/facilities/local_dynamic_map/test_ldm_maintenance_thread.py index 1ae8546..d49ef8e 100644 --- a/tests/flexstack/facilities/local_dynamic_map/test_ldm_maintenance_thread.py +++ b/tests/flexstack/facilities/local_dynamic_map/test_ldm_maintenance_thread.py @@ -30,9 +30,9 @@ def test__init__(self): self.threading_thread.start.assert_called_once() self.mock_lock.assert_called() - @patch("builtins.super") - def test_add_data_provider(self, mock_super): - mock_super().add_provider_data = MagicMock(return_value=1) + @patch("flexstack.facilities.local_dynamic_map.ldm_maintenance.LDMMaintenance.add_provider_data") + def test_add_data_provider(self, mock_parent_add_provider_data): + mock_parent_add_provider_data.return_value = 1 add_data_provider_request = AddDataProviderReq( application_id=1, timestamp=TimestampIts(1000), @@ -43,32 +43,32 @@ def test_add_data_provider(self, mock_super): self.assertEqual(self.ldm_maintenance_thread.add_provider_data( add_data_provider_request), 1) - @patch("builtins.super") - def test_get_provider_data(self, mock_super): - mock_super().get_provider_data = MagicMock(return_value=1) + @patch("flexstack.facilities.local_dynamic_map.ldm_maintenance.LDMMaintenance.get_provider_data") + def test_get_provider_data(self, mock_parent_get_provider_data): + mock_parent_get_provider_data.return_value = 1 self.assertEqual(self.ldm_maintenance_thread.get_provider_data(1), 1) - @patch("builtins.super") - def test_update_provider_data(self, mock_super): - mock_super().update_provider_data = MagicMock(return_value=1) + @patch("flexstack.facilities.local_dynamic_map.ldm_maintenance.LDMMaintenance.update_provider_data") + def test_update_provider_data(self, mock_parent_update_provider_data): + mock_parent_update_provider_data.return_value = 1 self.ldm_maintenance_thread.update_provider_data(1, TEST_DATA) - mock_super().update_provider_data.assert_called_once_with(1, TEST_DATA) + mock_parent_update_provider_data.assert_called_once_with(1, TEST_DATA) - @patch("builtins.super") - def test_del_provider_data(self, mock_super): - mock_super().del_provider_data = MagicMock(return_value=1) + @patch("flexstack.facilities.local_dynamic_map.ldm_maintenance.LDMMaintenance.del_provider_data") + def test_del_provider_data(self, mock_parent_del_provider_data): + mock_parent_del_provider_data.return_value = 1 self.ldm_maintenance_thread.del_provider_data(TEST_DATA) - mock_super().del_provider_data.assert_called_once_with(TEST_DATA) + mock_parent_del_provider_data.assert_called_once_with(TEST_DATA) - @patch("builtins.super") - def test_get_all_data_containers(self, mock_super): - mock_super().get_all_data_containers = MagicMock(return_value=1) + @patch("flexstack.facilities.local_dynamic_map.ldm_maintenance.LDMMaintenance.get_all_data_containers") + def test_get_all_data_containers(self, mock_parent_get_all_data_containers): + mock_parent_get_all_data_containers.return_value = 1 self.assertEqual( self.ldm_maintenance_thread.get_all_data_containers(), 1) - @patch("builtins.super") - def test_search_data_contaier(self, mock_super): - mock_super().search_data_containers = MagicMock(return_value=1) + @patch("flexstack.facilities.local_dynamic_map.ldm_maintenance.LDMMaintenance.search_data_containers") + def test_search_data_contaier(self, mock_parent_search_data_containers): + mock_parent_search_data_containers.return_value = 1 request_data_objects_req = RequestDataObjectsReq( application_id=CAM, data_object_type=(CAM,), @@ -80,8 +80,8 @@ def test_search_data_contaier(self, mock_super): self.assertEqual(self.ldm_maintenance_thread.search_data_containers( request_data_objects_req), 1) - @patch("builtins.super") - def test_check_data_container(self, mock_super): - mock_super().check_new_data_recieved = MagicMock(return_value=1) + @patch("flexstack.facilities.local_dynamic_map.ldm_maintenance.LDMMaintenance.check_new_data_recieved") + def test_check_data_container(self, mock_parent_check_new_data_recieved): + mock_parent_check_new_data_recieved.return_value = 1 self.assertEqual( self.ldm_maintenance_thread.check_new_data_recieved(), 1) diff --git a/tests/flexstack/facilities/local_dynamic_map/test_ldm_service_reactive.py b/tests/flexstack/facilities/local_dynamic_map/test_ldm_service_reactive.py index 269434a..70528f5 100644 --- a/tests/flexstack/facilities/local_dynamic_map/test_ldm_service_reactive.py +++ b/tests/flexstack/facilities/local_dynamic_map/test_ldm_service_reactive.py @@ -14,11 +14,11 @@ def setUp(self, mock_monotonic): self.ldm_maintenance = MagicMock() self.ldm_service = LDMServiceReactive(self.ldm_maintenance) - @patch("builtins.super") + @patch("flexstack.facilities.local_dynamic_map.ldm_service.LDMService.add_provider_data") @patch("time.monotonic") - def test_add_provider_data(self, mock_monotonic, mock_ldm_service): + def test_add_provider_data(self, mock_monotonic, mock_parent_add_provider_data): mock_monotonic.return_value = 1000.0 - mock_ldm_service().add_provider_data = MagicMock(return_value=1) + mock_parent_add_provider_data.return_value = 1 self.ldm_service.attend_subscriptions = MagicMock() test_dict = {"a": 1, "b": 2} @@ -29,5 +29,5 @@ def test_add_provider_data(self, mock_monotonic, mock_ldm_service): data_object=test_dict, time_validity=MagicMock(),) self.ldm_service.add_provider_data(provider_data_request) - mock_ldm_service().add_provider_data.assert_called_once_with(provider_data_request) + mock_parent_add_provider_data.assert_called_once_with(provider_data_request) self.ldm_service.attend_subscriptions.assert_called_once() diff --git a/tests/flexstack/security/test_certificate_library.py b/tests/flexstack/security/test_certificate_library.py new file mode 100644 index 0000000..9542f37 --- /dev/null +++ b/tests/flexstack/security/test_certificate_library.py @@ -0,0 +1,319 @@ +import unittest +from unittest.mock import MagicMock, patch + +from flexstack.security.certificate_library import CertificateLibrary +from flexstack.security.certificate import Certificate, OwnCertificate + + +class TestCertificateLibrary(unittest.TestCase): + def setUp(self): + self.backend = MagicMock() + self.root_cert = MagicMock(spec=Certificate) + self.root_cert.verify.return_value = True + self.root_cert.as_hashedid8.return_value = b'\x01\x02\x03\x04\x05\x06\x07\x08' + self.root_cert.certificate = { + "issuer": ("self", "sha256") + } + + self.aa_cert = MagicMock(spec=Certificate) + self.aa_cert.verify.return_value = True + self.aa_cert.as_hashedid8.return_value = b'\x11\x12\x13\x14\x15\x16\x17\x18' + self.aa_cert.certificate = { + "issuer": ("sha256AndDigest", b'\x01\x02\x03\x04\x05\x06\x07\x08') + } + + self.at_cert = MagicMock(spec=Certificate) + self.at_cert.verify.return_value = True + self.at_cert.as_hashedid8.return_value = b'\x21\x22\x23\x24\x25\x26\x27\x28' + self.at_cert.certificate = { + "issuer": ("sha256AndDigest", b'\x11\x12\x13\x14\x15\x16\x17\x18') + } + + def test_init_empty(self): + """Test initialization with empty certificate lists""" + library = CertificateLibrary(self.backend, [], [], []) + self.assertEqual(library.own_certificates, {}) + self.assertEqual(library.known_authorization_tickets, {}) + self.assertEqual(library.known_authorization_authorities, {}) + self.assertEqual(library.known_root_certificates, {}) + + def test_init_with_root_certificate(self): + """Test initialization with root certificates""" + library = CertificateLibrary(self.backend, [self.root_cert], [], []) + self.assertIn(b'\x01\x02\x03\x04\x05\x06\x07\x08', + library.known_root_certificates) + + def test_init_with_aa_certificate(self): + """Test initialization with AA certificates""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], []) + self.assertIn(b'\x01\x02\x03\x04\x05\x06\x07\x08', + library.known_root_certificates) + self.assertIn(b'\x11\x12\x13\x14\x15\x16\x17\x18', + library.known_authorization_authorities) + + def test_init_with_at_certificate(self): + """Test initialization with AT certificates""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], [self.at_cert] + ) + self.assertIn(b'\x01\x02\x03\x04\x05\x06\x07\x08', + library.known_root_certificates) + self.assertIn(b'\x11\x12\x13\x14\x15\x16\x17\x18', + library.known_authorization_authorities) + self.assertIn(b'\x21\x22\x23\x24\x25\x26\x27\x28', + library.known_authorization_tickets) + + def test_add_root_certificate_valid(self): + """Test adding a valid root certificate""" + library = CertificateLibrary(self.backend, [], [], []) + library.add_root_certificate(self.root_cert) + self.assertIn(b'\x01\x02\x03\x04\x05\x06\x07\x08', + library.known_root_certificates) + + def test_add_root_certificate_invalid(self): + """Test adding an invalid root certificate""" + library = CertificateLibrary(self.backend, [], [], []) + self.root_cert.verify.return_value = False + library.add_root_certificate(self.root_cert) + self.assertNotIn(b'\x01\x02\x03\x04\x05\x06\x07\x08', + library.known_root_certificates) + + def test_add_authorization_authority_valid(self): + """Test adding a valid AA certificate""" + library = CertificateLibrary(self.backend, [self.root_cert], [], []) + library.add_authorization_authority(self.aa_cert) + self.assertIn(b'\x11\x12\x13\x14\x15\x16\x17\x18', + library.known_authorization_authorities) + + def test_add_authorization_authority_no_issuer(self): + """Test adding an AA certificate without a known issuer""" + library = CertificateLibrary(self.backend, [], [], []) + library.add_authorization_authority(self.aa_cert) + self.assertNotIn(b'\x11\x12\x13\x14\x15\x16\x17\x18', + library.known_authorization_authorities) + + def test_add_authorization_authority_invalid(self): + """Test adding an invalid AA certificate""" + library = CertificateLibrary(self.backend, [self.root_cert], [], []) + self.aa_cert.verify.return_value = False + library.add_authorization_authority(self.aa_cert) + self.assertNotIn(b'\x11\x12\x13\x14\x15\x16\x17\x18', + library.known_authorization_authorities) + + def test_add_authorization_ticket_valid(self): + """Test adding a valid AT certificate""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], []) + library.add_authorization_ticket(self.at_cert) + self.assertIn(b'\x21\x22\x23\x24\x25\x26\x27\x28', + library.known_authorization_tickets) + + def test_add_authorization_ticket_no_issuer(self): + """Test adding an AT certificate without a known issuer""" + library = CertificateLibrary(self.backend, [], [], []) + library.add_authorization_ticket(self.at_cert) + self.assertNotIn(b'\x21\x22\x23\x24\x25\x26\x27\x28', + library.known_authorization_tickets) + + def test_add_authorization_ticket_invalid(self): + """Test adding an invalid AT certificate""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], []) + self.at_cert.verify.return_value = False + library.add_authorization_ticket(self.at_cert) + self.assertNotIn(b'\x21\x22\x23\x24\x25\x26\x27\x28', + library.known_authorization_tickets) + + def test_add_authorization_ticket_duplicate(self): + """Test adding a duplicate AT certificate""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], [self.at_cert] + ) + # Reset mock to check if verify is called again + self.at_cert.verify.reset_mock() + library.add_authorization_ticket(self.at_cert) + # Should not call verify again since it's already in the library + self.at_cert.verify.assert_not_called() + + def test_add_own_certificate_valid(self): + """Test adding a valid own certificate""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], []) + own_cert = MagicMock(spec=OwnCertificate) + own_cert.verify.return_value = True + own_cert.as_hashedid8.return_value = b'\x31\x32\x33\x34\x35\x36\x37\x38' + own_cert.certificate = { + "issuer": ("sha256AndDigest", b'\x11\x12\x13\x14\x15\x16\x17\x18') + } + library.add_own_certificate(own_cert) + self.assertIn(b'\x31\x32\x33\x34\x35\x36\x37\x38', + library.own_certificates) + + def test_add_own_certificate_no_issuer(self): + """Test adding an own certificate without a known issuer""" + library = CertificateLibrary(self.backend, [], [], []) + own_cert = MagicMock(spec=OwnCertificate) + own_cert.verify.return_value = True + own_cert.as_hashedid8.return_value = b'\x31\x32\x33\x34\x35\x36\x37\x38' + own_cert.certificate = { + "issuer": ("sha256AndDigest", b'\x11\x12\x13\x14\x15\x16\x17\x18') + } + library.add_own_certificate(own_cert) + self.assertNotIn(b'\x31\x32\x33\x34\x35\x36\x37\x38', + library.own_certificates) + + def test_get_issuer_certificate_self_signed(self): + """Test getting issuer for a self-signed certificate""" + library = CertificateLibrary(self.backend, [self.root_cert], [], []) + result = library.get_issuer_certificate(self.root_cert) + self.assertIsNone(result) + + def test_get_issuer_certificate_from_root(self): + """Test getting issuer from root certificates""" + library = CertificateLibrary(self.backend, [self.root_cert], [], []) + result = library.get_issuer_certificate(self.aa_cert) + self.assertEqual(result, self.root_cert) + + def test_get_issuer_certificate_from_aa(self): + """Test getting issuer from AA certificates""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], []) + result = library.get_issuer_certificate(self.at_cert) + self.assertEqual(result, self.aa_cert) + + def test_get_issuer_certificate_not_found(self): + """Test getting issuer when not found""" + library = CertificateLibrary(self.backend, [], [], []) + result = library.get_issuer_certificate(self.aa_cert) + self.assertIsNone(result) + + def test_get_issuer_certificate_unknown_type(self): + """Test getting issuer with unknown issuer type""" + library = CertificateLibrary(self.backend, [], [], []) + unknown_cert = MagicMock(spec=Certificate) + unknown_cert.certificate = { + "issuer": ("unknown", b'\x00\x00\x00\x00\x00\x00\x00\x00') + } + with self.assertRaises(ValueError): + library.get_issuer_certificate(unknown_cert) + + def test_get_authorization_ticket_by_hashedid8_found(self): + """Test getting AT by hashedid8 when found""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], [self.at_cert] + ) + result = library.get_authorization_ticket_by_hashedid8( + b'\x21\x22\x23\x24\x25\x26\x27\x28') + self.assertEqual(result, self.at_cert) + + def test_get_authorization_ticket_by_hashedid8_not_found(self): + """Test getting AT by hashedid8 when not found""" + library = CertificateLibrary(self.backend, [], [], []) + result = library.get_authorization_ticket_by_hashedid8( + b'\x21\x22\x23\x24\x25\x26\x27\x28') + self.assertIsNone(result) + + @patch.object(Certificate, 'from_dict') + def test_verify_sequence_of_certificates_empty(self, mock_from_dict): + """Test verifying an empty sequence of certificates""" + library = CertificateLibrary(self.backend, [], [], []) + result = library.verify_sequence_of_certificates([], self.backend) + self.assertIsNone(result) + + @patch.object(Certificate, 'from_dict') + def test_verify_sequence_of_certificates_single_known(self, mock_from_dict): + """Test verifying a single certificate that is already known""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], [self.at_cert] + ) + mock_cert = MagicMock(spec=Certificate) + mock_cert.as_hashedid8.return_value = b'\x21\x22\x23\x24\x25\x26\x27\x28' + mock_from_dict.return_value = mock_cert + + result = library.verify_sequence_of_certificates( + [{"cert": "data"}], self.backend) + self.assertEqual(result, self.at_cert) + + @patch.object(Certificate, 'from_dict') + def test_verify_sequence_of_certificates_single_unknown_valid(self, mock_from_dict): + """Test verifying a single unknown but valid certificate""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], []) + + mock_cert = MagicMock(spec=Certificate) + mock_cert.as_hashedid8.return_value = b'\x41\x42\x43\x44\x45\x46\x47\x48' + mock_cert.verify.return_value = True + mock_cert.certificate = { + "issuer": ("sha256AndDigest", b'\x11\x12\x13\x14\x15\x16\x17\x18') + } + mock_from_dict.return_value = mock_cert + + result = library.verify_sequence_of_certificates( + [{"cert": "data"}], self.backend) + self.assertEqual(result, mock_cert) + + @patch.object(Certificate, 'from_dict') + def test_verify_sequence_of_certificates_single_unknown_invalid(self, mock_from_dict): + """Test verifying a single unknown and invalid certificate""" + library = CertificateLibrary( + self.backend, [self.root_cert], [self.aa_cert], []) + + mock_cert = MagicMock(spec=Certificate) + mock_cert.as_hashedid8.return_value = b'\x41\x42\x43\x44\x45\x46\x47\x48' + mock_cert.verify.return_value = False + mock_cert.certificate = { + "issuer": ("sha256AndDigest", b'\x11\x12\x13\x14\x15\x16\x17\x18') + } + mock_from_dict.return_value = mock_cert + + result = library.verify_sequence_of_certificates( + [{"cert": "data"}], self.backend) + self.assertIsNone(result) + + @patch.object(Certificate, 'from_dict') + def test_verify_sequence_of_certificates_two_valid(self, mock_from_dict): + """Test verifying a sequence of two valid certificates""" + library = CertificateLibrary(self.backend, [self.root_cert], [], []) + + mock_aa = MagicMock(spec=Certificate) + mock_aa.as_hashedid8.return_value = b'\x51\x52\x53\x54\x55\x56\x57\x58' + mock_aa.get_issuer_hashedid8.return_value = b'\x01\x02\x03\x04\x05\x06\x07\x08' + mock_aa.verify.return_value = True + mock_aa.certificate = { + "issuer": ("sha256AndDigest", b'\x01\x02\x03\x04\x05\x06\x07\x08') + } + + mock_at = MagicMock(spec=Certificate) + mock_at.as_hashedid8.return_value = b'\x61\x62\x63\x64\x65\x66\x67\x68' + mock_at.verify.return_value = True + mock_at.certificate = { + "issuer": ("sha256AndDigest", b'\x51\x52\x53\x54\x55\x56\x57\x58') + } + + mock_from_dict.side_effect = [mock_aa, mock_aa, mock_at] + + result = library.verify_sequence_of_certificates( + [{"at": "data"}, {"aa": "data"}], self.backend + ) + self.assertEqual(result, mock_at) + + @patch.object(Certificate, 'from_dict') + def test_verify_sequence_of_certificates_three_with_known_root(self, mock_from_dict): + """Test verifying a sequence of three certificates with known root""" + library = CertificateLibrary(self.backend, [self.root_cert], [], []) + + mock_root = MagicMock(spec=Certificate) + mock_root.as_hashedid8.return_value = b'\x01\x02\x03\x04\x05\x06\x07\x08' + + mock_from_dict.return_value = mock_root + + with patch.object(library, 'verify_sequence_of_certificates', wraps=library.verify_sequence_of_certificates): + # This will call recursively with certificates[:-1] + _ = library.verify_sequence_of_certificates( + [{"at": "data"}, {"aa": "data"}, {"root": "data"}], self.backend + ) + + +if __name__ == '__main__': + unittest.main()