JSON Queries

Module: Maven Central

The fs2-data-json module provides a streaming implementation of a jq-like query language.

This allows for extracting and transforming JSON data in a streaming and declarative fashion. It can be useful when you want to extract and transform only a part of an input JSON data.

The JSON query feature is still experimental. It should be stable enough to be used but you can come across some bugs when using complex queries. In such a case, do not hesitate to reach out on Discord or GitHub (see link at the top of the page).

Let's use the following JSON input as an example.

import cats.effect.SyncIO
import cats.syntax.all._

import fs2._
import fs2.data.json._

val input = """{
              |  "field1": 0,
              |  "field2": "test",
              |  "field3": [1, 2, 3]
              |}""".stripMargin
// input: String = """{
//   "field1": 0,
//   "field2": "test",
//   "field3": [1, 2, 3]
// }"""

val stream = Stream.emit(input).through(tokens[SyncIO, String])
// stream: Stream[[A]SyncIO[A], Token] = Stream(..)

Building a query

There are several ways to create queries:

Parsing a string using the jq parser

For instance, to create an output array containing one element per element in field3, elements being objects with field2 and the current value from field3 we can write:

import fs2.data.json.jq._

val wrappedQuery = JqParser.either("""[ { "field2": .field2, "field3": .field3[] } ]""")
// wrappedQuery: Either[Throwable, Jq] = Right(
//   value = Arr(
//     prefix = Identity,
//     values = List(
//       Obj(
//         prefix = Identity,
//         fields = List(
//           ("field2", Field(name = "field2")),
//           (
//             "field3",
//             Iterator(filter = Field(name = "field3"), inner = Identity)
//           )
//         )
//       )
//     )
//   )
// )

The jq parser wraps the result in anything that has an MonadError with error type Throwable to catch potential parsing errors. If you prefer not to have this wrapping, you can use the jq interpolator.

import fs2.data.json.jq.literals._

val query = jq"""[ { "field2": .field2, "field3": .field3[] } ]"""
// query: Jq = Arr(
//   prefix = Identity,
//   values = List(
//     Obj(
//       prefix = Identity,
//       fields = List(
//         ("field2", Field(name = "field2")),
//         ("field3", Iterator(filter = Field(name = "field3"), inner = Identity))
//       )
//     )
//   )
// )

The query language

The general form of a query is a two phases process:

The constructor can contain sub-queries applied to each selected element.

So a query is of the form:

The filters

Following filters exist:

Filters can be sequenced by using the pipe (|) symbol as separator, for instance to select the field a and then only the third element in a if it is an array, the filter is .a | .[2]. The pipe separator can be elided in many cases, and the leading dot that would follow it is then removed. For instance, the previous example can also be written .a[2].

The recursive descent operator must be preceded by a pipe if it is not the first operator.

The constructors

Values

Any JSON scalar value is a valid query constructor, it means:

build the equivalent JSON value. The scalar values do not depend on the selected values from the input.

Objects

It is possible to build a JSON object, whose field values may depend on the selected elements from the filter phase, by using the following syntax: { "a": query1, "b": query2, ... }.

Each object value is a full query. If a query emits several elements (e.g. is an iterator), then one object is emitted to the output per element the iterator filter selects.

The object constructor can only contain one top-level iterator query. For instance, trying to compile the following query will fail:

{ "a": .a[], "b": .b[] }

Arrays

It is possible to build a JSON array, whose element values may depend on the selected elements from the filter phase, by using the following syntax: [ query1, query2, ... ].

Each value is a full query. If a query emits several elements (e.g. is an iterator), then all resulting elements are emitted as array elements, in the order they are selected.

Using queries

A query must first be compiled to be usable. Compiling a query can be a quite expensive computation, but a compiled query can be reused any number of time, so you usually will compile it only once.

To use a query, make your stream pass though the compiled query obtained above. A compiled query is a Pipe[F, Token, Token].

val qCompiler = jq.Compiler[SyncIO]
// qCompiler: jq.Compiler[SyncIO] = fs2.data.json.jq.internal.ESPJqCompiler@20a68443

val compiled = qCompiler.compile(query).unsafeRunSync()
// compiled: Pipe[SyncIO, Token, Token] = <function1>

stream
  .through(compiled)
  .compile
  .to(collector.pretty())
  .unsafeRunSync()
// res0: String = """[
//   {
//     "field2": "test",
//     "field3": 1
//   },
//   {
//     "field2": "test",
//     "field3": 2
//   },
//   {
//     "field2": "test",
//     "field3": 3
//   }
// ]"""