
Read and parse CSV containing Key-value pairs using Akka Streams

Let's say we want to read and parse a CSV file containing key-value pairs.
We will use Alpakka's CSV connector (CsvParsing.lineScanner) for this.

Here is a snippet of the file (src/main/resources/CountryNicCurrencyKeyValueMap.csv). It maps country NIC codes to currency codes and uses pipe (|) as the field delimiter:
AD|EUR
AE|AED
AF|AFN
AG|XCD
AI|XCD
AL|ALL
AM|AMD
AN|ANG
AO|AOA
AQ|AQD
AR|ARS
AS|EUR
AT|EUR
AU|AUD
AW|ANG
AX|EUR
AZ|AZN
BA|BAM
BB|BBD
BD|BDT
BE|EUR
BF|XOF
BG|BGN
BH|BHD
BI|BIF
BJ|XOF
BL|EUR
BM|BMD
BN|BND
BO|BOB
BR|BRL
BS|BSD
BT|INR

Here is the code:
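import java.nio.charset.StandardCharsets
import java.nio.file.{Path, Paths}

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.scaladsl.{FileIO, Flow, Sink}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("TestApplication")
// ActorMaterializer is deprecated since Akka 2.6 (the ActorSystem provides a materializer); kept for older versions
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher

// The leading "/" resolves the name from the classpath root (src/main/resources)
// instead of relative to the package of the enclosing class.
// toURI avoids the URL-encoding problems (e.g. %20 for spaces) that getPath has.
val path: Path = Paths.get(getClass.getResource("/CountryNicCurrencyKeyValueMap.csv").toURI)

val f: Future[Map[String, String]] = FileIO.fromPath(path)
  // delimiter '|', quote char '"', escape char '\', maximum line length 256 bytes
  .via(CsvParsing.lineScanner('|', '"', '\\', 256))
  .via(
    Flow[immutable.Seq[ByteString]]
      // keep rows with at least two fields; x.head / x.tail.head would throw on a short row
      .collect { case Seq(key, value, _*) =>
        key.decodeString(StandardCharsets.UTF_8) -> value.decodeString(StandardCharsets.UTF_8)
      }
  )
  .runWith(
    // accumulate the (key, value) pairs into one immutable Map
    Sink.fold[Map[String, String], (String, String)](Map.empty[String, String])(_ + _)
  )

val printed = f.map(println(_))

// 10.seconds avoids the postfix-operator syntax (10 seconds) that needs scala.language.postfixOps
Await.result(printed, 10.seconds)
system.terminate()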
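The collect-and-fold step of the pipeline above can also be checked without Akka. The following is a minimal plain-Scala sketch, not part of Alpakka: parseKeyValueLines is a hypothetical helper name, and unlike CsvParsing.lineScanner it does not handle quoting or escaping. It keeps the first two pipe-separated fields of each non-blank line and folds them into a Map, so a later duplicate key overwrites an earlier one, just as Sink.fold with Map's + does.

```scala
// Hypothetical plain-Scala helper (not an Alpakka API) mirroring the stream's
// collect + Sink.fold steps. It does NOT implement CSV quoting or escaping.
def parseKeyValueLines(lines: Iterator[String]): Map[String, String] =
  lines
    .map(_.trim)                 // drop surrounding whitespace, including a trailing '\r'
    .filter(_.nonEmpty)          // skip blank lines
    .map(_.split('|'))           // split(Char) treats '|' literally; split("|") would be a regex
    .collect { case Array(key, value, _*) => key -> value } // ignore rows with fewer than two fields
    .foldLeft(Map.empty[String, String])(_ + _)             // later duplicates overwrite earlier ones

val sample = Iterator("AD|EUR", "AE|AED", "AF|AFN")
println(parseKeyValueLines(sample))
```

Keeping the parsing logic in a pure function like this makes it easy to unit test separately from file I/O and stream materialization.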

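If your CSV file is packaged inside an uber JAR, getResource returns a jar: URL that points into the archive rather than to a file on disk, so the JDK cannot open it through the default file system and the FileIO version above fails. Reading the resource as a classpath stream works in both cases: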
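import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.scaladsl.{Flow, Sink, StreamConverters}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

implicit val system: ActorSystem = ActorSystem("BulletinApplication")
// ActorMaterializer is deprecated since Akka 2.6 (the ActorSystem provides a materializer); kept for older versions
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher

// getResourceAsStream works whether the resource is a plain file on disk or an entry inside a JAR.
// The leading "/" resolves the name from the classpath root; the InputStream is created when the stream runs.
val f: Future[Map[String, String]] = StreamConverters
  .fromInputStream(() => getClass.getResourceAsStream("/CountryNicCurrencyKeyValueMap.csv"))
  // delimiter '|', quote char '"', escape char '\', maximum line length 256 bytes
  .via(CsvParsing.lineScanner('|', '"', '\\', 256))
  .via(
    Flow[immutable.Seq[ByteString]]
      // keep rows with at least two fields; x.head / x.tail.head would throw on a short row
      .collect { case Seq(key, value, _*) =>
        key.decodeString(StandardCharsets.UTF_8) -> value.decodeString(StandardCharsets.UTF_8)
      }
  )
  .runWith(
    // accumulate the (key, value) pairs into one immutable Map
    Sink.fold[Map[String, String], (String, String)](Map.empty[String, String])(_ + _)
  )

val printed = f.map(println(_))

Await.result(printed, 10.seconds)
system.terminate()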
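To see why the FileIO version breaks inside an uber JAR, it helps to compare the resource URLs in the two situations. The sketch below uses only the JDK; locate, OnDisk and InArchive are hypothetical names, and the example URLs are made up for illustration. When running from a classes directory, getResource returns a file: URL that converts cleanly to a Path. Inside a JAR it returns a jar: URL whose getPath looks like file:/.../app.jar!/CountryNicCurrencyKeyValueMap.csv, which is not a path on disk, so new File(path) cannot open it.

```scala
import java.net.{URI, URL}
import java.nio.file.{Path, Paths}

// Hypothetical types describing where a classpath resource lives.
sealed trait ResourceLocation
final case class OnDisk(path: Path) extends ResourceLocation   // could be read with FileIO.fromPath
final case class InArchive(url: URL) extends ResourceLocation  // read via getResourceAsStream instead

// Only "file:" URLs map directly onto the default file system; "jar:" URLs would
// need a zip FileSystem to be mounted first, so everything else is treated as a stream.
def locate(url: URL): ResourceLocation =
  url.getProtocol match {
    case "file" => OnDisk(Paths.get(url.toURI))
    case _      => InArchive(url)
  }

// Example URLs in the shape getResource might return (paths are illustrative only).
val fromClassesDir: URL = URI.create("file:/tmp/app/classes/CountryNicCurrencyKeyValueMap.csv").toURL
val fromUberJar: URL    = URI.create("jar:file:/tmp/app.jar!/CountryNicCurrencyKeyValueMap.csv").toURL

println(locate(fromClassesDir))
println(fromUberJar.getPath) // the string new File(...) would receive in the first example
```

In practice the StreamConverters version avoids this branching entirely, which is why it is the simpler choice when the same code must run both from sbt and from a packaged JAR.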
