Spark MLlib. Sentiment Classification
Spark MLlib implements a variety of ML algorithms. Due to limited resources, we will refrain from using sophisticated methods and will mainly use the simplest algorithms. The focus of this lab is pipelines, transformations, estimators, and model selection.
Dataset Description
For the task of sentiment classification we are going to use the Twitter Sentiment dataset from Kaggle (Original). The dataset contains 100k examples stored in CSV format. The data is very irregular and requires preprocessing:
ItemID,Sentiment,SentimentText
1,0, is so sad for my APL friend.............
2,0, I missed the New Moon trailer...
3,1, omg its already 7:30 :O
Spark ML Pipelines
Spark ML allows creating custom data processing pipelines. These pipelines define a dataflow DAG. Do not confuse this dataflow DAG with the DAGs that you can find in popular machine learning libraries like TensorFlow. In Spark, each stage of the pipeline can be either a custom processing function (classical programming) or an ML stage. ML stages are trained independently of each other. Below, we discuss the feature extraction tools available in Spark ML, and later we will see how to fit all of them into a pipeline.
We are going to build a data processing pipeline with DataFrames. In this case, each pipeline stage accepts a DataFrame column as input and requires specifying an output column. For example
val stageInstance = new PipelineStage()
.setInputCol("InColumn")
.setOutputCol("OutColumn")
// this is a generic example
Reading the Data
Twitter data is provided in CSV format. Spark comes with a set of tools for data import.
val spark = SparkSession
.builder()
.appName("Some Name")
.getOrCreate()
After you have instantiated the SparkSession object, you can read the CSV file with a standard method
val data = spark.read.format("csv")
.option("header", "true")
.load(path)
This method will return a DataFrame. Working with DataFrames in Spark is similar to working with DataFrames in Python: the data is organized in tabular form, and you can slice a column using the array indexing syntax data("ColumnName").
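Note that when CSV is read this way, every column is loaded as a string (unless you pass the inferSchema option). Since the Sentiment column will later be used as a numeric label, it is usually worth casting it right away. A minimal sketch, with column names following the dataset above:
import org.apache.spark.sql.functions.col
// CSV columns are strings by default; cast the label to a numeric type
// so it can be used directly as a label column by the classifiers later on.
val typedData = data.withColumn("Sentiment", col("Sentiment").cast("double"))
typedData.printSchema()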
Feature Extraction
Sentiment Classification, in most cases, is a task of binary classification. Usually, the goal is to discern negative comments from neutral or positive (or other combinations of these three).
Below, several feature extraction techniques are described for the Spark ML DataFrame API.
Preprocessing
The preprocessing stage involves text normalization. People write in very different styles, and when it comes to understanding the meaning of a text snippet, it is easier to create a model when this variation is removed. Consider a simple example from the Twitter Sentiment dataset.
Juuuuuuuuuuuuuuuuussssst Chillin!!
The meaning and sentiment are easy to understand for a human, but a computer can struggle with this example. What we ideally need to do is to extract the semantic meaning from this text, along with the style. One possible normalization is to lowercase the text and collapse all repeated characters
just chilin!
When you work with Spark’s DataFrames, this is done by applying a map transformation to one of the columns, with the help of User Defined Functions (UDFs). Their primary benefit is compatibility with Spark SQL syntax. A UDF that collapses repeated characters and converts the text to lower case can look like this
import org.apache.spark.sql.functions.udf
val removeRepetitive = udf{ str: String => str.replaceAll("((.))\\1+","$1").trim.toLowerCase()}
val noRepetitiveData = data.withColumn("Collapsed", removeRepetitive('SentimentText))
The notation 'SentimentText signifies the column with the given name.
Executing the code above will return a new DataFrame with a new column appended.
Alternatively, you can implement your own pipeline stage.
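For illustration, a minimal custom stage (the class name TextNormalizer is hypothetical) could extend UnaryTransformer and perform the same normalization as the UDF above. This is only a sketch of one possible approach, not the only way to write a custom transformer:
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}
// A custom stage that collapses repeated characters and lowercases the text;
// it can be plugged into a Pipeline like any built-in stage.
class TextNormalizer(override val uid: String)
extends UnaryTransformer[String, String, TextNormalizer] {
def this() = this(Identifiable.randomUID("textNormalizer"))
override protected def createTransformFunc: String => String =
_.replaceAll("(.)\\1+", "$1").trim.toLowerCase
override protected def outputDataType: DataType = StringType
}
// usage: new TextNormalizer().setInputCol("SentimentText").setOutputCol("Collapsed")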
Tokenization
Spark ML provides two pipeline stages for tokenization: Tokenizer and RegexTokenizer. The first converts the input string to lowercase and then splits it on whitespace characters; the second extracts tokens using the provided regex pattern.
import org.apache.spark.ml.feature.{Tokenizer, RegexTokenizer}
val tokenizer = new RegexTokenizer()
.setInputCol("PreprocessedSentences")
.setOutputCol("Tokens")
.setPattern("\\s+")
val wordsData = tokenizer.transform(sentenceData)
Stop Words Removal
Spark ML also provides a pipeline stage for stop words removal.
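A minimal sketch of this stage, assuming the Tokens column produced by the tokenizer above (the output column name here is arbitrary):
import org.apache.spark.ml.feature.StopWordsRemover
// Drop common stop words from the token arrays.
val stopWordsRemover = new StopWordsRemover()
.setInputCol("Tokens")
.setOutputCol("FilteredTokens")
val filteredData = stopWordsRemover.transform(wordsData)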
TF/IDF Features
Spark ML provides several stages for document vectorization. Two of them are HashingTF and CountVectorizer. Both return a numerical vector that can be treated as input features for a classifier. The difference is that CountVectorizer enumerates the unique words first and, therefore, requires calling fit.
import org.apache.spark.ml.feature.{HashingTF, IDF}
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("tf").setNumFeatures(2000)
This will create a vector representation of the texts. The vector stores the term frequencies in the sparse data format (vector_size, positions, values).
val featurizedData = hashingTF.transform(wordsData)
// given input data was
// >> data.select("tokens").collect().foreach(println)
// [Array(a, great, day)]
// [Array(a, great, day, day, a)]
//
// The output will be
// >> data.select("features").collect().foreach(println)
// [(2000,[165,467,1768],[1.0,1.0,1.0])]
// [(2000,[165,467,1768],[1.0,2.0,2.0])]
Be aware that if you use CountVectorizer, the size of your vocabulary will grow with the amount of training data, and each new term adds parameters to your model. A reasonable way to constrain the dimensionality is to keep only the N most frequent tokens; refer to the CountVectorizer documentation for details.
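For comparison, a CountVectorizer stage with a capped vocabulary could look like this, as a drop-in alternative to the HashingTF stage above (column names follow the earlier examples):
import org.apache.spark.ml.feature.CountVectorizer
// CountVectorizer is an estimator: it builds the vocabulary from the data,
// so it has to be fit before it can transform.
val countVectorizer = new CountVectorizer()
.setInputCol("words")
.setOutputCol("tf")
.setVocabSize(20000) // keep only the most frequent terms
val cvModel = countVectorizer.fit(wordsData)
val countFeaturized = cvModel.transform(wordsData)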
IDF can be computed as
val idfModel = new IDF().setInputCol("tf").setOutputCol("tfidfFeatures").fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
// The output will be
// >> data.select("tfidfFeatures").collect().foreach(println)
// [(2000,[165,467,1768],[0.0,0.0,0.0])]
// [(2000,[165,467,1768],[0.0,0.0,0.0])]
// Since logarithm is used for computing IDF, if a term appears in all documents, its IDF value becomes 0. That is why the current output is 0.
The formula used for computing IDF is given in the documentation. The output of the IDF stage is a TF/IDF vector. The resulting features are suitable for use in any classifier built into Spark MLlib.
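For reference, at the time of writing Spark computes IDF(t, D) = log((|D| + 1) / (DF(t, D) + 1)), where |D| is the number of documents and DF(t, D) is the number of documents containing term t. In the toy example above every term appears in both documents, so the IDF is log(3/3) = 0, which is exactly what the output shows.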
Optional: Including NGrams
Sometimes, including N-Grams can improve model accuracy. One possible architecture extracts both token features and N-Gram features and feeds their concatenation to the classifier. Some auxiliary stages can be added, such as stop words removal (whether preserving stop words is beneficial for n-grams should be decided with cross-validation).
- N-Gram Extraction and Vectorization: Once again, Spark ML has the necessary stages in place. Keep in mind that the number of possible N-Grams grows very fast. Given the limits on available memory and model complexity, you should limit the size of the N-Gram vocabulary; otherwise, your model can easily overfit.
- Feature Assembler: Spark ML’s pipeline stage VectorAssembler helps to concatenate several vectors together. Thus, you can concatenate vectorized tokens and vectorized N-Grams, creating a single feature vector that you are going to pass to the classifier (see the sketch after this list).
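A sketch of these two stages, assuming a WordTokens column with word-only tokens and a TokenFeatures column produced by one of the vectorizers above (all column names here are illustrative only):
import org.apache.spark.ml.feature.{NGram, CountVectorizer, VectorAssembler}
// Build bigrams from the word tokens.
val ngram = new NGram()
.setN(2)
.setInputCol("WordTokens")
.setOutputCol("NGrams")
// Vectorize the n-grams with a capped vocabulary to keep the model small.
val ngramVectorizer = new CountVectorizer()
.setInputCol("NGrams")
.setOutputCol("NGramFeatures")
.setVocabSize(10000)
// Concatenate token features and n-gram features into a single feature vector.
val assembler = new VectorAssembler()
.setInputCols(Array("TokenFeatures", "NGramFeatures"))
.setOutputCol("features")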
Optional: Word Embeddings
As an alternative to token-based vectorization, one can resort to word embeddings. Spark ML provides a pipeline stage for converting an array of tokens into a vector using embeddings. Leaving aside the details of how exactly embeddings work, consider how the pipeline stage operates:
- Given the training data (arrays of tokens), the stage trains word embeddings for every token that occurs at least 5 times (the default minimum count) in the dataset. The quality of the vectors depends partially on the number of iterations specified for the stage (usually, the more, the better).
- After the vectors are trained, each entry (array of tokens) is collapsed into a single vector: Spark ML averages the word vectors (check the method transform(dataset: Dataset[_])).
- Now each entry is represented by a vector of predefined dimensionality (the dimensionality is a parameter of the Word2Vec stage; refer to the documentation for details), and these vectors are passed directly to the classifier.
Since this feature extraction model involves training word vectors, it can take a significantly longer time to train.
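A sketch of such a stage (the column names and parameter values are illustrative only):
import org.apache.spark.ml.feature.Word2Vec
// Word2Vec is an estimator: it trains word vectors on the token arrays and
// then represents each entry by the average of its word vectors.
val word2Vec = new Word2Vec()
.setInputCol("Tokens")
.setOutputCol("Embeddings")
.setVectorSize(100) // dimensionality of the resulting vectors
.setMinCount(5) // ignore tokens that occur fewer than 5 times
.setMaxIter(10)
val w2vModel = word2Vec.fit(wordsData)
val embeddedData = w2vModel.transform(wordsData)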
Classification Method
Logistic Regression
Spark ML provides many classification methods. Since we discussed how to extract numerical features, it is recommended to resort to methods that can be optimized by gradient descent. The simplest such model is LogisticRegression. The documentation has a thorough usage example. It can also be applied in the multi-class case.
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression()
.setFamily("multinomial")
.setFeaturesCol("tfidfFeatures")
.setLabelCol("Sentiment")
val lrModel = lr.fit(training)
Always evaluate your model after training: feed some data to the model and look at the output. For LR, the output is stored in the probability and prediction columns of the DataFrame. If the probabilities are the same or almost the same for different examples, your model did not learn. Training stops when it reaches the maximum number of iterations or the convergence threshold (see the method setTol).
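For example, a quick sanity check might look like this (column names assume the defaults used above; ideally, run it on held-out data rather than the training set):
// Inspect a few predictions; near-identical probabilities across rows are a red flag.
lrModel.transform(training)
.select("Sentiment", "probability", "prediction")
.show(5, truncate = false)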
Optional: Multilayer Perceptron
It is also possible to create a stage with a Neural Network. You can specify the number of layers, layer sizes, learning rate, and convergence tolerance.
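A minimal sketch of such a stage, assuming the 2000-dimensional TF/IDF features from above and two classes (the hidden layer size is arbitrary here):
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
// The first layer size must match the feature vector size and the last one
// the number of classes.
val mlp = new MultilayerPerceptronClassifier()
.setLayers(Array(2000, 64, 2))
.setFeaturesCol("tfidfFeatures")
.setLabelCol("Sentiment")
.setMaxIter(100)
.setTol(1e-6)
val mlpModel = mlp.fit(training)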
Training
Assembling Pipeline
So far we have applied different transformations interactively, which included calling the methods fit() and transform() on individual stages. Note that we do not need to do this for every stage when working with pipelines: simply create the pipeline stages (using the operator new), assemble the pipeline, and later call fit() and transform() on the entire pipeline.
Assembling a pipeline in Spark is as easy as creating a single object instance
import org.apache.spark.ml.Pipeline
// pipeline example
val pipe = new Pipeline()
.setStages(Array(
tokenizer, // split the original sentence on whitespace
// this first stage will capture punctuation and emoji as well as words
wordTokenizer, // tokenize the original sentence into word tokens
// this second tokenizer will be used to create ngrams
// we want to reduce the number of ngrams, and therefore filter out all non-word characters
ngram, // create ngrams from word tokens
tokenVectorizer, // map token into features
ngramVectorizer, // map ngrams into features
assembler, // concatenate feature vectors
classifier // add classification stage
))
Then you can train all trainable stages with a single call
val model = pipe.fit(twitterData)
where twitterData is the input DataFrame of your pipeline.
Distributed Hyperparameter Search
Keep in mind how the pipeline is executed: the transform method runs locally on every worker, and the model has to fit into a single worker’s memory. This way, Spark helps us process large quantities of data in a distributed way, but it does not allow us to create distributed machine learning models.
Earlier you saw that you can specify some of a stage’s parameters during instantiation. Alternatively, you can create a parameter map and pass it to the pipeline’s fit method.
import org.apache.spark.ml.param.ParamMap
val paramMap = new ParamMap()
.put(tokenVectorizer.vocabSize, 10000)
.put(ngramVectorizer.vocabSize, 10000)
.put(classifier.tol, 1e-20)
.put(classifier.maxIter, 100)
val model = pipe.fit(twitterData, paramMap)
This way you can keep all parameters in a single object and conveniently set them, for example, from a configuration file.
Another benefit of Spark is that it allows utilizing the cluster resources to perform an extensive hyperparameter search. Spark allows the training of several models simultaneously. For this, you need to create a parameter grid map
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val paramGrid = new ParamGridBuilder()
.addGrid(tokenVectorizer.vocabSize, Array(10000, 20000))
.addGrid(ngramVectorizer.vocabSize, Array(10000, 15000))
.addGrid(classifier.tol, Array(1e-20, 1e-10, 1e-5))
.addGrid(classifier.maxIter, Array(100, 200, 300))
.build()
Cross Validation
Similarly, you can parallelize model training in a k-fold cross-validation procedure.
val cv = new CrossValidator()
.setEstimator(pipe)
.setEvaluator(new BinaryClassificationEvaluator()
.setRawPredictionCol("prediction")
.setLabelCol("Sentiment"))
.setEstimatorParamMaps(paramGrid)
.setNumFolds(5)
.setParallelism(2)
This class will train several models as specified by the parameter grid and select the best set of parameters based on the chosen Evaluator. After you have instantiated the CrossValidator, use it to fit the model
val model = cv.fit(twitterData)
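The returned object is a CrossValidatorModel; a quick way to inspect the results might look like this (a sketch, assuming the pipeline defined above):
import org.apache.spark.ml.PipelineModel
// avgMetrics holds the cross-validated score for each parameter combination;
// bestModel is the pipeline refit on the full data with the winning parameters.
println(model.avgMetrics.mkString(", "))
val bestPipeline = model.bestModel.asInstanceOf[PipelineModel]
bestPipeline.stages.foreach(stage => println(stage.extractParamMap()))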
Applying Model
You can use the trained model to process new data
val result = model.transform(twitterTestData)
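To take a quick look at the predictions (column names assume the pipeline above, with LogisticRegression as the final stage):
result.select("SentimentText", "probability", "prediction").show(5, truncate = false)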
Configuring to Run on Cluster
After you are done developing the project and testing it on a small data subset, it is time to launch full-scale training. Compile your application, copy your jar to the cluster (use one of your team accounts from the previous assignment), and run spark-submit, setting additional parameters if necessary
spark-submit \
--master yarn \
--deploy-mode client \
--driver-memory 500m \
--executor-memory 500m \
--executor-cores 1 \
my-app.jar arg1 arg2
For example, you can submit your JAR to the cluster with the following command, passing the location of the data in HDFS as an argument
spark-submit --master yarn --deploy-mode client path/to/jar hdfs://twitter/
You can check the status of your application in the YARN ResourceManager web UI at http://namenode:8088.
Self-Check Questions
- Can Spark MLlib process images?
- What are the limitations of Spark MLlib?