Primeros pasos con Spark y sparklyr

Big data
R

Gracias al paquete sparklyr es fácil experimentar con Spark sobre un computador local. En este post presento esos primeros pasos para conocer a Spark, y los pasos que despues siguen para usar Spark en un cluster, todo desde R.

Author
Affiliation
Published

January 2, 2020

Acompañamos en iniciativas y proyectos de ciencia de datos, ingeniería e infraestructura. Visita nuestra página ixpantia y contáctanos.

Introducción

Una de las ventajas de R es que da acceso a una multitud de entornos (frameworks) de análisis y procesamiento de datos que están escritos en otros lenguajes. Como la mayoria ofrecen su funcionalidad a travez de un API, podemos usar herramientas creadas para trabajar con datos masivos (Big Data) sin tener que salir del entorno y lenguaje que ya conocemos.

En la mayoria de los casos esto incluye la posibilidad de hacer una instalación local para desarrollo. Si trabajas en R vas a encontrar que además hay multiples puntos de enganche dentro de RStudio para hacerte la vida más facil. Pero más fácil no significa que todo es automatico. Estos frameworks tienden a ser grandes y complejos, y requiren algo de configuración para funcionar bien con la infraestructura que tienes disponible.

En este post vamos a usar Spark como ejemplo, y aunque el objetivo es que todo el ejemplo funcione (¡si no me avisas para corregir!), este no es una Introducción exhaustiva. La documentación de sparklyr es muy buena y completa y si quieres profundizarte en el tema esta el libro de Luraschi, Kuo y Ruiz llamado Mastering Spark with R. Mas bien este blogpost busca hacer resaltar algunos puntos que son fáciles de no ver al trabajar con Spark y R por primera vez, como la importancia de la configuración a travez de settings.

Instalar Spark

Hay varias opciones para trabajar con Spark desde R, incluyendo SparkR un paquete ‘oficial’ que es parte del proyecto Apache. Pero recomendamos arrancar de una vez con sparklyr. Un buen punto de partida es la documentación de sparklyr de RStudio.

Para poder trabajar con Spark necesitas Java 8 en tu computador (no va a funcionar con una versión mas reciente de Java). La forma más fácil de instalar esta versión es usando los instaladores de Azul que ofrece una versión para cada sistema diferente.

Es posible, si ya tenias Java andando con R antes, que tienes que cambiar tu JAVAHOME. Es un tema recurrente en foros, si no encuentras como hacerlo abre un tiquete.

Con Java 8 instalado puedes dar el siguiente paso.

library(sparklyr)
spark_install()

Al instalar localmente por defecto vas a usar la opción de instalarlo sobre Hadoop (en varias versiones). Si por alguna razón tienes que cambiar la versión de Spark puedes hacerlo (comúnmente par poder seguir ejemplos que encuentras en algún lado).

spark_available_versions(show_hadoop = TRUE)

Una primera conexión

Si en RStudio vas a la pantallita de conexiones (Conextions) y tienes la libreria sparklyr cargado vas ver lo siguiente.

Pantallazo de la conexión con Spark

Nota que si lo corres (te lo pongo aqui abajo para que puedas copiar y pegar) no te va a funcionar!

library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local", config = conf)

Si como yo estas acostumbrado que lo que propone la interfaz de RStudio corre sin tener que pensarlo mas quizas te asombra. Pero creo que la idea de los autores es exactamente eso: forzarte a pensar un momento en tu configuración. Porque hay mas de un ejemplo disponible que te pide hacer la configuración asi.

library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")

Ahora si! Eso funciona de una vez (si lograste instalar Spark siguiendo los pasos arriba). Pero algo tan simple como copiar datos se va a demorar mucho! Toma este codigo para ver cuanto demaro en tu computador. Despues vamos a ver como hacerlo más rapido!

library(nycflights13)

system.time({
   vuelos <- copy_to(sc, flights)
})

Usa Settings!

Por defecto, Spark en tu computador local va usar solamente un core en tu computador. Y la diferencia entre un solo core y 10 cores en mi laptop es entre mas de 15 minutos y segundos para el dataset que usamos aqui abajo. Aqui para el el ejemplo usamos un conjunto de datos de 278MB para hacer notar la diferencia de los tiempo que necesitamos para hacer las cosas. Con datasets mas pequeños hay comportamiento que no se observa.

El dataset se usó en uno de los retos de Kaggle y proviene de la universidad de Bruxelas. Es conocido como dataset de ejemplo. Para los ejemplos que vienen permite un poco de tiempo, en parte es para mostrar como optimizar tu instancia, pero si te desesperas esperando, saltate estas primera parate para ver como usar todos los recursos de tu computador como los tienes disponible.

Intenta esto (y si quieres primero bajas los datos y despues los cargas con read_csv para no tener que bajar clos 278MB cada vez que corres la linea).

data <- read_csv("https://packages.revolutionanalytics.com/datasets/ccFraud.csv")

sc <- spark_connect(master = "local")

system.time({
  sc_data <- copy_to(sc, data)
})

Y ahora con el poder de tu maquina. Ojo que quizas no tienes 10 cores o tiene menos de 32GB de memoria, cambia la configuracion para reflejar lo que tienes disponible en tu maquina).

conf <- list()
conf$`sparklyr.cores.local` <- 10
conf$`sparklyr.shell.driver-memory` <- "28G"
conf$spark.memory.fraction <- 0.9

sc <- spark_connect(master = "local",
                    config = conf)

system.time({
  sc_data <- copy_to(sc, data)
})

Si haces ambos, vas a ver que la diferencia entre los dos en terminos de tiempo es enorme! Y esta configuración impacta todo calculo que haces sobre tu instancia de Spark local, y si no lo configuras todo va durar mucho tiempo.

Usando dplyr

Con dplyr se puede cambiar el back-end con el cual trabaja. En nuestro caso queremos que funcione con Spark. Pero es facil de olvidar que dplyr es lazy. No ejecuta lo que le estamos pidiendo hasta que pedimos resultado. Asi que esto puede parecer muy muy rapido (¿gracias a Spark?):

system.time({
res_spark <- sc_data %>% 
  group_by(state) %>% 
  summarise(loco_1 = mean(fraudRisk * balance * log(creditLine), na.rm = TRUE),
            loco_2 = mean(balance / numIntlTrans, na.rm = TRUE),
            balance = sum(balance, na.rm = TRUE))
})

Mira que increible 0.001 segundos! Pero es no te da datos para trabajar. Al igular

system.time({
   collect(res_spark)
})
system.time({
res_spark <- sc_data %>% 
  group_by(state) %>% 
  summarise(loco_1 = mean(fraudRisk * balance * log(creditLine), na.rm = TRUE),
            loco_2 = mean(balance / numIntlTrans, na.rm = TRUE),
            balance = sum(balance, na.rm = TRUE)) %>%
  collect()
})

system.time({
res_mem <- data %>% 
  group_by(state) %>% 
  summarise(loco_1 = mean(fraudRisk * balance * log(creditLine), na.rm = TRUE),
            loco_2 = mean(balance / numIntlTrans, na.rm = TRUE),
            balance = sum(balance, na.rm = TRUE))
})

Este blog lo mantiene el equipo de ixpantia y la comunidad de gente interesada en datos de la cual estamos contentos de formar parte ¿Tienes una idea para publicar algo aquí? ¡Escríbenos! Estamos siempre interesados en material e ideas nuevas. © 2019-2022 ixpantia