Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions subworkflows/msk/generate_mutated_peptides/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,29 @@ workflow GENERATE_MUTATED_PEPTIDES {

ch_versions = ch_versions.mix(GENERATEMUTFASTA.out.versions)

ch_neosv_input = createNEOSVInput( ch_sv , NEOANTIGENUTILS_GENERATEHLASTRING.out.hlastring )
ch_neosv_branched = branchNEOSVInput( ch_sv , NEOANTIGENUTILS_GENERATEHLASTRING.out.hlastring )

NEOSV( ch_neosv_input, ch_gtf_and_cdna )
NEOSV( ch_neosv_branched.with_sv, ch_gtf_and_cdna )

ch_versions = ch_versions.mix(NEOSV.out.versions)

// Pad SV outputs with empty placeholders for samples without SV input so that
// downstream per-sample joins never have to wait for the full channel to close.
ch_sv_mut_fasta = NEOSV.out.mutOut.mix( ch_neosv_branched.without_sv.map{ [it[0], []] } )
ch_sv_wt_fasta = NEOSV.out.wtOut.mix( ch_neosv_branched.without_sv.map{ [it[0], []] } )

emit:

mut_fasta = GENERATEMUTFASTA.out.mut_fasta // channel: [ val(meta), [ *.MUT_sequences.fa ] ]
wt_fasta = GENERATEMUTFASTA.out.wt_fasta // channel: [ val(meta), [ *.WT_sequences.fa ] ]
mut_fasta_log = GENERATEMUTFASTA.out.mut_fasta_log // channel: [ val(meta), [ *_generate_mut_fasta.log ] ]
sv_mut_fasta = NEOSV.out.mutOut // channel: [ val(meta), [ *.SV.MUT.fa ] ]
sv_wt_fasta = NEOSV.out.wtOut // channel: [ val(meta), [ *.SV.WT.fa ] ]
sv_mut_fasta = ch_sv_mut_fasta // channel: [ val(meta), [ *.SV.MUT.fa ] | [] ]
sv_wt_fasta = ch_sv_wt_fasta // channel: [ val(meta), [ *.SV.WT.fa ] | [] ]
hla_string = NEOANTIGENUTILS_GENERATEHLASTRING.out.hlastring // channel: [ val(meta), [ hla_string ] ]
versions = ch_versions // channel: [ versions.yml ]
}

def createNEOSVInput(sv_bedpe, hla_str) {
def branchNEOSVInput(sv_bedpe, hla_str) {
def sv_bedpe_channel = sv_bedpe
.map{
[it[0],it]
Expand All @@ -89,6 +94,9 @@ def createNEOSVInput(sv_bedpe, hla_str) {
.map{
[it[1][0], it[1][1], it[2][1]]
}
.filter{ it[1] != null && it[2] != null }
.branch{
with_sv: it[1] != null && it[2] != null
without_sv: true
}
return merged_sv_hla
}
14 changes: 7 additions & 7 deletions subworkflows/msk/netmhcstabandpan/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ def createNETMHCInput(fastas_and_hla, sv_fastas) {
[it[0],it]
}

// remainder: true keeps samples that have no SV data (sv_fastas is empty when no SVs provided)
// Callers are expected to pre-pad sv_fastas with [meta, []] for samples without SV data,
// so this is a plain inner join — each sample emits as soon as its fastas are ready and
// we never have to wait for the SV channel to close.
def merged_mut = fastas_and_hla_channel
.join(sv_fastas_channel, by:0, remainder: true)
.join(sv_fastas_channel, by:0)
.map({
def sv = it[2] ?: [null, [], []]
[it[1][0], it[1][1], sv[1], it[1][3], "MUT"]
[it[1][0], it[1][1], it[2][1], it[1][3], "MUT"]
})

def merged_wt = fastas_and_hla_channel
.join(sv_fastas_channel, by:0, remainder: true)
.join(sv_fastas_channel, by:0)
.map({
def sv = it[2] ?: [null, [], []]
[it[1][0], it[1][2], sv[2], it[1][3], "WT"]
[it[1][0], it[1][2], it[2][2], it[1][3], "WT"]
})
def merged = merged_mut.mix(merged_wt)
return merged
Expand Down
21 changes: 15 additions & 6 deletions subworkflows/msk/netmhcstabandpan/tests/main.nf.test
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ nextflow_workflow {
}


test("netmhcstabandpan - empty SV channel - fa,hla_str - tsv - stub") {
test("netmhcstabandpan - padded empty SV placeholder - fa,hla_str - tsv - stub") {

// Regression test: when no SV files are provided, the upstream NEOSV process
// doesn't run, so the sv_fasta channel emits zero items. The createNETMHCInput
// helper must still emit MUT and WT tuples for each sample.
// Regression test for the per-sample streaming contract:
// when a sample has no SV data, callers (e.g. GENERATE_MUTATED_PEPTIDES)
// pad the SV channel with [meta, [], []] so every sample matches the
// inner join. The subworkflow must still emit per-sample tsv outputs.

options "-stub"

Expand All @@ -180,7 +181,11 @@ nextflow_workflow {
"HLA-A24:02,HLA-A24:02,HLA-B39:01,HLA-B39:01,HLA-C07:01,HLA-C06:02",
])

input[1] = channel.empty()
input[1] = channel.value([
[ id:'test', single_end:false ], // meta map
[],
[]
])
"""
}
}
Expand All @@ -191,7 +196,11 @@ nextflow_workflow {
{ assert snapshot(workflow.out.tsv[0][0],
file(workflow.out.tsv[0][1]).name,
workflow.out.tsv[1][0],
file(workflow.out.tsv[1][1]).name
file(workflow.out.tsv[1][1]).name,
workflow.out.tsv[2][0],
file(workflow.out.tsv[2][1]).name,
workflow.out.tsv[3][0],
file(workflow.out.tsv[3][1]).name
).match()
}
)
Expand Down
22 changes: 19 additions & 3 deletions subworkflows/msk/netmhcstabandpan/tests/main.nf.test.snap
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@
},
"timestamp": "2025-12-19T14:22:41.854041255"
},
"netmhcstabandpan - empty SV channel - fa,hla_str - tsv - stub": {
"netmhcstabandpan - padded empty SV placeholder - fa,hla_str - tsv - stub": {
"content": [
{
"id": "test",
Expand All @@ -230,12 +230,28 @@
"fromStab": true,
"typePan": true
},
"test.WT.STAB.tsv"
"test.WT.STAB.tsv",
{
"id": "test",
"single_end": false,
"typeMut": true,
"fromStab": false,
"typePan": true
},
"test.MUT.PAN.tsv",
{
"id": "test",
"single_end": false,
"typeMut": true,
"fromStab": true,
"typePan": true
},
"test.MUT.STAB.tsv"
],
"meta": {
"nf-test": "0.9.2",
"nextflow": "24.10.2"
},
"timestamp": "2026-05-13T22:29:19.315483"
"timestamp": "2026-05-27T16:15:00.000000000"
}
}
Loading