Read and parse a CSV containing key-value pairs using Akka Streams

Let's say we want to read and parse a CSV file containing key-value pairs.
We will use Alpakka's CsvParsing stage for this.

A snippet of the file (src/main/resources/CountryNicCurrencyKeyValueMap.csv), which maps country NIC codes to currency codes using a pipe (|) as the field delimiter:
AD|EUR
AE|AED
AF|AFN
AG|XCD
AI|XCD
AL|ALL
AM|AMD
AN|ANG
AO|AOA
AQ|AQD
AR|ARS
AS|EUR
AT|EUR
AU|AUD
AW|ANG
AX|EUR
AZ|AZN
BA|BAM
BB|BBD
BD|BDT
BE|EUR
BF|XOF
BG|BGN
BH|BHD
BI|BIF
BJ|XOF
BL|EUR
BM|BMD
BN|BND
BO|BOB
BR|BRL
BS|BSD
BT|INR
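Before looking at the stream version, here is a plain-Scala sketch (no Akka, names are illustrative) of the transformation we want: split each pipe-delimited line into a (key, value) pair and fold the pairs into a Map.

```scala
// Each line "AD|EUR" becomes the pair ("AD", "EUR"), and the pairs
// are accumulated into a single Map, exactly as the stream below does.
val lines = List("AD|EUR", "AE|AED", "AF|AFN")

val currencyByNic: Map[String, String] =
  lines
    .map(_.split('|'))               // Array("AD", "EUR"), ...
    .map(cols => cols(0) -> cols(1)) // ("AD", "EUR"), ...
    .foldLeft(Map.empty[String, String])(_ + _)

println(currencyByNic("AD")) // EUR
```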

Here is the code:
import java.io.File
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.scaladsl.{FileIO, Flow, Sink}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.{ExecutionContext, _}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("TestApplication")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher

// The resource sits at the classpath root, hence the leading slash
val path = getClass.getResource("/CountryNicCurrencyKeyValueMap.csv").getPath
val file = new File(path)

val f: Future[Map[String, String]] = FileIO.fromPath(file.toPath)
  // lineScanner takes Byte parameters: delimiter, quote char, escape char, max line length
  .via(CsvParsing.lineScanner('|'.toByte, '"'.toByte, '\\'.toByte, 256))
  .via(
    Flow[immutable.Seq[ByteString]]
      .map { fields =>
        // first column is the key, second column is the value
        fields.head.decodeString(StandardCharsets.UTF_8) -> fields(1).decodeString(StandardCharsets.UTF_8)
      }
  )
  .runWith(
    // accumulate all (key, value) pairs into a single Map
    Sink.fold[Map[String, String], (String, String)](Map.empty[String, String])(_ + _)
  )

val printed = f.map(println)

Await.result(printed, 10.seconds)
system.terminate() // shut down the actor system so the JVM can exit
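The map stage above calls decodeString(StandardCharsets.UTF_8) on each ByteString cell. Outside Akka, the same decode is just the JDK byte-to-String conversion; a stdlib-only sketch for illustration:

```scala
import java.nio.charset.StandardCharsets

// Round-trip: encode a cell value to UTF-8 bytes, then decode it back,
// mirroring what ByteString.decodeString does per CSV field.
val cell: Array[Byte] = "EUR".getBytes(StandardCharsets.UTF_8)
val decoded = new String(cell, StandardCharsets.UTF_8)
println(decoded) // EUR
```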

If your CSV file is embedded inside an uber JAR, the JVM cannot open it via a plain file path (getResource returns a jar: URL that java.io.File cannot handle). Reading it as an InputStream with StreamConverters.fromInputStream works in both cases:
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.scaladsl.{Flow, Sink, StreamConverters}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, _}

implicit val system: ActorSystem = ActorSystem("TestApplication")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher

val f: Future[Map[String, String]] = StreamConverters
  // works both for plain files and for resources packaged inside a JAR
  .fromInputStream(() => getClass.getResourceAsStream("/CountryNicCurrencyKeyValueMap.csv"))
  .via(CsvParsing.lineScanner('|'.toByte, '"'.toByte, '\\'.toByte, 256))
  .via(
    Flow[immutable.Seq[ByteString]]
      .map { fields =>
        // first column is the key, second column is the value
        fields.head.decodeString(StandardCharsets.UTF_8) -> fields(1).decodeString(StandardCharsets.UTF_8)
      }
  )
  .runWith(
    // accumulate all (key, value) pairs into a single Map
    Sink.fold[Map[String, String], (String, String)](Map.empty[String, String])(_ + _)
  )

val printed = f.map(println)

Await.result(printed, 10.seconds)
system.terminate() // shut down the actor system so the JVM can exit
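One caveat about Sink.fold(Map.empty)(_ + _): adding a pair to a Map replaces any existing value for that key, so if the CSV ever repeated an NIC code, the last row would silently win. A stdlib-only illustration:

```scala
// Map + (k -> v) overwrites an existing entry for k, so folding
// duplicate keys keeps only the last occurrence.
val pairs = List("AD" -> "EUR", "AD" -> "USD")
val merged = pairs.foldLeft(Map.empty[String, String])(_ + _)
println(merged("AD")) // USD
```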
