Handling JSON Lines data

The JSON Lines format represents each JSON record on a single line. This makes it possible to process records one at a time, as they arrive. In this cookbook, we demonstrate how to read and produce such data using fs2-data.

Reading JSON Lines data

The fs2-data JSON module can natively read concatenated JSON values in an input stream. This means that we can naively feed the input stream to the tokenizer and get the token stream out of it.
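
As a minimal sketch of this naive approach, the snippet below tokenizes an in-memory string holding two concatenated JSON values. The input string is made up for illustration, and `Fallible` is used as the effect type only to keep the example pure. It assumes the fs2-data JSON module is on the classpath.

```scala
import fs2.{Fallible, Stream}
import fs2.data.json._

// Hypothetical input: two JSON values separated only by whitespace.
val input = """{"a": 1} [true, null]"""

// The tokenizer does not care where one value ends and the next begins,
// it simply emits the tokens of every value it encounters.
val tokenList: Either[Throwable, List[Token]] =
  Stream
    .emit(input)
    .covary[Fallible]
    .through(tokens[Fallible, String])
    .compile
    .toList
```

Nothing in this pipeline looks at line boundaries, so the same tokens would come out whether the two values sit on one line or are spread over several.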

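However, this approach does not check that the input actually respects the JSON Lines format. The format has only a few rules:

1. the input must be UTF-8 encoded;
2. each value must be encoded on a single line;
3. the line delimiter is '\n'.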

We can leverage the operators provided by fs2 and fs2-data to enforce these constraints when reading data.

import cats.effect.unsafe.implicits.global

import cats.effect.IO
import fs2.Stream
import fs2.data.json.JsonException
import fs2.data.json.circe._
import fs2.io.file.{Files, Path}
import io.circe.Json

def readJsonLines(input: Stream[IO, Byte]): Stream[IO, Json] =
  input
    // rule #1: input must be UTF-8 encoded
    .through(fs2.text.utf8.decode)
    // rule #3: new line delimiter is '\n'
    .through(fs2.text.lines)
    .flatMap { line =>
      // rule #2: values must be encoded on single lines
      Stream
        .emit(line)
        .covary[IO]
        .through(fs2.data.json.ast.parse)
        .handleErrorWith { t =>
          Stream.raiseError[IO](JsonException(s"'$line' is not a valid JSON value", inner = t))
        }
    }

Using this function, we can read a JSON Lines data file and wrap the elements in an array.

val array =
  Files[IO]
    .readAll(Path("site/cookbooks/data/jsonl/nested.jsonl"))
    .through(readJsonLines)
    .through(fs2.data.json.ast.tokenize)
    .through(fs2.data.json.wrap.asTopLevelArray)
    .through(fs2.data.json.render.prettyPrint(width = 35))
// array: Stream[[x]IO[x], String] = Stream(..)

array
  .compile
  .string
  .unsafeRunSync()
// res0: String = """[
//   {
//     "name": "Gilbert",
//     "wins": [
//       ["straight", "7\u2663"],
//       ["one pair", "10\u2665"]
//     ]
//   },
//   {
//     "name": "Alexa",
//     "wins": [
//       ["two pair", "4\u2660"],
//       ["two pair", "9\u2660"]
//     ]
//   },
//   {"name": "May", "wins": []},
//   {
//     "name": "Deloise",
//     "wins": [
//       [
//         "three of a kind",
//         "5\u2663"
//       ]
//     ]
//   }
// ]"""

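This reading function will fail if the input data is not JSON Lines encoded. For example, feeding it the pretty-printed array from above fails on the first line, '[', which is not a complete JSON value on its own.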

array
  .through(fs2.text.utf8.encode)
  .through(readJsonLines)
  .compile
  .drain
  .attempt
  .unsafeRunSync()
// res1: Either[Throwable, Unit] = Left(
//   value = JsonException(
//     msg = "'[' is not a valid JSON value",
//     context = None,
//     inner = JsonException(
//       msg = "unexpected end of input",
//       context = None,
//       inner = null
//     )
//   )
// )
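
If failing the whole stream on the first bad line is too strict for a given use case, one possible variation is to surface each line's outcome as an `Either`. The sketch below is only an illustration: `readJsonLinesEither` is a hypothetical name that is not part of fs2-data, and skipping blank lines is an assumption made here so that a trailing newline does not produce an extra element.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import fs2.data.json.circe._
import io.circe.Json

// Hypothetical variant of the reader: one Either per offending or valid line.
def readJsonLinesEither(input: Stream[IO, Byte]): Stream[IO, Either[Throwable, Json]] =
  input
    .through(fs2.text.utf8.decode)
    .through(fs2.text.lines)
    // assumption: blank lines (e.g. after a trailing '\n') are skipped
    .filter(_.trim.nonEmpty)
    .flatMap { line =>
      Stream
        .emit(line)
        .covary[IO]
        .through(fs2.data.json.ast.parse[IO, String, Json])
        // turn a parse failure on this line into a Left instead of failing the outer stream
        .attempt
    }

// Made-up input: one valid line followed by one invalid line.
val results: List[Either[Throwable, Json]] =
  readJsonLinesEither(
    Stream.emit("{\"a\":1}\nnot json\n").through(fs2.text.utf8.encode)
  ).compile.toList.unsafeRunSync()
```

The trade-off is that callers must now inspect every element, whereas the stricter reader above guarantees that anything emitted downstream came from a well-formed line.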

Producing JSON Lines data

Similarly, using fs2 and fs2-data operators, we can generate a stream that will emit each record on a single line.

def writeJsonLines(input: Stream[IO, Json]): Stream[IO, Byte] =
  input
    .map { data =>
      // rule #2: values must be encoded on single lines
      Stream.emit(data)
            .through(fs2.data.json.ast.tokenize)
            .through(fs2.data.json.render.compact)
            .compile
            .string
    }
    // rule #3: new line delimiter is '\n'
    .intersperse("\n")
    // rule #1: input must be UTF-8 encoded
    .through(fs2.text.utf8.encode)

Using this function, we can generate JSON Lines encoded data from a sample JSON array.

import fs2.data.json.jsonpath.literals._

Files[IO]
  .readAll(Path("site/cookbooks/data/json/sample.json"))
  .through(fs2.text.utf8.decode)
  .through(fs2.data.json.tokens)
  .through(fs2.data.json.jsonpath.filter.values(jsonpath"$$[0:4]"))
  .through(writeJsonLines)
  .through(fs2.text.utf8.decode)
  .compile
  .string
  .unsafeRunSync()
// res2: String = """{"name":"Adeel Solangi","language":"Sindhi","id":"V59OF92YF627HFY0","bio":"Donec lobortis eleifend condimentum. Cras dictum dolor lacinia lectus vehicula rutrum. Maecenas quis nisi nunc. Nam tristique feugiat est vitae mollis. Maecenas quis nisi nunc.","version":6.1}
// {"name":"Afzal Ghaffar","language":"Sindhi","id":"ENTOCR13RSCLZ6KU","bio":"Aliquam sollicitudin ante ligula, eget malesuada nibh efficitur et. Pellentesque massa sem, scelerisque sit amet odio id, cursus tempor urna. Etiam congue dignissim volutpat. Vestibulum pharetra libero et velit gravida euismod.","version":1.88}
// {"name":"Aamir Solangi","language":"Sindhi","id":"IAKPO3R4761JDRVG","bio":"Vestibulum pharetra libero et velit gravida euismod. Quisque mauris ligula, efficitur porttitor sodales ac, lacinia non ex. Fusce eu ultrices elit, vel posuere neque.","version":7.27}
// {"name":"Abla Dilmurat","language":"Uyghur","id":"5ZVOEPMJUI4MB4EN","bio":"Donec lobortis eleifend condimentum. Morbi ac tellus erat.","version":2.53}
// {"name":"Adil Eli","language":"Uyghur","id":"6VTI8X6LL0MMPJCC","bio":"Vivamus id faucibus velit, id posuere leo. Morbi vitae nisi lacinia, laoreet lorem nec, egestas orci. Suspendisse potenti.","version":6.49}"""
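
To look at the per-record rendering step used inside `writeJsonLines` in isolation, here is a small sketch that renders a single circe value with the compact renderer. The record is made up for illustration. Since the stream is pure, it can be compiled to a `String` without running any effect.

```scala
import fs2.Stream
import fs2.data.json.circe._
import io.circe.Json

// Hypothetical record, not taken from the sample data files.
val record: Json =
  Json.obj("name" -> Json.fromString("Gilbert"), "wins" -> Json.arr())

// Tokenize the value and render the tokens without any whitespace,
// which keeps the whole record on a single line.
val line: String =
  Stream
    .emit(record)
    .through(fs2.data.json.ast.tokenize)
    .through(fs2.data.json.render.compact)
    .compile
    .string
```

Because the compact renderer inserts no line breaks, any newline in the final output comes from the interspersed delimiter only.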

Running the full example

The full code can be found in the repository as a Scala CLI script. This example uses decline to parse the CLI options.

$ scala-cli site/cookbooks/scripts/jsonlines.scala -- read site/cookbooks/data/jsonl/nested.jsonl
$ scala-cli --js site/cookbooks/scripts/jsonlines.scala -- read site/cookbooks/data/jsonl/nested.jsonl
$ scala-cli --native site/cookbooks/scripts/jsonlines.scala -- read site/cookbooks/data/jsonl/nested.jsonl