XPath
The fs2-data-xml module provides a streaming implementation of XPath.
Let's use the following XML input as an example.
import cats.syntax.all._
import fs2._
import fs2.data.xml._
val stream = xml"""
<root>
<a attr="value">
<b>text</b>
<a>
<c><![CDATA[some other text]]></c>
</a>
</a>
</root>
"""
// stream: Stream[Fallible, XmlEvent] = Stream(..)
Building an XPath
A subset of XPath can be used to select parts of an XML event stream. There are several ways to create selectors:
- build the selector using the constructors, which can be quite verbose and cumbersome;
- parse a string with the XPath parser;
- use the xpath interpolator.
Parsing a string using the XPath parser
For instance, to select and enumerate the elements named a, you can create the following selector. Only the events describing these a elements will be emitted as a result.
import fs2.data.xml.xpath._
val selector = XPathParser.either("//a")
// selector: Either[Throwable, XPath] = Right(
// value = XPath(
// locations = NonEmptyList(
// head = List(
// Location(
// axis = Descendant,
// node = Node(prefix = None, local = Some(value = "a")),
// predicate = None
// )
// ),
// tail = List()
// )
// )
// )
The XPath parser wraps the result in any type that has a MonadError instance with error type Throwable, so that parsing errors are captured in that type. For example, XPathParser.either above returns an Either[Throwable, XPath]. If you prefer not to have this wrapping, you can use the xpath interpolator.
import fs2.data.xml.xpath.literals._
val path = xpath"//a"
// path: XPath = XPath(
// locations = NonEmptyList(
// head = List(
// Location(
// axis = Descendant,
// node = Node(prefix = None, local = Some(value = "a")),
// predicate = None
// )
// ),
// tail = List()
// )
// )
The advantage of the interpolator is that potential syntax errors are checked at compilation time.
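When a selector is only known at runtime (for instance, when it is read from user input), the interpolator cannot check it, so the result of the parser has to be inspected. The following is a minimal sketch built on the XPathParser.either call shown earlier; parseSelector and the wording of its error message are hypothetical and not part of the library.

```scala
import cats.syntax.all._
import fs2.data.xml.xpath._

// Hypothetical helper (not provided by fs2-data): parses a selector given
// at runtime and turns a parsing failure into a readable message.
def parseSelector(input: String): Either[String, XPath] =
  XPathParser
    .either(input)
    .leftMap(error => s"invalid XPath '$input': ${error.getMessage}")

// A well-formed selector yields a Right containing the parsed XPath.
val parsed = parseSelector("//a")
```

Keeping the failure as a value lets the caller decide whether to report it, fall back to a default selector, or abort.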
The subset
The supported XPath features are:
- /: the child axis
  - /a: selects all a children
  - /*: selects all children elements
- //: the descendant axis
  - //a: selects all a descendants
  - //*: selects all descendant elements
- [<attribute selection expression>]: attribute predicate selection
  - @attr: attribute attr exists
  - @attr == "value": attribute attr equals value
  - @attr != "value": attribute attr does not equal value
  - p1 && p2: element attributes match both p1 and p2
  - p1 || p2: element attributes match either p1 or p2
  - !p: element attributes do not match p
- xp1|xp2: matches the XPath expression xp1 or xp2
Operator precedence is the common one: ! has precedence over &&, which has precedence over ||. This means that !p1 && p2 || p3 is the same as ((!p1) && p2) || p3.
You can use parentheses to associate differently, for instance !(p1 && p2) || p3.
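Scala's own Boolean operators follow the same precedence, so the grouping rule above can be checked with plain Booleans. This is only an illustrative sketch: the Booleans stand in for attribute predicates and the code does not call the XPath parser.

```scala
// Plain Scala Booleans, used here only as a stand-in for attribute predicates.
val booleans = List(true, false)

// Every combination of truth values for p1, p2 and p3.
val combinations = for {
  p1 <- booleans
  p2 <- booleans
  p3 <- booleans
} yield (p1, p2, p3)

// Default grouping: `!` binds tighter than `&&`, which binds tighter than `||`.
val sameAsExplicit = combinations.forall { case (p1, p2, p3) =>
  (!p1 && p2 || p3) == (((!p1) && p2) || p3)
}
// sameAsExplicit == true

// Parentheses change the meaning: inputs on which the two groupings disagree.
val differing = combinations.filter { case (p1, p2, p3) =>
  (!p1 && p2 || p3) != (!(p1 && p2) || p3)
}
// differing == List((true, false, false), (false, false, false))
```

The two groupings only disagree when p2 and p3 are both false, which is where the explicit parentheses matter.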
Using XPath
Using the path defined above, we can filter the stream of events so that only the selected events are emitted downstream. This can drastically reduce the amount of emitted data, keeping only the parts that are of interest to you.
The filtering pipes are located in the fs2.data.xml.xpath.filter namespace.
Since XPath includes a recursive descent operator, there can be nested matches for your path.
The filter.raw pipe emits a stream of all matches.
Each match is represented as a nested stream of XML events which must be consumed.
import cats.effect._
import cats.effect.unsafe.implicits.global
stream
  .lift[IO]
  .through(filter.raw(path))
  .parEvalMapUnbounded(_.through(render.raw()).compile.foldMonoid)
  .compile
  .toList
  .unsafeRunSync()
// res0: List[String] = List(
// """<a attr="value">
// <b>text</b>
// <a>
// <c><![CDATA[some other text]]></c>
// </a>
// </a>""",
// """<a>
// <c><![CDATA[some other text]]></c>
// </a>"""
// )
The matching streams are returned in the order their matching element is encountered in the input. This means that for nested matches, the first stream returned is the ancestor element.
The library offers filter.collect to collect each match using a given collector.
stream
  .lift[IO]
  .through(filter.collect(path, collector.raw()))
  .compile
  .toList
  .unsafeRunSync()
// res1: List[String] = List(
// """<a attr="value">
// <b>text</b>
// <a>
// <c><![CDATA[some other text]]></c>
// </a>
// </a>""",
// """<a>
// <c><![CDATA[some other text]]></c>
// </a>"""
// )
If you want to have results emitted as early as possible instead of in order, you can set the deterministic parameter to false.
stream
  .lift[IO]
  .through(filter.collect(path, collector.raw(), deterministic = false))
  .compile
  .toList
  .unsafeRunSync()
// res2: List[String] = List(
// """<a>
// <c><![CDATA[some other text]]></c>
// </a>""",
// """<a attr="value">
// <b>text</b>
// <a>
// <c><![CDATA[some other text]]></c>
// </a>
// </a>"""
// )
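The same pipes can be combined with an attribute predicate from the supported subset. The sketch below reuses only the calls shown on this page; the compact input document and the names input, withAttr and matches are illustrative. Since only the outer a element carries the attribute, a single match is expected, although this has not been verified against a specific library version.

```scala
import cats.effect._
import cats.effect.unsafe.implicits.global
import fs2.data.xml._
import fs2.data.xml.xpath._
import fs2.data.xml.xpath.literals._

// Illustrative input: a compact variant of the document used on this page.
val input =
  xml"""<root><a attr="value"><b>text</b><a><c>other</c></a></a></root>"""

// Selects the descendant a elements whose attr attribute equals "value".
// Triple quotes are needed because the predicate itself contains quotes.
val withAttr = xpath"""//a[@attr == "value"]"""

// Collects each match as a rendered string, in document order.
val matches = input
  .lift[IO]
  .through(filter.collect(withAttr, collector.raw()))
  .compile
  .toList
  .unsafeRunSync()
```

Filtering on the attribute inside the path avoids emitting the nested a element at all, rather than discarding it after it has been collected.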