Introduction to Logistic Regression in PySpark

Introduction
Big Data. Large datasets. Cloud…
Those words are everywhere, following us around and occupying the thoughts of clients, interviewers, managers, and directors. As data becomes more abundant, datasets keep growing to the point where, sometimes, it is no longer possible to run a machine learning model in a local environment – on a single machine, in other words.
This forces us to adapt and find other solutions, such as modeling with Spark, one of the most widely used technologies for Big Data. Spark supports languages such as SQL, Python, Scala, and R, and it ships with its own methods and attributes, including its own machine learning library, MLlib. The Python API for Spark is called PySpark, and that is what we will use here.
Furthermore, there's a platform called Databricks that wraps Spark in a well-crafted layer, letting data scientists work on it much like they would in an Anaconda environment.
Databricks also accepts Scikit-Learn models, but since we're more interested in Big Data, this tutorial is built entirely with Spark's MLlib, which is better suited for large datasets – and this way we add a new tool to our skill set.
Let's go.
Dataset
The dataset for this exercise already ships with Databricks. It's the UCI Adult dataset, an extract from a census in which each individual is labeled as earning less or more than $50k per year. The data is publicly available under a Creative Commons license at this address: https://archive.ics.uci.edu/dataset/2/adult
Our goal is to build a binary classifier that tells whether a person makes less or more than $50k of income in a year.

Coding
In this section, let's go over each step of our model.
Here are the modules we need to import.
from pyspark.sql.functions import col
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
And here is how we load the dataset.
# Pulling a dataset
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
schema = StructType([
    StructField("age", DoubleType(), False),
    StructField("workclass", StringType(), False),
    StructField("fnlwgt", DoubleType(), False),
    StructField("education", StringType(), False),
    StructField("education_num", DoubleType(), False),
    StructField("marital_status", StringType(), False),
    StructField("occupation", StringType(), False),
    StructField("relationship", StringType(), False),
    StructField("race", StringType(), False),
    StructField("sex", StringType(), False),
    StructField("capital_gain", DoubleType(), False),
    StructField("capital_loss", DoubleType(), False),
    StructField("hours_per_week", DoubleType(), False),
    StructField("native_country", StringType(), False),
    StructField("income", StringType(), False)
])
adults = spark.read.format("csv").schema(schema).load("/databricks-datasets/adult/adult.data")
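If you want to confirm that the schema was applied correctly, a quick sanity check (just a sketch, not part of the original notebook) can look like this.
# Sanity check: print the schema and peek at a few rows
adults.printSchema()
adults.show(5, truncate=False)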
Preparing the Data
First, we need to prepare the data. Although this is not the core subject of this tutorial, we still need to get rid of null values, for example, so that's what we will do.
From the dataset documentation, we know that the missing values are actually marked as " ?" (with a leading space). We will simply drop those rows: there aren't many of them, so doing so won't hurt the model's performance. Here's how to get rid of them – whenever any of these columns contains " ?", we drop the row.
adults = (
    adults
    .filter((col('workclass') != ' ?') &
            (col('education') != ' ?') &
            (col('marital_status') != ' ?') &
            (col('occupation') != ' ?') &
            (col('relationship') != ' ?') &
            (col('race') != ' ?') &
            (col('sex') != ' ?') &
            (col('native_country') != ' ?')
            )
)
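If listing every column by hand feels repetitive, the same filter can be built programmatically. This is only a sketch of an equivalent alternative, assuming the list of string columns below.
# Build the same "not ' ?'" condition for all string columns at once
from functools import reduce

string_cols = ['workclass', 'education', 'marital_status', 'occupation',
               'relationship', 'race', 'sex', 'native_country']
condition = reduce(lambda a, b: a & b, [col(c) != ' ?' for c in string_cols])
adults = adults.filter(condition)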
Nice. Now let's choose the best variables to use in our model.
Feature Engineering
Since we have both categorical and numerical variables, the first step is to split them into separate datasets so we can use Spark's UnivariateFeatureSelector. This selector uses the Chi² test to pick the best categorical features against a categorical label, and ANOVA to pick the best numerical features against a categorical label – which covers both of our cases.
Let's first select the best categorical variables. We start by isolating them, then use the RFormula transformer to create a vectorized version of the dataset.
# Adults: categorical columns only
adults_cat = adults.select('workclass', 'education', 'marital_status', 'occupation',
                           'relationship', 'race', 'sex', 'native_country', 'income')

# Create a vector out of our dataset to be able to use the Univariate Feature Selector
formula = RFormula(formula="income ~ workclass + education + marital_status + occupation + relationship + race + sex + native_country",
                   featuresCol="features", labelCol="label")
vector_df = formula.fit(adults_cat).transform(adults_cat)
The resulting vector_df looks like this. Notice the last two columns, features and label – those are the ones the variable selector needs as input.

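You can also inspect it directly in code; a quick sketch:
# Peek at the vectorized features and the indexed label created by RFormula
vector_df.select('features', 'label').show(5, truncate=False)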
Then we create an instance of the selector, passing in the features and label columns mentioned previously. We set the feature and label types, define how many "best" variables to fetch, and fit the selector to the vectorized version of the data.
# Instantiate the selector
selector = UnivariateFeatureSelector(featuresCol='features',
                                     outputCol='selectedFeatures',
                                     labelCol='label')

# Set feature and label types and define how many (k) variables to fetch
selector.setFeatureType("categorical").setLabelType("categorical").setSelectionThreshold(4)

model = selector.fit(vector_df)

# Map the selected feature indices back to column names
print('Selected Features - Categorical')
print([name for i, name in enumerate(vector_df.columns) if i in model.selectedFeatures])
-----
[OUT]
Selected Features - Categorical
['workclass', 'relationship', 'race', 'sex']
The same procedure can be done for the numerical features.
# Adults: numerical columns only
adults_num = adults.select('age', 'fnlwgt', 'education_num', 'capital_gain',
                           'capital_loss', 'hours_per_week', 'income')

# Create a vector out of our dataset to be able to use the Univariate Feature Selector
formula = RFormula(formula="income ~ age + fnlwgt + education_num + capital_gain + capital_loss + hours_per_week",
                   featuresCol="features", labelCol="label")
vector_df = formula.fit(adults_num).transform(adults_num)

# Use the variable selector for feature engineering (numerical)
selector = UnivariateFeatureSelector(featuresCol='features', outputCol='selectedFeatures', labelCol='label')
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(4)

model = selector.fit(vector_df)

# Map the selected feature indices back to column names
print('Selected Features - Numerical')
print([name for i, name in enumerate(vector_df.columns) if i in model.selectedFeatures])
-----
[OUT]
Selected Features - Numerical
['age', 'education_num', 'capital_gain', 'capital_loss']
Ok. This is our final dataset.
df_sel = adults.select('workclass', 'occupation', 'race', 'sex', 'age', 'education_num', 'capital_gain', 'capital_loss', 'income')

Since we have categorical variables, we will need to create dummy variables, because the Logistic Regression model in Spark MLlib only accepts numeric input. Let's see how to do that next.
Transforming the Data
Dummy variables are a way to turn categories into numbers for ML input. There are different ways to do that; the one applied here is one-hot encoding for multiclass variables.
This process will be applied only for categories, naturally, since the numbers are already…well…numbers.
To make the transformations easier to manage, we will take advantage of the Pipeline class, which takes a list of stages and performs them all at once.
We start by listing our selected categorical variables and creating an empty list to store the stages the Pipeline will perform. Next comes a loop that takes each variable in the cat_cols list and passes it through StringIndexer to index its categories; OneHotEncoder then makes the final transformation.
This is what the StringIndexer does:

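If you want to see the indexing in action on a single column, here is a small sketch (using the sex column of df_sel just as an example).
# Index one column and look at the distinct category -> index pairs
sex_indexer = StringIndexer(inputCol='sex', outputCol='sexIndex')
sex_indexer.fit(df_sel).transform(df_sel).select('sex', 'sexIndex').distinct().show()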
And here is the code to create the loop.
# Columns to transform
cat_cols = ['workclass', 'occupation', 'race', 'sex']

# List of stages for the Pipeline
stages = []

for column in cat_cols:
    # Index the categories with StringIndexer
    stringIndexer = StringIndexer(inputCol=column, outputCol=column + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[column + "classVec"])
    # Add stages. These are not run here; they will all run at once later on.
    stages += [stringIndexer, encoder]
The next step is to encode the label. Since it is binary, a simple string indexer will do just fine: we use StringIndexer and add the instance to the Pipeline stages.
# Convert label income into label indices using the StringIndexer
label_encode = StringIndexer(inputCol="income", outputCol="label")
# Add to the Pipeline stages
stages += [label_encode]
Lastly, we bring the numerical and categorical variables together. We append the suffix "classVec" to each one-hot-encoded column name, pass all the column names as input to the VectorAssembler, and add that to the Pipeline stages list.
# Transform all features into a vector using VectorAssembler
num_cols = ['age', 'education_num', 'capital_gain', 'capital_loss']
assembler_cols = [c + "classVec" for c in cat_cols] + num_cols
assembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")
stages += [assembler]
Finally, we run the Pipeline transformations all at once.
pipe = Pipeline().setStages(stages)
pipe_model = pipe.fit(df_sel)
prepared_df = pipe_model.transform(df_sel)
The result, if you'd like to see, looks like this.

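You can also check what the Pipeline added by printing the schema; a quick sketch:
# The Index, classVec, features and label columns are appended by the Pipeline stages
prepared_df.printSchema()
prepared_df.select('features', 'label').show(3, truncate=False)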
This data is now ready for input to the Logistic Regression Model.
Model Training
Before modeling, let's split the data into train and test sets.
# Split data into training and test sets
(train, test) = prepared_df.randomSplit([0.7, 0.3], seed=42)
print(train.count())
print(test.count())
Now we can fit the model to the training set.
# Fit the model to the prepared training data
lrModel = LogisticRegression(labelCol='label',
                             featuresCol='features',
                             maxIter=10).fit(train)
And let's generate some predictions.
# predictions
predictions = lrModel.transform(test)
preds = predictions.select("label", "prediction", "probability")
The output looks like this.

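If you prefer to inspect it in code, a quick sketch:
# probability holds [P(label=0), P(label=1)] for each row
preds.show(5, truncate=False)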
Evaluating
To evaluate our model, we use the BinaryClassificationEvaluator. Its default metric is the area under the ROC curve (not plain accuracy), and our model scores about 0.85, which looks fair.
# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)
Out[189]: 0.8526180736758168
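Since the evaluator's default metric is the area under the ROC curve, here is a minimal sketch (not part of the original code) to also get plain accuracy, using MulticlassClassificationEvaluator.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Plain accuracy on the test predictions, using the default column names
acc_evaluator = MulticlassClassificationEvaluator(labelCol='label',
                                                  predictionCol='prediction',
                                                  metricName='accuracy')
print('Accuracy:', acc_evaluator.evaluate(predictions))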
This is the Confusion Matrix.
# Confusion Matrix
display(
preds.crosstab('label', 'prediction')
)

Here we can already see that the recall is low – that is, of the people who actually make >50k, how many are we getting right? It's almost a guess, with a poor 48% of correct predictions. The true negative rate is high, though, at 93%. However, that is not entirely to our model's credit, since the classes are imbalanced, with about 75% of the observations labeled as class 0 (<50k).
print(f'Precision: {1075/(1075+438)}')
print(f'Recall: {1075/(1075+1151)}')
print(f'Specificity: {6324/(6324+438)}')
Precision: 0.7105089226701917
Recall: 0.4829290206648697
Specificity: 0.935226264418811
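Rather than hard-coding the counts from the confusion matrix, the same metrics can be computed directly from the predictions; here's a sketch.
# Derive the confusion-matrix counts from the predictions DataFrame
tp = preds.filter((col('label') == 1) & (col('prediction') == 1)).count()
fp = preds.filter((col('label') == 0) & (col('prediction') == 1)).count()
fn = preds.filter((col('label') == 1) & (col('prediction') == 0)).count()
tn = preds.filter((col('label') == 0) & (col('prediction') == 0)).count()

print(f'Precision:   {tp / (tp + fp)}')
print(f'Recall:      {tp / (tp + fn)}')
print(f'Specificity: {tn / (tn + fp)}')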
Let's try to improve this model.
One way to do that is to use cross-validation. We first create a parameter grid, using the code below.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

logit = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Create a ParamGrid for cross-validation
paramGrid = (ParamGridBuilder()
             .addGrid(logit.regParam, [0.01, 0.5, 2.0])
             .addGrid(logit.threshold, [0.35, 0.38])
             .build())
Then we can train the models and print the best parameters.
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=logit, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross validations
cv_model = cv.fit(train)
print('cutoff:', cv_model.bestModel.getThreshold())
print('regParam:', cv_model.bestModel.getRegParam())
[OUT]
cutoff: 0.38
regParam: 0.01
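If you are curious how each combination in the grid performed, cv_model.avgMetrics holds the average evaluator score (area under ROC here) for each entry of paramGrid; a quick sketch:
# Pair each parameter combination with its average cross-validation metric
for params, metric in zip(paramGrid, cv_model.avgMetrics):
    settings = {p.name: v for p, v in params.items()}
    print(settings, '->', round(metric, 4))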
We can use the best model directly for predictions, or train a new one with the chosen parameters, as done below.
logit = LogisticRegression(labelCol="label", featuresCol="features",
                           threshold=0.38, maxIter=100)
log_fit = logit.fit(train)

# Make predictions on the test data using the transform() method
preds2 = log_fit.transform(test)

# Confusion Matrix
display(
    preds2.crosstab('label', 'prediction')
)

We were able to improve the model a little. The hyperparameter that makes the biggest difference is the threshold, which moves the classification cutoff away from the default 50% probability. Looking at the first model, it clearly was not giving us a good true positive rate, so by moving the cutoff below 50% we make it "easier" to classify observations as positive. This change comes at the cost of more false positives, but we are okay with that for now. Here are the new metrics, followed by a quick sketch of how the cutoff is applied to the probabilities.
- Precision: 0.63
- Recall: 0.62
- Specificity: 0.88
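To see exactly how the cutoff acts on the probabilities, you can extract P(label=1) and apply the threshold yourself. This is just a sketch, and it assumes Spark 3.0+ for vector_to_array.
from pyspark.ml.functions import vector_to_array

# Apply the 0.38 cutoff manually to the first model's probabilities
manual = (predictions
          .withColumn('p1', vector_to_array('probability')[1])
          .withColumn('pred_038', (col('p1') >= 0.38).cast('double')))
manual.crosstab('label', 'pred_038').show()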
We can also look at the "ROC" metric.
display(log_fit, test, 'ROC')

With that, we end this tutorial.
Before You Go
In this tutorial, we went over how to create a Logistic Regression model using Spark's MLlib. That tool lets us take advantage of cluster computing power to deal with Big Data.
It is important to know how to perform a couple of transformations, such as assembling the dataset into a feature vector before feeding it to the algorithm.
Here is the full code of this exercise:
Studying/PySpark/101_MyLearning ML on DataBricks.ipynb at master · gurezende/Studying
We can use Scikit-Learn in Databricks as well, but MLlib, being a native Spark module, seems more appropriate for dealing with Big Data.
If you liked this content, don't forget to follow me here and on LinkedIn.
Reference
Extracting, transforming and selecting features
Use Apache Spark MLlib on Databricks