
I've been struggling with org.apache.spark.SparkException: Task not serializable, but I finally figured out how to make this work:

import org.apache.spark.rdd.RDD

case class Article(id: Int, title: String, content: String) extends Serializable

val index: RDD[(String, List[(Int, Int)])] = (for {
    article <- articlesRDD
    text = article.title + article.content
    word <- text.split(" ")
  } yield (word, (article.id, 1)))
    .groupByKey()
    .mapPartitions {
      _.map {
        case (k, v) => (k, v.groupBy(_._1).map(pair => (pair._1, pair._2.map(_._2).sum)).toList) // Works as expected
        //case (k, v) => (k, reducer(v.toList)) // Fails
      }
    }.cache()

And here is reducer:

def reducer(list: List[(Int, Int)]): List[(Int, Int)] = {
  list.groupBy(_._1).map(
    pair => (pair._1, pair._2.map(_._2).sum)
  ).toList
}

I also tried defining the reducer function as a val, but I get the same error. Note that the error occurs in a Databricks notebook; on my machine, running Spark in local mode, it works fine.

Why does the commented case statement fail? Do I always have to pass anonymous functions, even when they are not as trivial as my reducer function?

Thanks in advance :)


You don't say where reducer is defined, but it's likely inside a non-serializable class (e.g. the one that holds the SparkContext). Calling it from a closure then requires capturing, and serializing, the instance it's called on. Define it in an object instead, as in the sketch below.
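A minimal sketch of that fix, assuming the articlesRDD and Article case class from the question (the object name IndexFunctions is just for illustration): only the static method reference is shipped to the executors, with no surrounding instance to serialize.

    import org.apache.spark.rdd.RDD

    object IndexFunctions {
      // Same logic as the question's reducer, but living in a singleton object
      def reducer(list: List[(Int, Int)]): List[(Int, Int)] =
        list.groupBy(_._1).map { case (id, counts) => (id, counts.map(_._2).sum) }.toList
    }

    val index: RDD[(String, List[(Int, Int)])] = (for {
        article <- articlesRDD
        text = article.title + article.content
        word <- text.split(" ")
      } yield (word, (article.id, 1)))
        .groupByKey()
        .mapValues(v => IndexFunctions.reducer(v.toList)) // no enclosing instance to capture
        .cache()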

From the Spark Programming Guide:

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. There are two recommended ways to do this:

  • Anonymous function syntax, which can be used for short pieces of code.
  • Static methods in a global singleton object. For example, you can define object MyFunctions and then pass MyFunctions.func1, as follows:

    object MyFunctions {
      def func1(s: String): String = { ... }
    }
    
    myRdd.map(MyFunctions.func1)
    

Note that while it is also possible to pass a reference to a method in a class instance (as opposed to a singleton object), this requires sending the object that contains that class along with the method.
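For contrast, here is a hypothetical sketch (not the asker's actual code) of the shape that typically triggers the error: when reducer is a method of a class whose instance also holds non-serializable state, referencing it inside a transformation expands to this.reducer(...), so Spark has to serialize the whole instance.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    class IndexBuilder(sc: SparkContext) { // SparkContext is not serializable
      def reducer(list: List[(Int, Int)]): List[(Int, Int)] =
        list.groupBy(_._1).map { case (id, counts) => (id, counts.map(_._2).sum) }.toList

      def build(articlesRDD: RDD[Article]): RDD[(String, List[(Int, Int)])] =
        articlesRDD
          .flatMap(a => (a.title + a.content).split(" ").map(w => (w, (a.id, 1))))
          .groupByKey()
          .mapValues(v => reducer(v.toList)) // really this.reducer(...): Spark tries to
                                             // serialize the whole IndexBuilder and fails
          .cache()
    }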

  • Actually, the error occurs in a Databricks notebook. On my machine running Spark in local mode it works fine. Sorry, I forgot to add that detail. – Ronald Cardona Martínez Apr 16 at 13:50
