From 5c8d02efc158a55f1a495e11e97080f6a37efbc9 Mon Sep 17 00:00:00 2001 From: Istvan Bartha <106101+pityka@users.noreply.github.com> Date: Thu, 4 Jun 2026 23:46:36 +0200 Subject: [PATCH] Gunzip block gzip files Multiple gzip blocks concatenated form a valid gzip stream --- .../fs2/compression/CompressionPlatform.scala | 60 +++++++++++++------ .../JvmNativeCompressionSuite.scala | 25 ++++++++ 2 files changed, 66 insertions(+), 19 deletions(-) diff --git a/core/jvm-native/src/main/scala/fs2/compression/CompressionPlatform.scala b/core/jvm-native/src/main/scala/fs2/compression/CompressionPlatform.scala index 302f3d864f..6ba13f52a5 100644 --- a/core/jvm-native/src/main/scala/fs2/compression/CompressionPlatform.scala +++ b/core/jvm-native/src/main/scala/fs2/compression/CompressionPlatform.scala @@ -685,7 +685,7 @@ private[compression] trait CompressionCompanionPlatform crc32 = Some(contentCrc32) )(s).stream ) - .through(_gunzip_validateTrailer(contentCrc32, inflater)) + .through(_gunzip_validateTrailer(inflateParams, contentCrc32, inflater)) ) ) } @@ -807,6 +807,7 @@ private[compression] trait CompressionCompanionPlatform else stream private def _gunzip_validateTrailer( + inflateParams: InflateParams, crc32: CRC32, inflater: Inflater ): Pipe[F, Byte, Byte] = @@ -829,30 +830,51 @@ private[compression] trait CompressionCompanionPlatform Pull.done } else Pull.raiseError(new ZipException("Failed to read trailer (1)")) - def streamUntilTrailer(last: Chunk[Byte]): Stream[F, Byte] => Pull[F, Byte, Unit] = + // RFC 1952 § 2.2: a gzip stream may concatenate multiple members. + def continueWithNextMember(s: Stream[F, Byte]): Pull[F, Byte, Unit] = + s.pull.uncons.flatMap { + case None => Pull.done + case Some((next, rest)) => + (Stream.chunk(next) ++ rest) + .through(gunzip(inflateParams)) + .flatMap(_.content) + .pull + .echo + } + + def consumeTrailer( + buffered: Chunk[Byte], + rest: Stream[F, Byte] + ): Pull[F, Byte, Unit] = + if (buffered.size >= gzipTrailerBytes) { + val (trailer, afterTrailer) = buffered.splitAt(gzipTrailerBytes) + validateTrailer(trailer) >> + continueWithNextMember(Stream.chunk(afterTrailer) ++ rest) + } else + rest.pull.uncons.flatMap { + case Some((next, restRest)) => consumeTrailer(buffered ++ next, restRest) + case None => validateTrailer(buffered) + } + + def streamUntilTrailer(emitted: Long): Stream[F, Byte] => Pull[F, Byte, Unit] = _.pull.uncons .flatMap { case Some((next, rest)) => - if (inflater.finished()) - if (next.size >= gzipTrailerBytes) - if (last.nonEmpty) Pull.output(last) >> streamUntilTrailer(next)(rest) - else streamUntilTrailer(next)(rest) - else - streamUntilTrailer(last ++ next)(rest) - else if (last.nonEmpty) - Pull.output(last) >> Pull.output(next) >> - streamUntilTrailer(Chunk.empty[Byte])(rest) - else Pull.output(next) >> streamUntilTrailer(Chunk.empty[Byte])(rest) + val contentRemaining = (inflater.getBytesWritten - emitted).max(0L).toInt + val chunkIsAllContent = next.size <= contentRemaining + val chunkStraddlesBoundary = !chunkIsAllContent && contentRemaining > 0 + if (chunkIsAllContent) + Pull.output(next) >> streamUntilTrailer(emitted + next.size)(rest) + else if (chunkStraddlesBoundary) { + val (content, postContent) = next.splitAt(contentRemaining) + Pull.output(content) >> consumeTrailer(postContent, rest) + } else + consumeTrailer(next, rest) case None => - val preTrailerBytes = last.size - gzipTrailerBytes - if (preTrailerBytes > 0) - Pull.output(last.take(preTrailerBytes)) >> - validateTrailer(last.drop(preTrailerBytes)) - else - validateTrailer(last) + Pull.raiseError(new ZipException("Failed to read trailer (1)")) } - streamUntilTrailer(Chunk.empty[Byte])(stream) + streamUntilTrailer(0L)(stream) }.stream /** Like Stream.unconsN, but returns a chunk of elements that do not satisfy the predicate, splitting chunk as necessary. diff --git a/core/jvm-native/src/test/scala/fs2/compression/JvmNativeCompressionSuite.scala b/core/jvm-native/src/test/scala/fs2/compression/JvmNativeCompressionSuite.scala index c04b1157a4..960869eb22 100644 --- a/core/jvm-native/src/test/scala/fs2/compression/JvmNativeCompressionSuite.scala +++ b/core/jvm-native/src/test/scala/fs2/compression/JvmNativeCompressionSuite.scala @@ -22,6 +22,7 @@ package fs2 import cats.effect._ +import cats.syntax.traverse._ import fs2.compression._ import org.scalacheck.effect.PropF.forAllF @@ -325,6 +326,30 @@ class JvmNativeCompressionSuite extends CompressionSuite { .assertEquals(expectedContent) } + test("gunzip handles concatenated gzip members (RFC 1952 § 2.2 / block gzip)") { + def gzipMember(s: String): IO[Chunk[Byte]] = + Stream + .chunk(Chunk.array(s.getBytes(StandardCharsets.UTF_8))) + .through(Compression[IO].gzip(8192)) + .compile + .to(Chunk) + + val parts = List("first member ", "second member ", "third member") + parts + .traverse(gzipMember) + .flatMap { members => + Stream + .chunk(Chunk.concat(members)) + .rechunkRandomlyWithSeed(0.1, 2)(System.nanoTime()) + .through(Compression[IO].gunzip(8192)) + .flatMap(_.content) + .compile + .toVector + } + .map(bytes => new String(bytes.toArray, StandardCharsets.UTF_8)) + .assertEquals(parts.mkString) + } + def toEncodableFileName(fileName: String): String = new String( fileName.replaceAll("\u0000", "_").getBytes(StandardCharsets.ISO_8859_1),