Installation to connect Spark and H2O in R


In the ThinkR Task force, we love playing with H2O in R. Their algorithms for machine learning and artificial intelligence are really powerful. Combined with Apache Spark through Sparkling Water, H2O provides even more powerful data processing workflows, which you can run on your own laptop.

Installing Spark and H2O so that they work together within R

The H2O documentation is great, but you may still run into version compatibility problems. At ThinkR, we have expertise in R server installation. In this blog post, we show how to set up Spark and H2O so that they work together within R. This is a step-by-step procedure that shows where the process may stop with an error, so do not be surprised if some of the code fails! Some steps are also repeated because you will need to restart your session.

Problem encountered

There are many reasons to get the following error message when using {sparklyr}:

#> Error : org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down...
... [cut] (see below for the complete message)

A common reason is a memory allocation problem when using big datasets. However, you can also get this error from the very beginning, without any dataset. In that case, it is most probably a version compatibility problem.
You can find the version compatibility table on GitHub: https://github.com/h2oai/rsparkling/blob/master/README.md. We’ll discuss it in more detail below.

Install libraries

Well, let’s go through the process. The first installation is quite easy from the CRAN repository. This will directly download and install H2O along with the R libraries.

# {here} and {glue} are also loaded below, so install them too
install.packages(c("rsparkling", "sparklyr", "h2o", "here", "glue"))
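If some of these packages are already on your machine, you may prefer to install only the missing ones. Here is a minimal sketch in base R; the helper name `missing_packages()` is mine, not part of any of these packages.

```r
# Hypothetical helper: return the packages from `pkgs` that are not installed.
# `installed` defaults to the packages found in the current library paths.
missing_packages <- function(pkgs,
                             installed = rownames(utils::installed.packages())) {
  setdiff(pkgs, installed)
}

# Packages used throughout this post
needed <- c("rsparkling", "sparklyr", "h2o", "here", "glue")

# Uncomment to install only what is missing:
# to_install <- missing_packages(needed)
# if (length(to_install) > 0) install.packages(to_install)
```

Keep in mind that this only checks whether a package is present, not whether its version is the one Sparkling Water expects.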

Then load libraries.

# Libraries
library(rsparkling)
library(sparklyr)
library(h2o)
library(here)
library(glue)

Define memory allocation

As I said, this is one of the reasons you may get the error message, so setting the memory allocation properly may save your analyses.
Depending on the size of the data and the analyses you want to perform, you may need to allocate a large amount of memory to your processes. Define the following depending on your machine. Here, I run models on my laptop, which has 16G of RAM.
I also define a folder (“spark_dir”) in which all fitted models and logs will be stored, so that I can easily find them and see what is happening.

# Just in case you are already connected
spark_disconnect_all()
# Define a directory where to save model outputs and logs
spark_dir <- here("tmp/spark_dir/")
# Set driver and executor memory allocations **depending on your machine**
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "8G"
config$`sparklyr.shell.executor-memory` <- "8G"
config$spark.yarn.executor.memoryOverhead <- "4g"
config$`sparklyr.shell.driver-java-options` <-
  glue("-Djava.io.tmpdir={spark_dir}")
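The configuration above points the Java temporary directory at spark_dir, but nothing in that code creates the folder. As a small precaution, and assuming the folder may not exist yet on your machine, you could create it before connecting. The helper name `ensure_dir()` is hypothetical.

```r
# Hypothetical helper: create a directory (and its parents) if it is missing,
# then return its normalized absolute path.
ensure_dir <- function(path) {
  if (!dir.exists(path)) {
    dir.create(path, recursive = TRUE, showWarnings = FALSE)
  }
  normalizePath(path, mustWork = TRUE)
}

# Example with a throwaway folder; in the post you would pass `spark_dir`
example_dir <- ensure_dir(file.path(tempdir(), "tmp", "spark_dir"))
```

In the workflow of this post, that would be `spark_dir <- ensure_dir(here("tmp/spark_dir/"))` before building the config.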

First connection to Spark

To install and connect to Apache Spark, use spark_connect() with the configuration we set up above.

# This may install one version of Spark the first time
sc <- spark_connect(master = "local", config = config)

You will probably get the following message:

#> * Using Spark: 2.2.1
#> Spark version 2.2 detected. Will call latest Sparkling Water version 2.2.12
#> Detected H2O version 3.16.0.2. Please install H2O version 3.18.0.2, which is compliant with the latest Sparkling Water version for Spark 2.2.* -> Sparkling Water version 2.2.12
#> To update your h2o R package, copy/paste the following commands and then restart your R session:
#>    detach("package:rsparkling", unload = TRUE)
#>    if ("package:h2o" %in% search()) { detach("package:h2o", unload = TRUE) }
#>    if (isNamespaceLoaded("h2o")){ unloadNamespace("h2o") }
#>    remove.packages("h2o")
#>    install.packages("h2o", type = "source", repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/2/R")

Indeed, the version of H2O installed by default in R by the {h2o} package does not exactly match the version required by Sparkling Water. Hence, you need to follow the recommendation in the message to install the correct version of H2O.

Here, you will need to restart your R session (without saving the workspace, of course. Never do that…)
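After the restart, you may want to confirm that the installed {h2o} package really is the version the message asked for. A minimal sketch, assuming you copy the required version from the message by hand; `h2o_matches()` is a hypothetical helper, not an {h2o} or {rsparkling} function.

```r
# Hypothetical helper: TRUE when the installed h2o R package has exactly
# the required version. `installed` is only looked up when not supplied.
h2o_matches <- function(required,
                        installed = as.character(utils::packageVersion("h2o"))) {
  package_version(installed) == package_version(required)
}

# With the versions quoted in the message above:
h2o_matches("3.18.0.2", installed = "3.16.0.2") # FALSE: reinstall needed
h2o_matches("3.18.0.2", installed = "3.18.0.2") # TRUE
```

Calling `h2o_matches("3.18.0.2")` without the second argument checks the package currently installed in your library.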

Error with Java version

If you are experiencing the following error message:

Error in validate_java_version_line(master, version) : 
  Java 9 is currently unsupported in Spark distributions unless you manually install Hadoop 2.8 and manually configure Spark. Please consider uninstalling Java 9 and reinstalling Java 8. To override this failure set 'options(sparklyr.java9 = TRUE)'.

You need to install or re-install Java 8. If, like me, you already have multiple versions of Java installed, you can specify the directory of the one you want to use with Sys.setenv(JAVA_HOME = "/path/to/java"):

# For Java8 on Ubuntu 64bit
Sys.setenv(JAVA_HOME = "/usr/lib/jvm/java-8-openjdk-amd64")
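If you are not sure which Java version R will pick up, you can ask the `java` binary itself. The sketch below parses the first `version "..."` line printed by `java -version`; both helper names are hypothetical, and I assume the usual output format (`"1.8.0_xxx"` for Java 8, `"9.x"` or higher afterwards).

```r
# Hypothetical helper: extract the major Java version from a line such as
# 'openjdk version "1.8.0_292"' or 'java version "9.0.4"'.
parse_java_major <- function(version_line) {
  # Keep the text between the first pair of double quotes
  ver <- sub('^[^"]*"([^"]+)".*$', "\\1", version_line)
  parts <- strsplit(ver, "[-._]")[[1]]
  # Java 8 and earlier report "1.x"; Java 9 and later report "x.y.z"
  major <- if (parts[1] == "1") parts[2] else parts[1]
  as.integer(major)
}

# Hypothetical helper: query the java binary from JAVA_HOME if set,
# otherwise the one found on the PATH. Not called here on purpose.
installed_java_major <- function() {
  java_home <- Sys.getenv("JAVA_HOME")
  java_bin <- if (nzchar(java_home)) file.path(java_home, "bin", "java") else "java"
  out <- system2(java_bin, "-version", stdout = TRUE, stderr = TRUE)
  version_line <- grep('version "', out, value = TRUE, fixed = TRUE)[1]
  parse_java_major(version_line)
}
```

Running `installed_java_major()` after setting JAVA_HOME should then return 8 if the Java 8 directory was picked up.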

Start H2O cluster

You restarted your R session. Let’s load the libraries and define the configuration again, then start Spark.

# Libraries
library(rsparkling)
library(sparklyr)
library(h2o)
library(here)
library(glue)
# Just in case you are already connected
spark_disconnect_all() # Be sure it is what you want
# Define a directory where to save model outputs and logs
spark_dir <- here("tmp/spark_dir/")
# Set driver and executor memory allocations **depending on your machine**
config <- spark_config()
config$`sparklyr.shell.driver-memory` <- "8G"
config$`sparklyr.shell.executor-memory` <- "8G"
config$spark.yarn.executor.memoryOverhead <- "4g"
config$`sparklyr.shell.driver-java-options` <-
  glue("-Djava.io.tmpdir={spark_dir}")
# Connect to Spark
sc <- spark_connect(master = "local", config = config)

No error message!

#> * Using Spark: 2.2.1
#> Spark version 2.2 detected. Will call latest Sparkling Water version 2.2.12

If you work within RStudio, you probably noticed that your “Connection Pane” shows your Spark connection, with no tables if you are not connected to an existing Spark session.
Now that the connection to Spark is started, you may want to initiate the H2O cluster. Let’s test the H2O and Spark connection.

# Create h2o cluster
h2o_context(sc)

Well, a new error message…

#> Version mismatch! H2O is running version 3.18.0.7 but h2o-R package is version 3.18.0.2.
#> Install the matching h2o-R version from – http://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/7/index.html

What do we do with this one? If we install the recommended H2O version, it is not the same as the one recommended when we used spark_connect. So which one should we choose?
In this case, and from now on, I recommend using the parameter strict_version_check = FALSE, because we do not really care whether H2O is exactly the version used to build the R {h2o} package. This should work as well.
Let’s create the context…

# Create context with Sparkling Water
h2o_context(sc, strict_version_check = FALSE)
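Turning off the strict check is a judgment call. If you want a rough idea of how far apart two versions are before doing so, one possible heuristic (my assumption, not a rule from H2O or {rsparkling}) is to compare only the first three components of the version strings.

```r
# Hypothetical heuristic: TRUE when two H2O versions share the same first
# three components (e.g. 3.18.0.x), i.e. they differ only in the last number.
same_release_line <- function(a, b) {
  pa <- strsplit(a, ".", fixed = TRUE)[[1]]
  pb <- strsplit(b, ".", fixed = TRUE)[[1]]
  identical(head(pa, 3), head(pb, 3))
}

# Versions from the mismatch message above
same_release_line("3.18.0.7", "3.18.0.2") # TRUE: same 3.18.0 line
# Versions from the first spark_connect() message
same_release_line("3.18.0.2", "3.16.0.2") # FALSE: different release lines
```

This does not guarantee compatibility; it only tells you whether the mismatch is limited to the last number.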

… and finally start the H2O cluster! Choose the number of cores you allow it to use (4 for me) as well as the maximum memory allocation. Yes, memory again, but this is H2O here, not Spark. I have 16G available, so I allow 14G.

# Start H2O clusters
h2o.init(ip = "localhost", nthreads = 4, max_mem_size = "14G",
         strict_version_check = FALSE)
#> Connection successful!

Yeah! This worked! Well, this should work. Indeed, depending on the Spark version you define, this is not always the case. At this point, you may encounter the error message:

#> Error : org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down...
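Since this message can come from either memory settings or a version mismatch, it may help to catch it and print a reminder of both causes. The sketch below uses only base R condition handling; the helper names are hypothetical.

```r
# Hypothetical helper: does this error look like the SparkContext shutdown?
is_context_shutdown <- function(err) {
  grepl("SparkContext was shut down", conditionMessage(err), fixed = TRUE)
}

# Hypothetical wrapper: evaluate `expr`; on the shutdown error, print a hint
# about the two usual causes, then re-signal the original error.
with_shutdown_hint <- function(expr) {
  tryCatch(expr, error = function(err) {
    if (is_context_shutdown(err)) {
      message(
        "SparkContext was shut down: check the memory allocation and the ",
        "Spark / Sparkling Water / H2O version compatibility."
      )
    }
    stop(err)
  })
}

# Usage (needs a live Spark connection `sc`):
# with_shutdown_hint(h2o_context(sc, strict_version_check = FALSE))
```

The original error is still raised, so your script stops exactly as before; only the hint is added.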

Define Sparkling Water version

On my machine, this worked with the default settings. In my case, Spark 2.2.1 is installed and works with Sparkling Water 2.2.12.
If you installed all of this a few months ago, or if you work with a different version of Apache Spark such as Spark 2.1, you will probably get the error message. To get rid of it, you only have to specify which version of Sparkling Water to run. To know what works with what, have a look at the table on this page: https://github.com/h2oai/rsparkling/blob/master/README.md
There you can see which Spark versions are compatible with which Sparkling Water and H2O versions. Here is what you can define:

  • Find the Spark version you want: spark_available_versions()
  • Install the Spark version you want: spark_install(version = "2.2.1")
  • Set the matching Sparkling Water version: options(rsparkling.sparklingwater.version = "2.2.11")
  • Install the matching H2O version:
# Install adequate h2o
detach("package:rsparkling", unload = TRUE)
if ("package:h2o" %in% search()) {
  detach("package:h2o", unload = TRUE)
}
if (isNamespaceLoaded("h2o")) {
  unloadNamespace("h2o")
}
remove.packages("h2o", lib = .libPaths()[1])
install.packages("h2o",
  type = "source",
  repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/5/R"
)
# **Restart your session**
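To avoid mixing versions between projects, the Spark and Sparkling Water pairs used in the summary scripts of this post can be kept in one small lookup. This is only a sketch: the helper name is hypothetical, and the version numbers are the ones pinned in this post, so check the compatibility table before reusing them.

```r
# Pairs used in this post (Spark minor version -> Sparkling Water version).
# They may be outdated: always check the rsparkling README table.
sparkling_water_versions <- c(
  "2.3" = "2.3.16",
  "2.2" = "2.2.11",
  "2.1" = "2.1.25"
)

# Hypothetical helper: pick the Sparkling Water version for a Spark version
# such as "2.2.1", based on its first two components.
sparkling_water_for <- function(spark_version) {
  parts <- strsplit(spark_version, ".", fixed = TRUE)[[1]]
  key <- paste(parts[1:2], collapse = ".")
  if (!key %in% names(sparkling_water_versions)) {
    stop("No Sparkling Water version recorded for Spark ", spark_version)
  }
  sparkling_water_versions[[key]]
}

# Set the option before loading {rsparkling}
options(rsparkling.sparklingwater.version = sparkling_water_for("2.2.1"))
```

The option has to be set before `library(rsparkling)`, as in the complete scripts of this post.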

Summary: complete configurations

Below, you will find the complete code that you can use in every Spark + H2O = Sparkling Water R script project you start.
Update versions depending on this page: https://github.com/h2oai/rsparkling/blob/master/README.md and adapt memory parameters depending on your machine.
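The scripts below repeat the same memory settings each time. If you prefer, these settings can be wrapped in a small function that fills in a config list. This is a sketch under my own assumptions: the function name and its defaults are hypothetical, and the overhead key is a parameter because the scripts use `spark.executor.memoryOverhead` for Spark 2.3 and `spark.yarn.executor.memoryOverhead` for Spark 2.2 and 2.1.

```r
# Hypothetical helper: add memory and temporary-directory options to a
# config list (for instance the one returned by sparklyr::spark_config()).
set_memory_options <- function(config,
                               driver = "6G",
                               executor = "6G",
                               overhead = "2g",
                               overhead_key = "spark.yarn.executor.memoryOverhead",
                               tmp_dir = tempdir()) {
  config[["sparklyr.shell.driver-memory"]] <- driver
  config[["sparklyr.shell.executor-memory"]] <- executor
  config[[overhead_key]] <- overhead
  config[["sparklyr.shell.driver-java-options"]] <-
    paste0("-Djava.io.tmpdir=", tmp_dir)
  config
}

# Usage (requires {sparklyr}):
# config <- set_memory_options(sparklyr::spark_config(), tmp_dir = spark_dir)
```

Adapt the default values to your machine, exactly as for the explicit assignments in the scripts.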

Spark 2.3

Restart your RStudio session first.
[Updated on 2018-10-27]

# Define Sparkling Water version for Spark 2.3
options(rsparkling.sparklingwater.version = "2.3.16")
# Load libraries
library(rsparkling)
library(sparklyr)
library(h2o)
library(here)
library(glue)
# Install Spark (optionally with a specific Hadoop version)
spark_available_versions(show_hadoop = TRUE)
spark_install(version = "2.3.2", hadoop_version = "2.7")
# Set driver and executor memory allocations
spark_dir <- here("tmp/spark/")
config <- spark_config()
# config$spark.driver.memory <- "6G"
# config$spark.executor.memory <- "6G"
config$`sparklyr.shell.driver-memory` <- "6G"
config$`sparklyr.shell.executor-memory` <- "6G"
config$spark.executor.memoryOverhead <- "2g"
# Store temporary files and logs in spark_dir
config$`sparklyr.shell.driver-java-options` <-
  glue("-Djava.io.tmpdir={spark_dir}")
needed <- FALSE
if (needed) {
  # Install adequate h2o
  detach("package:rsparkling", unload = TRUE)
  if ("package:h2o" %in% search()) {
    detach("package:h2o", unload = TRUE)
  }
  if (isNamespaceLoaded("h2o")) {
    unloadNamespace("h2o")
  }
  remove.packages("h2o", lib = .libPaths()[1])
  install.packages("h2o",
    type = "source",
    repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-wright/10/R"
  )
  # **Restart your R session**
}
# Connect to spark version 2.3.2
sc <- spark_connect(
  master = "local", config = config, version = "2.3.2")
# Create Sparkling Water context
h2o_context(sc, strict_version_check = FALSE)
# Stop earlier h2o cluster if exists
h2o.shutdown() # Be sure it is what you want
# Create h2o cluster
h2o.init(ip = "localhost", nthreads = 4, max_mem_size = "12G",
         strict_version_check = FALSE)
# clean slate - just in case the cluster was already running
h2o.removeAll() # Be sure it is what you want

Spark 2.2

Restart your RStudio session first.

# Define Sparkling Water version for Spark 2.2
options(rsparkling.sparklingwater.version = "2.2.11")
# Load libraries
library(rsparkling)
library(sparklyr)
library(h2o)
library(here)
library(glue)
# Install Spark (optionally with a specific Hadoop version)
spark_available_versions(show_hadoop = TRUE)
spark_install(version = "2.2.1", hadoop_version = "2.7")
# Set driver and executor memory allocations
spark_dir <- here("tmp/spark/")
config <- spark_config()
# config$spark.driver.memory <- "6G"
# config$spark.executor.memory <- "6G"
config$`sparklyr.shell.driver-memory` <- "6G"
config$`sparklyr.shell.executor-memory` <- "6G"
config$spark.yarn.executor.memoryOverhead <- "2g"
# Store temporary files and logs in spark_dir
config$`sparklyr.shell.driver-java-options` <-
  glue("-Djava.io.tmpdir={spark_dir}")
needed <- FALSE
if (needed) {
  # Install adequate h2o
  detach("package:rsparkling", unload = TRUE)
  if ("package:h2o" %in% search()) {
    detach("package:h2o", unload = TRUE)
  }
  if (isNamespaceLoaded("h2o")) {
    unloadNamespace("h2o")
  }
  remove.packages("h2o", lib = .libPaths()[1])
  install.packages("h2o",
    type = "source",
    repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/5/R"
  )
  # **Restart your R session**
}
# Connect to spark version 2.2.1
sc <- spark_connect(
  master = "local", config = config, version = "2.2.1")
# Create Sparkling Water context
h2o_context(sc, strict_version_check = FALSE)
# Stop earlier h2o cluster if exists
h2o.shutdown() # Be sure it is what you want
# Create h2o cluster
h2o.init(ip = "localhost", nthreads = 4, max_mem_size = "12G",
         strict_version_check = FALSE)
# clean slate - just in case the cluster was already running
h2o.removeAll() # Be sure it is what you want

Spark 2.1

Restart your RStudio session first.

# Define sparkling water version for Spark 2.1
options(rsparkling.sparklingwater.version = "2.1.25")
# Libraries
library(rsparkling)
library(sparklyr)
library(h2o)
library(here)
library(glue)
# Install Spark (optionally with a specific Hadoop version)
spark_available_versions(show_hadoop = TRUE)
spark_install(version = "2.1.1", hadoop_version = "2.7")
# Set driver and executor memory allocations
spark_dir <- here("tmp/spark/")
config <- spark_config()
# config$spark.driver.memory <- "6G"
# config$spark.executor.memory <- "6G"
config$`sparklyr.shell.driver-memory` <- "6G"
config$`sparklyr.shell.executor-memory` <- "6G"
config$spark.yarn.executor.memoryOverhead <- "2g"
# Store temporary files and logs in spark_dir
config$`sparklyr.shell.driver-java-options` <-
  glue("-Djava.io.tmpdir={spark_dir}")
needed <- FALSE
if (needed) {
  # Install adequate h2o
  detach("package:rsparkling", unload = TRUE)
  if ("package:h2o" %in% search()) {
    detach("package:h2o", unload = TRUE)
  }
  if (isNamespaceLoaded("h2o")) {
    unloadNamespace("h2o")
  }
  remove.packages("h2o", lib = .libPaths()[1])
  install.packages("h2o",
    type = "source",
    repos = "https://h2o-release.s3.amazonaws.com/h2o/rel-wolpert/5/R"
  )
  # **Restart your R session**
}
# Connect to spark version 2.1.1
sc <- spark_connect(
  master = "local", config = config, version = "2.1.1")
# Create Sparkling Water context
h2o_context(sc, strict_version_check = FALSE)
# Stop earlier h2o cluster if exists
h2o.shutdown() # Be sure it is what you want
# Create h2o cluster
h2o.init(ip = "localhost", nthreads = 4, max_mem_size = "12G",
         strict_version_check = FALSE)
# clean slate - just in case the cluster was already running
h2o.removeAll() # Be sure it is what you want

Test if this works

If you want to test whether your connection between Spark and H2O is working correctly, you can run the following machine learning example. It is the example from the {rsparkling} GitHub repository.

library(dplyr)
# Copy data to Spark (See it in Connection pane in Rstudio)
mtcars_tbl <- copy_to(sc, mtcars, overwrite = TRUE)
# In case nothing happens:
# mtcars_tbl <- sparklyr:::copy_to.spark_connection(sc, mtcars, overwrite = TRUE)
# Convert the Spark data to an H2O frame
mtcars_hf <- as_h2o_frame(sc, mtcars_tbl)
# Define variables
y <- "mpg"
x <- setdiff(names(mtcars_hf), y)
# Split the mtcars H2O Frame into train & test sets
splits <- h2o.splitFrame(mtcars_hf, ratios = 0.7, seed = 1)
# Fit a gradient boosting model
fit <- h2o.gbm(x = x,
               y = y,
               training_frame = splits[[1]],
               nfolds = 3,
               min_rows = 1,
               seed = 1)
print(fit)
#> Model Details:
#> ==============
#> H2ORegressionModel: gbm
#>   number_of_trees number_of_internal_trees model_size_in_bytes min_depth max_depth
#> 1              50                       50               14804         5         5
#>   mean_depth min_leaves max_leaves mean_leaves
#> 1    5.00000         17         21    18.64000
#> H2ORegressionMetrics: gbm
#> ** Reported on training data. **
#> MSE:  0.001211724
#> RMSE:  0.03480983
#> MAE:  0.02761402
#> RMSLE:  0.001929304
#> Mean Residual Deviance :  0.001211724
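The metrics printed above are reported on the training data. If you want to check the same quantities on the held-out split, RMSE and MAE are easy to recompute by hand. The two functions below are plain base R; the commented lines showing how to pull predictions out of H2O assume a running cluster and the objects created above.

```r
# Root mean squared error between observed and predicted values
rmse <- function(actual, predicted) {
  sqrt(mean((actual - predicted)^2))
}

# Mean absolute error between observed and predicted values
mae <- function(actual, predicted) {
  mean(abs(actual - predicted))
}

# With a running H2O cluster and the objects defined above:
# pred   <- as.data.frame(h2o.predict(fit, splits[[2]]))$predict
# actual <- as.data.frame(splits[[2]])$mpg
# rmse(actual, pred)
# mae(actual, pred)
```

Expect the test-set values to be noticeably larger than the training ones, since mtcars is a very small dataset.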

Complete error message

This is the complete error message you may encounter if Sparkling Water has not been set correctly or if your memory allocation was too low for models you wanted to run.


#> Error : org.apache.spark.SparkException: Job 1 cancelled because SparkContext was shut down
#>  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:808)
#>  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
#>  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
#>  at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:806)
#>  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1668)
#>  at org.apache.spark.util.EventLoop.stop(EventLoop.scala:83)
#>  at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1587)
#>  at org.apache.spark.SparkContext$$anonfun$stop$8.apply$mcV$sp(SparkContext.scala:1833)
#>  at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1283)
#>  at org.apache.spark.SparkContext.stop(SparkContext.scala:1832)
#>  at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:581)
#>  at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
#>  at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
#>  at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
#>  at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
#>  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
#>  at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
#>  at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
#>  at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
#>  at scala.util.Try$.apply(Try.scala:192)
#>  at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
#>  at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
#>  at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
#>  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
#>  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1925)
#>  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1938)
#>  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951)
#>  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1965)
#>  at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
#>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
#>  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
#>  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
#>  at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
#>  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$class.startH2O(InternalBackendUtils.scala:163)
#>  at org.apache.spark.h2o.backends.internal.InternalBackendUtils$.startH2O(InternalBackendUtils.scala:262)
#>  at org.apache.spark.h2o.backends.internal.InternalH2OBackend.init(InternalH2OBackend.scala:99)
#>  at org.apache.spark.h2o.H2OContext.init(H2OContext.scala:129)
#>  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:381)
#>  at org.apache.spark.h2o.H2OContext$.getOrCreate(H2OContext.scala:416)
#>  at org.apache.spark.h2o.H2OContext.getOrCreate(H2OContext.scala)
#>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
#>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
#>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
#>  at java.lang.reflect.Method.invoke(Method.java:498)
#>  at sparklyr.Invoke$.invoke(invoke.scala:102)
#>  at sparklyr.StreamHandler$.handleMethodCall(stream.scala:97)
#>  at sparklyr.StreamHandler$.read(stream.scala:62)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:52)
#>  at sparklyr.BackendHandler.channelRead0(handler.scala:14)
#>  at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
#>  at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
#>  at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293)
#>  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:267)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
#>  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:346)
#>  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1294)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:367)
#>  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:353)
#>  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:911)
#>  at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:652)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:575)
#>  at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:489)
#>  at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:451)
#>  at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
#>  at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
#>  at java.lang.Thread.run(Thread.java:748)

About the author

Sébastien Rochette


Modeller, R trainer, map enthusiast


