JSON Libraries
Bindings to popular Scala JSON libraries can be added by implementing the Builder and Tokenizer traits. fs2-data provides some of them out of the box.
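As a rough sketch of what such a binding could look like, the following defines Builder and Tokenizer instances for a tiny AST. The MiniJson type and its constructors are hypothetical and exist only for this illustration; the trait members shown assume the signatures of fs2.data.json.ast.Builder and fs2.data.json.ast.Tokenizer in recent 1.x releases, so check them against the version you use.

```scala
import cats.data.NonEmptyList
import fs2.data.json.Token
import fs2.data.json.ast.{Builder, Tokenizer}

// Hypothetical minimal JSON AST, used only for illustration.
sealed trait MiniJson
case object MNull extends MiniJson
final case class MBool(value: Boolean) extends MiniJson
final case class MNum(repr: String) extends MiniJson // keeps the raw number text
final case class MStr(value: String) extends MiniJson
final case class MArr(values: List[MiniJson]) extends MiniJson
final case class MObj(fields: List[(String, MiniJson)]) extends MiniJson

object MiniJson {
  // Builder: how to assemble MiniJson values from parsed pieces.
  implicit val builder: Builder[MiniJson] = new Builder[MiniJson] {
    def makeFalse: MiniJson = MBool(false)
    def makeTrue: MiniJson = MBool(true)
    def makeNull: MiniJson = MNull
    def makeString(s: String): MiniJson = MStr(s)
    def makeNumber(s: String): MiniJson = MNum(s)
    def makeObject(fields: Iterable[(String, MiniJson)]): MiniJson = MObj(fields.toList)
    def makeArray(values: Iterable[MiniJson]): MiniJson = MArr(values.toList)
  }

  // Tokenizer: how to flatten a MiniJson value back into JSON tokens.
  implicit val tokenizer: Tokenizer[MiniJson] = new Tokenizer[MiniJson] {
    private def toTokens(json: MiniJson): List[Token] = json match {
      case MNull        => List(Token.NullValue)
      case MBool(true)  => List(Token.TrueValue)
      case MBool(false) => List(Token.FalseValue)
      case MNum(repr)   => List(Token.NumberValue(repr))
      case MStr(value)  => List(Token.StringValue(value))
      case MArr(values) =>
        Token.StartArray :: values.flatMap(toTokens) ::: List(Token.EndArray)
      case MObj(fields) =>
        val inner = fields.flatMap { case (k, v) => Token.Key(k) :: toTokens(v) }
        Token.StartObject :: inner ::: List(Token.EndObject)
    }

    // Every case above yields at least one token, so the list is never empty.
    def tokenize(json: MiniJson): NonEmptyList[Token] =
      NonEmptyList.fromListUnsafe(toTokens(json))
  }
}
```

Placing the instances in the companion object means they are found by implicit resolution without an extra import.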
Examples on this page use the following input:
import fs2.{Fallible, Stream}
import fs2.data.json._
import fs2.data.json.jsonpath._
import fs2.data.json.jsonpath.literals._
def input[F[_]] = Stream.emit("""{
"field1": 0,
"field2": "test",
"field3": [1, 2, 3]
}
{
"field1": 2,
"field3": []
}""").covary[F]
val stream = input[Fallible].through(tokens)
// stream: Stream[[x]Fallible[x], Token] = Stream(..)
val sel = jsonpath"$$.field3[*]"
// sel: JsonPath = JsonPath(
// locations = NonEmptyList(
// head = Child(child = Name(n = "field3")),
// tail = List(Pred(predicate = Wildcard))
// )
// )
Circe
The fs2-data-json-circe module provides Builder and Tokenizer instances for the circe Json type, and a Tokenizer instance for each type T that has an implicit Encoder[T] in scope.
For instance, both examples from the core module documentation can be written with circe as follows:
import fs2.data.json.circe._
val asts = input[Fallible].through(ast.parse)
// asts: Stream[[x]Fallible[x], io.circe.Json] = Stream(..)
asts.map(_.spaces2).compile.toList
// res0: Either[Throwable, List[String]] = Right(
// value = List(
// """{
// "field1" : 0,
// "field2" : "test",
// "field3" : [
// 1,
// 2,
// 3
// ]
// }""",
// """{
// "field1" : 2,
// "field3" : [
// ]
// }"""
// )
// )
You can use filter.values to select only the values matching the JSONPath and deserialize them using the builder.
import fs2.data.json.circe._
import cats.effect._
import cats.syntax.all._
import cats.effect.unsafe.implicits.global
stream
.lift[IO]
.through(jsonpath.filter.values(sel))
.compile
.toList
.unsafeRunSync()
// res1: List[io.circe.Json] = List(
// JNumber(value = JsonDecimal(input = "1")),
// JNumber(value = JsonDecimal(input = "2")),
// JNumber(value = JsonDecimal(input = "3"))
// )
The circe integration also provides Deserializer and Serializer based on the circe Decoders and Encoders.
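As a minimal sketch of the Serializer side, the following turns a stream of values into JSON tokens through their circe Encoder and renders them as a string. The Point type is hypothetical, and the snippet assumes the codec.serialize pipe and the collector.compact collector from the core module.

```scala
import fs2.{Fallible, Stream}
import fs2.data.json._
import fs2.data.json.circe._
import io.circe.Encoder

// Hypothetical type, used only for this illustration.
final case class Point(x: Int, y: Int)
object Point {
  implicit val encoder: Encoder[Point] =
    Encoder.forProduct2("x", "y")((p: Point) => (p.x, p.y))
}

val points = Stream.emits(List(Point(1, 2), Point(3, 4))).covary[Fallible]

// The Serializer[Point] is derived from the circe Encoder[Point] in scope.
val rendered = points
  .through(codec.serialize[Fallible, Point])
  .compile
  .to(collector.compact)
```

Here rendered is an Either[Throwable, String] holding the serialized values.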
Let's say we defined our data model as follows:
import io.circe.generic.JsonCodec
@JsonCodec
case class Data(field1: Int, field2: Option[String], field3: List[Int])
@JsonCodec
case class WrappedData(field1: Int, field2: Option[String], field3: List[Wrapped])
@JsonCodec
case class Wrapped(test: Int)
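We could write the previous wrapping transformation using these case classes. Note that, as the output below shows, the transform call fails as written: the selector yields the elements of field3, which are integers and cannot be decoded as Data.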
import fs2.data.json.selector._
import fs2.data.json.circe._
val values = stream.through(codec.deserialize[Fallible, Data])
// values: Stream[[x]Fallible[x], Data] = Stream(..)
values.compile.toList
// res2: Either[Throwable, List[Data]] = Right(
// value = List(
// Data(field1 = 0, field2 = Some(value = "test"), field3 = List(1, 2, 3)),
// Data(field1 = 2, field2 = None, field3 = List())
// )
// )
def wrap(data: Data): WrappedData =
WrappedData(data.field1, data.field2, data.field3.map(Wrapped(_)))
val sel = root.field("field3").iterate.compile
// sel: Selector = PipeSelector(
// left = NameSelector(
// pred = Single(name = "field3"),
// strict = true,
// mandatory = false
// ),
// right = IteratorSelector(strict = true)
// )
val transformed = stream.through(codec.transform(sel, wrap))
// transformed: Stream[[x]Fallible[x], Token] = Stream(..)
transformed.compile.to(collector.pretty())
// res3: Either[Throwable, String] = Left(
// value = JsonException(
// msg = "An error occurred while transforming Json data",
// context = Some(
// value = Index(
// idx = 0L,
// parent = Key(
// name = "field3",
// parent = Key(
// name = "field2",
// parent = Key(name = "field1", parent = Root)
// )
// )
// )
// ),
// inner = JsonException(
// msg = "an error occured while deserializing Json values",
// context = None,
// inner = DecodingFailure at .field1: Missing required field
// )
// )
// )
Dropping values can be done similarly.
import fs2.data.json.circe._
import cats.syntax.all._
val f1 = root.field("field1").compile
// f1: Selector = NameSelector(
// pred = Single(name = "field1"),
// strict = true,
// mandatory = false
// )
val transformed = stream.through(codec.transformOpt(f1, (i: Int) => (i > 0).guard[Option].as(i)))
// transformed: Stream[[x]Fallible[x], Token] = Stream(..)
transformed.compile.to(collector.pretty())
// res4: Either[Throwable, String] = Right(
// value = """{
// "field2": "test",
// "field3": [
// 1,
// 2,
// 3
// ]
// }
// {
// "field1": 2,
// "field3": []
// }"""
// )
Migrating from circe-fs2
If you were using circe-fs2 to emit streams of Json values, you can easily switch to fs2-data-json-circe. Just replace your usages of stringStreamParser or byteStreamParser with fs2.data.json.ast.parse.
For instance, if you had this code:
import io.circe.fs2._
import cats.effect._
input[SyncIO]
.through(stringStreamParser)
.map(_.spaces2)
.compile
.toList
.unsafeRunSync()
// res5: List[String] = List(
// """{
// "field1" : 0,
// "field2" : "test",
// "field3" : [
// 1,
// 2,
// 3
// ]
// }""",
// """{
// "field1" : 2,
// "field3" : [
// ]
// }"""
// )
You can replace it with:
import fs2.data.json._
import fs2.data.json.circe._
input[Fallible]
.through(ast.parse)
.map(_.spaces2)
.compile
.toList
// res6: Either[Throwable, List[String]] = Right(
// value = List(
// """{
// "field1" : 0,
// "field2" : "test",
// "field3" : [
// 1,
// 2,
// 3
// ]
// }""",
// """{
// "field1" : 2,
// "field3" : [
// ]
// }"""
// )
// )
If you were using byteStreamParser, please refer to the fs2.data.text package documentation to see how to decode the byte stream.
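As a hedged sketch of the byte-stream case, assuming the input is UTF-8 encoded and that importing fs2.data.text.utf8._ brings the required decoding instance into scope, a byteStreamParser pipeline might be replaced as follows. The sample input is made up for the illustration.

```scala
import fs2.{Fallible, Stream}
import fs2.data.json._
import fs2.data.json.circe._
import fs2.data.text.utf8._ // treat the incoming bytes as UTF-8 encoded text

// Made-up input: two JSON documents encoded as UTF-8 bytes.
val bytes: Stream[Fallible, Byte] =
  Stream.emits("""{"a": 1} {"a": 2}""".getBytes("UTF-8").toList).covary[Fallible]

// Same pipe as for String input; only the import above differs.
val parsed = bytes.through(ast.parse).compile.toList
```

Other encodings would need the corresponding import from the fs2.data.text package instead of utf8.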
Play! JSON
The fs2-data-json-play module provides Builder and Tokenizer instances for the Play! JSON JsValue type, and a Tokenizer instance for each type T that has an implicit Writes[T] in scope.
It also provides a Deserializer for types with a Reads instance and a Serializer for those with a Writes instance.
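A minimal sketch of parsing into Play! JSON values could look like the following. It assumes that the instances live in the fs2.data.json.playJson package; this package name is not stated on this page, so verify it against the module you depend on. The sample input is made up.

```scala
import fs2.{Fallible, Stream}
import fs2.data.json._
import fs2.data.json.playJson._ // assumed package name for the Play! JSON instances
import play.api.libs.json.JsValue

val playValues = Stream
  .emit("""{"field1": 0} {"field1": 2}""")
  .covary[Fallible]
  .through(tokens)
  // Build JsValue instances from the token stream using the Play! JSON Builder.
  .through(ast.values[Fallible, JsValue])
  .compile
  .toList
```

The result is an Either[Throwable, List[JsValue]], mirroring the circe examples above.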