Introduction

Module: Maven Central

The fs2-data-msgpack module provides tools that allow for serialization and deserialziation of MessagePack data.

The module is split into two namespaces:

High-level API

This is probably the API you want to use (unless you need to work on a flat stream instead of an AST).

Serialization

To serialize a class, you first have to provide a MsgpackSerializer[A] instance for it.

MsgpackSerializer[A] describes a process of turning a value of type A into an array of MsgpackItems.

import fs2.*
import fs2.data.msgpack.high.*
import java.time.Instant

case class User(name: String, age: Int, created: Instant, visits: List[Long])

implicit val userSerializer: MsgpackSerializer[User] = {
  case User(name, age, created, visits) =>
    for {
      name    <- MsgpackSerializer[String](name)
      age     <- MsgpackSerializer[Int](age)
      created <- MsgpackSerializer[Instant](created)
      visits  <- MsgpackSerializer[List[Long]](visits)
    } yield name ++ age ++ created ++ visits
  }

The serialize: Pipe[F, A, Byte] method can be used to apply a serializer of a specific type onto a stream.

val inputStream = Stream[Fallible, User](
  User("foo", 30, Instant.parse("2025-02-03T10:00:00.00Z"), List.empty),
  User("bar", 31, Instant.parse("2026-01-17T10:00:00.00Z"), List(1, 3, 7))
)
// inputStream: Stream[Fallible, User] = Stream(..)

val byteStream = inputStream.through(data.msgpack.high.serialize[Fallible, User])
// byteStream: Stream[[x]Fallible[x], Byte] = Stream(..)

You can also provide an instance of MsgpackSerializer to serialize as an argument explicitly. Serializer instances for common types are imported from the fs2.data.msgpack.high package.

Deserialization

Similarly to serialization, you have to provide a MsgpackDeserializer[A] instance for your type before you can parse it.

MsgpackDeserializer[A] describes a process of creating a value of type A from a chunk of MsgpackItems.

implicit val userDeserializer: MsgpackDeserializer[User] = 
  for {
    name    <- MsgpackDeserializer[String]
    age     <- MsgpackDeserializer[Int]
    created <- MsgpackDeserializer[Instant]
    visits <- MsgpackDeserializer[List[Long]]
  } yield User(name, age, created, visits)

Deserialization can be done with deserialize: Pipe[F, Byte, A].

byteStream
  .through(data.msgpack.high.deserialize[Fallible, User])
  .compile.toList
// res0: Either[Throwable, List[User]] = Right(
//   value = List(
//     User(
//       name = "foo",
//       age = 30,
//       created = 2025-02-03T10:00:00Z,
//       visits = List()
//     ),
//     User(
//       name = "bar",
//       age = 31,
//       created = 2026-01-17T10:00:00Z,
//       visits = List(1L, 3L, 7L)
//     )
//   )
// )

AST

More dynamic behavior can be achieved via the msgpack.high.ast API. The AST module exports a serializer and a deserializer for MsgpackValue. In general, the AST API allows for more flexible behavior (e.g. heterogeneous lists) but also runs with worse performance (both in terms of speed and memory usage).

val valueStream = byteStream.through(data.msgpack.high.ast.valuesFromBytes)
// valueStream: Stream[[x]Fallible[x], ast.MsgpackValue] = Stream(..)

val byteStream2 = valueStream.through(data.msgpack.high.ast.valuesToBytes)
// byteStream2: Stream[[x]Fallible[x], Byte] = Stream(..)

valueStream.compile.toList
// res1: Either[Throwable, List[ast.MsgpackValue]] = Right(
//   value = List(
//     String(x = "foo"),
//     Integer(x = 30L),
//     Timestamp(nanoseconds = 0, seconds = 1738576800L),
//     Array(x = List()),
//     String(x = "bar"),
//     Integer(x = 31L),
//     Timestamp(nanoseconds = 0, seconds = 1768644000L),
//     Array(x = List(Integer(x = 1L), Integer(x = 3L), Integer(x = 7L)))
//   )
// )

byteStream2.compile.toList == byteStream.compile.toList
// res2: Boolean = true

Low-level API

In case you want to operate on a flat MessagePack stream directly, you can use the low-level API. Methods exposed in fs2.data.msgpack.low translate the binary data to MsgpackItem and vice versa.

Serialization

The main serialization pipe is called toBinary: Pipe[F, MsgpackItem, Byte].

import fs2.*
import fs2.data.msgpack.low.*
import scodec.bits.*

val inputStream = Stream[Fallible, MsgpackItem](
  MsgpackItem.Array(2),
  MsgpackItem.UnsignedInt(hex"ab"),
  MsgpackItem.UnsignedInt(hex"abcdef01")
)
// inputStream: Stream[Fallible, MsgpackItem] = Stream(..)

val binaryStream = inputStream.through(data.msgpack.low.toBinary)
// binaryStream: Stream[[x]Fallible[x], Byte] = Stream(..)

This method performs stream validation and will not emit malformed data. In case you are sure that the item stream is valid, you can use the nonValidatedBinary pipe.

inputStream.through(data.msgpack.low.toNonValidatedBinary)
// res4: Stream[[x]Fallible[x], Byte] = Stream(..)

You can also apply the validate: Pipe[F, MsgpackItem, MsgpackItem] pipe directly onto the stream. This method will raise inside the effect in a case of a malformed stream.

Stream(MsgpackItem.Array(999))
  .through(data.msgpack.low.validate[Fallible])
  .compile.toList
// res5: Either[Throwable, List[MsgpackItem]] = Left(
//   value = MsgpackUnexpectedEndOfStreamException(
//     position = Some(value = 0L),
//     inner = null
//   )
// )

Deserialization

You can convert the stream of Bytes into a stream of MsgpackItems by using the fromBinary: Pipe[F, Byte, MsgpackItem] pipe.

Excluding extension types, the process of deserialization is the inverse of serialization. In other words, stream.through(serialize).through(deserialize) should always be equal to stream.

val itemStream = binaryStream.through(data.msgpack.low.fromBinary[Fallible])
// itemStream: Stream[[x]Fallible[x], MsgpackItem] = Stream(..)

itemStream.compile.toList == inputStream.compile.toList
// res6: Boolean = true

Converting between low-level and high-level representations

As mentioned before, the flat item stream present in low-level API serves as a middle point between the binary data and language-level values. To translate between Scala objects and MsgpackItem instances you can use the following functions: