Skip to content
Snippets Groups Projects
Commit 09d17f67 authored by FROGE Ewen's avatar FROGE Ewen
Browse files

Update 2 files

- /AnalogsInnovation/CalcInnovation.py
- /AnalogsInnovation/ExScriptInnovation.py
parent 6b49c741
No related branches found
No related tags found
No related merge requests found
#%%
import numpy as np
import matplotlib.pyplot as plt
from sklearn.neighbors import NearestNeighbors
def create_database(data, p, overlapping=False):
"""
Create a database of p+1 continuous values with or without overlapping.
Create a database of p continuous values with or without overlapping.
Parameters:
data (array-like): Input data series.
......@@ -16,33 +15,21 @@ def create_database(data, p, overlapping=False):
np.ndarray: The database of tuples.
"""
# Create sliding window view of the data with window size p+1
window = np.lib.stride_tricks.sliding_window_view(data, window_shape=(p + 1,))
window = np.lib.stride_tricks.sliding_window_view(data, window_shape=(p,))
# Set stride based on overlapping flag
stride = 1 if overlapping else p
# Select windows based on the stride
db_analog = window[0::stride, :]
return db_analog
db = window[0::stride, :]
return db
def create_puplets(data, p):
"""
Create p-uplets from the data.
Parameters:
data (array-like): Input data series.
p (int): The size of the tuples.
Returns:
np.ndarray: The p-uplets.
"""
return np.lib.stride_tricks.sliding_window_view(data, window_shape=(p,))
def prediction_neighbors(db_puplets, db_analog, k, weighting=True, verbose=False, Random=False, NoLR=False, normalize=False):
def prediction_neighbors(db_data, db_analog, k, weighting=True, verbose=False, Random=False, NoLR=False, normalize=False):
"""
Predict future data points using nearest neighbors algorithm.
Parameters:
db_puplets (array-like): (_,p) Database of puplets on which you want to predict successor
db_analog (array-like): (_,p+1) Database of analogs puplets and their succesor
data (array-like): Input data series.
db_analog (array-like): Database of tuples.
k (int): Number of neighbors to use.
weighting (bool): Whether to weight the neighbors.
verbose (bool): Whether to print verbose output.
......@@ -53,16 +40,21 @@ def prediction_neighbors(db_puplets, db_analog, k, weighting=True, verbose=False
Returns:
np.ndarray: The predicted values.
"""
Np, pp1 = db_analog.shape
p = pp1 - 1
Nanalog , pp1 = db_analog.shape
Ndata,p = db_data.shape
# Check if db_data is of size p and db_analog is of size p+1
if db_analog.shape[1] != p + 1:
raise ValueError(f"Expected db_analog of size p+1={p + 1} (analogs + successors), but got size {db_analog.shape[1]}")
if normalize:
# Compute means for detrending
db_puplets_means = np.mean(db_puplets, axis=1)
# Detrend db_puplets by subtracting mean
db_puplets -= db_puplets_means[:, np.newaxis]
db_data_means = np.mean(db_data, axis=1)
# Detrend db_data by subtracting mean
db_data -= db_data_means[:, np.newaxis]
db_means = np.mean(db_analog[:, :-1], axis=1)
# Detrend database by subtracting mean
......@@ -73,11 +65,11 @@ def prediction_neighbors(db_puplets, db_analog, k, weighting=True, verbose=False
neigh.fit(db_analog[:, :-1])
# Find k nearest neighbors for each p-uplet
Dist, Idx = neigh.kneighbors(db_puplets, n_neighbors=k, return_distance=True)
Dist, Idx = neigh.kneighbors(db_data, n_neighbors=k, return_distance=True)
if Random:
# Replace indices with random indices if Random flag is set
Idx = np.random.randint(low=0, high=Np, size=Idx.shape)
Idx = np.random.randint(low=0, high=Nanalog, size=Idx.shape)
if weighting:
# Compute weights based on distances
......@@ -87,7 +79,7 @@ def prediction_neighbors(db_puplets, db_analog, k, weighting=True, verbose=False
else:
weights = np.ones_like(Dist) # Equal weights if not weighting
vals = np.full_like(data, np.nan) # Initialize prediction array with NaNs
vals = np.full((Ndata+p-1),np.nan) # Initialize prediction array with NaNs
if NoLR:
# Simple weighted average without linear regression
......@@ -103,20 +95,22 @@ def prediction_neighbors(db_puplets, db_analog, k, weighting=True, verbose=False
# Compute regression coefficients
coef = np.linalg.inv(np.transpose(X, axes=[0, 2, 1]) @ (weights[:, :, np.newaxis] * X)) @ np.transpose(X, axes=[0, 2, 1]) @ y
# Predict values using the regression coefficients
vals[p:] = coef[:-1, 0, 0] + np.sum(coef[:, 1:, 0] * db_puplets, axis=1)[:-1]
vals[p:] = coef[:-1, 0, 0] + np.sum(coef[:, 1:, 0] * db_data, axis=1)[:-1]
if normalize:
# Re-add the mean to the predictions if data was normalized
vals[p:] += db_puplets_means[:-1]
vals[p:] += db_data_means[:-1]
if verbose:
print('Index', Idx)
print('Dist', Dist)
print('Puplets', db_puplets)
print('Puplets', db_data)
for _ in range(5):
t = 0
# Plot the nearest neighbors and the prediction
data=db_data[:,-1]
for i in range(k):
plt.plot(db_analog[Idx[t, i]], c='blue', alpha=np.min(Dist[t, :]) / Dist[t, i])
plt.plot(data[t:t + p + 1], c='green')
......@@ -124,43 +118,3 @@ def prediction_neighbors(db_puplets, db_analog, k, weighting=True, verbose=False
plt.show()
return vals
#%% Example
N=2**16 #Size of realisations
#import Modane
path='/users2/local/e22froge/codes/Innovation/DataSignals/uspa_mob15_1.dat' #path to data
fid = open(path,'r')
signal = np.fromfile(fid, dtype='>f')
fid.close()
data=np.reshape(signal,(-1,N))
Nbibpow=16 # total number of point
Bib=np.reshape(data,-1)
Bib=Bib[0:2**Nbibpow]
Bib=(Bib-np.mean(Bib))/np.std(Bib)
db =create_database(Bib,p=3)
real=100 #Arbitrary chosen realisation
velocity=(data[real,:]-np.mean(data[real,:]))/np.std(data[real,:])
vals= prediction_neighbors(velocity,db, k=100)
innovation=velocity-vals
#Careful!! The p first values are Nan for alignment reasons
plt.plot(innovation)
# %%
incr1=(velocity[1:]-velocity[:-1])[1:]
incr2=velocity[2:]-velocity[:-2]
args = (incr1, incr2, velocity[2:]) #data a la fin, le successeur, et incr1 incr2 les "analogs"
db = create_database(velocity, p=3)
vals= prediction_neighbors(incr1, db, k=100)
......@@ -22,14 +22,16 @@ Bib=np.reshape(data,-1)
Bib=Bib[0:2**Nbibpow]
Bib=(Bib-np.mean(Bib))/np.std(Bib)
db = create_database(Bib, p=3)
real=100 #Arbitrary chosen realisation
velocity=(data[real,:]-np.mean(data[real,:]))/np.std(data[real,:])
vals= prediction_neighbors(velocity, db, k=100)
p=3
db_analog =create_database(Bib,p=p+1,overlapping=False) #+1 for successor
db_pasts = create_database(velocity,p=p,overlapping=True)
vals= prediction_neighbors(db_pasts,db_analog, k=100)
innovation=velocity-vals
#Careful!! The p first values are Nan for alignment reasons
plt.plot(innovation)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment