Expand source code
from collections import namedtuple

import numpy as np
import pandas as pd

from sklearn import __version__
from sklearn.base import ClassifierMixin, RegressorMixin
from sklearn.tree import DecisionTreeClassifier, DecisionTreeRegressor
from sklearn.tree._tree import Tree

import imodels.util.tree


class Node:
    def __init__(self, impurity, num_samples, num_samples_per_class, predicted_probs):
        self.impurity = impurity
        self.num_samples = num_samples
        self.num_samples_per_class = num_samples_per_class
        self.predicted_probs = predicted_probs
        self.feature_index = 0
        self.threshold = 0
        self.left = None
        self.right = None


class CustomDecisionTreeClassifier(ClassifierMixin):
    def __init__(self, max_leaf_nodes=None, impurity_func='gini'):
        self.root = None
        self.max_leaf_nodes = max_leaf_nodes
        self.impurity_func = impurity_func

    def fit(self, X, y, feature_costs=None):
        self.n_classes_ = len(set(y))
        self.n_features = X.shape[1]
        self.feature_costs_ = imodels.util.tree._validate_feature_costs(
            feature_costs, self.n_features)
        self.root = self._grow_tree(X, y)
        return self

    def _grow_tree(self, X, y):
        stack = []
        num_samples_per_class = np.array([np.sum(y == i)
                                          for i in range(self.n_classes_)])
        root = Node(
            impurity=self._calc_impurity(y),
            num_samples=y.size,
            num_samples_per_class=num_samples_per_class,
            predicted_probs=num_samples_per_class / y.size,
        )
        root.impurity_reduction = self._best_split(X, y)[-1]

        stack.append((root, X, y))
        self.n_splits = 0
        while stack:
            node, X_node, y_node = stack.pop()
            idx, thr, _ = self._best_split(X_node, y_node)
            if idx is not None:
                self.n_splits += 1
                indices_left = X_node[:, idx] < thr
                X_left, y_left = X_node[indices_left], y_node[indices_left]
                X_right, y_right = X_node[~indices_left], y_node[~indices_left]
                node.feature_index = idx
                node.threshold = thr
                num_samples_per_class_left = np.array([
                    np.sum(y_left == i) for i in range(self.n_classes_)])
                node.left = Node(
                    impurity=self._calc_impurity(y_left),
                    num_samples=y_left.size,
                    num_samples_per_class=num_samples_per_class_left,
                    predicted_probs=num_samples_per_class_left / y_left.size,
                )
                # note: _best_split is computed here and again when the child is
                # popped from the stack; redundant, but keeps the code simple
                node.left.impurity_reduction = self._best_split(
                    X_left, y_left)[-1]

                num_samples_per_class_right = np.array([
                    np.sum(y_right == i) for i in range(self.n_classes_)])
                node.right = Node(
                    impurity=self._calc_impurity(y_right),
                    num_samples=y_right.size,
                    num_samples_per_class=num_samples_per_class_right,
                    predicted_probs=num_samples_per_class_right / y_right.size,
                )
                node.right.impurity_reduction = self._best_split(
                    X_right, y_right)[-1]
                stack.append((node.right, X_right, y_right))
                stack.append((node.left, X_left, y_left))

            # early stop
            if self.max_leaf_nodes and self.n_splits >= self.max_leaf_nodes - 1:
                return root

            # best-first growth: sort ascending so that pop() (which removes the
            # last element) expands the node with the largest impurity_reduction next
            stack = sorted(stack, key=lambda x: x[0].impurity_reduction)

        return root

    def _best_split(self, X, y):
        n = y.size
        if n <= 1:
            return None, None, 0

        orig_impurity = self._calc_impurity(y)
        best_impurity_reduction = 0
        best_idx, best_thr = None, None

        # loop over features
        for idx in range(self.n_features):
            thresholds, y_classes = zip(*sorted(zip(X[:, idx], y)))
            y_classes = np.array(y_classes)

            # consider every point where threshold value changes
            idx_thresholds = (1 + np.where(np.diff(thresholds))[0]).tolist()
            for i in idx_thresholds:

                # weighted impurity of the two children under the chosen criterion
                y_left = y_classes[:i]
                y_right = y_classes[i:]
                impurity_reduction = orig_impurity - (y_left.size * self._calc_impurity(y_left) +
                                                      y_right.size * self._calc_impurity(y_right)) / n

                if self.impurity_func == 'information_gain_ratio':
                    split_info = - (y_left.size / n * np.log2(y_left.size / n)) - (
                        y_right.size / n * np.log2(y_right.size / n))
                    if not np.isnan(split_info):
                        impurity_reduction = impurity_reduction / split_info
                if self.impurity_func == 'cost_information_gain_ratio':
                    impurity_reduction /= self.feature_costs_[idx]

                if impurity_reduction > best_impurity_reduction:
                    best_impurity_reduction = impurity_reduction
                    best_idx = idx
                    best_thr = (thresholds[i] + thresholds[i - 1]) / 2

        return best_idx, best_thr, best_impurity_reduction

    def _calc_impurity(self, y):
        if self.impurity_func == 'gini':
            return self._gini(y)
        elif self.impurity_func in ['entropy', 'information_gain_ratio', 'cost_information_gain_ratio']:
            return self._entropy(y)

    def _gini(self, y):
        n = y.size
        return 1.0 - sum((np.sum(y == c) / n) ** 2 for c in range(self.n_classes_))

    def _entropy(self, y):
        n = y.size
        # 0 * log2(0) is defined as 0 for entropy; skip empty classes to avoid nan
        probs = np.array([np.sum(y == c) / n for c in range(self.n_classes_)])
        probs = probs[probs > 0]
        return -np.sum(probs * np.log2(probs))

    def predict(self, X):
        return np.argmax(self.predict_proba(X), axis=1)

    def predict_proba(self, X):
        return np.array([self._predict_single_proba(x) for x in X])

    def _predict_single_proba(self, x):
        node = self.root
        while node.left or node.right:
            if x[node.feature_index] < node.threshold and node.left:
                node = node.left
            elif node.right:
                node = node.right
        return node.predicted_probs


if __name__ == '__main__':
    from sklearn.datasets import load_breast_cancer, load_iris
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score

    # data = load_breast_cancer()
    data = load_iris()
    X = data.data
    y = data.target
    # print(np.unique(y))

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, random_state=42, test_size=0.5)
    m = CustomDecisionTreeClassifier(
        # max_leaf_nodes=20,
        impurity_func='cost_information_gain_ratio')
    m.fit(X_train, y_train)
    y_pred = m.predict(X_test)
    print("Accuracy:", accuracy_score(y_test, y_pred))
    print('n_leaves', m.n_splits + 1)
    print('shapes', m.predict_proba(X_test).shape, m.predict(X_test).shape)

    cost = imodels.util.tree.calculate_mean_depth_of_points_in_custom_tree(m)
    print('Cost', cost)
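The demo above uses cost_information_gain_ratio with feature_costs left at its default. A minimal sketch of passing explicit costs, assuming _validate_feature_costs accepts an array-like with one positive cost per feature (the cost values below are hypothetical):

    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split
    import numpy as np

    X, y = load_iris(return_X_y=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

    # hypothetical measurement costs for the four iris features; a larger cost
    # makes a feature less attractive under cost_information_gain_ratio
    costs = np.array([1.0, 1.0, 5.0, 5.0])

    m = CustomDecisionTreeClassifier(impurity_func='cost_information_gain_ratio')
    m.fit(X_train, y_train, feature_costs=costs)
    print('Accuracy:', m.score(X_test, y_test))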

Classes

class CustomDecisionTreeClassifier (max_leaf_nodes=None, impurity_func='gini')

Mixin class for all classifiers in scikit-learn.

This mixin defines the following functionality:

  • _estimator_type class attribute defaulting to "classifier";
  • score method that defaults to sklearn.metrics.accuracy_score;
  • enforcing that fit requires y to be passed through the requires_y tag.

Read more in the scikit-learn User Guide (rolling_your_own_estimator).

Examples

>>> import numpy as np
>>> from sklearn.base import BaseEstimator, ClassifierMixin
>>> # Mixin classes should always be on the left-hand side for a correct MRO
>>> class MyEstimator(ClassifierMixin, BaseEstimator):
...     def __init__(self, *, param=1):
...         self.param = param
...     def fit(self, X, y=None):
...         self.is_fitted_ = True
...         return self
...     def predict(self, X):
...         return np.full(shape=X.shape[0], fill_value=self.param)
>>> estimator = MyEstimator(param=1)
>>> X = np.array([[1, 2], [2, 3], [3, 4]])
>>> y = np.array([1, 0, 1])
>>> estimator.fit(X, y).predict(X)
array([1, 1, 1])
>>> estimator.score(X, y)
0.66...
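
For CustomDecisionTreeClassifier, the practical effect of this mixin is the inherited score method. A minimal usage sketch, reusing m, X_test, and y_test from the demo above:

    # mean accuracy, computed by ClassifierMixin.score via predict
    print(m.score(X_test, y_test))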
Expand source code
class CustomDecisionTreeClassifier(ClassifierMixin):
    def __init__(self, max_leaf_nodes=None, impurity_func='gini'):
        self.root = None
        self.max_leaf_nodes = max_leaf_nodes
        self.impurity_func = impurity_func

    def fit(self, X, y, feature_costs=None):
        self.n_classes_ = len(set(y))
        self.n_features = X.shape[1]
        self.feature_costs_ = imodels.util.tree._validate_feature_costs(
            feature_costs, self.n_features)
        self.root = self._grow_tree(X, y)
        return self

    def _grow_tree(self, X, y):
        stack = []
        num_samples_per_class = np.array([np.sum(y == i)
                                          for i in range(self.n_classes_)])
        root = Node(
            impurity=self._calc_impurity(y),
            num_samples=y.size,
            num_samples_per_class=num_samples_per_class,
            predicted_probs=num_samples_per_class / y.size,
        )
        root.impurity_reduction = self._best_split(X, y)[-1]

        stack.append((root, X, y))
        self.n_splits = 0
        while stack:
            node, X_node, y_node = stack.pop()
            idx, thr, _ = self._best_split(X_node, y_node)
            if idx is not None:
                self.n_splits += 1
                indices_left = X_node[:, idx] < thr
                X_left, y_left = X_node[indices_left], y_node[indices_left]
                X_right, y_right = X_node[~indices_left], y_node[~indices_left]
                node.feature_index = idx
                node.threshold = thr
                num_samples_per_class_left = np.array([
                    np.sum(y_left == i) for i in range(self.n_classes_)])
                node.left = Node(
                    impurity=self._calc_impurity(y_left),
                    num_samples=y_left.size,
                    num_samples_per_class=num_samples_per_class_left,
                    predicted_probs=num_samples_per_class_left / y_left.size,
                )
                # note: _best_split is computed here and again when the child is
                # popped from the stack; redundant, but keeps the code simple
                node.left.impurity_reduction = self._best_split(
                    X_left, y_left)[-1]

                num_samples_per_class_right = np.array([
                    np.sum(y_right == i) for i in range(self.n_classes_)])
                node.right = Node(
                    impurity=self._calc_impurity(y_right),
                    num_samples=y_right.size,
                    num_samples_per_class=num_samples_per_class_right,
                    predicted_probs=num_samples_per_class_right / y_right.size,
                )
                node.right.impurity_reduction = self._best_split(
                    X_right, y_right)[-1]
                stack.append((node.right, X_right, y_right))
                stack.append((node.left, X_left, y_left))

            # early stop
            if self.max_leaf_nodes and self.n_splits >= self.max_leaf_nodes - 1:
                return root

            # best-first growth: sort ascending so that pop() (which removes the
            # last element) expands the node with the largest impurity_reduction next
            stack = sorted(stack, key=lambda x: x[0].impurity_reduction)

        return root

    def _best_split(self, X, y):
        n = y.size
        if n <= 1:
            return None, None, 0

        orig_impurity = self._calc_impurity(y)
        best_impurity_reduction = 0
        best_idx, best_thr = None, None

        # loop over features
        for idx in range(self.n_features):
            thresholds, y_classes = zip(*sorted(zip(X[:, idx], y)))
            y_classes = np.array(y_classes)

            # consider every point where threshold value changes
            idx_thresholds = (1 + np.where(np.diff(thresholds))[0]).tolist()
            for i in idx_thresholds:

                # weighted impurity of the two children under the chosen criterion
                y_left = y_classes[:i]
                y_right = y_classes[i:]
                impurity_reduction = orig_impurity - (y_left.size * self._calc_impurity(y_left) +
                                                      y_right.size * self._calc_impurity(y_right)) / n

                if self.impurity_func == 'information_gain_ratio':
                    split_info = - (y_left.size / n * np.log2(y_left.size / n)) - (
                        y_right.size / n * np.log2(y_right.size / n))
                    if not np.isnan(split_info):
                        impurity_reduction = impurity_reduction / split_info
                if self.impurity_func == 'cost_information_gain_ratio':
                    impurity_reduction /= self.feature_costs_[idx]

                if impurity_reduction > best_impurity_reduction:
                    best_impurity_reduction = impurity_reduction
                    best_idx = idx
                    best_thr = (thresholds[i] + thresholds[i - 1]) / 2

        return best_idx, best_thr, best_impurity_reduction

    def _calc_impurity(self, y):
        if self.impurity_func == 'gini':
            return self._gini(y)
        elif self.impurity_func in ['entropy', 'information_gain_ratio', 'cost_information_gain_ratio']:
            return self._entropy(y)

    def _gini(self, y):
        n = y.size
        return 1.0 - sum((np.sum(y == c) / n) ** 2 for c in range(self.n_classes_))

    def _entropy(self, y):
        n = y.size
        # 0 * log2(0) is defined as 0 for entropy; skip empty classes to avoid nan
        probs = np.array([np.sum(y == c) / n for c in range(self.n_classes_)])
        probs = probs[probs > 0]
        return -np.sum(probs * np.log2(probs))

    def predict(self, X):
        return np.argmax(self.predict_proba(X), axis=1)

    def predict_proba(self, X):
        return np.array([self._predict_single_proba(x) for x in X])

    def _predict_single_proba(self, x):
        node = self.root
        while node.left or node.right:
            if x[node.feature_index] < node.threshold and node.left:
                node = node.left
            elif node.right:
                node = node.right
        return node.predicted_probs
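
When max_leaf_nodes is set, _grow_tree stops after max_leaf_nodes - 1 splits, expanding nodes in order of estimated impurity reduction. A small sketch, reusing the iris split from the demo above:

    m_small = CustomDecisionTreeClassifier(max_leaf_nodes=4, impurity_func='entropy')
    m_small.fit(X_train, y_train)
    print('n_leaves', m_small.n_splits + 1)  # each split adds one leaf, so this is at most 4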

Ancestors

  • sklearn.base.ClassifierMixin

Methods

def fit(self, X, y, feature_costs=None)
Expand source code
def fit(self, X, y, feature_costs=None):
    self.n_classes_ = len(set(y))
    self.n_features = X.shape[1]
    self.feature_costs_ = imodels.util.tree._validate_feature_costs(
        feature_costs, self.n_features)
    self.root = self._grow_tree(X, y)
    return self
def predict(self, X)
Expand source code
def predict(self, X):
    return np.argmax(self.predict_proba(X), axis=1)
def predict_proba(self, X)
Expand source code
def predict_proba(self, X):
    return np.array([self._predict_single_proba(x) for x in X])
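
A short sketch of the two prediction methods, reusing m, X_test, and np from the demo above: predict_proba returns one row of leaf class frequencies per sample, and predict takes the argmax of each row.

    proba = m.predict_proba(X_test)
    assert proba.shape == (X_test.shape[0], m.n_classes_)
    assert (m.predict(X_test) == np.argmax(proba, axis=1)).all()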
class Node (impurity, num_samples, num_samples_per_class, predicted_probs)
Expand source code
class Node:
    def __init__(self, impurity, num_samples, num_samples_per_class, predicted_probs):
        self.impurity = impurity
        self.num_samples = num_samples
        self.num_samples_per_class = num_samples_per_class
        self.predicted_probs = predicted_probs
        self.feature_index = 0
        self.threshold = 0
        self.left = None
        self.right = None
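
Node is a plain container: internal nodes carry feature_index and threshold and have both children set, while leaves have left and right equal to None. A hedged sketch that walks a fitted tree to compute its depth (m is the fitted classifier from the demo above):

    def tree_depth(node):
        # leaves have neither child; _grow_tree always sets both children of a split node
        if node is None or (node.left is None and node.right is None):
            return 0
        return 1 + max(tree_depth(node.left), tree_depth(node.right))

    print('depth', tree_depth(m.root))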