diff --git a/src/openstack_mcp_server/tools/network_tools.py b/src/openstack_mcp_server/tools/network_tools.py index c1101df..51a5e33 100644 --- a/src/openstack_mcp_server/tools/network_tools.py +++ b/src/openstack_mcp_server/tools/network_tools.py @@ -44,6 +44,8 @@ def register_tools(self, mcp: FastMCP): mcp.tool()(self.delete_port) mcp.tool()(self.get_port_allowed_address_pairs) mcp.tool()(self.set_port_binding) + mcp.tool()(self.add_port_to_security_group) + mcp.tool()(self.remove_port_from_security_group) mcp.tool()(self.get_floating_ips) mcp.tool()(self.create_floating_ip) mcp.tool()(self.delete_floating_ip) @@ -63,6 +65,10 @@ def register_tools(self, mcp: FastMCP): mcp.tool()(self.get_security_group_detail) mcp.tool()(self.update_security_group) mcp.tool()(self.delete_security_group) + mcp.tool()(self.create_security_group_rule) + mcp.tool()(self.get_security_group_rule_detail) + mcp.tool()(self.delete_security_group_rule) + mcp.tool()(self.create_security_group_rules_bulk) def get_networks( self, @@ -511,6 +517,62 @@ def set_port_binding( updated = conn.network.update_port(port_id, **update_args) return self._convert_to_port_model(updated) + def add_port_to_security_group( + self, + port_id: str, + security_group_id: str, + ) -> Port: + """ + Attach a Security Group to a Port. + + Idempotent: if the security group is already attached, returns the current + port state without issuing an update. + + :param port_id: Port ID + :param security_group_id: Security Group ID to attach + :return: Updated (or current) Port object + """ + conn = get_openstack_conn() + current = conn.network.get_port(port_id) + current_group_ids: list[str] = list(current.security_group_ids) + if security_group_id in current_group_ids: + return self._convert_to_port_model(current) + + updated_group_ids = current_group_ids + [security_group_id] + updated = conn.network.update_port( + port_id, security_groups=updated_group_ids + ) + return self._convert_to_port_model(updated) + + def remove_port_from_security_group( + self, + port_id: str, + security_group_id: str, + ) -> Port: + """ + Detach a Security Group from a Port. + + Idempotent: if the security group is not attached, returns the current + port state without issuing an update. + + :param port_id: Port ID + :param security_group_id: Security Group ID to detach + :return: Updated (or current) Port object + """ + conn = get_openstack_conn() + current = conn.network.get_port(port_id) + current_group_ids: list[str] = list(current.security_group_ids) + if security_group_id not in current_group_ids: + return self._convert_to_port_model(current) + + updated_group_ids = [ + sg_id for sg_id in current_group_ids if sg_id != security_group_id + ] + updated = conn.network.update_port( + port_id, security_groups=updated_group_ids + ) + return self._convert_to_port_model(updated) + def create_port( self, network_id: str, @@ -1171,6 +1233,25 @@ def _sanitize_server_filters(self, filters: dict) -> dict: attrs.pop("status", None) return attrs + def _coerce_port(self, value) -> int | None: + """ + Coerce a port value that may arrive as a string (e.g., "80") into int. + + This relaxes tool input validation issues from UIs that serialize numbers + as strings. Only base-10 digit strings are converted; otherwise returns + None so the field can be omitted. + + :param value: Port as int, str, or None + :return: int port or None + """ + if isinstance(value, int): + return value + if isinstance(value, str): + stripped = value.strip() + if stripped.isdigit(): + return int(stripped) + return None + def get_security_groups( self, project_id: str | None = None, @@ -1282,14 +1363,12 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup: :param openstack_sg: OpenStack security group object :return: Pydantic SecurityGroup model """ - rule_ids: list[str] | None = None rules = getattr(openstack_sg, "security_group_rules", None) - if rules is not None: - dto_rules = [ - SecurityGroupRule.model_validate(r, from_attributes=True) - for r in rules - ] - rule_ids = [str(r.id) for r in dto_rules if getattr(r, "id", None)] + rule_ids: list[str] | None = ( + [r.get("id") for r in rules if r.get("id")] + if rules is not None + else None + ) return SecurityGroup( id=openstack_sg.id, @@ -1299,3 +1378,120 @@ def _convert_to_security_group_model(self, openstack_sg) -> SecurityGroup: project_id=getattr(openstack_sg, "project_id", None), security_group_rule_ids=rule_ids, ) + + def create_security_group_rule( + self, + security_group_id: str, + direction: str = "ingress", + ethertype: str = "IPv4", + protocol: str | None = None, + port_range_min: int | str | None = None, + port_range_max: int | str | None = None, + remote_ip_prefix: str | None = None, + remote_group_id: str | None = None, + description: str | None = None, + project_id: str | None = None, + ) -> SecurityGroupRule: + """ + Create a Security Group Rule. + + :param security_group_id: Target security group ID + :param direction: "ingress" or "egress" + :param ethertype: "IPv4" or "IPv6" + :param protocol: L4 protocol (e.g., "tcp", "udp", "icmp") + :param port_range_min: Minimum port + :param port_range_max: Maximum port + :param remote_ip_prefix: Source/destination CIDR + :param remote_group_id: Peer security group ID + :param description: Rule description + :param project_id: Project ownership + :return: Created SecurityGroupRule + """ + conn = get_openstack_conn() + create_args: dict = {} + if protocol is not None: + create_args["protocol"] = protocol + coerced_min = self._coerce_port(port_range_min) + coerced_max = self._coerce_port(port_range_max) + if coerced_min is not None: + create_args["port_range_min"] = coerced_min + if coerced_max is not None: + create_args["port_range_max"] = coerced_max + if remote_ip_prefix is not None: + create_args["remote_ip_prefix"] = remote_ip_prefix + if remote_group_id is not None: + create_args["remote_group_id"] = remote_group_id + if description is not None: + create_args["description"] = description + if project_id is not None: + create_args["project_id"] = project_id + + rule = conn.network.create_security_group_rule( + security_group_id=security_group_id, + direction=direction, + ethertype=ethertype, + **create_args, + ) + return SecurityGroupRule(**rule) + + def get_security_group_rule_detail( + self, rule_id: str + ) -> SecurityGroupRule: + """ + Get detailed information about a specific Security Group Rule. + + :param rule_id: Rule ID + :return: SecurityGroupRule detail + """ + conn = get_openstack_conn() + rule = conn.network.get_security_group_rule(rule_id) + return SecurityGroupRule(**rule) + + def delete_security_group_rule(self, rule_id: str) -> None: + """ + Delete a Security Group Rule. + + :param rule_id: Rule ID to delete + :return: None + """ + conn = get_openstack_conn() + conn.network.delete_security_group_rule(rule_id, ignore_missing=False) + return None + + def create_security_group_rules_bulk( + self, + rules: list[dict], + ) -> list[SecurityGroupRule]: + """ + Create multiple Security Group Rules in bulk. + + Each rule dict should follow Neutron SG rule schema keys (e.g., + security_group_id, direction, ethertype, protocol, port_range_min, + port_range_max, remote_ip_prefix, remote_group_id, description, project_id). + + :param rules: List of rule dictionaries + :return: List of created SecurityGroupRule models + """ + conn = get_openstack_conn() + + def _clean_rule_payload(raw: dict) -> dict: + payload = dict(raw) + # Remove port ranges when protocol is absent (Neutron requirement) + if not payload.get("protocol"): + payload.pop("port_range_min", None) + payload.pop("port_range_max", None) + else: + # Coerce possible string ports to integers + for key in ("port_range_min", "port_range_max"): + if key in payload: + coerced = self._coerce_port(payload.get(key)) + if coerced is None: + payload.pop(key, None) + else: + payload[key] = coerced + # Drop keys explicitly set to None + return {k: v for k, v in payload.items() if v is not None} + + cleaned_rules = [_clean_rule_payload(r) for r in rules] + created = conn.network.create_security_group_rules(rules=cleaned_rules) + return [SecurityGroupRule(**r) for r in created] diff --git a/tests/tools/test_network_tools.py b/tests/tools/test_network_tools.py index 5108bed..f919299 100644 --- a/tests/tools/test_network_tools.py +++ b/tests/tools/test_network_tools.py @@ -869,6 +869,87 @@ def test_set_port_binding_and_admin_state( ) assert res_toggle.is_admin_state_up is True + def test_add_remove_security_group_on_port( + self, mock_openstack_connect_network + ): + mock_conn = mock_openstack_connect_network + + # current without sg-9 + current = Mock() + current.id = "port-1" + current.name = "p1" + current.status = "ACTIVE" + current.description = None + current.project_id = None + current.network_id = "net-1" + current.admin_state_up = True + current.is_admin_state_up = True + current.device_id = None + current.device_owner = None + current.mac_address = "fa:16:3e:00:00:09" + current.fixed_ips = [] + current.security_group_ids = ["sg-1", "sg-2"] + mock_conn.network.get_port.return_value = current + + # updated after add + updated_add = Mock() + updated_add.id = "port-1" + updated_add.name = "p1" + updated_add.status = "ACTIVE" + updated_add.description = None + updated_add.project_id = None + updated_add.network_id = "net-1" + updated_add.admin_state_up = True + updated_add.is_admin_state_up = True + updated_add.device_id = None + updated_add.device_owner = None + updated_add.mac_address = "fa:16:3e:00:00:09" + updated_add.fixed_ips = [] + updated_add.security_group_ids = ["sg-1", "sg-2", "sg-9"] + mock_conn.network.update_port.return_value = updated_add + + tools = self.get_network_tools() + res_add = tools.add_port_to_security_group("port-1", "sg-9") + assert isinstance(res_add, Port) + mock_conn.network.update_port.assert_called_with( + "port-1", security_groups=["sg-1", "sg-2", "sg-9"] + ) + + # idempotent add when sg already present + mock_conn.network.get_port.return_value = updated_add + res_add_again = tools.add_port_to_security_group("port-1", "sg-9") + assert isinstance(res_add_again, Port) + + # updated after remove + updated_remove = Mock() + updated_remove.id = "port-1" + updated_remove.name = "p1" + updated_remove.status = "ACTIVE" + updated_remove.description = None + updated_remove.project_id = None + updated_remove.network_id = "net-1" + updated_remove.admin_state_up = True + updated_remove.is_admin_state_up = True + updated_remove.device_id = None + updated_remove.device_owner = None + updated_remove.mac_address = "fa:16:3e:00:00:09" + updated_remove.fixed_ips = [] + updated_remove.security_group_ids = ["sg-1", "sg-2"] + mock_conn.network.update_port.return_value = updated_remove + + res_remove = tools.remove_port_from_security_group("port-1", "sg-9") + assert isinstance(res_remove, Port) + mock_conn.network.update_port.assert_called_with( + "port-1", security_groups=["sg-1", "sg-2"] + ) + + # idempotent remove when sg not present + mock_conn.network.get_port.return_value = updated_remove + res_remove_again = tools.remove_port_from_security_group( + "port-1", "sg-9" + ) + assert isinstance(res_remove_again, Port) + def test_get_subnets_filters_and_has_gateway_true( self, mock_openstack_connect_network, @@ -1379,10 +1460,7 @@ def test_get_security_groups_filters(self, mock_openstack_connect_network): sg.status = None sg.description = "desc" sg.project_id = "proj-1" - sg.security_group_rules = [ - {"id": "r-1"}, - {"id": "r-2"}, - ] + sg.security_group_rules = [{"id": "r-1"}, {"id": "r-2"}] expected_sg = SecurityGroup( id="sg-1", @@ -1768,3 +1846,141 @@ def test_add_get_remove_router_interface_by_port( assert removed == RouterInterface( router_id="r-if-2", port_id="p-2", subnet_id="s-2" ) + + def test_create_security_group_rule(self, mock_openstack_connect_network): + mock_conn = mock_openstack_connect_network + rule = { + "id": "r-1", + "name": None, + "status": None, + "description": "allow 22", + "project_id": "proj-1", + "direction": "ingress", + "ethertype": "IPv4", + "protocol": "tcp", + "port_range_min": 22, + "port_range_max": 22, + "remote_ip_prefix": "0.0.0.0/0", + "remote_group_id": None, + "security_group_id": "sg-1", + } + mock_conn.network.create_security_group_rule.return_value = rule + + tools = self.get_network_tools() + res = tools.create_security_group_rule( + security_group_id="sg-1", + direction="ingress", + ethertype="IPv4", + protocol="tcp", + port_range_min=22, + port_range_max=22, + remote_ip_prefix="0.0.0.0/0", + description="allow 22", + project_id="proj-1", + ) + assert res.id == "r-1" + mock_conn.network.create_security_group_rule.assert_called_once() + + def test_get_security_group_rule_detail( + self, mock_openstack_connect_network + ): + mock_conn = mock_openstack_connect_network + rule = { + "id": "r-2", + "name": None, + "status": None, + "description": None, + "project_id": None, + "direction": "egress", + "ethertype": "IPv4", + "protocol": None, + "port_range_min": None, + "port_range_max": None, + "remote_ip_prefix": None, + "remote_group_id": None, + "security_group_id": "sg-1", + } + mock_conn.network.get_security_group_rule.return_value = rule + + tools = self.get_network_tools() + res = tools.get_security_group_rule_detail("r-2") + assert res.id == "r-2" + mock_conn.network.get_security_group_rule.assert_called_once_with( + "r-2" + ) + + def test_delete_security_group_rule(self, mock_openstack_connect_network): + mock_conn = mock_openstack_connect_network + mock_conn.network.delete_security_group_rule.return_value = None + + tools = self.get_network_tools() + res = tools.delete_security_group_rule("r-3") + assert res is None + mock_conn.network.delete_security_group_rule.assert_called_once_with( + "r-3", ignore_missing=False + ) + + def test_create_security_group_rules_bulk( + self, mock_openstack_connect_network + ): + mock_conn = mock_openstack_connect_network + r1 = { + "id": "r-10", + "name": None, + "status": None, + "description": None, + "project_id": None, + "security_group_id": "sg-1", + "direction": "ingress", + "ethertype": "IPv4", + "protocol": "tcp", + "port_range_min": 80, + "port_range_max": 80, + "remote_ip_prefix": "0.0.0.0/0", + "remote_group_id": None, + } + + r2 = { + "id": "r-11", + "name": None, + "status": None, + "description": None, + "project_id": None, + "security_group_id": "sg-1", + "direction": "ingress", + "ethertype": "IPv4", + "protocol": "tcp", + "port_range_min": 443, + "port_range_max": 443, + "remote_ip_prefix": "0.0.0.0/0", + "remote_group_id": None, + } + + mock_conn.network.create_security_group_rules.return_value = [r1, r2] + + tools = self.get_network_tools() + rules = [ + { + "security_group_id": "sg-1", + "direction": "ingress", + "ethertype": "IPv4", + "protocol": "tcp", + "port_range_min": 80, + "port_range_max": 80, + "remote_ip_prefix": "0.0.0.0/0", + }, + { + "security_group_id": "sg-1", + "direction": "ingress", + "ethertype": "IPv4", + "protocol": "tcp", + "port_range_min": 443, + "port_range_max": 443, + "remote_ip_prefix": "0.0.0.0/0", + }, + ] + res = tools.create_security_group_rules_bulk(rules) + assert len(res) == 2 + mock_conn.network.create_security_group_rules.assert_called_once_with( + rules=rules + )