JSONPath
The fs2-data-json module provides a streaming implementation of JSONPath.
Let's use the following input, made of two consecutive JSON objects, as an example.
import cats.syntax.all._
import fs2._
import fs2.data.json._
val input = """{
| "field1": 0,
| "field2": "test",
| "field3": [1, 2, 3]
|}
|{
| "field1": 2,
| "field3": []
|}""".stripMargin
// input: String = """{
// "field1": 0,
// "field2": "test",
// "field3": [1, 2, 3]
// }
// {
// "field1": 2,
// "field3": []
// }"""
val stream = Stream.emit(input).through(tokens[Fallible, String])
// stream: Stream[[A]Fallible[A], Token] = Stream(..)
Building a JSONPath
A subset of JSONPath can be used to select parts of a JSON token stream. There are several ways to create selectors:
- build the selector using the constructors, which can be quite verbose and cumbersome;
- parse a string with the JSONPath parser;
- use the jsonpath interpolator.
Parsing a string using the JSONPath parser
For instance, to select and enumerate the elements of the field3 array, you can create the following selector. Only the tokens describing the values in field3 will be emitted as a result.
import fs2.data.json.jsonpath._
val selector = JsonPathParser.either("$.field3[*]")
// selector: Either[Throwable, JsonPath] = Right(
// value = JsonPath(
// locations = NonEmptyList(
// head = Child(child = Name(n = "field3")),
// tail = List(Pred(predicate = Wildcard))
// )
// )
// )
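Because JsonPathParser.either returns an Either, a malformed path can be handled without exceptions. The sketch below pattern matches on the result; it assumes that the path "$.field3[" (an unclosed bracket) is rejected by the parser, and the names broken and message are only illustrative.

```scala
import fs2.data.json.jsonpath._

// An unclosed bracket: assumed to be a syntax error for the parser.
val broken: Either[Throwable, JsonPath] = JsonPathParser.either("$.field3[")

// Turn the outcome into a message instead of throwing the error.
val message: String = broken match {
  case Right(path) => s"parsed: $path"
  case Left(error) => s"invalid JSONPath: ${error.getMessage}"
}
```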
The JSONPath parser wraps its result in any type that has a MonadError instance with error type Throwable, so that parsing errors can be caught. If you prefer to avoid this wrapping, you can use the jsonpath interpolator instead.
import fs2.data.json.jsonpath.literals._
val path = jsonpath"$$.field3[*]"
// path: JsonPath = JsonPath(
// locations = NonEmptyList(
// head = Child(child = Name(n = "field3")),
// tail = List(Pred(predicate = Wildcard))
// )
// )
Because $ is a special character in interpolated strings, you need to escape it by doubling it.
The advantage of the interpolator is that syntax errors in the path are reported at compile time.
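The parser and the interpolator describe the same path, as the printed values above suggest. The sketch below compares them directly; it assumes that JsonPath values have structural equality (they print as case classes), and the names parsed, literal and same are illustrative.

```scala
import fs2.data.json.jsonpath._
import fs2.data.json.jsonpath.literals._

// Parsed at runtime: the result is wrapped in Either.
val parsed: Either[Throwable, JsonPath] = JsonPathParser.either("$.field3[*]")

// Checked at compile time: `$$` is an escaped `$`, so this is the path "$.field3[*]".
val literal: JsonPath = jsonpath"$$.field3[*]"

// Both approaches are expected to yield structurally equal JsonPath values.
val same: Boolean = parsed == Right(literal)
```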
The subset
The supported JSONPath features are:
- .* selects all the children of an object.
- ..* selects all the descendants of an object.
- .id or ["id"] selects the child of an object with key id.
- ..id is the recursive descent operator: it selects all the descendants with key id, at any depth.
- [idx1:idx2] selects only the array elements between idx1 (inclusive) and idx2 (inclusive). It fails if the value it is applied to is not an array.
- [idx:] selects only the array elements starting from idx (inclusive) until the end of the array. It fails if the value it is applied to is not an array.
- [:idx] selects only the array elements from the beginning of the array up to idx (inclusive). It fails if the value it is applied to is not an array.
- [*] selects and enumerates the elements of an array. It fails if the value it is applied to is not an array.
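To make the inclusive bounds of the array selectors concrete, here is a plain-Scala model that applies them to an in-memory Vector. It is only an illustration of the documented semantics, not how fs2-data evaluates selectors on a token stream; ArraySelector, Between, StartingAt, UpTo, Every and select are hypothetical names.

```scala
// Hypothetical model of the array selectors listed above.
sealed trait ArraySelector
final case class Between(from: Int, to: Int) extends ArraySelector // [from:to], both bounds inclusive
final case class StartingAt(from: Int) extends ArraySelector       // [from:]
final case class UpTo(to: Int) extends ArraySelector               // [:to], upper bound inclusive
case object Every extends ArraySelector                            // [*]

def select[A](array: Vector[A], selector: ArraySelector): Vector[A] =
  selector match {
    // `slice` excludes its upper bound, hence `to + 1` to make it inclusive.
    case Between(from, to) => array.slice(from, to + 1)
    case StartingAt(from)  => array.drop(from)
    case UpTo(to)          => array.take(to + 1)
    case Every             => array
  }

// The contents of field3 in the first input object.
val field3 = Vector(1, 2, 3)
```

With this model, select(field3, Between(0, 1)) keeps Vector(1, 2), and select(field3, UpTo(0)) keeps Vector(1).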
Using JSONPath
Using the path defined above, we can filter the stream of tokens so that only the selected tokens are emitted downstream. This can drastically reduce the amount of emitted data, keeping only the parts you are interested in.
The filtering pipes are located in the fs2.data.json.jsonpath.filter namespace.
Since JSONPath includes a recursive descent operator, matches of your path can be nested inside one another.
The filter.raw pipe emits a stream of all matches.
Each match is represented as a nested stream of JSON tokens, which must be consumed.
import fs2.data.json.jsonpath.filter
import cats.effect._
import cats.effect.unsafe.implicits.global
val filtered = stream.lift[IO].through(filter.raw(path)).parEvalMapUnbounded(_.compile.toList)
// filtered: Stream[[x]IO[x], List[Token]] = Stream(..)
filtered.compile.toList.unsafeRunSync()
// res0: List[List[Token]] = List(
// List(NumberValue(value = "1")),
// List(NumberValue(value = "2")),
// List(NumberValue(value = "3"))
// )
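The nested streams emitted by filter.raw can be consumed in other ways than building a list. As a sketch, the self-contained snippet below drains each match with fs2's compile.count, so every element of field3 reports how many tokens it consists of; tokenCounts is an illustrative name, and the input is a compacted version of the one above.

```scala
import cats.effect._
import cats.effect.unsafe.implicits.global
import fs2._
import fs2.data.json._
import fs2.data.json.jsonpath.filter
import fs2.data.json.jsonpath.literals._

val input = """{"field1": 0, "field2": "test", "field3": [1, 2, 3]}
              |{"field1": 2, "field3": []}""".stripMargin

// Each match emitted by `filter.raw` is itself a stream; here every inner
// stream is drained with `compile.count` instead of being collected.
val tokenCounts: List[Long] =
  Stream
    .emit(input)
    .through(tokens[Fallible, String])
    .lift[IO]
    .through(filter.raw(jsonpath"$$.field3[*]"))
    .parEvalMapUnbounded(_.compile.count)
    .compile
    .toList
    .unsafeRunSync()
```

Each element of field3 is a single number token, so every count is expected to be 1.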
The matching streams are emitted in the order in which their matching elements start in the input. This means that, for nested matches, the stream for the ancestor element comes first.
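This ordering is the one of a pre-order traversal: a value is reported before anything nested inside it. The plain-Scala model below collects all values under a given key, which is what $..a selects, over a tiny hypothetical JSON type (Json, JNum, JArr, JObj and descendants are illustrative names, not fs2-data's AST).

```scala
// A tiny JSON model, just enough to illustrate recursive descent.
sealed trait Json
final case class JNum(value: Int) extends Json
final case class JArr(items: List[Json]) extends Json
final case class JObj(fields: List[(String, Json)]) extends Json

// Collects every value stored under `key`, at any depth, in pre-order:
// a matching value is recorded before the matches nested inside it.
def descendants(key: String, json: Json): List[Json] = json match {
  case JObj(fields) =>
    fields.flatMap { case (k, v) =>
      (if (k == key) List(v) else Nil) ++ descendants(key, v)
    }
  case JArr(items) => items.flatMap(descendants(key, _))
  case JNum(_)     => Nil
}

// Same shape as the `json` literal used in the next example.
val inner = JObj(List("c" -> JNum(1)))
val outer = JObj(List("a" -> inner, "b" -> JNum(2), "c" -> JNum(3)))
val doc   = JObj(List("a" -> outer))
```

Here descendants("a", doc) returns List(outer, inner): the ancestor first, as with filter.raw below.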
import fs2.data.json.literals._
val recursive = jsonpath"$$..a"
// recursive: JsonPath = JsonPath(
// locations = NonEmptyList(
// head = Descendant(child = Name(n = "a")),
// tail = List()
// )
// )
val json = json"""{
"a": {
"a": {
"c": 1
},
"b": 2,
"c": 3
}
}"""
// json: Stream[Fallible, Token] = Stream(..)
json
.lift[IO]
.through(filter.raw(recursive))
.parEvalMapUnbounded(_.compile.toList)
.compile
.toList
.unsafeRunSync()
// res1: List[List[Token]] = List(
// List(
// StartObject,
// Key(value = "a"),
// StartObject,
// Key(value = "c"),
// NumberValue(value = "1"),
// EndObject,
// Key(value = "b"),
// NumberValue(value = "2"),
// Key(value = "c"),
// NumberValue(value = "3"),
// EndObject
// ),
// List(StartObject, Key(value = "c"), NumberValue(value = "1"), EndObject)
// )
Collecting every match is a common use case, so the library offers filter.collect, which provides this behavior for any collector.
json
.lift[IO]
.through(filter.collect(recursive, List))
.compile
.toList
.unsafeRunSync()
// res2: List[List[Token]] = List(
// List(
// StartObject,
// Key(value = "a"),
// StartObject,
// Key(value = "c"),
// NumberValue(value = "1"),
// EndObject,
// Key(value = "b"),
// NumberValue(value = "2"),
// Key(value = "c"),
// NumberValue(value = "3"),
// EndObject
// ),
// List(StartObject, Key(value = "c"), NumberValue(value = "1"), EndObject)
// )
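With filter.collect, the earlier field3 example no longer needs parEvalMapUnbounded. The sketch below uses a Vector instead of a List as the collector; it assumes Vector is accepted through the same fs2 Collector conversion that makes List work, and elements is an illustrative name.

```scala
import cats.effect._
import cats.effect.unsafe.implicits.global
import fs2._
import fs2.data.json._
import fs2.data.json.jsonpath.filter
import fs2.data.json.jsonpath.literals._

val input = """{"field1": 0, "field2": "test", "field3": [1, 2, 3]}
              |{"field1": 2, "field3": []}""".stripMargin

// Each match is gathered by the collector, here into a Vector of tokens.
val elements: List[Vector[Token]] =
  Stream
    .emit(input)
    .through(tokens[Fallible, String])
    .lift[IO]
    .through(filter.collect(jsonpath"$$.field3[*]", Vector))
    .compile
    .toList
    .unsafeRunSync()
```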
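If you want results to be emitted as soon as they are complete rather than in input order, set the deterministic parameter to false. In the example below, the inner match is complete before the outer one, so it is emitted first.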
json
.lift[IO]
.through(filter.collect(recursive, List, deterministic = false))
.compile
.toList
.unsafeRunSync()
// res3: List[List[Token]] = List(
// List(StartObject, Key(value = "c"), NumberValue(value = "1"), EndObject),
// List(
// StartObject,
// Key(value = "a"),
// StartObject,
// Key(value = "c"),
// NumberValue(value = "1"),
// EndObject,
// Key(value = "b"),
// NumberValue(value = "2"),
// Key(value = "c"),
// NumberValue(value = "3"),
// EndObject
// )
// )
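The two orders above can be explained with a simplified model based on token positions in the json stream (counting from 0): the outer "a" value spans tokens 2 to 12, while the inner one spans tokens 4 to 7. This only models the observed ordering; it is not the library's scheduling logic, and with deterministic = false the actual order may also depend on timing. Match and the labels are hypothetical.

```scala
// A match identified by the positions of its first and last tokens.
final case class Match(label: String, start: Int, end: Int)

val matches = List(Match("outer a", 2, 12), Match("inner a", 4, 7))

// deterministic = true: results follow the position where each match starts.
val inInputOrder: List[String] = matches.sortBy(_.start).map(_.label)

// deterministic = false (modelled): a result is ready once its last token is read.
val asSoonAsComplete: List[String] = matches.sortBy(_.end).map(_.label)
```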