Fan Gong 25/01/2018
This notebook constructs a classification and regression tree (CART) from scratch. For the data, I will create a fake dataset with make_regression from sklearn for the regression tree, and reuse the same classification dataset as before, generated with make_classification.
# Load Library
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
import collections
import warnings
import math
warnings.filterwarnings('ignore')
# Generate Data
X, y = make_regression(n_samples=1000, n_features=2, n_targets=1, n_informative=2, random_state = 42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 42)
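A quick check of the resulting shapes (1000 generated samples split 80/20):
# confirm the 80/20 train/test split of the 1000 generated samples
print(X_train.shape, X_test.shape)   # (800, 2) (200, 2)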
def get_loss(y):
    '''
    This function aims to calculate the RSS loss in one region.
    Parameter
    ---------
    y: A vector containing the dependent values in this region.
    Return
    ------
    loss: The loss value in this region
    '''
    # residual sum of squares around the region mean
    loss = np.sum((y - np.mean(y))**2)
    return loss
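A quick sanity check on small vectors (illustrative values only): a constant region has zero loss, while deviations from the mean add up as squared errors.
# illustrative sanity check of get_loss
print(get_loss(np.array([3.0, 3.0, 3.0])))   # 0.0: a constant region has no error
print(get_loss(np.array([1.0, 2.0, 3.0])))   # 2.0: (1-2)^2 + (2-2)^2 + (3-2)^2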
The second function loops through all the candidate values and finds the best split in that region.
def get_cutpoint(region_x, region_y):
    '''
    This function aims to find the optimal predictor and cutpoint for the split.
    Parameter
    ---------
    region_x: All the observation values in that region
    region_y: All the dependent values in that region
    Return
    ------
    optimal_para: A length 3 list: [index of the predictor to split on, cutpoint, loss]
    '''
    split = {}
    for i in range(region_x.shape[1]):
        Xi = region_x[:,i] # contains all the values in the ith predictor space
        final_loss = np.inf # initialize the loss value
        for cutpoint in Xi:
            # make the split
            part1 = region_y[Xi < cutpoint]
            part2 = region_y[Xi >= cutpoint]
            # calculate the loss in each part and sum them up
            loss = get_loss(part1) + get_loss(part2)
            if loss <= final_loss:
                final_loss = loss
                final_cutpoint = cutpoint
        split[i] = (final_cutpoint, final_loss)
    # based on the optimal value for each predictor, find the cutpoint that leads to the minimum loss
    final_i = np.argmin([v[1] for v in split.values()]) # which predictor
    cutpoint = split[final_i][0] # the cutpoint in that predictor space
    loss = split[final_i][1] # the total loss
    return [final_i, cutpoint, loss]
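As an illustration (the exact numbers depend on the randomly generated data), the first split on the full training set can be found with:
# illustrative first split on the training data; values depend on the random seed
best_feature, best_cut, best_loss = get_cutpoint(X_train, y_train)
print('split on feature {0} at {1:.3f} with loss {2:.1f}'.format(best_feature, best_cut, best_loss))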
This function chooses which region to split next and implements the stopping criteria.
def get_region(X, y, stop_n):
    '''
    This function aims to perform recursive binary splitting and decide which region to split next.
    Parameter
    ---------
    X: The training observation matrix
    y: The training label vector
    stop_n: The minimum number of observations that each region could have
    Return
    ------
    paras: A dictionary mapping each splitting predictor to the list of cutpoints chosen for it
    '''
    # some initialized values
    regions = [[X, y]]
    paras = {}
    len_old = len(regions)
    len_new = len_old + 1
    while len_old < len_new:
        len_old = len(regions)
        final_loss = np.inf # initialize the loss value
        index = 0
        for region in regions:
            index += 1
            region_x, region_y = region # get the region information
            Xi, cutpoint, loss = get_cutpoint(region_x, region_y) # use get_cutpoint to find the optimal parameters
            part1_y = region_y[region_x[:,Xi] < cutpoint]
            part1_x = region_x[region_x[:,Xi] < cutpoint,:]
            part2_y = region_y[region_x[:,Xi] >= cutpoint]
            part2_x = region_x[region_x[:,Xi] >= cutpoint,:]
            if loss <= final_loss and len(part1_y) > stop_n and len(part2_y) > stop_n: # does not hit the stop criteria
                new_region1 = [part1_x, part1_y] # first new region
                new_region2 = [part2_x, part2_y] # second new region
                Xi_final = Xi # splitting predictor
                cutpoint_final = cutpoint # splitting cutpoint
                region_index = index-1 # splitting region
                final_loss = loss # update the loss
                print('region {0} has {1} obs in part 1, {2} in part 2 with loss {3}'.format(index-1, len(part1_y), len(part2_y), final_loss))
            elif all(map(lambda x: x <= stop_n, [len(region[1]) for region in regions])): # if all regions satisfy the stop criteria
                break
            else: # this region does not satisfy the two criteria above, so we continue
                #print('region {0} does not reduce the loss'.format(index-1))
                continue
        if collections.Counter(regions[len(regions)-1][1]) != collections.Counter(new_region2[1]): # check whether we updated our split
            paras.setdefault(Xi_final,[]).append(cutpoint_final) # record the cutting parameters
            del(regions[region_index]) # delete the region that was split
            regions.extend((new_region1, new_region2)) # replace it with the two new sub-regions
        else:
            continue
        len_new = len(regions)
    return paras
get_region(X_test, y_test, 10)
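The returned dictionary maps each splitting predictor to the cutpoints chosen for it. One way to store and inspect that output (illustrative; the exact cutpoints depend on the random data):
# illustrative: keep the result of the call above and inspect the chosen cutpoints
paras = get_region(X_test, y_test, 10)
for feature, cutpoints in paras.items():
    print('feature {0} was split at {1} cutpoints'.format(feature, len(cutpoints)))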
def make_prediction(feature_info, y_info, X):
    '''
    This function aims to make predictions based on the region splitting from our trained model.
    Parameter
    ---------
    feature_info: The region ranges: entry num_features*r + xi holds the (lower, upper) bounds of feature xi in region r
    y_info: The prediction value for each region
    X: The test data set
    Return
    ------
    y: The prediction values
    '''
    y = []
    num_features = X.shape[1]
    for value in X:
        for r in range(len(y_info)):
            logic = []
            for xi in range(num_features):
                # check whether this observation falls inside region r along feature xi
                if value[xi] > feature_info[num_features*r+xi][0] and value[xi] <= feature_info[num_features*r+xi][1]:
                    logic.append(True)
                else:
                    logic.append(False)
            if all(logic):
                y.append(y_info[r])
            else:
                continue
    return y
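As written, get_region only returns the cutpoint dictionary, so the feature_info and y_info inputs used below are not constructed in this excerpt. One possible way (hypothetical, not from the original notebook) to build them in the flat layout make_prediction expects, given a list of final [region_x, region_y] pairs, is sketched here; note that with bounds taken from each region's observed min/max, a test point falling outside every region simply receives no prediction.
# hypothetical helper (not part of the original notebook): build the flat bounds
# list and the per-region mean predictions that make_prediction expects,
# from a list of final [region_x, region_y] pairs
def build_region_info(final_regions):
    feature_info = []   # entry num_features*r + xi holds (lower, upper) for feature xi of region r
    y_info = []         # entry r holds the mean response of region r
    for region_x, region_y in final_regions:
        for xi in range(region_x.shape[1]):
            feature_info.append((region_x[:, xi].min(), region_x[:, xi].max()))
        y_info.append(np.mean(region_y))
    return feature_info, y_info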
y = make_prediction(feature_info, y_info, X_test)