Skip to content

Commit c46205b

Browse files
authored
Add service and replica registration events (#3516)
Emit an event for each of the following:
- Service registered in gateway
- Service unregistered from gateway
- Service replica registered to receive requests
- Service replica unregistered from receiving requests
1 parent 14ef341 commit c46205b

File tree

4 files changed

+73
-20
lines changed

4 files changed

+73
-20
lines changed

src/dstack/_internal/server/services/gateways/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ async def generate_gateway_name(session: AsyncSession, project: ProjectModel) ->
444444

445445
async def get_or_add_gateway_connection(
446446
session: AsyncSession, gateway_id: uuid.UUID
447-
) -> GatewayConnection:
447+
) -> tuple[GatewayModel, GatewayConnection]:
448448
gateway = await session.get(GatewayModel, gateway_id)
449449
if gateway is None:
450450
raise GatewayError("Gateway not found")
@@ -460,7 +460,7 @@ async def get_or_add_gateway_connection(
460460
"Failed to connect to gateway %s: %s", gateway.gateway_compute.ip_address, e
461461
)
462462
raise GatewayError("Failed to connect to gateway")
463-
return conn
463+
return gateway, conn
464464

465465

466466
async def init_gateways(session: AsyncSession):

src/dstack/_internal/server/services/services/__init__.py

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from dstack._internal.core.models.runs import JobSpec, Run, RunSpec, ServiceModelSpec, ServiceSpec
3030
from dstack._internal.server import settings
3131
from dstack._internal.server.models import GatewayModel, JobModel, ProjectModel, RunModel
32+
from dstack._internal.server.services import events
3233
from dstack._internal.server.services.gateways import (
3334
get_gateway_configuration,
3435
get_or_add_gateway_connection,
@@ -114,7 +115,7 @@ async def _register_service_in_gateway(
114115
domain = service_spec.get_domain()
115116
assert domain is not None
116117

117-
conn = await get_or_add_gateway_connection(session, gateway.id)
118+
_, conn = await get_or_add_gateway_connection(session, gateway.id)
118119
try:
119120
logger.debug("%s: registering service as %s", fmt(run_model), service_spec.url)
120121
async with conn.client() as client:
@@ -131,13 +132,21 @@ async def _register_service_in_gateway(
131132
ssh_private_key=run_model.project.ssh_private_key,
132133
router=router,
133134
)
134-
logger.info("%s: service is registered as %s", fmt(run_model), service_spec.url)
135135
except SSHError:
136136
raise ServerClientError("Gateway tunnel is not working")
137137
except httpx.RequestError as e:
138138
logger.debug("Gateway request failed", exc_info=True)
139139
raise GatewayError(f"Gateway is not working: {e!r}")
140140

141+
events.emit(
142+
session,
143+
"Service registered in gateway",
144+
actor=events.SystemActor(),
145+
targets=[
146+
events.Target.from_model(run_model),
147+
events.Target.from_model(gateway),
148+
],
149+
)
141150
return service_spec
142151

143152

@@ -193,8 +202,9 @@ async def register_replica(
193202
ssh_head_proxy: Optional[SSHConnectionParams],
194203
ssh_head_proxy_private_key: Optional[str],
195204
):
205+
gateway = None
196206
if gateway_id is not None:
197-
conn = await get_or_add_gateway_connection(session, gateway_id)
207+
gateway, conn = await get_or_add_gateway_connection(session, gateway_id)
198208
job_submission = jobs_services.job_model_to_job_submission(job_model)
199209
try:
200210
logger.debug("%s: registering replica for service %s", fmt(job_model), run.id.hex)
@@ -225,17 +235,21 @@ async def register_replica(
225235
else:
226236
raise
227237
job_model.registered = True
228-
logger.info(
229-
"%s: service replica registered to receive requests, gateway=%s",
230-
fmt(job_model),
231-
gateway_id is not None,
238+
targets = [events.Target.from_model(job_model)]
239+
if gateway is not None:
240+
targets.append(events.Target.from_model(gateway))
241+
events.emit(
242+
session,
243+
"Service replica registered to receive requests",
244+
actor=events.SystemActor(),
245+
targets=targets,
232246
)
233247

234248

235249
async def unregister_service(session: AsyncSession, run_model: RunModel):
236250
if run_model.gateway_id is None: # in-server proxy
237251
return
238-
conn = await get_or_add_gateway_connection(session, run_model.gateway_id)
252+
gateway, conn = await get_or_add_gateway_connection(session, run_model.gateway_id)
239253
res = await session.execute(
240254
select(ProjectModel).where(ProjectModel.id == run_model.project_id)
241255
)
@@ -247,24 +261,37 @@ async def unregister_service(session: AsyncSession, run_model: RunModel):
247261
project=project.name,
248262
run_name=run_model.run_name,
249263
)
250-
logger.debug("%s: service is unregistered", fmt(run_model))
264+
event_msg = "Service unregistered from gateway"
251265
except GatewayError as e:
252266
# ignore if service is not registered
253267
logger.warning("%s: unregistering service: %s", fmt(run_model), e)
268+
event_msg = f"Gateway error when unregistering service: {e}"
254269
except (httpx.RequestError, SSHError) as e:
255270
logger.debug("Gateway request failed", exc_info=True)
256271
raise GatewayError(repr(e))
272+
events.emit(
273+
session,
274+
event_msg,
275+
actor=events.SystemActor(),
276+
targets=[
277+
events.Target.from_model(run_model),
278+
events.Target.from_model(gateway),
279+
],
280+
)
257281

258282

259283
async def unregister_replica(session: AsyncSession, job_model: JobModel):
284+
if not job_model.registered: # non-services and unregistered service replicas
285+
return
260286
res = await session.execute(
261287
select(RunModel)
262288
.where(RunModel.id == job_model.run_id)
263-
.options(joinedload(RunModel.project).joinedload(ProjectModel.backends))
289+
.options(joinedload(RunModel.project))
264290
)
265291
run_model = res.unique().scalar_one()
292+
gateway = None
266293
if run_model.gateway_id is not None:
267-
conn = await get_or_add_gateway_connection(session, run_model.gateway_id)
294+
gateway, conn = await get_or_add_gateway_connection(session, run_model.gateway_id)
268295
try:
269296
logger.debug(
270297
"%s: unregistering replica from service %s", fmt(job_model), job_model.run_id.hex
@@ -282,10 +309,14 @@ async def unregister_replica(session: AsyncSession, job_model: JobModel):
282309
logger.debug("Gateway request failed", exc_info=True)
283310
raise GatewayError(repr(e))
284311
job_model.registered = False
285-
logger.info(
286-
"%s: service replica unregistered from receiving requests, gateway=%s",
287-
fmt(job_model),
288-
run_model.gateway_id is not None,
312+
targets = [events.Target.from_model(job_model)]
313+
if gateway is not None:
314+
targets.append(events.Target.from_model(gateway))
315+
events.emit(
316+
session,
317+
"Service replica unregistered from receiving requests",
318+
actor=events.SystemActor(),
319+
targets=targets,
289320
)
290321

291322

@@ -314,7 +345,7 @@ async def update_service_desired_replica_count(
314345
) -> None:
315346
stats = None
316347
if run_model.gateway_id is not None:
317-
conn = await get_or_add_gateway_connection(session, run_model.gateway_id)
348+
_, conn = await get_or_add_gateway_connection(session, run_model.gateway_id)
318349
stats = await conn.get_stats(run_model.project.name, run_model.run_name)
319350
replica_groups = configuration.replica_groups
320351
desired_replica_counts = {}

src/tests/_internal/server/background/tasks/test_process_running_jobs.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,11 @@ async def test_registers_service_replica_immediately_if_no_probes(
10141014
await session.refresh(job)
10151015
assert job.status == JobStatus.RUNNING
10161016
assert job.registered
1017+
events = await list_events(session)
1018+
assert {e.message for e in events} == {
1019+
"Job status changed PULLING -> RUNNING",
1020+
"Service replica registered to receive requests",
1021+
}
10171022

10181023
@pytest.mark.asyncio
10191024
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
@@ -1104,7 +1109,14 @@ async def test_registers_service_replica_only_after_probes_pass(
11041109
await process_running_jobs()
11051110

11061111
await session.refresh(job)
1107-
assert job.registered == expect_to_register
1112+
events = await list_events(session)
1113+
if expect_to_register:
1114+
assert job.registered
1115+
assert len(events) == 1
1116+
assert events[0].message == "Service replica registered to receive requests"
1117+
else:
1118+
assert not job.registered
1119+
assert not events
11081120

11091121

11101122
class TestPatchBaseImageForAwsEfa:

src/tests/_internal/server/routers/test_runs.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
get_fleet_spec,
6464
get_job_provisioning_data,
6565
get_run_spec,
66+
list_events,
6667
)
6768
from dstack._internal.server.testing.matchers import SomeUUID4Str
6869
from tests._internal.server.background.tasks.test_process_running_jobs import settings
@@ -2085,41 +2086,47 @@ def mock_gateway_connections(self) -> Generator[None, None, None]:
20852086
"specified_gateway_in_run_conf",
20862087
"expected_service_url",
20872088
"expected_model_url",
2089+
"is_gateway",
20882090
),
20892091
[
20902092
pytest.param(
20912093
[("default-gateway", True), ("non-default-gateway", False)],
20922094
None,
20932095
"https://test-service.default-gateway.example",
20942096
"https://gateway.default-gateway.example",
2097+
True,
20952098
id="submits-to-default-gateway",
20962099
),
20972100
pytest.param(
20982101
[("default-gateway", True), ("non-default-gateway", False)],
20992102
True,
21002103
"https://test-service.default-gateway.example",
21012104
"https://gateway.default-gateway.example",
2105+
True,
21022106
id="submits-to-default-gateway-when-gateway-true",
21032107
),
21042108
pytest.param(
21052109
[("default-gateway", True), ("non-default-gateway", False)],
21062110
"non-default-gateway",
21072111
"https://test-service.non-default-gateway.example",
21082112
"https://gateway.non-default-gateway.example",
2113+
True,
21092114
id="submits-to-specified-gateway",
21102115
),
21112116
pytest.param(
21122117
[("non-default-gateway", False)],
21132118
None,
21142119
"/proxy/services/test-project/test-service/",
21152120
"/proxy/models/test-project/",
2121+
False,
21162122
id="submits-in-server-when-no-default-gateway",
21172123
),
21182124
pytest.param(
21192125
[("default-gateway", True)],
21202126
False,
21212127
"/proxy/services/test-project/test-service/",
21222128
"/proxy/models/test-project/",
2129+
False,
21232130
id="submits-in-server-when-specified",
21242131
),
21252132
],
@@ -2130,9 +2137,10 @@ async def test_submit_to_correct_proxy(
21302137
session: AsyncSession,
21312138
client: AsyncClient,
21322139
existing_gateways: List[Tuple[str, bool]],
2133-
specified_gateway_in_run_conf: str,
2140+
specified_gateway_in_run_conf: Union[str, bool, None],
21342141
expected_service_url: str,
21352142
expected_model_url: str,
2143+
is_gateway: bool,
21362144
) -> None:
21372145
user = await create_user(session=session, global_role=GlobalRole.USER)
21382146
project = await create_project(session=session, owner=user, name="test-project")
@@ -2171,6 +2179,8 @@ async def test_submit_to_correct_proxy(
21712179
assert response.status_code == 200
21722180
assert response.json()["service"]["url"] == expected_service_url
21732181
assert response.json()["service"]["model"]["base_url"] == expected_model_url
2182+
events = await list_events(session)
2183+
assert ("Service registered in gateway" in {e.message for e in events}) == is_gateway
21742184

21752185
@pytest.mark.asyncio
21762186
async def test_return_error_if_specified_gateway_not_exists(

0 commit comments

Comments
 (0)