XPath
The fs2-data-xml module provides a streaming implementation of XPath.
Let's use the following XML input as an example.
import cats.syntax.all._
import fs2._
import fs2.data.xml._
val stream = xml"""
<root>
<a attr="value">
<b>text</b>
<a>
<c><![CDATA[some other text]]></c>
</a>
</a>
</root>
"""
// stream: Stream[Fallible, XmlEvent] = Stream(..)
Building an XPath
A subset of XPath can be used to select a subset of a XML event stream. There are several ways to create selectors:
- build the selector using the constructors, which can be quite verbose and cumbersome;
- parse a string with the XPath parser;
- use the
xpathinterpolator.
Parsing a string using the XPath parser
For instance, to select and enumerate a elements, you can create this selector. Only the events describing the a elements will be emitted as a result.
import fs2.data.xml.xpath._
val selector = XPathParser.either("//a")
// selector: Either[Throwable, XPath] = Right(
// value = XPath(
// locations = NonEmptyList(
// head = List(
// Location(
// axis = Descendant,
// node = Node(prefix = None, local = Some(value = "a")),
// predicate = None
// )
// ),
// tail = List()
// )
// )
// )
The XPath parser wraps the result in anything that has an MonadError with error type Throwable to catch potential parsing errors. If you prefer not to have this wrapping, you can use the xpath interpolator.
import fs2.data.xml.xpath.literals._
val path = xpath"//a"
// path: XPath = XPath(
// locations = NonEmptyList(
// head = List(
// Location(
// axis = Descendant,
// node = Node(prefix = None, local = Some(value = "a")),
// predicate = None
// )
// ),
// tail = List()
// )
// )
The advantage of the interpolator is that potential syntax errors are checked at compilation time.
The subset
The supported XPath features are:
-
/the child axis/aselects allachildren/*selects all children elements
-
//The descendant axis//aselects alladescendants//*selects all descendant elements
-
[<attribute selection expression>]attribute predicate selection@attrattributeattrexists@attr == "value"attributeattrequalsvalue@attr != "value"attributeattrdoes not equalvaluep1 && p2element attributes match bothp1andp2p1 || p2element attributes match eitherp1orp2!pelement attributes do not matchp
xp1|xp2matches the XPath expressionxp1orxp2
Operator precedence is the common one: ! has precedence over && which has precedence over ||. This means that !p1 && p2 || p3 is the same as ((!p1) && p2) || p3.
You can use parentheses to associate differently, for instance !(p1 && p2) || p3.
Using XPath
Using the path defined above, we can filter the stream of events, to only emit selected tokens downstream. This can be used to drastically reduce the amount of emitted data, to only the parts that are of interest for you.
The filtering pipes are located in the fs2.data.xml.xpath.filter namespace.
The main operators in the namespace are:
filter.first(xpath)which is aPipereturning the events of the first match only.
filter.collect(xpath, collector)which uses the providedcollectorto aggregate the events of each match, and emits all the aggregated results.
filter.dom[Node](xpath)which builds the DOM for each match for any DOM typeNodewith aDocumentBuilderin scope.
filter.consumer(xpath, consumer)which sends all matches as a stream through the providedconsumer.
filter.through(xpath, pipe)which sends all matches as a stream through the providedpipe.
Since XPath includes a recursive descent operator, there can be nested matches for your xpath. The matches are returned in the order their opening matching element is encountered in the input by default. This means that for nested matches, the first stream returned is the ancestor element.
Using filter.collect, you can build a stream that collects each match for the provided collector and emits the aggregated result. For instance, to build the list of string representations of the matches, you can run the following code.
import cats.effect._
import cats.effect.unsafe.implicits.global
stream
.lift[IO]
.through(filter.collect(path, collector.raw()))
.compile
.toList
.unsafeRunSync()
// res0: List[String] = List(
// """<a attr="value">
// <b>text</b>
// <a>
// <c><![CDATA[some other text]]></c>
// </a>
// </a>""",
// """<a>
// <c><![CDATA[some other text]]></c>
// </a>"""
// )
If you want to have results emitted as early as possible instead of in order, you can set the deterministic parameter to false.
stream
.lift[IO]
.through(filter.collect(path, collector.raw(), deterministic = false))
.compile
.toList
.unsafeRunSync()
// res1: List[String] = List(
// """<a>
// <c><![CDATA[some other text]]></c>
// </a>""",
// """<a attr="value">
// <b>text</b>
// <a>
// <c><![CDATA[some other text]]></c>
// </a>
// </a>"""
// )
The filter.consumer operator allows for handling each match in a streaming fashion without emitting any value.
For instance, let's say you want to save each match in a file, incrementing a counter on each match. You can run the following code.
import fs2.io.file.{Files, Path}
def saveXml(counter: Ref[IO, Int], events: Stream[IO, XmlEvent]): Stream[IO, Nothing] =
Stream.eval(counter.getAndUpdate(_ + 1)).flatMap { index =>
events
.through(render.raw())
.through(Files[IO].writeUtf8(Path(s"match-$index.xml")))
}
val program =
for {
counter <- Ref[IO].of(0)
_ <- stream
.lift[IO]
.through(filter.consume(path, saveXml(counter, _)))
.compile
.drain
} yield ()
// program: IO[Unit] = FlatMap(
// ioe = Delay(
// thunk = cats.effect.IO$$$Lambda$11826/0x00000008031ce840@792bab24,
// event = cats.effect.tracing.TracingEvent$StackTrace
// ),
// f = <function1>,
// event = cats.effect.tracing.TracingEvent$StackTrace
// )
program.unsafeRunSync()
Files[IO].readUtf8(Path("match-0.xml")).compile.string.unsafeRunSync()
// res3: String = """<a attr="value">
// <b>text</b>
// <a>
// <c><![CDATA[some other text]]></c>
// </a>
// </a>"""
Files[IO].readUtf8(Path("match-1.xml")).compile.string.unsafeRunSync()
// res4: String = """<a>
// <c><![CDATA[some other text]]></c>
// </a>"""
The operator described below is unsafe and should be used carefully only if none of the above operators fits your purpose. When using it, please ensure that you:
- consume all inner
Streams
- consume them in parallel (e.g. with a variant of
parEvalMapand paralellism >1, or with a variant ofparJoin).
Failure to do so might result in memory leaks or hanging programs.
The filter.unsafeRaw operator emits a stream of all matches.
Each match is represented as a nested stream of XML events which must be consumed.
stream
.lift[IO]
.through(filter.unsafeRaw(path))
.parEvalMapUnbounded(_.through(render.raw()).compile.foldMonoid)
.compile
.toList
.unsafeRunSync()
// res5: List[String] = List(
// """<a attr="value">
// <b>text</b>
// <a>
// <c><![CDATA[some other text]]></c>
// </a>
// </a>""",
// """<a>
// <c><![CDATA[some other text]]></c>
// </a>"""
// )