Introduction
The fs2-data-cbor module provides tools to parse and transform CBOR data in a streaming manner.
Low-level representation
The low-level representation lives in the fs2.data.cbor.low package and closely follows the structure defined in the RFC. It is a flat structure, which doesn't represent data as an AST. This representation is useful when you don't need the tree structure for your processing, or when the data cannot be represented as an AST (e.g. when a collection contains more than Int.MaxValue elements).
Parsing
To parse a CBOR binary stream into the low-level representation, use the items pipe.
import fs2._
import fs2.data.cbor.low._
import scodec.bits._
val byteStream = Stream.chunk(Chunk.byteVector(hex"8301820203820405"))
// byteStream: Stream[[x]Pure[x], Byte] = Stream(..)
val itemStream = byteStream.through(items[Fallible])
// itemStream: Stream[[A]Fallible[A], CborItem] = Stream(..)
itemStream.compile.toList
// res0: Either[Throwable, List[CborItem]] = Right(
// value = List(
// StartArray(size = 3L),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@56fb33d5,
// offset = 0L,
// size = 1L
// )
// )
// ),
// StartArray(size = 2L),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@14b7cab6,
// offset = 0L,
// size = 1L
// )
// )
// ),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@1fd270df,
// offset = 0L,
// size = 1L
// )
// )
// ),
// StartArray(size = 2L),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@5a8b65b0,
// offset = 0L,
// size = 1L
// )
// )
// ),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@55e180af,
// offset = 0L,
// size = 1L
// )
// )
// ...
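Each item above corresponds to one CBOR header in the input: the initial byte carries the major type in its top 3 bits and a small argument in its low 5 bits (0x83 is major type 4, "array", with 3 elements; 0x01 is major type 0, "unsigned integer", with value 1). The following is a minimal plain-Scala sketch of that flat decoding, without fs2. FlatDecodeSketch and the Toy* types are hypothetical illustrations, not the fs2-data API, and the sketch only supports unsigned integers, text strings and definite arrays whose argument is below 24.

```scala
// Hypothetical sketch, NOT the fs2-data implementation.
// Supports major types 0 (unsigned int), 3 (text string) and 4 (array),
// only when the argument fits in the initial byte (below 24).
object FlatDecodeSketch {
  sealed trait ToyItem
  final case class ToyPositiveInt(value: Int) extends ToyItem
  final case class ToyTextString(value: String) extends ToyItem
  final case class ToyStartArray(size: Int) extends ToyItem

  def hexToBytes(hex: String): Vector[Int] =
    hex.grouped(2).map(pair => Integer.parseInt(pair, 16)).toVector

  def decode(hex: String): List[ToyItem] = {
    val bytes = hexToBytes(hex)
    val out = List.newBuilder[ToyItem]
    var i = 0
    while (i < bytes.length) {
      val initial = bytes(i)
      val majorType = initial >> 5 // top 3 bits
      val argument = initial & 0x1f // low 5 bits
      require(argument < 24, s"argument encoding $argument not supported in this sketch")
      i += 1
      majorType match {
        case 0 => out += ToyPositiveInt(argument)
        case 3 =>
          require(i + argument <= bytes.length, "unexpected end of input")
          val utf8 = bytes.slice(i, i + argument).map(_.toByte).toArray
          out += ToyTextString(new String(utf8, "UTF-8"))
          i += argument
        case 4 =>
          // Only the header is emitted: the decoder never tracks where the array ends.
          out += ToyStartArray(argument)
        case other =>
          throw new IllegalArgumentException(s"major type $other not supported in this sketch")
      }
    }
    out.result()
  }
}
```

Decoding "8301820203820405" with this sketch yields ToyStartArray(3), ToyPositiveInt(1), ToyStartArray(2), ToyPositiveInt(2), ToyPositiveInt(3), ToyStartArray(2), ToyPositiveInt(4), ToyPositiveInt(5), the same sequence of headers as the items output above.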
Serializing
If you already have a stream of CBOR items, you can serialize it back to the binary representation (for instance, to send it to a consumer) by passing it through the toBinary pipe.
Stream
.emits(List(
CborItem.StartArray(2),
CborItem.TextString("an"),
CborItem.TextString("array")))
.through(toBinary[Fallible])
.compile
.to(ByteVector)
.map(_.toHex)
// res1: Either[Throwable, String] = Right(value = "8262616e656172726179")
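Byte by byte, the hex result above reads: 0x82 (array of 2 elements), then 0x62 followed by 61 6e (text string "an"), then 0x65 followed by 61 72 72 61 79 (text string "array"). A minimal plain-Scala sketch of that encoding, assuming hypothetical ToyItem types and arguments below 24 only; it illustrates the byte layout and is not how toBinary is implemented:

```scala
// Hypothetical sketch, NOT the fs2-data encoder.
object FlatEncodeSketch {
  sealed trait ToyItem
  final case class ToyStartArray(size: Int) extends ToyItem
  final case class ToyTextString(value: String) extends ToyItem

  // One initial byte: major type in the top 3 bits, argument in the low 5 bits.
  private def header(majorType: Int, argument: Int): List[Byte] = {
    require(argument < 24, "this sketch only supports arguments below 24")
    List(((majorType << 5) | argument).toByte)
  }

  def encode(items: List[ToyItem]): String = {
    val bytes: List[Byte] = items.flatMap {
      case ToyStartArray(size) => header(4, size) // major type 4: array
      case ToyTextString(s) =>
        val utf8 = s.getBytes("UTF-8").toList
        header(3, utf8.length) ++ utf8 // major type 3: text string, length = UTF-8 byte count
    }
    bytes.map(b => f"${b & 0xff}%02x").mkString
  }
}
```

Note that this sketch does no validation at all: given ToyStartArray(3) followed by the same two strings, it happily emits "8362616e656172726179".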
The toBinary pipe validates its input and fails if the input stream is invalid. For instance, suppose some elements are missing from the array:
val invalid =
Stream
.emits(List(
CborItem.StartArray(3),
CborItem.TextString("an"),
CborItem.TextString("array")))
// invalid: Stream[[x]Pure[x], Product with CborItem with Serializable] = Stream(..)
invalid
.through(toBinary[Fallible])
.compile
.drain
// res2: Either[Throwable, Unit] = Left(
// value = fs2.data.cbor.CborValidationException: unexpected end of CBOR item stream
// )
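To detect this, a validating serializer has to remember how many elements each open array still expects. The following plain-Scala sketch shows one way to do that with a stack of counters; ValidateSketch and its Toy* types are hypothetical and this is not the fs2-data implementation. The stream above fails because the counter for the outer array never reaches zero.

```scala
// Hypothetical sketch of the bookkeeping behind item-stream validation.
object ValidateSketch {
  sealed trait ToyItem
  final case class ToyStartArray(size: Int) extends ToyItem
  final case class ToyTextString(value: String) extends ToyItem

  // Returns Left(error) if the items do not form complete values.
  def validate(items: List[ToyItem]): Either[String, Unit] = {
    // `pending` holds, for each open array (innermost first), how many elements it still expects.
    @annotation.tailrec
    def loop(rest: List[ToyItem], pending: List[Int]): Either[String, Unit] =
      rest match {
        case Nil =>
          if (pending.isEmpty) Right(()) else Left("unexpected end of item stream")
        case item :: tail =>
          // Every item starts a value, which fills one slot of the innermost open array.
          val afterItem = pending match {
            case n :: outer => (n - 1) :: outer
            case Nil        => Nil
          }
          item match {
            case ToyStartArray(size) => loop(tail, popCompleted(size :: afterItem))
            case ToyTextString(_)    => loop(tail, popCompleted(afterItem))
          }
      }
    loop(items, Nil)
  }

  // Drops every open array whose expected element count has reached zero.
  @annotation.tailrec
  private def popCompleted(pending: List[Int]): List[Int] = pending match {
    case 0 :: outer => popCompleted(outer)
    case other      => other
  }
}
```

With StartArray(2) the two strings fill both slots and validation succeeds; with StartArray(3) one slot is still open when the input ends.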
The validating pipe is the right choice when you cannot be sure that the input stream is valid, or when throughput doesn't matter. If the source is trusted, or if throughput matters more than correctness, you can use the toNonValidatedBinary pipe instead, which silently generates an invalid byte stream when its input is invalid.
val invalidBytes = invalid.through(toNonValidatedBinary)
// invalidBytes: Stream[[x]Pure[x], Byte] = Stream(..)
invalidBytes
.compile
.to(ByteVector)
.toHex
// res3: String = "8362616e656172726179"
Of course, this stream will fail to be parsed.
invalidBytes.through(items[Fallible]).compile.drain
// res4: Either[Throwable, Unit] = Left(
// value = fs2.data.cbor.CborParsingException: unexpected end of input
// )
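The parser catches the problem because the header byte 0x83 announces three elements while only two follow before the input ends. A minimal plain-Scala sketch of that structural check (TruncatedParseSketch is hypothetical, not the fs2-data parser, and it only supports unsigned integers, text strings and arrays with arguments below 24):

```scala
// Hypothetical sketch: walks complete data items and reports truncated input.
object TruncatedParseSketch {
  // Skips one complete data item starting at `pos`, returning the position right after it.
  def skipItem(bytes: Array[Byte], pos: Int): Either[String, Int] =
    if (pos >= bytes.length) Left("unexpected end of input")
    else {
      val initial = bytes(pos) & 0xff
      val majorType = initial >> 5
      val argument = initial & 0x1f
      if (argument >= 24) Left(s"argument encoding $argument not supported in this sketch")
      else
        majorType match {
          case 0 => Right(pos + 1) // small unsigned integer: header only
          case 3 =>
            val end = pos + 1 + argument // header + `argument` bytes of UTF-8 text
            if (end > bytes.length) Left("unexpected end of input") else Right(end)
          case 4 =>
            // An array of `argument` elements: skip them one after the other.
            (0 until argument).foldLeft[Either[String, Int]](Right(pos + 1)) { (acc, _) =>
              acc.flatMap(next => skipItem(bytes, next))
            }
          case other => Left(s"major type $other not supported in this sketch")
        }
    }

  // Checks that the whole hex string is a sequence of complete data items.
  def check(hex: String): Either[String, Unit] = {
    val bytes = hex.grouped(2).map(pair => Integer.parseInt(pair, 16).toByte).toArray
    def loop(pos: Int): Either[String, Unit] =
      if (pos == bytes.length) Right(())
      else skipItem(bytes, pos).flatMap(loop)
    loop(0)
  }
}
```

Here check("8362616e656172726179") returns Left("unexpected end of input"), while the validated output "8262616e656172726179" passes.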
High-level representation
Some transformations are easier or safer to perform on more structured data; this is where the high-level representation comes in handy. It lives in the fs2.data.cbor.high package.
Parsing
The main parsing pipe to get a high-level value stream from a byte stream is the values pipe.
import fs2.data.cbor.high._
val valueStream = byteStream.through(values[Fallible])
// valueStream: Stream[[A]Fallible[A], CborValue] = Stream(..)
valueStream.compile.toList
// res5: Either[Throwable, List[CborValue]] = Right(
// value = List(
// Array(
// values = List(
// Integer(value = 1),
// Array(
// values = List(Integer(value = 2), Integer(value = 3)),
// indefinite = false
// ),
// Array(
// values = List(Integer(value = 4), Integer(value = 5)),
// indefinite = false
// )
// ),
// indefinite = false
// )
// )
// )
If you already have a stream of low-level items, you can pass it through the parseValues pipe to get a value stream.
itemStream.through(parseValues[Fallible]).compile.toList
// res6: Either[Throwable, List[CborValue]] = Right(
// value = List(
// Array(
// values = List(
// Integer(value = 1),
// Array(
// values = List(Integer(value = 2), Integer(value = 3)),
// indefinite = false
// ),
// Array(
// values = List(Integer(value = 4), Integer(value = 5)),
// indefinite = false
// )
// ),
// indefinite = false
// )
// )
// )
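Conceptually, building values from items means reading a header and then recursively reading as many children as it announces. A minimal plain-Scala sketch of that folding, using hypothetical ToyItem/ToyValue types limited to small unsigned integers and definite arrays; it is not the actual parseValues implementation, which works incrementally on a stream:

```scala
// Hypothetical sketch: folds a flat item list into trees.
object BuildTreeSketch {
  sealed trait ToyItem
  final case class ToyPositiveInt(value: Int) extends ToyItem
  final case class ToyStartArray(size: Int) extends ToyItem

  sealed trait ToyValue
  final case class ToyInteger(value: Int) extends ToyValue
  final case class ToyArray(values: List[ToyValue]) extends ToyValue

  // Parses one value from the front of `items`, returning it with the remaining items.
  def parseOne(items: List[ToyItem]): Either[String, (ToyValue, List[ToyItem])] =
    items match {
      case Nil                       => Left("unexpected end of item stream")
      case ToyPositiveInt(n) :: rest => Right((ToyInteger(n), rest))
      case ToyStartArray(size) :: rest =>
        // Parse exactly `size` children, threading the remaining items through.
        val start: Either[String, (List[ToyValue], List[ToyItem])] = Right((Nil, rest))
        (0 until size).foldLeft(start) { (acc, _) =>
          acc.flatMap { case (children, remaining) =>
            parseOne(remaining).map { case (child, after) => (child :: children, after) }
          }
        }.map { case (children, remaining) => (ToyArray(children.reverse), remaining) }
    }

  // Parses every top-level value in the list.
  def parseAll(items: List[ToyItem]): Either[String, List[ToyValue]] =
    if (items.isEmpty) Right(Nil)
    else parseOne(items).flatMap { case (value, rest) => parseAll(rest).map(value :: _) }
}
```

Applied to the eight items of the running example, parseAll returns one ToyArray containing ToyInteger(1) and two nested two-element arrays, matching the shape of the CborValue output above.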
Serializing
A high-level value stream can be serialized to the binary format using the toBinary pipe.
valueStream
.through(data.cbor.high.toBinary)
.compile
.to(ByteVector)
.map(_.toHex)
// res7: Either[Throwable, String] = Right(value = "8301820203820405")
Values can also be converted back to low-level items using the toItems pipe.
valueStream.through(toItems).compile.toList
// res8: Either[Throwable, List[CborItem]] = Right(
// value = List(
// StartArray(size = 3L),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@6b92f858,
// offset = 0L,
// size = 1L
// )
// )
// ),
// StartArray(size = 2L),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@3c3ab121,
// offset = 0L,
// size = 1L
// )
// )
// ),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@7186f931,
// offset = 0L,
// size = 1L
// )
// )
// ),
// StartArray(size = 2L),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@5b7eb1fa,
// offset = 0L,
// size = 1L
// )
// )
// ),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@27d2d2f5,
// offset = 0L,
// size = 1L
// )
// )
// ...
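Going from values back to items amounts to a pre-order traversal: emit the array header with its size, then the items of each element in order. A minimal sketch with hypothetical ToyValue/ToyItem types, assuming non-negative integers only; it is not the toItems implementation:

```scala
// Hypothetical sketch: flattens a tree back into a flat item list.
object FlattenSketch {
  sealed trait ToyValue
  final case class ToyInteger(value: Int) extends ToyValue // assumed non-negative
  final case class ToyArray(values: List[ToyValue]) extends ToyValue

  sealed trait ToyItem
  final case class ToyPositiveInt(value: Int) extends ToyItem
  final case class ToyStartArray(size: Int) extends ToyItem

  // Pre-order traversal: the array header first, then each element's items.
  def toItems(value: ToyValue): List[ToyItem] = value match {
    case ToyInteger(n)    => List(ToyPositiveInt(n))
    case ToyArray(values) => ToyStartArray(values.size) :: values.flatMap(toItems)
  }
}
```

A definite-length header needs the element count up front, which is why this traversal reads values.size before emitting any child.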
You can control, value by value, whether an array or a map is streamed (encoded with an indefinite length) in the serialized form by setting its indefinite flag to true.
valueStream
.map {
// make top-level arrays streamed
case CborValue.Array(elements, _) => CborValue.Array(elements, true)
case value => value
}
.through(toItems)
.compile
.toList
// res9: Either[Throwable, List[CborItem]] = Right(
// value = List(
// StartIndefiniteArray,
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@265e7a9d,
// offset = 0L,
// size = 1L
// )
// )
// ),
// StartArray(size = 2L),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@172a41d9,
// offset = 0L,
// size = 1L
// )
// )
// ),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@1fcedbdf,
// offset = 0L,
// size = 1L
// )
// )
// ),
// StartArray(size = 2L),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@1890caeb,
// offset = 0L,
// size = 1L
// )
// )
// ),
// PositiveInt(
// bytes = Chunk(
// bytes = View(
// at = scodec.bits.ByteVector$AtArray@4dcfc1f6,
// offset = 0L,
// size = 1L
// )
// )
// ...
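At the byte level, an indefinite-length array does not carry its size: it starts with 0x9f (major type 4 with additional information 31) and is closed by the break byte 0xff. A minimal plain-Scala sketch of both encodings, with hypothetical ToyValue types limited to integers 0 to 23 and arrays shorter than 24 elements (not the fs2-data encoder):

```scala
// Hypothetical sketch: definite vs indefinite array encoding.
object IndefiniteSketch {
  sealed trait ToyValue
  final case class ToyInteger(value: Int) extends ToyValue
  final case class ToyArray(values: List[ToyValue], indefinite: Boolean) extends ToyValue

  def encode(value: ToyValue): List[Int] = value match {
    case ToyInteger(n) =>
      require(n >= 0 && n < 24, "this sketch only supports integers 0 to 23")
      List(n) // major type 0: the value fits in the initial byte
    case ToyArray(values, false) =>
      require(values.size < 24, "this sketch only supports arrays shorter than 24")
      (0x80 | values.size) :: values.flatMap(encode) // size stored in the header
    case ToyArray(values, true) =>
      // 0x9f opens an indefinite-length array; 0xff ("break") closes it.
      (0x9f :: values.flatMap(encode)) :+ 0xff
  }

  def toHex(value: ToyValue): String = encode(value).map(b => f"$b%02x").mkString
}
```

With every flag set to false, the running example encodes to "8301820203820405"; making only the top-level array indefinite gives "9f01820203820405ff", while the nested arrays keep their definite headers. The streamed form lets a producer start writing an array before knowing how many elements it will contain.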