Hello there,
I'm working on a project where I have a pipeline (one readable stream as the source and multiple Transforms).
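For context, a stripped-down version of such a pipeline looks like this (the names and the doubling logic are illustrative, not my actual project code):

```typescript
import { Transform } from "node:stream";

// One of several Transforms in the chain; objectMode lets it pass
// arbitrary values rather than Buffers.
const double = new Transform({
  objectMode: true,
  transform(chunk: number, _enc, callback) {
    // Forward the processed chunk downstream.
    callback(null, chunk * 2);
  },
});

// Feed a chunk in and pull the result back out synchronously
// (possible here because the transform callback is synchronous).
double.write(21);
const result = double.read(); // 42
```

In the real project, several such Transforms are chained after a single Readable source, and each one does asynchronous work with a configurable parallelism.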
In normal circumstances, everything works fine. But when a huge amount of data enters the pipeline in rapid succession, things get strange (the problem appears with at least 4000 chunks, but I guess the exact number is irrelevant):
The pipeline hangs after processing a small portion of the data and stays stuck forever; it never resumes.
By adding logs and investigating, I came to the conclusion that it is always the last `Transform` that hangs. To confirm this, I deleted the last `Transform` (so the n-1th `Transform` became the last one in the pipeline), and then it was the new last `Transform` that hung.
I thought it might be a memory problem, so I tried some workarounds:
- I increased the `highWaterMark` threshold, but this didn't completely solve the issue, though it delayed it (the pipeline processes more data but still hangs at some point).
- I increased/decreased the `parallelism` option for each `Transform`, and this didn't solve it either.
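For anyone unfamiliar with the first workaround: `highWaterMark` controls how many chunks (in object mode) a stream buffers before it signals backpressure. The mechanism can be seen in isolation with a plain Node `Writable` (this snippet is illustrative, not the project's actual code):

```typescript
import { Writable } from "node:stream";

// A stalled consumer: _write never invokes its callback, so every
// written chunk stays pending in the internal buffer.
const slow = new Writable({
  objectMode: true,
  highWaterMark: 2, // in objectMode this counts chunks, not bytes
  write(_chunk, _enc, _callback) {
    // Deliberately never call _callback — simulates a stuck consumer.
  },
});

const first = slow.write("a");  // true  — still under the highWaterMark
const second = slow.write("b"); // false — buffer full: backpressure signal
```

The `false` return value is what tells upstream producers to pause, which is why raising `highWaterMark` only delays a hang instead of fixing it: the buffer just fills up later.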
At that point, I decided to simplify my pipeline down to a skeleton model. It turned out that the `ordered` option is what causes the problem when set to `false`.
Valid workaround
I thought that maybe one of the `Transform`s was being corked at some point, so I added a `data` event listener on each one to uncork it manually if that was the case. It worked. But then I tried, for no particular reason, leaving the listener's callback empty (not attempting to uncork), and it still worked.
So, long story short: adding an empty `data` listener on the `Transform` fixes the hanging problem (code chunk below):
```typescript
return transform(myParallelism, { ordered: false }, async (data: Input, callback: TransformCallback) => {
  try {
    const result = await someAsyncOperation(data)
    callback(null, result)
  } catch (error) {
    callback(error)
  }
}).on('data', () => {
  // intentionally empty — merely attaching the listener prevents the hang
})
```
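My best guess at why this works — though I haven't verified it against the stream internals — is that attaching any `data` listener switches the stream's readable side into flowing mode, so the last `Transform` keeps draining even when nothing downstream reads from it. This is easy to see with a plain Node `Readable`:

```typescript
import { Readable } from "node:stream";

const r = Readable.from(["a", "b", "c"]);

// No consumer yet: the stream is paused and readableFlowing is null.
const before = r.readableFlowing; // null

// Attaching any 'data' listener — even an empty one — switches the
// stream into flowing mode, so its internal buffer keeps draining.
r.on("data", () => {});
const after = r.readableFlowing; // true
```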
I'm not really sure, though, whether the real problem comes from this library or from the implementation of Node streams.
Also, I don't have a minimal reproduction to show the problem, but I can work on one if deemed necessary.
Thank you so much 😁