import pandas as pd
from datetime import datetime
from multiprocessing.pool import Pool
from multiprocessing import cpu_count
from .utils import extract_inputs
from .poisson_regression import PoissonITS
from .prophet_model import ProphetITS
from .pre_processing import align_prophet_naming_convection
# BaseITS
[docs]class BaseITS:
"""This class assumes that the data is already pre-processed to the required format.
If not pre-processed, the user should use the functions in pre-processing & custom_transform to pre-process their data
Class supports wide-format datasets only. Refer to readme on how to structure your dataset.
Args:
outcome (list, optional): List of the outcome labels. Defaults to None.
location (list, optional): List of the location labels. Defaults to None.
interruption_date (list, optional): List of the interruption dates. Defaults to None.
model (list, optional): List of the models labels. Defaults to None.
verbose (bool, optional): Boolean variable to log outputs. Defaults to False.
Methods:
fit(df: pd.DataFrame, X: pd.Series, y: pd.Series, offset: float = None)
predict( df: pd.DataFrame, X: pd.Series, y: pd.Series, offset: float = None)
fit_predict(df: pd.DataFrame, X: pd.Series, y: pd.Series)
"""
def __init__(
self,
outcome: list = None,
location: list = None,
interruption_date: list = None,
model: list = None,
verbose: bool = False,
):
self.outcome_ = outcome
self.location_ = location
self.interruption_date_ = interruption_date
self.model_ = model
self.verbose = verbose
self.prophet_model_ = ProphetITS()
self.poisson_regression_ = PoissonITS()
self.__validate_inputs_()
# Start implementation with assumption we only accept wide datasets. User expected to convert their data to align to this.
# Write a validation to make sure inputs are as lists.
def __validate_inputs_(self):
"""Class to validate that the inputs provided are the correct ones
Returns:
ValueError: Raises ValueErrors based on the implemented checks
"""
NoneType = type(None)
# print(type(self.location_), isinstance(self.location_, NoneType))
if isinstance(self.location_, NoneType):
raise ValueError(
'Location list not provided. Supply the "location" to the class declaration'
)
if isinstance(self.outcome_, NoneType):
raise ValueError(
'Outcome list not provided. Supply the "location" to the class declaration'
)
if isinstance(self.interruption_date_, NoneType):
raise ValueError(
'Interruption dates list not provided. Supply the "location" to the class declaration'
)
if not isinstance(self.location_, list):
return TypeError('Parameter "location" should of type "list".')
if not isinstance(self.outcome_, list):
return TypeError('Parameter "outcome" should of type "list".')
if not isinstance(self.interruption_date_, list):
return TypeError('Parameter "interruption_date_" should of type "list".')
if not isinstance(self.model_, list):
return TypeError('Parameter "locamodeltion" should of type "list".')
if self.model_ not in [["prophet"], ["poisson"], ["poisson", "prophet"]]:
raise ValueError('Parameter "model" should be "prophet" or "poisson".')
if len(self.outcome_) is 0:
raise ValueError(
'Outcome list is empty. Supply "outcomes" to the class declaration'
)
if (len(self.location_)) is 0:
raise ValueError(
'Location list is empty. Supply the "location" to the class declaration'
)
if len(self.interruption_date_) is 0:
raise ValueError(
'Interruption dates list is empty. Supply the "Interruption dates" to the class declaration'
)
def __validate_fit_predict(self, df: pd.DataFrame):
"""Function to validate fit & prediction specific values conform to the expected values. Not done in _init_ as user is not required to pass a dataframe.
Args:
df (pd.DataFrame): DataFrame with the data
Raises:
ValueError: Raises errors based on the Raises ValueErrors based on the implemented checks
"""
# confirm that the provided list of outcomes & locations is in df
if not set(self.location_).issubset(df["location"].unique().tolist()):
raise ValueError(
'DataFrame must have the unique values(rows) provided in the "location" parameter'
)
if not set(self.outcome_) <= set(df.columns.tolist()):
raise ValueError(
'DataFrame must have the columns provided in the "outcome" parameter'
)
if "location" not in df.columns:
raise ValueError(
'Dataframe must have column "location" with the ' "location."
)
[docs] def fit(self, df: pd.DataFrame, X: pd.Series, y: pd.Series, offset: float = None):
"""Function called by user to fit their models.(prophet or poisson)
Args:
df (pd.DataFrame): DataFrame with the data
offset (float, optional): _description_. Defaults to None.
Returns:
BaseITS: Fitted object of the class.
"""
self.__validate_inputs_()
self.__validate_fit_predict(df)
len_outcomes = len(self.outcome_)
len_locations = len(self.location_)
if (len_outcomes > 1) or (len_locations > 1) or (len(self.model_) > 1):
# works for multiple locations and outcomes
# call the pool fit method for each location and outcome
fitted_model_results = self.__pool_fit(
df=df, offset=offset, model=self.model_
)
return fitted_model_results
else:
fitted_model = self.__fit_once(X=X, y=y, offset=offset, model=self.model_)
return fitted_model
def __fit_once(self, X: pd.Series, y: pd.Series, offset: float = None, model=None):
"""Private class where the fit occurs to only one occurance of either model, outcome, location, intervention_date provided.
Raises:
ValueError: Raises ValueError incase the inputs are not as expected
Returns:
model: Fitted model
"""
if model == "prophet" or ["prophet"]:
if self.verbose:
print("prophet fit once")
concat_df = pd.concat([X, y], axis=1)
concat_df = align_prophet_naming_convection(
concat_df, date_col_name=X.name, y_col_name=y.name
)
self.prophet_model_.fit(concat_df)
return self.prophet_model_
elif model == "poisson" or ["poisson"]:
if offset == None:
if self.verbose:
print("poisson fit once error")
raise ValueError("Offset needs to be defined for poisson regression")
else:
if self.verbose:
print("poisson fit once ")
self.poisson_regression_.fit(X, y, offset)
return self.poisson_regression_
else:
if self.verbose:
print("fit once error")
raise ValueError("Specify either 'prophet' or 'poisson' models")
def __pool_fit(
self,
df: pd.DataFrame,
offset: float = None,
num_threads: int = cpu_count(),
model=None,
):
"""Private function to fit more than one instance of outcomes, locations, models, intervention_dates
Args:
df (pd.DataFrame): DataFrame with the data
offset (float, optional): _description_. Defaults to None.
num_threads (int, optional): Number of cpu counts that will allow multi-threading. Defaults to cpu_count().
Returns:
model: fitted model
"""
if self.verbose:
print(num_threads, "num_threads")
if num_threads > 1:
# define thread pool
pool = Pool(processes=num_threads)
# send jobs to thread pool
results = dict()
for location in self.location_:
results[location] = dict()
# location_results = results.get(location, {})
df_copy = df[df["location"] == location].copy().reset_index(drop=True)
for outcome in self.outcome_:
results[location][outcome] = dict()
# outcome_results = location_results.get(outcome, {})
df_copy["y"] = df_copy[outcome].copy()
for intervention in self.interruption_date_:
results[location][outcome][intervention] = dict()
# intervention_results = outcome_results.get(intervention, {})
prediction_start_date = datetime.strptime(
intervention, "%Y-%m-%d"
)
modeling_df = df_copy[
df_copy["ds"] <= prediction_start_date
].copy()
for model in self.model_:
results[location][outcome][intervention][model] = dict()
# model_results = intervention_results.get(model, {})
# model_results[model] = dict()
if self.verbose:
print(
"{} - {} - {} - {}".format(
location, outcome, intervention, model
)
)
fitted_model = self.__fit_once(
X=modeling_df["ds"],
y=modeling_df["y"],
offset=offset,
model=model,
)
results[location][outcome][intervention][model][
"fitted_model"
] = fitted_model
return results
[docs] def predict(
self, df: pd.DataFrame, X: pd.Series, y: pd.Series, offset: float = None
):
"""Function used to forecast using the previously fitted models
Args:
df (pd.DataFrame): Dataset with the data to be used
offset (float, optional): _description_. Defaults to None.
"""
self.__validate_inputs_()
self.__validate_fit_predict(df)
len_outcomes = len(self.outcome_)
len_locations = len(self.location_)
if self.verbose:
print(len_outcomes, len_locations)
if (len_outcomes > 1) or (len_locations > 1) or (len(self.model_) > 1):
# call the pool fit method for each location and outcome
predictions = self.__pool_predict(df=df, offset=offset)
return predictions
else:
predictions = self.__predict_once(
X=X, y=y, offset=offset, model=self.model_
)
return predictions
def __predict_once(
self,
X: pd.Series,
y: pd.Series = None,
model: str = "prophet",
offset: float = None,
):
"""Private function to predict only one occurence of outcome, location, model, intervention
Args:
X (pd.Series): Series with the X (ds) column
y (pd.Series, optional): Series with the y column. Defaults to None.
Raises:
ValueError: Raises ValueError incase the inputs are not as expected
Returns:
pd.DataFrame: dataframe with the forecast results
"""
if model == "prophet" or ["prophet"]:
concat_df = pd.concat([X, y], axis=1)
concat_df = align_prophet_naming_convection(
concat_df, date_col_name=X.name, y_col_name=y.name
)
predictions = self.prophet_model_.predict(concat_df)
return predictions
elif model == "poisson" or ["poisson"]:
if y == None:
raise ValueError("y series needs to be defined for poisson regression")
else:
predictions = self.poisson_regression_.predict(X, y)
return predictions
else:
raise ValueError("Specify either prophet or the poisson models")
def __pool_predict(
self,
df: pd.DataFrame,
offset: float = None,
num_threads: int = cpu_count(),
):
"""Private function to fit more than one instance of outcomes, locations, models, intervention_dates
Args:
df (pd.DataFrame): Dataset with the data
offset (float, optional): _description_. Defaults to None.
num_threads (int, optional): cpu_count(). Defaults to cpu_count().
Returns:
dict: Dictionary with results of the predictions of the provided instance of outcomes, locations, models, intervention_dates
"""
if self.verbose:
print(num_threads, "num_threads")
if num_threads > 1:
# define thread pool
pool = Pool(processes=num_threads)
# send jobs to thread pool
results = dict()
for location in self.location_:
results[location] = dict()
# location_results = results.get(location, {})
df_copy = df[df["location"] == location].copy().reset_index(drop=True)
for outcome in self.outcome_:
results[location][outcome] = dict()
# outcome_results = location_results.get(outcome, {})
df_copy["y"] = df_copy[outcome].copy()
for intervention in self.interruption_date_:
results[location][outcome][intervention] = dict()
# intervention_results = outcome_results.get(intervention, {})
prediction_start_date = datetime.strptime(
intervention, "%Y-%m-%d"
)
modeling_df = df_copy[
df_copy["ds"] <= prediction_start_date
].copy()
for model in self.model_:
results[location][outcome][intervention][model] = dict()
# model_results = intervention_results.get(model, {})
# model_results[model] = dict()
if self.verbose:
print(
"{} - {} - {} - {}".format(
location, outcome, intervention, model
)
)
forecast_results = self.__predict_once(
X=modeling_df["ds"],
y=modeling_df["y"],
# TODO pass the X one only for prediction, or what the function expects. : Done
offset=offset,
model=model, # TODO need to pass this along to other functions: Done
)
results[location][outcome][intervention][model][
"forecast"
] = forecast_results
return results
[docs] def summary(self):
"""Function to generate the summary of the models
Returns:
_type_: summary returned by the model
"""
if self.model_ == "prophet":
summary = self.prophet_model_.summary()
return summary
elif self.model_ == "poisson":
summary = self.poisson_regression_.summary()
return summary
else:
return "Specify either prophet or poisson models"
[docs] def fit_predict(self, df: pd.DataFrame, X: pd.Series, y: pd.Series):
"""Function to simultaneously fit and predict a function using the prophet model.
This function does not work for poisson-regression as the data needs to be pre-processed using the custom-transform class.
TODO Implement this fuction to check if user has already preprocessed the poisson data.
Args:
df (pd.DataFrame): Dataset
Raises:
NotImplementedError: Raises this error if user tries to use the poisson regression model
ValueError: Raises an error if the user provided model not in this list[prophet, poisson]
Returns:
pd.DataFrame: DataFrame with the forecasted results.
"""
self.__validate_inputs_()
self.__validate_fit_predict(df)
if self.model_ == "poisson" or self.model_ == ["poisson"]:
raise NotImplementedError(
"This function cannot be implemented because the predictions need to be transformed mannualy by the user"
)
elif self.model_ == "prophet" or self.model_ == ["prophet"]:
len_outcomes = len(self.outcome_)
len_locations = len(self.location_)
if (len_outcomes > 1) or (len_locations > 1) or (len(self.model_) > 1):
# call fit_predict from the prophet model using the loops
return self.pool_fit_predict(df=df)
else:
return self.__fit_predict_once(X=X, y=y, model=self.model_)
else:
raise ValueError("Make sure you input the correct model name.")
def __fit_predict_once(
self,
X: pd.Series,
model: str,
y: pd.Series = None,
):
"""Private function to fit and predict only one occurence of outcome, location, model, intervention
Args:
X (pd.Series): Series with the X (ds) column
y (pd.Series, optional): Series with the y column. Defaults to None.
Raises:
ValueError: Raises ValueError incase the inputs are not as expected
Returns:
pd.DataFrame: dataframe with the forecast results
"""
if model == "prophet" or ["prophet"]:
concat_df = pd.concat([X, y], axis=1)
concat_df = align_prophet_naming_convection(
concat_df, date_col_name=X.name, y_col_name=y.name
)
predictions = self.prophet_model_.fit_predict(concat_df)
return predictions
else:
raise ValueError("Specify either the 'prophet' models")
[docs] def pool_fit_predict(
self,
df: pd.DataFrame,
num_threads: int = cpu_count(),
):
"""Private function to fit and predictmore than one instance of outcomes, locations, models, intervention_dates
Args:
num_threads (cpu_count): cpu_count
df (pd.DataFrame): Dataset
Returns:
pd.DataFrame: DataFrame with the forecast results
"""
if self.verbose:
print(num_threads, "num_threads")
if num_threads > 1:
# define thread pool
pool = Pool(processes=num_threads)
# send jobs to thread pool
results = dict()
for location in self.location_:
results[location] = dict()
# location_results = results.get(location, {})
df_copy = df[df["location"] == location].copy().reset_index(drop=True)
for outcome in self.outcome_:
results[location][outcome] = dict()
# outcome_results = location_results.get(outcome, {})
df_copy["y"] = df_copy[outcome].copy()
for intervention in self.interruption_date_:
results[location][outcome][intervention] = dict()
# intervention_results = outcome_results.get(intervention, {})
prediction_start_date = datetime.strptime(
intervention, "%Y-%m-%d"
)
modeling_df = df_copy[
df_copy["ds"] <= prediction_start_date
].copy()
for model in self.model_:
results[location][outcome][intervention][model] = dict()
# model_results = intervention_results.get(model, {})
# model_results[model] = dict()
if self.verbose:
print(
"{} - {} - {} - {}".format(
location, outcome, intervention, model
)
)
forecast_results = self.__fit_predict_once(
X=modeling_df["ds"],
y=modeling_df["y"],
model=model,
)
# model_results[model]["model"] = fitted_model
results[location][outcome][intervention][model][
"forecast"
] = forecast_results
# TODO Work on pool elif statement here & pool_fit(), pool_predict()
return results
# Set call with William / Sekou / Charles to setup the pool help call