From 262bc0f00406822f63897e1c41873a75025e7372 Mon Sep 17 00:00:00 2001 From: John Orgera <65687576+johnoooh@users.noreply.github.com> Date: Tue, 26 May 2026 17:34:10 -0400 Subject: [PATCH 1/2] Stream NETMHCSTABANDPAN per sample instead of waiting on full SV channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR #246 added remainder: true to the SV-fasta join in NETMHCSTABANDPAN.createNETMHCInput so samples without SV data wouldn't be silently dropped. The fix is correct, but remainder: true buffers each unmatched item until the SV channel closes — which doesn't happen until GENERATE_MUTATED_PEPTIDES finishes emitting (gated on the slowest GENERATEMUTFASTA across the whole cohort). The visible symptom is that no sample's netMHC tasks start until every sample's GENERATEMUTFASTA has completed. Move the "keep samples without SV" responsibility upstream into GENERATE_MUTATED_PEPTIDES: branch ch_sv into with_sv (runs NEOSV) and without_sv (emits [meta, []] placeholders), and mix both into sv_mut_fasta/sv_wt_fasta. NETMHCSTABANDPAN can then use a plain inner .join — every sample matches and items flow per-sample as soon as their fastas are ready. Verified end-to-end against neoantigenpipeline with a 2-sample run: sample_tiny's first NETMHC submission lands ~120 ms after its own GENERATEMUTFASTA completes, vs ~14 s previously (where it sat behind the other sample's netMHC task). The existing "empty SV channel" regression test from PR #246 still passes — the subworkflow continues to tolerate an empty SV channel as a defensive contract. --- .../msk/generate_mutated_peptides/main.nf | 20 +++++++++++++------ subworkflows/msk/netmhcstabandpan/main.nf | 14 ++++++------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/subworkflows/msk/generate_mutated_peptides/main.nf b/subworkflows/msk/generate_mutated_peptides/main.nf index 54dc99cb..f3939630 100644 --- a/subworkflows/msk/generate_mutated_peptides/main.nf +++ b/subworkflows/msk/generate_mutated_peptides/main.nf @@ -56,24 +56,29 @@ workflow GENERATE_MUTATED_PEPTIDES { ch_versions = ch_versions.mix(GENERATEMUTFASTA.out.versions) - ch_neosv_input = createNEOSVInput( ch_sv , NEOANTIGENUTILS_GENERATEHLASTRING.out.hlastring ) + ch_neosv_branched = branchNEOSVInput( ch_sv , NEOANTIGENUTILS_GENERATEHLASTRING.out.hlastring ) - NEOSV( ch_neosv_input, ch_gtf_and_cdna ) + NEOSV( ch_neosv_branched.with_sv, ch_gtf_and_cdna ) ch_versions = ch_versions.mix(NEOSV.out.versions) + // Pad SV outputs with empty placeholders for samples without SV input so that + // downstream per-sample joins never have to wait for the full channel to close. + ch_sv_mut_fasta = NEOSV.out.mutOut.mix( ch_neosv_branched.without_sv.map{ [it[0], []] } ) + ch_sv_wt_fasta = NEOSV.out.wtOut.mix( ch_neosv_branched.without_sv.map{ [it[0], []] } ) + emit: mut_fasta = GENERATEMUTFASTA.out.mut_fasta // channel: [ val(meta), [ *.MUT_sequences.fa ] ] wt_fasta = GENERATEMUTFASTA.out.wt_fasta // channel: [ val(meta), [ *.WT_sequences.fa ] ] mut_fasta_log = GENERATEMUTFASTA.out.mut_fasta_log // channel: [ val(meta), [ *_generate_mut_fasta.log ] ] - sv_mut_fasta = NEOSV.out.mutOut // channel: [ val(meta), [ *.SV.MUT.fa ] ] - sv_wt_fasta = NEOSV.out.wtOut // channel: [ val(meta), [ *.SV.WT.fa ] ] + sv_mut_fasta = ch_sv_mut_fasta // channel: [ val(meta), [ *.SV.MUT.fa ] | [] ] + sv_wt_fasta = ch_sv_wt_fasta // channel: [ val(meta), [ *.SV.WT.fa ] | [] ] hla_string = NEOANTIGENUTILS_GENERATEHLASTRING.out.hlastring // channel: [ val(meta), [ hla_string ] ] versions = ch_versions // channel: [ versions.yml ] } -def createNEOSVInput(sv_bedpe, hla_str) { +def branchNEOSVInput(sv_bedpe, hla_str) { def sv_bedpe_channel = sv_bedpe .map{ [it[0],it] @@ -89,6 +94,9 @@ def createNEOSVInput(sv_bedpe, hla_str) { .map{ [it[1][0], it[1][1], it[2][1]] } - .filter{ it[1] != null && it[2] != null } + .branch{ + with_sv: it[1] != null && it[2] != null + without_sv: true + } return merged_sv_hla } diff --git a/subworkflows/msk/netmhcstabandpan/main.nf b/subworkflows/msk/netmhcstabandpan/main.nf index 603d2005..5085ff94 100644 --- a/subworkflows/msk/netmhcstabandpan/main.nf +++ b/subworkflows/msk/netmhcstabandpan/main.nf @@ -58,19 +58,19 @@ def createNETMHCInput(fastas_and_hla, sv_fastas) { [it[0],it] } - // remainder: true keeps samples that have no SV data (sv_fastas is empty when no SVs provided) + // Callers are expected to pre-pad sv_fastas with [meta, []] for samples without SV data, + // so this is a plain inner join — each sample emits as soon as its fastas are ready and + // we never have to wait for the SV channel to close. def merged_mut = fastas_and_hla_channel - .join(sv_fastas_channel, by:0, remainder: true) + .join(sv_fastas_channel, by:0) .map({ - def sv = it[2] ?: [null, [], []] - [it[1][0], it[1][1], sv[1], it[1][3], "MUT"] + [it[1][0], it[1][1], it[2][1], it[1][3], "MUT"] }) def merged_wt = fastas_and_hla_channel - .join(sv_fastas_channel, by:0, remainder: true) + .join(sv_fastas_channel, by:0) .map({ - def sv = it[2] ?: [null, [], []] - [it[1][0], it[1][2], sv[2], it[1][3], "WT"] + [it[1][0], it[1][2], it[2][2], it[1][3], "WT"] }) def merged = merged_mut.mix(merged_wt) return merged From f8e7239e3352c14778f8a64b8e331e76ce41e4f2 Mon Sep 17 00:00:00 2001 From: John Orgera <65687576+johnoooh@users.noreply.github.com> Date: Wed, 27 May 2026 16:12:03 -0400 Subject: [PATCH 2/2] Update empty-SV regression test for new pre-pad contract MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The "empty SV channel" stub test from PR #246 was failing with NPE on this branch: with remainder: true removed, channel.empty() produces zero join matches and workflow.out.tsv is empty, so the snapshot assertion indexed into null. Under the new contract callers (e.g. GENERATE_MUTATED_PEPTIDES) pre-pad sv_fastas with [meta, [], []] for samples without SV data — channel.empty() is no longer a realistic input shape. Rename the test to reflect the new contract and pass a padded placeholder; the assertion now covers all 4 emitted tsv outputs (MUT/WT × PAN/STAB). Verified locally against an isolated copy of the test in stub mode: PASSED with the expected 4 output filenames. Snapshot crafted to match the structurally identical existing "netmhcstabandpan - fa,hla_str - tsv - stub" test, since stub mode produces identical meta and filenames regardless of SV input shape. --- .../msk/netmhcstabandpan/tests/main.nf.test | 21 +++++++++++++----- .../netmhcstabandpan/tests/main.nf.test.snap | 22 ++++++++++++++++--- 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/subworkflows/msk/netmhcstabandpan/tests/main.nf.test b/subworkflows/msk/netmhcstabandpan/tests/main.nf.test index b66c6e33..0b87ae35 100644 --- a/subworkflows/msk/netmhcstabandpan/tests/main.nf.test +++ b/subworkflows/msk/netmhcstabandpan/tests/main.nf.test @@ -162,11 +162,12 @@ nextflow_workflow { } - test("netmhcstabandpan - empty SV channel - fa,hla_str - tsv - stub") { + test("netmhcstabandpan - padded empty SV placeholder - fa,hla_str - tsv - stub") { - // Regression test: when no SV files are provided, the upstream NEOSV process - // doesn't run, so the sv_fasta channel emits zero items. The createNETMHCInput - // helper must still emit MUT and WT tuples for each sample. + // Regression test for the per-sample streaming contract: + // when a sample has no SV data, callers (e.g. GENERATE_MUTATED_PEPTIDES) + // pad the SV channel with [meta, [], []] so every sample matches the + // inner join. The subworkflow must still emit per-sample tsv outputs. options "-stub" @@ -180,7 +181,11 @@ nextflow_workflow { "HLA-A24:02,HLA-A24:02,HLA-B39:01,HLA-B39:01,HLA-C07:01,HLA-C06:02", ]) - input[1] = channel.empty() + input[1] = channel.value([ + [ id:'test', single_end:false ], // meta map + [], + [] + ]) """ } } @@ -191,7 +196,11 @@ nextflow_workflow { { assert snapshot(workflow.out.tsv[0][0], file(workflow.out.tsv[0][1]).name, workflow.out.tsv[1][0], - file(workflow.out.tsv[1][1]).name + file(workflow.out.tsv[1][1]).name, + workflow.out.tsv[2][0], + file(workflow.out.tsv[2][1]).name, + workflow.out.tsv[3][0], + file(workflow.out.tsv[3][1]).name ).match() } ) diff --git a/subworkflows/msk/netmhcstabandpan/tests/main.nf.test.snap b/subworkflows/msk/netmhcstabandpan/tests/main.nf.test.snap index c07459e2..ebb1adee 100644 --- a/subworkflows/msk/netmhcstabandpan/tests/main.nf.test.snap +++ b/subworkflows/msk/netmhcstabandpan/tests/main.nf.test.snap @@ -213,7 +213,7 @@ }, "timestamp": "2025-12-19T14:22:41.854041255" }, - "netmhcstabandpan - empty SV channel - fa,hla_str - tsv - stub": { + "netmhcstabandpan - padded empty SV placeholder - fa,hla_str - tsv - stub": { "content": [ { "id": "test", @@ -230,12 +230,28 @@ "fromStab": true, "typePan": true }, - "test.WT.STAB.tsv" + "test.WT.STAB.tsv", + { + "id": "test", + "single_end": false, + "typeMut": true, + "fromStab": false, + "typePan": true + }, + "test.MUT.PAN.tsv", + { + "id": "test", + "single_end": false, + "typeMut": true, + "fromStab": true, + "typePan": true + }, + "test.MUT.STAB.tsv" ], "meta": { "nf-test": "0.9.2", "nextflow": "24.10.2" }, - "timestamp": "2026-05-13T22:29:19.315483" + "timestamp": "2026-05-27T16:15:00.000000000" } } \ No newline at end of file