Skip to content
Snippets Groups Projects
Commit 6b49c741 authored by FROGE Ewen's avatar FROGE Ewen
Browse files

Update file CalcInnovation.py

parent 00f20ef2
No related branches found
No related tags found
No related merge requests found
#%%
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import NearestNeighbors
def create_database(data, p, overlapping=False):
"""
Create a database of tuples from the given data with optional overlapping.
Create a database of p+1 continuous values with or without overlapping.
Parameters:
data (array-like): Input data series.
p (int): The size of each candidate analogs.
overlapping (bool): Whether the tuples can overlap.
p (int): The size of the tuples.
overlapping (bool): Whether the tuples should overlap.
Returns:
np.ndarray: The database of tuples.
"""
# Create sliding window view of data to form tuples of size p+1
# Create sliding window view of the data with window size p+1
window = np.lib.stride_tricks.sliding_window_view(data, window_shape=(p + 1,))
# Set stride based on overlapping flag
stride = 1 if overlapping else p
db = window[0::stride, :]
return db
# Select windows based on the stride
db_analog = window[0::stride, :]
return db_analog
def prediction_neighbors(data, db, k, weighting=True, verbose=False, Random=False, NoLR=False, normalize=False):
def create_puplets(data, p):
"""
Predict future data points using nearest neighbors algorithm.
Create p-uplets from the data.
Parameters:
data (array-like): Input data series.
db (array-like): Database of tuples, where the last value is the successor value and the Size-1 first values are interpreted as analogs
p (int): The size of the tuples.
Returns:
np.ndarray: The p-uplets.
"""
return np.lib.stride_tricks.sliding_window_view(data, window_shape=(p,))
def prediction_neighbors(db_puplets, db_analog, k, weighting=True, verbose=False, Random=False, NoLR=False, normalize=False):
"""
Predict future data points using nearest neighbors algorithm.
Parameters:
db_puplets (array-like): (_,p) Database of puplets on which you want to predict successor
db_analog (array-like): (_,p+1) Database of analogs puplets and their succesor
k (int): Number of neighbors to use.
weighting (bool): Whether to weight the neighbors.
verbose (bool): Whether to print verbose output.
......@@ -38,69 +53,114 @@ def prediction_neighbors(data, db, k, weighting=True, verbose=False, Random=Fals
Returns:
np.ndarray: The predicted values.
"""
Np, pp1 = db.shape
Np, pp1 = db_analog.shape
p = pp1 - 1
# Create p-uplets from data
puplets = np.lib.stride_tricks.sliding_window_view(data, window_shape=(p,))
if normalize:
# Detrend puplets and database by subtracting means
puplet_means = np.mean(puplets, axis=1)
puplets -= puplet_means[:, np.newaxis]
db_means = np.mean(db[:, :-1], axis=1)
db -= db_means[:, np.newaxis]
# Compute means for detrending
db_puplets_means = np.mean(db_puplets, axis=1)
# Detrend db_puplets by subtracting mean
db_puplets -= db_puplets_means[:, np.newaxis]
db_means = np.mean(db_analog[:, :-1], axis=1)
# Detrend database by subtracting mean
db_analog -= db_means[:, np.newaxis]
# Fit nearest neighbors model
# Initialize and fit nearest neighbors model on the database (excluding the last column)
neigh = NearestNeighbors(n_jobs=4)
neigh.fit(db[:, :-1])
neigh.fit(db_analog[:, :-1])
# Find k nearest neighbors for each p-uplet in data
Dist, Idx = neigh.kneighbors(puplets, n_neighbors=k, return_distance=True)
# Find k nearest neighbors for each p-uplet
Dist, Idx = neigh.kneighbors(db_puplets, n_neighbors=k, return_distance=True)
if Random:
# Randomly select neighbors if specified
# Replace indices with random indices if Random flag is set
Idx = np.random.randint(low=0, high=Np, size=Idx.shape)
if weighting:
# Calculate weights based on distances to neighbors
# Compute weights based on distances
med = np.median(Dist, axis=1)
weights = np.exp(-Dist / med[:, np.newaxis])
weights /= np.sum(weights, axis=1)[:, np.newaxis]
weights /= np.sum(weights, axis=1)[:, np.newaxis] # Normalize weights
else:
# Use uniform weights if not specified
weights = np.ones_like(Dist)
weights = np.ones_like(Dist) # Equal weights if not weighting
vals = np.full_like(data, np.nan)
vals = np.full_like(data, np.nan) # Initialize prediction array with NaNs
if NoLR:
# Use simple weighted average without linear regression
vals[p:] = np.sum(weights * db[Idx, -1], axis=1)[:-1]
# Simple weighted average without linear regression
vals[p:] = np.sum(weights * db_analog[Idx, -1], axis=1)[:-1]
else:
# Perform linear regression to predict values
X = db[Idx, :-1]
y = (weights * db[Idx, -1])[:, :, np.newaxis]
X = np.pad(X, [(0, 0), (0, 0), (1, 0)], mode='constant', constant_values=1) # Add bias term for linear regression
# Linear regression to predict values
X = db_analog[Idx, :-1] # Nearest neighbor features
y = (weights * db_analog[Idx, -1])[:, :, np.newaxis] # Weighted target values
# Add bias term to features
X = np.pad(X, [(0, 0), (0, 0), (1, 0)], mode='constant', constant_values=1)
# Compute regression coefficients
coef = np.linalg.inv(np.transpose(X, axes=[0, 2, 1]) @ (weights[:, :, np.newaxis] * X)) @ np.transpose(X, axes=[0, 2, 1]) @ y
vals[p:] = coef[:-1, 0, 0] + np.sum(coef[:, 1:, 0] * puplets, axis=1)[:-1]
# Predict values using the regression coefficients
vals[p:] = coef[:-1, 0, 0] + np.sum(coef[:, 1:, 0] * db_puplets, axis=1)[:-1]
if normalize:
# Add mean back to predicted values if analog are normalized
vals[p:] += puplet_means[:-1]
# Re-add the mean to the predictions if data was normalized
vals[p:] += db_puplets_means[:-1]
if verbose:
# Print additional information if verbose mode is enabled
print('Index', Idx)
print('Dist', Dist)
print('Puplets', puplets)
print('Puplets', db_puplets)
t = 0 #
for i in range(k):
# Plot each neighbor with adjusted transparency based on distance
plt.plot(db[Idx[t, i]], c='blue', alpha=np.min(Dist[t, :]) / Dist[t, i])
plt.plot(data[t:t + p + 1], c='green') # Plot actual data
plt.plot([p - 1, p], [data[t + p - 1], vals[t + p]], c='red') # Plot predicted value
plt.show()
#Careful!! The p first values are Nan for alignment reasons
for _ in range(5):
t = 0
# Plot the nearest neighbors and the prediction
for i in range(k):
plt.plot(db_analog[Idx[t, i]], c='blue', alpha=np.min(Dist[t, :]) / Dist[t, i])
plt.plot(data[t:t + p + 1], c='green')
plt.plot([p - 1, p], [data[t + p - 1], vals[t + p]], c='red')
plt.show()
return vals
#%% Example
N=2**16 #Size of realisations
#import Modane
path='/users2/local/e22froge/codes/Innovation/DataSignals/uspa_mob15_1.dat' #path to data
fid = open(path,'r')
signal = np.fromfile(fid, dtype='>f')
fid.close()
data=np.reshape(signal,(-1,N))
Nbibpow=16 # total number of point
Bib=np.reshape(data,-1)
Bib=Bib[0:2**Nbibpow]
Bib=(Bib-np.mean(Bib))/np.std(Bib)
db =create_database(Bib,p=3)
real=100 #Arbitrary chosen realisation
velocity=(data[real,:]-np.mean(data[real,:]))/np.std(data[real,:])
vals= prediction_neighbors(velocity,db, k=100)
innovation=velocity-vals
#Careful!! The p first values are Nan for alignment reasons
plt.plot(innovation)
# %%
incr1=(velocity[1:]-velocity[:-1])[1:]
incr2=velocity[2:]-velocity[:-2]
args = (incr1, incr2, velocity[2:]) #data a la fin, le successeur, et incr1 incr2 les "analogs"
db = create_database(velocity, p=3)
vals= prediction_neighbors(incr1, db, k=100)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment