Skip to content

Commit bd2d485

Browse files
authored
Add replica groups in dstack-service (#3408)
* Add replica groups in dstack-service add_replica_groups_model Replica Groups AutoScaling Rolling deployment and UI Replica Groups implementation clean up
* Resolve Merge Conflict & Rename replica_groups to replicas
* Resolve pyright type check
* Rename replicas to count and make replica names optional
* Resolve review comments on probes and rate limits
* Resolve tests
* Transform to ReplicaGroup in the replica_groups property
* Resolve review comments
* Resolve test_runs
* Resolved major comments
* Remove create_group_run_spec and use Job Configurator instead
* Resolve Minor Issues
* Resolve Minor Issues - Additional fixes
* Resolve conflict with master branch
* Resolve Major Comments
* Resolve some minor comments
* Resolve some minor comments

---------

Co-authored-by: Bihan Rana
1 parent 802c450 commit bd2d485

File tree

17 files changed

+929
-184
lines changed

17 files changed

+929
-184
lines changed

src/dstack/_internal/cli/utils/run.py

Lines changed: 62 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -285,16 +285,38 @@ def _format_job_name(
285285
show_deployment_num: bool,
286286
show_replica: bool,
287287
show_job: bool,
288+
group_index: Optional[int] = None,
289+
last_shown_group_index: Optional[int] = None,
288290
) -> str:
289291
name_parts = []
292+
prefix = ""
290293
if show_replica:
291-
name_parts.append(f"replica={job.job_spec.replica_num}")
294+
# Show group information if replica groups are used
295+
if group_index is not None:
296+
# Show group=X replica=Y when group changes, or just replica=Y when same group
297+
if group_index != last_shown_group_index:
298+
# First job in group: use 3 spaces indent
299+
prefix = " "
300+
name_parts.append(f"group={group_index} replica={job.job_spec.replica_num}")
301+
else:
302+
# Subsequent job in same group: align "replica=" with first job's "replica="
303+
# Calculate padding: width of " group={last_shown_group_index} "
304+
padding_width = 3 + len(f"group={last_shown_group_index}") + 1
305+
prefix = " " * padding_width
306+
name_parts.append(f"replica={job.job_spec.replica_num}")
307+
else:
308+
# Legacy behavior: no replica groups
309+
prefix = " "
310+
name_parts.append(f"replica={job.job_spec.replica_num}")
311+
else:
312+
prefix = " "
313+
292314
if show_job:
293315
name_parts.append(f"job={job.job_spec.job_num}")
294316
name_suffix = (
295317
f" deployment={latest_job_submission.deployment_num}" if show_deployment_num else ""
296318
)
297-
name_value = " " + (" ".join(name_parts) if name_parts else "")
319+
name_value = prefix + (" ".join(name_parts) if name_parts else "")
298320
name_value += name_suffix
299321
return name_value
300322

@@ -363,6 +385,17 @@ def get_runs_table(
363385
)
364386
merge_job_rows = len(run.jobs) == 1 and not show_deployment_num
365387

388+
group_name_to_index: Dict[str, int] = {}
389+
if run.run_spec.configuration.type == "service" and hasattr(
390+
run.run_spec.configuration, "replica_groups"
391+
):
392+
replica_groups = run.run_spec.configuration.replica_groups
393+
if replica_groups:
394+
for idx, group in enumerate(replica_groups):
395+
assert group.name is not None, "Group name is always set"
396+
group_name = group.name
397+
group_name_to_index[group_name] = idx
398+
366399
run_row: Dict[Union[str, int], Any] = {
367400
"NAME": _format_run_name(run, show_deployment_num),
368401
"SUBMITTED": format_date(run.submitted_at),
@@ -376,13 +409,35 @@ def get_runs_table(
376409
if not merge_job_rows:
377410
add_row_from_dict(table, run_row)
378411

379-
for job in run.jobs:
412+
# Sort jobs by group index first, then by replica_num within each group
413+
def get_job_sort_key(job: Job) -> tuple:
414+
group_index = None
415+
if group_name_to_index:
416+
group_index = group_name_to_index.get(job.job_spec.replica_group)
417+
# Use a large number for jobs without groups to put them at the end
418+
return (group_index if group_index is not None else 999999, job.job_spec.replica_num)
419+
420+
sorted_jobs = sorted(run.jobs, key=get_job_sort_key)
421+
422+
last_shown_group_index: Optional[int] = None
423+
for job in sorted_jobs:
380424
latest_job_submission = job.job_submissions[-1]
381425
status_formatted = _format_job_submission_status(latest_job_submission, verbose)
382426

427+
# Get group index for this job
428+
group_index: Optional[int] = None
429+
if group_name_to_index:
430+
group_index = group_name_to_index.get(job.job_spec.replica_group)
431+
383432
job_row: Dict[Union[str, int], Any] = {
384433
"NAME": _format_job_name(
385-
job, latest_job_submission, show_deployment_num, show_replica, show_job
434+
job,
435+
latest_job_submission,
436+
show_deployment_num,
437+
show_replica,
438+
show_job,
439+
group_index=group_index,
440+
last_shown_group_index=last_shown_group_index,
386441
),
387442
"STATUS": status_formatted,
388443
"PROBES": _format_job_probes(
@@ -394,6 +449,9 @@ def get_runs_table(
394449
"GPU": "-",
395450
"PRICE": "-",
396451
}
452+
# Update last shown group index for next iteration
453+
if group_index is not None:
454+
last_shown_group_index = group_index
397455
jpd = latest_job_submission.job_provisioning_data
398456
if jpd is not None:
399457
shared_offer: Optional[InstanceOfferWithAvailability] = None

0 commit comments

Comments
 (0)