XPath


The fs2-data-xml module provides a streaming implementation of XPath.

Let's use the following XML input as an example.

import cats.syntax.all._

import fs2._
import fs2.data.xml._

val stream = xml"""
<root>
  <a attr="value">
    <b>text</b>
    <a>
      <c><![CDATA[some other text]]></c>
    </a>
  </a>
</root>
"""
// stream: Stream[Fallible, XmlEvent] = Stream(..)

Building an XPath

A subset of XPath can be used to select parts of an XML event stream. There are several ways to create selectors:

Parsing a string using the XPath parser

For instance, to select and enumerate every element named a, you can create the following selector. Only the events describing those elements are emitted as a result.

import fs2.data.xml.xpath._

val selector = XPathParser.either("//a")
// selector: Either[Throwable, XPath] = Right(
//   value = XPath(
//     locations = NonEmptyList(
//       head = List(
//         Location(
//           axis = Descendant,
//           node = Node(prefix = None, local = Some(value = "a")),
//           predicate = None
//         )
//       ),
//       tail = List()
//     )
//   )
// )

The XPath parser wraps the result in any type that has a MonadError instance with error type Throwable, so that parsing errors are captured in that type. If you prefer not to have this wrapping, you can use the xpath interpolator.

import fs2.data.xml.xpath.literals._

val path = xpath"//a"
// path: XPath = XPath(
//   locations = NonEmptyList(
//     head = List(
//       Location(
//         axis = Descendant,
//         node = Node(prefix = None, local = Some(value = "a")),
//         predicate = None
//       )
//     ),
//     tail = List()
//   )
// )

The advantage of the interpolator is that potential syntax errors are checked at compilation time.
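The parser remains the natural choice when the path is only known at runtime, for instance when it comes from configuration or user input. As a minimal sketch relying only on XPathParser.either shown above, the result can be pattern matched to handle a rejected path explicitly. The describe helper is a hypothetical name introduced for illustration and is not part of the library.

```scala
import fs2.data.xml.xpath._

// `describe` is a hypothetical helper, not part of fs2-data.
// It reports whether a path supplied at runtime was accepted by the parser.
def describe(input: String): String =
  XPathParser.either(input) match {
    case Right(_)  => s"accepted: $input"
    case Left(err) => s"rejected: $input (${err.getMessage})"
  }
```

Calling describe("//a") takes the Right branch, since that path parses successfully in the example above.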

The subset

The supported XPath features are:

Operator precedence follows the usual convention: ! binds tighter than &&, which binds tighter than ||. This means that !p1 && p2 || p3 is the same as ((!p1) && p2) || p3. You can use parentheses to group differently, for instance !(p1 && p2) || p3.
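Scala's own Boolean operators follow the same precedence, so the grouping can be checked without involving the XPath parser at all. The sketch below uses plain Booleans as stand-ins for the predicates p1, p2 and p3. It only illustrates the grouping rule and is not a test of the library.

```scala
// Plain Scala Booleans stand in for the predicates p1, p2 and p3.
// This does not call the XPath parser; it only illustrates the grouping.
val booleans = List(true, false)

// Check all eight combinations: the unparenthesized form always agrees
// with the fully parenthesized one.
val sameGrouping: Boolean =
  (for {
    p1 <- booleans
    p2 <- booleans
    p3 <- booleans
  } yield (!p1 && p2 || p3) == (((!p1) && p2) || p3)).forall(identity)

// Explicit parentheses change the result for some inputs,
// e.g. p1 = true, p2 = false, p3 = false.
val implicitGrouping = !true && false || false   // false
val explicitGrouping = !(true && false) || false // true
```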

Using XPath

Using the path defined above, we can filter the stream of events so that only the selected tokens are emitted downstream. This can drastically reduce the amount of emitted data, keeping only the parts that are of interest to you. The filtering pipes are located in the fs2.data.xml.xpath.filter namespace.

The main operators in the namespace are:

Since XPath includes a recursive descent operator, there can be nested matches for your XPath. By default, matches are returned in the order in which their opening element is encountered in the input. This means that for nested matches, the first match returned is the ancestor element.

Using filter.collect, you can build a stream that collects each match for the provided collector and emits the aggregated result. For instance, to build the list of string representations of the matches, you can run the following code.

import cats.effect._
import cats.effect.unsafe.implicits.global

stream
  .lift[IO]
  .through(filter.collect(path, collector.raw()))
  .compile
  .toList
  .unsafeRunSync()
// res0: List[String] = List(
//   """<a attr="value">
//     <b>text</b>
//     <a>
//       <c><![CDATA[some other text]]></c>
//     </a>
//   </a>""",
//   """<a>
//       <c><![CDATA[some other text]]></c>
//     </a>"""
// )

If you want to have results emitted as early as possible instead of in order, you can set the deterministic parameter to false.

stream
  .lift[IO]
  .through(filter.collect(path, collector.raw(), deterministic = false))
  .compile
  .toList
  .unsafeRunSync()
// res1: List[String] = List(
//   """<a>
//       <c><![CDATA[some other text]]></c>
//     </a>""",
//   """<a attr="value">
//     <b>text</b>
//     <a>
//       <c><![CDATA[some other text]]></c>
//     </a>
//   </a>"""
// )

The filter.through operator allows for handling each match in a streaming fashion. For instance, let's say you want to save each match in a file, incrementing a counter on each match. You can run the following code.

import fs2.io.file.{Files, Path}

def saveXml(counter: Ref[IO, Int], events: Stream[IO, XmlEvent]): Stream[IO, Nothing] =
  Stream.eval(counter.getAndUpdate(_ + 1)).flatMap { index =>
    events
      .through(render.raw())
      .through(Files[IO].writeUtf8(Path(s"match-$index.xml")))
  }

val program =
  for {
    counter <- Ref[IO].of(0)
    _ <- stream
      .lift[IO]
      .through(filter.through(path, saveXml(counter, _)))
      .compile
      .drain
  } yield ()
// program: IO[Unit] = FlatMap(
//   ioe = Delay(
//     thunk = cats.effect.IO$$$Lambda$12958/0x000000080335e840@691262e9,
//     event = cats.effect.tracing.TracingEvent$StackTrace
//   ),
//   f = <function1>,
//   event = cats.effect.tracing.TracingEvent$StackTrace
// )

program.unsafeRunSync()

Files[IO].readUtf8(Path("match-0.xml")).compile.string.unsafeRunSync()
// res3: String = """<a attr="value">
//     <b>text</b>
//     <a>
//       <c><![CDATA[some other text]]></c>
//     </a>
//   </a>"""
Files[IO].readUtf8(Path("match-1.xml")).compile.string.unsafeRunSync()
// res4: String = """<a>
//       <c><![CDATA[some other text]]></c>
//     </a>"""

The operator described below is unsafe and should be used carefully only if none of the above operators fits your purpose. When using it, please ensure that you:

  • consume all inner Streams
  • consume them in parallel (e.g. with a variant of parEvalMap and parallelism > 1, or with a variant of parJoin).

Failure to do so might result in memory leaks or hanging programs.

The filter.unsafeRaw operator emits a stream of all matches. Each match is represented as a nested stream of XML events which must be consumed.

stream
  .lift[IO]
  .through(filter.unsafeRaw(path))
  .parEvalMapUnbounded(_.through(render.raw()).compile.foldMonoid)
  .compile
  .toList
  .unsafeRunSync()
// res5: List[String] = List(
//   """<a attr="value">
//     <b>text</b>
//     <a>
//       <c><![CDATA[some other text]]></c>
//     </a>
//   </a>""",
//   """<a>
//       <c><![CDATA[some other text]]></c>
//     </a>"""
// )
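The warning above also mentions parJoin as a way to consume the inner streams in parallel. A minimal sketch of that variant, assuming the same imports as the rest of this page, could look as follows. The names events, aPath and rendered are introduced here for illustration only, and the input is a shortened version of the example document. With parJoinUnbounded the inner streams run concurrently, so the order of the results should not be relied upon.

```scala
import cats.effect._
import cats.effect.unsafe.implicits.global

import fs2._
import fs2.data.xml._
import fs2.data.xml.xpath._
import fs2.data.xml.xpath.literals._

// Illustrative input and path (shortened version of the example above).
val events = xml"""<root><a attr="value"><b>text</b><a><c>other</c></a></a></root>"""
val aPath = xpath"//a"

val rendered: List[String] =
  events
    .lift[IO]
    .through(filter.unsafeRaw(aPath))
    // Turn each match into a single-element stream holding its rendered form.
    .map(_.through(render.raw()).foldMonoid)
    // Run all inner streams concurrently, so that every match is consumed.
    .parJoinUnbounded
    .compile
    .toList
    .unsafeRunSync()
```

Both the outer a element and the nested one match the path, so rendered contains two strings.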