diff --git a/armis_sdk/entities/data_export/application.py b/armis_sdk/entities/data_export/application.py index a014e9e..e03b00a 100644 --- a/armis_sdk/entities/data_export/application.py +++ b/armis_sdk/entities/data_export/application.py @@ -19,51 +19,51 @@ class Application(BaseExportedEntity): entity_name: ClassVar[str] = "applications" - device_id: int + device_id: int | None = None """The id of the device with the application""" - vendor: str + vendor: str | None = None """ The vendor of the application **Example**: `Google` """ - name: str + name: str | None = None """ The name of the application **Example**: `Chrome` """ - version: str + version: str | None = None """ The version of the application **Example**: `30.0.1599.40` """ - cpe: str | None + cpe: str | None = None """ The CPE (Common Platform Enumeration) of the application **Example**: `cpe:2.3:a:google:chrome:30.0.1599.40:*:*:*:*:*:*:*` """ - first_seen: datetime.datetime + first_seen: datetime.datetime | None = None """When the application was first seen on the device""" - last_seen: datetime.datetime + last_seen: datetime.datetime | None = None """When the application was last seen on the device""" @classmethod def series_to_model(cls, series: pandas.Series) -> Application: return Application( - device_id=series.loc["device_id"], - vendor=series.loc["vendor"], - name=series.loc["name"], - version=series.loc["version"], - cpe=cls._value_or_none(series.loc["cpe"] if "cpe" in series.index else None), - first_seen=series.loc["first_seen"].to_pydatetime(), - last_seen=series["last_seen"].to_pydatetime(), + device_id=cls._value_or_none(series.get("device_id")), + vendor=cls._value_or_none(series.get("vendor")), + name=cls._value_or_none(series.get("name")), + version=cls._value_or_none(series.get("version")), + cpe=cls._value_or_none(series.get("cpe")), + first_seen=cls._value_or_none(series.get("first_seen")), + last_seen=cls._value_or_none(series.get("last_seen")), ) diff --git a/armis_sdk/entities/data_export/base_exported_entity.py b/armis_sdk/entities/data_export/base_exported_entity.py index c9ab2ff..f1d6257 100644 --- a/armis_sdk/entities/data_export/base_exported_entity.py +++ b/armis_sdk/entities/data_export/base_exported_entity.py @@ -1,4 +1,7 @@ +from __future__ import annotations + import abc +from collections.abc import Iterable from typing import Type # noqa: UP035 # TODO: fix UP035 (deprecated import, use updated module) from typing import TypeVar @@ -19,12 +22,12 @@ def series_to_model(cls: type[T], series: pandas.Series) -> T: ... def entity_name(self): ... @classmethod - def _to_list(cls, value) -> list: - return [item for item in value if cls._value_or_none(item)] + def _to_list(cls, value) -> list | None: + return [item for item in value if cls._value_or_none(item)] if isinstance(value, Iterable) else None @classmethod def _value_or_none(cls, value): - if not value or pandas.isnull(value) or value == "N/A": # noqa: PD003 # TODO: fix PD003 (use .isna() instead of .isnull()) + if not value or pandas.isna(value) or value == "N/A": return None if isinstance(value, pandas.Timestamp): diff --git a/armis_sdk/entities/data_export/risk_factor.py b/armis_sdk/entities/data_export/risk_factor.py index eb1735c..0be0dd5 100644 --- a/armis_sdk/entities/data_export/risk_factor.py +++ b/armis_sdk/entities/data_export/risk_factor.py @@ -30,8 +30,8 @@ class RiskFactorRecommendedAction(BaseModel): """ The description of the recommended action - **Example**: `Regularly update all operating systems and firmware on network devices - to the latest versions to reduce the potential for exploitation of vulnerabilities + **Example**: `Regularly update all operating systems and firmware on network devices + to the latest versions to reduce the potential for exploitation of vulnerabilities via obsolete protocols.` """ @@ -50,48 +50,48 @@ class RiskFactor(BaseExportedEntity): entity_name: ClassVar[str] = "risk-factors" - device_id: int + device_id: int | None = None """The id of the device with the risk factor""" - category: str + category: str | None = None """ The category of the risk factor **Example**: `BEHAVIOURAL` """ - type: str + type: str | None = None """ The type of the risk factor **Example**: `SMBV1_SUPPORT` """ - description: str + description: str | None = None """ The description of the risk factor **Example**: `Device Supports SMBv1` """ - score: int | None + score: int | None = None """The score of the risk factor""" - group: str + group: str | None = None """ The group of the risk factor **Example**: `INSECURE_TRAFFIC_AND_BEHAVIOR` """ - remediation_type: str | None + remediation_type: str | None = None """ The type of the remediation **Example**: `Disable SMBv1 Protocol` """ - remediation_description: str | None + remediation_description: str | None = None """ The description of the remediation @@ -100,29 +100,29 @@ class RiskFactor(BaseExportedEntity): such as SMBv3 are implemented to maintain secure network communications.` """ - remediation_recommended_actions: list[RiskFactorRecommendedAction] + remediation_recommended_actions: list[RiskFactorRecommendedAction] | None = None """The remediation recommended actions""" - first_seen: datetime.datetime + first_seen: datetime.datetime | None = None """When the risk factor was first seen on the device""" - last_seen: datetime.datetime + last_seen: datetime.datetime | None = None """When the risk factor was last seen on the device""" - status: str + status: str | None = None """ The status of the risk factor in relation to the device **Example**: `OPEN` """ - status_update_time: datetime.datetime | None + status_update_time: datetime.datetime | None = None """When was the status last changed""" - status_updated_by_user_id: int | None + status_updated_by_user_id: int | None = None """Which used id last changed the status""" - status_update_reason: str | None + status_update_reason: str | None = None """ The reason for the status change @@ -131,27 +131,25 @@ class RiskFactor(BaseExportedEntity): @classmethod def series_to_model(cls, series: pandas.Series) -> RiskFactor: + remediation_recommended_actions = series.get("remediation_recommended_actions") return RiskFactor( - device_id=series.loc["device_id"], - category=series.loc["category"], - type=series.loc["type"], - description=series.loc["description"], - score=(int(score) if (score := cls._value_or_none(series.loc["score"])) else None), - status=series.loc["status"], - group=series.loc["group"], - remediation_type=cls._value_or_none(series.loc["remediation"]), - remediation_description=cls._value_or_none(series.loc["remediation_description"]), + device_id=cls._value_or_none(series.get("device_id")), + category=cls._value_or_none(series.get("category")), + type=cls._value_or_none(series.get("type")), + description=cls._value_or_none(series.get("description")), + score=(int(score) if (score := cls._value_or_none(series.get("score"))) else None), + status=cls._value_or_none(series.get("status")), + group=cls._value_or_none(series.get("group")), + remediation_type=cls._value_or_none(series.get("remediation")), + remediation_description=cls._value_or_none(series.get("remediation_description")), remediation_recommended_actions=( - [ - RiskFactorRecommendedAction(**item) - for item in json.loads(series.loc["remediation_recommended_actions"]) - ] - if series.loc["remediation_recommended_actions"] - else [] + [RiskFactorRecommendedAction(**item) for item in json.loads(remediation_recommended_actions)] + if isinstance(remediation_recommended_actions, str) + else None ), - first_seen=series.loc["first_seen"].to_pydatetime(), - last_seen=series.loc["last_seen"].to_pydatetime(), - status_update_time=cls._value_or_none(series.loc["status_update_time"]), - status_updated_by_user_id=cls._value_or_none(series.loc["status_updated_by_user_id"]), - status_update_reason=cls._value_or_none(series.loc["status_update_reason"]), + first_seen=cls._value_or_none(series.get("first_seen")), + last_seen=cls._value_or_none(series.get("last_seen")), + status_update_time=cls._value_or_none(series.get("status_update_time")), + status_updated_by_user_id=cls._value_or_none(series.get("status_updated_by_user_id")), + status_update_reason=cls._value_or_none(series.get("status_update_reason")), ) diff --git a/armis_sdk/entities/data_export/vulnerability.py b/armis_sdk/entities/data_export/vulnerability.py index f7b6631..e04e366 100644 --- a/armis_sdk/entities/data_export/vulnerability.py +++ b/armis_sdk/entities/data_export/vulnerability.py @@ -2,7 +2,6 @@ import datetime from typing import ClassVar -from typing import Optional from typing import TYPE_CHECKING from armis_sdk.entities.data_export.base_exported_entity import BaseExportedEntity @@ -19,63 +18,63 @@ class Vulnerability(BaseExportedEntity): entity_name: ClassVar[str] = "vulnerabilities" - device_id: int + device_id: int | None = None """The id of the device with the vulnerability""" - cve_uid: str + cve_uid: str | None = None """ The unique CVE identifier **Example**: `CVE-2025-53799` """ - advisory_id: str | None + advisory_id: str | None = None """ The id of the advisory **Example**: `KB5065429` """ - remediation_types: list[str] + remediation_types: list[str] | None = None """ The list of remediation types **Example**: `["VERSION_UPDATE"]` """ - avm_rating: str | None + avm_rating: str | None = None """The Armis AVM (Asset Vulnerability Management) rating of the vulnerability""" - match_source: list[str] + match_source: list[str] | None = None """ The list of sources for the match **Example**: `["Profile Matching"]` """ - status: str + status: str | None = None """ The status of the vulnerability in relation to the device **Example**: `Open` """ - status_change_time: datetime.datetime | None + status_change_time: datetime.datetime | None = None """When was the status last changed""" - status_change_reason: str | None + status_change_reason: str | None = None """The reason for the status change""" @classmethod def series_to_model(cls, series: pandas.Series) -> Vulnerability: return Vulnerability( - device_id=series.loc["device_id"], - cve_uid=series.loc["vulnerability_cve_uid"], - advisory_id=cls._value_or_none(series.loc["vulnerability_advisory_id"]), - remediation_types=cls._to_list(series.loc["vulnerability_remediation_types"]), - avm_rating=cls._value_or_none(series.loc["avm_rating"]), - match_source=cls._to_list(series.loc["match_source"]), - status=series.loc["status"], - status_change_time=cls._value_or_none(series.loc["status_change_time"]), - status_change_reason=cls._value_or_none(series.loc["status_change_reason"]), + device_id=cls._value_or_none(series.get("device_id")), + cve_uid=cls._value_or_none(series.get("vulnerability_cve_uid")), + advisory_id=cls._value_or_none(series.get("vulnerability_advisory_id")), + remediation_types=cls._to_list(series.get("vulnerability_remediation_types")), + avm_rating=cls._value_or_none(series.get("avm_rating")), + match_source=cls._to_list(series.get("match_source")), + status=cls._value_or_none(series.get("status")), + status_change_time=cls._value_or_none(series.get("status_change_time")), + status_change_reason=cls._value_or_none(series.get("status_change_reason")), ) diff --git a/pyproject.toml b/pyproject.toml index 1050640..a004f7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "armis_sdk" -version = "1.1.2" +version = "1.1.3" description = "The Armis SDK is a package that encapsulates common use-cases for interacting with the Armis platform." authors = [ { name = "Shai Lachmanovich", email = "shai@armis.com" }, diff --git a/tests/armis_sdk/clients/data_export_client_test.py b/tests/armis_sdk/clients/data_export_client_test.py index 78e1bf3..8fc4927 100644 --- a/tests/armis_sdk/clients/data_export_client_test.py +++ b/tests/armis_sdk/clients/data_export_client_test.py @@ -82,7 +82,7 @@ async def test_get(httpx_mock: pytest_httpx.HTTPXMock): @mock.patch.object(pandas, "read_parquet") -async def test_export(mock_read_parquet: mock.MagicMock, httpx_mock: pytest_httpx.HTTPXMock): +async def test_iterate(mock_read_parquet: mock.MagicMock, httpx_mock: pytest_httpx.HTTPXMock): httpx_mock.add_response( url="https://api.armis.com/v3/data-export/mock-entity", json={ diff --git a/tests/armis_sdk/entities/data_export/application_test.py b/tests/armis_sdk/entities/data_export/application_test.py index 0994d9a..c8d4529 100644 --- a/tests/armis_sdk/entities/data_export/application_test.py +++ b/tests/armis_sdk/entities/data_export/application_test.py @@ -1,4 +1,3 @@ -import collections import datetime import pandas @@ -6,12 +5,6 @@ from armis_sdk.entities.data_export.application import Application -ApplicationNT = collections.namedtuple( - "ApplicationNT", - ["device_id", "name", "vendor", "version", "cpe", "first_seen", "last_seen"], -) - - def test_series_to_model(): series = pandas.Series( { @@ -34,3 +27,17 @@ def test_series_to_model(): first_seen=datetime.datetime(2025, 11, 1), last_seen=datetime.datetime(2025, 11, 4), ) + + +def test_series_to_model_empty(): + series = pandas.Series() + + assert Application.series_to_model(series) == Application( + device_id=None, + vendor=None, + name=None, + version=None, + cpe=None, + first_seen=None, + last_seen=None, + ) diff --git a/tests/armis_sdk/entities/data_export/risk_factor_test.py b/tests/armis_sdk/entities/data_export/risk_factor_test.py index 446c243..b221a7c 100644 --- a/tests/armis_sdk/entities/data_export/risk_factor_test.py +++ b/tests/armis_sdk/entities/data_export/risk_factor_test.py @@ -1,4 +1,3 @@ -import collections import datetime import json @@ -8,28 +7,6 @@ from armis_sdk.entities.data_export.risk_factor import RiskFactorRecommendedAction -RiskFactorNT = collections.namedtuple( - "RiskFactorNT", - [ - "device_id", - "category", - "type", - "description", - "score", - "status", - "group", - "remediation", - "remediation_description", - "remediation_recommended_actions", - "first_seen", - "last_seen", - "status_update_time", - "status_updated_by_user_id", - "status_update_reason", - ], -) - - def test_series_to_model(): series = pandas.Series( { @@ -84,3 +61,25 @@ def test_series_to_model(): status_updated_by_user_id=3, status_update_reason="reason1", ) + + +def test_series_to_model_empty(): + series = pandas.Series() + + assert RiskFactor.series_to_model(series) == RiskFactor( + device_id=None, + category=None, + type=None, + description=None, + score=None, + status=None, + group=None, + remediation_type=None, + remediation_description=None, + remediation_recommended_actions=None, + first_seen=None, + last_seen=None, + status_update_time=None, + status_updated_by_user_id=None, + status_update_reason=None, + ) diff --git a/tests/armis_sdk/entities/data_export/vulnerability_test.py b/tests/armis_sdk/entities/data_export/vulnerability_test.py index a77115e..c8fa078 100644 --- a/tests/armis_sdk/entities/data_export/vulnerability_test.py +++ b/tests/armis_sdk/entities/data_export/vulnerability_test.py @@ -1,4 +1,3 @@ -import collections import datetime import pandas @@ -6,24 +5,8 @@ from armis_sdk.entities.data_export.vulnerability import Vulnerability -VulnerabilityNT = collections.namedtuple( - "VulnerabilityNT", - [ - "device_id", - "vulnerability_cve_uid", - "vulnerability_advisory_id", - "vulnerability_remediation_types", - "avm_rating", - "match_source", - "status", - "status_change_time", - "status_change_reason", - ], -) - - def test_series_to_model(): - application_nt = pandas.Series( + series = pandas.Series( { "device_id": 1, "vulnerability_cve_uid": "CVE-123", @@ -37,7 +20,7 @@ def test_series_to_model(): }, ) - assert Vulnerability.series_to_model(application_nt) == Vulnerability( + assert Vulnerability.series_to_model(series) == Vulnerability( device_id=1, cve_uid="CVE-123", advisory_id="KB123", @@ -48,3 +31,19 @@ def test_series_to_model(): status_change_time=datetime.datetime(2025, 11, 1), status_change_reason="Good reason", ) + + +def test_series_to_model_empty(): + series = pandas.Series() + + assert Vulnerability.series_to_model(series) == Vulnerability( + device_id=None, + cve_uid=None, + advisory_id=None, + remediation_types=None, + avm_rating=None, + match_source=None, + status=None, + status_change_time=None, + status_change_reason=None, + )