Skip to content
Snippets Groups Projects
Commit 85d8348c authored by FROGE Ewen's avatar FROGE Ewen
Browse files

Upload New File

parent abc40d9d
Branches
No related tags found
No related merge requests found
import sys
import numpy as np
import matplotlib.pyplot as plt
from numpy.linalg import pinv
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm
def create_database(data, p, stride=1, overlapping=False):
"""
Constructs a database of overlapping or non-overlapping past-value sequences.
Parameters:
data (ndarray): Input time series data.
p (int): Number of past values to consider.
stride (int): Step size between samples.
overlapping (bool): Whether to allow overlapping sequences.
Returns:
ndarray: Database of past-value sequences with targets.
"""
uu = np.lib.stride_tricks.sliding_window_view(data, window_shape=(stride * (p - 1) + 1,))[:, ::stride]
uu = np.hstack((uu[:-1], data[(p - 1) * stride + 1:, np.newaxis]))
skip = 1 if overlapping else p
return uu[0::skip, :]
def prediction_neighbors(data, db, k, stride=1, weighting=True, normalize=False):
"""
Predicts future values using a weighted nearest neighbors approach.
Parameters:
data (ndarray): Input time series data.
db (ndarray): Precomputed database of past-value sequences.
k (int): Number of nearest neighbors to consider.
stride (int): Step size between samples.
weighting (bool): Whether to apply exponential distance-based weighting.
normalize (bool): Whether to normalize input sequences before processing.
Returns:
tuple: Predicted values and associated weighted covariance.
"""
np_, pp1 = db.shape
p = pp1 - 1
puplets = np.lib.stride_tricks.sliding_window_view(data, window_shape=(stride * (p - 1) + 1,))[:, ::stride].copy()
if normalize:
puplet_means = np.mean(puplets, axis=1)
puplets -= puplet_means[:, np.newaxis]
db_means = np.mean(db[:, :-1], axis=1)
db -= db_means[:, np.newaxis]
neigh = NearestNeighbors(n_jobs=4)
neigh.fit(db[:, :-1])
dist, idx = neigh.kneighbors(puplets, n_neighbors=k, return_distance=True)
med = np.median(dist, axis=1)
if weighting and np.min(med) != 0:
weights = np.exp(-dist / med[:, np.newaxis])
weights /= np.sum(weights, axis=1)[:, np.newaxis]
else:
weights = np.ones_like(dist)
vals = np.full_like(data, np.nan)
weighted_covariance = np.zeros_like(vals)
x = db[idx, :-1]
y = (weights * db[idx, -1])[:, :, np.newaxis]
x = np.pad(x, [(0, 0), (0, 0), (1, 0)], mode='constant', constant_values=1)
coef = pinv(np.transpose(x, axes=[0, 2, 1]) @ (weights[:, :, np.newaxis] * x)) @ np.transpose(x, axes=[0, 2, 1]) @ y
vals[(p - 1) * stride + 1:] = coef[:-1, 0, 0] + np.sum(coef[:, 1:, 0] * puplets, axis=1)[:-1]
residuals = db[idx, -1] - np.sum(coef * np.transpose(x, (0, 2, 1)), axis=1)
weighted_covariance[(p - 1) * stride + 1:] = (np.sum(weights * residuals**2, axis=1) / np.sum(weights, axis=1))[:-1]
if normalize:
vals[(p - 1) * stride + 1:] += puplet_means[:-1]
return vals, weighted_covariance
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment