It’s been more than a year since I started working on my first Big Data project. In this project we chose Apache Spark as our batch processing framework. Using Apache Spark throws you into a functional, distributed and asynchronous programming world. While you do not have to worry about the distributed adjective, because Spark hides from you the problems related to it, the functional and asynchronous adjectives make error handling more complex. Let’s explain why.
Classical error handling
In classic situations, such as in languages like Java or C++, exceptions are used to signal an exceptional behaviour of a program.
If you need to handle a particular type of exception, you use a try-catch statement, surrounding the statements that can raise the exception.
In Scala, the same construct is written as follows.
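As a minimal sketch of the Scala form, a try-catch with pattern matching on the exception type might look like the code below. The parseOrDefault helper is a hypothetical name chosen only for illustration.

```scala
object ClassicHandling {
  // Hypothetical helper: parses a string into an Int, falling back to a
  // default value when the input is not a valid number.
  def parseOrDefault(raw: String, default: Int): Int =
    try {
      raw.trim.toInt // may throw NumberFormatException
    } catch {
      // The handler sits right next to the statement that can fail.
      case _: NumberFormatException => default
    }
}
```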
I will not discuss whether it is better to use checked or unchecked exceptions in Java. This is not the aim of this post. The important thing is the locality of the statements that handle the exception and the statements that raise it. The latter are inside the scope of the former.
Asynchronous execution and exception handling
Apache Spark uses RDDs (Resilient Distributed Datasets) as the basic building block for algorithms over huge amounts of data (refer to A new way to err, Apache Spark for a more detailed description).
On an RDD you can perform two types of operations: transformations and actions. Transformations construct a new RDD from a previous one. For example, operations such as flatMap are considered transformations.
Actions compute a result based on an
RDD, and either return it to the driver program or save it
to an external storage system (such as HDFS, HBase, …).
Finally, when you work with RDDs you have to remember that, although you can define new RDDs at any time, Spark computes them only in a lazy fashion, that is, the first time they are used in an action.
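The lazy behaviour can be observed locally without a cluster. The sketch below uses a plain Scala Iterator as a stand-in for an RDD, which is an assumption made only to keep the example self-contained: Iterator.map is lazy in a way that resembles an RDD transformation, and consuming the iterator plays the role of an action.

```scala
object LazyDemo {
  // Counts how many elements have actually been transformed.
  var evaluated = 0

  // Stand-in for a transformation: defining it does not run the function.
  val lengths: Iterator[Int] = Iterator("a", "bb", "ccc").map { s =>
    evaluated += 1
    s.length
  }

  // Snapshot taken before anything consumes the iterator.
  val evaluatedBeforeConsumption: Int = evaluated

  // Stand-in for an action: only now are the elements processed.
  val total: Int = lengths.sum
}
```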
During the processing of a transformation on an RDD, something can go wrong and an exception can be raised. The first attempt to handle the exception is usually to surround the transformation with a try-catch statement.
Unfortunately, as stated above, the code inside the transformation is not actually executed until the first action is reached. Such a try-catch is therefore useless for handling the exceptions. The only thing we can do if we want to catch exceptions raised during transformations on an RDD is to surround the action statements with try-catch statements.
As you can see, we have completely lost the locality of the exception handling process.
Using this approach we lose track of which element threw the exception. Furthermore, Spark is used to process huge amounts of data: are we sure we want to block the whole execution for a single error on a single element of an RDD?
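The effect can be sketched locally. As an assumption made to keep the example runnable without Spark, a lazy Scala Iterator stands in for the RDD and consuming it stands in for the action. The try-catch around the transformation never fires, while the one around the consumption does.

```scala
object MisplacedTryCatch {
  val input: List[String] = List("spark", "", "scala")

  // Wrapping the lazy transformation: no element has been processed yet
  // when the try block ends, so this handler never runs.
  var caughtAtTransformation = false
  val firstChars: Iterator[Char] =
    try {
      input.iterator.map(s => s.charAt(0)) // fails on "" only when evaluated
    } catch {
      case _: IndexOutOfBoundsException =>
        caughtAtTransformation = true
        Iterator.empty
    }

  // Wrapping the consumption: the exception surfaces here, far from the
  // code that raised it, and the whole computation is lost.
  var caughtAtAction = false
  val result: List[Char] =
    try {
      firstChars.toList
    } catch {
      case _: IndexOutOfBoundsException =>
        caughtAtAction = true
        Nil
    }
}
```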
Functional programming and exception handling
A first attempt to solve the last problem can be to move the try-catch statement inside the transformation. Then, the above code becomes the following.
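A sketch of this idea, under stated assumptions: a List stands in for the RDD, and both the firstChar function and the Placeholder value are hypothetical names introduced for illustration.

```scala
object TryCatchInside {
  // Hypothetical placeholder returned when an element cannot be processed.
  val Placeholder: Char = '?'

  // The function keeps the signature String => Char, so the catch branch
  // has to return some Char as well.
  val firstChar: String => Char = s =>
    try {
      s.charAt(0)
    } catch {
      case _: IndexOutOfBoundsException => Placeholder
    }

  // A List stands in for the RDD in this local sketch.
  val result: List[Char] = List("spark", "", "scala").map(firstChar)
}
```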
Eureka! We gain back the locality feature! Unfortunately, doing it this way, we encounter a new problem. Remember: a transformation constructs a new RDD from a previous one. The function originally passed to the transformation has type String => Char.
To maintain this signature, we have to return a Char, or a subtype of it, in the catch statement too. What should we choose? An empty character? A special character? You know, every choice can lead to something going wrong sooner or later.
What can we do? The only way out of this impasse is to resort to monads. More or less, a monad is a generic container that enhances a simple type with additional properties. Scala offers at least three different types of monads that help us to deal with exceptional situations:
Option and its two subclasses, Some[T] and None. This monad acts like a list of one or zero elements, and we can use it when we are not interested in the details of the error situation we can encounter.
Either and its two subclasses, Left[T] and Right[K]. This monad lets you return two different types of objects, T and K, respectively in the case of an exceptional behaviour of the process and in the case of a correct behaviour.
Try and its two subclasses, Success[T] and Failure[T]. This monad is similar to Either. Instead of using a generic type for the error case, Failure always uses a type that is a subclass of Throwable. The Try type is available in Scala since version 2.10.
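To compare the three types side by side, here is a small sketch that expresses the same failing computation (taking the first character of a possibly empty string) with each of them. The function names are hypothetical.

```scala
import scala.util.Try

object ThreeMonads {
  // Option: we only learn that something went wrong, not what.
  def firstCharOption(s: String): Option[Char] = s.headOption

  // Either: Left carries a description of the error, Right the correct value.
  def firstCharEither(s: String): Either[String, Char] =
    if (s.isEmpty) Left("empty string") else Right(s.head)

  // Try: the failure side always carries a Throwable.
  def firstCharTry(s: String): Try[Char] = Try(s.charAt(0))
}
```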
Then, if your aim is to trace the exceptions raised during the processing of an RDD while continuing to process only the values that are not in error, the Try[T] monad suits your needs perfectly. This amazing type comes with a useful apply factory method on its companion object, which lets you build a Success or a Failure object directly from the result of a computation.
If the computation produces a value, then an object of type Success[T] is built; otherwise, a Failure object is built. Both types are immutable. The Failure type gives access to its exception attribute, which contains the error raised during the computation.
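A minimal sketch of the factory method and of the exception attribute; the describe function is a hypothetical helper added for illustration.

```scala
import scala.util.{Failure, Success, Try}

object TryFactory {
  // Try.apply evaluates the expression and wraps either its value or the
  // exception it throws.
  val ok: Try[Int] = Try("42".toInt)
  val ko: Try[Int] = Try("forty-two".toInt)

  // Pattern matching tells the two cases apart; Failure exposes the
  // Throwable through its exception attribute.
  def describe(t: Try[Int]): String = t match {
    case Success(value)     => s"value: $value"
    case Failure(exception) => s"error: ${exception.getClass.getSimpleName}"
  }
}
```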
In this way, an RDD[T] becomes an RDD[Try[T]]. Using this trick, we can now use the same data structure to carry both data and exceptions. That’s great!
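The idea can be sketched locally with a List standing in for the RDD (an assumption made only to avoid a Spark dependency). Applying the same map call to an RDD[String] would yield an RDD[Try[Char]].

```scala
import scala.util.Try

object WrapInTry {
  // Stand-in for an RDD[String].
  val input: List[String] = List("spark", "", "scala")

  // Each element is wrapped independently: one failure no longer stops
  // the processing of the other elements.
  val firstChars: List[Try[Char]] = input.map(s => Try(s.charAt(0)))
}
```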
Ok, we have an RDD[Try[T]]… now what? How can we work with instances of Try[T]? Do you remember when I said that this type is a monad? Then, we can use the map and flatMap methods to work proficiently with it.
If you have to transform the content of a Try object, you can apply the map method. The function is applied only to instances of the Success type, and the Failure ones are forwarded through the transformation as they are.
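A short sketch of this behaviour, with illustrative values:

```scala
import scala.util.Try

object TryMap {
  val ok: Try[Int] = Try("21".toInt)
  val ko: Try[Int] = Try("twenty-one".toInt)

  // The function is applied only to the value inside a Success.
  val doubledOk: Try[Int] = ok.map(_ * 2)

  // The Failure is forwarded untouched; the function is never called.
  val doubledKo: Try[Int] = ko.map(_ * 2)
}
```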
If you start with a Try[A], you end up with a Try[B]. What if your transformation can in turn create a Try[B]? Using the map method you would obtain a Try[Try[B]]. Well, I say to you: use flatMap. It flattens the nested type by one level, simply returning a Try[B].
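The difference between the two methods can be sketched as follows. The parse and hundredDividedBy helpers are hypothetical; each of them can fail, so each returns a Try.

```scala
import scala.util.Try

object TryFlatMap {
  // Hypothetical helpers: both steps can fail, so both return a Try.
  def parse(s: String): Try[Int] = Try(s.toInt)
  def hundredDividedBy(n: Int): Try[Int] = Try(100 / n)

  // map nests the second Try inside the first one.
  val nested: Try[Try[Int]] = parse("4").map(hundredDividedBy)

  // flatMap flattens the result to a single level.
  val flat: Try[Int] = parse("4").flatMap(hundredDividedBy)

  // A failure in the second step ends up as a plain Failure.
  val divisionByZero: Try[Int] = parse("0").flatMap(hundredDividedBy)
}
```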
Returning to our initial RDD[Try[T]], we now have all the information we need to work with it. In my opinion, the best way to work with this type is to use the for-comprehension construct. Let’s have some dirty fun right now.
A for-comprehension in Scala is syntactic sugar for a series of applications of the flatMap and map methods (plus a filtering method, withFilter, when guards are used). For example, take the following statements.
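As an illustration, here is a sketch of such a chain of calls over three Try values. The parse helper and the sum function are hypothetical names.

```scala
import scala.util.Try

object ChainedCalls {
  // Hypothetical helper: parses a string into an Int.
  def parse(s: String): Try[Int] = Try(s.toInt)

  // Every step but the last uses flatMap; the last one uses map.
  def sum(a: String, b: String, c: String): Try[Int] =
    parse(a).flatMap { x =>
      parse(b).flatMap { y =>
        parse(c).map { z =>
          x + y + z
        }
      }
    }
}
```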
Using a for-comprehension, the above code can be simplified and rewritten as follows:
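A sketch of a for-comprehension that sums three parsed values, each of which may fail. The parse helper and the sum function are hypothetical names.

```scala
import scala.util.Try

object ForComprehension {
  // Hypothetical helper: parses a string into an Int.
  def parse(s: String): Try[Int] = Try(s.toInt)

  // The compiler rewrites this into nested flatMap calls and a final map.
  def sum(a: String, b: String, c: String): Try[Int] =
    for {
      x <- parse(a)
      y <- parse(b)
      z <- parse(c)
    } yield x + y + z
}
```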
It is more readable, isn’t it?
Collecting the results
At some point in your program you will need to get only the Success or only the Failure instances inside the RDD.
There are many techniques to accomplish this task, but the one I prefer is to use the collect transformation.
Unlike the action of the same name, this method accepts a partial function as input and builds a new collection by applying the partial function to all the elements on which the function is defined.
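A local sketch of this technique, with a List standing in for an RDD[Try[Int]] (an assumption made to keep the example free of Spark dependencies). List.collect also takes a partial function. On a real RDD the result would be another RDD rather than a local collection.

```scala
import scala.util.{Failure, Success, Try}

object CollectResults {
  // Stand-in for an RDD[Try[Int]].
  val results: List[Try[Int]] = List("1", "two", "3").map(s => Try(s.toInt))

  // Keep only the values that were computed successfully.
  val values: List[Int] = results.collect { case Success(v) => v }

  // Keep only the exceptions, for example to log or count them.
  val errors: List[Throwable] = results.collect { case Failure(e) => e }
}
```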
And that’s all, folks!