Handling JSON Lines data
The JSON Lines format represents each JSON record on a single line. This makes it possible to process records one at a time, as they arrive.
In this cookbook, we will demonstrate how such data can be read and produced using fs2-data.
Reading JSON Lines data
The fs2-data JSON module can natively read concatenated JSON values in an input stream. This means that we can naively read data from the input stream and get the token stream out of it.
However, this approach does not check that the input actually respects the JSON Lines format. The format has only a few rules:
- Input must be UTF-8 encoded.
- Each line is a valid JSON value.
- Lines are separated by '\n'.
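For contrast, the naive approach mentioned earlier can be sketched as follows. This is a minimal sketch, assuming the fs2-data JSON circe module is on the classpath; `readConcatenated` and `multiLineInput` are hypothetical names introduced for illustration only.

```scala
import cats.effect.IO
import fs2.Stream
import fs2.data.json.circe._
import io.circe.Json

// Hypothetical helper: parses every JSON value found in the input,
// regardless of how the values are laid out across lines.
def readConcatenated(input: Stream[IO, Byte]): Stream[IO, Json] =
  input
    .through(fs2.text.utf8.decode)
    .through(fs2.data.json.ast.parse)

// Sample input whose first value spans two lines. It is a valid sequence
// of concatenated JSON values, but it breaks the "one value per line" rule.
val multiLineInput: Stream[IO, Byte] =
  Stream
    .emit("{\"a\":\n1} {\"b\": 2}")
    .covary[IO]
    .through(fs2.text.utf8.encode)
```

Passing `multiLineInput` to `readConcatenated` parses without complaint, which is why the line structure has to be checked separately.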
We can leverage the operators provided by fs2 and fs2-data to enforce these constraints when reading data.
import cats.effect.unsafe.implicits.global
import cats.effect.IO
import fs2.Stream
import fs2.data.json.JsonException
import fs2.data.json.circe._
import fs2.io.file.{Files, Path}
import io.circe.Json
def readJsonLines(input: Stream[IO, Byte]): Stream[IO, Json] =
  input
    // rule #1: input must be UTF-8 encoded
    .through(fs2.text.utf8.decode)
    // rule #3: new line delimiter is '\n'
    .through(fs2.text.lines)
    .flatMap { line =>
      // rule #2: values must be encoded on single lines
      Stream
        .emit(line)
        .covary[IO]
        .through(fs2.data.json.ast.parse)
        .handleErrorWith { t =>
          Stream.raiseError[IO](JsonException(s"'$line' is not a valid JSON value", inner = t))
        }
    }
Using this function, we can read a JSON Lines data file and wrap the elements in an array.
val array =
  Files[IO]
    .readAll(Path("site/cookbooks/data/jsonl/nested.jsonl"))
    .through(readJsonLines)
    .through(fs2.data.json.ast.tokenize)
    .through(fs2.data.json.wrap.asTopLevelArray)
    .through(fs2.data.json.render.prettyPrint(width = 35))
// array: Stream[[x]IO[x], String] = Stream(..)
array
  .compile
  .string
  .unsafeRunSync()
// res0: String = """[
//   {
//     "name": "Gilbert",
//     "wins": [
//       ["straight", "7\u2663"],
//       ["one pair", "10\u2665"]
//     ]
//   },
//   {
//     "name": "Alexa",
//     "wins": [
//       ["two pair", "4\u2660"],
//       ["two pair", "9\u2660"]
//     ]
//   },
//   {"name": "May", "wins": []},
//   {
//     "name": "Deloise",
//     "wins": [
//       [
//         "three of a kind",
//         "5\u2663"
//       ]
//     ]
//   }
// ]"""
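This reading function fails if the input data is not JSON Lines encoded. For example, feeding it the pretty-printed array produced above fails on the first line, which contains only an opening bracket.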
array
  .through(fs2.text.utf8.encode)
  .through(readJsonLines)
  .compile
  .drain
  .attempt
  .unsafeRunSync()
// res1: Either[Throwable, Unit] = Left(
//   value = JsonException(
//     msg = "'[' is not a valid JSON value",
//     context = None,
//     inner = JsonException(
//       msg = "unexpected end of input",
//       context = None,
//       inner = null
//     )
//   )
// )
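When a single malformed line should not abort the whole stream, one possible variation is to surface failures as values instead of raising them. The following is only a sketch under that assumption, not part of the cookbook script; `readJsonLinesLenient` is a hypothetical name, and skipping blank lines is a choice made here for illustration.

```scala
import cats.effect.IO
import fs2.Stream
import fs2.data.json.circe._
import io.circe.Json

// Hypothetical variant: emits Right(json) for each value parsed from a line,
// and Left(error) when parsing a line fails, then moves on to the next line.
def readJsonLinesLenient(input: Stream[IO, Byte]): Stream[IO, Either[Throwable, Json]] =
  input
    .through(fs2.text.utf8.decode)
    .through(fs2.text.lines)
    // skip blank lines, such as the one produced by a trailing '\n'
    .filter(_.trim.nonEmpty)
    .flatMap { line =>
      val parsed: Stream[IO, Json] =
        Stream
          .emit(line)
          .covary[IO]
          .through(fs2.data.json.ast.parse)
      // attempt turns a failure of this inner stream into a final Left element
      parsed.attempt
    }
```

Because the error handling is scoped to the inner stream built for each line, a failure only ends the processing of that line and the outer stream continues with the following ones.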
Producing JSON Lines data
Similarly, using fs2 and fs2-data operators, we can generate a stream that will emit each record on a single line.
def writeJsonLines(input: Stream[IO, Json]): Stream[IO, Byte] =
  input
    .map { data =>
      // rule #2: values must be encoded on single lines
      Stream.emit(data)
        .through(fs2.data.json.ast.tokenize)
        .through(fs2.data.json.render.compact)
        .compile
        .string
    }
    // rule #3: new line delimiter is '\n'
    .intersperse("\n")
    // rule #1: data must be UTF-8 encoded
    .through(fs2.text.utf8.encode)
Using this function, we can generate JSON Lines encoded data out of a sample JSON array.
import fs2.data.json.jsonpath.literals._
Files[IO]
  .readAll(Path("site/cookbooks/data/json/sample.json"))
  .through(fs2.text.utf8.decode)
  .through(fs2.data.json.tokens)
  .through(fs2.data.json.jsonpath.filter.values(jsonpath"$$[0:4]"))
  .through(writeJsonLines)
  .through(fs2.text.utf8.decode)
  .compile
  .string
  .unsafeRunSync()
// res2: String = """{"name":"Adeel Solangi","language":"Sindhi","id":"V59OF92YF627HFY0","bio":"Donec lobortis eleifend condimentum. Cras dictum dolor lacinia lectus vehicula rutrum. Maecenas quis nisi nunc. Nam tristique feugiat est vitae mollis. Maecenas quis nisi nunc.","version":6.1}
// {"name":"Afzal Ghaffar","language":"Sindhi","id":"ENTOCR13RSCLZ6KU","bio":"Aliquam sollicitudin ante ligula, eget malesuada nibh efficitur et. Pellentesque massa sem, scelerisque sit amet odio id, cursus tempor urna. Etiam congue dignissim volutpat. Vestibulum pharetra libero et velit gravida euismod.","version":1.88}
// {"name":"Aamir Solangi","language":"Sindhi","id":"IAKPO3R4761JDRVG","bio":"Vestibulum pharetra libero et velit gravida euismod. Quisque mauris ligula, efficitur porttitor sodales ac, lacinia non ex. Fusce eu ultrices elit, vel posuere neque.","version":7.27}
// {"name":"Abla Dilmurat","language":"Uyghur","id":"5ZVOEPMJUI4MB4EN","bio":"Donec lobortis eleifend condimentum. Morbi ac tellus erat.","version":2.53}
// {"name":"Adil Eli","language":"Uyghur","id":"6VTI8X6LL0MMPJCC","bio":"Vivamus id faucibus velit, id posuere leo. Morbi vitae nisi lacinia, laoreet lorem nec, egestas orci. Suspendisse potenti.","version":6.49}"""
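The per-record rendering step used in `writeJsonLines` can also be tried in isolation on in-memory values. The sketch below assumes the fs2-data JSON circe module is on the classpath; `toJsonLine`, `records` and `jsonLines` are hypothetical names, and the two records are made-up sample data.

```scala
import fs2.Stream
import fs2.data.json.circe._
import io.circe.Json

// Hypothetical helper: renders one value on a single line, using the same
// tokenize and compact operators as writeJsonLines.
def toJsonLine(data: Json): String =
  Stream
    .emit(data)
    .through(fs2.data.json.ast.tokenize)
    .through(fs2.data.json.render.compact)
    .compile
    .string

// Made-up sample records
val records: List[Json] = List(
  Json.obj("name" -> Json.fromString("Gilbert"), "wins" -> Json.arr()),
  Json.obj("name" -> Json.fromString("May"), "wins" -> Json.arr())
)

// One line per record; like intersperse("\n") in writeJsonLines,
// mkString adds no trailing newline after the last record.
val jsonLines: String = records.map(toJsonLine).mkString("\n")
```

Since the stream built in `toJsonLine` is pure, it compiles directly to a `String` without any effect, which is what allows `writeJsonLines` to call it inside a plain `map`.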
Running the full example
The full code can be found in the repository as a Scala CLI script. This example uses decline to parse the CLI options.
$ scala-cli site/cookbooks/scripts/jsonlines.scala -- read site/cookbooks/data/jsonl/nested.jsonl
$ scala-cli --js site/cookbooks/scripts/jsonlines.scala -- read site/cookbooks/data/jsonl/nested.jsonl
$ scala-cli --native site/cookbooks/scripts/jsonlines.scala -- read site/cookbooks/data/jsonl/nested.jsonl