Introduction

The fs2-data-cbor module provides tools to parse and transform CBOR data in a streaming manner.

Low-level representation

The low-level representation lives in the fs2.data.cbor.low package and closely follows the structure defined in the RFC. It is a flat structure: data is represented as a sequence of items rather than as an AST. This representation is useful when you don't need the tree structure for your processing, or when the data cannot be represented as an AST (e.g. when a collection contains more than Int.MaxValue elements).
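
To make the flat structure concrete, here is a minimal sketch in plain Scala (no fs2-data involved) that splits each byte of the input used in the examples below, 8301820203820405, into the major type and additional information defined by RFC 8949. The initialByte helper is hypothetical, and the sketch assumes that every item fits in a single byte, which holds for this input only because all sizes and integers are below 24.

```scala
// Hypothetical helper, not part of fs2-data: split a CBOR initial byte into
// its major type (high 3 bits) and additional information (low 5 bits).
def initialByte(b: Int): (Int, Int) = ((b & 0xff) >> 5, b & 0x1f)

// The bytes of 8301820203820405, i.e. the array [1, [2, 3], [4, 5]]
val rawBytes = List(0x83, 0x01, 0x82, 0x02, 0x03, 0x82, 0x04, 0x05)

// Major type 4 is an array (additional information = size),
// major type 0 is an unsigned integer (additional information = value).
val decoded = rawBytes.map(initialByte)
// List((4,3), (0,1), (4,2), (0,2), (0,3), (4,2), (0,4), (0,5))
```

Each pair corresponds to one low-level item: nesting is only implied by the announced sizes, never by a closing marker.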

Parsing

To parse a CBOR binary stream into the low-level representation, use the items pipe.

import fs2._
import fs2.data.cbor.low._

import scodec.bits._

val byteStream = Stream.chunk(Chunk.byteVector(hex"8301820203820405"))
// byteStream: Stream[[x]Pure[x], Byte] = Stream(..)

val itemStream = byteStream.through(items[Fallible])
// itemStream: Stream[[A]Fallible[A], CborItem] = Stream(..)
itemStream.compile.toList
// res0: Either[Throwable, List[CborItem]] = Right(
//   value = List(
//     StartArray(size = 3L),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@a6b52b4,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     StartArray(size = 2L),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@6c777513,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@5120228f,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     StartArray(size = 2L),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@4d762c64,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@1d58298c,
//           offset = 0L,
//           size = 1L
//         )
//       )
// ...

Serializing

When you already have a CBOR item stream, you can serialize it back to the binary representation (for instance to send it to a consumer) by passing it through the toBinary pipe.

Stream
  .emits(List(
    CborItem.StartArray(2),
    CborItem.TextString("an"),
    CborItem.TextString("array")))
  .through(toBinary[Fallible])
  .compile
  .to(ByteVector)
  .map(_.toHex)
// res1: Either[Throwable, String] = Right(value = "8262616e656172726179")
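
The resulting hex string can be checked by hand. The following is a minimal sketch in plain Scala, assuming only the short-length encoding rules of RFC 8949 (lengths below 24 are stored directly in the initial byte); the header, textString and toHex helpers are hypothetical and are not part of fs2-data.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Initial byte of a short item: major type in the high 3 bits,
// length in the low 5 bits. This sketch only supports lengths below 24.
def header(majorType: Int, length: Int): Byte = {
  require(length >= 0 && length < 24, "this sketch only handles short lengths")
  ((majorType << 5) | length).toByte
}

// Major type 3 is a UTF-8 text string: header byte followed by the bytes.
def textString(s: String): Array[Byte] = {
  val utf8 = s.getBytes(UTF_8)
  header(3, utf8.length) +: utf8
}

def toHex(bytes: Array[Byte]): String =
  bytes.map(b => f"${b & 0xff}%02x").mkString

// Major type 4 is an array: header announcing 2 elements, then the elements.
val encoded = Array(header(4, 2)) ++ textString("an") ++ textString("array")
toHex(encoded) // "8262616e656172726179"
```

The leading 82 is the array header announcing two elements, 62 and 65 announce text strings of 2 and 5 bytes.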

The toBinary pipe validates the input and fails if the input stream is invalid. For instance, let's say that some elements are missing from the array.

val invalid =
  Stream
    .emits(List(
      CborItem.StartArray(3),
      CborItem.TextString("an"),
      CborItem.TextString("array")))
// invalid: Stream[[x]Pure[x], Product with CborItem with Serializable] = Stream(..)

invalid
  .through(toBinary[Fallible])
  .compile
  .drain
// res2: Either[Throwable, Unit] = Left(
//   value = fs2.data.cbor.CborValidationException: unexpected end of CBOR item stream
// )
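
The kind of check involved can be pictured with a simplified model. The sketch below is an assumption made for illustration, not the library's implementation: it uses its own tiny Item type instead of CborItem, ignores maps and indefinite-length items, and only detects arrays that end before receiving all of their announced elements.

```scala
// Simplified item model for this sketch only (not the fs2-data CborItem type)
sealed trait Item
final case class Start(size: Long) extends Item // opens an array of `size` elements
case object Leaf extends Item                   // any non-container item

// Returns true when every opened array received all of its elements.
def isComplete(items: List[Item]): Boolean = {
  // Each stack entry counts the elements still expected by an open array,
  // innermost array first.
  val open = items.foldLeft(List.empty[Long]) { (stack, item) =>
    // The item fills one slot of the innermost open array, if there is one.
    val filled = stack match {
      case n :: rest => (n - 1) :: rest
      case Nil       => Nil
    }
    // A new array additionally opens its own counter.
    val pushed = item match {
      case Start(n) => n :: filled
      case Leaf     => filled
    }
    // Close every array that has no slot left.
    pushed.dropWhile(_ == 0L)
  }
  open.isEmpty
}

val complete: List[Item]   = List(Start(2), Leaf, Leaf)
val missingOne: List[Item] = List(Start(3), Leaf, Leaf)

isComplete(complete)   // true
isComplete(missingOne) // false
```

In this model, the second list mirrors the invalid stream above: the array announces three elements but the stream ends after two.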

The validating pipe is a good fit when you cannot be sure that the input stream is valid, or when throughput doesn't matter. If the source is trusted, or if throughput matters more than correctness, you can use the toNonValidatedBinary pipe instead. It performs no validation, so an invalid item stream will silently produce an invalid byte stream.

val invalidBytes = invalid.through(toNonValidatedBinary)
// invalidBytes: Stream[[x]Pure[x], Byte] = Stream(..)

invalidBytes
  .compile
  .to(ByteVector)
  .toHex
// res3: String = "8362616e656172726179"

Of course, this stream will fail to be parsed.

invalidBytes.through(items[Fallible]).compile.drain
// res4: Either[Throwable, Unit] = Left(
//   value = fs2.data.cbor.CborParsingException: unexpected end of input
// )

High-level representation

Some transformations are easier or safer to perform on more structured data. In such scenarios the high-level representation comes in handy. This representation lives in the fs2.data.cbor.high package.

Parsing

The main parsing pipe to get a high-level value stream from a byte stream is the values pipe.

import fs2.data.cbor.high._

val valueStream = byteStream.through(values[Fallible])
// valueStream: Stream[[A]Fallible[A], CborValue] = Stream(..)
valueStream.compile.toList
// res5: Either[Throwable, List[CborValue]] = Right(
//   value = List(
//     Array(
//       values = List(
//         Integer(value = 1),
//         Array(
//           values = List(Integer(value = 2), Integer(value = 3)),
//           indefinite = false
//         ),
//         Array(
//           values = List(Integer(value = 4), Integer(value = 5)),
//           indefinite = false
//         )
//       ),
//       indefinite = false
//     )
//   )
// )

If you already have a stream of low-level items, you can make it go through the parseValues pipe to get the value stream.

itemStream.through(parseValues[Fallible]).compile.toList
// res6: Either[Throwable, List[CborValue]] = Right(
//   value = List(
//     Array(
//       values = List(
//         Integer(value = 1),
//         Array(
//           values = List(Integer(value = 2), Integer(value = 3)),
//           indefinite = false
//         ),
//         Array(
//           values = List(Integer(value = 4), Integer(value = 5)),
//           indefinite = false
//         )
//       ),
//       indefinite = false
//     )
//   )
// )
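
Conceptually, going from flat items to values means reading as many nested values as each array header announces. The following plain Scala sketch illustrates that idea under strong simplifications and is not how the library implements parseValues: FlatItem, Tree and readValue are hypothetical names, only definite-length arrays and small integers are modelled, and the whole input is held in a list instead of being streamed.

```scala
// Simplified flat items for this sketch only (not the fs2-data CborItem type)
sealed trait FlatItem
final case class Open(size: Int) extends FlatItem // array header
final case class Num(value: Int) extends FlatItem // integer item

// Simplified tree for this sketch only (not the fs2-data CborValue type)
sealed trait Tree
final case class IntNode(value: Int) extends Tree
final case class ArrayNode(values: List[Tree]) extends Tree

// Reads one value from the front of the list and returns it together with
// the unread rest, or None if the input ends too early.
def readValue(items: List[FlatItem]): Option[(Tree, List[FlatItem])] =
  items match {
    case Num(n) :: rest => Some((IntNode(n), rest))
    case Open(size) :: rest =>
      // Read `size` nested values one after the other, threading the rest.
      (0 until size)
        .foldLeft(Option((List.empty[Tree], rest))) {
          case (Some((acc, remaining)), _) =>
            readValue(remaining).map { case (v, r) => (v :: acc, r) }
          case (None, _) => None
        }
        .map { case (acc, r) => (ArrayNode(acc.reverse), r) }
    case Nil => None
  }

// Flat form of [1, [2, 3], [4, 5]]
val flat = List(Open(3), Num(1), Open(2), Num(2), Num(3), Open(2), Num(4), Num(5))
val tree = readValue(flat).map(_._1)
```

Here tree holds the same shape as the value printed above: an array containing 1, the array [2, 3] and the array [4, 5].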

Serializing

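A high-level value stream can be serialized to the binary format by using the toBinary pipe from the fs2.data.cbor.high package. Since the low-level package exposes a pipe with the same name, the example refers to it by its qualified name.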

valueStream
  .through(data.cbor.high.toBinary)
  .compile
  .to(ByteVector)
  .map(_.toHex)
// res7: Either[Throwable, String] = Right(value = "8301820203820405")

It is also possible to convert values back to low-level items by using the toItems pipe.

valueStream.through(toItems).compile.toList
// res8: Either[Throwable, List[CborItem]] = Right(
//   value = List(
//     StartArray(size = 3L),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@69433240,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     StartArray(size = 2L),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@6a289ef7,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@4acfafd8,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     StartArray(size = 2L),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@255eb0d9,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@557f5700,
//           offset = 0L,
//           size = 1L
//         )
//       )
// ...

You can control, for each array or map individually, whether it is serialized in streamed (indefinite-length) form by setting its indefinite flag to true.

valueStream
  .map {
    // make top-level arrays streamed
    case CborValue.Array(elements, _) => CborValue.Array(elements, true)
    case value                        => value
  }
  .through(toItems)
  .compile
  .toList
// res9: Either[Throwable, List[CborItem]] = Right(
//   value = List(
//     StartIndefiniteArray,
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@1c00cc98,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     StartArray(size = 2L),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@33e85f4f,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@3d4b3008,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     StartArray(size = 2L),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@326137ac,
//           offset = 0L,
//           size = 1L
//         )
//       )
//     ),
//     PositiveInt(
//       bytes = Chunk(
//         bytes = View(
//           at = scodec.bits.ByteVector$AtArray@281f5974,
//           offset = 0L,
//           size = 1L
//         )
//       )
// ...
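
On the wire, RFC 8949 encodes an indefinite-length array with the initial byte 9f (major type 4, additional information 31) and terminates it with the break byte ff, instead of announcing the size up front. The sketch below, in plain Scala and independent of fs2-data, rewrites the bytes of the running example by hand to show the expected difference; the toHexString helper and the value names are hypothetical.

```scala
def toHexString(bytes: Seq[Int]): String = bytes.map(b => f"$b%02x").mkString

// Definite-length encoding of [1, [2, 3], [4, 5]]
val definite = Seq(0x83, 0x01, 0x82, 0x02, 0x03, 0x82, 0x04, 0x05)

val IndefiniteArrayHeader = (4 << 5) | 31 // 0x9f: array with no announced size
val Break                 = (7 << 5) | 31 // 0xff: closes the innermost indefinite item

// Swap the top-level header for the indefinite one and append the break byte;
// the nested arrays keep their definite-length headers.
val indefinite = (IndefiniteArrayHeader +: definite.tail) :+ Break

toHexString(indefinite) // "9f01820203820405ff"
```

The content bytes are unchanged; only the header differs and one trailing byte is added, which is what allows a producer to start emitting an array before knowing how many elements it will contain.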