Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions armis_sdk/entities/data_export/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,51 +19,51 @@ class Application(BaseExportedEntity):

entity_name: ClassVar[str] = "applications"

device_id: int
device_id: int | None = None
"""The id of the device with the application"""

vendor: str
vendor: str | None = None
"""
The vendor of the application

**Example**: `Google`
"""

name: str
name: str | None = None
"""
The name of the application

**Example**: `Chrome`
"""

version: str
version: str | None = None
"""
The version of the application

**Example**: `30.0.1599.40`
"""

cpe: str | None
cpe: str | None = None
"""
The CPE (Common Platform Enumeration) of the application

**Example**: `cpe:2.3:a:google:chrome:30.0.1599.40:*:*:*:*:*:*:*`
"""

first_seen: datetime.datetime
first_seen: datetime.datetime | None = None
"""When the application was first seen on the device"""

last_seen: datetime.datetime
last_seen: datetime.datetime | None = None
"""When the application was last seen on the device"""

@classmethod
def series_to_model(cls, series: pandas.Series) -> Application:
return Application(
device_id=series.loc["device_id"],
vendor=series.loc["vendor"],
name=series.loc["name"],
version=series.loc["version"],
cpe=cls._value_or_none(series.loc["cpe"] if "cpe" in series.index else None),
first_seen=series.loc["first_seen"].to_pydatetime(),
last_seen=series["last_seen"].to_pydatetime(),
device_id=cls._value_or_none(series.get("device_id")),
vendor=cls._value_or_none(series.get("vendor")),
name=cls._value_or_none(series.get("name")),
version=cls._value_or_none(series.get("version")),
cpe=cls._value_or_none(series.get("cpe")),
first_seen=cls._value_or_none(series.get("first_seen")),
last_seen=cls._value_or_none(series.get("last_seen")),
)
9 changes: 6 additions & 3 deletions armis_sdk/entities/data_export/base_exported_entity.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from __future__ import annotations

import abc
from collections.abc import Iterable
from typing import Type # noqa: UP035 # TODO: fix UP035 (deprecated import, use updated module)
from typing import TypeVar

Expand All @@ -19,12 +22,12 @@ def series_to_model(cls: type[T], series: pandas.Series) -> T: ...
def entity_name(self): ...

@classmethod
def _to_list(cls, value) -> list:
return [item for item in value if cls._value_or_none(item)]
def _to_list(cls, value) -> list | None:
return [item for item in value if cls._value_or_none(item)] if isinstance(value, Iterable) else None

@classmethod
def _value_or_none(cls, value):
if not value or pandas.isnull(value) or value == "N/A": # noqa: PD003 # TODO: fix PD003 (use .isna() instead of .isnull())
if not value or pandas.isna(value) or value == "N/A":
return None

if isinstance(value, pandas.Timestamp):
Expand Down
72 changes: 35 additions & 37 deletions armis_sdk/entities/data_export/risk_factor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class RiskFactorRecommendedAction(BaseModel):
"""
The description of the recommended action

**Example**: `Regularly update all operating systems and firmware on network devices
to the latest versions to reduce the potential for exploitation of vulnerabilities
**Example**: `Regularly update all operating systems and firmware on network devices
to the latest versions to reduce the potential for exploitation of vulnerabilities
via obsolete protocols.`
"""

Expand All @@ -50,48 +50,48 @@ class RiskFactor(BaseExportedEntity):

entity_name: ClassVar[str] = "risk-factors"

device_id: int
device_id: int | None = None
"""The id of the device with the risk factor"""

category: str
category: str | None = None
"""
The category of the risk factor

**Example**: `BEHAVIOURAL`
"""

type: str
type: str | None = None
"""
The type of the risk factor

**Example**: `SMBV1_SUPPORT`
"""

description: str
description: str | None = None
"""
The description of the risk factor

**Example**: `Device Supports SMBv1`
"""

score: int | None
score: int | None = None
"""The score of the risk factor"""

group: str
group: str | None = None
"""
The group of the risk factor

**Example**: `INSECURE_TRAFFIC_AND_BEHAVIOR`
"""

remediation_type: str | None
remediation_type: str | None = None
"""
The type of the remediation

**Example**: `Disable SMBv1 Protocol`
"""

remediation_description: str | None
remediation_description: str | None = None
"""
The description of the remediation

Expand All @@ -100,29 +100,29 @@ class RiskFactor(BaseExportedEntity):
such as SMBv3 are implemented to maintain secure network communications.`
"""

remediation_recommended_actions: list[RiskFactorRecommendedAction]
remediation_recommended_actions: list[RiskFactorRecommendedAction] | None = None
"""The remediation recommended actions"""

first_seen: datetime.datetime
first_seen: datetime.datetime | None = None
"""When the risk factor was first seen on the device"""

last_seen: datetime.datetime
last_seen: datetime.datetime | None = None
"""When the risk factor was last seen on the device"""

status: str
status: str | None = None
"""
The status of the risk factor in relation to the device

**Example**: `OPEN`
"""

status_update_time: datetime.datetime | None
status_update_time: datetime.datetime | None = None
"""When was the status last changed"""

status_updated_by_user_id: int | None
status_updated_by_user_id: int | None = None
"""Which used id last changed the status"""

status_update_reason: str | None
status_update_reason: str | None = None
"""
The reason for the status change

Expand All @@ -131,27 +131,25 @@ class RiskFactor(BaseExportedEntity):

@classmethod
def series_to_model(cls, series: pandas.Series) -> RiskFactor:
remediation_recommended_actions = series.get("remediation_recommended_actions")
return RiskFactor(
device_id=series.loc["device_id"],
category=series.loc["category"],
type=series.loc["type"],
description=series.loc["description"],
score=(int(score) if (score := cls._value_or_none(series.loc["score"])) else None),
status=series.loc["status"],
group=series.loc["group"],
remediation_type=cls._value_or_none(series.loc["remediation"]),
remediation_description=cls._value_or_none(series.loc["remediation_description"]),
device_id=cls._value_or_none(series.get("device_id")),
category=cls._value_or_none(series.get("category")),
type=cls._value_or_none(series.get("type")),
description=cls._value_or_none(series.get("description")),
score=(int(score) if (score := cls._value_or_none(series.get("score"))) else None),
status=cls._value_or_none(series.get("status")),
group=cls._value_or_none(series.get("group")),
remediation_type=cls._value_or_none(series.get("remediation")),
remediation_description=cls._value_or_none(series.get("remediation_description")),
remediation_recommended_actions=(
[
RiskFactorRecommendedAction(**item)
for item in json.loads(series.loc["remediation_recommended_actions"])
]
if series.loc["remediation_recommended_actions"]
else []
[RiskFactorRecommendedAction(**item) for item in json.loads(remediation_recommended_actions)]
if isinstance(remediation_recommended_actions, str)
else None
),
first_seen=series.loc["first_seen"].to_pydatetime(),
last_seen=series.loc["last_seen"].to_pydatetime(),
status_update_time=cls._value_or_none(series.loc["status_update_time"]),
status_updated_by_user_id=cls._value_or_none(series.loc["status_updated_by_user_id"]),
status_update_reason=cls._value_or_none(series.loc["status_update_reason"]),
first_seen=cls._value_or_none(series.get("first_seen")),
last_seen=cls._value_or_none(series.get("last_seen")),
status_update_time=cls._value_or_none(series.get("status_update_time")),
status_updated_by_user_id=cls._value_or_none(series.get("status_updated_by_user_id")),
status_update_reason=cls._value_or_none(series.get("status_update_reason")),
)
37 changes: 18 additions & 19 deletions armis_sdk/entities/data_export/vulnerability.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import datetime
from typing import ClassVar
from typing import Optional
from typing import TYPE_CHECKING

from armis_sdk.entities.data_export.base_exported_entity import BaseExportedEntity
Expand All @@ -19,63 +18,63 @@ class Vulnerability(BaseExportedEntity):

entity_name: ClassVar[str] = "vulnerabilities"

device_id: int
device_id: int | None = None
"""The id of the device with the vulnerability"""

cve_uid: str
cve_uid: str | None = None
"""
The unique CVE identifier

**Example**: `CVE-2025-53799`
"""

advisory_id: str | None
advisory_id: str | None = None
"""
The id of the advisory

**Example**: `KB5065429`
"""

remediation_types: list[str]
remediation_types: list[str] | None = None
"""
The list of remediation types

**Example**: `["VERSION_UPDATE"]`
"""

avm_rating: str | None
avm_rating: str | None = None
"""The Armis AVM (Asset Vulnerability Management) rating of the vulnerability"""

match_source: list[str]
match_source: list[str] | None = None
"""
The list of sources for the match

**Example**: `["Profile Matching"]`
"""

status: str
status: str | None = None
"""
The status of the vulnerability in relation to the device

**Example**: `Open`
"""

status_change_time: datetime.datetime | None
status_change_time: datetime.datetime | None = None
"""When was the status last changed"""

status_change_reason: str | None
status_change_reason: str | None = None
"""The reason for the status change"""

@classmethod
def series_to_model(cls, series: pandas.Series) -> Vulnerability:
return Vulnerability(
device_id=series.loc["device_id"],
cve_uid=series.loc["vulnerability_cve_uid"],
advisory_id=cls._value_or_none(series.loc["vulnerability_advisory_id"]),
remediation_types=cls._to_list(series.loc["vulnerability_remediation_types"]),
avm_rating=cls._value_or_none(series.loc["avm_rating"]),
match_source=cls._to_list(series.loc["match_source"]),
status=series.loc["status"],
status_change_time=cls._value_or_none(series.loc["status_change_time"]),
status_change_reason=cls._value_or_none(series.loc["status_change_reason"]),
device_id=cls._value_or_none(series.get("device_id")),
cve_uid=cls._value_or_none(series.get("vulnerability_cve_uid")),
advisory_id=cls._value_or_none(series.get("vulnerability_advisory_id")),
remediation_types=cls._to_list(series.get("vulnerability_remediation_types")),
avm_rating=cls._value_or_none(series.get("avm_rating")),
match_source=cls._to_list(series.get("match_source")),
status=cls._value_or_none(series.get("status")),
status_change_time=cls._value_or_none(series.get("status_change_time")),
status_change_reason=cls._value_or_none(series.get("status_change_reason")),
)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "armis_sdk"
version = "1.1.2"
version = "1.1.3"
description = "The Armis SDK is a package that encapsulates common use-cases for interacting with the Armis platform."
authors = [
{ name = "Shai Lachmanovich", email = "shai@armis.com" },
Expand Down
2 changes: 1 addition & 1 deletion tests/armis_sdk/clients/data_export_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async def test_get(httpx_mock: pytest_httpx.HTTPXMock):


@mock.patch.object(pandas, "read_parquet")
async def test_export(mock_read_parquet: mock.MagicMock, httpx_mock: pytest_httpx.HTTPXMock):
async def test_iterate(mock_read_parquet: mock.MagicMock, httpx_mock: pytest_httpx.HTTPXMock):
httpx_mock.add_response(
url="https://api.armis.com/v3/data-export/mock-entity",
json={
Expand Down
21 changes: 14 additions & 7 deletions tests/armis_sdk/entities/data_export/application_test.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
import collections
import datetime

import pandas

from armis_sdk.entities.data_export.application import Application


ApplicationNT = collections.namedtuple(
"ApplicationNT",
["device_id", "name", "vendor", "version", "cpe", "first_seen", "last_seen"],
)


def test_series_to_model():
series = pandas.Series(
{
Expand All @@ -34,3 +27,17 @@ def test_series_to_model():
first_seen=datetime.datetime(2025, 11, 1),
last_seen=datetime.datetime(2025, 11, 4),
)


def test_series_to_model_empty():
series = pandas.Series()

assert Application.series_to_model(series) == Application(
device_id=None,
vendor=None,
name=None,
version=None,
cpe=None,
first_seen=None,
last_seen=None,
)
Loading
Loading