01 July 2013

Spark Tutorial and Cheatsheet

Main resources:
Scala Cheat Sheet
Reactive Cheat Sheet
Spark Cheat sheet
Spark Quick start
Spark programming guide
Spark Streaming: processing real-time data streams
Spark SQL and DataFrames: support for structured data and relational queries
MLlib: built-in machine learning library
GraphX: Spark’s new API for graph processing

Scala programming examples:

  1. Define an object with a main function -- HelloWorld.
    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }
    
    Execute main function:
    scala> HelloWorld.main(null)
    Hello, world!
    
  2. Creating RDDs
    Parallelized Collections:
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
    
    External Datasets:
    val distFile = sc.textFile("data.txt")
    
    textFile returns an RDD with one element per line; collect() brings the content back to the driver:
    scala> distFile.collect()
    res16: Array[String] = Array(1,2,3, 4,5,6)
    
    SparkContext.wholeTextFiles reads every file in a directory and returns one (filename, content) pair per file.
    val distFile = sc.wholeTextFiles("/tmp/tmpdir")
    scala> distFile.collect()
    res17: Array[(String, String)] =
    Array((maprfs:/tmp/tmpdir/data3.txt,"1,2,3,4,5,6"),
     (maprfs:/tmp/tmpdir/data.txt,"1,2,3,4,5,6"),
     (maprfs:/tmp/tmpdir/data2.txt,"1,2,3,4,5,6"))
    
  3. RDD Transformations
    Every RDD transformation creates a new dataset from an existing one. Transformations are lazy: nothing is computed until an action needs the result.
    1. map(f:T->U)
      Returns a new distributed dataset formed by passing each element of the source through a function func.
      Example 1: To calculate the length of each line.
      scala> lines.map(s => s.length).collect
      res46: Array[Int] = Array(48, 25, 34, 5, 6, 6, 5, 5, 6)  
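
Laziness means that a transformation such as map only records what to do; nothing runs until an action such as collect asks for a result. The sketch below is a plain-Scala analogy and assumes no Spark at all: a Scala Iterator stands in for the RDD, and the names LazyMapSketch and calls are made up for illustration.

```scala
// Plain-Scala analogy for lazy transformations (an Iterator stands in for an RDD).
object LazyMapSketch {
  var calls = 0 // how many times the mapping function has actually run

  // "Transformation": builds a new iterator, but does not touch any element yet.
  val lengths: Iterator[Int] =
    Iterator("apple", "orange", "banana").map { s =>
      calls += 1
      s.length
    }

  val callsBeforeAction: Int = calls // nothing has been computed so far

  // "Action": forcing the iterator finally runs the function on every element.
  val result: List[Int] = lengths.toList
  val callsAfterAction: Int = calls

  def main(args: Array[String]): Unit =
    println(s"before=$callsBeforeAction after=$callsAfterAction result=$result")
}
```

The counter stays at zero until toList forces the iterator, which is roughly the behaviour to expect from an RDD before the first action.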
      
    2. filter(f:T->Bool)
      Returns a new dataset formed by selecting those elements of the source on which func returns true.
      Example 1: Find the lines which start with "APPLE":
      scala> lines.filter(_.startsWith("APPLE")).collect
      res50: Array[String] = Array(APPLE)
      
      Example 2: Find the lines which contain "test":
      scala> lines.filter(_.contains("test")).collect
      res54: Array[String] = Array("This is a test data text file for Spark to use. ", "To test Scala and Spark, ")
      
    3. flatMap(func)
      Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
      Example 1: Generate the Int List and compare "map" and "flatMap".
      scala> val intlist = List( 1,2,3,4,5 )
      intlist: List[Int] = List(1, 2, 3, 4, 5)
      
      scala> intlist.map(x=>List(x,x*2))
      res72: List[List[Int]] = List(List(1, 2), List(2, 4), List(3, 6), List(4, 8), List(5, 10))
      
      scala> intlist.flatMap(x=>(List(x,x*2)))
      res73: List[Int] = List(1, 2, 2, 4, 3, 6, 4, 8, 5, 10)
      
      Example 2: Use flatMap on a Map
      scala> val m = Map(1 -> 2, 2 -> 4, 3 -> 6)
      m: scala.collection.immutable.Map[Int,Int] = Map(1 -> 2, 2 -> 4, 3 -> 6)
      
      scala> def h(k:Int, v:Int) = if (v > 2) Some(k->v) else None
      h: (k: Int, v: Int)Option[(Int, Int)]
      
      scala> m.flatMap { case (k,v) => h(k,v) }
      res76: scala.collection.immutable.Map[Int,Int] = Map(2 -> 4, 3 -> 6)
      
    4. mapPartitions(func)
      Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator[T] => Iterator[U] when running on an RDD of type T.
      Put simply:
      map converts each element of the source RDD into a single element of the result RDD by applying a function.
      mapPartitions converts each partition of the source RDD into multiple elements of the result (possibly none).
      It can improve performance by reducing new object creation in the map function.
      Example 1: We have 9 lines in total. Instead of mapping each line, we can split the 9 lines into 2 partitions and then call the function only twice, once per partition.
      First we need a function that takes an Iterator as input and also returns an Iterator.
      This function simply returns the size of each partition.
      def myfunc(inputs: Iterator[String]): Iterator[Int] = {
        // Emit a single element per partition: the number of lines it holds.
        Iterator(inputs.size)
      }
      
      scala> lines.count
      res12: Long = 9
      
      scala> lines.mapPartitions(myfunc).collect
      res9: Array[Int] = Array(2, 7)
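
The performance remark above is easiest to see with a helper that is costly to build. The following is a plain-Scala model and not Spark code: each inner Seq plays the role of one partition, and newFormatter is a hypothetical stand-in for something expensive such as a parser or a connection.

```scala
// Plain-Scala model: each inner Seq plays the role of one RDD partition.
object MapPartitionsSketch {
  var setups = 0 // counts how often the "expensive" helper is built

  // Hypothetical expensive resource; here it only upper-cases a string.
  def newFormatter(): String => String = {
    setups += 1
    s => s.toUpperCase
  }

  // Two partitions of sizes 2 and 7, like the 9 lines in the example above.
  val partitions: Seq[Seq[String]] =
    Seq(Seq("a", "b"), Seq("c", "d", "e", "f", "g", "h", "i"))

  // map style: the helper is built once per element.
  val perElement: Seq[String] = partitions.flatten.map(s => newFormatter()(s))
  val setupsPerElement: Int = setups

  // mapPartitions style: the helper is built once per partition and then
  // reused for every element the partition's iterator yields.
  setups = 0
  def perPartitionFunc(it: Iterator[String]): Iterator[String] = {
    val fmt = newFormatter()
    it.map(fmt)
  }
  val perPartition: Seq[String] =
    partitions.flatMap(p => perPartitionFunc(p.iterator).toList)
  val setupsPerPartition: Int = setups
}
```

Both variants produce the same elements; only the number of helper constructions differs (once per element versus once per partition).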
      
    5. mapPartitionsWithIndex(func)
      Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
      Example 1: Take the mapPartitions example above; here the function receives the partition index as one more input.
      def myfunc2(index: Int, inputs: Iterator[String]): Iterator[(Int, Int)] = {
        // Emit one (partition index, partition size) pair per partition.
        Iterator((index, inputs.size))
      }
      
      scala> lines.mapPartitionsWithIndex(myfunc2).collect
      res14: Array[(Int, Int)] = Array((0,2), (1,7))
      
    6. sample(withReplacement, fraction, seed)
      Sample a fraction of the data, with or without replacement, using a given random number generator seed.
      Note: Unlike takeSample, the 2nd parameter of sample() is the fraction of the dataset to sample, not a count. The actual number of sampled elements may not match the fraction exactly.
      Example 1: With fraction = 0.5 the result is not always exactly 50% of the elements, and it can change between runs.
      scala> val list = sc.parallelize(1 to 9)
      list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at :33
      
      scala> list.sample(true, 0.5).collect
      res63: Array[Int] = Array(7, 9)
      
      scala> list.sample(true, 0.5).collect
      res64: Array[Int] = Array(1, 1, 2, 4, 5, 6)
      
      Example 2: "withReplacement"=true means the same element can be picked more than once, so the output may contain duplicates; with false each element is picked at most once.
      scala> list.sample(true, 1).collect
      res70: Array[Int] = Array(4, 4, 4, 4, 8, 8, 9, 9)
      
      scala> list.sample(false, 1).collect
      res71: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
      
      Example 3: If "seed" does not change, the result will not change.
      scala> list.sample(true, 0.5, 1).collect
      res73: Array[Int] = Array(5, 7, 8, 9, 9)
      
      scala> list.sample(true, 0.5, 1).collect
      res74: Array[Int] = Array(5, 7, 8, 9, 9)
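
One way to picture why the sample size varies and why a fixed seed repeats the result: sampling without replacement can be modelled as an independent coin flip per element. The sketch below is a simplified plain-Scala model under that assumption, not Spark's actual sampler; bernoulliSample is a made-up helper name.

```scala
import scala.util.Random

// Simplified model of sample(withReplacement = false, fraction, seed).
object SampleSketch {
  // Keep each element independently with probability `fraction`.
  def bernoulliSample[T](xs: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    val rng = new Random(seed) // same seed => same sequence of coin flips
    xs.filter(_ => rng.nextDouble() < fraction)
  }

  val data: List[Int] = (1 to 9).toList

  val first: Seq[Int]  = bernoulliSample(data, 0.5, seed = 1L)
  val second: Seq[Int] = bernoulliSample(data, 0.5, seed = 1L)
  val everything: Seq[Int] = bernoulliSample(data, 1.0, seed = 1L)
}
```

Because every element is decided on its own, the result size is only 50% on average, no element appears twice, and a fraction of 1 keeps everything, which matches the without-replacement outputs shown above.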
      
    7. union(otherDataset)
      Return a new dataset that contains the union of the elements in the source dataset and the argument.
      Note: It is the same as operator "++".
      Example: Union of 2 RDDs of Strings. Unlike "UNION" in SQL, it does not de-duplicate.
      scala> val list  = sc.parallelize(List("apple", "orange", "banana", "apple", "orange"))
      scala> val list2 = sc.parallelize(List("mapr", "cloudera", "hortonworks"))
      
      scala> (list union list2).collect
      res4: Array[String] = Array(apple, orange, banana, apple, orange, mapr, cloudera, hortonworks)
      
      scala> (list ++ list2).collect
      res5: Array[String] = Array(apple, orange, banana, apple, orange, mapr, cloudera, hortonworks)
      
    8. intersection(otherDataset)
      Return a new RDD that contains the intersection of elements in the source dataset and the argument.
      Note: The result is de-duplicated.
      Example 1: Intersection of 2 RDDs of Strings. The result contains only one "apple" although each RDD has more than one.
      scala> val list  = sc.parallelize(List("apple", "orange", "banana", "apple", "orange"))
      scala> val list2 = sc.parallelize(List("apple", "mapr", "apple" ))
      
      scala> list.intersection(list2).collect
      res7: Array[String] = Array(apple)
      
    9. distinct([numTasks])
      Returns a new dataset that contains the distinct elements of the source dataset.
      Example 1: Return distinct values from one array.
      scala> val list  = sc.parallelize(List("apple", "orange", "banana", "apple", "orange"))
      scala> list.distinct.collect
      res13: Array[String] = Array(orange, apple, banana)
      
    10. groupByKey([numTasks])
      When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable[V]) pairs.
      Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or combineByKey will yield much better performance.
      Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
      Example 1: Group by a list of (K,V) pairs.
      scala> val kv = sc.parallelize( List(("apple", 1), ("orange", 2), ("banana", 3), ("apple", 2)) )
      
      scala> kv.groupByKey.collect
      res14: Array[(String, Iterable[Int])] = Array((orange,ArrayBuffer(2)), (apple,ArrayBuffer(1, 2)), (banana,ArrayBuffer(3)))
      
    11. reduceByKey(func, [numTasks])
      When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
      Example 1: Take the example above and calculate the sum of the values for each key.
      scala> kv.reduceByKey(_ + _ ).collect
      res19: Array[(String, Int)] = Array((orange,2), (apple,3), (banana,3))
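
The note under groupByKey recommends reduceByKey for aggregations. A rough plain-Scala picture of the difference, using local collections instead of RDDs (so no shuffle is involved, and the object name is made up): the group-first style materialises every value per key before summing, while the reduce style only ever keeps one running value per key.

```scala
// Local-collection sketch of "group then sum" versus "reduce per key".
object ReduceByKeySketch {
  val kv = List(("apple", 1), ("orange", 2), ("banana", 3), ("apple", 2))

  // groupByKey style: collect every value per key first, then aggregate.
  val grouped: Map[String, List[Int]] =
    kv.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
  val sumsViaGroup: Map[String, Int] =
    grouped.map { case (k, vs) => k -> vs.sum }

  // reduceByKey style: fold each value into a running total per key,
  // so the full list of values is never built.
  val sumsViaReduce: Map[String, Int] =
    kv.foldLeft(Map.empty[String, Int]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(_ + v).getOrElse(v))
    }
}
```

Both give the same totals as the reduceByKey output above; the second form simply carries less intermediate data.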
      
    12. aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
      When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
      Example 1: Take the example above and let both functions build debug strings, to see how they are called.
      scala> kv.aggregateByKey("")(  ( (a,b) => "DEBUG:" + "a=" + a + " b=" + b ), ((v1, v2) => v1 + " and " + v2) ).collect
      res34: Array[(String, String)] = Array((orange,DEBUG:a= b=2), (apple,DEBUG:a= b=1 and DEBUG:a= b=2), (banana,DEBUG:a= b=3))
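
To make the roles of zeroValue, seqOp and combOp more concrete, here is a plain-Scala model of the same idea (an assumption-laden sketch, not Spark's implementation): seqOp folds values into an accumulator inside one partition, and combOp merges the accumulators of the same key across partitions. The helper aggregateByKeyLocal and the way the pairs are split into two partitions are made up for illustration.

```scala
// Plain-Scala model of aggregateByKey over explicit "partitions".
object AggregateByKeySketch {
  def aggregateByKeyLocal[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    // Step 1: inside each partition, fold every value into a per-key accumulator.
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zero), v))
      }
    }
    // Step 2: merge the accumulators of the same key across partitions.
    perPartition.flatten.groupBy(_._1).map { case (k, kus) =>
      k -> kus.map(_._2).reduce(combOp)
    }
  }

  // The two "apple" pairs deliberately sit in different partitions.
  val partitions = Seq(
    Seq(("apple", 1), ("orange", 2)),
    Seq(("banana", 3), ("apple", 2)))

  // Accumulator type (sum, count) differs from the value type Int.
  val sumAndCount: Map[String, (Int, Int)] =
    aggregateByKeyLocal(partitions, (0, 0))(
      (acc, v) => (acc._1 + v, acc._2 + 1),
      (x, y) => (x._1 + y._1, x._2 + y._2))

  val averages: Map[String, Double] =
    sumAndCount.map { case (k, (sum, count)) => k -> sum.toDouble / count }
}
```

This also fits the debug output above: "apple" shows two DEBUG strings joined by " and ", which is what one would see if its two values were folded in different partitions and then merged by combOp.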
      
    13. sortByKey([ascending], [numTasks])
      When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
      Example 1: Take the example above and sort by key, ascending or descending.
      scala> kv.sortByKey(true).collect
      res35: Array[(String, Int)] = Array((apple,2), (apple,1), (banana,3), (orange,2))
      
      scala> kv.sortByKey(false).collect
      res36: Array[(String, Int)] = Array((orange,2), (banana,3), (apple,2), (apple,1))
      
    14. join(otherDataset, [numTasks])
      When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are also supported through leftOuterJoin and rightOuterJoin.
      Example 1: Inner Join
      scala> val kv = sc.parallelize( List(("apple", 1), ("orange", 2), ("banana", 3), ("apple", 2)) )
      scala> val kw = sc.parallelize( List(("apple", 999), ("orange", 222) ))
      
      scala> kv.join(kw).collect
      res37: Array[(String, (Int, Int))] = Array((orange,(2,222)), (apple,(1,999)), (apple,(2,999)))
      
      Example 2: Left Outer Join
      scala> kv.leftOuterJoin(kw).collect
      res38: Array[(String, (Int, Option[Int]))] = Array((orange,(2,Some(222))), (apple,(2,Some(999))), (apple,(1,Some(999))), (banana,(3,None)))
      
      Example 3: Right Outer Join
      scala> kv.rightOuterJoin(kw).collect
      res39: Array[(String, (Option[Int], Int))] = Array((orange,(Some(2),222)), (apple,(Some(1),999)), (apple,(Some(2),999)))
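
For readers who want to check the join results by hand, the same semantics can be written over local Lists. This is a plain-Scala sketch with no Spark involved, the object name is made up, and the element order differs from the RDD output, so compare the results as sets.

```scala
// Local-collection sketch of inner join and left outer join on (key, value) pairs.
object JoinSketch {
  val kv = List(("apple", 1), ("orange", 2), ("banana", 3), ("apple", 2))
  val kw = List(("apple", 999), ("orange", 222))

  // Inner join: every (v, w) combination for keys present on both sides.
  val inner: List[(String, (Int, Int))] =
    for ((k, v) <- kv; (k2, w) <- kw if k == k2) yield (k, (v, w))

  // Left outer join: keep every left element; a missing right side becomes None.
  val leftOuter: List[(String, (Int, Option[Int]))] =
    kv.flatMap { case (k, v) =>
      val matches = kw.collect { case (`k`, w) => w }
      if (matches.isEmpty) List((k, (v, Option.empty[Int])))
      else matches.map(w => (k, (v, Option(w))))
    }
}
```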
      
    15. cogroup(otherDataset, [numTasks])
      When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable[V], Iterable[W])) tuples. This operation is also called groupWith.
      Example 1: groupWith on 2 RDDs.
      scala> kv.cogroup(kw).collect
      res40: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((orange,(CompactBuffer(2),CompactBuffer(222))), (apple,(CompactBuffer(2, 1),CompactBuffer(999))), (banana,(CompactBuffer(3),CompactBuffer())))
      Example 2: groupWith on 3 RDDs.
      scala> val kw2 = sc.parallelize( List(("banana", 123), ("banana", 456) ))
      
      scala> kv.cogroup(kw,kw2).collect
      res41: Array[(String, (Iterable[Int], Iterable[Int], Iterable[Int]))] = Array((orange,(CompactBuffer(2),CompactBuffer(222),CompactBuffer())), (apple,(CompactBuffer(1, 2),CompactBuffer(999),CompactBuffer())), (banana,(CompactBuffer(3),CompactBuffer(),CompactBuffer(123, 456))))
      
    16. cartesian(otherDataset)
      When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
      Example 1: Cartesian product of 2 RDDs.
      scala> val a = sc.parallelize(List(1,2,3))
      scala> val b = sc.parallelize(List(4,5,6))
      scala> a.cartesian(b).collect
      res42: Array[(Int, Int)] = Array((1,4), (1,5), (1,6), (2,4), (3,4), (2,5), (2,6), (3,5), (3,6))
      
    17. pipe(command, [envVars])
      Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
      Example 1: Split a List into 2 partitions; the command is executed once per partition.
      scala> val list  = sc.parallelize(List("apple", "orange", "banana", "mapr", "cloudera" , "hortonworks") , 2)
      
      scala> list.pipe("tail -1").collect
      res34: Array[String] = Array(banana, hortonworks)
      
      scala> list.pipe("tail -2").collect
      res35: Array[String] = Array(orange, banana, cloudera, hortonworks)
      
    18. coalesce(numPartitions)
      Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
      Example 1: Reduce the "list" above from 2 partitions to 1 partition and compare the results with the pipe example.
      scala> list.coalesce(1, false).pipe("tail -1").collect
      res37: Array[String] = Array(hortonworks)
      
      scala> list.coalesce(1, false).pipe("tail -2").collect
      res38: Array[String] = Array(cloudera, hortonworks)
      
    19. repartition(numPartitions)
      Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
      Example 1: Increase the "list" above from 2 partitions to 6 partitions and compare the results.
      scala> list.repartition(6).pipe("tail -1").collect
      res39: Array[String] = Array(hortonworks, apple, orange, banana, mapr, cloudera)
      
  4. RDD Actions
    Actions return a value to the driver program after running a computation on the dataset.
    1. reduce(f: (T, T) => T): T
      Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
      Example 1: Calculate the sum of int from 1 to 9.
      scala> val a = sc.parallelize(1 to 9)
      a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at :33
      
      scala> a.reduce(_ + _)
      res5: Int = 45
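
Why the function should be commutative and associative: each partition is reduced on its own and the partial results are reduced again, so the grouping of the elements is not fixed. The plain-Scala sketch below models that with explicit partitions (partitionedReduce is a made-up helper, not a Spark API) and compares addition with subtraction.

```scala
// Plain-Scala model of reduce over partitions.
object ReduceSketch {
  // Reduce each partition on its own, then reduce the partial results.
  def partitionedReduce(parts: Seq[Seq[Int]])(f: (Int, Int) => Int): Int =
    parts.map(_.reduce(f)).reduce(f)

  // The same numbers 1 to 9, split into partitions in two different ways.
  val splitA = Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7, 8, 9))
  val splitB = Seq(Seq(1, 2, 3), Seq(4, 5, 6, 7, 8, 9))

  // Addition is associative and commutative: the split does not matter.
  val sumA = partitionedReduce(splitA)(_ + _)
  val sumB = partitionedReduce(splitB)(_ + _)

  // Subtraction is neither: the answer depends on where the split falls.
  val diffA = partitionedReduce(splitA)(_ - _)
  val diffB = partitionedReduce(splitB)(_ - _)
}
```

Here sumA and sumB are both 45, while diffA is 17 and diffB is 27, so a subtraction-style function would give results that depend on the partitioning.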
      
    2. collect()
      Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
      Example 1: Collect all elements of List and return an Array.
      scala> val list = sc.parallelize(List("apple", "orange", "banana"))
      list: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at :33
      
      scala> list.collect
      res6: Array[String] = Array(apple, orange, banana)
      
    3. count()
      Return the number of elements in the dataset.
      Example 1: Return the number of elements of the list above.
      scala> list.count
      res7: Long = 3
      
    4. first()
      Returns first element of the dataset (similar to take(1)).
      Example 1: Return the first element of the list above. Similar to "limit 1" in SQL.
      scala> list.first
      res8: String = apple
      
    5. take(n)
      Returns an array with the first n elements of the dataset. Note that this is currently not executed in parallel. Instead, the driver program computes all the elements.
      Example 1: Return the first 2 elements of the list above. Similar to "limit 2" in SQL.
      scala> list.take(2)
      res9: Array[String] = Array(apple, orange)
      
    6. takeSample(withReplacement, num, [seed])
      Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
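      Note: It returns an Array instead of an RDD.
      Example 1: "withReplacement"=true means the output may contain duplicate elements; with false it will not. Without replacement, at most as many elements as the dataset holds are returned (9 here, although 10 were requested).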
      scala> val list = sc.parallelize(1 to 9)
      list: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[55] at parallelize at :33
      
      scala> list.takeSample(true, 10)
      res59: Array[Int] = Array(5, 8, 2, 4, 8, 9, 9, 1, 4, 5)
      
      scala> list.takeSample(false, 10)
      res60: Array[Int] = Array(3, 8, 2, 6, 1, 7, 5, 9, 4)
      
      Example 2: If "seed" does not change, the result will not change.
      scala>  list.takeSample(true, 5, 1)
      res61: Array[Int] = Array(8, 6, 3, 8, 9)
      
      scala>  list.takeSample(true, 5, 1)
      res62: Array[Int] = Array(8, 6, 3, 8, 9)
      
      scala>  list.takeSample(true, 5, 2)
      res63: Array[Int] = Array(3, 8, 4, 8, 9)
      
    7. takeOrdered(n, [ordering])
      Return the first n elements of the RDD using either their natural order or a custom comparator.
      Example 1: Similar to "order by ... limit n" in SQL.
      scala> val list = sc.parallelize(List("apple", "orange", "banan", "APPLE", "BABY","cat", "1"  , "3" , "9" ))
      list: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[64] at parallelize at :33
      
      scala> list.takeOrdered(3)
      res67: Array[String] = Array(1, 3, 9)
      
      scala> list.takeOrdered(5)
      res68: Array[String] = Array(1, 3, 9, APPLE, BABY)
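
The natural order of Strings puts digits before upper case and upper case before lower case, which explains the output above. The sketch below reproduces that on a local List and tries two custom orderings; takeOrderedLocal is a made-up local helper. On an RDD the ordering goes in a second parameter list, for example list.takeOrdered(3)(Ordering[String].reverse).

```scala
// Local-collection sketch of takeOrdered with natural and custom orderings.
object TakeOrderedSketch {
  val list = List("apple", "orange", "banan", "APPLE", "BABY", "cat", "1", "3", "9")

  // Local equivalent of takeOrdered(n): sort with the given ordering, keep n.
  def takeOrderedLocal(n: Int)(implicit ord: Ordering[String]): List[String] =
    list.sorted(ord).take(n)

  // Natural order: digits, then upper case, then lower case.
  val natural: List[String] = takeOrderedLocal(5)

  // Custom comparator 1: descending order.
  val descending: List[String] = takeOrderedLocal(3)(Ordering[String].reverse)

  // Custom comparator 2: compare while ignoring case.
  val ignoreCase: List[String] =
    takeOrderedLocal(6)(Ordering.by[String, String](_.toLowerCase))
}
```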
      
    8. saveAsTextFile(path)
      Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
      Example 1: Save the list above to HDFS.
      list.saveAsTextFile("/tmp/tmpout") 
      
      # hadoop fs -ls /tmp/tmpout
      Found 3 items
      -rwxr-xr-x   3 root root          0 2015-02-28 02:23 /tmp/tmpout/_SUCCESS
      -rwxr-xr-x   3 root root         25 2015-02-28 02:23 /tmp/tmpout/part-00000
      -rwxr-xr-x   3 root root         15 2015-02-28 02:23 /tmp/tmpout/part-00001
      # hadoop fs -cat /tmp/tmpout/part-00000
      apple
      orange
      banan
      APPLE
      # hadoop fs -cat /tmp/tmpout/part-00001
      BABY
      cat
      1
      3
      9
      
    9. saveAsSequenceFile(path)
      Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
      Example 1: Save a key-value pair as sequence file on HDFS.
      val a = sc.parallelize(Array(("apple",1), ("orange",2), ("banana",3)))
      a.saveAsSequenceFile("/tmp/tmpoutseq")
      
      # hadoop fs -ls /tmp/tmpoutseq
      Found 3 items
      -rwxr-xr-x   3 root root          0 2015-02-28 02:27 /tmp/tmpoutseq/_SUCCESS
      -rw-r--r--   3 root root        103 2015-02-28 02:27 /tmp/tmpoutseq/part-00000
      -rw-r--r--   3 root root        123 2015-02-28 02:27 /tmp/tmpoutseq/part-00001
      
    10. saveAsObjectFile(path)
      Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
      Example 1: Save and load an object file on HDFS.
      scala> val list = sc.parallelize(List("apple", "orange", "banan"))
      list: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at :12
      
      scala> list.saveAsObjectFile("/tmp/tmpobj")
      
      scala> val newlist = sc.objectFile[String]("/tmp/tmpobj")
      newlist: org.apache.spark.rdd.RDD[String] = FlatMappedRDD[11] at objectFile at :12
      
      scala> newlist.collect
      res4: Array[String] = Array(orange, banan, apple)
      
    11. countByKey()
      Only available on RDDs of type (K, V). Returns a Map of (K, Long) pairs with the count of each key.
      Example 1: Count the List of KV pairs.
      scala> val kv = sc.parallelize( List(("apple", 1), ("orange", 2), ("banana", 3), ("apple", 2)) )
      kv: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at :12
      
      scala> kv.countByKey
      res6: scala.collection.Map[String,Long] = Map(banana -> 1, apple -> 2, orange -> 1)
      
    12. foreach(func)
      Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.
      Example 1: Calculate the sum of 1 to 4.
      scala> val accum = sc.accumulator(0)
      accum: org.apache.spark.Accumulator[Int] = 0
      
      scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
      
      scala> accum.value
      res24: Int = 10
      
      Putting it together, here is an example that calculates the total length of all lines in a file.
      val lines = sc.textFile("data.txt")
      val lineLengths = lines.map(s => s.length)
      val totalLength = lineLengths.reduce((a, b) => a + b)
      
    13. Persist and Unpersist RDD
      Persist keeps the RDD in memory or on disk for the rest of the application after the first time it is computed.
      lineLengths.persist()
      lineLengths.unpersist()
      
      Example 1: persist() an RDD at different storage levels.
      scala> import org.apache.spark.storage.StorageLevel
      import org.apache.spark.storage.StorageLevel
      
      scala> kv.persist(StorageLevel.MEMORY_ONLY)
      res9: kv.type = ParallelCollectionRDD[0] at parallelize at :12
      
      scala> kv.getStorageLevel
      res10: org.apache.spark.storage.StorageLevel = StorageLevel(false, true, false, true, 1)
      
      scala> kv.unpersist()
      res13: kv.type = ParallelCollectionRDD[0] at parallelize at :12
      
      scala> kv.getStorageLevel
      res14: org.apache.spark.storage.StorageLevel = StorageLevel(false, false, false, false, 1)
      
      scala> kv.persist(StorageLevel.DISK_ONLY)
      res15: kv.type = ParallelCollectionRDD[0] at parallelize at :12
      
      scala> kv.getStorageLevel
      res16: org.apache.spark.storage.StorageLevel = StorageLevel(true, false, false, false, 1)
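
What persisting buys can be pictured without Spark. In the plain-Scala analogy below, a lazy view recomputes its elements on every traversal, much like an RDD that is not persisted and is used by two actions, while a materialised copy is computed once and then reused. The object and counter names are made up for illustration.

```scala
// Plain-Scala analogy: a lazy view recomputes, a materialised copy does not.
object PersistSketch {
  var computations = 0 // how many times the mapping function has run

  // Not "persisted": the function runs again on every traversal.
  val lineLengths = List("apple", "orange", "banana").view.map { s =>
    computations += 1
    s.length
  }

  val total1 = lineLengths.sum // first "action"
  val total2 = lineLengths.sum // second "action" recomputes everything
  val computationsWithoutCache = computations

  // "persist": materialise once, then reuse the stored values.
  val cached: Vector[Int] = lineLengths.toVector
  val total3 = cached.sum
  val total4 = cached.sum
  val computationsWithCache = computations - computationsWithoutCache
}
```

Two traversals of the view cost six function calls for three elements; after materialising, further sums cost none beyond the three calls needed to fill the copy.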
      

  5. Passing Functions to Spark
    Pass a reference to a function
    Example: prepend "Hello " to each element in the RDD.
    def sayhello(s: String): String = "Hello " + s
    lines.map(sayhello)
    
    scala> lines.collect
    res31: Array[String] = Array(1,2,3, 4,5,6)
    scala> lines.map(sayhello).collect
    res32: Array[String] = Array(Hello 1,2,3, Hello 4,5,6)
    
    Anonymous functions
    A simpler way to do the same is:
    lines.map(x => "Hello " + x)
    

  6. KeyValue pairs
    reduceByKey
    val lines = sc.textFile("data.txt")
    val pairs = lines.map(s => (s, 1))
    val counts = pairs.reduceByKey((a, b) => a + b)
    
    Sample data:
    # cat data.txt
    This is a test data text file for Spark to use.
    To test Scala and Spark,
    we need to repeat again and again.
    apple
    orange
    banana
    APPle
    APPLE
    ORANGE
    
    Sample result:
    scala> counts.collect
    res41: Array[(String, Int)] = Array((orange,1), (APPLE,1), (ORANGE,1), (apple,1), ("This is a test data text file for Spark to use. ",1), (APPle,1), ("To test Scala and Spark, ",1), (banana,1), (we need to repeat again and again.,1))
    
    sortByKey
    scala>  counts.sortByKey().collect
    res43: Array[(String, Int)] = Array((APPLE,1), (APPle,1), (ORANGE,1), ("This is a test data text file for Spark to use. ",1), ("To test Scala and Spark, ",1), (apple,1), (banana,1), (orange,1), (we need to repeat again and again.,1))
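
The example above counts whole lines. Counting words only needs one more step: split each line with flatMap before building the (key, 1) pairs. The sketch below does this on a local List holding the sample data, as a plain-Scala stand-in; on an RDD the groupBy and sum step would be reduceByKey(_ + _). Lower-casing and splitting on non-word characters are choices made for this illustration.

```scala
// Local-collection sketch of a word count over the sample data.
object WordCountSketch {
  val lines = List(
    "This is a test data text file for Spark to use.",
    "To test Scala and Spark,",
    "we need to repeat again and again.",
    "apple", "orange", "banana", "APPle", "APPLE", "ORANGE")

  val counts: Map[String, Int] =
    lines
      .flatMap(_.toLowerCase.split("\\W+")) // one element per word
      .filter(_.nonEmpty)
      .map(word => (word, 1))                // (key, value) pairs
      .groupBy(_._1)                         // local stand-in for the shuffle
      .map { case (word, ones) => word -> ones.map(_._2).sum }
}
```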
    

  7. Shared Variables

  8. Broadcast Variables
    Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
    broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
    
    scala> broadcastVar.value
    res18: Array[Int] = Array(1, 2, 3)
    

  9. Accumulators
    Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums.
    scala> val accum = sc.accumulator(0)
    accum: org.apache.spark.Accumulator[Int] = 0
    
    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    
    scala> accum.value
    res24: Int = 10
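
The "only added to" idea can be tried outside Spark as well. The sketch below is a plain JVM analogy, not Spark's accumulator: several concurrent tasks add to a java.util.concurrent.atomic.LongAdder, and the total is read once after all of them have finished. Since addition is associative and commutative, the order in which the tasks run does not matter. The object and method names are made up.

```scala
import java.util.concurrent.atomic.LongAdder
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// JVM analogy for an accumulator: workers only add, the caller reads at the end.
object AccumulatorSketch {
  def sumInParallel(xs: Seq[Int]): Long = {
    val accum = new LongAdder
    // One small concurrent task per element; each task only adds.
    val tasks = xs.map(x => Future(accum.add(x.toLong)))
    Await.result(Future.sequence(tasks), 10.seconds)
    accum.sum() // read once, after all tasks have finished
  }

  def main(args: Array[String]): Unit =
    println(sumInParallel(Seq(1, 2, 3, 4)))
}
```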
    

16 March 2013

Making Spray based ReST API publicly / remotely accessible

Recently I developed a demo Spray based ReST API and ran it on Windows.
I was able to access it locally using the http://localhost:port/service link, but not remotely.
I made the following two changes to fix the issue:
1. In your application's application.conf file, set the host to 0.0.0.0
- 127.0.0.1 or localhost listens only on the loopback interface.
- 0.0.0.0 listens on all available network interfaces. This works fine in Java 7 and 8. In Java 6, only an IPv4 address works.
My application.conf looks like:
akka {
  loglevel = DEBUG
  event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
}

spray.can.server {
  request-timeout = 10s
}

service {
  host = 0.0.0.0
  port = 8090
}

2. Open port 8090 (or whichever port you use) in Windows Firewall: http://windows.microsoft.com/en-gb/windows/open-port-windows-firewall#1TC=windows-7
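
The difference between the two host values in step 1 can be checked with the plain JDK, independent of Spray. The sketch below is only an illustration under that assumption: it binds a throwaway ServerSocket on an OS-chosen free port (port 0) and reports whether it listens on the wildcard address. The object and method names are made up.

```scala
import java.net.{InetAddress, ServerSocket}

// JDK-only sketch: loopback versus wildcard ("all interfaces") bind addresses.
object BindAddressSketch {
  val loopback: InetAddress = InetAddress.getByName("127.0.0.1")
  val wildcard: InetAddress = InetAddress.getByName("0.0.0.0")

  // Bind a temporary socket to `host` and check whether the bound address is
  // the wildcard address, i.e. whether remote clients could reach it.
  def listensOnAllInterfaces(host: String): Boolean = {
    val socket = new ServerSocket(0, 50, InetAddress.getByName(host))
    try socket.getInetAddress.isAnyLocalAddress
    finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    println(s"127.0.0.1 -> ${listensOnAllInterfaces("127.0.0.1")}")
    println(s"0.0.0.0   -> ${listensOnAllInterfaces("0.0.0.0")}")
  }
}
```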

11 October 2012

Scala simple concepts

A simple iteration over an Array in Scala can be done in any of these ways:
  val xs:Array[String] = Array[String]("a", "b", "c", "d", "e")
  for((x,i) <- xs.iterator.zipWithIndex) println(i + "th element is " + x)
  for((x,i) <- xs.view.zipWithIndex) println(i + "th element is " + x) 
  for (i <- xs.indices) println(i + "th element is " + xs(i))
  for (i <- 0 until xs.length) println(i + "th element is " + xs(i))
  for (i <- xs.length - 1 to 0 by -1) println(i + "th element is " + xs(i))

19 September 2012

Terminating a run or test in Scala Build Tool (SBT)

When running a program or test in SBT, if you want to exit from it (and not from SBT), try adding the following line to your build.sbt file:
fork := true

To enable forking directly in an SBT session, type
set fork := true

This will not detach the process or open a new console separate from the original sbt console.
But users can kill the forked process separately.
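The way to do this in Mac OS or Windows is to find the forked Java process in Activity Monitor / Windows Task Manager and end it. The sbt launcher itself is the Java process with sbt-launch in its command line; leave that one running.
In Unix, look up the Java processes in a new terminal and kill the forked one. The command below uses grep -v to skip the process with sbt-launch, so it kills every other Java process that ps lists: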
kill -9 `ps -h | grep java | grep -v sbt-launch | grep -v grep | awk '{print $1}'`

By default, the run and test tasks run in the same JVM as sbt. Forking can be used to run these tasks in a child process. By default, a forked process uses the same Java and Scala versions being used for the build and the working directory and JVM options of the current process. The SBT documentation page on forking discusses how to enable and configure forking for both run and test tasks. Each kind of task may be configured separately by scoping the relevant keys.
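
A possible build.sbt fragment for the per-task scoping described above. It is a sketch that assumes the sbt 0.13-style "in" syntax (sbt 1.x writes the same keys as run / fork, Test / fork and so on), and the memory value is only a placeholder. The cancelable setting is an addition not mentioned in this post: in sbt versions that support it, it is meant to let Ctrl+C cancel the running task instead of exiting sbt.

```scala
// build.sbt (sketch; adjust the syntax to your sbt version)

// Fork only the run task into a child JVM.
fork in run := true

// Fork tests as well, configured independently of run.
fork in Test := true

// JVM options are applied to forked processes only; the value is illustrative.
javaOptions in run += "-Xmx512m"

// Let Ctrl+C cancel the running task rather than exit sbt (assumes sbt 0.13.5+).
cancelable in Global := true
```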

17 September 2012

Eclipse crashing and not starting

Problem: After a restart, Eclipse crashes immediately and asks me to check C:\Users\username\Adobe Flash Builder 4.6\.metadata\.log
The log shows the following error:
!ENTRY org.eclipse.osgi 4 0 2013-02-13 11:53:46.760
!MESSAGE Application error
!STACK 1
org.eclipse.swt.SWTError: Cannot initialize Drop
    at org.eclipse.swt.dnd.DND.error(DND.java:266)
    at org.eclipse.swt.dnd.DND.error(DND.java:227)
    at org.eclipse.swt.dnd.DropTarget.<init>(DropTarget.java:142)
    at org.eclipse.ui.internal.EditorSashContainer.addDropSupport(EditorSashContainer.java:542)
    at org.eclipse.ui.internal.EditorSashContainer.createControl(EditorSashContainer.java:534)
    at org.eclipse.ui.internal.EditorAreaHelper.<init>(EditorAreaHelper.java:41)
    at org.eclipse.ui.internal.WorkbenchPage.init(WorkbenchPage.java:2507)
    at org.eclipse.ui.internal.WorkbenchPage.<init>(WorkbenchPage.java:653)
    at org.eclipse.ui.internal.tweaklets.Workbench3xImplementation.createWorkbenchPage(Workbench3xImplementation.java:47)
    at org.eclipse.ui.internal.WorkbenchWindow$16.runWithException(WorkbenchWindow.java:2222)
    at org.eclipse.ui.internal.StartupThreading$StartupRunnable.run(StartupThreading.java:31)
    at org.eclipse.swt.widgets.RunnableLock.run(RunnableLock.java:35)
    at org.eclipse.swt.widgets.Synchronizer.runAsyncMessages(Synchronizer.java:135)
    at org.eclipse.swt.widgets.Display.runAsyncMessages(Display.java:4140)
    at org.eclipse.swt.widgets.Display.readAndDispatch(Display.java:3757)
    at org.eclipse.ui.application.WorkbenchAdvisor.openWindows(WorkbenchAdvisor.java:803)
    at org.eclipse.ui.internal.Workbench$33.runWithException(Workbench.java:1595)
    at org.eclipse.ui.internal.StartupThreading$StartupRunnable.run(StartupThreading.java:31)
    at org.eclipse.swt.widgets.RunnableLock.run(RunnableLock.java:35)
    at org.eclipse.swt.widgets.Synchronizer.runAsyncMessages(Synchronizer.java:135)
    at org.eclipse.swt.widgets.Display.runAsyncMessages(Display.java:4140)
    at org.eclipse.swt.widgets.Display.readAndDispatch(Display.java:3757)
    at org.eclipse.ui.internal.Workbench.runUI(Workbench.java:2604)
    at org.eclipse.ui.internal.Workbench.access$4(Workbench.java:2494)
    at org.eclipse.ui.internal.Workbench$7.run(Workbench.java:674)
    at org.eclipse.core.databinding.observable.Realm.runWithDefault(Realm.java:332)
    at org.eclipse.ui.internal.Workbench.createAndRunWorkbench(Workbench.java:667)
    at org.eclipse.ui.PlatformUI.createAndRunWorkbench(PlatformUI.java:149)
    at com.adobe.flexbuilder.standalone.FlashBuilderApplication.start(FlashBuilderApplication.java:108)
    at org.eclipse.equinox.internal.app.EclipseAppHandle.run(EclipseAppHandle.java:196)
    at org.eclipse.core.runtime.internal.adaptor.EclipseAppLauncher.runApplication(EclipseAppLauncher.java:110)
    at org.eclipse.core.runtime.internal.adaptor.EclipseAppLauncher.start(EclipseAppLauncher.java:79)
    at org.eclipse.core.runtime.adaptor.EclipseStarter.run(EclipseStarter.java:344)
    at org.eclipse.core.runtime.adaptor.EclipseStarter.run(EclipseStarter.java:179)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at org.eclipse.equinox.launcher.Main.invokeFramework(Main.java:622)
    at org.eclipse.equinox.launcher.Main.basicRun(Main.java:577)
    at org.eclipse.equinox.launcher.Main.run(Main.java:1410)

!ENTRY org.eclipse.core.jobs 2 2 2013-02-13 11:53:47.166
!MESSAGE Job found still running after platform shutdown.  Jobs should be canceled by the plugin that scheduled them during shutdown: org.eclipse.ui.internal.ide.IDEWorkbenchActivityHelper$4



Solution: Delete the C:\Users\username\Adobe Flash Builder 4.6\.metadata\.plugins\org.eclipse.core.resources\.root\.indexes\properties.index file. It was corrupted and was causing an EOFException.