Introduction to Logistic Regression in PySpark

Author:Murphy  |  View: 21080  |  Time: 2025-03-23 12:12:52
Photo by Ibrahim Rifath on Unsplash

Introduction

Big Data. Large datasets. Cloud…

Those words are everywhere, following us around and in the thoughts of clients, interviewers, managers and directors. As data gets more and more abundant, datasets only increase in size in a manner that, sometimes, it is not possible to run a machine learning model in a local environment – in a single machine, in other words.

This matter requires us to adapt and find other solutions, such as modeling with Spark, which is one of the most used technologies for Big Data. Spark accepts languages such as SQL, Python, Scala, R and it has its own methods and attributes, including its own Machine Learning library [MLlib]. When you work with Python in Spark, it is called PySpark, for example.

Furthermore, there's a platform called Databricks that wraps Spark in a very well created layer that enables data scientists to work on it just like Anaconda.

When we're creating a ML model in Databricks, it also accepts Scikit Learn models, but since we're more interested in Big Data, this tutorial is all created using Spark's MLlib, which is more suited for large datasets and also this way we add a new tool to our skill set.

Let's go.

Dataset

The dataset for this exercise is already inside Databricks. It's one of the UCI datasets, Adults, that is an extract from a Census and labeled with individuals that earn less or more than $50k per year. The data is publicly available under Creative Commons License in this address: https://archive.ics.uci.edu/dataset/2/adult

Our tutorial is to build a binary classifier that tells whether a person makes less or more than $50k of income in a year.

Extract of the dataset. Image by the author.

Coding

In this section, let's go over each step of our model.

Here are the modules we need to import.

from pyspark.sql.functions import col
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.feature import RFormula
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

And loading the dataset.

# Pulling a dataset
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

schema = StructType([
  StructField("age", DoubleType(), False),
  StructField("workclass", StringType(), False),
  StructField("fnlwgt", DoubleType(), False),
  StructField("education", StringType(), False),
  StructField("education_num", DoubleType(), False),
  StructField("marital_status", StringType(), False),
  StructField("occupation", StringType(), False),
  StructField("relationship", StringType(), False),
  StructField("race", StringType(), False),
  StructField("sex", StringType(), False),
  StructField("capital_gain", DoubleType(), False),
  StructField("capital_loss", DoubleType(), False),
  StructField("hours_per_week", DoubleType(), False),
  StructField("native_country", StringType(), False),
  StructField("income", StringType(), False)
])

adults = spark.read.format("csv").schema(schema).load("/databricks-datasets/adult/adult.data")

Preparing the Data

First, we need to prepare the data. Although this is not the core subject of this tutorial, we still need to get rid of null values, for example, so that's what we will do.

It is known from the dataset documentation that the NA values are actually marked as " ?". We will just go ahead and drop them because there aren't too many null values, so it won't affect our model's performance. Therefore, here's how to get rid of them. Any of these columns where we find a "?", we drop it.

adults = (
  adults
  .filter( (col('workclass') != ' ?') &
          (col('education') != ' ?') &
          (col('marital_status') != ' ?') &
          (col('occupation') != ' ?') &
          (col('relationship') != ' ?') &
          (col('race') != ' ?') &
          (col('sex') != ' ?') &
          (col('native_country') != ' ?')
          )
)

Nice. Now let's choose the best variables to use in our model.

Feature Engineering

Since we have both categorical and numerical variables, the first step is to split them in separate datasets to be able to use the UnivariateFeatureSelector from Spark. This method uses Chi² test to select the best categorical variables with categorical labels and ANOVA to select numerical variables with categorical label, which are both of our cases.

Let's first select the best categorical vars. We start by isolating the categorical variables. Then we use RFormula function to create a vectorized version of our dataset

# Adults cat only
adults_cat = adults.select('workclass', 'education', 'marital_status', 'occupation', 'relationship','race', 'sex','native_country', 'income')

# Creating a vector out of our dataset to be able to use Univariate Feature Selector
formula=RFormula(formula= "income ~ workclass + education + marital_status + occupation + relationship + race + sex + native_country", 
                 featuresCol= "features", labelCol= "label")
vector_df = formula.fit(adults_cat).transform(adults_cat)

The resultant vector_df looks like this. Notice the last columns features and label. Those are the ones needed as input to the variable selector.

Vectorized version of the dataset. Image by the author.

Then we create an instance of the selector, inputting the features and the label, as mentioned previously. We then set the feature and label types and define how many "best" variables to fetch and fit it to the vectorized version of the data.

# instance the selector
selector = UnivariateFeatureSelector(featuresCol='features', 
                                     outputCol="selectedFeatures", 
                                     labelCol= 'label')
# Set feature and label types and define how many K variables to fetch
selector.setFeatureType("categorical").setLabelType("categorical").setSelectionThreshold(4)
model = selector.fit(vector_df)

# selectedFeatures
print('Selected Features - Categorical')
print([name for i,name in enumerate(vector_df.columns) if i in model.selectedFeatures])

-----
[OUT]
Selected Features - Categorical
['workclass', 'relationship', 'race', 'sex']

The same procedure can be done for the numerical features.

# Adults numerical only
adults_num = adults.select('age', 'fnlwgt', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week', 'income')

# Creating a vector out of our dataset to be able to use Univariate Feature Selector
formula=RFormula(formula= "income ~ age + fnlwgt + education_num + capital_gain + capital_loss + hours_per_week", featuresCol= "features", labelCol= "label")
vector_df = formula.fit(adults_num).transform(adults_num)

# # Using Variable selector for Feature Engineering (num)
selector = UnivariateFeatureSelector(featuresCol='features', outputCol="selectedFeatures", labelCol= 'label')
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(4)
model = selector.fit(vector_df)
#model.selectedFeatures
print('Selected Features - Numerical')
print([name for i,name in enumerate(vector_df.columns) if i in model.selectedFeatures])

-----
[OUT]
Selected Features - Numerical
['age', 'education_num', 'capital_gain', 'capital_loss']

Ok. This is our final dataset.

df_sel = adults.select('workclass', 'occupation', 'race', 'sex', 'age', 'education_num', 'capital_gain', 'capital_loss', 'income')
Data with only the selected variables. Image by the author.

As we have categorical variables, we will have to create dummy variables, since the Logistic Regression model in Spark MLlib requires only numbers as input. Let's see how to do that next.

Transforming the Data

Dummy variables are a way to transform categories in numbers for ML input. So, there are different ways to do that. The one applied here is One Hot Encoding for multiclass variables.

This process will be applied only for categories, naturally, since the numbers are already…well…numbers.

To make it easier to work with the transformation, we will take advantage of the Pipeline method, which takes a list of steps and perform them all at once.

So we start by listing our selected categorical variables, then we create an empty list to store the stages to be performed by the Pipeline. Next comes a loop that takes each of the variables in the cat_cols list and pass it through the StringIndexer function to index the categories. The OneHotEncoder makes the final transformation.

This is what the StringIndexer does:

String Indexer applied to a variable. Image by the author.

And here is the code to create the loop.

# Columns to transform
cat_cols = ['workclass', 'occupation', 'race', 'sex']

# List of stages for Pipeline
stages = []

for column in cat_cols:
    # Instance encoding with StringIndexer
    stringIndexer = StringIndexer(inputCol=column, outputCol=column + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[column + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

Next step is to encode the label. A simple string indexer in this case will do just fine, since it is a binary label. We can use StringIndexer and add the instance to the Pipeline stages.

# Convert label income into label indices using the StringIndexer
label_encode = StringIndexer(inputCol="income", outputCol="label")

# Add to the Pipeline stages
stages += [label_encode]

Lastly, we will add both numerical and categorical variables together. We can just add the string ‘classVec' to each one hot encoded variable and put all the columns names as input for the VectorAssembler , and add that to the Pipeline stages list.

# Transform all features into a vector using VectorAssembler
num_cols = ['age', 'education_num', 'capital_gain', 'capital_loss']
assembler_cols = [c + "classVec" for c in cat_cols] + num_cols
assembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")
stages += [assembler]

Finally, we run the Pipeline transformations all at once.

pipe = Pipeline().setStages(stages)
pipe_model = pipe.fit(df_sel)
prepared_df = pipe_model.transform(df_sel)

The result, if you'd like to see, looks like this.

Transformed data. Image by the author.

This data is now ready for input to the Logistic Regression Model.

Model Training

Before modeling, let's split the data in train and test sets.

# Split data into training and test sets
(train, test) = prepared_df.randomSplit([0.7, 0.3], seed=42)
print(train.count())
print(test.count())

Now we can fit the model to the training set.

# Fit model to prepped data
lrModel = LogisticRegression(labelCol= 'label', 
                             featuresCol='features',
                             maxIter= 10).fit(train)

And creating some predictions.

# predictions
predictions = lrModel.transform(test)
preds = predictions.select("label", "prediction", "probability")

The output looks like this.

Predictions from the LR model from MLlib. Image by the author.

Evaluating

To evaluate our model, we use the BinaryClassificationEvaluator function.

Our model's accuracy is 85%, which is apparently fair.

# Evaluate model
evaluator = BinaryClassificationEvaluator()
evaluator.evaluate(predictions)

Out[189]: 0.8526180736758168

This is the Confusion Matrix.

# Confusion Matrix
display(
  preds.crosstab('label', 'prediction')
)
Confusion Matrix of the model. Image by the author.

Here we already see that the recall is low. Or, out of the actual people making >50k, how much I am getting right. It's almost a guess, with a poor 48% of correct predictions. The rate of true negatives is high, though, with 93%. However, that's not all merit of our model, since the classes are unbalanced, with 75% of the observations labeled as class 0 (<50k).

print(f'Precision: {1075/(1075+438)}')
print(f'Recall: {1075/(1075+1151)}')
print(f'Specificity: {6324/(6324+438)}')

Precision: 0.7105089226701917
Recall: 0.4829290206648697
Specificity: 0.935226264418811

Let's try to improve this model.

One way to do that is to use the Cross Validation method. We first create a parameters grid, using the code below.

from Pyspark.ml.tuning import ParamGridBuilder, CrossValidator

logit = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(logit.regParam, [0.01, 0.5, 2.0])
             .addGrid(logit.threshold, [0.35, 0.38])
             .build())

Then we can train the models and print the best parameters.

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=logit, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cv_model = cv.fit(train)

print('cutoff:', cv_model.bestModel.getThreshold())
print('regParam:', cv_model.bestModel.getRegParam())

[OUT]
cutoff: 0.38
regParam: 0.01

We can use the best model to predict or create a new one, if you want.

logit = LogisticRegression(labelCol="label", featuresCol="features", 
                           threshold=0.38, maxIter=100)
log_fit = logit.fit(train)

# Make predictions on test data using the transform() method.
preds2 = log_fit.transform(test)

# Confusion Matrix
display(
  preds2.crosstab('label', 'prediction')
)
Tuned model. Image by the author.

We were able to improve the model a little. The hyperparameter that brings the most changes is the threshold, which moves the cutoff away from the 50% probability cut for classification. Once we looked at the first model, clearly it was not giving us good True Positive rates. So when we move the classification cutoff to a number smaller than 50%, it means that we are making it "easier" to classify observations as positive. On the other hand, this change comes with the cost of increasing the False Positives, but we are ok with that for now. Here are the new metrics.

  • Precision: 0.63
  • Recall: 0.62
  • Specificity: 0.88

We can also look at the "ROC" metric.

display(log_fit, test, 'ROC')
ROC Curve for the model. Image by the author.

With that, we end this tutorial.

Before You Go

In this tutorial, we went over how to create a Logistic Regression model using MLlib from Spark. That tool allows one to take advantage of cluster computing power and dealing with Big Data.

It is important to know how to make a couple of transformations, like transforming the dataset to a vector to input to the algorithm.

Here is the full code of this exercise:

Studying/PySpark/101_MyLearning ML on DataBricks.ipynb at master · gurezende/Studying

We can use Scikit Learn in Databricks as well, but knowing MLlib, which is a Spark module seems more appropriate to deal with Big Data.

If you liked this content, don't forget to follow me here and on LinkedIn.

Gustavo Santos – Medium

Reference

Extracting, transforming and selecting features

One-hot – Wikipedia

Use Apache Spark MLlib on Databricks

Use Apache Spark MLlib on Databricks

LogisticRegression – PySpark 3.5.0 documentation

Tags: Data Science Logistic Regression Machine Learning Pyspark Spark Mllib

Comment