JSONPath

Module: fs2-data-json (available on Maven Central)

The fs2-data-json module provides a streaming implementation of JSONPath.

Let's use the following JSON input as an example.

import cats.syntax.all._

import fs2._
import fs2.data.json._

val input = """{
              |  "field1": 0,
              |  "field2": "test",
              |  "field3": [1, 2, 3]
              |}
              |{
              |  "field1": 2,
              |  "field3": []
              |}""".stripMargin
// input: String = """{
//   "field1": 0,
//   "field2": "test",
//   "field3": [1, 2, 3]
// }
// {
//   "field1": 2,
//   "field3": []
// }"""

val stream = Stream.emit(input).through(tokens[Fallible, String])
// stream: Stream[[A]Fallible[A], Token] = Stream(..)

Building a JSONPath

A subset of JSONPath can be used to select a subset of a JSON token stream. There are several ways to create selectors:

Parsing a string using the JSONPath parser

For instance, to select and enumerate elements that are in the field3 array, you can create this selector. Only the tokens describing the values in field3 will be emitted as a result.

import fs2.data.json.jsonpath._

val selector = JsonPathParser.either("$.field3[*]")
// selector: Either[Throwable, JsonPath] = Right(
//   value = JsonPath(
//     locations = NonEmptyList(
//       head = Child(child = Name(n = "field3")),
//       tail = List(Pred(predicate = Wildcard))
//     )
//   )
// )
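If the string is not valid JSONPath, the parse error is returned on the left side. A minimal sketch (the malformed path below is an arbitrary example):

```scala
import fs2.data.json.jsonpath._

// An unterminated bracket is not valid JSONPath, so parsing fails and
// the syntax error is captured in the Left side of the Either instead
// of being thrown.
val invalid = JsonPathParser.either("$.field3[")
// invalid.isLeft == true
```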

The JSONPath parser wraps the result in any type that has a MonadError instance with error type Throwable, to catch potential parsing errors. If you prefer not to have this wrapping, you can use the jsonpath interpolator.

import fs2.data.json.jsonpath.literals._

val path = jsonpath"$$.field3[*]"
// path: JsonPath = JsonPath(
//   locations = NonEmptyList(
//     head = Child(child = Name(n = "field3")),
//     tail = List(Pred(predicate = Wildcard))
//   )
// )

Because $ is a special character in interpolated strings, you need to escape it by doubling it. The advantage of the interpolator is that potential syntax errors are checked at compilation time.

The subset

The supported JSONPath features are:

Using JSONPath

Using the path defined above, we can filter the stream of tokens to emit only the selected tokens downstream. This can drastically reduce the amount of emitted data to only the parts that are of interest to you. The filtering pipes are located in the fs2.data.json.jsonpath.filter namespace.

The main operators in the namespace are:

Since JSONPath includes a recursive descent operator, there can be nested matches for your path. The matches are returned in the order their first matching token is encountered in the input. This means that for nested matches, the first stream returned is the ancestor element.

Using filter.collect, you can build a stream that collects each match using the provided collector and emits the aggregated result. For instance, to collect the tokens of each match into a list, you can run the following code.

import fs2.data.json.literals._
import fs2.data.json.jsonpath.filter

import cats.effect._
import cats.effect.unsafe.implicits.global

val recursive = jsonpath"$$..a"
// recursive: JsonPath = JsonPath(
//   locations = NonEmptyList(
//     head = Descendant(child = Name(n = "a")),
//     tail = List()
//   )
// )

val json = json"""{
  "a": {
    "a": {
      "c": 1
    },
    "b": 2,
    "c": 3
  }
}"""
// json: Stream[Fallible, Token] = Stream(..)

json
  .lift[IO]
  .through(filter.collect(recursive, List))
  .compile
  .toList
  .unsafeRunSync()
// res0: List[List[Token]] = List(
//   List(
//     StartObject,
//     Key(value = "a"),
//     StartObject,
//     Key(value = "c"),
//     NumberValue(value = "1"),
//     EndObject,
//     Key(value = "b"),
//     NumberValue(value = "2"),
//     Key(value = "c"),
//     NumberValue(value = "3"),
//     EndObject
//   ),
//   List(StartObject, Key(value = "c"), NumberValue(value = "1"), EndObject)
// )

If you want to have results emitted as early as possible instead of in order, you can set the deterministic parameter to false.

json
  .lift[IO]
  .through(filter.collect(recursive, List, deterministic = false))
  .compile
  .toList
  .unsafeRunSync()
// res1: List[List[Token]] = List(
//   List(StartObject, Key(value = "c"), NumberValue(value = "1"), EndObject),
//   List(
//     StartObject,
//     Key(value = "a"),
//     StartObject,
//     Key(value = "c"),
//     NumberValue(value = "1"),
//     EndObject,
//     Key(value = "b"),
//     NumberValue(value = "2"),
//     Key(value = "c"),
//     NumberValue(value = "3"),
//     EndObject
//   )
// )

The filter.through operator allows for handling each match in a streaming fashion. For instance, let's say you want to save each match in a file, incrementing a counter on each match. You can run the following code.

import fs2.io.file.{Files, Path}

def saveJson(counter: Ref[IO, Int], tokens: Stream[IO, Token]): Stream[IO, Nothing] =
  Stream.eval(counter.getAndUpdate(_ + 1)).flatMap { index =>
    tokens
      .through(render.compact)
      .through(Files[IO].writeUtf8(Path(s"match-$index.json")))
  }

val program =
  for {
    counter <- Ref[IO].of(0)
    _ <- json
      .lift[IO]
      .through(filter.through(recursive, saveJson(counter, _)))
      .compile
      .drain
  } yield ()
// program: IO[Unit] = FlatMap(
//   ioe = Delay(
//     thunk = cats.effect.IO$$$Lambda$12958/0x000000080335e840@73965525,
//     event = cats.effect.tracing.TracingEvent$StackTrace
//   ),
//   f = <function1>,
//   event = cats.effect.tracing.TracingEvent$StackTrace
// )

program.unsafeRunSync()

Files[IO].readUtf8(Path("match-0.json")).compile.string.unsafeRunSync()
// res3: String = "{\"c\":1}"
Files[IO].readUtf8(Path("match-1.json")).compile.string.unsafeRunSync()
// res4: String = "{\"a\":{\"c\":1},\"b\":2,\"c\":3}"

The operator described below is unsafe and should be used carefully only if none of the above operators fits your purpose. When using it, please ensure that you:

  • consume all inner Streams
  • consume them in parallel (e.g. with a variant of parEvalMap with parallelism > 1, or with a variant of parJoin).

Failure to do so might result in memory leaks or hanging programs.

The filter.unsafeRaw operator emits a stream of all matches. Each match is represented as a nested stream of JSON tokens, which must be consumed.

json
  .lift[IO]
  .through(filter.unsafeRaw(recursive))
  .parEvalMapUnbounded(_.compile.toList)
  .compile
  .toList
  .unsafeRunSync()
// res5: List[List[Token]] = List(
//   List(
//     StartObject,
//     Key(value = "a"),
//     StartObject,
//     Key(value = "c"),
//     NumberValue(value = "1"),
//     EndObject,
//     Key(value = "b"),
//     NumberValue(value = "2"),
//     Key(value = "c"),
//     NumberValue(value = "3"),
//     EndObject
//   ),
//   List(StartObject, Key(value = "c"), NumberValue(value = "1"), EndObject)
// )