JSONPath

Module: Maven Central

The fs2-data-json module provides a streaming implementation of JSONPath.

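Let's use the following JSON input as an example. Note that it contains two top-level objects, one after the other, which the tokenizer handles as a stream of JSON values.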

import cats.syntax.all._

import fs2._
import fs2.data.json._

val input = """{
              |  "field1": 0,
              |  "field2": "test",
              |  "field3": [1, 2, 3]
              |}
              |{
              |  "field1": 2,
              |  "field3": []
              |}""".stripMargin
// input: String = """{
//   "field1": 0,
//   "field2": "test",
//   "field3": [1, 2, 3]
// }
// {
//   "field1": 2,
//   "field3": []
// }"""

val stream = Stream.emit(input).through(tokens[Fallible, String])
// stream: Stream[[A]Fallible[A], Token] = Stream(..)

Building a JSONPath

A subset of JSONPath can be used to select parts of a JSON token stream. Selectors can be created in several ways, for instance by parsing a string with the JSONPath parser or by using the jsonpath interpolator.

Parsing a string using the JSONPath parser

For instance, to select and enumerate elements that are in the field3 array, you can create this selector. Only the tokens describing the values in field3 will be emitted as a result.

import fs2.data.json.jsonpath._

val selector = JsonPathParser.either("$.field3[*]")
// selector: Either[Throwable, JsonPath] = Right(
//   value = JsonPath(
//     locations = NonEmptyList(
//       head = Child(child = Name(n = "field3")),
//       tail = List(Pred(predicate = Wildcard))
//     )
//   )
// )
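To make the printed structure more concrete, here is a toy parser for a tiny slice of the syntax: $ followed by .name (child), ..name (descendant) and [*] (wildcard). It is a sketch, not fs2-data's parser. The ADT (Location, Child, Descendant, Pred, Wildcard) is hypothetical and only mirrors the names printed above; for simplicity Child holds the name directly instead of wrapping a Name. Like JsonPathParser.either, it reports failures as a Left rather than throwing.

```scala
import scala.annotation.tailrec

// Hypothetical toy ADT that mirrors the names printed above; it is not
// fs2-data's own model (for instance, Child holds the name directly).
sealed trait Predicate
case object Wildcard extends Predicate

sealed trait Location
final case class Child(name: String) extends Location
final case class Descendant(name: String) extends Location
final case class Pred(predicate: Predicate) extends Location

object ToyJsonPathParser {
  private val NamePattern = "[A-Za-z_][A-Za-z0-9_]*".r

  private def fail(message: String): Either[Throwable, Nothing] =
    Left(new IllegalArgumentException(message))

  // Parses `$` followed by at least one of `.name`, `..name` or `[*]`.
  def either(input: String): Either[Throwable, List[Location]] =
    if (!input.startsWith("$")) fail("a path must start with '$'")
    else if (input.length == 1) fail("expected at least one location after '$'")
    else loop(input.drop(1), Nil)

  // ".." is tested before "." because both start with the same character.
  @tailrec
  private def loop(rest: String, acc: List[Location]): Either[Throwable, List[Location]] =
    if (rest.isEmpty) Right(acc.reverse)
    else if (rest.startsWith("[*]")) loop(rest.drop(3), Pred(Wildcard) :: acc)
    else if (rest.startsWith("..")) NamePattern.findPrefixOf(rest.drop(2)) match {
      case Some(name) => loop(rest.drop(2 + name.length), Descendant(name) :: acc)
      case None       => fail(s"expected a name after '..' in: $rest")
    }
    else if (rest.startsWith(".")) NamePattern.findPrefixOf(rest.drop(1)) match {
      case Some(name) => loop(rest.drop(1 + name.length), Child(name) :: acc)
      case None       => fail(s"expected a name after '.' in: $rest")
    }
    else fail(s"unexpected input: $rest")
}

println(ToyJsonPathParser.either("$.field3[*]"))
// Right(List(Child(field3), Pred(Wildcard)))
```

Returning an Either keeps syntax errors as ordinary values that callers can inspect or propagate.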

The JSONPath parser wraps the result in any effect type that has a MonadError instance with error type Throwable, so that parsing errors are captured instead of thrown; the either method used above returns an Either[Throwable, JsonPath]. If you prefer to avoid this wrapping, you can use the jsonpath interpolator.

import fs2.data.json.jsonpath.literals._

val path = jsonpath"$$.field3[*]"
// path: JsonPath = JsonPath(
//   locations = NonEmptyList(
//     head = Child(child = Name(n = "field3")),
//     tail = List(Pred(predicate = Wildcard))
//   )
// )

Because $ is a special character in interpolated strings, it must be escaped by doubling it ($$). The advantage of the interpolator is that syntax errors in the path are reported at compile time.
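The escaping rule is not specific to jsonpath: the Scala compiler turns $$ into a single $ in the literal parts of any interpolated string before the interpolator receives them. The sketch below uses a hypothetical toypath interpolator (not part of fs2-data) that simply returns its literal text; unlike jsonpath, it performs no validation at all.

```scala
// Hypothetical stand-in for jsonpath"...": it only returns the literal text
// and, unlike the real interpolator, checks nothing at compile time.
implicit class ToyPathInterpolator(sc: StringContext) {
  def toypath(args: Any*): String = {
    require(args.isEmpty, "this sketch does not support spliced values")
    // By now the compiler has already replaced every `$$` with a single `$`.
    sc.parts.mkString
  }
}

val fromToy = toypath"$$.field3[*]"
val fromS = s"$$.field3[*]"
println(fromToy) // $.field3[*]
println(fromS)   // $.field3[*]
```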

The subset

The supported JSONPath features are:

Using JSONPath

Using the path defined above, we can filter the stream of tokens so that only the selected tokens are emitted downstream. This can drastically reduce the amount of emitted data, keeping only the parts you are interested in. The filtering pipes are located in the fs2.data.json.jsonpath.filter namespace.

Since JSONPath includes a recursive descent operator (..), a path can have nested matches. filter.raw emits a stream of all matches, where each match is itself a nested stream of JSON tokens that must be consumed.

import fs2.data.json.jsonpath.filter

import cats.effect._
import cats.effect.unsafe.implicits.global

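val filtered = stream
  .lift[IO]
  // one nested token stream per match
  .through(filter.raw(path))
  // consume the match streams concurrently, collecting each one into a List
  .parEvalMapUnbounded(_.compile.toList)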
// filtered: Stream[[x]IO[x], List[Token]] = Stream(..)
filtered.compile.toList.unsafeRunSync()
// res0: List[List[Token]] = List(
//   List(NumberValue(value = "1")),
//   List(NumberValue(value = "2")),
//   List(NumberValue(value = "3"))
// )
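Conceptually, $.field3[*] walks each top-level value, enters field3 when it holds an array, and emits the tokens of each array element. The toy model below reproduces the result above for the two objects of the input. It assumes a hypothetical Token ADT whose case names mirror the library's tokens and works on an in-memory List instead of an fs2 Stream; it sketches the selection semantics only, not how fs2-data implements filtering.

```scala
import scala.annotation.tailrec

// Hypothetical toy token ADT; names mirror fs2-data's Token cases.
sealed trait Token
case object StartObject extends Token
case object EndObject extends Token
case object StartArray extends Token
case object EndArray extends Token
final case class Key(value: String) extends Token
final case class NumberValue(value: String) extends Token
final case class StringValue(value: String) extends Token

object ToySelect {
  // Index just past the JSON value whose first token is at `start`.
  def valueEnd(ts: Vector[Token], start: Int): Int = {
    @tailrec
    def scan(i: Int, depth: Int): Int = ts(i) match {
      case StartObject | StartArray => scan(i + 1, depth + 1)
      case EndObject | EndArray     => if (depth == 1) i + 1 else scan(i + 1, depth - 1)
      case _                        => if (depth == 0) i + 1 else scan(i + 1, depth)
    }
    scan(start, 0)
  }

  // Models `$.<field>[*]`: for every top-level object, if `field` holds an
  // array, emit the tokens of each of its elements.
  def childElements(tokens: List[Token], field: String): List[List[Token]] = {
    val ts = tokens.toVector

    // Token slices of the elements of the array whose StartArray is at `start`.
    def arrayElements(start: Int): List[List[Token]] = {
      @tailrec
      def go(i: Int, acc: List[List[Token]]): List[List[Token]] =
        if (ts(i) == EndArray) acc.reverse
        else {
          val end = valueEnd(ts, i)
          go(end, ts.slice(i, end).toList :: acc)
        }
      go(start + 1, Nil)
    }

    // Matches among the members of the object whose StartObject is at `start`.
    def objectMatches(start: Int): List[List[Token]] = {
      @tailrec
      def go(i: Int, acc: List[List[Token]]): List[List[Token]] = ts(i) match {
        case EndObject => acc
        case Key(k) =>
          val v = i + 1
          val found = if (k == field && ts(v) == StartArray) arrayElements(v) else Nil
          go(valueEnd(ts, v), acc ++ found)
        case other => throw new IllegalArgumentException(s"unexpected token: $other")
      }
      go(start + 1, Nil)
    }

    // The input may hold several top-level values one after the other.
    @tailrec
    def topLevel(i: Int, acc: List[List[Token]]): List[List[Token]] =
      if (i >= ts.length) acc
      else {
        val found = if (ts(i) == StartObject) objectMatches(i) else Nil
        topLevel(valueEnd(ts, i), acc ++ found)
      }

    topLevel(0, Nil)
  }
}

// Tokens of the two objects from the example input.
val inputTokens: List[Token] = List(
  StartObject, Key("field1"), NumberValue("0"), Key("field2"), StringValue("test"),
  Key("field3"), StartArray, NumberValue("1"), NumberValue("2"), NumberValue("3"), EndArray,
  EndObject,
  StartObject, Key("field1"), NumberValue("2"), Key("field3"), StartArray, EndArray, EndObject
)

println(ToySelect.childElements(inputTokens, "field3"))
// List(List(NumberValue(1)), List(NumberValue(2)), List(NumberValue(3)))
```

The empty field3 array of the second object contributes no match, which is why only three results appear.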

The matching streams are emitted in the order in which their matching elements are encountered in the input. For nested matches, this means that the stream for the ancestor element comes first.

import fs2.data.json.literals._

val recursive = jsonpath"$$..a"
// recursive: JsonPath = JsonPath(
//   locations = NonEmptyList(
//     head = Descendant(child = Name(n = "a")),
//     tail = List()
//   )
// )

val json = json"""{
  "a": {
    "a": {
      "c": 1
    },
    "b": 2,
    "c": 3
  }
}"""
// json: Stream[Fallible, Token] = Stream(..)

json
  .lift[IO]
  .through(filter.raw(recursive))
  .parEvalMapUnbounded(_.compile.toList)
  .compile
  .toList
  .unsafeRunSync()
// res1: List[List[Token]] = List(
//   List(
//     StartObject,
//     Key(value = "a"),
//     StartObject,
//     Key(value = "c"),
//     NumberValue(value = "1"),
//     EndObject,
//     Key(value = "b"),
//     NumberValue(value = "2"),
//     Key(value = "c"),
//     NumberValue(value = "3"),
//     EndObject
//   ),
//   List(StartObject, Key(value = "c"), NumberValue(value = "1"), EndObject)
// )
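The order above can be modelled by noting that, for $..a, the value following every Key("a") token is a match, whatever its depth. Listing those values by the position of their first token puts ancestors before their descendants. This is a toy model on an in-memory token list with a hypothetical Token ADT; fs2-data works on streams and does not necessarily proceed this way.

```scala
import scala.annotation.tailrec

// Hypothetical toy token ADT; names mirror fs2-data's Token cases.
sealed trait Token
case object StartObject extends Token
case object EndObject extends Token
case object StartArray extends Token
case object EndArray extends Token
final case class Key(value: String) extends Token
final case class NumberValue(value: String) extends Token

object ToyDescendant {
  // Index just past the JSON value whose first token is at `start`.
  def valueEnd(ts: Vector[Token], start: Int): Int = {
    @tailrec
    def scan(i: Int, depth: Int): Int = ts(i) match {
      case StartObject | StartArray => scan(i + 1, depth + 1)
      case EndObject | EndArray     => if (depth == 1) i + 1 else scan(i + 1, depth - 1)
      case _                        => if (depth == 0) i + 1 else scan(i + 1, depth)
    }
    scan(start, 0)
  }

  // Models `$..<key>`: spans [from, until) of every value that follows a
  // Key(key) token, listed by the position of their first token.
  def spans(ts: Vector[Token], key: String): List[(Int, Int)] =
    ts.indices.toList.collect {
      case i if ts(i) == Key(key) => (i + 1, valueEnd(ts, i + 1))
    }

  def matches(tokens: List[Token], key: String): List[List[Token]] = {
    val ts = tokens.toVector
    spans(ts, key).map { case (from, until) => ts.slice(from, until).toList }
  }
}

// Tokens of the nested example document.
val nested: List[Token] = List(
  StartObject, Key("a"),
  StartObject, Key("a"),
  StartObject, Key("c"), NumberValue("1"), EndObject,
  Key("b"), NumberValue("2"), Key("c"), NumberValue("3"),
  EndObject,
  EndObject
)

ToyDescendant.matches(nested, "a").foreach(println)
```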

Collecting each match in this way is a common use case, so the library offers filter.collect, which does the same for any collector (List in the example below).

json
  .lift[IO]
  .through(filter.collect(recursive, List))
  .compile
  .toList
  .unsafeRunSync()
// res2: List[List[Token]] = List(
//   List(
//     StartObject,
//     Key(value = "a"),
//     StartObject,
//     Key(value = "c"),
//     NumberValue(value = "1"),
//     EndObject,
//     Key(value = "b"),
//     NumberValue(value = "2"),
//     Key(value = "c"),
//     NumberValue(value = "3"),
//     EndObject
//   ),
//   List(StartObject, Key(value = "c"), NumberValue(value = "1"), EndObject)
// )
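The collector argument plays the same role as the one passed to fs2's compile.to. In plain Scala, the standard library's Factory is a close analogue, so the sketch below gathers each $..a match into whatever collection the caller asks for. The names (ToyCollect, collectMatches) and the Token ADT are hypothetical, the model uses in-memory lists instead of streams, and it assumes Scala 2.13 or later for scala.collection.Factory.

```scala
import scala.annotation.tailrec
import scala.collection.Factory

// Hypothetical toy token ADT; names mirror fs2-data's Token cases.
sealed trait Token
case object StartObject extends Token
case object EndObject extends Token
final case class Key(value: String) extends Token
final case class NumberValue(value: String) extends Token

object ToyCollect {
  // Index just past the JSON value whose first token is at `start`
  // (objects only, since this toy ADT has no array tokens).
  def valueEnd(ts: Vector[Token], start: Int): Int = {
    @tailrec
    def scan(i: Int, depth: Int): Int = ts(i) match {
      case StartObject => scan(i + 1, depth + 1)
      case EndObject   => if (depth == 1) i + 1 else scan(i + 1, depth - 1)
      case _           => if (depth == 0) i + 1 else scan(i + 1, depth)
    }
    scan(start, 0)
  }

  // Builds every `$..<key>` match with the caller's factory, ancestors first.
  def collectMatches[C](tokens: List[Token], key: String, factory: Factory[Token, C]): List[C] = {
    val ts = tokens.toVector
    ts.indices.toList.collect {
      case i if ts(i) == Key(key) => factory.fromSpecific(ts.slice(i + 1, valueEnd(ts, i + 1)))
    }
  }
}

val nested: List[Token] = List(
  StartObject, Key("a"),
  StartObject, Key("a"),
  StartObject, Key("c"), NumberValue("1"), EndObject,
  Key("b"), NumberValue("2"), Key("c"), NumberValue("3"),
  EndObject,
  EndObject
)

// The same matches, gathered into two different collection types.
val asLists: List[List[Token]] = ToyCollect.collectMatches(nested, "a", List)
val asVectors: List[Vector[Token]] = ToyCollect.collectMatches(nested, "a", Vector)
```

Passing the collection companion (List, Vector) works because the standard library converts it to a Factory, the same mechanism that makes xs.to(List) compile.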

If you want results to be emitted as early as possible rather than in input order, you can set the deterministic parameter to false. In the example below, the inner match is then emitted before its ancestor.

json
  .lift[IO]
  .through(filter.collect(recursive, List, deterministic = false))
  .compile
  .toList
  .unsafeRunSync()
// res3: List[List[Token]] = List(
//   List(StartObject, Key(value = "c"), NumberValue(value = "1"), EndObject),
//   List(
//     StartObject,
//     Key(value = "a"),
//     StartObject,
//     Key(value = "c"),
//     NumberValue(value = "1"),
//     EndObject,
//     Key(value = "b"),
//     NumberValue(value = "2"),
//     Key(value = "c"),
//     NumberValue(value = "3"),
//     EndObject
//   )
// )
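With deterministic = false, the inner match comes first, presumably because it is complete as soon as its closing EndObject has been read, while its ancestor is still open. A rough toy model of this ordering, on an in-memory list with a hypothetical Token ADT, sorts the $..a match spans by the position of their last token instead of their first. It approximates the observable order in this example; it does not model the concurrency of the real pipe.

```scala
import scala.annotation.tailrec

// Hypothetical toy token ADT; names mirror fs2-data's Token cases.
sealed trait Token
case object StartObject extends Token
case object EndObject extends Token
final case class Key(value: String) extends Token
final case class NumberValue(value: String) extends Token

object ToyCompletionOrder {
  // Index just past the JSON value whose first token is at `start`
  // (objects only, since this toy ADT has no array tokens).
  def valueEnd(ts: Vector[Token], start: Int): Int = {
    @tailrec
    def scan(i: Int, depth: Int): Int = ts(i) match {
      case StartObject => scan(i + 1, depth + 1)
      case EndObject   => if (depth == 1) i + 1 else scan(i + 1, depth - 1)
      case _           => if (depth == 0) i + 1 else scan(i + 1, depth)
    }
    scan(start, 0)
  }

  // Spans of every value following a Key(key) token, ordered by the index just
  // past each match's last token: a nested match closes before its ancestor.
  def matchesByCompletion(tokens: List[Token], key: String): List[List[Token]] = {
    val ts = tokens.toVector
    val spans = ts.indices.toList.collect {
      case i if ts(i) == Key(key) => (i + 1, valueEnd(ts, i + 1))
    }
    spans.sortBy(_._2).map { case (from, until) => ts.slice(from, until).toList }
  }
}

val nested: List[Token] = List(
  StartObject, Key("a"),
  StartObject, Key("a"),
  StartObject, Key("c"), NumberValue("1"), EndObject,
  Key("b"), NumberValue("2"), Key("c"), NumberValue("3"),
  EndObject,
  EndObject
)

val byCompletion = ToyCompletionOrder.matchesByCompletion(nested, "a")
byCompletion.foreach(println)
```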