Skip to content
Snippets Groups Projects
Commit 4404160b authored by FROGE Ewen's avatar FROGE Ewen
Browse files

Update file CalcInnovation.py

parent d4407e36
No related branches found
No related tags found
No related merge requests found
import matplotlib.pyplot as plt
import numpy as np
from numpy.linalg import inv
from scipy.spatial import cKDTree
from sklearn.neighbors import NearestNeighbors
def create_database(data, p, overlapping=False):
    """
    Create a database of (p + 1)-tuples from the given data.

    Each tuple holds p analog values followed by their successor value.

    Parameters:
        data (array-like): Input data series.
        p (int): The size of each candidate analog.
        overlapping (bool): Whether consecutive tuples may fully overlap
            (stride 1). When False the stride is p, so consecutive tuples
            still share exactly one boundary sample (the successor of one
            tuple is the first analog of the next).

    Returns:
        np.ndarray: The database of tuples, shape (n_tuples, p + 1).
    """
    # Sliding windows of length p + 1: p analog values plus one successor.
    window = np.lib.stride_tricks.sliding_window_view(data, window_shape=(p + 1,))
    stride = 1 if overlapping else p
    return window[::stride, :]
def prediction_neighbors(data, db, k, weighting=True, verbose=False, Random=False, NoLR=False, normalize=False):
    """
    Predict future data points using a nearest-neighbors (analog) method.

    Parameters:
        data (array-like): Input data series (1-D).
        db (np.ndarray): Database of tuples; the last column is the successor
            value and the first (size - 1) columns are the candidate analogs.
        k (int): Number of neighbors to use.
        weighting (bool): Whether to weight neighbors by their distance.
        verbose (bool): Whether to print and plot diagnostic output.
        Random (bool): Whether to replace the neighbors with random ones.
        NoLR (bool): Whether to skip linear regression and predict with a
            weighted average of the neighbors' successors.
        normalize (bool): Whether to detrend analogs by removing their mean.

    Returns:
        np.ndarray: The predicted values. Careful: the first p entries are
        NaN for alignment reasons.
    """
    Np, pp1 = db.shape
    p = pp1 - 1
    # Build the overlapping p-uplets (queries) from the input series.
    puplets = np.lib.stride_tricks.sliding_window_view(data, window_shape=(p,))
    if normalize:
        # Detrend queries and database rows by removing each row's mean.
        # Out-of-place on purpose: sliding_window_view is read-only and the
        # caller's db must not be mutated.
        puplet_means = np.mean(puplets, axis=1)
        puplets = puplets - puplet_means[:, np.newaxis]
        db_means = np.mean(db[:, :-1], axis=1)
        db = db - db_means[:, np.newaxis]
    # k nearest neighbors (Euclidean) of each query among the analog columns.
    tree = cKDTree(db[:, :-1])
    Dist, Idx = tree.query(puplets, k=k)
    if k == 1:
        # cKDTree squeezes the neighbor axis for k == 1; restore (n, k) shape.
        Dist = Dist[:, np.newaxis]
        Idx = Idx[:, np.newaxis]
    if Random:
        # Replace the neighbor indices with uniformly random database rows.
        Idx = np.random.randint(low=0, high=Np, size=Idx.shape)
    if weighting:
        # Kernel weights decaying with distance, scaled by the median distance.
        # NOTE(review): med can be 0 when half or more of the neighbors are
        # exact matches, which yields NaN weights — confirm intended behavior.
        med = np.median(Dist, axis=1)
        weights = np.exp(-Dist / med[:, np.newaxis])
        weights /= np.sum(weights, axis=1)[:, np.newaxis]
    else:
        # Uniform weights, normalized to sum to 1 like the kernel weights.
        # (A constant rescaling cancels in the weighted least squares below,
        # but matters for the NoLR weighted-average branch.)
        weights = np.ones_like(Dist) / k
    # Float NaN output regardless of the input dtype; the first p slots stay
    # NaN because no full query exists for them.
    vals = np.full(np.shape(data), np.nan)
    if NoLR:
        # Prediction = weighted average of the neighbors' successor values.
        vals[p:] = np.sum(weights * db[Idx, -1], axis=1)[:-1]
    else:
        # Weighted linear regression of the successor on the analog values.
        X = db[Idx, :-1]
        y = (weights * db[Idx, -1])[:, :, np.newaxis]
        X = np.pad(X, [(0, 0), (0, 0), (1, 0)], mode='constant', constant_values=1)  # bias column
        Xt = np.transpose(X, axes=[0, 2, 1])
        coef = np.linalg.inv(Xt @ (weights[:, :, np.newaxis] * X)) @ Xt @ y
        pred = coef[:, 0, 0] + np.sum(coef[:, 1:, 0] * puplets, axis=1)
        vals[p:] = pred[:-1]
    if normalize:
        # Re-add the query means removed during detrending.
        vals[p:] += puplet_means[:-1]
    if verbose:
        # Print and plot diagnostic information for the first query.
        print('Index', Idx)
        print('Dist', Dist)
        print('Puplets', puplets)
        t = 0
        for i in range(k):
            # Transparency encodes the relative distance of each neighbor.
            plt.plot(db[Idx[t, i]], c='blue', alpha=np.min(Dist[t, :]) / Dist[t, i])
        plt.plot(data[t:t + p + 1], c='green')  # Plot actual data
        plt.plot([p - 1, p], [data[t + p - 1], vals[t + p]], c='red')  # Plot predicted value
        plt.show()
    # Careful!! The p first values are NaN for alignment reasons
    return vals
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment