Introduction

Module: Maven Central

The fs2-data-csv module contains tools to parse and transform CSV data. These tools live in the fs2.data.csv package.

High-level and low-level APIs

fs2-data-csv contains two APIs which share some common concepts and are built on top of each other:

Both APIs share data types and type classes that can be found in fs2.data.csv._.

Quick-Start with the high-level API

For the common use case of parsing a CSV into a case class or serializing your data into CSV, you can use this snippet:

import cats.effect._

import fs2._
import fs2.data.csv._
import fs2.data.csv.generic.semiauto._

val input = """i,s,j
              |1,test,2
              |,other,-3
              |""".stripMargin
// input: String = """i,s,j
// 1,test,2
// ,other,-3
// """

// Usually this would come from a file, for example using Files[IO].readAll
val textStream = Stream.emit(input).covary[Fallible]
// textStream: Stream[Fallible, String] = Stream(..)
implicit val myRowDecoder: CsvRowDecoder[MyRow, String] = deriveCsvRowDecoder
// myRowDecoder: CsvRowDecoder[MyRow, String] = fs2.data.csv.generic.internal.DerivedCsvRowDecoder$$anon$1@52703253
implicit val myRowEncoder: CsvRowEncoder[MyRow, String] = deriveCsvRowEncoder
// myRowEncoder: CsvRowEncoder[MyRow, String] = fs2.data.csv.generic.internal.DerivedCsvRowEncoder$$anonfun$productWriter$2@cc7b029

// decodeUsingHeaders can take a `Char` indicating the separator to use
// for example `decodeUsingHeaders[MyRow](';') for a semi-colon separated csv
val decodedStream = textStream.through(decodeUsingHeaders[MyRow]()) 
// decodedStream: Stream[[x]Fallible[x], MyRow] = Stream(..)
val caseClasses = decodedStream.compile.toList
// caseClasses: Either[Throwable, List[MyRow]] = Right(
//   value = List(
//     MyRow(i = Some(value = 1), j = 2, s = "test"),
//     MyRow(i = None, j = -3, s = "other")
//   )
// )

val backAsText = decodedStream.through(encodeUsingFirstHeaders(fullRows = true)).compile.string
// backAsText: Either[Throwable, String] = Right(
//   value = """i,j,s
// 1,2,test
// ,-3,other
// """
// )

This will assume the file contains headers and takes those into account, re-emitting them again on encoding. How the data is de-/encoded is determined by the type class instances of CsvRowDecoder and CsvRowEncoder which are derived semi-automatically (for details, see the fs2-data-csv-generic module). The details of these type classes are described later in this document.

More high-level pipes are available for the following use cases:

Dealing with erroneous files

The default behaviour when parsing CSV files, is to terminate the stream whenever the columns of a row do not match the columns of the header. If you're dealing with CSV files that could contain these kind of errors, you can make use of the lenient package. You will get back a Stream of results, where each parsed row is represented by an Either[Throwable, A].

The following high-level pipes are available for decoding erroneous CSV files:

case class TestData(name: String, age: Int, description: String)
object TestData {
  implicit val decoder: CsvRowDecoder[TestData, String] =
    row =>
      for {
        name <- row.as[String]("name")
        age <- row.as[Int]("age")
        desc <- row.as[String]("description")
      } yield TestData(name, age, desc)
}

// Note that not all columns are present for all CSV rows
val content =
  """name,age,description
    |John Doe,47,description 1
    |Jane Doe,50
    |Bob Smith,80,description 2
    |Alice Grey,78
    |""".stripMargin
// content: String = """name,age,description
// John Doe,47,description 1
// Jane Doe,50
// Bob Smith,80,description 2
// Alice Grey,78
// """

Stream
      .emit(content)
      .covary[Fallible]
      .through(lenient.attemptDecodeUsingHeaders[TestData]())
      .compile
      .toList
// res0: Either[Throwable, List[Either[CsvException, TestData]]] = Right(
//   value = List(
//     Right(
//       value = TestData(
//         name = "John Doe",
//         age = 47,
//         description = "description 1"
//       )
//     ),
//     Left(
//       value = fs2.data.csv.HeaderSizeError: Headers have size 3 but row has size 2. Both numbers must match! in line 3
//     ),
//     Right(
//       value = TestData(
//         name = "Bob Smith",
//         age = 80,
//         description = "description 2"
//       )
//     ),
//     Left(
//       value = fs2.data.csv.HeaderSizeError: Headers have size 3 but row has size 2. Both numbers must match! in line 5
//     )
//   )
// )

Low-level API

This section takes a closer look on the low-level concepts the high-level API is built from.

Parsing CSV into rows

The first one of the provided tools is the rows pipe, which transforms a stream of characters into a stream of parsed rows. The rows are represented by NonEmptyList[String], empty lines are skipped. Values can be quoted and escaped, according to the RFC.

val stream = textStream.through(lowlevel.rows[Fallible, String]())
// stream: Stream[[x]Fallible[x], Row] = Stream(..)
stream.compile.toList
// res1: Either[Throwable, List[Row]] = Right(
//   value = List(
//     RowF(
//       values = NonEmptyList(head = "i", tail = List("s", "j")),
//       headers = None,
//       line = Some(value = 1L)
//     ),
//     RowF(
//       values = NonEmptyList(head = "1", tail = List("test", "2")),
//       headers = None,
//       line = Some(value = 2L)
//     ),
//     RowF(
//       values = NonEmptyList(head = "", tail = List("other", "-3")),
//       headers = None,
//       line = Some(value = 3L)
//     )
//   )
// )

If your CSV data is not using the comma as separator, you can provide the character as parameter of the rows pipe. For instance, if your data uses semicolon as separator:

val input2 = """i;s;j
               |1;test;2""".stripMargin
// input2: String = """i;s;j
// 1;test;2"""

val stream2 = Stream.emit(input2).through(lowlevel.rows[Fallible, String](';'))
// stream2: Stream[[A]Fallible[A], Row] = Stream(..)
stream2.compile.toList
// res2: Either[Throwable, List[Row]] = Right(
//   value = List(
//     RowF(
//       values = NonEmptyList(head = "i", tail = List("s", "j")),
//       headers = None,
//       line = Some(value = 1L)
//     ),
//     RowF(
//       values = NonEmptyList(head = "1", tail = List("test", "2")),
//       headers = None,
//       line = Some(value = 2L)
//     )
//   )
// )

Often, CSVs don't conform to RFC4180 and quotation marks should be treated as literal quotation marks rather than denoting a quoted value. You are able to specify quote-handling behavior to the rows pipe as well. For instance:

val input3 =
  """name,age,description
    |John Doe,47,no quotes
    |Jane Doe,50,"entirely quoted"
    |Bob Smith,80,"starts with" a quote
    |Alice Grey,78,contains "a quote""".stripMargin
// input3: String = """name,age,description
// John Doe,47,no quotes
// Jane Doe,50,"entirely quoted"
// Bob Smith,80,"starts with" a quote
// Alice Grey,78,contains "a quote"""

// default quote-handling is QuoteHandling.RFCCompliant
val stream3 = Stream.emit(input3).through(lowlevel.rows[Fallible, String](',', QuoteHandling.Literal))
// stream3: Stream[[A]Fallible[A], Row] = Stream(..)
stream3.compile.toList
// res3: Either[Throwable, List[Row]] = Right(
//   value = List(
//     RowF(
//       values = NonEmptyList(head = "name", tail = List("age", "description")),
//       headers = None,
//       line = Some(value = 1L)
//     ),
//     RowF(
//       values = NonEmptyList(head = "John Doe", tail = List("47", "no quotes")),
//       headers = None,
//       line = Some(value = 2L)
//     ),
//     RowF(
//       values = NonEmptyList(
//         head = "Jane Doe",
//         tail = List("50", "\"entirely quoted\"")
//       ),
//       headers = None,
//       line = Some(value = 3L)
//     ),
//     RowF(
//       values = NonEmptyList(
//         head = "Bob Smith",
//         tail = List("80", "\"starts with\" a quote")
//       ),
//       headers = None,
//       line = Some(value = 4L)
//     ),
//     RowF(
//       values = NonEmptyList(
//         head = "Alice Grey",
//         tail = List("78", "contains \"a quote")
//       ),
//       headers = None,
//       line = Some(value = 5L)
//     )
//   )
// )

CSV rows with or without headers

Rows can be converted to a Row or CsvRow[Header] for some Header type. These classes provides higher-level utilities to manipulate rows.

If your CSV file doesn't have headers, you can use the noHeaders pipe, which creates Row.

val noh = stream.through(lowlevel.noHeaders)
// noh: Stream[[x]Fallible[x], Row] = Stream(..)
noh.map(_.values).compile.toList
// res4: Either[Throwable, List[cats.data.NonEmptyList[String]]] = Right(
//   value = List(
//     NonEmptyList(head = "i", tail = List("s", "j")),
//     NonEmptyList(head = "1", tail = List("test", "2")),
//     NonEmptyList(head = "", tail = List("other", "-3"))
//   )
// )

If you want to consider the first row as a header row, you can use the headers pipe. For instance to have headers as String:

val withh = stream.through(lowlevel.headers[Fallible, String])
// withh: Stream[[x]Fallible[x], CsvRow[String]] = Stream(..)
withh.map(_.toMap).compile.toList
// res5: Either[Throwable, List[Map[String, String]]] = Right(
//   value = List(
//     Map("i" -> "1", "s" -> "test", "j" -> "2"),
//     Map("i" -> "", "s" -> "other", "j" -> "-3")
//   )
// )

To support your own type of Header you must provide an implicit ParseableHeader[Header]. For instance if you have a fix set of headers represented as enumeratum enum values, you can provide an instance of ParseableHeader as follows:

import enumeratum._
import cats.syntax.all._
import cats.data.NonEmptyList

sealed trait MyHeaders extends EnumEntry
object MyHeaders extends Enum[MyHeaders] {
  case object I extends MyHeaders
  case object S extends MyHeaders
  case object J extends MyHeaders
  def values = findValues
}

implicit val parseableMyHeaders: ParseableHeader[MyHeaders] = ParseableHeader.instance[MyHeaders] { name =>
   MyHeaders.withNameInsensitiveOption(name).toRight(new HeaderError(s"Unknown header $name"))
}
// parseableMyHeaders: ParseableHeader[MyHeaders] = fs2.data.csv.ParseableHeader$$$Lambda$13365/0x00000008027dd040@42ea8540

val withMyHeaders = stream.through(lowlevel.headers[Fallible, MyHeaders])
// withMyHeaders: Stream[[x]Fallible[x], CsvRow[MyHeaders]] = Stream(..)
withMyHeaders.map(_.toMap).compile.toList
// res6: Either[Throwable, List[Map[MyHeaders, String]]] = Right(
//   value = List(
//     Map(I -> "1", S -> "test", J -> "2"),
//     Map(I -> "", S -> "other", J -> "-3")
//   )
// )

If the parse method fails for a header, the entire stream fails.

Dealing with optional data

Optional values have no unified representation in CSV:

The Row and Row[Header] type provide the asOptionalAt and asOptional method respectively to implement the behavior for optional values that fits your application logic. By default, these methods will fail if the field is missing (header does not exist or index is out of bound) and return None if the field is the empty string.

This can be tuned, for instance, if we want to treat the missing field as None and the null value as None as well, then you can do:

val testRow = CsvRow.fromNelHeaders(NonEmptyList.of("first" -> "value1", "second" -> "null"))
// testRow: CsvRow[String] = RowF(
//   values = NonEmptyList(head = "value1", tail = List("null")),
//   headers = Some(value = NonEmptyList(head = "first", tail = List("second"))),
//   line = None
// )

def get(field: String) =
  testRow.asOptional[String](field, missing = _ => Right(None), isEmpty = _ == "null")

get("first")
// res7: DecoderResult[Option[String]] = Right(value = Some(value = "value1"))
get("second")
// res8: DecoderResult[Option[String]] = Right(value = None)
get("third")
// res9: DecoderResult[Option[String]] = Right(value = None)

Writing CSV

There are also pipes for encoding rows to CSV, with or without headers. Simple example without headers:

val testRows = Stream(Row(NonEmptyList.of("3", "", "test")))
// testRows: Stream[[x]Pure[x], Row] = Stream(..)
testRows
  .through(lowlevel.writeWithoutHeaders)
  .through(lowlevel.toRowStrings(/* separator: Char = ',', newline: String = "\n"*/))
  .compile
  .string
// res10: cats.package.Id[String] = """3,,test
// """

If you want to write headers, use writeWithGivenHeaders or, in case you use CsvRow, encodeRowWithFirstHeaders. For writing non-String headers, you'll need to provide an instance of WritableHeader, a type class analog to ParseableHeader.

The type classes: Decoders and Encoders

The library also provides decoder and encoder type classes, which allow for decoding CSV rows into arbitrary data types through the decode and decodeRow pipes and encoding data to CSV via encode and encodeRow. There are several kinds: - CellDecoder is a simple value decoder, used to transform values within the CSV fields. CellEncoder describes the reverse operation, encoding a simple value into a String. - RowDecoder and RowEncoder are used to decode/encode CSV Rows through the decode/encode pipes, and is positional. Use it when your CSV file doesn't have headers. - CsvRowDecoder is used to decode CsvRows through the decodeRow pipe, and has access to the headers. Use it when you want to use header names to decode the rows. Analog, CsvRowEncoder exists to encode values to CSV with headers.

CellDecoder & CellEncoder

The library provides decoders and encoders for most of the Scala common types:

If you wish to support custom field types, you need to implement your own CellDecoder. To that end, you can use the convenience methods like map or emap defined on the CellDecoder trait.

For instance, if you want to be able to parse an integer field into some enum, you can do:

import enumeratum.values._

sealed abstract class State(val value: Int) extends IntEnumEntry
object State extends IntEnum[State] {
  case object On extends State(1)
  case object Off extends State(0)

  def values = findValues

  implicit val decoder: CellDecoder[State] =
    CellDecoder
      .intDecoder
      .emap(withValueOpt(_)
        .liftTo[DecoderResult](new DecoderError("Unknown state")))
}

For CellEncoder, it is even easier to define your own as encoding can't fail, so basically it's just a function A => String. The easiest ways to roll your own are using Scala's single abstract method sugar:

case class Wrapper(content: String)
implicit val wrapperCellEncoder: CellEncoder[Wrapper] = (w: Wrapper) => w.content
// wrapperCellEncoder: CellEncoder[Wrapper] = repl.MdocSession$MdocApp$$anonfun$9@174ac177

or using contramap on an existing encoder:

implicit val wrapperCellEncoder2: CellEncoder[Wrapper] = CellEncoder[String].contramap(_.content)
// wrapperCellEncoder2: CellEncoder[Wrapper] = fs2.data.csv.CellEncoder$$anonfun$contramap$2@2592267c

RowDecoder & RowEncoder

RowDecoders can be used to decode an entire CSV row based on field positions. For instance if you want to decode the CSV data into shapeless HList:

import shapeless._

implicit object HListDecoder extends RowDecoder[Option[Int] :: String :: Int :: HNil] {
  def apply(row: Row): DecoderResult[Option[Int] :: String :: Int :: HNil] =
    if(row.values.size < 3)
      Left(new DecoderError("row is too short"))
    else
      for {
        i <- if(row.values.head.isEmpty) Right(None) else CellDecoder[Int].apply(row.values.head).map(Some(_))
        s <- CellDecoder[String].apply(row.values.tail.head)
        j <- CellDecoder[Int].apply(row.values.tail.tail.head)
      } yield i :: s :: j :: HNil
}

// .tail drops the header line
val hlists = noh.tail.through(lowlevel.decode[Fallible, Option[Int] :: String :: Int :: HNil])
// hlists: Stream[[x]Fallible[x], Option[Int] :: String :: Int :: HNil] = Stream(..)
hlists.compile.toList
// res11: Either[Throwable, List[Option[Int] :: String :: Int :: HNil]] = Right(
//   value = List(
//     Some(value = 1) :: "test" :: 2 :: HNil,
//     None :: "other" :: -3 :: HNil
//   )
// )

Again, encoding is easier as it can't fail:

import shapeless._

implicit object HListEncoder extends RowEncoder[Option[Int] :: String :: Int :: HNil] {
  def apply(input: Option[Int] :: String :: Int :: HNil): Row =
    Row(NonEmptyList.of(CellEncoder[Option[Int]].apply(input.head), input.tail.head, input.tail.tail.head.toString))
}

// .tail drops the header line
val row = Stream(Option(3) :: "test" :: 42 :: HNil)
// row: Stream[[x]Pure[x], Option[Int] :: String :: Int :: HNil] = Stream(..)
row.through(lowlevel.encode).compile.toList
// res12: cats.package.Id[List[Row]] = List(
//   RowF(
//     values = NonEmptyList(head = "3", tail = List("test", "42")),
//     headers = None,
//     line = None
//   )
// )

CsvRowDecoder & CsvRowEncoder

If your CSV data set has headers, you can use CsvRowDecoder. Using the headers, one can decode the CSV data to some case class:

// note the order of fields is not the same as in the CSV data here
case class MyRow(i: Option[Int], j: Int, s: String)

implicit object MyRowDecoder extends CsvRowDecoder[MyRow, String] {
  def apply(row: CsvRow[String]): DecoderResult[MyRow] =
  for {
    i <- row.as[Int]("i").map(Some(_)).leftFlatMap(_ => Right(None))
    j <- row.as[Int]("j")
    s <- row.as[String]("s")
  } yield MyRow(i, j, s)
}
val decoded = withh.through(lowlevel.decodeRow[Fallible, String, MyRow])
// decoded: Stream[[x]Fallible[x], MyRow] = Stream(..)
decoded.compile.toList
// res13: Either[Throwable, List[MyRow]] = Right(
//   value = List(
//     MyRow(i = Some(value = 1), j = 2, s = "test"),
//     MyRow(i = None, j = -3, s = "other")
//   )
// )

Analogously, you can encode your data with a CsvRowEncoder. Make sure to not vary the headers based on the data itself as this can't be reflected in the CSV format.

As you can see this can be quite tedious to implement. Lucky us, the fs2-data-csv-generic module comes to the rescue to avoid having to write the boilerplate. Please refer to the module documentation for more details.