

The fs2-data-csv module contains tools to parse and transform CSV data. These tools live in the fs2.data.csv package.


Parsing CSV

The first of the provided tools is the rows pipe, which transforms a stream of characters into a stream of parsed rows. Each row is represented by a NonEmptyList[String], and empty lines are skipped. Values can be quoted and escaped according to RFC 4180.

import cats.effect._

import fs2._
import fs2.data.csv._

val input = """i,s,j
              |1,test,2
              |,other,-3
              |""".stripMargin
// input: String = """i,s,j
// 1,test,2
// ,other,-3
// """

val stream = Stream.emits(input).through(rows[IO]())
// stream: Stream[IO, cats.data.NonEmptyList[String]] = Stream(..)
stream.compile.toList.unsafeRunSync()
// res0: List[cats.data.NonEmptyList[String]] = List(
//   NonEmptyList("i", List("s", "j")),
//   NonEmptyList("1", List("test", "2")),
//   NonEmptyList("", List("other", "-3"))
// )
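Quoting matters when a field itself contains the separator or a quotation mark. As a small sketch, reusing the rows pipe exactly as above on a made-up input (the names quotedInput and quotedRows are only illustrative), a quoted field keeps its comma and a doubled quotation mark is read as a single literal one:

```scala
import cats.effect._

import fs2._
import fs2.data.csv._

// Illustrative input: the second line has a quoted comma and escaped (doubled) quotes.
val quotedInput = """name,comment
                    |"Doe, Jane","said ""hi"" twice"
                    |""".stripMargin

// Note: on cats-effect 3, unsafeRunSync() additionally needs an implicit IORuntime
// in scope, e.g. import cats.effect.unsafe.implicits.global
val quotedRows = Stream.emits(quotedInput).through(rows[IO]()).compile.toList.unsafeRunSync()
// expected under RFC 4180 quoting rules:
//   NonEmptyList("name", List("comment")),
//   NonEmptyList("Doe, Jane", List("said \"hi\" twice"))
```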

If your CSV data does not use the comma as separator, you can pass the separator character as a parameter to the rows pipe. For instance, if your data uses a semicolon as separator:

val input2 = """i;s;j
               |1;test;2""".stripMargin
// input2: String = """i;s;j
// 1;test;2"""

val stream2 = Stream.emits(input2).through(rows[IO](';'))
// stream2: Stream[IO, cats.data.NonEmptyList[String]] = Stream(..)
stream2.compile.toList.unsafeRunSync()
// res1: List[cats.data.NonEmptyList[String]] = List(
//   NonEmptyList("i", List("s", "j")),
//   NonEmptyList("1", List("test", "2"))
// )

Often, CSV files don’t conform to RFC 4180, and quotation marks should be treated as literal characters rather than as delimiters of a quoted value. You can specify the quote-handling behavior of the rows pipe as well. For instance:

val input3 =
  """name,age,description
    |John Doe,47,no quotes
    |Jane Doe,50,"entirely quoted"
    |Bob Smith,80,"starts with" a quote
    |Alice Grey,78,contains "a quote""".stripMargin
// input3: String = """name,age,description
// John Doe,47,no quotes
// Jane Doe,50,"entirely quoted"
// Bob Smith,80,"starts with" a quote
// Alice Grey,78,contains "a quote"""

// default quote-handling is QuoteHandling.RFCCompliant
val stream3 = Stream.emits(input3).through(rows[IO](',', QuoteHandling.Literal))
// stream3: Stream[IO, cats.data.NonEmptyList[String]] = Stream(..)
stream3.compile.toList.unsafeRunSync()
// res2: List[cats.data.NonEmptyList[String]] = List(
//   NonEmptyList("name", List("age", "description")),
//   NonEmptyList("John Doe", List("47", "no quotes")),
//   NonEmptyList("Jane Doe", List("50", "\"entirely quoted\"")),
//   NonEmptyList("Bob Smith", List("80", "\"starts with\" a quote")),
//   NonEmptyList("Alice Grey", List("78", "contains \"a quote"))
// )

CSV rows with or without headers

Rows can be converted to a Row or a CsvRow[Header] for some Header type. These classes provide higher-level utilities to manipulate rows.

If your CSV file doesn’t have headers, you can use the noHeaders pipe, which creates Row.

val noh = stream.through(noHeaders[IO])
// noh: Stream[IO[x], Row] = Stream(..)
noh.map(_.values).compile.toList.unsafeRunSync()
// res3: List[cats.data.NonEmptyList[String]] = List(
//   NonEmptyList("i", List("s", "j")),
//   NonEmptyList("1", List("test", "2")),
//   NonEmptyList("", List("other", "-3"))
// )

If you want to treat the first row as a header row, you can use the headers pipe. For instance, to have headers as String:

val withh = stream.through(headers[IO, String])
// withh: Stream[IO[x], CsvRow[String]] = Stream(..)
withh.map(_.toMap).compile.toList.unsafeRunSync()
// res4: List[Map[String, String]] = List(
//   Map("i" -> "1", "s" -> "test", "j" -> "2"),
//   Map("i" -> "", "s" -> "other", "j" -> "-3")
// )

To support your own type of Header, you must provide an implicit ParseableHeader[Header]. For instance, if you have a fixed set of headers represented as enumeratum enum values, you can provide an instance of ParseableHeader as follows:

import enumeratum._
import cats.implicits._
import cats.data.NonEmptyList

sealed trait MyHeaders extends EnumEntry
object MyHeaders extends Enum[MyHeaders] {
  case object I extends MyHeaders
  case object S extends MyHeaders
  case object J extends MyHeaders
  def values = findValues
}

implicit object ParseableMyHeaders extends ParseableHeader[MyHeaders] {
  def apply(names: NonEmptyList[String]): HeaderResult[MyHeaders] =
    names.traverse { name =>
      MyHeaders.withNameInsensitiveOption(name) match {
        case Some(headers) => Right(headers)
        case None          => Left(new HeaderError(s"Unknown header $name"))
      }
    }
}

val withMyHeaders = stream.through(headers[IO, MyHeaders])
// withMyHeaders: Stream[IO[x], CsvRow[MyHeaders]] = Stream(..)
withMyHeaders.map(_.toMap).compile.toList.unsafeRunSync()
// res5: List[Map[MyHeaders, String]] = List(
//   Map(I -> "1", S -> "test", J -> "2"),
//   Map(I -> "", S -> "other", J -> "-3")
// )

If parsing fails for a header (that is, the ParseableHeader returns a Left), the entire stream fails.

Writing CSV

There are also pipes for encoding rows to CSV, with or without headers. Simple example without headers:

val testRows = Stream(Row(NonEmptyList.of("3", "", "test")))
// testRows: Stream[Nothing, Row] = Stream(..)
testRows
  .through(writeWithoutHeaders)
  .through(toRowStrings(/* separator: Char = ',', newline: String = "\n"*/))
  .compile
  .toList
// res6: cats.package.Id[List[String]] = List(
//   """3,,test
// """
// )

If you want to write headers, use writeWithHeaders or, in case you use CsvRow, encodeRowWithFirstHeaders. For writing non-String headers, you’ll need to provide an instance of WritableHeader, a type class analogous to ParseableHeader.
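As a minimal sketch of writing with String headers, the following assumes that writeWithHeaders takes the header names as a NonEmptyList and that an instance for String headers is available out of the box. Both points are assumptions to check against the version you use. The names dataRows and csvLines are only illustrative.

```scala
import cats.data.NonEmptyList

import fs2._
import fs2.data.csv._

// Two data rows without any header information attached.
val dataRows = Stream(
  Row(NonEmptyList.of("1", "test", "2")),
  Row(NonEmptyList.of("", "other", "-3"))
)

// Assumption: writeWithHeaders(headers) emits the header row first, then the row values.
val csvLines = dataRows
  .through(writeWithHeaders(NonEmptyList.of("i", "s", "j")))
  .through(toRowStrings())
  .compile
  .toList
```

Under these assumptions, concatenating csvLines should give the header line followed by one line per data row.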

Decoders and Encoders

The library also provides decoder and encoder type classes, which allow for decoding CSV rows into arbitrary data types through the decode and decodeRow pipes and encoding data to CSV via encode and encodeRow. There are several kinds:

CellDecoder & CellEncoder

The library provides decoders and encoders for most of the common Scala types.

If you wish to support custom field types, you need to implement your own CellDecoder. To that end, you can use the convenience methods like map or emap defined on the CellDecoder trait.

For instance, if you want to be able to parse an integer field into some enum, you can do:

import enumeratum.values._

sealed abstract class State(val value: Int) extends IntEnumEntry
object State extends IntEnum[State] {
  case object On extends State(1)
  case object Off extends State(0)

  def values = findValues

  implicit val decoder: CellDecoder[State] =
    CellDecoder
      .intDecoder
      .emap(withValueOpt(_)
        .liftTo[DecoderResult](new DecoderError("Unknown state")))
}
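When the conversion cannot fail, map is enough. A minimal sketch, assuming a hypothetical wrapper type Label that is not part of the library, derives its decoder from the existing String decoder:

```scala
import fs2.data.csv._

// Hypothetical wrapper type used only for this illustration.
final case class Label(value: String)

// Wrapping a String always succeeds, so map (rather than emap) is sufficient.
implicit val labelDecoder: CellDecoder[Label] =
  CellDecoder[String].map(Label(_))

val decodedLabel = CellDecoder[Label].apply("test")
// decodedLabel is Right(Label("test"))
```

Use emap instead whenever some input values have no valid counterpart in the target type, as in the State example above.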

For CellEncoder, it is even easier to define your own as encoding can’t fail, so basically it’s just a function A => String. The easiest ways to roll your own are using Scala’s single abstract method sugar:

case class Wrapper(content: String)
implicit val wrapperCellEncoder: CellEncoder[Wrapper] = (w: Wrapper) => w.content
// wrapperCellEncoder: CellEncoder[Wrapper] = repl.MdocSession$App$$anonfun$22@3301f26d

or using contramap on an existing encoder:

implicit val wrapperCellEncoder2: CellEncoder[Wrapper] = CellEncoder[String].contramap(_.content)
// wrapperCellEncoder2: CellEncoder[Wrapper] = fs2.data.csv.CellEncoder$$anonfun$contramap$2@3dd75636

RowDecoder & RowEncoder

RowDecoders can be used to decode an entire CSV row based on field positions. For instance, if you want to decode the CSV data into a shapeless HList:

import shapeless._

implicit object HListDecoder extends RowDecoder[Option[Int] :: String :: Int :: HNil] {
  def apply(cells: NonEmptyList[String]): DecoderResult[Option[Int] :: String :: Int :: HNil] =
    if(cells.size < 3)
      Left(new DecoderError("row is too short"))
    else
      for {
        i <- if(cells.head.isEmpty) Right(None) else CellDecoder[Int].apply(cells.head).map(Some(_))
        s <- CellDecoder[String].apply(cells.tail.head)
        j <- CellDecoder[Int].apply(cells.tail.tail.head)
      } yield i :: s :: j :: HNil
}

// .tail drops the header line
val hlists = noh.tail.through(decode[IO, Option[Int] :: String :: Int :: HNil])
// hlists: Stream[IO[x], Option[Int] :: String :: Int :: HNil] = Stream(..)
hlists.compile.toList.unsafeRunSync()
// res7: List[Option[Int] :: String :: Int :: HNil] = List(
//   Some(1) :: "test" :: 2 :: HNil,
//   None :: "other" :: -3 :: HNil
// )

Again, encoding is easier as it can’t fail:

import shapeless._

implicit object HListEncoder extends RowEncoder[Option[Int] :: String :: Int :: HNil] {
  def apply(input: Option[Int] :: String :: Int :: HNil): NonEmptyList[String] =
    NonEmptyList.of(CellEncoder[Option[Int]].apply(input.head), input.tail.head, input.tail.tail.head.toString)
}

// a single value to encode as a row
val row = Stream(Option(3) :: "test" :: 42 :: HNil)
// row: Stream[Nothing, Option[Int] :: String :: Int :: HNil] = Stream(..)
row.through(encode).compile.toList
// res8: cats.package.Id[List[Row]] = List(fs2.data.csv.Row@65370da9)

CsvRowDecoder & CsvRowEncoder

If your CSV data set has headers, you can use CsvRowDecoder. Using the headers, one can decode the CSV data to some case class:

// note the order of fields is not the same as in the CSV data here
case class MyRow(i: Option[Int], j: Int, s: String)

implicit object MyRowDecoder extends CsvRowDecoder[MyRow, String] {
  def apply(row: CsvRow[String]): DecoderResult[MyRow] =
  for {
    i <- row.as[Int]("i").map(Some(_)).leftFlatMap(_ => Right(None))
    j <- row.as[Int]("j")
    s <- row.as[String]("s")
  } yield MyRow(i, j, s)
}
val decoded = withh.through(decodeRow[IO, String, MyRow])
// decoded: Stream[IO[x], MyRow] = Stream(..)
decoded.compile.toList.unsafeRunSync()
// res9: List[MyRow] = List(
//   MyRow(Some(1), 2, "test"),
//   MyRow(None, -3, "other")
// )

Analogously, you can encode your data with a CsvRowEncoder. Make sure not to vary the headers based on the data itself, as this can’t be reflected in the CSV format.

As you can see, these instances can be quite tedious to implement by hand. Luckily, the fs2-data-csv-generic module removes the need to write this boilerplate. Please refer to that module’s documentation for more details.