-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
random_forest.py
109 lines (90 loc) · 3.54 KB
/
random_forest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# coding:utf-8
import numpy as np
from mla.base import BaseEstimator
from mla.ensemble.base import information_gain, mse_criterion
from mla.ensemble.tree import Tree
class RandomForest(BaseEstimator):
def __init__(self, n_estimators=10, max_features=None, min_samples_split=10, max_depth=None, criterion=None):
"""Base class for RandomForest.
Parameters
----------
n_estimators : int
The number of decision tree.
max_features : int
The number of features to consider when looking for the best split.
min_samples_split : int
The minimum number of samples required to split an internal node.
max_depth : int
Maximum depth of the tree.
criterion : str
The function to measure the quality of a split.
"""
self.max_depth = max_depth
self.min_samples_split = min_samples_split
self.max_features = max_features
self.n_estimators = n_estimators
self.trees = []
def fit(self, X, y):
self._setup_input(X, y)
if self.max_features is None:
self.max_features = int(np.sqrt(X.shape[1]))
else:
assert X.shape[1] > self.max_features
self._train()
def _train(self):
for tree in self.trees:
tree.train(
self.X,
self.y,
max_features=self.max_features,
min_samples_split=self.min_samples_split,
max_depth=self.max_depth
)
def _predict(self, X=None):
raise NotImplementedError()
class RandomForestClassifier(RandomForest):
def __init__(self, n_estimators=10, max_features=None, min_samples_split=10, max_depth=None, criterion="entropy"):
super(RandomForestClassifier, self).__init__(
n_estimators=n_estimators,
max_features=max_features,
min_samples_split=min_samples_split,
max_depth=max_depth,
criterion=criterion,
)
if criterion == "entropy":
self.criterion = information_gain
else:
raise ValueError()
# Initialize empty trees
for _ in range(self.n_estimators):
self.trees.append(Tree(criterion=self.criterion))
def _predict(self, X=None):
y_shape = np.unique(self.y).shape[0]
predictions = np.zeros((X.shape[0], y_shape))
for i in range(X.shape[0]):
row_pred = np.zeros(y_shape)
for tree in self.trees:
row_pred += tree.predict_row(X[i, :])
row_pred /= self.n_estimators
predictions[i, :] = row_pred
return predictions
class RandomForestRegressor(RandomForest):
def __init__(self, n_estimators=10, max_features=None, min_samples_split=10, max_depth=None, criterion="mse"):
super(RandomForestRegressor, self).__init__(
n_estimators=n_estimators,
max_features=max_features,
min_samples_split=min_samples_split,
max_depth=max_depth,
)
if criterion == "mse":
self.criterion = mse_criterion
else:
raise ValueError()
# Initialize empty regression trees
for _ in range(self.n_estimators):
self.trees.append(Tree(regression=True, criterion=self.criterion))
def _predict(self, X=None):
predictions = np.zeros((X.shape[0], self.n_estimators))
for i, tree in enumerate(self.trees):
predictions[:, i] = tree.predict(X)
return predictions.mean(axis=1)