
Read and parse a CSV file containing key-value pairs using Akka Streams

Let's say we want to read and parse a CSV file containing key-value pairs.
We will use Alpakka's CsvParsing for this.

Here is a snippet of a file (src/main/resources/CountryNicCurrencyKeyValueMap.csv) that maps country NIC codes to currency codes, with the pipe character (|) as the field delimiter:
AD|EUR
AE|AED
AF|AFN
AG|XCD
AI|XCD
AL|ALL
AM|AMD
AN|ANG
AO|AOA
AQ|AQD
AR|ARS
AS|EUR
AT|EUR
AU|AUD
AW|ANG
AX|EUR
AZ|AZN
BA|BAM
BB|BBD
BD|BDT
BE|EUR
BF|XOF
BG|BGN
BH|BHD
BI|BIF
BJ|XOF
BL|EUR
BM|BMD
BN|BND
BO|BOB
BR|BRL
BS|BSD
BT|INR

The following code reads the file, splits each line on the delimiter, and folds the rows into a Map:
import java.io.File
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.scaladsl.{FileIO, Flow, Sink}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.{ExecutionContext, _}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("TestApplication")
implicit val materializer: ActorMaterializer =  ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher

val path = getClass.getResource("CountryNicCurrencyKeyValueMap.csv").getPath
val file = new File(path)

val f: Future[Map[String, String]] = FileIO.fromPath(file.toPath)
  .via(CsvParsing.lineScanner('|','"','\\', 256))
  .via(
    Flow[immutable.Seq[ByteString]]
      .map { x =>
        (x.head.decodeString(StandardCharsets.UTF_8) -> x.tail.head.decodeString(StandardCharsets.UTF_8))
      }
  )
  .runWith(
    Sink.fold[Map[String, String], (String, String)](Map.empty[String, String])(_ + _)
  )

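// Print the map once the stream has completed
val printed: Future[Unit] = f.map(println(_))

// 10.seconds avoids the postfix-operator syntax, which needs scala.language.postfixOps
Await.result(printed, 10.seconds)

// Shut down the actor system so the JVM can exit
system.terminate()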
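
The map step above calls x.head and x.tail.head, which throws if a row has fewer than two fields. Below is a minimal sketch of a more defensive variant that skips such rows instead of failing the stream. The object name SafeKeyValueCsv, the parse helper, and the in-memory sample input are assumptions made for illustration; the Alpakka and Akka calls are the same ones used in the listing above, plus Source.single, collect and runFold.

```scala
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object SafeKeyValueCsv {

  // Hypothetical helper: parses pipe-delimited bytes into a Map,
  // silently dropping rows that do not have at least two fields.
  def parse(input: ByteString)(implicit mat: Materializer): Future[Map[String, String]] =
    Source.single(input)
      .via(CsvParsing.lineScanner('|', '"', '\\', 256))
      .collect {
        case fields if fields.size >= 2 =>
          fields(0).decodeString(StandardCharsets.UTF_8) ->
            fields(1).decodeString(StandardCharsets.UTF_8)
      }
      .runFold(Map.empty[String, String])(_ + _)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("SafeKeyValueCsv")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Sample input (an assumption); the last row has no delimiter and is skipped.
    val sample = ByteString("AD|EUR\nAE|AED\nBROKEN\n")

    try println(Await.result(parse(sample), 10.seconds))
    finally system.terminate()
  }
}
```

Whether to skip, log, or fail on a malformed row depends on how much you trust the file; collect is simply the shortest way to express the "skip" choice.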

If your CSV file is packaged inside an uber JAR, the resource is no longer a regular file on the default file system, so it cannot be opened through java.io.File. Reading the resource as an InputStream instead handles both cases:
import java.nio.charset.StandardCharsets

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.alpakka.csv.scaladsl.CsvParsing
import akka.stream.scaladsl.{Flow, Sink, StreamConverters}
import akka.util.ByteString

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, _}

implicit val system: ActorSystem = ActorSystem("BulletinApplication")
implicit val materializer: ActorMaterializer =  ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher

val f: Future[Map[String, String]] = StreamConverters.fromInputStream(() => getClass.getResourceAsStream("CountryNicCurrencyKeyValueMap.csv"))
  .via(CsvParsing.lineScanner('|','"','\\', 256))
  .via(
    Flow[immutable.Seq[ByteString]]
      .map { x =>
        (x.head.decodeString(StandardCharsets.UTF_8) -> x.tail.head.decodeString(StandardCharsets.UTF_8))
      }
  )
  .runWith(
    Sink.fold[Map[String, String], (String, String)](Map.empty[String, String])(_ + _)
  )

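// Print the map once the stream has completed
val printed: Future[Unit] = f.map(println(_))

// 10.seconds avoids the postfix-operator syntax, which needs scala.language.postfixOps
Await.result(printed, 10.seconds)

// Shut down the actor system so the JVM can exit
system.terminate()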
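
One pitfall with this approach is that getResourceAsStream returns null when the resource cannot be found, which would likely surface later as a NullPointerException inside the stream. The sketch below wraps the lookup so that a missing resource produces a descriptive error instead. ResourceLoader and openResource are hypothetical names introduced here, and only the JDK and the Scala standard library are used.

```scala
import java.io.{FileNotFoundException, InputStream}

object ResourceLoader {

  // Hypothetical helper: opens a classpath resource, or throws a
  // FileNotFoundException naming the resource if it is missing.
  def openResource(name: String): InputStream =
    Option(getClass.getResourceAsStream(name)).getOrElse(
      throw new FileNotFoundException(s"Classpath resource not found: $name")
    )
}
```

It can be passed to the source as StreamConverters.fromInputStream(() => ResourceLoader.openResource("/CountryNicCurrencyKeyValueMap.csv")). Note that a name without a leading slash is resolved relative to the package of the class doing the lookup, while a leading slash resolves from the classpath root.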
