Data Transformation for Classification with PySpark’s MLLib

Anton Haugen
Feb 21, 2021

If you’ve been following my blog posts, you’ll know that I’ve been pursuing a certification in Data Science with PySpark using LaylaAI’s course on Udemy, PySpark Essentials for Data Scientists (Big Data + Python). Currently, the course is covering how to create classification models using PySpark’s own Machine Learning Library, or MLLib.

As I’ve covered in the past, as difficult as adjusting to a MapReduce framework may be, PySpark has much in common with libraries like Pandas, as well as with the syntax of the query language SQL. Similarly, if you’re already familiar with classification algorithms, you can try them out in MLLib. However, there are a few extra steps you will need to take in order to prepare your data for this family of algorithms.

Here are some steps you will need to take in order to create classification models:

1. Formatting Classes and Features

Once your data is imported as a PySpark dataframe, you will need to make sure your classes are 0-indexed and that their column is called “label.” Fortunately, PySpark MLLib has a handy class called StringIndexer. Here it is applied to a dataset of children tested for autism, in order to see whether a model can predict if a child has autism. ‘Class/ASD Traits’, whose values are ‘Yes’/‘No’, is the dependent variable. Additionally, you’ll want to make sure that all the data in your dataframe is numeric, which is covered in the second code block. To do this in the most Pythonic manner, LaylaAI provides the following code:

CODE:

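from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import *
# Note: this wildcard import shadows Python built-ins such as min and max
from pyspark.sql.functions import *
# Feature columns: everything except the first and last columns
input_columns = df.columns
input_columns = input_columns[1:-1]
dependent_var = 'Class/ASD Traits'
# Cast the target to string, then index it into a 0-based 'label' column
renamed = df.withColumn('label_str', df[dependent_var].cast(StringType()))
indexer = StringIndexer(inputCol='label_str', outputCol='label')
indexed = indexer.fit(renamed).transform(renamed)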

Before we can vectorize our features, any string-valued feature columns need to be converted to numeric indices. We can use this really handy loop:

numeric_inputs = []
string_inputs = []
for column in input_columns:
    # Index string columns; keep numeric columns as they are
    if isinstance(indexed.schema[column].dataType, StringType):
        new_col_name = column + '_num'
        indexer = StringIndexer(inputCol=column, outputCol=new_col_name)
        indexed = indexer.fit(indexed).transform(indexed)
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)

In addition to the original data, your dataframe now contains extra columns in which the string data has been converted to numeric indices, labeled with the suffix ‘_num’.
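
For intuition about which number each string ends up with, here is a small plain-Python sketch (not Spark code) of the ordering StringIndexer uses by default: the most frequent value gets index 0. The `string_index` helper and the sample labels are made up for illustration, and the alphabetical tie-break is my assumption about how ties are resolved.

```python
from collections import Counter

def string_index(values):
    """Map each distinct string to a float index, most frequent first.

    Mimics StringIndexer's default descending-frequency ordering.
    Ties are broken alphabetically here (an assumption).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: float(i) for i, value in enumerate(ordered)}

# Made-up sample of a 'Yes'/'No' target column
labels = ["Yes", "No", "Yes", "Yes", "No"]
mapping = string_index(labels)
indexed_labels = [mapping[v] for v in labels]
print(mapping)         # {'Yes': 0.0, 'No': 1.0}
print(indexed_labels)  # [0.0, 1.0, 0.0, 0.0, 1.0]
```

Notice that ‘Yes’ became 0 here only because it was the more common value. In PySpark, the fitted model's `labels` attribute (for example `indexer.fit(renamed).labels`) tells you which string was assigned to which index, so it is worth checking before interpreting predictions.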

2. Treating for Skewness and Outliers

While this falls under the umbrella of general data preparation, rather than being something specific to PySpark, knowing how to do it with PySpark can be a little tricky.

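To test and treat for skewness, the following loop can be used. It caps each numeric column at its approximate 1st and 99th percentiles, then applies a log transform to positively skewed columns and an exponential transform to negatively skewed ones: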

# Approximate 1st and 99th percentiles for each numeric column.
# The last argument (0.25) is the allowed relative error; lower is more precise but slower.
d = {}
for col in numeric_inputs:
    d[col] = indexed.approxQuantile(col, [0.01, 0.99], 0.25)

for col in numeric_inputs:
    skew = indexed.agg(skewness(indexed[col])).collect()
    skew = skew[0][0]

    if skew > 1:
        # Positive skew: cap at the percentile bounds, then take log(x + 1)
        indexed = indexed.withColumn(col,
            log(when(indexed[col] < d[col][0], d[col][0])
                .when(indexed[col] > d[col][1], d[col][1])
                .otherwise(indexed[col]) + 1).alias(col))
        print(col, " has been treated for pos skew", skew)

    elif skew < -1:
        # Negative skew: cap at the percentile bounds, then take exp(x + 1)
        indexed = indexed.withColumn(col,
            exp(when(indexed[col] < d[col][0], d[col][0])
                .when(indexed[col] > d[col][1], d[col][1])
                .otherwise(indexed[col]) + 1).alias(col))
        print(col, " has been treated for neg skew", skew)
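
To see why the log transform helps, here is a rough plain-Python sketch (no Spark needed) of the positive-skew branch. It assumes the population skewness formula (third central moment divided by variance to the power 1.5); the data and the cap bounds are made up, and both helper functions are hypothetical names of my own.

```python
import math

def skewness(values):
    """Population skewness: third central moment / variance ** 1.5."""
    n = len(values)
    mean = sum(values) / n
    m2 = sum((v - mean) ** 2 for v in values) / n
    m3 = sum((v - mean) ** 3 for v in values) / n
    return m3 / m2 ** 1.5

def treat_positive_skew(values, low, high):
    """Cap each value to [low, high], then apply log(x + 1)."""
    return [math.log(min(max(v, low), high) + 1) for v in values]

raw = [1, 2, 2, 3, 3, 3, 4, 5, 100]  # made-up, right-skewed data
treated = treat_positive_skew(raw, low=1, high=50)  # hypothetical bounds

# The treated column should be less skewed than the raw one
print(skewness(raw), skewness(treated))
```

With one extreme value in such a tiny sample, the treated data is still somewhat skewed, but less so than before, which is the effect the loop is going for.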

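To test for outliers, the following code can be used. It finds the smallest value across all of the numeric columns, which is a quick way to spot extreme or negative values: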

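# min here is pyspark.sql.functions.min (from the wildcard import), not the built-in
minimums = indexed.select([min(c).alias(c) for c in indexed.columns if c in numeric_inputs])
# Collect the per-column minimums into one array, then take its smallest element
min_array = minimums.select(array(numeric_inputs).alias('mins'))
df_minimum = min_array.select(array_min(min_array.mins)).collect()
df_minimum = df_minimum[0][0]
df_minimum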

3. Grouping and Scaling Features

Now that we’re done treating our data, the last thing we’ll need to do is group our features together and scale them so that MLLib’s algorithms can read our dataframe correctly.

To group our features together, we’d use the following code:

from pyspark.ml.feature import VectorAssembler
features_list = numeric_inputs + string_inputs
assembler = VectorAssembler(inputCols=features_list, outputCol='features')
output = assembler.transform(indexed).select('features', 'label')
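
If it helps to picture what the assembler produces, here is a tiny plain-Python analogy (not the Spark implementation): each row's feature columns are packed, in the order given, into a single list that sits next to the label. The `assemble` helper and the column names are hypothetical.

```python
def assemble(rows, feature_names, label_name="label"):
    """Return (features, label) pairs, with features ordered as feature_names."""
    return [
        ([float(row[name]) for name in feature_names], row[label_name])
        for row in rows
    ]

# Made-up rows standing in for the indexed dataframe
rows = [
    {"Age": 3, "Sex_num": 0.0, "label": 1.0},
    {"Age": 5, "Sex_num": 1.0, "label": 0.0},
]
assembled = assemble(rows, ["Age", "Sex_num"])
print(assembled)  # [([3.0, 0.0], 1.0), ([5.0, 1.0], 0.0)]
```

The order of the input column list matters, because position in the vector is the only thing that identifies a feature afterwards.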

To scale our data we would use the following code:

from pyspark.ml.feature import MinMaxScaler
scaler = MinMaxScaler(inputCol='features', outputCol='scaledFeatures', min=0, max=1000)
scalerModel = scaler.fit(output)
scaled_data = scalerModel.transform(output)
# Show the dataframe that now includes the scaledFeatures column
scaled_data.show()
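
The arithmetic behind min-max scaling is simple enough to sketch in plain Python. This is my own illustration of the documented formula, applied to a single made-up feature column: each value is shifted and stretched so the column's minimum maps to `min` and its maximum maps to `max` (0 and 1000 above). The `min_max_scale` helper is a hypothetical name.

```python
def min_max_scale(column, new_min=0.0, new_max=1000.0):
    """Rescale one feature column to the range [new_min, new_max]."""
    lo, hi = min(column), max(column)
    if hi == lo:
        # A constant column has no spread; map it to the midpoint of the range
        return [0.5 * (new_max + new_min) for _ in column]
    return [(x - lo) / (hi - lo) * (new_max - new_min) + new_min for x in column]

ages = [2, 4, 6]  # made-up feature values
scaled = min_max_scale(ages)
print(scaled)  # [0.0, 500.0, 1000.0]
```

In Spark the same rescaling is applied separately to each position of the feature vector, using the minimum and maximum learned during `fit`.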

4. Finalize and Split Your Data

But before you create any models, don’t forget to split your data:

final_data = scaled_data.select('label', 'scaledFeatures')
final_data = final_data.withColumnRenamed('scaledFeatures', 'features')
train, test = final_data.randomSplit([0.7, 0.3])
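
One thing worth knowing about a weighted random split is that the 70/30 proportions are approximate, and the result changes from run to run unless you fix a seed. The sketch below is a plain-Python illustration of that idea, not Spark's actual implementation: each row gets a random draw and lands in a split according to the cumulative weights. The `random_split` helper is hypothetical.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row to one split at random, in proportion to weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds, running = [], 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point shortfall
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))  # made-up stand-in for dataframe rows
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))  # roughly 700 and 300, not exactly
```

In PySpark, `randomSplit` accepts an optional seed as well, for example `final_data.randomSplit([0.7, 0.3], seed=42)`, which makes the split reproducible between runs.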

LaylaAI’s course has been really helpful for learning Spark, and I’ve been enjoying it a lot. If you liked this blog post, I recommend checking out and supporting her work!
