
Module: fs2-data-json

The fs2-data-json module provides tools to parse, query and transform JSON data in a streaming manner.


JSON parsing

To create a stream of JSON tokens from an input stream, use the tokens pipe in the fs2.data.json package. This pipe accepts a stream of characters and emits the JSON tokens it parses, checking that they form structurally valid JSON documents.

import cats.effect._

import fs2._
import fs2.data.json._

val input = """{
              |  "field1": 0,
              |  "field2": "test",
              |  "field3": [1, 2, 3]
              |}
              |{
              |  "field1": 2,
              |  "field3": []
              |}""".stripMargin
// input: String = """{
//   "field1": 0,
//   "field2": "test",
//   "field3": [1, 2, 3]
// }
// {
//   "field1": 2,
//   "field3": []
// }"""

val stream = Stream.emits(input).through(tokens[Fallible])
// stream: Stream[Fallible, Token] = Stream(..)
stream.compile.toList
// res0: Either[Throwable, List[Token]] = Right(
//   List(
//     StartObject,
//     Key("field1"),
//     NumberValue("0"),
//     Key("field2"),
//     StringValue("test"),
//     Key("field3"),
//     StartArray,
//     NumberValue("1"),
//     NumberValue("2"),
//     NumberValue("3"),
//     EndArray,
//     EndObject,
//     StartObject,
//     Key("field1"),
//     NumberValue("2"),
//     Key("field3"),
//     StartArray,
//     EndArray,
//     EndObject
//   )
// )

The pipe validates the JSON structure while parsing. It reads all the JSON values in the input stream and emits tokens as soon as they are available.

Selectors

Selectors can be used to select a subset of a JSON token stream. There are several ways to create selectors:

Parsing a string using the selector syntax

For instance, to select and enumerate elements that are in the field3 array, you can create this selector. Only the tokens describing the values in field3 will be emitted as a result.

import cats.implicits._

type ThrowableEither[T] = Either[Throwable, T]

val selector = ".field3.[]".parseSelector[ThrowableEither].toTry.get
// selector: Selector = PipeSelector(
//   NameSelector(Single("field3"), true, false),
//   IteratorSelector(true)
// )

The parseSelector method implicitly comes from the import fs2.data.json._ and wraps the result in any type that has a MonadError instance with error type Throwable, to catch potential parsing errors. If you prefer not to have this wrapping and don’t mind an extra dependency, you can have a look at the interpolator.
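For instance (a small sketch, reusing the ThrowableEither alias defined above), a syntactically invalid selector is reported through the error channel rather than thrown:

import cats.implicits._
import fs2.data.json._

// the closing bracket is missing, so parsing fails and the
// result is a Left describing the syntax error
val invalid = ".field3.[".parseSelector[ThrowableEither]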


Using the selector DSL

The selector DSL is a nice way to describe selectors without any string parsing; it also allows for building selectors programmatically.
The DSL resides within the fs2.data.json.selector package, and you start a selector using the root builder.
The selector above can be written like this with the DSL:

import fs2.data.json.selector._

val selectorFromDsl = root.field("field3").iterate.compile
// selectorFromDsl: Selector = PipeSelector(
//   NameSelector(Single("field3"), true, false),
//   IteratorSelector(true)
// )

The trailing .compile transforms the DSL selector builder into the final selector. Builders are safe to reuse, re-compose, and compile several times.
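Since builders are plain values, the same base builder can be reused to compile several selectors (a small sketch using the DSL calls shown above):

import fs2.data.json.selector._

val base = root.field("field3")        // a reusable builder
val strictSel = base.!.compile         // field made mandatory
val iterateSel = base.iterate.compile  // iterate over the array elements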

You can express the same selectors as with the syntax described above. For instance, to make the field mandatory and the iteration lenient, you can do:

val selectorFromDsl = root.field("field3").!.iterate.?.compile
// selectorFromDsl: Selector = PipeSelector(
//   NameSelector(Single("field3"), true, true),
//   IteratorSelector(false)
// )

The DSL is type-safe, so that you cannot write invalid selectors. Any attempt to do so results in a compilation error.

// array index selection cannot be made mandatory
root.index(1).!
// error: Only not yet mandatory field selectors can be made mandatory
// root.index(1).!
// ^^^^^^^^^^^^^^^
// you cannot use the same flag twice
root.index(1).?.?
// error: There seems to be no way to make this selector lenient. Is it already?
// root.index(1).?.?
// ^^^^^^^^^^^^^^^^^

Using JSON selectors

Using the selector defined above, we can filter the stream of tokens to emit only the selected tokens downstream. This can drastically reduce the amount of emitted data, keeping only the parts that are of interest to you.

val filtered = stream.through(filter(selector))
// filtered: Stream[Fallible[x], Token] = Stream(..)
filtered.compile.toList
// res3: Either[Throwable, List[Token]] = Right(
//   List(NumberValue("1"), NumberValue("2"), NumberValue("3"))
// )

By default, selected values are emitted in the stream as they are matched, resulting in a stream with several JSON values.
If this is not desired, you can wrap the elements into arrays and objects from the root, by calling filter with wrap set to true.

val filteredWrapped = stream.through(filter(selector, wrap = true))
// filteredWrapped: Stream[Fallible[x], Token] = Stream(..)
filteredWrapped.compile.toList
// res4: Either[Throwable, List[Token]] = Right(
//   List(
//     StartObject,
//     Key("field3"),
//     StartArray,
//     NumberValue("1"),
//     NumberValue("2"),
//     NumberValue("3"),
//     EndArray,
//     EndObject,
//     StartObject,
//     Key("field3"),
//     StartArray,
//     EndArray,
//     EndObject
//   )
// )

If the selector selects elements in an array, then the resulting values are wrapped in an array.
On the other hand, if it selects elements in an object, then the emitted values are wrapped in an object, associated with the last selected key.

If you want to ensure that selected object keys are present in the JSON value, you can use the ! operator described above. For instance if you want to select field2 and fail the stream as soon as an object does not contain it, you can do:

val mandatorySelector = ".field2!".parseSelector[ThrowableEither].toTry.get
// mandatorySelector: Selector = NameSelector(Single("field2"), true, true)
stream.through(filter(mandatorySelector)).compile.toList
// res5: Either[Throwable, List[Token]] = Left(
//   fs2.data.json.JsonMissingFieldException: missing mandatory fields: field2
// )

The filter preserves the chunk structure: the stream fails as soon as an error is encountered, but only after emitting the values that were selected earlier in the same chunk.

AST builder and tokenizer

JSON ASTs can be built if you provide an implicit Builder[Json] to the values pipe. The Builder[Json] typeclass describes how JSON ASTs of type Json are built from streams.

implicit val builder: Builder[SomeJsonType] = ...
val asts = stream.through(values[F, SomeJsonType])

The asts stream emits all parsed top-level JSON values; in our example, the two objects are emitted.
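As a concrete illustration, the circe binding module (also used further down this page) provides such a Builder for io.circe.Json; this sketch assumes the fs2-data-json-circe dependency is available:

import fs2.data.json.circe._

import _root_.io.circe.Json

// Builder[Json] comes from the circe binding module, so the
// values pipe can materialize each top-level JSON value
val asts = stream.through(values[Fallible, Json])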

If you provide an implicit Tokenizer[Json], which describes how a JSON AST is transformed into JSON tokens, you can apply transformations to the JSON stream. For instance, you can wrap all values in the field3 array by using this code:

implicit val tokenizer: Tokenizer[SomeJsonType] = ...
val transformed = stream.through(transform[Fallible, SomeJsonType](selector, json => SomeJsonObject("test" -> json)))

For concrete examples of provided Builders and Tokenizers, please refer to the JSON library binding modules documentation.

Sometimes you would like to delete some JSON values from the input stream, based on some predicate at a given path, and keep the rest untouched. In this case, you can use the transformOpt pipe, and return None for the values you want to remove from the stream.
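As a sketch (assuming the circe binding module for the Tokenizer and Builder instances, and reusing the stream and selector defined above), this drops every selected value equal to 2 and leaves the rest of the stream untouched:

import fs2.data.json.circe._

import _root_.io.circe.Json

// return None to delete a selected value, Some to keep it
val pruned = stream.through(
  transformOpt[Fallible, Json](selector, json =>
    if (json == Json.fromInt(2)) None else Some(json)))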

JSON Renderers

Once you have a JSON token stream and have selected and transformed what you needed in it, you can write the resulting token stream to some storage. This can be achieved using renderers.

For instance, let’s say you want to write the resulting JSON stream to a file in compact form (i.e. with no spaces or new lines). You can do:

import java.nio.file.Paths

implicit val cs = IO.contextShift(scala.concurrent.ExecutionContext.global)

Blocker[IO].use { blocker =>
  stream
    .through(render.compact)
    .through(text.utf8Encode)
    .lift[IO]
    .through(io.file.writeAll[IO](Paths.get("/some/path/to/file.json"), blocker))
    .compile
    .drain
}

There is also a pretty() renderer, which indents inner elements by the given indent string.
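The pretty renderer is used the same way as the compact one, for instance with the default indentation (a small sketch, reusing the stream from the beginning of this page):

// streams the pretty-printed form using the default indentation
val prettyRendered = stream.through(render.pretty())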

If you are interested in the String rendering as a value, the library also provides Collectors:

stream.compile.to(collector.compact)
// res7: Either[Throwable, collector.compact.Out] = Right(
//   "{\"field1\":0,\"field2\":\"test\",\"field3\":[1,2,3]}{\"field1\":2,\"field3\":[]}"
// )

// default indentation is 2 spaces
stream.compile.to(collector.pretty())
// res8: Either[Throwable, String] = Right(
//   """{
//   "field1": 0,
//   "field2": "test",
//   "field3": [
//     1,
//     2,
//     3
//   ]
// }
// {
//   "field1": 2,
//   "field3": []
// }"""
// )
// if you are more into tabs (or any other indentation size) you can change the indentation string
stream.compile.to(collector.pretty("\t"))
// res9: Either[Throwable, String] = Right(
//   """{
// 	"field1": 0,
// 	"field2": "test",
// 	"field3": [
// 		1,
// 		2,
// 		3
// 	]
// }
// {
// 	"field1": 2,
// 	"field3": []
// }"""
// )

Generating JSON streams

Another use case of the library can be to generate a JSON token stream. This comes in handy if you are developing a web service that returns some big JSON in chunks.

To this end you can use the pipes in wrap which allow you to wrap a stream into an object structure.

For instance, imagine you have an event store which can return a stream of events, and you have a way to serialize the events into JSON.

sealed trait Event
case class CreateCounter(name: String, initialValue: Int) extends Event
case class RemoveCounter(name: String) extends Event
case class IncreaseCounter(name: String) extends Event

object Event {
  import _root_.io.circe.Encoder
  import _root_.io.circe.generic.extras.Configuration
  import _root_.io.circe.generic.extras.semiauto._
  implicit val configuration = Configuration.default.withDiscriminator("type")

  implicit val encoder: Encoder[Event] = deriveConfiguredEncoder
}

val events = Stream.emits(
  List[Event](
    CreateCounter("counter1", 0),
    IncreaseCounter("counter1"),
    CreateCounter("counter2", 0),
    RemoveCounter("counter2")
  )
)
// events: Stream[Nothing, Event] = Stream(..)

You can generate a stream of JSON tokens wrapped in an object at a key named events like this:

import fs2.data.json.circe._

val wrappedTokens = events.through(tokenize).through(wrap.asArrayInObject(at = "events"))
// wrappedTokens: Stream[Nothing, Token] = Stream(..)

You can use the renderers described above to generate the rendered chunks to send to the client.

wrappedTokens.through(render.compact).compile.toList
// res10: cats.package.Id[List[String]] = List(
//   "{",
//   "\"events\":",
//   "[",
//   "{\"name\":\"counter1\",\"initialValue\":0,\"type\":\"CreateCounter\"}",
//   ",{\"name\":\"counter1\",\"type\":\"IncreaseCounter\"}",
//   ",{\"name\":\"counter2\",\"initialValue\":0,\"type\":\"CreateCounter\"}",
//   ",{\"name\":\"counter2\",\"type\":\"RemoveCounter\"}",
//   "]",
//   "}"
// )

You can also add other fields to the generated object stream. For instance, let’s assume our event store can tell us in advance how big the stream will be; we can send this piece of data in the first chunks, so that the client can react accordingly.

import _root_.io.circe.Json

events
  .through(tokenize)
  .through(wrap.asArrayInObject(at = "events", in = Map("size" -> Json.fromInt(4))))
  .through(render.compact)
  .compile
  .toList
// res11: cats.package.Id[List[String]] = List(
//   "{",
//   "\"size\":4",
//   ",\"events\":",
//   "[",
//   "{\"name\":\"counter1\",\"initialValue\":0,\"type\":\"CreateCounter\"}",
//   ",{\"name\":\"counter1\",\"type\":\"IncreaseCounter\"}",
//   ",{\"name\":\"counter2\",\"initialValue\":0,\"type\":\"CreateCounter\"}",
//   ",{\"name\":\"counter2\",\"type\":\"RemoveCounter\"}",
//   "]",
//   "}"
// )

For more pipes and options, please refer to the API documentation.