From 8a211768dd1475118a9a646e1307b48a4ace1f8c Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Tue, 17 Mar 2026 14:30:43 +0000 Subject: [PATCH 1/4] Wrote test and fixed the bug for confalte chunk --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- .../test/scala/fs2/StreamConflateSuite.scala | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a3642794a8..2e92d47cf8 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -576,7 +576,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * The `chunkLimit` parameter controls backpressure on the source stream. */ def conflateChunks[F2[x] >: F[x]: Concurrent](chunkLimit: Int): Stream[F2, Chunk[O]] = - Stream.eval(Channel.bounded[F2, Chunk[O]](chunkLimit)).flatMap { chan => + Stream.eval(Channel.bounded[F2, Chunk[O]](chunkLimit - 1)).flatMap { chan => val producer = chunks.through(chan.sendAll) val consumer = chan.stream.chunks.map(_.combineAll) consumer.concurrently(producer) diff --git a/core/shared/src/test/scala/fs2/StreamConflateSuite.scala b/core/shared/src/test/scala/fs2/StreamConflateSuite.scala index c50b7c4451..cd7c9d9b84 100644 --- a/core/shared/src/test/scala/fs2/StreamConflateSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamConflateSuite.scala @@ -23,6 +23,7 @@ package fs2 import cats.effect.IO import cats.effect.testkit.TestControl +import cats.syntax.all._ import scala.concurrent.duration._ @@ -44,4 +45,23 @@ class StreamConflateSuite extends Fs2Suite { ) ) } + test("conflateChunks respects chunk limit") { + + (1 to 1000).toList.traverse_ { _ => + Stream(1, 2, 3, 4, 5, 6, 7) + .covary[IO] + .chunkLimit(1) + .unchunks + .conflateChunks(3) + .compile + .toList + .map { chunks => + assert( + chunks.forall(_.size <= 3), + s"Expected all chunks <= 3, but got: $chunks" + ) + } + } + } + } From 4a66066619ac18d4a79941d86d2fdd74fd0636f3 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Tue, 17 Mar 2026 20:03:51 +0000 Subject: [PATCH 2/4] Changed tlbaseversion from 3.12 to 3.13 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index e52fb3ce0f..aec1ff0c38 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._ Global / onChangedBuildSource := ReloadOnSourceChanges -ThisBuild / tlBaseVersion := "3.12" +ThisBuild / tlBaseVersion := "3.13" ThisBuild / organization := "co.fs2" ThisBuild / organizationName := "Functional Streams for Scala" From c66f95d0b679605f8b7bd9611e5e8677810a9d76 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 3 Apr 2026 10:55:41 +0000 Subject: [PATCH 3/4] want to run ci pipeline --- io/js/src/main/scala/fs2/io/file/FilesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala b/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala index 6fee6275c1..1af5f21ea2 100644 --- a/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala +++ b/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala @@ -416,7 +416,7 @@ private[fs2] trait FilesCompanionPlatform { .chunkN(options.chunkSize) .flatMap(Stream.chunk) } - + // want to run ci pipeline override def writeAll(path: Path, _flags: Flags): Pipe[F, Byte, Nothing] = in => in.through { From 850fdb050c666880c5b6b37a357e63a28daa0a5e Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 3 Apr 2026 11:10:31 +0000 Subject: [PATCH 4/4] applied scalafmt --- io/js/src/main/scala/fs2/io/file/FilesPlatform.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala b/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala index 1af5f21ea2..6fee6275c1 100644 --- a/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala +++ b/io/js/src/main/scala/fs2/io/file/FilesPlatform.scala @@ -416,7 +416,7 @@ private[fs2] trait FilesCompanionPlatform { .chunkN(options.chunkSize) .flatMap(Stream.chunk) } - // want to run ci pipeline + override def writeAll(path: Path, _flags: Flags): Pipe[F, Byte, Nothing] = in => in.through {