This Jupyter notebook constructs an AdaBoost model from scratch. I will still use the classification data that I generated before, so that I can compare the classification results with other algorithms. In the end, I will also compare my implementation against the sklearn.ensemble.AdaBoostClassifier module.
# Load Library
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, roc_curve, auc
from scipy.stats import mode
import math
# Generate Data
X, y = make_classification(n_samples = 2000, n_features = 2, n_redundant = 0, n_classes = 2, random_state = 42)
y[y == 0] = -1 # recode class 0 as -1, since AdaBoost works with labels in {-1, +1}
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, random_state = 42)
# weighted loss function
def get_loss(true_yi, pred_yi, wi):
    '''
    Compute the weighted misclassification error for a group of data points.

    Parameters
    ----------
    true_yi: the true labels of the data points
    pred_yi: the predicted labels of the same data points
    wi: the weight of each data point
    '''
    return sum(((true_yi != pred_yi).astype(int) * wi)/sum(wi))
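As a quick sanity check (this toy example is my own addition, not part of the original notebook): with uniform weights, get_loss reduces to the plain misclassification rate.
# with uniform weights, the weighted loss is just the error rate
true_yi = np.array([1, 1, -1, -1])
pred_yi = np.array([1, -1, -1, -1])
wi = np.array([0.25, 0.25, 0.25, 0.25])
print(get_loss(true_yi, pred_yi, wi)) # 0.25: one of the four points is misclassified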
class DecisionStump:
    '''
    A decision stump (one-split tree) used as the weak learner in the AdaBoost algorithm.
    '''
    def __init__(self):
        self.split = {}          # dict that stores the best split found for each feature
        self.final_i = None      # index of the feature chosen for the final split
        self.cutpoint = None     # cutpoint in that feature space
        self.loss = None         # weighted loss of the chosen split
        self.final_pred_1 = None # prediction for points with feature value < cutpoint
        self.final_pred_2 = None # prediction for points with feature value >= cutpoint

    def fit(self, X_train, y_train, weights):
        for i in range(X_train.shape[1]):
            Xi = X_train[:, i]    # all the values in the ith predictor space
            final_loss = math.inf # initialize the loss value for this predictor
            for cutpoint in Xi:
                # split the data at the candidate cutpoint
                left = Xi < cutpoint
                true_yi_part1 = y_train[left]
                true_yi_part2 = y_train[~left]
                # skip degenerate splits where one side is empty (check this before calling mode)
                if len(true_yi_part1) == 0 or len(true_yi_part2) == 0:
                    continue
                weights_part1 = weights[left]
                weights_part2 = weights[~left]
                # each side predicts its majority class
                pred_yi_part1 = np.array(list(mode(true_yi_part1)[0]) * len(true_yi_part1))
                pred_yi_part2 = np.array(list(mode(true_yi_part2)[0]) * len(true_yi_part2))
                # calculate the weighted loss in each part and sum them up
                loss = get_loss(true_yi_part1, pred_yi_part1, weights_part1) + get_loss(true_yi_part2, pred_yi_part2, weights_part2)
                if loss <= final_loss:
                    final_loss = loss
                    final_cutpoint = cutpoint
                    final_pred_1 = np.unique(pred_yi_part1).astype(int)
                    final_pred_2 = np.unique(pred_yi_part2).astype(int)
            self.split[i] = (final_cutpoint, final_loss, final_pred_1, final_pred_2)
        # among the best split of each predictor, keep the one with the minimum loss
        self.final_i = np.argmin([v[1] for v in self.split.values()]) # which predictor
        self.cutpoint = self.split[self.final_i][0] # the cutpoint in that predictor space
        self.loss = self.split[self.final_i][1]     # the total weighted loss
        self.final_pred_1 = self.split[self.final_i][2]
        self.final_pred_2 = self.split[self.final_i][3]

    def predict(self, X_test):
        pred = []
        i = self.final_i   # the chosen splitting feature
        cp = self.cutpoint # the chosen splitting cutpoint
        for xi in X_test[:, i]:
            if xi < cp:
                pred.append(self.final_pred_1)
            else:
                pred.append(self.final_pred_2)
        return np.ravel(np.array(pred))
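Before boosting, it is worth sanity-checking a single stump on its own. The snippet below is an illustration I added (it is not in the original notebook): it fits one DecisionStump with uniform weights and reports its training accuracy, which should already be better than chance.
# fit a single decision stump with uniform weights as a baseline
uniform_w = np.array([1 / len(y_train)] * len(y_train))
stump = DecisionStump()
stump.fit(X_train, y_train, uniform_w)
print('chosen feature:', stump.final_i, 'cutpoint:', stump.cutpoint)
print('training accuracy:', np.mean(np.equal(stump.predict(X_train), y_train)))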
Now let us construct the AdaBoost algorithm.
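Concretely, the version implemented below does the following at each of the n_estimators iterations (the formulas are read directly off the code): start from uniform weights w_i = 1/n; resample the training set according to the current weights; fit a decision stump to the resampled data; take its weighted loss as the error err_m; give the stump the vote alpha_m = log((1 - err_m) / err_m); and multiply the weight of every training point the stump misclassifies by exp(alpha_m). The final classifier is the sign of the alpha-weighted sum of the stump predictions.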
class Adaboost:
    def __init__(self, n_estimators):
        self.n_estimators = n_estimators # the number of boosting iterations
        self.WeakLearner = []            # the fitted decision stumps
        self.alpha = []                  # the vote (weight) of each weak learner

    def fit(self, X_train, y_train):
        weights = np.array([1/len(y_train)] * len(y_train)) # initialize the weights uniformly
        for m in range(self.n_estimators): # construct the weak learners one by one
            if m % 10 == 0:
                print('Fitting weak learner {} of {}'.format(m + 1, self.n_estimators))
            # resample the training data according to the current weights
            sample_index = np.random.choice(np.arange(len(y_train)), p=(weights/sum(weights)), size = len(y_train))
            sample_X_train = X_train[sample_index]
            sample_y_train = y_train[sample_index]
            # construct a decision stump on the resampled data
            ds = DecisionStump()
            ds.fit(sample_X_train, sample_y_train, weights)
            prediction_m = ds.predict(X_train)
            err_m = ds.loss
            # alpha_m = log((1 - err_m) / err_m); the small epsilon in the denominator avoids division by zero
            alpha_m = np.log((1 - err_m) / (err_m + 1e-13))
            # upweight the points that the current stump misclassifies
            weights = weights * np.exp(alpha_m * np.logical_not(np.equal(prediction_m, y_train)).astype(int))
            # store each weak learner and its vote
            self.WeakLearner.append(ds)
            self.alpha.append(alpha_m)

    def predict(self, X_test):
        # weighted vote of all the weak learners, thresholded at zero
        prediction = 0
        for m in range(self.n_estimators):
            wl = self.WeakLearner[m]
            prediction = prediction + wl.predict(X_test) * self.alpha[m]
        prediction[prediction >= 0] = 1
        prediction[prediction < 0] = -1
        return prediction
ad = Adaboost(50)
ad.fit(X_train, y_train)
y_pred = ad.predict(X_test)
np.mean(np.equal(y_pred, y_test)) # accuracy on the test set
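The imports at the top also bring in confusion_matrix, roc_curve and auc, which are never used above. The snippet below is an optional addition of mine: it recomputes the raw boosted score (the alpha-weighted sum of stump votes before the sign is taken, mirroring Adaboost.predict) and uses it to report a confusion matrix and an AUC.
# confusion matrix of the hard {-1, +1} predictions
print(confusion_matrix(y_test, y_pred))
# raw margin: weighted sum of the weak learners' votes, before thresholding
scores = sum(a * wl.predict(X_test) for a, wl in zip(ad.alpha, ad.WeakLearner))
fpr, tpr, _ = roc_curve(y_test, scores)
print('AUC:', auc(fpr, tpr))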
fig = plt.figure(figsize = (15,5))
plt.subplot(1,2,1)
plt.scatter(X_test[y_test==1][:,0],X_test[y_test==1][:,1], c = 'b', label = 'class1')
plt.scatter(X_test[y_test==-1][:,0],X_test[y_test==-1][:,1], c = 'r', label = 'class2')
plt.legend(loc='best');
plt.title("Original Dataset Distribution")
plt.xlabel('X1')
plt.ylabel('X2')
plt.subplot(1,2,2)
plt.scatter(X_test[y_pred==1][:,0],X_test[y_pred==1][:,1], c = 'b', label = 'class1')
plt.scatter(X_test[y_pred==-1][:,0],X_test[y_pred==-1][:,1], c = 'r', label = 'class2')
plt.legend(loc='best');
plt.title("Predicted Dataset Distribution")
plt.xlabel('X1')
plt.ylabel('X2')
plt.show()
Then I compare my result with the result of sklearn
from sklearn.ensemble import AdaBoostClassifier
ada = AdaBoostClassifier()
ada = ada.fit(X_train,y_train)
np.mean(np.equal(ada.predict(X_test), y_test))
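Note that AdaBoostClassifier already defaults to 50 weak learners, each a depth-1 decision tree, so it uses the same budget of stumps as our own model. Making the settings explicit (an optional variation I added) does not change that:
ada50 = AdaBoostClassifier(n_estimators = 50, random_state = 42)
ada50 = ada50.fit(X_train, y_train)
np.mean(np.equal(ada50.predict(X_test), y_test))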
Still, our result is very close to sklearn's result. Finally, let us visualize how the vote alpha assigned to a weak learner depends on its error rate.
def alpha(err):
    return np.log((1 - err)/err)

err = np.linspace(start = 0.001, stop = 0.999, num = 1000) # stay away from err = 0 and err = 1, where alpha is infinite
ap = alpha(err)
fig = plt.figure()
plt.plot(err, ap)
plt.axhline(y = 0, c = 'r', alpha = 0.2, linestyle = '--')
plt.axvline(x = 0.5, c = 'r', alpha = 0.2, linestyle = '--')
plt.xlabel('error rate')
plt.ylabel('alpha')
plt.show()
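The dashed lines mark err = 0.5, where alpha crosses zero: a weak learner that is better than random guessing gets a positive vote, and the closer its error rate is to zero, the larger that vote becomes, while a learner worse than random would get a negative vote. This is the mechanism that lets AdaBoost combine many barely-better-than-chance stumps into a strong classifier.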