Introduction
The fs2-data-msgpack module provides tools for serializing and deserializing MessagePack data.
The module is split into two namespaces:
fs2.data.msgpack.low: Works on a flat model (see MsgpackItem) which mimics the raw MessagePack format. This model is used as a middle-point for the high-level API.
fs2.data.msgpack.high: Works on Scala values directly. You can create custom serializers and deserializers for your classes.
High-level API
This is probably the API you want to use (unless you need to work on a flat stream instead of an AST).
Serialization
To serialize a class, you first have to provide a MsgpackSerializer[A] instance for it.
MsgpackSerializer[A] describes a process of turning a value of type A into an array of MsgpackItems.
import fs2.*
import fs2.data.msgpack.high.*
import java.time.Instant
case class User(name: String, age: Int, created: Instant, visits: List[Long])
implicit val userSerializer: MsgpackSerializer[User] = {
case User(name, age, created, visits) =>
for {
name <- MsgpackSerializer[String](name)
age <- MsgpackSerializer[Int](age)
created <- MsgpackSerializer[Instant](created)
visits <- MsgpackSerializer[List[Long]](visits)
} yield name ++ age ++ created ++ visits
}
The serialize: Pipe[F, A, Byte] method can be used to apply a serializer of a specific type onto a stream.
val inputStream = Stream[Fallible, User](
User("foo", 30, Instant.parse("2025-02-03T10:00:00.00Z"), List.empty),
User("bar", 31, Instant.parse("2026-01-17T10:00:00.00Z"), List(1, 3, 7))
)
// inputStream: Stream[Fallible, User] = Stream(..)
val byteStream = inputStream.through(data.msgpack.high.serialize[Fallible, User])
// byteStream: Stream[[x]Fallible[x], Byte] = Stream(..)
Instead of relying on an implicit instance, you can also pass a MsgpackSerializer to serialize explicitly as an argument.
Serializer instances for common types are imported from the fs2.data.msgpack.high package.
Deserialization
Similarly to serialization, you have to provide a MsgpackDeserializer[A] instance for your type before you can parse it.
MsgpackDeserializer[A] describes a process of creating a value of type A from a chunk of MsgpackItems.
implicit val userDeserializer: MsgpackDeserializer[User] =
for {
name <- MsgpackDeserializer[String]
age <- MsgpackDeserializer[Int]
created <- MsgpackDeserializer[Instant]
visits <- MsgpackDeserializer[List[Long]]
} yield User(name, age, created, visits)
Deserialization can be done with deserialize: Pipe[F, Byte, A].
byteStream
.through(data.msgpack.high.deserialize[Fallible, User])
.compile.toList
// res0: Either[Throwable, List[User]] = Right(
// value = List(
// User(
// name = "foo",
// age = 30,
// created = 2025-02-03T10:00:00Z,
// visits = List()
// ),
// User(
// name = "bar",
// age = 31,
// created = 2026-01-17T10:00:00Z,
// visits = List(1L, 3L, 7L)
// )
// )
// )
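Because instances for common types come with the fs2.data.msgpack.high package, values of those types should round-trip without any custom instance. The following is a minimal sketch that reuses only the serialize and deserialize pipes shown above. It assumes that the built-in Int instances are in implicit scope after the wildcard import; the names intStream and roundTripped are illustrative.

```scala
import fs2.*
import fs2.data.msgpack.high.*

// Assumption: MsgpackSerializer[Int] and MsgpackDeserializer[Int] are provided
// by the wildcard import above, so no user-defined instance is needed here.
val intStream = Stream[Fallible, Int](1, 2, 3)

// Serialize to bytes and read the bytes straight back as Int values.
val roundTripped = intStream
  .through(data.msgpack.high.serialize[Fallible, Int])
  .through(data.msgpack.high.deserialize[Fallible, Int])
  .compile
  .toList
```

If the instances resolve as assumed, roundTripped is expected to hold the original three values on the Right side.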
AST
More dynamic behavior can be achieved via the msgpack.high.ast API.
The AST module exports a serializer and a deserializer for MsgpackValue.
In general, the AST API is more flexible (for example, it can represent heterogeneous lists), but it is slower and uses more memory.
val valueStream = byteStream.through(data.msgpack.high.ast.valuesFromBytes)
// valueStream: Stream[[x]Fallible[x], ast.MsgpackValue] = Stream(..)
val byteStream2 = valueStream.through(data.msgpack.high.ast.valuesToBytes)
// byteStream2: Stream[[x]Fallible[x], Byte] = Stream(..)
valueStream.compile.toList
// res1: Either[Throwable, List[ast.MsgpackValue]] = Right(
// value = List(
// String(x = "foo"),
// Integer(x = 30L),
// Timestamp(nanoseconds = 0, seconds = 1738576800L),
// Array(x = List()),
// String(x = "bar"),
// Integer(x = 31L),
// Timestamp(nanoseconds = 0, seconds = 1768644000L),
// Array(x = List(Integer(x = 1L), Integer(x = 3L), Integer(x = 7L)))
// )
// )
byteStream2.compile.toList == byteStream.compile.toList
// res2: Boolean = true
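The Timestamp entries in res1 carry the same instants as the created fields of the two users, expressed as seconds since the Unix epoch plus a nanosecond part. This can be checked with java.time alone; the value names below are illustrative and not part of the library.

```scala
import java.time.Instant

// The two `created` values used in the User examples above.
val firstCreated = Instant.parse("2025-02-03T10:00:00.00Z")
val secondCreated = Instant.parse("2026-01-17T10:00:00.00Z")

// Whole seconds since 1970-01-01T00:00:00Z.
val firstSeconds = firstCreated.getEpochSecond
// firstSeconds: Long = 1738576800L
val secondSeconds = secondCreated.getEpochSecond
// secondSeconds: Long = 1768644000L

// Sub-second part, in nanoseconds.
val firstNanos = firstCreated.getNano
// firstNanos: Int = 0
```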
Low-level API
In case you want to operate on a flat MessagePack stream directly, you can use the low-level API.
Methods exposed in fs2.data.msgpack.low translate the binary data to MsgpackItem and vice versa.
Serialization
The main serialization pipe is called toBinary: Pipe[F, MsgpackItem, Byte].
import fs2.*
import fs2.data.msgpack.low.*
import scodec.bits.*
val inputStream = Stream[Fallible, MsgpackItem](
MsgpackItem.Array(2),
MsgpackItem.UnsignedInt(hex"ab"),
MsgpackItem.UnsignedInt(hex"abcdef01")
)
// inputStream: Stream[Fallible, MsgpackItem] = Stream(..)
val binaryStream = inputStream.through(data.msgpack.low.toBinary)
// binaryStream: Stream[[x]Fallible[x], Byte] = Stream(..)
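For orientation, the bytes that the MessagePack specification assigns to these three items can be worked out by hand: a two-element array header is 0x92, an 8-bit unsigned integer is 0xcc followed by one byte, and a 32-bit unsigned integer is 0xce followed by four big-endian bytes. The sketch below is a standalone toy encoder using only the Scala standard library. ToyItem and encode are hypothetical names, not part of fs2-data, and the sketch assumes the integer format is chosen from the width of the value.

```scala
// Toy model, NOT the fs2-data MsgpackItem type: just enough to show the byte layout.
enum ToyItem:
  case ArrayHeader(size: Int) // number of elements that follow
  case UInt8(value: Int)      // 0..255
  case UInt32(value: Long)    // 0..4294967295

def encode(item: ToyItem): Vector[Int] = item match
  case ToyItem.ArrayHeader(n) =>
    require(n >= 0 && n <= 15, "this sketch only handles fixarray")
    Vector(0x90 | n) // fixarray: 1001xxxx
  case ToyItem.UInt8(v) =>
    require(v >= 0 && v <= 0xff)
    Vector(0xcc, v) // uint 8 marker, then one byte
  case ToyItem.UInt32(v) =>
    require(v >= 0 && v <= 0xffffffffL)
    // uint 32 marker, then four bytes in big-endian order
    0xce +: Vector(24, 16, 8, 0).map(shift => ((v >> shift) & 0xff).toInt)

val toyItems = List(ToyItem.ArrayHeader(2), ToyItem.UInt8(0xab), ToyItem.UInt32(0xabcdef01L))
val toyBytes = toyItems.flatMap(encode).map(b => f"$b%02x").mkString(" ")
// toyBytes: String = "92 cc ab ce ab cd ef 01"
```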
This method performs stream validation and will not emit malformed data.
If you are sure that the item stream is valid, you can skip validation by using the toNonValidatedBinary pipe.
inputStream.through(data.msgpack.low.toNonValidatedBinary)
// res4: Stream[[x]Fallible[x], Byte] = Stream(..)
You can also apply the validate: Pipe[F, MsgpackItem, MsgpackItem] pipe directly onto the stream.
This pipe raises an error inside the effect if the stream is malformed.
Stream(MsgpackItem.Array(999))
.through(data.msgpack.low.validate[Fallible])
.compile.toList
// res5: Either[Throwable, List[MsgpackItem]] = Left(
// value = MsgpackUnexpectedEndOfStreamException(
// position = Some(value = 0L),
// inner = null
// )
// )
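The failure above comes from an array header that announces 999 elements which never arrive. One way such a completeness check can work is to track how many elements each open array still needs. The following is a standalone toy sketch of that idea, not the library's implementation; FlatItem and checkComplete are hypothetical names.

```scala
// Toy model, NOT the fs2-data types: a flat stream of array headers and scalars.
enum FlatItem:
  case ArrayHeader(size: Int)
  case Scalar(value: Long)

def checkComplete(items: List[FlatItem]): Either[String, Unit] =
  // `pending` holds, innermost first, how many elements each open array still needs.
  val pending = items.foldLeft(List.empty[Int]) { (stack, item) =>
    // Any item fills one slot of the innermost open array, if there is one.
    val filled = stack match
      case n :: rest => (n - 1) :: rest
      case Nil       => Nil
    // An array header then opens a new level.
    val opened = item match
      case FlatItem.ArrayHeader(n) => n :: filled
      case FlatItem.Scalar(_)      => filled
    // Drop every innermost level that is now complete.
    opened.dropWhile(_ == 0)
  }
  if pending.isEmpty then Right(())
  else Left(s"stream ended with ${pending.sum} element(s) missing")

val complete = checkComplete(
  List(FlatItem.ArrayHeader(2), FlatItem.Scalar(1), FlatItem.Scalar(2))
)
// complete == Right(())
val truncated = checkComplete(List(FlatItem.ArrayHeader(999)))
// truncated == Left("stream ended with 999 element(s) missing")
```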
Deserialization
You can convert the stream of Bytes into a stream of MsgpackItems by using the fromBinary: Pipe[F, Byte, MsgpackItem] pipe.
Excluding extension types, the process of deserialization is the inverse of serialization.
In other words, stream.through(toBinary).through(fromBinary) should always be equal to stream.
val itemStream = binaryStream.through(data.msgpack.low.fromBinary[Fallible])
// itemStream: Stream[[x]Fallible[x], MsgpackItem] = Stream(..)
itemStream.compile.toList == inputStream.compile.toList
// res6: Boolean = true
Converting between low-level and high-level representations
As mentioned before, the flat item stream of the low-level API serves as a middle point between the binary data and language-level values.
To translate between Scala objects and MsgpackItem instances you can use the following functions:
high.fromItems: Pipe[F, MsgpackItem, A]
high.toItems: Pipe[F, A, MsgpackItem]
high.ast.valuesFromItems: Pipe[F, MsgpackItem, MsgpackValue]
high.ast.valuesToItems: Pipe[F, MsgpackValue, MsgpackItem]
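To make the relationship between a flat item stream and a tree of values concrete, here is a standalone toy sketch using only the Scala standard library. Tree, Flat, flattenTree and readOne are hypothetical names and do not mirror the actual fs2-data types or pipes; the sketch only illustrates that an array becomes a header followed by its elements, and that the tree can be rebuilt by reading exactly as many values as the header announces.

```scala
// Toy models, NOT the fs2-data types.
enum Tree:
  case Num(value: Long)
  case Text(value: String)
  case Arr(elements: List[Tree])

enum Flat:
  case NumItem(value: Long)
  case TextItem(value: String)
  case ArrHeader(size: Int) // announces how many values follow as elements

// Tree -> flat: an array becomes a header followed by its flattened elements.
def flattenTree(tree: Tree): List[Flat] = tree match
  case Tree.Num(v)   => List(Flat.NumItem(v))
  case Tree.Text(v)  => List(Flat.TextItem(v))
  case Tree.Arr(els) => Flat.ArrHeader(els.size) :: els.flatMap(flattenTree)

// Flat -> tree: read one value and return it with the unconsumed items.
// Returns None if the items run out before the value is complete.
def readOne(items: List[Flat]): Option[(Tree, List[Flat])] = items match
  case Flat.NumItem(v) :: rest  => Some((Tree.Num(v), rest))
  case Flat.TextItem(v) :: rest => Some((Tree.Text(v), rest))
  case Flat.ArrHeader(n) :: rest =>
    // Read exactly n element values, threading the remaining items through.
    val start: Option[(List[Tree], List[Flat])] = Some((Nil, rest))
    val read = (0 until n).foldLeft(start) { (acc, _) =>
      acc.flatMap { case (done, remaining) =>
        readOne(remaining).map { case (value, next) => (value :: done, next) }
      }
    }
    read.map { case (done, remaining) => (Tree.Arr(done.reverse), remaining) }
  case Nil => None

// A heterogeneous array: a number, a string and an empty nested array.
val mixed = Tree.Arr(List(Tree.Num(30), Tree.Text("foo"), Tree.Arr(Nil)))
val flatItems = flattenTree(mixed)
// flatItems == List(ArrHeader(3), NumItem(30), TextItem("foo"), ArrHeader(0))
val rebuilt = readOne(flatItems)
// rebuilt == Some((mixed, Nil))
```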