# Source code for sktutor.preprocessing
# -*- coding: utf-8 -*-
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import (
StandardScaler as ScikitStandardScaler,
PolynomialFeatures as SkPolynomialFeatures
)
import numpy as np
from sktutor.utils import dict_with_default, dict_default, bitwise_operator
from scipy import stats
from patsy import dmatrix
import re
from collections import OrderedDict
def mode(x):
    """Return the most frequent occurrence in ``x``.

    If two or more values are tied for the highest count, the lowest of the
    tied values is returned so the output is deterministic. Missing values
    are not counted, because ``value_counts`` drops them by default.

    :param x: A data vector.
    :type x: pandas Series
    :rtype: The most frequent value in x, or ``None`` when x holds no
        non-missing values.
    """
    counts = x.value_counts()
    if len(counts) == 0:
        return None
    # Work on the index directly instead of round-tripping through
    # ``reset_index``: the name of the column that produces depends on the
    # pandas version and on whether the Series is named, so sorting on a
    # hard-coded 'index' column is not reliable.
    tied = counts[counts == counts.max()]
    # Among the values sharing the top count, pick the lowest one.
    return tied.sort_index().index[0]
class GroupByImputer(BaseEstimator, TransformerMixin):
    """Imputes missing values by group with a specified function.

    If a ``group`` parameter is given, ``impute_type`` can be the name of any
    function which can be passed to the ``agg`` function of a pandas
    ``GroupBy`` object. If a ``group`` parameter is not given, then only
    'mean', 'median', and 'most_frequent' can be used.

    :param impute_type:
        The type of imputation to be performed.
    :type impute_type: string
    :param group:
        The column name or a list of column names to group the ``pandas
        DataFrame``.
    :type group: string or list of strings
    """

    def __init__(self, impute_type, group=None):
        self.group = group
        # 'most_frequent' is translated to the module-level ``mode`` helper
        # so it can be handed straight to ``GroupBy.agg``.
        if impute_type == 'most_frequent':
            self.impute_type = mode
        else:
            self.impute_type = impute_type

    def fit(self, X, y=None):
        """Fit the imputer on X.

        Stores ``self.mapper``: with ``group`` it is the ``to_dict()`` of the
        grouped aggregation (column -> {group key -> value}); without
        ``group`` it maps column -> fill value.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        :raises ValueError: if no ``group`` is set and ``impute_type`` is not
            one of 'most_frequent', 'median' or 'mean'.
        """
        if self.group:
            self.mapper = X.groupby(self.group).agg(self.impute_type).to_dict()
        elif self.impute_type == mode:
            self.mapper = X.mode().iloc[0, :].to_dict()
        elif self.impute_type == 'median':
            # numeric_only=True skips non-numeric columns instead of raising
            # on mixed-dtype frames with recent pandas versions.
            self.mapper = X.median(numeric_only=True).to_dict()
        elif self.impute_type == 'mean':
            self.mapper = X.mean(numeric_only=True).to_dict()
        else:
            raise ValueError("Can only use 'most_frequent', 'median', "
                             "or 'mean' impute_types without 'group' "
                             "specified.")
        return self

    def _get_value_from_map(self, x, col):
        """Get a value from the mapper, for a given column and a ``pandas
        Series`` representing a row of data.

        :param x: A row of data from a ``DataFrame``.
        :type x: pandas Series
        :param col: The name of the column to impute a missing value.
        :type col: string
        :rtype:
            The value from self.mapper dictionary if exists, np.nan otherwise.
        """
        try:
            key = x[self.group]
            if isinstance(key, pd.Series):
                # Several grouping columns are keyed by tuples, but a
                # one-element list still produces a flat, scalar-keyed index.
                key = tuple(key) if len(key) > 1 else key.iloc[0]
            return self.mapper[col][key]
        except KeyError:
            # Group not seen during fit: leave the value missing.
            return np.nan

    def transform(self, X):
        """Impute the eligible missing values in X.

        :param X: The input data with missing values to be imputed.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with eligible missing values imputed.
        """
        X = X.copy()
        if self.group:
            for col in self.mapper.keys():
                # Row-wise lookup of the fitted value for each row's group.
                X[col] = X[col].fillna(X.apply(
                    lambda x: self._get_value_from_map(x, col), axis=1))
        else:
            X = X.fillna(pd.Series(self.mapper))
        return X
class MissingValueFiller(BaseEstimator, TransformerMixin):
    """Replace every missing value with one fixed value.

    Intended for columns that share a similar dtype, since the same value is
    used for all of them.

    :param value: The value to impute for missing factors.
    """

    def __init__(self, value):
        self.value = value

    def fit(self, X, y=None):
        """Nothing is learned from X; present for pipeline compatibility.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        return self

    def transform(self, X):
        """Fill the missing values in X with ``value``.

        :param X: The input data with missing values to be filled.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with eligible missing values filled.
        """
        filled = X.fillna(self.value)
        return filled
class OverMissingThresholdDropper(BaseEstimator, TransformerMixin):
    """Drop columns whose share of missing data exceeds a threshold.

    :param threshold: Maximum portion of missing data that is acceptable. Must
                      be within the interval [0,1]
    :type threshold: float
    """

    def __init__(self, threshold):
        if threshold < 0 or threshold > 1:
            raise ValueError("threshold must be within [0,1]")
        self.threshold = threshold

    def fit(self, X, y=None):
        """Record which columns of X have too many missing values.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        # Highest number of missing rows a column may have and still be kept.
        max_missing = int(len(X) * (self.threshold))
        missing_per_column = X.isnull().sum()
        too_sparse = missing_per_column > max_missing
        self.cols_to_drop = missing_per_column[too_sparse].index.tolist()
        return self

    def transform(self, X):
        """Drop the columns recorded during ``fit``.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with columns dropped.
        """
        return X.drop(self.cols_to_drop, axis=1)
class ValueReplacer(BaseEstimator, TransformerMixin):
    """Replaces values in each column according to a nested dictionary.

    ``inverse_mapper`` is probably more intuitive for when one value replaces
    many values. Only one of ``inverse_mapper`` or ``mapper`` can be used.

    :param mapper: Nested dictionary with columns mapping to dictionaries
                   that map old values to new values.
    :type mapper: dictionary
    :param inverse_mapper: Nested dictionary with columns mapping to
                           dictionaries that map new values to a list of old
                           values
    :type inverse_mapper: dictionary

    ``mapper`` takes the form::

        {'column_name': {'old_value1': 'new_value1',
                         'old_value2': 'new_value1',
                         'old_value3': 'new_value2'}
         }

    while ``inverse_mapper`` takes the form::

        {'column_name': {'new_value1': ['old_value1', 'old_value2'],
                         'new_value2': ['old_value3']}
         }
    """

    def __init__(self, mapper=None, inverse_mapper=None):
        self.inverse_mapper = inverse_mapper
        if inverse_mapper and mapper:
            raise ValueError("Cannot use both a mapper and inverse_mapper.")
        elif inverse_mapper:
            # Invert {new: [old, ...]} into {old: new} for every column.
            mapper = {
                col: {old: new
                      for new, olds in replacements.items()
                      for old in olds}
                for col, replacements in inverse_mapper.items()
            }
        elif not mapper:
            raise ValueError("Must initialize with either mapper or "
                             "inverse_mapper.")
        # Wrap each per-column dict with the project's ``dict_default`` helper
        # before it is used with ``Series.map`` in ``transform``.
        mapper = {key: dict_default(value) for key, value in mapper.items()}
        self.mapper = mapper

    def fit(self, X, y=None):
        """Fit the value replacer on X. Checks that all columns in mapper are
        present in X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        :raises ValueError: if the mapper names columns that X does not have.
        """
        missing = set(self.mapper.keys()) - set(X.columns)
        if missing:
            # str() keeps the message buildable for non-string column labels.
            raise ValueError("Mapper contains columns not found in input "
                             "data: " + ', '.join(str(c) for c in missing))
        return self

    def transform(self, X):
        """Replace the values in X with the values in the mapper. Values not
        accounted for in the mapper will be left untransformed.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with old values mapped to new values.
        """
        X = X.copy(deep=True)
        for col in self.mapper.keys():
            X[col] = X[col].map(self.mapper[col])
        return X
class FactorLimiter(BaseEstimator, TransformerMixin):
    """For each named column, it limits factors to a list of acceptable values.

    Non-conforming factors, including missing values, are replaced by a
    default value.

    :param factors_per_column: dictionary mapping column name keys to a
                               dictionary with a list of acceptable factor
                               values and a default factor value for
                               non-conforming values
    :type factors_per_column: dictionary

    ``factors_per_column`` takes the form::

        {'column_name': {'factors': ['value1', 'value2', 'value3'],
                         'default': 'value1'},
         }
    """

    def __init__(self, factors_per_column=None):
        self.factors_per_column = factors_per_column
        mapper = {}
        # The declared default of None would otherwise crash on ``.items()``;
        # treat it as "no columns to limit".
        for col, specs in (factors_per_column or {}).items():
            # Identity mapping for accepted factors; ``dict_with_default``
            # (project helper) supplies specs['default'] for everything else.
            translation = {factor: factor for factor in specs['factors']}
            mapper[col] = dict_with_default(specs['default'], translation)
        self.mapper = mapper

    def fit(self, X, y=None):
        """Fit the factor limiter on X. Checks that all columns in
        factors_per_column are present in X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        :raises ValueError: if factors_per_column names columns X lacks.
        """
        missing = set(self.mapper.keys()) - set(X.columns)
        if missing:
            # The explicit ``+`` matters: adjacent string literals would be
            # merged into the ``join`` separator instead of prefixing it.
            raise ValueError("factors_per_column contains keys not found in "
                             "DataFrame columns: "
                             + ', '.join(str(c) for c in missing))
        return self

    def transform(self, X):
        """Limit the factors in X with the values in factors_per_column.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with factors limited to the specifications.
        """
        X = X.copy(deep=True)
        for col, val in self.mapper.items():
            X[col] = X[col].map(val)
        return X
class SingleValueAboveThresholdDropper(BaseEstimator, TransformerMixin):
    """Removes columns with a single value representing a higher percentage
    of values than a given threshold.

    :param threshold: percentage of single value in a column to be removed
    :type threshold: float
    :param dropna: If True, do not consider NaN as a value
    :type dropna: boolean
    """

    def __init__(self, threshold=1, dropna=True):
        if threshold > 1 or threshold < 0:
            raise ValueError("threshold must be within [0,1]")
        else:
            self.threshold = threshold
        self.dropna = dropna

    def _top_value_count(self, x):
        """Return how often the most common value of column ``x`` occurs.

        Returns 0 when there is nothing to count (e.g. an all-NaN column with
        ``dropna=True``) instead of failing on an empty ``value_counts``.
        """
        counts = x.value_counts(dropna=self.dropna)
        # value_counts sorts descending by default, so the first entry is the
        # count of the most frequent value.
        return counts.iloc[0] if len(counts) > 0 else 0

    def fit(self, X, y=None):
        """Fit the dropper on X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        length = len(X)
        val_counts = X.apply(self._top_value_count)
        # NOTE: the cut-off count is truncated to an integer, as before.
        self.cols_to_drop = val_counts[
            (val_counts >= int(length*(self.threshold)))].index.tolist()
        return self

    def transform(self, X):
        """Drop the columns in X with single values that exceed the threshold.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with columns dropped to the specifications.
        """
        X = X.drop(self.cols_to_drop, axis=1)
        return X
class SingleValueDropper(BaseEstimator, TransformerMixin):
    """Drop columns that hold at most one unique value.

    :param dropna: If True, do not consider NaN as a value
    :type dropna: boolean
    """

    def __init__(self, dropna=True):
        self.dropna = dropna

    def _unique_values(self, x):
        """Count the distinct values of column ``x``.

        When ``dropna`` is set and the column has missing entries, ``None``
        and NaN-like values are left out of the count.
        """
        distinct = x.unique().tolist()
        if self.dropna and x.isnull().sum() > 0:
            if None in distinct:
                distinct.remove(None)
            # NaN is the only kind of value that is not equal to itself.
            distinct = [item for item in distinct if item == item]
        return len(distinct)

    def fit(self, X, y=None):
        """Record the columns of X that have one unique value or fewer.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        unique_per_column = X.apply(self._unique_values, axis=0)
        is_constant = unique_per_column <= 1
        self.cols_to_drop = unique_per_column[is_constant].index.tolist()
        return self

    def transform(self, X):
        """Drop the columns recorded during ``fit``.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with columns dropped.
        """
        return X.drop(self.cols_to_drop, axis=1)
class ColumnExtractor(BaseEstimator, TransformerMixin):
    """Extract a list of columns from a ``DataFrame``.

    :param col: A list of columns to extract from the ``DataFrame``
    :type col: list of strings
    """

    def __init__(self, col):
        self.col = col

    def fit(self, X, y=None, **fit_params):
        """Fit the extractor on X. Checks that all columns are in X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        :raises ValueError: if ``col`` names columns that X does not have.
        """
        missing = set(self.col) - set(X.columns)
        if missing:
            # str() keeps the message buildable for non-string column labels.
            raise ValueError("Column list contains columns not found in input "
                             "data: " + ', '.join(str(c) for c in missing))
        return self

    def transform(self, X, **transform_params):
        """Extract the specified columns in X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with specified columns.
        """
        return pd.DataFrame(X[self.col])
class ColumnDropper(BaseEstimator, TransformerMixin):
    """Remove a list of columns from a ``DataFrame``.

    :param col: A list of columns to drop from the ``DataFrame``
    :type col: list of strings
    """

    def __init__(self, col):
        self.col = col

    def fit(self, X, y=None, **fit_params):
        """Check that every column to drop exists in X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        absent = set(self.col) - set(X.columns)
        if len(absent) > 0:
            message = ("Column list contains columns not found in input "
                       "data: " + ', '.join(absent))
            raise ValueError(message)
        return self

    def transform(self, X, **transform_params):
        """Return X without the specified columns.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` without specified columns.
        """
        remaining = X.drop(self.col, axis=1)
        return remaining
class DummyCreator(BaseEstimator, TransformerMixin):
    """Create dummy variables from categorical variables.

    All keyword arguments are forwarded to ``pandas.get_dummies``.

    :param dummy_na: Add a column to indicate NaNs, if False NaNs are ignored.
    :type dummy_na: boolean
    :param drop_first: Whether to get k-1 dummies out of k categorical levels
                       by removing the first level.
    :type drop_first: boolean
    """

    def __init__(self, **kwargs):
        self.kwargs = kwargs

    def _get_dummies(self, X, fit):
        """Run ``pd.get_dummies`` on X.

        Outside of fitting, ``drop_first`` is not forwarded: the level dropped
        from new data may differ from the one dropped while fitting, so all
        levels are generated and ``transform`` selects the fitted columns.
        """
        if fit:
            return pd.get_dummies(X, **self.kwargs)
        new_kwargs = {key: value for key, value in self.kwargs.items()
                      if key != 'drop_first'}
        return pd.get_dummies(X, **new_kwargs)

    def _fit(self, X):
        """Create the dummies for X and remember the resulting columns."""
        X = self._get_dummies(X, fit=True)
        self.columns = X.columns
        return X

    def fit(self, X, y=None, **fit_params):
        """Fit the dummy creator on X. Retains a record of columns produced
        with the fitting data.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        self._fit(X)
        return self

    def fit_transform(self, X, y=None, **fit_params):
        """Fit the dummy creator on X, then transform X. Same as calling
        self.fit().transform(), but more convenient and efficient.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with dummy variables.
        """
        return self._fit(X)

    def transform(self, X, **transform_params):
        """Create dummies for the columns in X.

        The result always has exactly the columns seen during ``fit``, in the
        same order: dummies absent from X are added as all-zero columns and
        dummies not seen during ``fit`` are discarded.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with dummy variables.
        """
        X = self._get_dummies(X, fit=False)
        data_column_set = set(X.columns)
        for col in self.columns:
            if col not in data_column_set:
                X[col] = 0
        # Always re-select so the column order matches the fitted order, even
        # when the set of columns already agrees.
        return X[self.columns]
class ColumnValidator(BaseEstimator, TransformerMixin):
    """Guarantee that transformed data has the same columns, in the same
    order, as the data used in ``fit``. Handy as a check at the start and end
    of a pipeline.
    """

    def fit(self, X, y=None, **fit_params):
        """Remember the columns of X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        self.columns = X.columns
        return self

    def transform(self, X, **transform_params):
        """Verify X has exactly the fitted columns and return it with the
        columns arranged in the fitted order.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with specified columns.
        """
        absent = set(self.columns) - set(X.columns)
        if len(absent) > 0:
            raise ValueError("New data is missing columns from original data: "
                             + ', '.join(absent))
        unexpected = set(X.columns) - set(self.columns)
        if len(unexpected) > 0:
            raise ValueError("New data has columns not in the original data: "
                             + ', '.join(unexpected))
        reordered = X[self.columns]
        return pd.DataFrame(reordered, index=X.index)
class TextContainsDummyExtractor(BaseEstimator, TransformerMixin):
    """Extract one or more dummy variables based on whether one or more text
    columns contains one or more strings.

    :param mapper: a mapping of new columns to criteria to populate it as True
    :type mapper: dict

    ``mapper`` takes the form::

        {'old_column1':
         {'new_column1':
          [{'pattern': 'string1', 'kwargs': {'case': False}},
           {'pattern': 'string2', 'kwargs': {'case': False}}
           ],
          'new_column2':
          [{'pattern': 'string3', 'kwargs': {'case': False}},
           {'pattern': 'string4', 'kwargs': {'case': False}}
           ],
          },
         'old_column2':
         {'new_column3':
          [{'pattern': 'string5', 'kwargs': {'case': False}},
           {'pattern': 'string6', 'kwargs': {'case': False}}
           ],
          'new_column4':
          [{'pattern': 'string7', 'kwargs': {'case': False}},
           {'pattern': 'string8', 'kwargs': {'case': False}}
           ]
          }
         }

    The ``'kwargs'`` entry is optional; when present it is passed to
    ``Series.str.contains``.
    """

    def __init__(self, mapper):
        self.mapper = mapper

    def fit(self, X, y=None):
        """Fit the extractor on X. Checks that all source columns named in
        the mapper are present in X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        :raises ValueError: if the mapper names columns that X does not have.
        """
        missing = set(self.mapper.keys()) - set(X.columns)
        if missing:
            # str() keeps the message buildable for non-string column labels.
            raise ValueError("Mapper contains columns not found in input "
                             "data: " + ', '.join(str(c) for c in missing))
        return self

    def transform(self, X):
        """Add the dummy columns described by the mapper to a copy of X.

        Each new column is 1 where the source column matches at least one of
        its patterns and 0 otherwise.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with the new dummy columns added.
        """
        X = X.copy(deep=True)
        for old_col, val in self.mapper.items():
            for new_col, terms in val.items():
                # One boolean Series per pattern, later OR-ed together.
                series_list = [
                    X[old_col].str.contains(term['pattern'],
                                            **term.get('kwargs', {}))
                    for term in terms
                ]
                X[new_col] = bitwise_operator(
                    pd.DataFrame(series_list).transpose(), 'or').astype(int)
        return X
class BitwiseOperator(BaseEstimator, TransformerMixin):
    """Combine lists of columns with a bitwise ``&`` or ``|``.

    :param mapper: A mapping from new columns which will be defined by applying
                   the bitwise operator to a list of old columns
    :type mapper: dict
    :param operator: the name of the bitwise operator to apply.
                     'and', 'or' are acceptable inputs
    :type operator: str

    ``mapper`` takes the form::

        {'new_column1': ['old_column1', 'old_column2', 'old_column3'],
         'new_column2': ['old_column2', 'old_column4', 'old_column5']
         }
    """

    def __init__(self, operator, mapper):
        self.mapper = mapper
        if operator not in ['and', 'or']:
            raise ValueError("parameter operator can only be 'and' or 'or'")
        self.operator = operator

    def fit(self, X, y=None, **fit_params):
        """Check that every source column listed in the mapper exists in X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        # Flatten the per-target column lists into one list of source columns.
        source_cols = [name
                       for group in self.mapper.values()
                       for name in group]
        absent = set(source_cols) - set(X.columns)
        if len(absent) > 0:
            raise ValueError("Column list contains columns not found in input "
                             "data:" + ', '.join(absent))
        return self

    def transform(self, X, **transform_params):
        """Add each new column, computed by applying the operator across its
        source columns, to a copy of X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with the combined columns added.
        """
        result = X.copy(deep=True)
        for target, sources in self.mapper.items():
            combined = bitwise_operator(result[sources], self.operator)
            result[target] = combined.astype(int)
        return result
class BoxCoxTransformer(BaseEstimator, TransformerMixin):
    """Create BoxCox Transformations on all columns.

    :param adder: the amount to add to each column before the BoxCox
                  transformation
    :type adder: numeric
    """

    def __init__(self, adder=0):
        self.adder = adder

    def fit(self, X, y=None, **fit_params):
        """Fit the transformer on X, storing one fitted lambda per column in
        ``self.lambdas``.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        self.columns = X.columns
        self.lambdas = dict()
        for col in self.columns:
            # With no lambda given, boxcox returns (transformed, lambda).
            self.lambdas[col] = stats.boxcox(X[col] + self.adder)[1]
        return self

    def fit_transform(self, X, y=None, **fit_params):
        """Fit the transformer on X and return the transformed copy of X,
        computing each column's transformation only once.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with every column BoxCox transformed.
        """
        X = X.copy()
        self.columns = X.columns
        self.lambdas = dict()
        for col in self.columns:
            X[col], self.lambdas[col] = stats.boxcox(X[col] + self.adder)
        return X

    def transform(self, X, **transform_params):
        """Apply the fitted BoxCox transformation to the fitted columns.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with the fitted columns transformed.
        """
        # Work on a copy so the caller's DataFrame is not modified in place,
        # matching ``fit_transform``.
        X = X.copy()
        for col in self.lambdas:
            X[col] = stats.boxcox(X[col] + self.adder, self.lambdas[col])
        return X
class InteractionCreator(BaseEstimator, TransformerMixin):
    """Build pairwise interaction terms between two lists of columns of a
    ``DataFrame``.

    :param columns1: first list of columns to create interactions with each of
                     the second list of columns
    :type columns1: list of strings
    :param columns2: second list of columns to create interactions with each of
                     the first list of columns
    :type columns2: list of strings
    """

    def __init__(self, columns1, columns2):
        self.columns1 = columns1
        self.columns2 = columns2

    def fit(self, X, y=None, **fit_params):
        """Check that all listed columns are in X and build the patsy formula
        describing the interactions.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        requested = set(self.columns1) | set(self.columns2)
        absent = requested - set(X.columns)
        if len(absent) > 0:
            raise ValueError("Column lists contains columns not found in input"
                             " data: " + ', '.join(absent))
        # The formula starts with '0' and adds one 'a:b' term for every
        # (columns1, columns2) pair.
        terms = ['+' + first + ':' + second
                 for first in self.columns1
                 for second in self.columns2]
        self.formula = '0' + ''.join(terms)
        return self

    def transform(self, X, **transform_params):
        """Append the interaction columns to X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with the interaction columns added.
        """
        interactions = dmatrix(self.formula, data=X, return_type='dataframe')
        return pd.concat([X, interactions], axis=1)
class StandardScaler(ScikitStandardScaler):
    """Standardize features by removing mean and scaling to unit variance,
    returning a ``DataFrame`` and optionally touching only some columns.

    :param columns: the columns to scale; when None, all columns of the data
                    passed to ``fit`` are used
    :type columns: list of strings
    :param kwargs: passed on to the scikit-learn ``StandardScaler``
    """

    def __init__(self, columns=None, **kwargs):
        self.columns = columns
        super().__init__(**kwargs)

    def _rebuild(self, X, scale, partial_cols=None):
        """Run ``scale`` on ``self.columns`` of a copy of X and stitch the
        result back together with the untouched columns, keeping the column
        order of X.

        :param scale: callable applied to ``X[self.columns]``
        :param partial_cols: when given, columns of ``self.columns`` missing
                             from X are added as zeros before scaling and only
                             ``partial_cols`` are returned
        """
        frame = X.copy()
        if partial_cols is not None:
            # Placeholder columns so the scaler sees every fitted column.
            for name in self.columns:
                if name not in frame.columns:
                    frame[name] = 0
        original_order = frame.columns
        scaled = pd.DataFrame(
            scale(frame[self.columns]),
            columns=self.columns,
            index=frame.index
        )
        untouched = [
            name for name in original_order if name not in scaled.columns
        ]
        combined = pd.concat([scaled, frame[untouched]], axis=1)
        combined = combined[original_order]
        if partial_cols is not None:
            combined = combined[partial_cols]
        return combined

    def fit(self, X, y=None, **fit_params):
        """Fit the scaler on the selected columns of X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        # Default to every column when none were chosen at construction.
        if self.columns is None:
            self.columns = X.columns
        super().fit(X[self.columns])
        return self

    def fit_transform(self, X, y=None, **fit_params):
        """Fit the scaler on X and return X with the selected columns scaled.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with the selected columns scaled.
        """
        # Default to every column when none were chosen at construction.
        if self.columns is None:
            self.columns = X.columns
        super().fit(X[self.columns])
        return self._rebuild(X, super().transform)

    def transform(self, X, partial_cols=None, **transform_params):
        """Scale the selected columns of X.

        :param X: The input data.
        :type X: pandas DataFrame
        :param partial_cols: when specified, only return these columns
        :type partial_cols: list
        :rtype: A ``DataFrame`` with specified columns.
        """
        return self._rebuild(X, super().transform, partial_cols)

    def inverse_transform(self, X, partial_cols=None, **transform_params):
        """Undo the scaling on the selected columns of X.

        :param X: The input data.
        :type X: pandas DataFrame
        :param partial_cols: when specified, only return these columns
        :type partial_cols: list
        :rtype: A ``DataFrame`` with specified columns.
        """
        return self._rebuild(X, super().inverse_transform, partial_cols)
class ColumnNameCleaner(BaseEstimator, TransformerMixin):
    """Replaces spaces and formula symbols in column names that conflict with
    patsy formula interpretation.
    """

    def fit(self, X, y=None, **fit_params):
        """Fit the transformer on X by computing the cleaned column names.

        ``+``, ``*`` and ``/`` become ``_and_``, ``_by_`` and ``_or_``; every
        remaining character other than a letter, digit or underscore becomes
        ``_``.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        matcher = re.compile(r'[^A-Z0-9_]', flags=re.IGNORECASE)
        # ``regex`` is spelled out on every call because the pandas default
        # differs between versions: '+' and '*' are not valid patterns on
        # their own, and a compiled pattern requires regex=True.
        self.columns = (X.columns
                        .str.strip()
                        .str.replace('+', '_and_', regex=False)
                        .str.replace('*', '_by_', regex=False)
                        .str.replace('/', '_or_', regex=False)
                        .str.replace(matcher, '_', regex=True))
        return self

    def transform(self, X, **transform_params):
        """Transform X with clean column names for patsy.

        The fitted names are assigned positionally, so X must have its
        columns in the same order as the data used in ``fit``.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with cleaned column names.
        """
        X = X.copy()
        X.columns = self.columns
        return X
class PolynomialFeatures(BaseEstimator, TransformerMixin):
    """Creates polynomial features from inputs.

    :param degree: The degree of the polynomial
    :param interaction_only: if true, only interaction features are produced:
        features that are products of at most degree distinct input features.
    """

    def __init__(self, degree=2, interaction_only=False):
        self.degree = degree
        self.interaction_only = interaction_only
        # Wrapped scikit-learn transformer; the bias column is never produced.
        self.SkPolynomialFeatures = SkPolynomialFeatures(
            degree=self.degree,
            interaction_only=self.interaction_only,
            include_bias=False
        )

    def fit(self, X, y=None, **fit_params):
        """Fit the transformer on X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        self.columns = X.columns
        self.SkPolynomialFeatures.fit(X.values)
        # get polynomial feature names
        self.poly_feat = [
            str(e) for e in self.SkPolynomialFeatures.get_feature_names_out()
            if 'x' in e
        ]
        # for each polynomial feature name (x0, x1, etc)
        # map to df column name; the first n_features_in_ generated names are
        # assumed to be the plain inputs x0..x(n-1)
        self.name_dict = OrderedDict()
        for n in np.arange(0, self.SkPolynomialFeatures.n_features_in_):
            self.name_dict[self.poly_feat[n]] = [self.columns[n]]
        # kept in reverse order, as before, for anything reading name_dict
        self.name_dict = OrderedDict(reversed(list(self.name_dict.items())))
        return self

    def transform(self, X, **transform_params):
        """Transform X into its polynomial features, named after the original
        columns (``*`` joins the factors of a product).

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with the polynomial feature columns.
        """
        X = X[self.columns]
        X_transform = self.SkPolynomialFeatures.transform(X.values)

        def _column_name(match):
            # Look up the whole placeholder token (e.g. 'x11').
            return str(self.name_dict[match.group(0)][0])

        # Substitute every placeholder in a single pass. Chained
        # ``str.replace`` calls could rewrite text inside a column name that
        # had already been inserted (for instance a column called 'x0'), and
        # would also turn spaces inside column names into '*'.
        new_cols = [
            re.sub(r'x\d+', _column_name, feat.replace(' ', '*'))
            for feat in self.poly_feat
        ]
        # return df with original names used
        X_transform = pd.DataFrame(
            X_transform,
            columns=new_cols,
            index=X.index
        )
        return X_transform
class ContinuousFeatureBinner(BaseEstimator, TransformerMixin):
    """Creates bins for continuous features.

    :param field: the continuous field for which to create bins
    :type field: string
    :param bins: The criteria to bin by.
    :type bins: array-like
    :param right_inclusive: interval should be right-inclusive or not
    :type right_inclusive: bool
    """

    def __init__(self, field, bins, right_inclusive=True):
        self.field = field
        self.bins = bins
        self.right_inclusive = right_inclusive

    def fit(self, X, y=None):
        """Fit the ContinuousFeatureBinner on X.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        :raises ValueError: if ``field`` is not a column of X.
        """
        if self.field not in X.columns:
            raise ValueError('field not in X.')
        return self

    def transform(self, X):
        """Transform X on ``field``, adding a new column with ``_GRP``
        appended that holds the bin label as a string.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with the ``_GRP`` column added.
        :raises ValueError: if ``field`` is not a column of X.
        """
        X = X.copy(deep=True)
        if self.field not in X.columns:
            raise ValueError('Field not found in dataframe.')
        group_col = str(self.field) + str('_GRP')
        # use pandas.cut() to create bins
        binned = pd.cut(
            x=X[self.field],
            bins=self.bins,
            right=self.right_inclusive
        )
        # return labels as strings and label everything not in a bin as
        # 'Other'; masking on the cut result avoids the ``np.NaN`` alias,
        # which no longer exists in NumPy 2.0
        X[group_col] = binned.astype('str').where(binned.notna(), 'Other')
        return X
class TypeExtractor(BaseEstimator, TransformerMixin):
    """Returns dataframe with only specified field type.

    :param type: desired type; either 'numeric' or 'categorical'
    :type type: string
    """

    def __init__(self, type):
        self.type = type

    def fit(self, df, y=None, **fit_params):
        """Fit the TypeExtractor on df, recording the matching columns in
        ``self.selected_fields``.

        'numeric' selects the columns with a numeric dtype; 'categorical'
        selects every other column.

        :param df: The input data.
        :type df: pandas DataFrame
        :param y: Ignored; accepted so the transformer can be fitted inside
                  pipelines that pass a target.
        :rtype: Returns self.
        :raises ValueError: if ``type`` is not 'numeric' or 'categorical'.
        """
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if self.type == 'numeric':
            self.selected_fields = list(numeric_cols)
        elif self.type == 'categorical':
            self.selected_fields = [
                col for col in df.columns if col not in numeric_cols
            ]
        else:
            # Fail with a clear message rather than on an unset attribute.
            raise ValueError("type must be either 'numeric' or 'categorical'")
        print('Selected fields: ' + str(self.selected_fields))
        return self

    def transform(self, df, **transform_params):
        """Extract all columns of ``type``.

        :param df: The input data.
        :type df: pandas DataFrame
        :rtype: A ``DataFrame`` with extracted columns.
        """
        df = df[self.selected_fields]
        return df
class GenericTransformer(BaseEstimator, TransformerMixin):
    """Generic transformer that applies user-defined function within
    pipeline framework. Arbitrary callable should only make transformations
    and does not store any fit() parameters. Lambda functions are not supported
    as they cannot be pickled.

    :param function: arbitrary function to use as a transformer
    :type function: callable
    :param params: dict with function parameter name as key and parameter value
                   as value
    :type params: dict
    """

    def __init__(self, function, params=None):
        self.function = function
        self.params = params

    def fit(self, X, y=None, **fit_params):
        """Nothing is learned; present so ``fit_transform`` and pipelines,
        which call ``fit`` first, work with this transformer.

        :param X: The input data.
        :rtype: Returns self.
        """
        return self

    def transform(self, X, **transform_params):
        """Apply ``function`` to X, passing ``params`` as keyword arguments
        when they were provided.

        :param X: The input data.
        :rtype: Whatever ``function`` returns.
        """
        if self.params is None:
            X_transform = self.function(X)
        else:
            X_transform = self.function(X, **self.params)
        return X_transform
class MissingColumnsReplacer(BaseEstimator, TransformerMixin):
    """Add any expected columns that a DataFrame lacks.

    :param cols: The expected list of columns.
    :param value: The value to fill the new columns with by default
    """

    def __init__(self, cols, value):
        self.cols = cols
        self.value = value

    def fit(self, X, y=None):
        """Nothing is learned from X; present for pipeline compatibility.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        return self

    def transform(self, X):
        """Return a copy of X with the absent expected columns appended, in
        sorted order, and filled with ``value``.

        :param X: The input data, possibly lacking some expected columns.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` with the missing columns added.
        """
        result = X.copy(deep=True)
        absent = sorted(set(self.cols) - set(result.columns))
        # Create the columns as NaN first, then fill them in a second step.
        for name in absent:
            result[name] = np.nan
        filled = result[absent].fillna(self.value)
        result.loc[:, absent] = filled
        return result
class SklearnPandasWrapper(BaseEstimator, TransformerMixin):
    """Wrap a scikit-learn Transformer with a pandas-friendly version that
    keeps columns and row indices in place. Will only work for Transformers
    that do not add or change the order of columns.

    :param transformer: The scikit-learn compatible Transformer object.
    :type transformer: sklearn Transformer
    """

    def __init__(self, transformer):
        self.transformer = transformer

    def fit(self, X, y=None):
        """Fit the wrapped transformer on X and remember X's columns.

        :param X: The input data.
        :type X: pandas DataFrame
        :rtype: Returns self.
        """
        # No debug printing of the columns: fitting should not write to
        # stdout.
        self.columns = X.columns
        self.transformer.fit(X, y)
        return self

    def transform(self, X):
        """Transform values in X with the wrapped transformer and restore the
        fitted column names and X's index on the result.

        :param X: The input data to be transformed.
        :type X: pandas DataFrame
        :rtype: A ``DataFrame`` transformed.
        """
        X_new = self.transformer.transform(X)
        X_df = pd.DataFrame(X_new)
        X_df.columns = self.columns
        X_df.index = X.index
        return X_df