For the Dakar Institute of Technology, I give Python introductory classes. During one of my sessions I was asked about simple linear regression with Python. Let the music play
TOC:
- Introduction
- First solution
- The complete program
- Another solution using the statistics module
- Just give me the code please
Introduction:
Simple linear regression is a commonly used type of predictive analysis. The idea of regression is to examine if a set of predictor variables do a good job in predicting an outcome variable.
Linear regression models are used in many industries such as healthcare (predicting disease from biological factors), economics (predicting growth), businesses (predicting product sales), etc ...
For simple linear regression, the relationship between the independent variable x and the dependent variable y is expressed as: y = slope * x + intercept + noise
, where:
- slope and intercept are the regression parameters that are estimated
- noise is the variability of the data (difference between predicted and actual values of y).
First solution:
Let's write a Python program to achieve this task. We'll use the following data set
1. Reading the data set:
To read the data, we'll write a function in charge of opening and reading the file containing our data. We assume each line will consist of an int value, followed by a tabulation and a float value. This function will:
- open the file
- get all the lines except the first one (the header)
- convert first column to int, convert second column to float
- return a tuple containing 2 things:
def read_text(
filename: t.Union[pathlib.Path, t.AnyStr],
) -> t.Tuple[t.Collection[float], t.Collection[float]]:
with open(filename, "r") as f:
slines = [line.strip().split("\t") for line in [li for li in f.readlines()[1:]]]
nlines = [(int(x), float(y)) for x, y in slines]
X, Y = zip(*nlines)
return X, Y
Bonjour
2. Plotting the data set:
We'll have to do some plotting. The following program is just one we use to setup our plots:
ax: mpl.axes.Axes
_, ax = plt.subplots()
ax.set_xlabel("X-coordinates")
ax.set_ylabel("Y-coordinates ")
ax.set_title("Graphical representation of our points")
ax.grid(True)
The first step in finding a linear regression equation is to determine if there is a relationship between the two variables. To determine if there is a relationship between x and y we can scatter plot the data set.
Scatter plots are used to observe relationships between variables. We'll need one function to do a scatter plot: put dots on our graph and see each individual data point.
def scatter_plot(
x: t.Collection[float], y: t.Collection[float], ax: mpl.axes.Axes
) -> None:
ax.scatter(x, y, color="red")
Using this function with our data set, we can see the following chart:
We'll need a function to draw a line. If we correctly find estimates for our regression parameters (slope and the intercept), we should be able to draw a line that will go through the cloud of data points:
def line_plot(a: int, b: int, ax: mpl.axes.Axes) -> None:
x = [0, 130]
y = [a * x + b for x in x]
ax.plot(x, y)
3. Splitting the data set:
We don’t want our model to over learn from the data and perform poorly when presented data never seen before. We also need to have a mechanism to assess how well our model is generalizing. Hence, we'll need to separate our input data into at least 2 sets: training and testing:
1. Training data set:
This is the set of data that will be used only for learning. We will use only one part of the data set to find the best parameters for our model.
2. Testing data set:
It would be best if we evaluate the model with new data that hasn’t been seen by the model before. So we need a set of data used to provide an evaluation without bias of the model that was fitted on the training data set.
Let's write a function that will split our data. We'll keep 80% for the training and 20% for testing. This function will:
- randomly pick 20% of all the indexes in our collection
- with those 20% indexes, create a list containing 20% of the collections
- from those 20% indexes, create a list containing 80% of the collections
- return all the sets
split_seq = t.Tuple[
t.Collection[float], t.Collection[float], t.Collection[float], t.Collection[float]
]
def split_train_test(
X: t.Collection[float], Y: t.Collection[float], frac: float = 0.2
) -> split_seq:
indexes = list(range(len(X)))
indexes_test = set(random.sample(indexes, int(frac * len(X))))
# get 20% of Xs based on the indexes, do the same for Ys
X_test = [n for i, n in enumerate(X) if i in indexes_test]
Y_test = [n for i, n in enumerate(Y) if i in indexes_test]
# get 80% of Xs based on the indexes, do the same for Ys
X_train = [n for i, n in enumerate(X) if i not in indexes_test]
Y_train = [n for i, n in enumerate(Y) if i not in indexes_test]
return X_train, X_test, Y_train, Y_test
Let's visualize how the 2 sets are rendered in a scatter plot:
4. Least squares for the slope and the intercept:
How do we estimate our parameters α and β ? That’s the learning procedure. There are different optimization techniques to implement this part. To determine the optimum values of these two parameters, we should find the line that best fit the data set.
To determine the optimum values of these two parameters, we need to find the slope and the intercept so that when we sum all the errors between the predicted y values and the observed y values, this sum should be the least possible.
The following chart will show us all the distances (or all the differences, or all the errors) between each point on the dataset and the regression line:
Let's see how the chart looks like when the parameters are not correctly estimated (the regression line does not "fit" the data set) :
And here is the function used to plot the errors. This function will:
- Take the estimated coefficients α and β
- Take the list of X and the list of Y
- For all points in our data set:
- predict
y
based on α and β - draw a dashed line between 2 points:
(x, y)
and(x, y_pred)
- predict
def plot_errors(
a: float,
b: float,
X: t.Collection[float],
Y: t.Collection[float],
ax: mpl.axes.Axes,
) -> None:
for x, y in zip(X, Y):
y_pred = a * x + b
ax.plot([x, x], [y, y_pred], "bo", label="Errors", linestyle="dashed")
The total error of this model is the sum of all the errors for each point:
We should sum all the errors and look for the least error. Here is the formula:
The SSE (Sum of Squared Errors) is equal to the sum of the square of Y observed - Y calculated, for all point in the data set.
If we do all the correct algebraic manipulation, we should find that:
Seeing those formulas, those familiar with statistics already know we'll need to define a function to calculate a mean, another function to calculate a variance and a function to calculate a covariance. βeta hat can be defined as Covariance(X, Y) / Variance(X).
5.1 Mean:
The arithmetic mean, is a measure of central tendency of a finite set of numbers: specifically, the sum of the values divided by the number of values:
def mean(lst: t.Collection[float]) -> float:
return sum(lst) / len(lst)
5.2 Variance:
The variance is the measure of the spread in a data set. The variance is a measure of how far each number in the data set is from the mean, and thus from every other number in the data set. The higher the variance, the more spread out the variables are.
def variance(lst: t.Collection[float]) -> float:
m = mean(lst)
return sum([(x - m) ** 2 for x in lst]) / len(lst)
variance: 582.8381391772394
There is 2 ways to plot the variance. We can do it manually, or we can use a box plot:
Here is the function used to plot the variance:
def plot_variance(
X: t.Collection[float],
Y: t.Collection[float],
) -> None:
fig: mpl.figure.Figure
ax: mpl.axes.Axes
_, ax = plt.subplots(2)
ax[1].set_xlabel("X-coordinates ")
ax[1].set_ylabel("Y-coordinates")
ax[0].set_title("Graphical representation of variance")
ax[1].grid(True)
# box plot
ax[0].boxplot([X], vert=False, showmeans=True)
# mean line
ax[1].axvline(x=mean(X), ymin=0, ymax=130, color="orange")
# distance from point to mean line
x_mean = mean(X)
for x, y in zip(X, Y):
ax[1].plot(
[x, x_mean],
[y, y],
marker="o",
color="green",
label="Mean",
linestyle="dotted",
)
plt.show()
5.3 Covariance:
The covariance is a measure of the relationship between two variables. Covariance is the extent to which the variance in one variable depends on another variable. The higher the covariance, the stronger the relationship.
def covariance(X: t.Sequence[float], Y: t.Sequence[float]) -> float:
mean_x, mean_y, length_x = mean(X), mean(Y), len(X)
covar = 0.0
for i in range(len(X)):
covar += (X[i] - mean_x) * (Y[i] - mean_y)
return covar / length_x
covariance: 2059.003344867359
5.4 Estimates:
Now that we have a function for calculating the variance and another one for calculating the covariance, we can write a function that will calculate our slope and our intercept using this 2 formulas seen earlier:
def estimate_coefficients(X: t.Collection[float], Y: t.Collection[float]) -> t.Tuple[float, float]:
b1 = covariance(X, Y) / variance(X)
b0 = mean(Y) - b1 * mean(X)
return b0, b1
5.5. Prediction:
Now that we have extracted our coefficients, let's see if we can make a prediction, we'll use the testing data set:
b, a = estimate_coefficients(X_train, Y_train)
Y_predicted = [a * x + b for x in X_test]
for i, j in zip(Y_test, Y_predicted):
print(i, '\t', j) # print Y observed in testing set next to Y predicted using our coefficients
> python3 script.py
392.5 389.8167994835544
15.7 65.88576964183908
48.8 45.426967757099156
113.0 99.98377278307227
48.7 52.246568385345796
40.3 38.607367128852516
59.6 76.11517058420904
152.8 147.72097718079874
73.4 161.360178437292
21.3 59.066169013592436
55.6 48.83676807122248
194.5 123.85237498193551
5.6. How good is our model:
We need to able to measure how good our model is. Among the methods available to achieve this task, let's use the most common and the most used: the Root mean squared error.
RMSE:
One way (the most common way?) to assess how well our model fits our data set is to calculate the Root Mean Squared Error. The RMSE is:
- the square root
- of the mean
- of the square of all of the errors
Let's write a function to calculate the RMSE:
def rmse(actual: t.Sequence[float], predicted: t.Sequence[float]) -> float:
sum_error = 0.0
for i in range(len(actual)):
prediction_error = predicted[i] - actual[i]
sum_error += prediction_error**2
mean_error = sum_error / float(len(actual))
return math.sqrt(mean_error)
If we develop mutliple models, and we want to verify which one performs better:
- Calculate the RMSE value for each model
- The lower the RMSE value, the better the model.
- A RMSE value of 0 indicate a perfect fit.
The data set we're working with has a range: [Y_min, Y_max]. This range is important in determining whether a given RMSE value is "low" or "high". There is no "good" or no "bad" RMSE value. It always depends on the range of the data set we're working with.
Normalized RMSE:
The NRMSE is the RMSE divided by Y_max - Y_min. The result will be between 0 and 1, where values closer to 0 represent better fitting models. The normalization facilitates the comparison between data sets with different scales:
def rmse(
actual: t.Sequence[float], predicted: t.Sequence[float]
) -> t.Tuple[float, float]:
mean_error = sum([(predicted[i] - actual[i])**2 for i in range(len(actual))]) / float(len(actual))
rmse = math.sqrt(mean_error)
nrmse = rmse / (max(predicted) - min(predicted))
return rmse, nrmse
>> python3 script.py
The RMSE is 23.27666203730159
Normalized RMSE is 0.06371542339113609
5.7. Regression line:
The complete program:
import math
import random
import requests # type:ignore
import matplotlib as mpl # type:ignore
import matplotlib.pyplot as plt # type:ignore
import typing as t
import pathlib
from dataclasses import dataclass, field
# setup plots
fig: mpl.figure.Figure
ax: mpl.axes.Axes
fig, ax = plt.subplots()
ax.set_xlabel("X-coordinates ")
ax.set_ylabel("Y-coordinates")
ax.set_title("Graphical representation of our points")
ax.grid(True)
splits = t.Tuple[
t.Collection[float], t.Collection[float], t.Collection[float], t.Collection[float]
]
@dataclass
class SimpleLinearRegression:
filename: t.Union[pathlib.Path, str] = "./dataset.txt"
__X: t.Collection[float] = field(init=False)
__Y: t.Collection[float] = field(init=False)
X_train: t.Collection[float] = field(init=False)
Y_train: t.Collection[float] = field(init=False)
X_test: t.Collection[float] = field(init=False)
Y_test: t.Collection[float] = field(init=False)
def __post_init__(self) -> None:
with open(self.filename, "r") as f:
slines = [
line.strip().split("\t") for line in [li for li in f.readlines()[1:]]
]
nlines = [(float(x), float(y)) for x, y in slines]
self.__X, self.__Y = zip(*nlines)
self.__split_train_test()
def scatter_plot(self, ax: mpl.axes.Axes, color: str = "red", label=None) -> None:
ax.scatter(self.__X, self.__Y, color=color, label=label)
def line_plot(self, ax: mpl.axes.Axes) -> None:
b, a = self.estimate_coefficients()
x = [0, 130]
y = [a * x + b for x in x]
ax.plot(x, y, color="green")
def __mean(self, lst: t.Collection[float]) -> float:
return sum(lst) / len(lst)
def __var(self, lst: t.Collection[float]) -> float:
m = self.__mean(lst)
return sum([(x - m) ** 2 for x in lst]) / len(lst)
def __cov(self, X: t.Sequence[float], Y: t.Sequence[float]) -> float:
mean_x = self.__mean(X)
mean_y = self.__mean(Y)
covar = 0.0
for i in range(len(X)):
covar += (X[i] - mean_x) * (Y[i] - mean_y)
return covar / len(X)
def __split_train_test(self, frac: float = 0.2) -> None:
indexes = list(range(len(self.__X)))
indexes_test = set(random.sample(indexes, int(frac * len(self.__X))))
# get 20% of Xs based on the indexes, do the same for Ys
self.X_test = [n for i, n in enumerate(self.__X) if i in indexes_test]
self.Y_test = [n for i, n in enumerate(self.__Y) if i in indexes_test]
# get 80% of Xs based on the indexes, do the same for Ys
self.X_train = [n for i, n in enumerate(self.__X) if i not in indexes_test]
self.Y_train = [n for i, n in enumerate(self.__Y) if i not in indexes_test]
def estimate_coefficients(
self,
) -> t.Tuple[float, float]:
mean_x, mean_y = self.__mean(self.X_train), self.__mean(self.Y_train)
b1 = self.__cov(self.X_train, self.Y_train) / self.__var( # type:ignore
self.X_train
)
b0 = mean_y - b1 * mean_x
return b0, b1
def rmse(self) -> t.Tuple[float, float]:
b, a = self.estimate_coefficients()
predicted = [a * x + b for x in self.X_test]
actual = self.Y_test
mean_error = sum(
[(predicted[i] - actual[i]) ** 2 for i in range(len(actual))] # type:ignore
) / float(
len(actual)
)
rmse = math.sqrt(mean_error)
nrmse = math.sqrt(mean_error) / (max(predicted) - min(predicted))
return rmse, nrmse
# program entry point
if __name__ == "__main__":
def main() -> None:
model = SimpleLinearRegression()
model.scatter_plot(ax)
model.line_plot(ax)
b, a = model.estimate_coefficients()
rmse, nrmse = model.rmse()
print(f"The slope is {a}, the intercept is {b}")
print(f"The RMSE and NRMSE are: {rmse} and {nrmse}")
plt.show()
main()
The slope is 3.426108589853597, the intercept is 20.478117458572427
The RMSE and NRMSE are: 27.82841276749733 and 0.07520791645397
Another solution, using the statistics module:
Before jumping to big and powerful libraries such as NumPy, SciPy, Pandas, Pingouin, Statsmodel, and all the others. We have to notice, Python provides us with a module named statistics. Even though the statistics module is not intended to be a competitor to the ones listed above, it's interesting to see that inside, there is:
- a mean function
- a variance function
- a covariance function
- even a linear regression function.
Our program will suddenly become shorter:
import statistics as lestat
# load dataset
X, Y = read_text("dataset.txt")
# extract coefficients
a, b = lestat.linear_regression(X, Y)
Source code:
Phew. All the cool devs today will share the source code via a Jupyter notebook stored on Github or on Colab. According to the Software Freedom Conservancy, we should all Give Up GitHub. And as a free software advocate, I agree with them. The source code can be found here and the data set here. You're welcome.
I really hope you've learned something, really. Talking about the whole subject won't be possible in this tiny article. To dive deeper and learn even more, let me please recommend the following links: