JSON Libraries

Bindings to popular Scala JSON libraries can be added by implementing the Builder and Tokenizer traits. fs2-data provides some of them out of the box.
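
To give a feel for what such a binding involves, here is a minimal, self-contained sketch. All names in it (Tok, MyJson, SketchBuilder, SketchTokenizer, MyJsonBinding) are hypothetical stand-ins defined locally so that the snippet runs without any dependency; they are not the fs2-data definitions, whose exact signatures should be checked in the library's API documentation. The idea shown is only that a builder assembles AST values from parsed pieces, while a tokenizer flattens an AST value back into tokens.

```scala
// Hypothetical, simplified token type (NOT the fs2-data Token type).
sealed trait Tok
object Tok {
  case object StartObject extends Tok
  case object EndObject extends Tok
  case object StartArray extends Tok
  case object EndArray extends Tok
  final case class Key(name: String) extends Tok
  final case class Str(value: String) extends Tok
  final case class Num(value: String) extends Tok
}

// A tiny JSON AST standing in for a third-party library's type.
sealed trait MyJson
final case class MyString(s: String) extends MyJson
final case class MyNumber(n: String) extends MyJson
final case class MyArray(values: List[MyJson]) extends MyJson
final case class MyObject(fields: List[(String, MyJson)]) extends MyJson

// Assumed shape: builds AST values from the pieces found by a parser.
trait SketchBuilder[Json] {
  def makeString(s: String): Json
  def makeNumber(s: String): Json
  def makeArray(values: Iterable[Json]): Json
  def makeObject(fields: Iterable[(String, Json)]): Json
}

// Assumed shape: flattens an AST value into a token sequence.
trait SketchTokenizer[Json] {
  def tokenize(json: Json): List[Tok]
}

object MyJsonBinding extends SketchBuilder[MyJson] with SketchTokenizer[MyJson] {
  def makeString(s: String): MyJson = MyString(s)
  def makeNumber(s: String): MyJson = MyNumber(s)
  def makeArray(values: Iterable[MyJson]): MyJson = MyArray(values.toList)
  def makeObject(fields: Iterable[(String, MyJson)]): MyJson = MyObject(fields.toList)

  def tokenize(json: MyJson): List[Tok] = json match {
    case MyString(s) => List(Tok.Str(s))
    case MyNumber(n) => List(Tok.Num(n))
    case MyArray(values) =>
      // Wrap the element tokens in array delimiters
      Tok.StartArray :: values.flatMap(tokenize) ::: List(Tok.EndArray)
    case MyObject(fields) =>
      // Each field is emitted as its key followed by the tokens of its value
      val inner = fields.flatMap { case (key, value) => Tok.Key(key) :: tokenize(value) }
      Tok.StartObject :: inner ::: List(Tok.EndObject)
  }
}
```

With real bindings, the same two responsibilities are implemented against the library's own Builder, Tokenizer, and Token types.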

Examples on this page use the following input:

import fs2.{Fallible, Stream}
import fs2.data.json._
import fs2.data.json.jsonpath._
import fs2.data.json.jsonpath.literals._

def input[F[_]] = Stream.emit("""{
  "field1": 0,
  "field2": "test",
  "field3": [1, 2, 3]
  }
  {
  "field1": 2,
  "field3": []
}""").covary[F]

val stream = input[Fallible].through(tokens)
// stream: Stream[[x]Fallible[x], Token] = Stream(..)

val sel = jsonpath"$$.field3[*]"
// sel: JsonPath = JsonPath(
//   locations = NonEmptyList(
//     head = Child(child = Name(n = "field3")),
//     tail = List(Pred(predicate = Wildcard))
//   )
// )

Circe

Module: Maven Central

The fs2-data-json-circe module provides Builder and Tokenizer instances for the circe Json type and a Tokenizer instance for each type T having an implicit Encoder[T] in scope. For instance, both examples from the core module documentation can be written with circe as follows:

import fs2.data.json.circe._

val asts = input[Fallible].through(ast.parse)
// asts: Stream[[x]Fallible[x], io.circe.Json] = Stream(..)
asts.map(_.spaces2).compile.toList
// res0: Either[Throwable, List[String]] = Right(
//   value = List(
//     """{
//   "field1" : 0,
//   "field2" : "test",
//   "field3" : [
//     1,
//     2,
//     3
//   ]
// }""",
//     """{
//   "field1" : 2,
//   "field3" : [
//   ]
// }"""
//   )
// )

You can use filter.values to select only the values matching the JSONPath and deserialize them using the builder.

import fs2.data.json.circe._

import cats.effect._
import cats.syntax.all._
import cats.effect.unsafe.implicits.global

stream
  .lift[IO]
  .through(jsonpath.filter.values(sel))
  .compile
  .toList
  .unsafeRunSync()
// res1: List[io.circe.Json] = List(
//   JNumber(value = JsonDecimal(input = "1")),
//   JNumber(value = JsonDecimal(input = "2")),
//   JNumber(value = JsonDecimal(input = "3"))
// )

The circe integration also provides Deserializer and Serializer instances based on circe Decoders and Encoders.

Let's say we defined our data model as follows:

import io.circe.generic.JsonCodec

@JsonCodec
case class Data(field1: Int, field2: Option[String], field3: List[Int])

@JsonCodec
case class WrappedData(field1: Int, field2: Option[String], field3: List[Wrapped])

@JsonCodec
case class Wrapped(test: Int)

We could write the wrapping transformation from the core module documentation using these case classes.

import fs2.data.json.selector._
import fs2.data.json.circe._

val values = stream.through(codec.deserialize[Fallible, Data])
// values: Stream[[x]Fallible[x], Data] = Stream(..)
values.compile.toList
// res2: Either[Throwable, List[Data]] = Right(
//   value = List(
//     Data(field1 = 0, field2 = Some(value = "test"), field3 = List(1, 2, 3)),
//     Data(field1 = 2, field2 = None, field3 = List())
//   )
// )

def wrap(data: Data): WrappedData =
  WrappedData(data.field1, data.field2, data.field3.map(Wrapped(_)))

val sel = root.field("field3").iterate.compile
// sel: Selector = PipeSelector(
//   left = NameSelector(
//     pred = Single(name = "field3"),
//     strict = true,
//     mandatory = false
//   ),
//   right = IteratorSelector(strict = true)
// )

val transformed = stream.through(codec.transform(sel, wrap))
// transformed: Stream[[x]Fallible[x], Token] = Stream(..)
transformed.compile.to(collector.pretty())
// res3: Either[Throwable, String] = Left(
//   value = JsonException(
//     msg = "An error occurred while transforming Json data",
//     context = Some(
//       value = Index(
//         idx = 0L,
//         parent = Key(
//           name = "field3",
//           parent = Key(
//             name = "field2",
//             parent = Key(name = "field1", parent = Root)
//           )
//         )
//       )
//     ),
//     inner = JsonException(
//       msg = "an error occured while deserializing Json values",
//       context = None,
//       inner = DecodingFailure at .field1: Missing required field
//     )
//   )
// )

Note that the result above is a Left: sel selects each element of field3, which is a JSON number, so decoding it as Data fails on the missing field1.

Dropping values can be done similarly.

import fs2.data.json.circe._
import cats.syntax.all._

val f1 = root.field("field1").compile
// f1: Selector = NameSelector(
//   pred = Single(name = "field1"),
//   strict = true,
//   mandatory = false
// )

val transformed = stream.through(codec.transformOpt(f1, (i: Int) => (i > 0).guard[Option].as(i)))
// transformed: Stream[[x]Fallible[x], Token] = Stream(..)
transformed.compile.to(collector.pretty())
// res4: Either[Throwable, String] = Right(
//   value = """{
//   "field2": "test",
//   "field3": [
//     1,
//     2,
//     3
//   ]
// }
// {
//   "field1": 2,
//   "field3": []
// }"""
// )
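
The Serializer side of the integration works in the opposite direction of codec.deserialize. The following is a minimal sketch, assuming that codec.serialize is available with a signature symmetric to codec.deserialize and that the circe import brings a Serializer into scope for any type with an Encoder. The codec is derived with circe's semi-automatic derivation here only to keep the snippet independent from the @JsonCodec macro annotation; the name records is illustrative.

```scala
import fs2.{Fallible, Stream}
import fs2.data.json._
import fs2.data.json.circe._
import io.circe.Codec
import io.circe.generic.semiauto.deriveCodec

// Same shape as the Data model used on this page
final case class Data(field1: Int, field2: Option[String], field3: List[Int])
object Data {
  implicit val dataCodec: Codec[Data] = deriveCodec[Data]
}

val records = Stream
  .emits(List(Data(0, Some("test"), List(1, 2, 3)), Data(2, None, Nil)))
  .covary[Fallible]

// Assumption: codec.serialize turns each Data value into JSON tokens
// through the Serializer derived from the circe Encoder
val rendered: Either[Throwable, String] =
  records.through(codec.serialize[Fallible, Data]).compile.to(collector.pretty())
```

The resulting token stream can be rendered or transformed like any other token stream on this page.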

Migrating from circe-fs2

If you were using circe-fs2 to emit streams of Json values, you can easily switch to fs2-data-json-circe. Just replace your usages of stringStreamParser or byteStreamParser with fs2.data.json.ast.parse.

For instance if you had this code:

import io.circe.fs2._

import cats.effect._

input[SyncIO]
  .through(stringStreamParser)
  .map(_.spaces2)
  .compile
  .toList
  .unsafeRunSync()
// res5: List[String] = List(
//   """{
//   "field1" : 0,
//   "field2" : "test",
//   "field3" : [
//     1,
//     2,
//     3
//   ]
// }""",
//   """{
//   "field1" : 2,
//   "field3" : [
//   ]
// }"""
// )

You can replace it with:

import fs2.data.json._
import fs2.data.json.circe._

input[Fallible]
  .through(ast.parse)
  .map(_.spaces2)
  .compile
  .toList
// res6: Either[Throwable, List[String]] = Right(
//   value = List(
//     """{
//   "field1" : 0,
//   "field2" : "test",
//   "field3" : [
//     1,
//     2,
//     3
//   ]
// }""",
//     """{
//   "field1" : 2,
//   "field3" : [
//   ]
// }"""
//   )
// )

If you were using byteStreamParser, please refer to the fs2.data.text package documentation for how to decode the byte stream.
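
As a rough illustration of the byte stream case, the sketch below assumes that importing fs2.data.text.utf8._ is what makes ast.parse accept a stream of UTF-8 encoded bytes; check the fs2.data.text documentation for the exact import matching your version and encoding. The sample document and the names bytes and parsed are illustrative.

```scala
import java.nio.charset.StandardCharsets

import fs2.{Fallible, Stream}
import fs2.data.json._
import fs2.data.json.circe._
import fs2.data.text.utf8._ // assumption: enables reading UTF-8 encoded bytes
import io.circe.Json

// A small document encoded as UTF-8 bytes, standing in for a real byte source
val bytes: List[Byte] =
  """{"field1": 2, "field3": []}""".getBytes(StandardCharsets.UTF_8).toList

// The bytes are parsed directly into circe Json values
val parsed: Either[Throwable, List[Json]] =
  Stream.emits(bytes).covary[Fallible].through(ast.parse).compile.toList
```

Alternatively, decoding the bytes to strings first with the fs2 text.utf8.decode pipe and then applying ast.parse should also work.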

Play! JSON

Module: Maven Central

The fs2-data-json-play module provides Builder and Tokenizer instances for the Play! JSON JsValue type and a Tokenizer instance for each type T having an implicit Writes[T] in scope.

It also provides a Deserializer for types with a Reads instance and a Serializer for types with a Writes instance.
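
Usage mirrors the circe module. The sketch below assumes that the bindings live in the fs2.data.json.playJson package; verify the import against the module's API documentation. The sample document and the names playValues and rendered are illustrative.

```scala
import fs2.{Fallible, Stream}
import fs2.data.json._
import fs2.data.json.playJson._ // assumption: package of the Play! JSON bindings
import play.api.libs.json.{JsValue, Json}

// Parse the input into Play! JSON values using the Builder from the module
val playValues: Stream[Fallible, JsValue] =
  Stream
    .emit("""{"field1": 2, "field3": []}""")
    .covary[Fallible]
    .through(ast.parse)

// Render each parsed value back to a string with Play! JSON itself
val rendered: Either[Throwable, List[String]] =
  playValues.map(Json.stringify(_)).compile.toList
```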