diff --git a/subworkflows/msk/generate_mutated_peptides/main.nf b/subworkflows/msk/generate_mutated_peptides/main.nf index 54dc99cb..f3939630 100644 --- a/subworkflows/msk/generate_mutated_peptides/main.nf +++ b/subworkflows/msk/generate_mutated_peptides/main.nf @@ -56,24 +56,29 @@ workflow GENERATE_MUTATED_PEPTIDES { ch_versions = ch_versions.mix(GENERATEMUTFASTA.out.versions) - ch_neosv_input = createNEOSVInput( ch_sv , NEOANTIGENUTILS_GENERATEHLASTRING.out.hlastring ) + ch_neosv_branched = branchNEOSVInput( ch_sv , NEOANTIGENUTILS_GENERATEHLASTRING.out.hlastring ) - NEOSV( ch_neosv_input, ch_gtf_and_cdna ) + NEOSV( ch_neosv_branched.with_sv, ch_gtf_and_cdna ) ch_versions = ch_versions.mix(NEOSV.out.versions) + // Pad SV outputs with empty placeholders for samples without SV input so that + // downstream per-sample joins never have to wait for the full channel to close. + ch_sv_mut_fasta = NEOSV.out.mutOut.mix( ch_neosv_branched.without_sv.map{ [it[0], []] } ) + ch_sv_wt_fasta = NEOSV.out.wtOut.mix( ch_neosv_branched.without_sv.map{ [it[0], []] } ) + emit: mut_fasta = GENERATEMUTFASTA.out.mut_fasta // channel: [ val(meta), [ *.MUT_sequences.fa ] ] wt_fasta = GENERATEMUTFASTA.out.wt_fasta // channel: [ val(meta), [ *.WT_sequences.fa ] ] mut_fasta_log = GENERATEMUTFASTA.out.mut_fasta_log // channel: [ val(meta), [ *_generate_mut_fasta.log ] ] - sv_mut_fasta = NEOSV.out.mutOut // channel: [ val(meta), [ *.SV.MUT.fa ] ] - sv_wt_fasta = NEOSV.out.wtOut // channel: [ val(meta), [ *.SV.WT.fa ] ] + sv_mut_fasta = ch_sv_mut_fasta // channel: [ val(meta), [ *.SV.MUT.fa ] | [] ] + sv_wt_fasta = ch_sv_wt_fasta // channel: [ val(meta), [ *.SV.WT.fa ] | [] ] hla_string = NEOANTIGENUTILS_GENERATEHLASTRING.out.hlastring // channel: [ val(meta), [ hla_string ] ] versions = ch_versions // channel: [ versions.yml ] } -def createNEOSVInput(sv_bedpe, hla_str) { +def branchNEOSVInput(sv_bedpe, hla_str) { def sv_bedpe_channel = sv_bedpe .map{ [it[0],it] @@ -89,6 +94,9 @@ def 
createNEOSVInput(sv_bedpe, hla_str) { .map{ [it[1][0], it[1][1], it[2][1]] } - .filter{ it[1] != null && it[2] != null } + .branch{ + with_sv: it[1] != null && it[2] != null + without_sv: true + } return merged_sv_hla } diff --git a/subworkflows/msk/netmhcstabandpan/main.nf b/subworkflows/msk/netmhcstabandpan/main.nf index 603d2005..5085ff94 100644 --- a/subworkflows/msk/netmhcstabandpan/main.nf +++ b/subworkflows/msk/netmhcstabandpan/main.nf @@ -58,19 +58,19 @@ def createNETMHCInput(fastas_and_hla, sv_fastas) { [it[0],it] } - // remainder: true keeps samples that have no SV data (sv_fastas is empty when no SVs provided) + // Callers are expected to pre-pad sv_fastas with [meta, [], []] for samples without SV data, + // so this is a plain inner join — each sample emits as soon as its fastas are ready and + // we never have to wait for the SV channel to close. def merged_mut = fastas_and_hla_channel - .join(sv_fastas_channel, by:0, remainder: true) + .join(sv_fastas_channel, by:0) .map({ - def sv = it[2] ?: [null, [], []] - [it[1][0], it[1][1], sv[1], it[1][3], "MUT"] + [it[1][0], it[1][1], it[2][1], it[1][3], "MUT"] }) def merged_wt = fastas_and_hla_channel - .join(sv_fastas_channel, by:0, remainder: true) + .join(sv_fastas_channel, by:0) .map({ - def sv = it[2] ?: [null, [], []] - [it[1][0], it[1][2], sv[2], it[1][3], "WT"] + [it[1][0], it[1][2], it[2][2], it[1][3], "WT"] }) def merged = merged_mut.mix(merged_wt) return merged diff --git a/subworkflows/msk/netmhcstabandpan/tests/main.nf.test b/subworkflows/msk/netmhcstabandpan/tests/main.nf.test index b66c6e33..0b87ae35 100644 --- a/subworkflows/msk/netmhcstabandpan/tests/main.nf.test +++ b/subworkflows/msk/netmhcstabandpan/tests/main.nf.test @@ -162,11 +162,12 @@ nextflow_workflow { } - test("netmhcstabandpan - empty SV channel - fa,hla_str - tsv - stub") { + test("netmhcstabandpan - padded empty SV placeholder - fa,hla_str - tsv - stub") { - // Regression test: when no SV files are provided, the upstream NEOSV
process - // doesn't run, so the sv_fasta channel emits zero items. The createNETMHCInput - // helper must still emit MUT and WT tuples for each sample. + // Regression test for the per-sample streaming contract: + // when a sample has no SV data, callers (e.g. GENERATE_MUTATED_PEPTIDES) + // pad the SV channel with [meta, [], []] so every sample matches the + // inner join. The subworkflow must still emit per-sample tsv outputs. options "-stub" @@ -180,7 +181,11 @@ nextflow_workflow { "HLA-A24:02,HLA-A24:02,HLA-B39:01,HLA-B39:01,HLA-C07:01,HLA-C06:02", ]) - input[1] = channel.empty() + input[1] = channel.value([ + [ id:'test', single_end:false ], // meta map + [], + [] + ]) """ } } @@ -191,7 +196,11 @@ nextflow_workflow { { assert snapshot(workflow.out.tsv[0][0], file(workflow.out.tsv[0][1]).name, workflow.out.tsv[1][0], - file(workflow.out.tsv[1][1]).name + file(workflow.out.tsv[1][1]).name, + workflow.out.tsv[2][0], + file(workflow.out.tsv[2][1]).name, + workflow.out.tsv[3][0], + file(workflow.out.tsv[3][1]).name ).match() } ) diff --git a/subworkflows/msk/netmhcstabandpan/tests/main.nf.test.snap b/subworkflows/msk/netmhcstabandpan/tests/main.nf.test.snap index c07459e2..ebb1adee 100644 --- a/subworkflows/msk/netmhcstabandpan/tests/main.nf.test.snap +++ b/subworkflows/msk/netmhcstabandpan/tests/main.nf.test.snap @@ -213,7 +213,7 @@ }, "timestamp": "2025-12-19T14:22:41.854041255" }, - "netmhcstabandpan - empty SV channel - fa,hla_str - tsv - stub": { + "netmhcstabandpan - padded empty SV placeholder - fa,hla_str - tsv - stub": { "content": [ { "id": "test", @@ -230,12 +230,28 @@ "fromStab": true, "typePan": true }, - "test.WT.STAB.tsv" + "test.WT.STAB.tsv", + { + "id": "test", + "single_end": false, + "typeMut": true, + "fromStab": false, + "typePan": true + }, + "test.MUT.PAN.tsv", + { + "id": "test", + "single_end": false, + "typeMut": true, + "fromStab": true, + "typePan": true + }, + "test.MUT.STAB.tsv" ], "meta": { "nf-test": "0.9.2", "nextflow": "24.10.2" 
}, - "timestamp": "2026-05-13T22:29:19.315483" + "timestamp": "2026-05-27T16:15:00.000000000" } } \ No newline at end of file