# Introduction - Customer Churn Prediction notebook
In this notebook, we illustrate how you can train a model for Churn Prediction using PySpark. After training the model, you step through the instructions to deploy the model using Watson Machine Learning.

This notebook is a variation of the original notebook referenced in this GitHub repo: https://github.com/elenalowery/cpd4_demo/blob/master/assets/jupyterlab/Predict_Customer_Churn_CPD4.ipynb

## Package installation

In [1]:
import warnings
warnings.filterwarnings('ignore')
import time

In [2]:
# install required Python modules
!pip install --upgrade pyspark==3.0.3 --no-cache | tail -n 1
!pip install lime --no-cache | tail -n 1
!pip install SciPy --no-cache | tail -n 1

ERROR: Could not install packages due to an OSError: [Errno 13] Permission denied: 'INSTALLER'
Consider using the `--user` option or check the permissions.
    Uninstalling py4j-0.10.9.5:
Successfully installed lime-0.2.0.1


# Model building and deployment <a name="model"></a>

In this section you will learn how to train a Spark MLlib model and then deploy it as a web service using the Watson Machine Learning service.

## Load the training data 

- Click in the next cell to insert the code to import the training dataset.
- Click the **Find and add data** icon in the top right, find the data set you'd like to import into this notebook (for example, CUSTOMER_DATA_ready), open the **Insert to code** drop-down, and select **pandas DataFrame (depr...)**.

<font color='red'>Do NOT select the **pandas DataFrame** option; select the **pandas DataFrame (depr...)** option instead.</font>

In [3]:
# Click here to insert code to import training dataset
# Import dataset into a pandas DataFrame
## Sample inserted code (Note that the name of your dataframe may be different)
##from ibm_watson_studio_lib import access_project_or_space
##wslib = access_project_or_space()

##import pandas as pd

##df_data_1 = pd.read_csv(wslib.mount.get_data_path('CUSTOMER_DATA_ready'))
##df_data_1.head()
from ibm_watson_studio_lib import access_project_or_space
wslib = access_project_or_space()

import pandas as pd

df_data_1 = pd.read_csv(wslib.mount.get_data_path('CUSTOMER_DATA_ready'))
df_data_1.head()


Unnamed: 0,ID,LONGDISTANCE,INTERNATIONAL,LOCAL,DROPPED,PAYMETHOD,LOCALBILLTYPE,LONGDISTANCEBILLTYPE,USAGE,RATEPLAN,GENDER,STATUS,CHILDREN,ESTINCOME,CAROWNER,AGE,CHURN
0,1018,21,0,87,0,CC,Budget,Standard,108,1,F,S,0,95786.8,Y,52.646667,F
1,1020,0,0,11,0,CC,FreeLocal,Intnl_discount,12,3,M,S,0,90321.6,N,55.113333,T
2,1030,29,0,45,0,CH,FreeLocal,Standard,75,4,M,M,2,29616.0,N,49.426667,F
3,1085,21,0,293,0,CH,Budget,Standard,314,2,M,S,2,120000.0,N,42.0,T
4,110,1,0,11,0,CC,Budget,Standard,14,3,F,S,2,21021.6,Y,62.753333,T


In [4]:
# Create a PySpark DataFrame from the pandas DataFrame
from pyspark.sql import SparkSession
import pandas as pd

import json
# Provide the name of the pandas DataFrame from the previous cell (should be of the format df_data_<some_number>)
pandasDFname=df_data_1
spark = SparkSession.builder.getOrCreate()
sparkDF=spark.createDataFrame(pandasDFname)
sparkDF.head()

Row(ID=1018, LONGDISTANCE=21, INTERNATIONAL=0, LOCAL=87, DROPPED=0, PAYMETHOD='CC', LOCALBILLTYPE='Budget', LONGDISTANCEBILLTYPE='Standard', USAGE=108, RATEPLAN=1, GENDER='F', STATUS='S', CHILDREN=0, ESTINCOME=95786.8, CAROWNER='Y', AGE=52.646667, CHURN='F')

## Explore data

In [5]:
sparkDF.printSchema()

root
 |-- ID: long (nullable = true)
 |-- LONGDISTANCE: long (nullable = true)
 |-- INTERNATIONAL: long (nullable = true)
 |-- LOCAL: long (nullable = true)
 |-- DROPPED: long (nullable = true)
 |-- PAYMETHOD: string (nullable = true)
 |-- LOCALBILLTYPE: string (nullable = true)
 |-- LONGDISTANCEBILLTYPE: string (nullable = true)
 |-- USAGE: long (nullable = true)
 |-- RATEPLAN: long (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- CHILDREN: long (nullable = true)
 |-- ESTINCOME: double (nullable = true)
 |-- CAROWNER: string (nullable = true)
 |-- AGE: double (nullable = true)
 |-- CHURN: string (nullable = true)



In [6]:
print("Number of records: " + str(sparkDF.count()))

Number of records: 1415


## Create a model

In [7]:
spark_df = sparkDF
# Split the labeled data into a training set and a test set
(train_data, test_data) = spark_df.randomSplit([0.8, 0.2], 24)

# Provide a target name for your churn model
MODEL_NAME = "Churn Model"
# Provide a target name for your churn model deployment
DEPLOYMENT_NAME = "Churn Deployment"

print("Number of records for training: " + str(train_data.count()))
print("Number of records for evaluation: " + str(test_data.count()))

spark_df.printSchema()

Number of records for training: 1158
Number of records for evaluation: 257
root
 |-- ID: long (nullable = true)
 |-- LONGDISTANCE: long (nullable = true)
 |-- INTERNATIONAL: long (nullable = true)
 |-- LOCAL: long (nullable = true)
 |-- DROPPED: long (nullable = true)
 |-- PAYMETHOD: string (nullable = true)
 |-- LOCALBILLTYPE: string (nullable = true)
 |-- LONGDISTANCEBILLTYPE: string (nullable = true)
 |-- USAGE: long (nullable = true)
 |-- RATEPLAN: long (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- STATUS: string (nullable = true)
 |-- CHILDREN: long (nullable = true)
 |-- ESTINCOME: double (nullable = true)
 |-- CAROWNER: string (nullable = true)
 |-- AGE: double (nullable = true)
 |-- CHURN: string (nullable = true)
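
The split above produces 1158 and 257 rows rather than an exact 80/20 split of the 1415 records (which would be 1132 and 283). That is expected, because `randomSplit` treats the weights as probabilities and places each row by an independent random draw. The following is a minimal pure-Python sketch of that idea, not Spark's implementation; `random_split` and its parameters are hypothetical names used only for illustration, and its output will not match Spark's row-for-row.

```python
import random
from itertools import accumulate

def random_split(rows, weights, seed):
    """Assign each row to a split by an independent random draw (illustrative only)."""
    total = sum(weights)
    # Cumulative upper bound of each split's range within [0, 1), e.g. [0.8, 1.0]
    bounds = list(accumulate(w / total for w in weights))
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, upper in enumerate(bounds):
            # The last split also catches any floating-point rounding in the bounds
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1415), [0.8, 0.2], seed=24)
print(len(train), len(test))  # close to, but usually not exactly, 1132 and 283
```

Fixing the seed makes the split repeatable, which is why the notebook passes `24` as the second argument to `randomSplit`.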



The code below creates a Random Forest Classifier with Spark, setting up string indexers for the categorical features and the label column. Finally, this notebook creates a pipeline including the indexers and the model, and does an initial Area Under ROC evaluation of the model.

In [8]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer, IndexToString, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline, Model
from pyspark.ml.feature import SQLTransformer

features = [x for x in spark_df.columns if x != 'CHURN']
# Specify the categorical features
categorical_features = ['PAYMETHOD', 'LOCALBILLTYPE', 'LONGDISTANCEBILLTYPE', 'GENDER', 'STATUS', 'CAROWNER']
# Index the categorical feature so each string value is replaced with an integer
categorical_num_features = [x + '_IX' for x in categorical_features]
si_list = [StringIndexer(inputCol=x, outputCol=y) for x, y in zip(categorical_features, categorical_num_features)]
va_features = VectorAssembler(inputCols=categorical_num_features + [x for x in features if x not in categorical_features], outputCol="features")

In [9]:
# Index the label column
si_label = StringIndexer(inputCol="CHURN", outputCol="label").fit(spark_df)
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=si_label.labels)
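
To make the indexing step concrete, here is a small pure-Python sketch of what a string indexer does conceptually: it assigns index 0.0 to the most frequent value, 1.0 to the next, and so on, and the reverse lookup turns an index back into the original string (the role `IndexToString` plays for predictions). The function name `fit_string_index` is hypothetical, and the alphabetical tie-break is an assumption of this sketch. The input values are the PAYMETHOD entries from the five sample rows shown earlier.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct string to an index: most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}

paymethod = ["CC", "CC", "CH", "CH", "CC"]  # PAYMETHOD from the five sample rows
mapping = fit_string_index(paymethod)
indexed = [mapping[v] for v in paymethod]

# Reverse lookup: position i in this list is the string for index i
labels = sorted(mapping, key=mapping.get)
restored = [labels[int(i)] for i in indexed]

print(mapping)   # {'CC': 0.0, 'CH': 1.0}
print(indexed)   # [0.0, 0.0, 1.0, 1.0, 0.0]
print(restored == paymethod)  # True
```

Because the indices depend on value frequencies in the data used for fitting, the same string can receive a different index if the indexer is fitted on a different dataset.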

In [10]:
from pyspark.ml.classification import RandomForestClassifier
# train a Random Forest Classifier
classifier = RandomForestClassifier(featuresCol="features")
pipeline = Pipeline(stages= si_list + [si_label, va_features, classifier, label_converter])

model = pipeline.fit(train_data)

In [11]:
predictions = model.transform(test_data)
# Note: rawPredictionCol="prediction" evaluates the hard 0/1 predictions. The evaluator's
# default column, "rawPrediction", would use the classifier's raw scores instead.
evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="prediction", metricName='areaUnderROC')
area_under_curve = evaluatorDT.evaluate(predictions)

evaluatorDT = BinaryClassificationEvaluator(rawPredictionCol="prediction", metricName='areaUnderPR')
area_under_PR = evaluatorDT.evaluate(predictions)
# areaUnderROC is the evaluator's default metric; areaUnderPR is requested explicitly
print("areaUnderROC = %g" % area_under_curve, "areaUnderPR = %g" % area_under_PR)

areaUnderROC = 0.884137 areaUnderPR = 0.888272
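
One way to read areaUnderROC is as the probability that a randomly chosen positive example is scored higher than a randomly chosen negative one. The sketch below computes it that way in plain Python on made-up toy data (not the churn data). It also shows why the choice of score column matters: hard 0/1 predictions discard ranking information, so they generally give a different value than continuous scores. The function name `auc` and all numbers here are illustrative assumptions.

```python
def auc(labels, scores):
    """Area under ROC as the share of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative count as half a correct pair.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Toy data, made up for illustration: true labels and model probabilities
y_true = [1, 1, 1, 0, 0, 0]
proba = [0.9, 0.7, 0.4, 0.6, 0.3, 0.1]
hard = [1 if p >= 0.5 else 0 for p in proba]  # thresholded predictions

print(round(auc(y_true, proba), 3))  # 0.889
print(round(auc(y_true, hard), 3))   # 0.667
```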


In [12]:
# extra code: evaluate more metrics by exporting the predictions to pandas
from sklearn.metrics import classification_report
y_pred = predictions.toPandas()['prediction']
# Assumes the label indexer mapped 'F' to 0.0 and 'T' to 1.0 (check si_label.labels)
y_pred = ['T' if pred == 1.0 else 'F' for pred in y_pred]
y_test = test_data.toPandas()['CHURN']
# target_names are applied in sorted label order, which is ['F', 'T']
print(classification_report(y_test, y_pred, target_names=['F', 'T']))

              precision    recall  f1-score   support

           F       0.87      0.96      0.91       147
           T       0.94      0.81      0.87       110

    accuracy                           0.89       257
   macro avg       0.90      0.88      0.89       257
weighted avg       0.90      0.89      0.89       257
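
As a reminder of how the columns in this report relate to each other, the sketch below computes precision, recall, and F1 for one class from plain Python lists. The toy labels are made up for illustration, and `prf` is a hypothetical helper name. The last two lines check that the rounded F1 values in the report above follow from the reported precision and recall.

```python
def prf(y_true, y_pred, positive):
    """Precision, recall and F1 for the class named by `positive`."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

# Toy labels, made up for illustration
toy_true = ["T", "T", "T", "F", "F", "F", "F", "F"]
toy_pred = ["T", "T", "F", "F", "F", "F", "F", "T"]
print(prf(toy_true, toy_pred, positive="T"))
print(prf(toy_true, toy_pred, positive="F"))

# F1 is the harmonic mean of precision and recall, as in the report rows above
print(round(2 * 0.87 * 0.96 / (0.87 + 0.96), 2))  # 0.91
print(round(2 * 0.94 * 0.81 / (0.94 + 0.81), 2))  # 0.87
```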



## Publish the model

In this section, the notebook uses Watson Machine Learning to save the model (including the pipeline) to the WML instance. Previous versions of the model are removed so that the notebook can be run again, resetting all data for another demo.

In [13]:
import os
cpdtoken=os.environ['USER_ACCESS_TOKEN']
wml_credentials = {
"token": cpdtoken,
"instance_id" : "openshift",
"url": os.environ['RUNTIME_ENV_APSX_URL'],
"version": "4.0"
}

from ibm_watson_machine_learning import APIClient
wml_client = APIClient(wml_credentials)

In [14]:
def getSpaceIDwml(wml_client,space_name):
    # Return the ID of the first space named space_name, or -1 if there is none
    spaces = wml_client.spaces.get_details()['resources']
    match = next((item for item in spaces if item['entity']['name'] == space_name), None)
    if match is None:
        return -1
    return match['metadata']['id']

In [15]:
def createSpacewml(wml_client,space_name):
    spaces = wml_client.spaces.get_details()['resources']
    for space in spaces:
        if space['entity']['name'] == space_name:
            print("Deployment space with name", space_name, "already exists.")
            return space['metadata']['id']
    print("\nCreating a new deployment space -",space_name)
    # create the space
    space_meta_data = {
        wml_client.spaces.ConfigurationMetaNames.NAME : space_name
    }

    stored_space_details = wml_client.spaces.store(space_meta_data)
    space_id = stored_space_details['metadata']['id']
    i=0
    while(True):
        stored_space_details=wml_client.spaces.get_details(space_id)
        status=stored_space_details['entity']['status']['state']
        print("i: ", i, " status: ", status)
        if status == 'active':
            break
        time.sleep(1)
        i = i+1
    return space_id
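
The polling loop in `createSpacewml` waits indefinitely if the space never becomes active. One possible variation, sketched below under stated assumptions, adds a timeout. `wait_for_state` is a hypothetical helper, not part of the Watson Machine Learning client, and the `"preparing"` state name is made up for the demonstration. In the notebook, the `get_state` callable would wrap the `wml_client.spaces.get_details(space_id)` lookup used above.

```python
import time

def wait_for_state(get_state, target="active", timeout_s=60.0, interval_s=1.0):
    """Poll get_state() until it returns target; raise TimeoutError after timeout_s."""
    deadline = time.monotonic() + timeout_s
    attempt = 0
    while True:
        state = get_state()
        print("attempt:", attempt, "state:", state)
        if state == target:
            return attempt
        if time.monotonic() >= deadline:
            raise TimeoutError(f"state is still {state!r} after {timeout_s} seconds")
        time.sleep(interval_s)
        attempt += 1

# Stand-in for the real status lookup; the state names are illustrative only
states = iter(["preparing", "preparing", "active"])
attempts = wait_for_state(lambda: next(states), interval_s=0.01)
print(attempts)  # 2
```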

In [16]:
# Associate Watson Machine Learning with a specific space
space_name='churnUATspace'
space_id=getSpaceIDwml(wml_client,space_name)
if space_id == -1:
    space_id = createSpacewml(wml_client,space_name)
print('space id: ', space_id)
wml_client.set.default_space(space_id)

space id:  881a003a-9572-4df0-8718-8a5d368ad3c2


'SUCCESS'

In [29]:
software_spec_uid = wml_client.software_specifications.get_id_by_name("spark-mllib_3.3")
print("Software Specification ID: {}".format(software_spec_uid))
model_props = {
        wml_client._models.ConfigurationMetaNames.NAME:"{}".format(MODEL_NAME),
        wml_client._models.ConfigurationMetaNames.TYPE: "mllib_3.3",
        wml_client._models.ConfigurationMetaNames.SOFTWARE_SPEC_UID: software_spec_uid,
        #wml_client._models.ConfigurationMetaNames.TRAINING_DATA_REFERENCES: training_data_references,
        wml_client._models.ConfigurationMetaNames.LABEL_FIELD: "CHURN",
    }

Software Specification ID: d11f2434-4fc7-58b7-8a62-755da64fdaf8


In [30]:
def deleteExistingModelsSameName(wml_client,model_name):
    stored_models=wml_client.repository.get_model_details()
    stored_models_details = stored_models['resources']
    for m in stored_models_details:
        m_name = m['metadata']['name']
        if m_name == model_name:
            model_id = m['metadata']['id']
            print("Deleting model with id: ", model_id, " and name: ", m_name)
            wml_client.repository.delete(model_id)
    return 'Success'

In [31]:
def deleteExistingDeploymentsSameName(wml_client,deployment_name):
    stored_deployments=wml_client.deployments.get_details()
    stored_deployment_details = stored_deployments['resources']
    for d in stored_deployment_details:
        d_name = d['metadata']['name']
        if d_name == deployment_name:
            deployment_id = d['metadata']['id']
            print("Deleting deployment with id: ", deployment_id, " and name: ", d_name)
            wml_client.deployments.delete(deployment_id)
    return 'Success'

In [32]:
# Delete Existing Deployments with same name
deleteExistingDeploymentsSameName(wml_client,DEPLOYMENT_NAME)

'Success'

In [33]:
# Delete existing models with same name
deleteExistingModelsSameName(wml_client,MODEL_NAME)

'Success'

In [34]:
print("Storing model ...")
published_model_details = wml_client.repository.store_model(
    model=model, 
    meta_props=model_props, 
    training_data=train_data, 
    pipeline=pipeline)

model_uid = wml_client.repository.get_model_id(published_model_details)
print("Done")
print("Model ID: {}".format(model_uid))

Storing model ...
Done
Model ID: 122f94c1-5cb5-44a2-ba91-3cdba1dd1de8


In [35]:
wml_client.repository.list_models()

------------------------------------  -----------  ------------------------  ---------  ----------  ----------------
ID                                    NAME         CREATED                   TYPE       SPEC_STATE  SPEC_REPLACEMENT
122f94c1-5cb5-44a2-ba91-3cdba1dd1de8  Churn Model  2024-03-28T19:01:59.002Z  mllib_3.3  supported
------------------------------------  -----------  ------------------------  ---------  ----------  ----------------


Unnamed: 0,ID,NAME,CREATED,TYPE,SPEC_STATE,SPEC_REPLACEMENT
0,122f94c1-5cb5-44a2-ba91-3cdba1dd1de8,Churn Model,2024-03-28T19:01:59.002Z,mllib_3.3,supported,


In [36]:
wml_client.deployments.list()

----  ----  -----  -------  -------------  ----------  ----------------
GUID  NAME  STATE  CREATED  ARTIFACT_TYPE  SPEC_STATE  SPEC_REPLACEMENT
----  ----  -----  -------  -------------  ----------  ----------------


Unnamed: 0,GUID,NAME,STATE,CREATED,ARTIFACT_TYPE,SPEC_STATE,SPEC_REPLACEMENT


## Deploy the model

This section deploys the model as a RESTful web service in Watson Machine Learning. The deployed model has a scoring URL to which you can send data to get predictions.

In [37]:
deployment_details = wml_client.deployments.create(
    model_uid, 
    meta_props={
        wml_client.deployments.ConfigurationMetaNames.NAME: "{}".format(DEPLOYMENT_NAME),
        wml_client.deployments.ConfigurationMetaNames.ONLINE: {}
    }
)
scoring_url = wml_client.deployments.get_scoring_href(deployment_details)
deployment_uid=wml_client.deployments.get_uid(deployment_details)

print("Scoring URL:" + scoring_url)
print("Model id: {}".format(model_uid))
print("Deployment id: {}".format(deployment_uid))



#######################################################################################

Synchronous deployment creation for uid: '122f94c1-5cb5-44a2-ba91-3cdba1dd1de8' started

#######################################################################################


initializing
Note: online_url is deprecated and will be removed in a future release. Use serving_urls instead.
...................
ready


------------------------------------------------------------------------------------------------
Successfully finished deployment creation, deployment_uid='65068aba-3772-410c-a98b-8880ed9e69ee'
------------------------------------------------------------------------------------------------


Scoring URL:https://internal-nginx-svc.cpd.svc.cluster.local:12443/ml/v4/deployments/65068aba-3772-410c-a98b-8880ed9e69ee/predictions
Model id: 122f94c1-5cb5-44a2-ba91-3cdba1dd1de8
Deployment id: 65068aba-3772-410c-a98b-8880ed9e69ee
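
Because the deployment is a REST endpoint, it can also be called without the Python client. The sketch below only builds the HTTP request and does not send it. It rests on several assumptions: that the endpoint accepts a bearer token in the `Authorization` header, that it expects a `version` query parameter (the date used here is a placeholder), and that the URL, token, and two-field payload are stand-ins rather than real values. `build_scoring_request` is a hypothetical helper name.

```python
import json
import urllib.request

def build_scoring_request(scoring_url, token, payload, version="2020-09-01"):
    """Build (but do not send) a POST request for an online scoring endpoint."""
    return urllib.request.Request(
        url=f"{scoring_url}?version={version}",  # version date is an assumed placeholder
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

# Placeholder values; a real call needs the model's full field list
payload = {"input_data": [{"fields": ["ID", "AGE"], "values": [[1, 45]]}]}
request = build_scoring_request(
    "https://example.invalid/ml/v4/deployments/DEPLOYMENT_ID/predictions",
    token="USER_ACCESS_TOKEN_PLACEHOLDER",
    payload=payload,
)
print(request.get_method(), request.full_url)
# Sending would be urllib.request.urlopen(request); it is intentionally not done here.
```

Note that the scoring URL printed above is a cluster-internal address, so a request like this would normally only succeed from inside the cluster.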


## Sample scoring

In [38]:
fields = ["ID","LONGDISTANCE","INTERNATIONAL","LOCAL","DROPPED","PAYMETHOD","LOCALBILLTYPE","LONGDISTANCEBILLTYPE","USAGE",\
            "RATEPLAN","GENDER","STATUS","CHILDREN","ESTINCOME","CAROWNER","AGE"]
values = [[1,28,0,60,0,"Auto","FreeLocal","Standard",89,4,"F","M",1,23000,"N",45]]
scoring_payload = {"input_data": [{"fields": fields, "values": values}]}
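
The payload keeps column names (`fields`) separate from row data (`values`), and the scoring response uses the same layout under `predictions`. If your records are dictionaries, two small helpers like the ones sketched below can convert in both directions. Both function names are hypothetical, and the mock response is made up to show the shape only; it is not output from the deployed model.

```python
def records_to_payload(records, fields):
    """Turn a list of dicts into the {"input_data": [{"fields", "values"}]} layout."""
    values = [[record[name] for name in fields] for record in records]
    return {"input_data": [{"fields": list(fields), "values": values}]}

def predictions_to_records(response):
    """Turn a response with 'fields'/'values' blocks into one dict per scored row."""
    rows = []
    for block in response["predictions"]:
        for row in block["values"]:
            rows.append(dict(zip(block["fields"], row)))
    return rows

columns = ["ID", "LONGDISTANCE", "INTERNATIONAL", "LOCAL", "DROPPED", "PAYMETHOD",
           "LOCALBILLTYPE", "LONGDISTANCEBILLTYPE", "USAGE", "RATEPLAN", "GENDER",
           "STATUS", "CHILDREN", "ESTINCOME", "CAROWNER", "AGE"]
row = [1, 28, 0, 60, 0, "Auto", "FreeLocal", "Standard", 89, 4, "F", "M", 1, 23000, "N", 45]
customer = dict(zip(columns, row))

payload_from_records = records_to_payload([customer], columns)
print(payload_from_records["input_data"][0]["values"][0] == row)  # True

# Mock response, invented for illustration only
mock_response = {"predictions": [{"fields": ["ID", "predictedLabel"], "values": [[1, "F"]]}]}
print(predictions_to_records(mock_response))  # [{'ID': 1, 'predictedLabel': 'F'}]
```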

In [39]:
scoring_response = wml_client.deployments.score(deployment_uid, scoring_payload)
scoring_response

{'predictions': [{'fields': ['ID',
    'LONGDISTANCE',
    'INTERNATIONAL',
    'LOCAL',
    'DROPPED',
    'PAYMETHOD',
    'LOCALBILLTYPE',
    'LONGDISTANCEBILLTYPE',
    'USAGE',
    'RATEPLAN',
    'GENDER',
    'STATUS',
    'CHILDREN',
    'ESTINCOME',
    'CAROWNER',
    'AGE',
    'PAYMETHOD_IX',
    'LOCALBILLTYPE_IX',
    'LONGDISTANCEBILLTYPE_IX',
    'GENDER_IX',
    'STATUS_IX',
    'CAROWNER_IX',
    'label',
    'features',
    'rawPrediction',
    'probability',
    'prediction',
    'predictedLabel'],
   'values': [[1,
     28,
     0,
     60,
     0,
     'Auto',
     'FreeLocal',
     'Standard',
     89,
     4,
     'F',
     'M',
     1,
     23000.0,
     'N',
     45.0,
     1.0,
     1.0,
     0.0,
     0.0,
     0.0,
     0.0,
     0.0,
     [1.0,
      1.0,
      0.0,
      0.0,
      0.0,
      0.0,
      1.0,
      28.0,
      0.0,
      60.0,
      0.0,
      89.0,
      4.0,
      1.0,
      23000.0,
      45.0],
     [13.488626193468049, 6.511373806531