Predicting large text data with Spark via the R package sparklyr

1 Abstract

Classical programming languages run on a single core and can be slow, or even fail, when loading very large data sets. Apache Spark, by contrast, is a fast distributed system that handles large data sets with ease by pooling all the available machines and cores into a cluster, so that each worker node takes charge of a small part of a task and the overall computing time is drastically reduced. Although the native language of Spark is Scala (it also supports Java and SQL), the good news for R users is that they can benefit from Spark without having to learn those languages by using the R package sparklyr. In this article we train a random forest model on text data, which in practice is usually considered large data. For illustration purposes, and to keep things fast, we use a small data set of SMS messages and restrict ourselves to the local mode, in which Spark builds a cluster from the cores available on a single machine. The same code can be used in the cloud whatever the size of the data, even with billions of data points; only the connection method to Spark differs slightly. Since the raw data requires some transformation before it can be consumed by the model, we apply the well-known tokenization method to create the model features, then train and evaluate a random forest model on the design matrix filled with the TF (term frequency) method. Lastly, we train the same model (a random forest with the same hyperparameter values) using the TF-IDF method (Sparck Jones, 1972).

2 Keywords

Large data sets, R, Spark, sparklyr, cluster, tokenization, TF, TF-IDF, random forest, machine learning.

3 Introduction

R is one of the best programming languages for statistical analysis, and it provides data scientists with powerful tools that make their work easier and more exciting. However, since the amount of information today is growing exponentially, R and the other classical languages (Python, Java, etc.), which run on a single machine (a single node), face great challenges when handling large data sets whose size can, in some cases, exceed the available memory. Spark and Hadoop were designed as a solution to this limitation. Both use a distributed computing system that runs multiple tasks on multiple machines (called nodes, which together form a cluster) at the same time. Spark, however, has an advantage over Hadoop: its ability to load data into memory, which makes it much faster (Luraschi, 2014). Spark creates a cluster using either physical machines or virtual machines provided by a cloud provider such as Google, Amazon, or Microsoft (it can also create a cluster from the available cores of a single machine, which is known as local mode). Its native language is Scala, but it also supports SQL and Java. Thankfully, Spark provides high-level APIs in Python and R, so R users can work with large data sets on Spark using familiar code and without having to learn Scala, SQL, or Java. The connection between R and Spark is not direct, though: it is established with the help of the sparklyr package, which behaves like any other R package, has its own functions, and supports almost all of the well-known dplyr functions. Most text data sets are considered large, either because of their size or because of the computing time required to manipulate and model them. That is why, in this paper, we train a random forest model with sparklyr to predict whether a text message is spam or ham, using the SMSSpamCollection data set downloaded from the kaggle website. To convert the character features to numeric type we use two well-known methods, the TF transformation and the TF-IDF transformation (Sparck Jones, 1972). This article is divided into the following sections:

  • Data preparation: how we read, clean, and prepare the data to be consumed by the model.
  • TF method: we train a random forest model (James et al., 2013) on the term frequency (TF) features.
  • TF-IDF method: we train the random forest model on the TF-IDF features.
  • Add features: we create an additional feature from the data to be used as a new predictor.

4 Data preparation

First, we load the R packages tidyverse and sparklyr and set up the connection to Spark with the following R code.

suppressPackageStartupMessages(library(sparklyr))
suppressPackageStartupMessages(library(tidyverse))
sc<-spark_connect(master = "local")
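
As noted in the abstract, running the same analysis on a real cluster only changes this connection call; everything that follows stays the same. A minimal sketch (not run here; the master values are placeholders for a YARN-managed or a standalone cluster):

# Sketch: connecting to a remote cluster instead of the local mode.
# sc <- spark_connect(master = "yarn")
# sc <- spark_connect(master = "spark://host:7077")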

Second, we read the data, which has been downloaded and saved in the R working directory (notice that the file has no column headers), and we display the first three rows to get a first glance.

path <- "C://Users/dell/Documents/SMSSpamCollection.txt"
mydata<-spark_read_csv(sc,name="SMS",path=path, header=FALSE,delimiter = "\t",overwrite = TRUE)
knitr::kable(head(mydata,3))
|V1   |V2 |
|:----|:--|
|ham  |Go until jurong point, crazy.. Available only in bugis n great world la e buffet… Cine there got amore wat… |
|ham  |Ok lar… Joking wif u oni… |
|spam |Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C’s apply 08452810075over18’s |

It will be more practical to replace the default column names V1 and V2 with labels and messages, respectively.

names(mydata)<-c("labels","messages")

We can get the dimensions of this data set with the sparklyr function sdf_dim.

sdf_dim(mydata)
[1] 5574    2

We can also take a look at some messages by displaying the first three rows.

select(mydata,messages)%>%
  head(3) %>% 
  knitr::kable("html")
messages
Go until jurong point, crazy.. Available only in bugis n great world la e buffet… Cine there got amore wat…
Ok lar… Joking wif u oni…
Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C’s apply 08452810075over18’s

Modeling text data requires special attention, since most machine learning algorithms expect numeric inputs, so how can we transform the text entries in messages into numeric form? The best-known approach is tokenization, which simply means splitting each text in the messages column into small pieces called tokens (together forming a bag of words), such that each token carries some power to discriminate between the levels of the dependent variable labels. For example, if we think that arbitrary numbers or symbols such as / or dots have no discriminating power, we can remove them from the entries. Each row in this data set (labeled ham or spam) is considered a document (5574 documents in our case) containing a text (a collection of tokens), and the whole collection of documents after tokenization is called the corpus. To keep things simple, let us suppose that everything except letters is useless for predicting the labels; we can then use the Spark SQL function regexp_replace to remove everything except letters, and we name the resulting column cleaned.

newdata<-mydata%>%
  mutate(cleaned=regexp_replace(messages,"[^a-zA-Z]"," "))%>%
  mutate(cleaned=lower(cleaned))%>%
  select(labels,cleaned)
newdata%>%
  select(cleaned)%>%
  head(3)%>%
  knitr::kable()
cleaned
go until jurong point crazy available only in bugis n great world la e buffet cine there got amore wat
ok lar joking wif u oni
free entry in a wkly comp to win fa cup final tkts st may text fa to to receive entry question std txt rate t c s apply over s

At this stage, and before going further, we should split the data into a training set and a testing set. However, since the data is imbalanced, with roughly 87% hams and 13% spams, we should preserve the label proportions by splitting the data in such a way that we obtain stratified samples.

newdata%>%
  group_by(labels)%>%
  count()%>%
  collect()%>%
  mutate(prop=n/sum(n))%>%
  knitr::kable()
|labels |    n|      prop|
|:------|----:|---------:|
|ham    | 4827| 0.8659849|
|spam   |  747| 0.1340151|

To accomplish this by hand, we first split the data into a ham subset and a spam subset, then split each subset randomly into a training and a testing part, and finally bind the two training parts together into one training set and do the same for the testing parts.

dataham<-newdata%>%
  filter(labels=="ham")
dataspam<-newdata%>%
  filter(labels=="spam")
partitionham<-dataham%>%
  sdf_random_split(training=0.8,test=0.2,seed = 111)
partitionspam<-dataspam%>%
  sdf_random_split(training=0.8,test=0.2,seed = 111)

train<-sdf_bind_rows(partitionham$training,partitionspam$training)%>%
  compute("train")
test<-sdf_bind_rows(partitionham$test,partitionspam$test)%>%
  compute("test")

5 TF model

Since machine learning models require numeric inputs, the common practice in text analysis is to convert each text into tokens (pieces) so that these tokens become the features used to discriminate between the class labels; in our case the tokens are simple words. With the TF method, if a particular word occurs in a particular document we place its frequency (or just 1 if we do not care about the frequency) in the corresponding cell of the design matrix, called the document-term matrix (DTM); otherwise we place a zero. This method produces a very large, sparse rectangular matrix with a huge number of features compared to the number of documents, which is exactly the kind of data Spark can help us handle. Due to its popularity, we will fit a random forest model, known as one of the most powerful machine learning models, to the transformed data. To keep things brief we will use a Spark feature pipeline, which groups together all the steps required to run the model:

  • convert the dependent variable labels to integer type.
  • tokenize the cleaned messages into words (tokens).
  • remove stop words from the tokens, since they tend to be spread randomly across documents.
  • replace each term in each document by its frequency.
  • define the model to be used (here, a random forest).

In the final step we use the ml_random_forest_classifier function and keep all the default values: for example, 20 for the number of trees, 5 for the maximum depth, and gini as the impurity function (a sketch of how to override these defaults follows the pipeline code below). Do not forget to set the seed to make the result reproducible. Lastly, we call the ml_fit function to fit the model.

pipline<-ml_pipeline(sc)%>%
  ft_string_indexer(input_col = "labels",output_col="class")%>%
  ft_tokenizer(input_col = "cleaned", output_col="words")%>%
  ft_stop_words_remover(input_col = "words",output_col = "cleaned_words")%>%
  ft_count_vectorizer(input_col = "cleaned_words", output_col="terms",
                      min_df=5,binary=TRUE)%>%
  ft_vector_assembler(input_cols = "terms",output_col="features")%>%
  ml_random_forest_classifier(label_col="class",
                 features_col="features",
                 seed=222)
model_rf<-ml_fit(pipline,train)
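
If we wanted non-default hyperparameters, the same stage accepts them explicitly. A minimal sketch (the values below are arbitrary, not tuned) creating such a stage, which could replace the last step of the pipeline above:

# Sketch: a random forest stage with explicit (arbitrary) hyperparameters.
rf_custom <- ml_random_forest_classifier(sc, label_col = "class",
                                         features_col = "features",
                                         num_trees = 50, max_depth = 10,
                                         impurity = "gini", seed = 222)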

To evaluate our model we use the ml_transform function.

ml_transform(model_rf,train)%>%
  ml_binary_classification_evaluator(label_col = "class",
                                     metric_name= "areaUnderROC")
[1] 0.9693865

Notice that for binary classification models sparklyr provides only two metrics, areaUnderROC and areaUnderPR (Murphy, 2012). Using the former we obtain a high score of about 0.97. This metric ranges between 0 and 1, and the higher it is the better the model. However, since this value comes from the training data, it might be the result of overfitting (Lantz, 2016); the more reliable value is the one computed on the testing set, which turns out to be about 0.965.

ml_transform(model_rf,test)%>%
  ml_binary_classification_evaluator(label_col = "class",
                                     metric_name= "areaUnderROC")
[1] 0.9653819

Fortunately, the two values are very close to each other, indicating that our model generalizes well.
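
The other available metric, the area under the precision-recall curve, can be computed in exactly the same way (a minimal sketch; output not shown here).

# Sketch: the second metric provided for binary classification.
ml_transform(model_rf, test) %>%
  ml_binary_classification_evaluator(label_col = "class",
                                     metric_name = "areaUnderPR")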
To get the predictions we use the ml_predict function.

pred<-ml_predict(model_rf,test)

As we can see, some columns of the predictions are nested (list columns). This is not a problem, since we can extract the elements of such a list with the function unlist. For instance, we can display the most frequent words in each class label using the wordcloud package.

p1<-pred%>%
  filter(labels=="ham")%>%
  pull(cleaned_words)%>%
  unlist()
wordcloud::wordcloud(p1,max.words = 50, random.order = FALSE,
                     colors=c("blue","red","green","yellow"),random.color = TRUE)

p2<-pred%>%
  filter(labels=="spam")%>%
  pull(cleaned_words)%>%
  unlist()
wordcloud::wordcloud(p2,max.words = 50,random.order = FALSE, 
                     colors=c("blue","red","green","yellow"),random.color = TRUE)  

From the first word cloud we see that the most common words in hams are get, good, and know, whereas the second one shows the most common words in spams: call, free, and mobile. This means that if we receive a new message containing the word free, for instance, it is more likely to be spam.

6 TF-IDF model

The main drawback of the TF method is that it does not take into account the distribution of each term across the documents, which reflects how much information the term provides. To measure this, we compute for each term t its document frequency (DF), the number of documents in which t appears, and from it the inverse document frequency (IDF) of the term:

\[ idf(t, D) = \log\left(\frac{N}{1 + |\{d \in D : t \in d\}|}\right) \]

where N is the total number of documents (the number of rows) and D is the corpus. Multiplying the TF value by the IDF value gives the TF-IDF value for each (term, document) pair; a small numeric illustration follows. In the pipeline below we add the function ft_idf to the previous TF pipeline, fit the random forest model again on the transformed data, and evaluate it directly on the test data.
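
For instance, with hypothetical numbers chosen only to illustrate the formula, in a corpus of N = 1000 documents a term appearing in 99 documents gets

\[ idf = \log\left(\frac{1000}{1 + 99}\right) = \log(10) \approx 2.3, \]

while a term appearing in 999 documents gets \( idf = \log(1000/1000) = 0 \), so ubiquitous terms are down-weighted and rarer, more informative terms keep a larger weight.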

pipline2<-ml_pipeline(sc)%>%
  ft_string_indexer(input_col = "labels",output_col="class")%>%
  ft_tokenizer(input_col = "cleaned", output_col="words")%>%
  ft_stop_words_remover(input_col = "words",output_col = "cleaned_words")%>%
  ft_count_vectorizer(input_col = "cleaned_words", output_col="tf_terms")%>%
  ft_idf(input_col = "tf_terms", output_col="tfidf_terms")%>%
    ml_random_forest_classifier(label_col="class",
                 features_col="tfidf_terms",
                 seed=222)

model_rf.tfidf <- ml_fit(pipline2, train)

ml_transform(model_rf.tfidf,test)%>%
  ml_binary_classification_evaluator(label_col = "class",
                                     metric_name= "areaUnderROC")
## [1] 0.953212

Using this more complex transformation is not justified for this data, since the two AUC values are close to each other (and the TF-IDF model actually scores slightly lower here).

7 Add new features

Creating new features from the data that we believe are more relevant than the existing ones is a popular strategy for improving prediction quality. For example, with our data we suspect that spam and ham messages tend to differ in length, so we can add the message length as a new feature.

train1 <- train %>% mutate(lengths=nchar(cleaned))
test1 <- test %>% mutate(lengths=nchar(cleaned))
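
Before retraining, we can glance at whether this new feature separates the two classes (a minimal sketch; output not shown here).

# Compare the average message length by label on the training set.
train1 %>%
  group_by(labels) %>%
  summarise(mean_length = mean(lengths, na.rm = TRUE)) %>%
  collect()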

Now let’s retrain the above models with this new feature.

7.1 TF model

pipline_tf<-ml_pipeline(sc)%>%
  ft_string_indexer(input_col = "labels",output_col="class")%>%
  ft_tokenizer(input_col = "cleaned", output_col="words")%>%
  ft_stop_words_remover(input_col = "words",output_col = "cleaned_words")%>%
  ft_count_vectorizer(input_col = "cleaned_words", output_col="terms",
                      min_df=5,binary=TRUE)%>%
  ft_vector_assembler(input_cols = c("terms","lengths"),output_col="features")%>%
  ml_random_forest_classifier(label_col="class",
                 features_col="features",
                 seed=222)

model_rf_new<-ml_fit(pipline_tf,train1)
ml_transform(model_rf_new,test1)%>%
  ml_binary_classification_evaluator(label_col = "class",
                                     metric_name= "areaUnderROC")
## [1] 0.9849365

Fortunately, our expectation about this new feature is confirmed, since we get a clear improvement compared to the previous results.

7.2 tf_idf model

pipline_tfidf<-ml_pipeline(sc)%>%
  ft_string_indexer(input_col = "labels",output_col="class")%>%
  ft_tokenizer(input_col = "cleaned", output_col="words")%>%
  ft_stop_words_remover(input_col = "words",output_col = "cleaned_words")%>%
  ft_count_vectorizer(input_col = "cleaned_words", output_col="tf_terms")%>%
  ft_idf(input_col = "tf_terms", output_col="tfidf_terms")%>%
  ft_vector_assembler(input_cols = c("tfidf_terms","lengths"),output_col="features")%>%
    ml_random_forest_classifier(label_col="class",
                 features_col="features",
                 seed=222)

model_rf_new2 <- ml_fit(pipline_tfidf, train1)

ml_transform(model_rf_new2,test1)%>%
  ml_binary_classification_evaluator(label_col = "class",
                                     metric_name= "areaUnderROC")
## [1] 0.9857918

Again, as before, the use of the IDF weighting is not justified here, and it would be better to stay with the simpler TF method.

8 n-gram model

In contrast to the function ft_tokenizer, which splits the text into tokens of a single word each, the sparklyr function ft_ngram produces tokens of n consecutive words, respecting the order in which they appear in the original text. To understand this better, let’s look at the following example.

data <- copy_to(sc, data.frame(x="I like both R and python"), overwrite = TRUE)
data
## # Source: spark<?> [?? x 1]
##   x                       
##   <chr>                   
## 1 I like both R and python

The ft_tokenizer function gives the following tokens:

ft_tokenizer(data, "x", "y") %>% 
  mutate(y1=explode(y)) %>% select(y1)
## # Source: spark<?> [?? x 1]
##   y1    
##   <chr> 
## 1 i     
## 2 like  
## 3 both  
## 4 r     
## 5 and   
## 6 python

Whereas with ft_ngram and \(n=2\) we get the following tokens:

data  %>%  ft_tokenizer("x", "y") %>% 
  ft_ngram("y", "y1", n=2) %>%
  mutate(z=explode(y1)) %>% 
  select(z)
## # Source: spark<?> [?? x 1]
##   z         
##   <chr>     
## 1 i like    
## 2 like both 
## 3 both r    
## 4 r and     
## 5 and python

Now let’s train a 2-gram random forest model.

pipline_2gram<-ml_pipeline(sc)%>%
  ft_string_indexer(input_col = "labels",output_col="class")%>%
  ft_tokenizer(input_col = "cleaned", output_col="words")%>%
  ft_stop_words_remover(input_col = "words",output_col = "cleaned_words")%>%
  ft_ngram(input_col = "cleaned_words", output_col="ngram_words", n=2) %>% 
  ft_count_vectorizer(input_col = "ngram_words", output_col="tf_terms")%>%
  ft_vector_assembler(input_cols = c("tf_terms","lengths"),output_col="features")%>%
  ml_random_forest_classifier(label_col="class",
                 features_col="features",
                 seed=222)

model_rf_2gram <- ml_fit(pipline_2gram, train1)

ml_transform(model_rf_2gram,test1)%>%
  ml_binary_classification_evaluator(label_col = "class",
                                     metric_name= "areaUnderROC")
## [1] 0.8835537

Note that with n = 2 this function produces only tokens of exactly two words, not tokens of one or two words, so the single-word features are lost. That is why we obtain a lower score than with the previous models; one possible remedy is sketched below.
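
A possible remedy (a sketch only, not fitted here) is to keep both the unigram and the bigram features by vectorizing them separately and assembling them together.

# Sketch: combine unigram and bigram term frequencies in one feature vector.
pipline_1_2gram <- ml_pipeline(sc) %>%
  ft_string_indexer(input_col = "labels", output_col = "class") %>%
  ft_tokenizer(input_col = "cleaned", output_col = "words") %>%
  ft_stop_words_remover(input_col = "words", output_col = "cleaned_words") %>%
  ft_ngram(input_col = "cleaned_words", output_col = "ngram_words", n = 2) %>%
  ft_count_vectorizer(input_col = "cleaned_words", output_col = "tf_uni") %>%
  ft_count_vectorizer(input_col = "ngram_words", output_col = "tf_bi") %>%
  ft_vector_assembler(input_cols = c("tf_uni", "tf_bi", "lengths"),
                      output_col = "features") %>%
  ml_random_forest_classifier(label_col = "class",
                              features_col = "features",
                              seed = 222)
# It would be fitted and evaluated exactly as before, e.g.:
# model_rf_1_2gram <- ml_fit(pipline_1_2gram, train1)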

When you are satisfied with your final model, you can save it for later use as follows.

#ml_save(model_rf_2gram, "spark_ngram", overwrite = TRUE)
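
Later, in a new session, the saved pipeline model could be reloaded and reused for prediction (a sketch, assuming the same "spark_ngram" path and an open connection sc); it is commented out here, like the save itself.

# Sketch: reload the saved pipeline model and reuse it for prediction.
# reloaded_model <- ml_load(sc, "spark_ngram")
# ml_predict(reloaded_model, test1)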

One last thing: when you finish your work, do not forget to free your resources by disconnecting from Spark as follows.

spark_disconnect(sc)

9 Conclusion

This article is a brief introduction illustrating how easy it is to handle and model large data sets by combining the two powerful tools R and Spark. We used a text data set because this type of data characterizes many of the large data sets encountered in the real world.

10 References

11 Session information

sessionInfo()
## R version 4.0.1 (2020-06-06)
## Platform: x86_64-w64-mingw32/x64 (64-bit)
## Running under: Windows 10 x64 (build 19041)
## 
## Matrix products: default
## 
## locale:
## [1] LC_COLLATE=English_United States.1252 
## [2] LC_CTYPE=English_United States.1252   
## [3] LC_MONETARY=English_United States.1252
## [4] LC_NUMERIC=C                          
## [5] LC_TIME=English_United States.1252    
## 
## attached base packages:
## [1] stats     graphics  grDevices utils     datasets  methods   base     
## 
## other attached packages:
##  [1] forcats_0.5.0   stringr_1.4.0   dplyr_1.0.2     purrr_0.3.4    
##  [5] readr_1.3.1     tidyr_1.1.2     tibble_3.0.3    ggplot2_3.3.2  
##  [9] tidyverse_1.3.0 sparklyr_1.4.0 
## 
## loaded via a namespace (and not attached):
##  [1] Rcpp_1.0.5         lubridate_1.7.9    forge_0.2.0        utf8_1.1.4        
##  [5] assertthat_0.2.1   rprojroot_1.3-2    digest_0.6.25      slam_0.1-47       
##  [9] R6_2.4.1           cellranger_1.1.0   backports_1.1.10   reprex_0.3.0      
## [13] evaluate_0.14      httr_1.4.2         highr_0.8          blogdown_0.20     
## [17] pillar_1.4.6       rlang_0.4.7        readxl_1.3.1       uuid_0.1-4        
## [21] rstudioapi_0.11    blob_1.2.1         rmarkdown_2.4      config_0.3        
## [25] r2d3_0.2.3         htmlwidgets_1.5.2  munsell_0.5.0      broom_0.7.1       
## [29] compiler_4.0.1     modelr_0.1.8       xfun_0.18          pkgconfig_2.0.3   
## [33] askpass_1.1        base64enc_0.1-3    htmltools_0.5.0    openssl_1.4.3     
## [37] tidyselect_1.1.0   bookdown_0.20      fansi_0.4.1        crayon_1.3.4      
## [41] dbplyr_1.4.4       withr_2.3.0        grid_4.0.1         jsonlite_1.7.1    
## [45] gtable_0.3.0       lifecycle_0.2.0    DBI_1.1.0          magrittr_1.5      
## [49] scales_1.1.1       cli_2.0.2          stringi_1.5.3      fs_1.5.0          
## [53] NLP_0.2-0          xml2_1.3.2         ellipsis_0.3.1     generics_0.0.2    
## [57] vctrs_0.3.4        wordcloud_2.6      RColorBrewer_1.1-2 tools_4.0.1       
## [61] glue_1.4.2         hms_0.5.3          parallel_4.0.1     yaml_2.2.1        
## [65] tm_0.7-7           colorspace_1.4-1   rvest_0.3.6        knitr_1.30        
## [69] haven_2.3.1
Metales Abdelkader