統計コンサルの議事メモ

統計や機械学習の話題を中心に、思うがままに

SparklyrによるApache Sparkのインストールとロジスティック回帰の実行

{sparklyr}というパッケージを使うことでWindowsであってもApache Sparkのインストールが簡単にできる。また{sparklyr}にはSpark MLlibの機械学習用の関数がラップされており、それを使ってみた結果を記しておく。

基本的にはRstudioの{sparklyr}の紹介ページをなぞってみただけだが、せっかくなので前回の記事で紹介した{MicrosoftML}のrxGLMと比較してみた。

http://spark.rstudio.com/

なお当然ながら環境はWIndows

まずは{sparklyr}のインストール。普通にCRANからインストールできる。

install.packages("sparklyr")
library(sparklyr)

その他、必要なライブラリも合わせて読み込んでおく。

library(dplyr)
library(readr)
library(MicrosoftML)
library(RevoScaleR)

spark_installという関数でSparkがあっさりとインストールできる。なお私の環境(AWSに立ち上げたばかりのWindows Server)ではC++ SP1とJAVAが入っていなかったため以下のようなエラーがでた。

> spark_install(version = "1.6.2")
Error: Running Spark on Windows requires the Microsoft Visual C++ 2010 SP1 Redistributable Package. Please download and install from: 

  https://www.microsoft.com/download/en/details.aspx?id=13523

Restart your rsession after installation completes

もしくは

Error in shell_connection(master = master, spark_home = spark_home, app_name = app_name,  : 
  Java is required to connect to Spark. Please download and install Java from https://www.java.com/en/

エラーメッセージにしたがってC++ SP1とJAVAをそれぞれインストールする。インストール後はRのリスタートだけでなくWindowsの再起動をかけた方が良さそう。

なお{sparklyr}はまだまだ開発途上なので、なるべく最新版を用いた方が変なエラーに悩まされなくてすむようだ。以下のように{sparklyr}を最新版に更新する。なぜか{shiny}とRtoolsが一緒にインストールされた。

devtools::install_github("rstudio/sparklyr")


以上で準備が整ったので、Sparkに接続する。なお私はSparkに触ったことがないため、以降のコマンドは正直よくわからない。

sc <- spark_connect(master = "local")

copy_toによりRのオブジェクト(主にdata.frameが対象か?)をSparkに入れることができる。

install.packages(c("nycflights13", "Lahman"))
iris_tbl    <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")

Sparkに入れたテーブルはRのdata.frameと同様に、以下のように{dplyr}の関数が適用できる。最後のcollectは{dplyr}の関数でSpark上のテーブルをRに読み込むのだが、この辺りは{dplyr}のLazinessに関係しており、以下を参照すると良い。

Manipulating Data with dplyr

delay <- flights_tbl %>% 
   group_by(tailnum) %>%
   summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
   filter(count > 20, dist < 2000, !is.na(delay)) %>%
   collect

ローカルのCSVなどのファイルを読み込む時には、以下のようにspark_read_csvを使う。前回同様、SUSYをサンプルデータとして使用したが、ロードにかかった時間は4分ほどだった。

system.time(
   datTmp <- spark_read_csv(sc, "SUSY", 
                            path = "../MicrosoftML/Data/SUSY.csv",
                            header = FALSE)
)


Sparkに入れるのと、Rに直接読み込むのでは比較しても仕方ないかもしれないが、参考用に{readr}のread_csvで同様にSUSYを読み込んでみた。その結果こちらは10分程度かかった。
なお本来なら上記のようにcollectすれば良いのだが、RStudioがクラッシュしてしまったので仕方なくread_csvしている。もしかしたらas.data.frameの方が早いかもしれない。

system.time(
   datTmpDF <- read_csv("../MicrosoftML/Data/SUSY.csv",
                        col_names = FALSE)
)
   ユーザ   システム       経過  
     56.14      32.89     699.00

読み込んだデータを使い、ロジスティック回帰を実行する。本来ならGLMで比較するべきなのだが、ml_generalized_linear_regressionがSpark2.0以降しか使えないため、やむなくロジスティック回帰を用いている。

system.time(
   spark_logit <- datTmp %>%
      ml_logistic_regression(response = "V1",
                             features = c("V2", "V3", "V4", "V5"))
)
   ユーザ   システム       経過  
      0.07       0.01     270.53 

結果は4分半で終了した。500万件の分析であることを考えると十分な速度と思われる。

system.time(
   res_rxGLM  <- rxGlm(X1 ~ X2 + X3 + X4 + X5, 
                       family = binomial("logit"), 
                       data = datTmpDF)
)
Rows Read: 5000000, Total Rows Processed: 5000000, Total Chunk Time: 1.271 seconds 

Starting values (iteration 1) time: 1.456 secs.
Rows Read: 5000000, Total Rows Processed: 5000000, Total Chunk Time: 1.180 seconds 

Iteration 2 time: 1.229 secs.
Rows Read: 5000000, Total Rows Processed: 5000000, Total Chunk Time: 1.556 seconds 

Iteration 3 time: 1.598 secs.
Rows Read: 5000000, Total Rows Processed: 5000000, Total Chunk Time: 1.229 seconds 

Iteration 4 time: 1.272 secs.
Rows Read: 5000000, Total Rows Processed: 5000000, Total Chunk Time: 1.291 seconds 

Iteration 5 time: 1.331 secs.
Rows Read: 5000000, Total Rows Processed: 5000000, Total Chunk Time: 1.263 seconds 

Iteration 6 time: 1.304 secs.

Elapsed computation time: 8.572 secs.
   ユーザ   システム       経過  
      0.89       1.02      27.23 

一方、rxGlmは27秒と、相変わらず凄まじい速度で分析が実行できる。
それぞれの結果を、回帰係数で比較してみるとほぼほぼ同じ結果となった。

> cbind(spark_logit$coefficients,
        res_rxGLM$coefficients)

                     [,1]          [,2]
(Intercept) -1.7100304090 -1.7100301776
V2           2.6339536980  2.6339542674
V3          -0.0004056923 -0.0004060596
V4          -0.0009712714 -0.0009717918
V5          -0.9571219696 -0.9571225996


比較は以上である。前回の記事で書いた通りMRCを使えばかなりの規模のデータに対してRからモデリングを実行できるため、現時点でSparkあるいは{sparklyr}にはそれほど興味は引かれない。しかしRに読み込めないような規模のデータを、Sparkに格納しつつml_generalized_linear_regressionで分析が可能なのであれば、データの規模に応じて分析環境を変えることで分析の幅が広がると感じた。よって次の検証内容としては{sparklyr}を用いてのデータサイズの限界を探るようなものが必要となりそうだ。

なおSparkはH2Oに接続することでH2Oの機械学習用のライブラリが使え、これをH2O Sparkling Waterと言うが、Rでも{sparkling}というパッケージで対応しているのでそこも調査する必要がありそうだ。

最後に、{sparklyr}のチートシートを発見したのでついでにリンクを貼っておく。

http://spark.rstudio.com/images/sparklyr-cheatsheet.pdf