From bbd255658446b7d3009a17760790415beb159809 Mon Sep 17 00:00:00 2001 From: npounder Date: Tue, 22 May 2018 17:35:12 +0100 Subject: [PATCH 01/15] Hessian correction adapted for narrowband SAIL --- kafka/inference/kf_tools.py | 42 +++++++++++++++++++++---------------- kafka/inference/utils.py | 23 +++++++++++++++++--- kafka/linear_kf.py | 22 +++++++++++++++++-- 3 files changed, 64 insertions(+), 23 deletions(-) diff --git a/kafka/inference/kf_tools.py b/kafka/inference/kf_tools.py index 127685e..bc1a093 100644 --- a/kafka/inference/kf_tools.py +++ b/kafka/inference/kf_tools.py @@ -23,52 +23,56 @@ def band_selecta(band): return np.array([3, 4, 6, 5]) -def hessian_correction_pixel(gp, x0, C_obs_inv, innovation, band, nparams): +def hessian_correction_pixel(hessian, C_obs_inv, innovation): + ''' selecta = band_selecta(band) ddH = gp.hessian(np.atleast_2d(x0[selecta])) big_ddH = np.zeros((nparams, nparams)) for i, ii in enumerate(selecta): for j, jj in enumerate(selecta): big_ddH[ii, jj] = ddH.squeeze()[i, j] - big_hessian_corr = big_ddH*C_obs_inv*innovation - return big_hessian_corr + ''' + hessian_corr = hessian*C_obs_inv*innovation + return hessian_corr -def hessian_correction(gp, x0, R_mat, innovation, mask, state_mask, band, +def hessian_correction(hessian, R_mat, innovation, mask, state_mask, nparams): """Calculates higher order Hessian correction for the likelihood term. Needs the GP, the Observational uncertainty, the mask....""" - if not hasattr(gp, "hessian"): + if hessian is None: # The observation operator does not provide a Hessian method. We just # return 0, meaning no Hessian correction. return 0. C_obs_inv = R_mat.diagonal()[state_mask.flatten()] mask = mask[state_mask].flatten() + little_hess = [] - for i, (innov, C, m) in enumerate(zip(innovation, C_obs_inv, mask)): + for i, (innov, C, m, hess) in enumerate(zip(innovation, C_obs_inv, mask, hessian)): if not m: # Pixel is masked hessian_corr = np.zeros((nparams, nparams)) else: - # Get state for current pixel - x0_pixel = x0.squeeze()[(nparams*i):(nparams*(i + 1))] + ## Get state for current pixel + #x0_pixel = x0.squeeze()[(nparams*i):(nparams*(i + 1))] # Calculate the Hessian correction for this pixel - hessian_corr = m * hessian_correction_pixel(gp, x0_pixel, C, - innov, band, nparams) + hessian_corr = m * hessian_correction_pixel(hess, C, innov) little_hess.append(hessian_corr) - hessian_corr = block_diag(little_hess) + + hessian_corr = block_diag(hessian) return hessian_corr -def hessian_correction_multiband(gp, x0, R_mats, innovations, masks, state_mask, n_bands, - nparams): +def hessian_correction_multiband(hessians, R_mats, innovations, + masks, state_mask, n_bands, nparams): """ Non linear correction for the Hessian of the cost function. This handles multiple bands. """ little_hess_cor = [] - for R, innovation, mask, band in zip(R_mats, innovations, masks, range(n_bands)): - little_hess_cor.append(hessian_correction(gp, x0, R, innovation, mask, state_mask, band, - nparams)) - hessian_corr = sum(little_hess_cor) #block_diag(little_hess_cor) + for R, hessian, innovation, mask in zip( + R_mats, hessians, innovations, masks): + little_hess_cor.append(hessian_correction(hessian, R, innovation, + mask, state_mask, nparams)) + hessian_corr = sum(little_hess_cor) return hessian_corr @@ -317,7 +321,9 @@ def no_propagation(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): - """No propagation. In this case, we return the original prior. As the + """ + THIS IS ONLY SUITABLE FOR BROADBAND SAIL uses TIP prior + No propagation. In this case, we return the original prior. As the information filter behaviour is the standard behaviour in KaFKA, we only return the inverse covariance matrix. **NOTE** the input parameters are there to comply with the API, but are **UNUSED**. diff --git a/kafka/inference/utils.py b/kafka/inference/utils.py index 1a43e4b..4c2425a 100644 --- a/kafka/inference/utils.py +++ b/kafka/inference/utils.py @@ -171,12 +171,26 @@ def create_nonlinear_observation_operator(n_params, emulator, metadata, LOG.info("\tDone!") - return (H0, H_matrix.tocsr()) + '''if calc_hess: + ddH = emulator.hessian(x0[mask[state_mask]]) + hess = np.zeros((n_times, nparams, nparams)) + for lil_hes , m in zip(ddH, mask[state_mask].flatten()): + if m: + big_ddH = np.zeros((nparams, nparams)) + for i, ii in enumerate(state_mapper): + for j, jj in enumerate(state_mapper): + #big_ddH[ii, jj] = .squeeze()[i, j] + pass + #hess[.append(big_ddH) + ''' + return (H0, H_matrix.tocsr(), hess) if calc_hess else (H0, H_matrix.tocsr()) + def create_prosail_observation_operator(n_params, emulator, metadata, - mask, state_mask, x_forecast, band): + mask, state_mask, x_forecast, + band, calc_hess=False): """Using an emulator of the nonlinear model around `x_forecast`. This case is quite special, as I'm focusing on a BHR SAIL version (or the JRC TIP), which have spectral parameters @@ -213,7 +227,10 @@ def create_prosail_observation_operator(n_params, emulator, metadata, LOG.info("\tDone!") - return (H0, H_matrix.tocsr()) + if calc_hess: + hess = emulator.hessian(x0[mask[state_mask]]) + + return (H0, H_matrix.tocsr(), hess) if calc_hess else (H0, H_matrix.tocsr()) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index f4d1ce0..be7030b 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -305,12 +305,27 @@ def do_all_bands(self, timestep, current_data, x_forecast, P_forecast, # Once we have converged... # Correct hessian for higher order terms #split_points = [m.sum( ) for m in MASK] + HESSIAN = [] INNOVATIONS = np.split(innovations, n_bands) - P_correction = hessian_correction_multiband(data.emulator, x_analysis, + for band, data in enumerate(current_data): + # calculate the hessian for the solution + _,_,hessian= self._create_observation_operator(self.n_params, + data.emulator, + data.metadata, + data.mask, + self.state_mask, + x_analysis, + band, + calc_hess = True) + HESSIAN.append(hessian) + P_correction = hessian_correction_multiband(HESSIAN, UNC, INNOVATIONS, MASK, self.state_mask, n_bands, self.n_params) P_analysis_inverse = P_analysis_inverse - P_correction + # Rarely, this returns a small negative value. For now set to nan. + # May require further investigation in the future + P_analysis_inverse[P_analysis_inverse<0] = np.nan # Done with this observation, move along... @@ -407,8 +422,11 @@ def assimilate_band(self, band, timestep, x_forecast, P_forecast, data.uncertainty, innovations, data.mask, self.state_mask, band, self.n_params) + # UPDATE HESSIAN WITH HIGHER ORDER CONTRIBUTION P_analysis_inverse = P_analysis_inverse - P_correction - # P_analysis_inverse = UPDATE HESSIAN WITH HIGHER ORDER CONTRIBUTION + # Rarely, this returns a small negative value. For now set to nan. + # May require further investigation in the future + P_analysis_inverse[P_analysis_inverse<0] = np.nan import matplotlib.pyplot as plt M = self.state_mask*1. M[self.state_mask] = x_analysis[6::7] From ef84d0bdc5067393c57423f1eae924776ab199a1 Mon Sep 17 00:00:00 2001 From: npounder Date: Thu, 24 May 2018 14:19:52 +0100 Subject: [PATCH 02/15] LAI propagator and prior for narrow band SAIL --- kafka/inference/narrowbandSAIL_tools.py | 103 ++++++++++++++++++++++++ 1 file changed, 103 insertions(+) create mode 100644 kafka/inference/narrowbandSAIL_tools.py diff --git a/kafka/inference/narrowbandSAIL_tools.py b/kafka/inference/narrowbandSAIL_tools.py new file mode 100644 index 0000000..c57f3a5 --- /dev/null +++ b/kafka/inference/narrowbandSAIL_tools.py @@ -0,0 +1,103 @@ +import logging +import numpy as np +from .utils import block_diag +import os +import gdal + +def sail_prior_values(): + """ + :returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + + mean = np.array([2.1, np.exp(-60. / 100.), + np.exp(-7.0 / 100.), 0.1, + np.exp(-50 * 0.0176), np.exp(-100. * 0.002), + np.exp(-4. / 2.), 70. / 90., 0.5, 0.9]) + sigma = np.array([0.01, 0.2, + 0.01, 0.05, + 0.01, 0.01, + 0.50, 0.1, 0.1, 0.1]) + + covar = np.diag(sigma ** 2).astype(np.float32) + inv_covar = np.diag(1. / sigma ** 2).astype(np.float32) + return mean, covar, inv_covar + +class SAILPrior(object): + def __init__(self, parameter_list, state_mask): + self.parameter_list = parameter_list + if isinstance(state_mask, (np.ndarray, np.generic)): + self.state_mask = state_mask + else: + self.state_mask = self._read_mask(state_mask) + # parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', + # 'lai', 'ala', 'bsoil', 'psoil'] + # self.mean = np.array([1.19, np.exp(-14.4/100.), + # np.exp(-4.0/100.), 0.1, + # np.exp(-50*0.68), np.exp(-100./21.0), + # np.exp(-3.97/2.),70./90., 0.5, 0.9]) + # sigma = np.array([0.69, 0.016, + # 0.0086, 0.1, + # 1.71e-2, 0.017, + # 0.20, 0.5, 0.5, 0.5]) + mean, c_prior, c_inv_prior = sail_prior_values() + self.mean = mean + self.covar = c_prior + self.inv_covar = c_inv_prior + + + def _read_mask(self, fname): + """Tries to read the mask as a GDAL dataset""" + if not os.path.exists(fname): + raise IOError("State mask is neither an array or a file that exists!") + g = gdal.Open(fname) + if g is None: + raise IOError("{:s} can't be opened with GDAL!".format(fname)) + mask = g.ReadAsArray() + return mask + + def process_prior ( self, time, inv_cov=True): + # Presumably, self._inference_prior has some method to retrieve + # a bunch of files for a given date... + n_pixels = self.state_mask.sum() + x0 = np.array([self.mean for i in range(n_pixels)]).flatten() + if inv_cov: + inv_covar_list = [self.inv_covar for m in range(n_pixels)] + inv_covar = block_diag(inv_covar_list, dtype=np.float32) + return x0, inv_covar + else: + covar_list = [self.covar for m in range(n_pixels)] + covar = block_diag(covar_list, dtype=np.float32) + return x0, covar + + + + +def propagate_LAI_narrowbandSAIL(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + prior=None, state_propagator=None, date=None): + ''' Propagate a single parameter and + set the rest of the parameter propagations to zero + This should be used with a prior for the remaining parameters''' + nparameters = 10 + lai_position = 6 + + x_prior, c_prior, c_inv_prior = sail_prior_values() + + x_forecast = M_matrix.dot(x_analysis) + n_pixels = len(x_analysis)//nparameters + x0 = np.tile(x_prior, n_pixels) + x0[lai_position::nparameters] = x_forecast[lai_position::nparameters] # Update LAI + lai_post_cov = P_analysis_inverse.diagonal()[lai_position::nparameters] + lai_Q = Q_matrix.diagonal()[lai_position::nparameters] + + c_inv_prior_mat = [] + for cov, Q in zip(lai_post_cov, lai_Q): + # inflate uncertainty + lai_inv_cov = 1.0/((1.0/cov)+Q) + little_P_forecast_inverse = c_inv_prior.copy() + little_P_forecast_inverse[lai_position, lai_position] = lai_inv_cov + c_inv_prior_mat.append(little_P_forecast_inverse) + P_forecast_inverse=block_diag(c_inv_prior_mat, dtype=np.float32) + return x0, None, P_forecast_inverse \ No newline at end of file From d227efae4b4b8806d129ac433b63eb37cb2598d0 Mon Sep 17 00:00:00 2001 From: npounder Date: Mon, 4 Jun 2018 14:52:47 +0100 Subject: [PATCH 03/15] Add hessian calculation to the broadband SAIL observation operator --- kafka/inference/utils.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/kafka/inference/utils.py b/kafka/inference/utils.py index 4c2425a..6f21a46 100644 --- a/kafka/inference/utils.py +++ b/kafka/inference/utils.py @@ -125,7 +125,8 @@ def create_linear_observation_operator(obs_op, n_params, metadata, def create_nonlinear_observation_operator(n_params, emulator, metadata, - mask, state_mask, x_forecast, band): + mask, state_mask, x_forecast, + band, calc_hess=False): """Using an emulator of the nonlinear model around `x_forecast`. This case is quite special, as I'm focusing on a BHR SAIL version (or the JRC TIP), which have spectral parameters @@ -171,18 +172,17 @@ def create_nonlinear_observation_operator(n_params, emulator, metadata, LOG.info("\tDone!") - '''if calc_hess: + if calc_hess: ddH = emulator.hessian(x0[mask[state_mask]]) - hess = np.zeros((n_times, nparams, nparams)) - for lil_hes , m in zip(ddH, mask[state_mask].flatten()): + hess = np.zeros((n_times, n_params, n_params)) + for n, (lil_hess, m) in enumerate(zip(ddH, mask[state_mask].flatten())): if m: - big_ddH = np.zeros((nparams, nparams)) + big_hess = np.zeros((n_params, n_params)) for i, ii in enumerate(state_mapper): for j, jj in enumerate(state_mapper): - #big_ddH[ii, jj] = .squeeze()[i, j] - pass - #hess[.append(big_ddH) - ''' + big_hess[ii, jj] = lil_hess.squeeze()[i, j] + hess[n,...] = big_hess + return (H0, H_matrix.tocsr(), hess) if calc_hess else (H0, H_matrix.tocsr()) From e4330f2433815036d717293ba7728058bfb8e7ec Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 11:18:49 +0100 Subject: [PATCH 04/15] warning in logfile when hessian correction results in negative values --- kafka/linear_kf.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index be7030b..568d63c 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -426,7 +426,13 @@ def assimilate_band(self, band, timestep, x_forecast, P_forecast, P_analysis_inverse = P_analysis_inverse - P_correction # Rarely, this returns a small negative value. For now set to nan. # May require further investigation in the future - P_analysis_inverse[P_analysis_inverse<0] = np.nan + negative_values = P_analysis_inverse<0 + if any(negative_values): + P_analysis_inverse[negative_values] = np.nan + LOG.warning("{} negative values in inverse covariance matrix".format( + sum(negative_values))) + + import matplotlib.pyplot as plt M = self.state_mask*1. M[self.state_mask] = x_analysis[6::7] From 4150bbcf9444ae752950004b9bb8be4f0905aadb Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 18:47:06 +0100 Subject: [PATCH 05/15] separate broadband SAIL utils into seperate file TIP prior and LAI propagator for broadband SAIL (or TIP now in one place) common code for propagating a single parameter now in single function --- kafka/inference/broadbandSAIL_tools | 0 kafka/inference/kf_tools.py | 89 ++++++------------------- kafka/inference/narrowbandSAIL_tools.py | 43 +++--------- kafka/linear_kf.py | 3 +- 4 files changed, 32 insertions(+), 103 deletions(-) create mode 100644 kafka/inference/broadbandSAIL_tools diff --git a/kafka/inference/broadbandSAIL_tools b/kafka/inference/broadbandSAIL_tools new file mode 100644 index 0000000..e69de29 diff --git a/kafka/inference/kf_tools.py b/kafka/inference/kf_tools.py index bc1a093..46ed9cb 100644 --- a/kafka/inference/kf_tools.py +++ b/kafka/inference/kf_tools.py @@ -16,22 +16,8 @@ class NoHessianMethod(Exception): def __init__(self, message): self.message = message -def band_selecta(band): - if band == 0: - return np.array([0, 1, 6, 2]) - else: - return np.array([3, 4, 6, 5]) - def hessian_correction_pixel(hessian, C_obs_inv, innovation): - ''' - selecta = band_selecta(band) - ddH = gp.hessian(np.atleast_2d(x0[selecta])) - big_ddH = np.zeros((nparams, nparams)) - for i, ii in enumerate(selecta): - for j, jj in enumerate(selecta): - big_ddH[ii, jj] = ddH.squeeze()[i, j] - ''' hessian_corr = hessian*C_obs_inv*innovation return hessian_corr @@ -53,8 +39,6 @@ def hessian_correction(hessian, R_mat, innovation, mask, state_mask, # Pixel is masked hessian_corr = np.zeros((nparams, nparams)) else: - ## Get state for current pixel - #x0_pixel = x0.squeeze()[(nparams*i):(nparams*(i + 1))] # Calculate the Hessian correction for this pixel hessian_corr = m * hessian_correction_pixel(hess, C, innov) little_hess.append(hessian_corr) @@ -100,43 +84,6 @@ def blend_prior(prior_mean, prior_cov_inverse, x_forecast, P_forecast_inverse): return x_combined, combined_cov_inv -def tip_prior(): - """The JRC-TIP prior in a convenient function which is fun for the whole - family. Note that the effective LAI is here defined in transformed space - where TLAI = exp(-0.5*LAIe). - - Returns - ------- - The mean prior vector, covariance and inverse covariance matrices.""" - # broadly TLAI 0->7 for 1sigma - sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) - x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*1.5)]) - # The individual covariance matrix - little_p = np.diag(sigma**2).astype(np.float32) - little_p[5, 2] = 0.8862*0.0959*0.2 - little_p[2, 5] = 0.8862*0.0959*0.2 - - inv_p = np.linalg.inv(little_p) - return x0, little_p, inv_p - -def tip_prior_noLAI(prior): - n_pixels = prior['n_pixels'] - mean, prior_cov_inverse = tip_prior(prior) - - -def tip_prior_full(prior): - # This is yet to be properly defined. For now it will create the TIP prior and - # prior just contains the size of the array - this function will be replaced with - # the real code when we know what the priors look like. - x_prior, c_prior, c_inv_prior = tip_prior() - n_pixels = prior['n_pixels'] - mean = np.array([x_prior for i in range(n_pixels)]).flatten() - c_inv_prior_mat = [c_inv_prior for n in range(n_pixels)] - prior_cov_inverse=block_diag(c_inv_prior_mat, dtype=np.float32) - - return mean, prior_cov_inverse - - def propagate_and_blend_prior(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): @@ -244,10 +191,11 @@ def propagate_information_filter_SLOW(x_analysis, P_analysis, P_analysis_inverse S= P_analysis_inverse.dot(Q_matrix) A = (sp.eye(n) + S).tocsc() P_forecast_inverse = spl.spsolve(A, P_analysis_inverse) - logging.info("DOne with propagation") + logging.info("Done with propagation") return x_forecast, None, P_forecast_inverse + def propagate_information_filter_approx_SLOW(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): @@ -293,35 +241,38 @@ def propagate_information_filter_approx_SLOW(x_analysis, P_analysis, P_analysis_ return x_forecast, None, P_forecast_inverse -def propagate_information_filter_LAI(x_analysis, P_analysis, - P_analysis_inverse, - M_matrix, Q_matrix, - prior=None, state_propagator=None, date=None): - +def propagate_single_parameter(x_analysis, P_analysis, P_analysis_inverse, + M_matrix, Q_matrix, n_param, location, + x_prior, c_inv_prior): + """ Propagate a single parameter and + set the rest of the parameter propagations to the prior. + This should be used with a prior for the remaining parameters""" x_forecast = M_matrix.dot(x_analysis) - x_prior, c_prior, c_inv_prior = tip_prior() - n_pixels = len(x_analysis)//7 - x0 = np.array([x_prior for i in range(n_pixels)]).flatten() - x0[6::7] = x_forecast[6::7] # Update LAI - lai_post_cov = P_analysis_inverse.diagonal()[6::7] - lai_Q = Q_matrix.diagonal()[6::7] + n_pixels = len(x_analysis)//n_param + x0 = np.tile(x_prior, n_pixels) + x0[location::n_param] = x_forecast[location::n_param] # Update LAI + lai_post_cov = P_analysis_inverse.diagonal()[location::n_param] + lai_Q = Q_matrix.diagonal()[location::n_param] c_inv_prior_mat = [] - for n in range(n_pixels): + for cov, Q in zip(lai_post_cov, lai_Q): # inflate uncertainty - lai_inv_cov = 1.0/((1.0/lai_post_cov[n])+lai_Q[n]) + lai_inv_cov = 1.0/((1.0/cov)+Q) little_P_forecast_inverse = c_inv_prior.copy() - little_P_forecast_inverse[6, 6] = lai_inv_cov + little_P_forecast_inverse[location::location] = lai_inv_cov c_inv_prior_mat.append(little_P_forecast_inverse) P_forecast_inverse=block_diag(c_inv_prior_mat, dtype=np.float32) - return x0, None, P_forecast_inverse + def no_propagation(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, prior=None, state_propagator=None, date=None): """ + THIS PROPAGATOR SHOULD NOT BE USED ANY MORE. It is better to set + the state_propagator to None and to use the Prior exlicitly. + THIS IS ONLY SUITABLE FOR BROADBAND SAIL uses TIP prior No propagation. In this case, we return the original prior. As the information filter behaviour is the standard behaviour in KaFKA, we diff --git a/kafka/inference/narrowbandSAIL_tools.py b/kafka/inference/narrowbandSAIL_tools.py index c57f3a5..e2e3608 100644 --- a/kafka/inference/narrowbandSAIL_tools.py +++ b/kafka/inference/narrowbandSAIL_tools.py @@ -1,15 +1,18 @@ import logging +import gdal import numpy as np -from .utils import block_diag import os -import gdal + +from .utils import block_diag +from .kf_tools import propagate_single_parameter def sail_prior_values(): """ :returns ------- The mean prior vector, covariance and inverse covariance matrices.""" - + #parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', + # 'lai', 'ala', 'bsoil', 'psoil'] mean = np.array([2.1, np.exp(-60. / 100.), np.exp(-7.0 / 100.), 0.1, np.exp(-50 * 0.0176), np.exp(-100. * 0.002), @@ -30,16 +33,6 @@ def __init__(self, parameter_list, state_mask): self.state_mask = state_mask else: self.state_mask = self._read_mask(state_mask) - # parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', - # 'lai', 'ala', 'bsoil', 'psoil'] - # self.mean = np.array([1.19, np.exp(-14.4/100.), - # np.exp(-4.0/100.), 0.1, - # np.exp(-50*0.68), np.exp(-100./21.0), - # np.exp(-3.97/2.),70./90., 0.5, 0.9]) - # sigma = np.array([0.69, 0.016, - # 0.0086, 0.1, - # 1.71e-2, 0.017, - # 0.20, 0.5, 0.5, 0.5]) mean, c_prior, c_inv_prior = sail_prior_values() self.mean = mean self.covar = c_prior @@ -71,8 +64,6 @@ def process_prior ( self, time, inv_cov=True): return x0, covar - - def propagate_LAI_narrowbandSAIL(x_analysis, P_analysis, P_analysis_inverse, M_matrix, Q_matrix, @@ -84,20 +75,8 @@ def propagate_LAI_narrowbandSAIL(x_analysis, P_analysis, lai_position = 6 x_prior, c_prior, c_inv_prior = sail_prior_values() - - x_forecast = M_matrix.dot(x_analysis) - n_pixels = len(x_analysis)//nparameters - x0 = np.tile(x_prior, n_pixels) - x0[lai_position::nparameters] = x_forecast[lai_position::nparameters] # Update LAI - lai_post_cov = P_analysis_inverse.diagonal()[lai_position::nparameters] - lai_Q = Q_matrix.diagonal()[lai_position::nparameters] - - c_inv_prior_mat = [] - for cov, Q in zip(lai_post_cov, lai_Q): - # inflate uncertainty - lai_inv_cov = 1.0/((1.0/cov)+Q) - little_P_forecast_inverse = c_inv_prior.copy() - little_P_forecast_inverse[lai_position, lai_position] = lai_inv_cov - c_inv_prior_mat.append(little_P_forecast_inverse) - P_forecast_inverse=block_diag(c_inv_prior_mat, dtype=np.float32) - return x0, None, P_forecast_inverse \ No newline at end of file + return propagate_single_parameter(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + nparameters, lai_position, + x_prior, c_inv_prior) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index 568d63c..1da2811 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -37,7 +37,6 @@ from .inference import create_linear_observation_operator from .inference import create_nonlinear_observation_operator from .inference import iterate_time_grid -from .inference import propagate_information_filter_LAI # eg from .inference import hessian_correction from .inference import hessian_correction_multiband from .inference.kf_tools import propagate_and_blend_prior @@ -64,7 +63,7 @@ class LinearKalman (object): rather grotty "0-th" order models!""" def __init__(self, observations, output, state_mask, create_observation_operator, parameters_list, - state_propagation=propagate_information_filter_LAI, + state_propagation=None, linear=True, diagnostics=True, prior=None): """The class creator takes (i) an observations object, (ii) an output writer object, (iii) the state mask (a boolean 2D array indicating which From 4baa4233bfdff3996aa604849903b73e70751b00 Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 19:05:26 +0100 Subject: [PATCH 06/15] example script to run with latest propagators --- kafka_test_Py36.py | 181 ++++++++++++++++++--------------------------- 1 file changed, 73 insertions(+), 108 deletions(-) diff --git a/kafka_test_Py36.py b/kafka_test_Py36.py index a1d86f1..7e4f3f2 100644 --- a/kafka_test_Py36.py +++ b/kafka_test_Py36.py @@ -1,8 +1,8 @@ #!/usr/bin/env python import logging -logging.basicConfig( - level=logging.getLevelName(logging.DEBUG), +logging.basicConfig( + level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filename="the_log.log") import os @@ -11,7 +11,6 @@ import numpy as np import gdal import osr -import scipy.sparse as sp # from multiply.inference-engine blah blah blah try: @@ -19,18 +18,17 @@ except ImportError: pass -import kafka from kafka.input_output import BHRObservations, KafkaOutput from kafka import LinearKalman -from kafka.inference import block_diag -from kafka.inference import propagate_information_filter_LAI -from kafka.inference import no_propagation +from kafka.inference.broadbandSAIL_tools import propagate_LAI_broadbandSAIL from kafka.inference import create_nonlinear_observation_operator +from kafka.inference.broadbandSAIL_tools import JRCPrior # Probably should be imported from somewhere else, but I can't see # where from ATM... No biggy + def reproject_image(source_img, target_img, dstSRSs=None): """Reprojects/Warps an image to fit exactly another image. Additionally, you can set the destination SRS if you want @@ -54,73 +52,6 @@ def reproject_image(source_img, target_img, dstSRSs=None): dstSRS=dstSRS) return g - - -###class DummyInferencePrior(_WrappingInferencePrior): - ###""" - ###This class is merely a dummy. - ###""" - - ###def process_prior(self, parameters: List[str], time: Union[str, datetime], state_grid: np.array, - - -class JRCPrior(object): - """Dummpy 2.7/3.6 prior class following the same interface as 3.6 only - version.""" - - def __init__ (self, parameter_list, state_mask): - """It makes sense to have the list of parameters and state mask - defined at this point, as they won't change during processing.""" - self.mean, self.covar, self.inv_covar = self._tip_prior() - self.parameter_list = parameter_list - if isinstance(state_mask, (np.ndarray, np.generic) ): - self.state_mask = state_mask - else: - self.state_mask = self._read_mask(state_mask) - - def _read_mask(self, fname): - """Tries to read the mask as a GDAL dataset""" - if not os.path.exists(fname): - raise IOError("State mask is neither an array or a file that exists!") - g = gdal.Open(fname) - if g is None: - raise IOError("{:s} can't be opened with GDAL!".format(fname)) - mask = g.ReadAsArray() - return mask - - def _tip_prior(self): - """The JRC-TIP prior in a convenient function which is fun for the whole - family. Note that the effective LAI is here defined in transformed space - where TLAI = exp(-0.5*LAIe). - - Returns - ------- - The mean prior vector, covariance and inverse covariance matrices.""" - # broadly TLAI 0->7 for 1sigma - sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) - x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*2.)]) - # The individual covariance matrix - little_p = np.diag(sigma**2).astype(np.float32) - little_p[5, 2] = 0.8862*0.0959*0.2 - little_p[2, 5] = 0.8862*0.0959*0.2 - - inv_p = np.linalg.inv(little_p) - return x0, little_p, inv_p - - def process_prior ( self, time, inv_cov=True): - # Presumably, self._inference_prior has some method to retrieve - # a bunch of files for a given date... - n_pixels = self.state_mask.sum() - x0 = np.array([self.mean for i in range(n_pixels)]).flatten() - if inv_cov: - inv_covar_list = [self.inv_covar for m in range(n_pixels)] - inv_covar = block_diag(inv_covar_list, dtype=np.float32) - return x0, inv_covar - else: - covar_list = [self.covar for m in xrange(n_pixels)] - covar = block_diag(covar_list, dtype=np.float32) - return x0, covar - class KafkaOutputMemory(object): """A very simple class to output the state.""" def __init__(self, parameter_list): @@ -134,57 +65,91 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, self.output[timestep] = solution +def mkdir_p(path): + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise + + if __name__ == "__main__": - + + # Set up logging + Log = logging.getLogger(__name__+".kafka_test_x.py") + + runname = 'Arros_0-25' #Used in output directory as a unique identifier + propagator = propagate_LAI_broadbandSAIL parameter_list = ["w_vis", "x_vis", "a_vis", - "w_nir", "x_nir", "a_nir", "TeLAI"] - - tile = "h11v04" - start_time = "2006185" - + "w_nir", "x_nir", "a_nir", "TeLAI"] + + ## parameters for Bondville data. + #tile = "h11v04" # Bondville + #start_time = "2006001" # Bondville + #cd43a1_dir="/data/MODIS/h11v04/MCD43" + ## Bondville chip + #masklim = ((2200, 2450), (450, 700)) # Bondville, h11v04 + + # Parameters for Spanish tile + tile = "h17v05" # Spain + start_time = "2017001" # Spain + mcd43a1_dir="/data/MODIS/h17v05/MCD43" + # chips in h17v05 Spain, select one + masklim = ((650, 730), (1180, 1280)) # Arros, rice + #masklim = ((900,940), (1300,1340)) = True # Alcornocales + #masklim = ((640,700), (1400,1500)) = True # Campinha + + + + path = "/home/npounder/output/kafka/28Mar/kafkaout_{}".format(runname) + if not os.path.exists(path): + mkdir_p(path) + emulator = "./SAIL_emulator_both_500trainingsamples.pkl" - - if os.path.exists("/storage/ucfajlg/Ujia/MCD43/"): - mcd43a1_dir = "/storage/ucfajlg/Ujia/MCD43/" - else: - mcd43a1_dir="/data/MODIS/h11v04/MCD43" - ####tilewidth = 75 - ###n_pixels = tilewidth*tilewidth + + mask = np.zeros((2400,2400),dtype=np.bool8) - #mask[900:940, 1300:1340] = True # Alcornocales - #mask[640:700, 1400:1500] = True # Campinha - #mask[650:730, 1180:1280] = True # Arros - mask[ 2200:2395, 450:700 ] = True # Bondville, h11v04 + mask[masklim[0][0]:masklim[0][1], + masklim[1][0]:masklim[1][1]] = True - bhr_data = BHRObservations(emulator, tile, mcd43a1_dir, start_time, - end_time=None, mcd43a2_dir=None) + bhr_data = BHRObservations(emulator, tile, mcd43a1_dir, start_time, + end_time=None, mcd43a2_dir=None, period=8) + + Log.info("runname = {}".format(runname)) + Log.info("propagator = {}".format(propagator)) + Log.info("tile = {}".format(tile)) + Log.info("start_time = {}".format(start_time)) + Log.info("mask = {}".format(masklim)) projection, geotransform = bhr_data.define_output() - output = KafkaOutput(parameter_list, geotransform, projection, "/tmp/") + output = KafkaOutput(parameter_list, geotransform, projection, path) the_prior = JRCPrior(parameter_list, mask) - - kf = LinearKalman(bhr_data, output, mask, + + # prior = None when using propagate_LAI_broadbandSAIL + kf = LinearKalman(bhr_data, output, mask, create_nonlinear_observation_operator,parameter_list, - state_propagation=None, - prior=the_prior, + state_propagation=propagator, + prior=None, linear=False) # Get starting state... We can request the prior object for this x_forecast, P_forecast_inv = the_prior.process_prior(None) - + Q = np.zeros_like(x_forecast) - Q[6::7] = 0.025 - + Q[6::7] = 0.25 + kf.set_trajectory_model() kf.set_trajectory_uncertainty(Q) - - base = datetime(2006,7,4) - num_days = 180 + + # This determines the time grid of the retrieved state parameters + base = datetime(2017, 1, 1) + num_days = 366 time_grid = [] - for x in range( 0, num_days, 16): - time_grid.append( base + timedelta(days = x) ) + for x in range( 0, num_days, 8): + time_grid.append(base + timedelta(days=x)) - kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) - + kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) \ No newline at end of file From d19d7037ecbf5276a37b6515010ce7fbcc9e5d96 Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 19:10:51 +0100 Subject: [PATCH 07/15] Added some comments --- kafka_test_Py36.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/kafka_test_Py36.py b/kafka_test_Py36.py index 7e4f3f2..3657b6c 100644 --- a/kafka_test_Py36.py +++ b/kafka_test_Py36.py @@ -81,6 +81,11 @@ def mkdir_p(path): Log = logging.getLogger(__name__+".kafka_test_x.py") runname = 'Arros_0-25' #Used in output directory as a unique identifier + # To run without propagation set propagator to None and set a + # prior in LinearKalman. + # If propagator is set to propagate_LAI_broadbandSAIL then the + # prior in LinearKalman must be set to None because this propagator + # includes a prior propagator = propagate_LAI_broadbandSAIL parameter_list = ["w_vis", "x_vis", "a_vis", "w_nir", "x_nir", "a_nir", "TeLAI"] @@ -127,6 +132,8 @@ def mkdir_p(path): output = KafkaOutput(parameter_list, geotransform, projection, path) + # If using a separate prior then this is passed to LinearKalman + # Otherwise this is just used to set the starting state vector. the_prior = JRCPrior(parameter_list, mask) # prior = None when using propagate_LAI_broadbandSAIL From 8ed511ded213915e98b910a0c77b24fd8f6c2072 Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 19:19:45 +0100 Subject: [PATCH 08/15] example scripts to run with latest propagators --- kafka_test_Py36.py | 4 +- kafka_test_S2.py | 151 ++++++++++++++++----------------------------- 2 files changed, 56 insertions(+), 99 deletions(-) diff --git a/kafka_test_Py36.py b/kafka_test_Py36.py index 3657b6c..46d0e5b 100644 --- a/kafka_test_Py36.py +++ b/kafka_test_Py36.py @@ -81,6 +81,7 @@ def mkdir_p(path): Log = logging.getLogger(__name__+".kafka_test_x.py") runname = 'Arros_0-25' #Used in output directory as a unique identifier + # To run without propagation set propagator to None and set a # prior in LinearKalman. # If propagator is set to propagate_LAI_broadbandSAIL then the @@ -108,7 +109,7 @@ def mkdir_p(path): - path = "/home/npounder/output/kafka/28Mar/kafkaout_{}".format(runname) + path = "/tmp/kafkaout_{}".format(runname) if not os.path.exists(path): mkdir_p(path) @@ -122,7 +123,6 @@ def mkdir_p(path): bhr_data = BHRObservations(emulator, tile, mcd43a1_dir, start_time, end_time=None, mcd43a2_dir=None, period=8) - Log.info("runname = {}".format(runname)) Log.info("propagator = {}".format(propagator)) Log.info("tile = {}".format(tile)) Log.info("start_time = {}".format(start_time)) diff --git a/kafka_test_S2.py b/kafka_test_S2.py index 6f7c7b9..f40c304 100644 --- a/kafka_test_S2.py +++ b/kafka_test_S2.py @@ -4,7 +4,7 @@ logging.basicConfig( level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename="the_log.log") + filename="Campinha_noprop_2017.log") import os from datetime import datetime, timedelta import numpy as np @@ -28,14 +28,13 @@ import kafka from kafka.input_output import Sentinel2Observations, KafkaOutput from kafka import LinearKalman -from kafka.inference import block_diag -from kafka.inference import propagate_information_filter_LAI -from kafka.inference import no_propagation +from kafka.inference.narrowbandSAIL_tools import propagate_LAI_narrowbandSAIL from kafka.inference import create_prosail_observation_operator - +from kafka.inference.narrowbandSAIL_tools import SAILPrior # Probably should be imported from somewhere else, but I can't see # where from ATM... No biggy + def reproject_image(source_img, target_img, dstSRSs=None): """Reprojects/Warps an image to fit exactly another image. Additionally, you can set the destination SRS if you want @@ -59,82 +58,6 @@ def reproject_image(source_img, target_img, dstSRSs=None): dstSRS=dstSRS) return g - - -###class DummyInferencePrior(_WrappingInferencePrior): - ###""" - ###This class is merely a dummy. - ###""" - - ###def process_prior(self, parameters: List[str], time: Union[str, datetime], state_grid: np.array, - -class SAILPrior(object): - def __init__ (self, parameter_list, state_mask): - self.parameter_list = parameter_list - if isinstance(state_mask, (np.ndarray, np.generic) ): - self.state_mask = state_mask - else: - self.state_mask = self._read_mask(state_mask) - #parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', - # 'lai', 'ala', 'bsoil', 'psoil'] - #self.mean = np.array([1.19, np.exp(-14.4/100.), - #np.exp(-4.0/100.), 0.1, - #np.exp(-50*0.68), np.exp(-100./21.0), - #np.exp(-3.97/2.),70./90., 0.5, 0.9]) - #sigma = np.array([0.69, 0.016, - #0.0086, 0.1, - #1.71e-2, 0.017, - #0.20, 0.5, 0.5, 0.5]) - self.mean = np.array([2.1, np.exp(-60./100.), - np.exp(-7.0/100.), 0.1, - np.exp(-50*0.0176), np.exp(-100.*0.002), - np.exp(-4./2.), 70./90., 0.5, 0.9]) - sigma = np.array([0.01, 0.2, - 0.01, 0.05, - 0.01, 0.01, - 0.50, 0.1, 0.1, 0.1]) - - self.covar = np.diag(sigma**2).astype(np.float32) - self.inv_covar = np.diag(1./sigma**2).astype(np.float32) - #self.inv_covar[3,3]=0 - ########self.mean = - ########self.variance = - ########lai_m2_m2: [3.1733 1.7940] - - ########cab_ug_cm2: [14.4369 1.8284] - - ########car_u_cm2: [3.9650 0.8948] - - ########cdm_g_cm2: [20.9675 6.4302] - - ########cw_cm: [0.6781 0.6749] - - ########N_: [1.1937 0.6902] - - def _read_mask(self, fname): - """Tries to read the mask as a GDAL dataset""" - if not os.path.exists(fname): - raise IOError("State mask is neither an array or a file that exists!") - g = gdal.Open(fname) - if g is None: - raise IOError("{:s} can't be opened with GDAL!".format(fname)) - mask = g.ReadAsArray() - return mask - - def process_prior ( self, time, inv_cov=True): - # Presumably, self._inference_prior has some method to retrieve - # a bunch of files for a given date... - n_pixels = self.state_mask.sum() - x0 = np.array([self.mean for i in range(n_pixels)]).flatten() - if inv_cov: - inv_covar_list = [self.inv_covar for m in range(n_pixels)] - inv_covar = block_diag(inv_covar_list, dtype=np.float32) - return x0, inv_covar - else: - covar_list = [self.covar for m in range(n_pixels)] - covar = block_diag(covar_list, dtype=np.float32) - return x0, covar - class KafkaOutputMemory(object): """A very simple class to output the state.""" def __init__(self, parameter_list): @@ -147,55 +70,89 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, solution[param] = x_analysis[ii::7] self.output[timestep] = solution +def mkdir_p(path): + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(path): + pass + else: + raise if __name__ == "__main__": - + + # Set up logging + Log = logging.getLogger(__name__+".kafka_test_x.py") + + + runname = 'Campinha_2017_Q0-1' #Used in output directory as a unique identifier + + # To run without propagation set propagator to None and set a + # prior in LinearKalman. + # If propagator is set to propagate_LAI_narrowbandSAIL then the + # prior in LinearKalman must be set to None because this propagator + # includes a prior + propagator = propagate_LAI_narrowbandSAIL + parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', 'lai', 'ala', 'bsoil', 'psoil'] - + start_time = "2017001" - + + Log.info("propagator = {}".format(propagator)) + Log.info("tile = {}".format(tile)) + Log.info("start_time = {}".format(start_time)) + + path = "/tmp/kafkaout_{}".format(runname) + if not os.path.exists(path): + mkdir_p(path) + #emulator_folder = "/home/ucfafyi/DATA/Multiply/emus/sail/" emulator_folder = "/home/glopez/Multiply/src/py36/emus/sail" - + #data_folder = "/data/nemesis/S2_data/30/S/WJ/" data_folder = "/data/001_planet_sentinel_study/sentinel/30/S/WJ" state_mask = "./Barrax_pivots.tif" - + + s2_observations = Sentinel2Observations(data_folder, - emulator_folder, + emulator_folder, state_mask) projection, geotransform = s2_observations.define_output() output = KafkaOutput(parameter_list, geotransform, - projection, "/tmp/") + projection, path) + # If using a separate prior then this is passed to LinearKalman + # Otherwise this is just used to set the starting state vector. the_prior = SAILPrior(parameter_list, state_mask) g = gdal.Open(state_mask) mask = g.ReadAsArray().astype(np.bool) + # prior = None when using propagate_LAI_narrowbandSAIL kf = LinearKalman(s2_observations, output, mask, create_prosail_observation_operator, parameter_list, - state_propagation=None, - prior=the_prior, + state_propagation=propagator, + prior=None,#the_prior, linear=False) # Get starting state... We can request the prior object for this x_forecast, P_forecast_inv = the_prior.process_prior(None) - + Q = np.zeros_like(x_forecast) - + Q[6::10] = 0.1 + kf.set_trajectory_model() kf.set_trajectory_uncertainty(Q) - - base = datetime(2017,7,3) - num_days = 10 - time_grid = list((base + timedelta(days=x) + + # This determines the time grid of the retrieved state parameters + base = datetime(2017,1,1) + num_days = 366 + time_grid = list((base + timedelta(days=x) for x in range(0, num_days, 2))) kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) - From d9c6df9ba09f9324c1315ce932f8838835380a1a Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 19:25:33 +0100 Subject: [PATCH 09/15] remove unecessary tile logging --- kafka_test_S2.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka_test_S2.py b/kafka_test_S2.py index f40c304..b66ab3d 100644 --- a/kafka_test_S2.py +++ b/kafka_test_S2.py @@ -4,7 +4,7 @@ logging.basicConfig( level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename="Campinha_noprop_2017.log") + filename="the_log.log") import os from datetime import datetime, timedelta import numpy as np @@ -100,7 +100,6 @@ def mkdir_p(path): start_time = "2017001" Log.info("propagator = {}".format(propagator)) - Log.info("tile = {}".format(tile)) Log.info("start_time = {}".format(start_time)) path = "/tmp/kafkaout_{}".format(runname) @@ -143,6 +142,7 @@ def mkdir_p(path): # Get starting state... We can request the prior object for this x_forecast, P_forecast_inv = the_prior.process_prior(None) + # Inflation amount for propagation Q = np.zeros_like(x_forecast) Q[6::10] = 0.1 From daba58263aecdd04e9a389f9fa9680e951cc79bd Mon Sep 17 00:00:00 2001 From: npounder Date: Fri, 15 Jun 2018 20:12:40 +0100 Subject: [PATCH 10/15] match filepaths to latest output --- kafka_test_Py36.py | 39 +++++++++++++++++++++------------------ kafka_test_S2.py | 14 +++++++------- 2 files changed, 28 insertions(+), 25 deletions(-) diff --git a/kafka_test_Py36.py b/kafka_test_Py36.py index 46d0e5b..1c4e889 100644 --- a/kafka_test_Py36.py +++ b/kafka_test_Py36.py @@ -4,7 +4,7 @@ logging.basicConfig( level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename="the_log.log") + filename="logfiles/MODIS_demo.log") import os from datetime import datetime, timedelta @@ -80,7 +80,7 @@ def mkdir_p(path): # Set up logging Log = logging.getLogger(__name__+".kafka_test_x.py") - runname = 'Arros_0-25' #Used in output directory as a unique identifier + runname = 'Bondville_0-05' #Used in output directory as a unique identifier # To run without propagation set propagator to None and set a # prior in LinearKalman. @@ -92,24 +92,28 @@ def mkdir_p(path): "w_nir", "x_nir", "a_nir", "TeLAI"] ## parameters for Bondville data. - #tile = "h11v04" # Bondville - #start_time = "2006001" # Bondville - #cd43a1_dir="/data/MODIS/h11v04/MCD43" + tile = "h11v04" + start_time = "2006001" + time_grid_start = datetime(2006, 1, 1) + num_days = 366 + mcd43a1_dir="/data/MODIS/h11v04/MCD43" ## Bondville chip - #masklim = ((2200, 2450), (450, 700)) # Bondville, h11v04 - - # Parameters for Spanish tile - tile = "h17v05" # Spain - start_time = "2017001" # Spain - mcd43a1_dir="/data/MODIS/h17v05/MCD43" - # chips in h17v05 Spain, select one - masklim = ((650, 730), (1180, 1280)) # Arros, rice + masklim = ((2200, 2400), (450, 700)) # Bondville, h11v04 + + ## Parameters for Spanish tile + #tile = "h17v05" # Spain + #start_time = "2017001" # Spain + #time_grid_start = datetime(2017, 1, 1) + #num_days = 366 + #mcd43a1_dir="/data/MODIS/h17v05/MCD43" + ## chips in h17v05 Spain, select one + #masklim = ((650, 730), (1180, 1280)) # Arros, rice #masklim = ((900,940), (1300,1340)) = True # Alcornocales #masklim = ((640,700), (1400,1500)) = True # Campinha - path = "/tmp/kafkaout_{}".format(runname) + path = "/home/npounder/output/kafka/demo/MODIS/kafkaout_{}".format(runname) if not os.path.exists(path): mkdir_p(path) @@ -146,17 +150,16 @@ def mkdir_p(path): # Get starting state... We can request the prior object for this x_forecast, P_forecast_inv = the_prior.process_prior(None) + # Inflation amount for propagation Q = np.zeros_like(x_forecast) - Q[6::7] = 0.25 + Q[6::7] = 0.05 kf.set_trajectory_model() kf.set_trajectory_uncertainty(Q) # This determines the time grid of the retrieved state parameters - base = datetime(2017, 1, 1) - num_days = 366 time_grid = [] for x in range( 0, num_days, 8): - time_grid.append(base + timedelta(days=x)) + time_grid.append(time_grid_start + timedelta(days=x)) kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) \ No newline at end of file diff --git a/kafka_test_S2.py b/kafka_test_S2.py index b66ab3d..56be621 100644 --- a/kafka_test_S2.py +++ b/kafka_test_S2.py @@ -4,7 +4,7 @@ logging.basicConfig( level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - filename="the_log.log") + filename="logfiles/demo_Campinha_2017_Q0-05.log") import os from datetime import datetime, timedelta import numpy as np @@ -85,7 +85,7 @@ def mkdir_p(path): Log = logging.getLogger(__name__+".kafka_test_x.py") - runname = 'Campinha_2017_Q0-1' #Used in output directory as a unique identifier + runname = 'Campinha_2017_Q0-05' #Used in output directory as a unique identifier # To run without propagation set propagator to None and set a # prior in LinearKalman. @@ -98,11 +98,13 @@ def mkdir_p(path): 'lai', 'ala', 'bsoil', 'psoil'] start_time = "2017001" + time_grid_start = datetime(2017, 1, 1) + num_days = 366 Log.info("propagator = {}".format(propagator)) Log.info("start_time = {}".format(start_time)) - path = "/tmp/kafkaout_{}".format(runname) + path = "/home/npounder/output/kafka/demo/S2/kafkaout_{}".format(runname) if not os.path.exists(path): mkdir_p(path) @@ -144,15 +146,13 @@ def mkdir_p(path): # Inflation amount for propagation Q = np.zeros_like(x_forecast) - Q[6::10] = 0.1 + Q[6::10] = 0.05 kf.set_trajectory_model() kf.set_trajectory_uncertainty(Q) # This determines the time grid of the retrieved state parameters - base = datetime(2017,1,1) - num_days = 366 - time_grid = list((base + timedelta(days=x) + time_grid = list((time_grid_start + timedelta(days=x) for x in range(0, num_days, 2))) kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) From addc9413e939f141a4221d1e5615be0fb292e510 Mon Sep 17 00:00:00 2001 From: npounder Date: Wed, 20 Jun 2018 18:02:17 +0100 Subject: [PATCH 11/15] broadband sail tools --- kafka/inference/broadbandSAIL_tools.py | 97 ++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 kafka/inference/broadbandSAIL_tools.py diff --git a/kafka/inference/broadbandSAIL_tools.py b/kafka/inference/broadbandSAIL_tools.py new file mode 100644 index 0000000..447fc91 --- /dev/null +++ b/kafka/inference/broadbandSAIL_tools.py @@ -0,0 +1,97 @@ +import logging +import gdal +import numpy as np +import os + +from .utils import block_diag +from .kf_tools import propagate_single_parameter + + +def tip_prior_values(): + """The JRC-TIP prior in a convenient function which is fun for the whole + family. Note that the effective LAI is here defined in transformed space + where TLAI = exp(-0.5*LAIe). + + Returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + # broadly TLAI 0->7 for 1sigma + sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) + x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*1.5)]) + # The individual covariance matrix + little_p = np.diag(sigma**2).astype(np.float32) + little_p[5, 2] = 0.8862*0.0959*0.2 + little_p[2, 5] = 0.8862*0.0959*0.2 + + inv_p = np.linalg.inv(little_p) + return x0, little_p, inv_p + + +class JRCPrior(object): + """Dummpy 2.7/3.6 prior class following the same interface as 3.6 only + version.""" + + def __init__(self, parameter_list, state_mask): + """It makes sense to have the list of parameters and state mask + defined at this point, as they won't change during processing.""" + self.mean, self.covar, self.inv_covar = self._tip_prior() + self.parameter_list = parameter_list + if isinstance(state_mask, (np.ndarray, np.generic) ): + self.state_mask = state_mask + else: + self.state_mask = self._read_mask(state_mask) + + def _read_mask(self, fname): + """Tries to read the mask as a GDAL dataset""" + if not os.path.exists(fname): + raise IOError("State mask is neither an array or a file that exists!") + g = gdal.Open(fname) + if g is None: + raise IOError("{:s} can't be opened with GDAL!".format(fname)) + mask = g.ReadAsArray() + return mask + + def _tip_prior(self): + """The JRC-TIP prior in a convenient function which is fun for the whole + family. Note that the effective LAI is here defined in transformed space + where TLAI = exp(-0.5*LAIe). + + Returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + mean, c_prior, c_inv_prior = tip_prior_values() + self.mean = mean + self.covar = c_prior + self.inv_covar = c_inv_prior + return mean, c_prior, c_inv_prior + + def process_prior ( self, time, inv_cov=True): + # Presumably, self._inference_prior has some method to retrieve + # a bunch of files for a given date... + n_pixels = self.state_mask.sum() + x0 = np.array([self.mean for i in range(n_pixels)]).flatten() + if inv_cov: + inv_covar_list = [self.inv_covar for m in range(n_pixels)] + inv_covar = block_diag(inv_covar_list, dtype=np.float32) + return x0, inv_covar + else: + covar_list = [self.covar for m in range(n_pixels)] + covar = block_diag(covar_list, dtype=np.float32) + return x0, covar + + +def propagate_LAI_broadbandSAIL(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + prior=None, state_propagator=None, date=None): + """Propagate a single parameter and + set the rest of the parameter propagations to the prior. + This should be used with a prior for the remaining parameters""" + nparameters = 7 + lai_position = 6 + x_prior, c_prior, c_inv_prior = tip_prior_values() + return propagate_single_parameter(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + nparameters, lai_position, + x_prior, c_inv_prior) From 2ac9a43352c1ced969aff1b13f734b37961f900f Mon Sep 17 00:00:00 2001 From: Nicola Pounder Date: Wed, 20 Jun 2018 18:03:03 +0100 Subject: [PATCH 12/15] Delete empty file --- kafka/inference/broadbandSAIL_tools | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 kafka/inference/broadbandSAIL_tools diff --git a/kafka/inference/broadbandSAIL_tools b/kafka/inference/broadbandSAIL_tools deleted file mode 100644 index e69de29..0000000 From 7955fa26fc927134cfe84b6ac100d13e947ce0ed Mon Sep 17 00:00:00 2001 From: gerardolopez Date: Thu, 21 Jun 2018 16:46:49 +0000 Subject: [PATCH 13/15] Save state to npz files --- kafka/inference/broadbandSAIL_tools.py | 97 ++++++++++++++++++++++++++ kafka/input_output/observations.py | 17 ++++- kafka_test_S2.py | 32 +++++++-- 3 files changed, 138 insertions(+), 8 deletions(-) create mode 100644 kafka/inference/broadbandSAIL_tools.py diff --git a/kafka/inference/broadbandSAIL_tools.py b/kafka/inference/broadbandSAIL_tools.py new file mode 100644 index 0000000..447fc91 --- /dev/null +++ b/kafka/inference/broadbandSAIL_tools.py @@ -0,0 +1,97 @@ +import logging +import gdal +import numpy as np +import os + +from .utils import block_diag +from .kf_tools import propagate_single_parameter + + +def tip_prior_values(): + """The JRC-TIP prior in a convenient function which is fun for the whole + family. Note that the effective LAI is here defined in transformed space + where TLAI = exp(-0.5*LAIe). + + Returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + # broadly TLAI 0->7 for 1sigma + sigma = np.array([0.12, 0.7, 0.0959, 0.15, 1.5, 0.2, 0.5]) + x0 = np.array([0.17, 1.0, 0.1, 0.7, 2.0, 0.18, np.exp(-0.5*1.5)]) + # The individual covariance matrix + little_p = np.diag(sigma**2).astype(np.float32) + little_p[5, 2] = 0.8862*0.0959*0.2 + little_p[2, 5] = 0.8862*0.0959*0.2 + + inv_p = np.linalg.inv(little_p) + return x0, little_p, inv_p + + +class JRCPrior(object): + """Dummpy 2.7/3.6 prior class following the same interface as 3.6 only + version.""" + + def __init__(self, parameter_list, state_mask): + """It makes sense to have the list of parameters and state mask + defined at this point, as they won't change during processing.""" + self.mean, self.covar, self.inv_covar = self._tip_prior() + self.parameter_list = parameter_list + if isinstance(state_mask, (np.ndarray, np.generic) ): + self.state_mask = state_mask + else: + self.state_mask = self._read_mask(state_mask) + + def _read_mask(self, fname): + """Tries to read the mask as a GDAL dataset""" + if not os.path.exists(fname): + raise IOError("State mask is neither an array or a file that exists!") + g = gdal.Open(fname) + if g is None: + raise IOError("{:s} can't be opened with GDAL!".format(fname)) + mask = g.ReadAsArray() + return mask + + def _tip_prior(self): + """The JRC-TIP prior in a convenient function which is fun for the whole + family. Note that the effective LAI is here defined in transformed space + where TLAI = exp(-0.5*LAIe). + + Returns + ------- + The mean prior vector, covariance and inverse covariance matrices.""" + mean, c_prior, c_inv_prior = tip_prior_values() + self.mean = mean + self.covar = c_prior + self.inv_covar = c_inv_prior + return mean, c_prior, c_inv_prior + + def process_prior ( self, time, inv_cov=True): + # Presumably, self._inference_prior has some method to retrieve + # a bunch of files for a given date... + n_pixels = self.state_mask.sum() + x0 = np.array([self.mean for i in range(n_pixels)]).flatten() + if inv_cov: + inv_covar_list = [self.inv_covar for m in range(n_pixels)] + inv_covar = block_diag(inv_covar_list, dtype=np.float32) + return x0, inv_covar + else: + covar_list = [self.covar for m in range(n_pixels)] + covar = block_diag(covar_list, dtype=np.float32) + return x0, covar + + +def propagate_LAI_broadbandSAIL(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + prior=None, state_propagator=None, date=None): + """Propagate a single parameter and + set the rest of the parameter propagations to the prior. + This should be used with a prior for the remaining parameters""" + nparameters = 7 + lai_position = 6 + x_prior, c_prior, c_inv_prior = tip_prior_values() + return propagate_single_parameter(x_analysis, P_analysis, + P_analysis_inverse, + M_matrix, Q_matrix, + nparameters, lai_position, + x_prior, c_inv_prior) diff --git a/kafka/input_output/observations.py b/kafka/input_output/observations.py index cb6022e..8e291f6 100644 --- a/kafka/input_output/observations.py +++ b/kafka/input_output/observations.py @@ -325,7 +325,7 @@ def get_band_data(self, the_date, band_no): class KafkaOutput(object): """A very simple class to output the state.""" def __init__(self, parameter_list, geotransform, projection, folder, - fmt="GTiff"): + fmt="GTiff", state_folder:str=None): """The inference engine works on tiles, so we get the tilewidth (we assume the tiles are square), the GDAL-friendly geotransform and projection, as well as the destination directory and the @@ -333,6 +333,7 @@ def __init__(self, parameter_list, geotransform, projection, folder, self.geotransform = geotransform self.projection = projection self.folder = folder + self.state_folder = state_folder self.fmt = fmt self.parameter_list = parameter_list @@ -352,6 +353,7 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, A = np.zeros(state_mask.shape, dtype=np.float32) A[state_mask] = x_analysis[ii::n_params] dst_ds.GetRasterBand(1).WriteArray(A) + for ii, param in enumerate(self.parameter_list): fname = os.path.join(self.folder, "%s_%s_unc.tif" % (param, timestep.strftime("A%Y%j"))) @@ -366,7 +368,20 @@ def dump_data(self, timestep, x_analysis, P_analysis, P_analysis_inv, A[state_mask] = 1./np.sqrt(P_analysis_inv.diagonal()[ii::n_params]) dst_ds.GetRasterBand(1).WriteArray(A) + if not self.state_folder == None and os.path.exists( self.state_folder ): + # Dump to disk P_analysis_inv as sparse matrix in npz + fname = os.path.join(self.state_folder, "P_analysis_inv_%s.npz" % + ( timestep.strftime("A%Y%j") ) ) + sp.save_npz( fname, P_analysis_inv ) + + fname = os.path.join(self.state_folder, "P_analysis_%s.npz" % + ( timestep.strftime("A%Y%j") ) ) + np.savez( fname, P_analysis ) + # Dump as well the whole x_analysis in a single vector + fname = os.path.join(self.state_folder, "x_analysis_%s.npz" % + ( timestep.strftime("A%Y%j") ) ) + np.savez( fname, x_analysis ) if __name__ == "__main__": emulator = "../SAIL_emulator_both_500trainingsamples.pkl" diff --git a/kafka_test_S2.py b/kafka_test_S2.py index 56be621..01b106e 100644 --- a/kafka_test_S2.py +++ b/kafka_test_S2.py @@ -5,6 +5,7 @@ level=logging.getLevelName(logging.DEBUG), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filename="logfiles/demo_Campinha_2017_Q0-05.log") + import os from datetime import datetime, timedelta import numpy as np @@ -97,22 +98,26 @@ def mkdir_p(path): parameter_list = ['n', 'cab', 'car', 'cbrown', 'cw', 'cm', 'lai', 'ala', 'bsoil', 'psoil'] - start_time = "2017001" - time_grid_start = datetime(2017, 1, 1) + #start_time = "2017001" + #time_grid_start = datetime(2017, 1, 1) + start_time = "2017049" + time_grid_start = datetime(2017, 2, 18) num_days = 366 Log.info("propagator = {}".format(propagator)) Log.info("start_time = {}".format(start_time)) - path = "/home/npounder/output/kafka/demo/S2/kafkaout_{}".format(runname) + path = "/tmp/kafka/demo/S2/kafkaout_{}".format(runname) if not os.path.exists(path): mkdir_p(path) #emulator_folder = "/home/ucfafyi/DATA/Multiply/emus/sail/" - emulator_folder = "/home/glopez/Multiply/src/py36/emus/sail" + #emulator_folder = "/home/glopez/Multiply/src/py36/emus/sail" + emulator_folder = "/data/archive/emulators/s2_prosail" #data_folder = "/data/nemesis/S2_data/30/S/WJ/" - data_folder = "/data/001_planet_sentinel_study/sentinel/30/S/WJ" + #data_folder = "/data/001_planet_sentinel_study/sentinel/30/S/WJ" + data_folder = "/data/S2_L2/30/S/WJ/" state_mask = "./Barrax_pivots.tif" @@ -141,8 +146,20 @@ def mkdir_p(path): prior=None,#the_prior, linear=False) - # Get starting state... We can request the prior object for this - x_forecast, P_forecast_inv = the_prior.process_prior(None) + # Check if there's a prior from a previous timestep + P_inv_fname = "P_analysis_inv_%s.npz" % time_grid_start.strftime("A%Y%j") + P_inv_fname = os.path.join( path, P_inv_fname ) + + x_fname = "x_analysis_%s.npz" % time_grid_start.strftime("A%Y%j") + x_fname = os.path.join( path, x_fname ) + + if os.path.exists( P_inv_fname ) and os.path.exists( x_fname ): + # Load stored matrices... + x_forecast = np.load( x_fname )['arr_0'] + P_forecast_inv = sp.load_npz( P_inv_fname ) + else: + # Get starting state... We can request the prior object for this + x_forecast, P_forecast_inv = the_prior.process_prior(None) # Inflation amount for propagation Q = np.zeros_like(x_forecast) @@ -156,3 +173,4 @@ def mkdir_p(path): for x in range(0, num_days, 2))) kf.run(time_grid, x_forecast, None, P_forecast_inv, iter_obs_op=True) + From 8119c6cd1e3cc7a84082b4ae7ac8770b2b42dc71 Mon Sep 17 00:00:00 2001 From: npounder Date: Mon, 20 Aug 2018 15:50:54 +0100 Subject: [PATCH 14/15] Hessian calc for prosail handles case with masked elements --- kafka/inference/utils.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/kafka/inference/utils.py b/kafka/inference/utils.py index 6f21a46..c7bd5c4 100644 --- a/kafka/inference/utils.py +++ b/kafka/inference/utils.py @@ -228,7 +228,15 @@ def create_prosail_observation_operator(n_params, emulator, metadata, LOG.info("\tDone!") if calc_hess: - hess = emulator.hessian(x0[mask[state_mask]]) + hess = np.zeros((n_times, n_params, n_params), + dtype=np.float32) + hess_ = emulator.hessian(x0[mask[state_mask]]) + + n = 0 + for i, m in enumerate(mask[state_mask].flatten()): + if m: + hess[i, ...] = hess_[n] + n += 1 return (H0, H_matrix.tocsr(), hess) if calc_hess else (H0, H_matrix.tocsr()) @@ -357,7 +365,7 @@ def spsolve2(a, b): a_lu = spl.splu(a.tocsc()) # LU decomposition for sparse a out = sp.lil_matrix((a.shape[1], b.shape[1]), dtype=np.float32) b_csc = b.tocsc() - for j in xrange(b.shape[1]): + for j in range(b.shape[1]): bb = np.array(b_csc[j, :].todense()).squeeze() out[j, j] = a_lu.solve(bb)[j] return out.tocsr() From b4504b4ca4d98278e06a18aef9d2328bd2f58296 Mon Sep 17 00:00:00 2001 From: npounder Date: Mon, 20 Aug 2018 18:34:54 +0100 Subject: [PATCH 15/15] place error in logfile when a retrieval of a fully masked observation is attempted --- kafka/linear_kf.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/kafka/linear_kf.py b/kafka/linear_kf.py index 1da2811..c9d9e05 100644 --- a/kafka/linear_kf.py +++ b/kafka/linear_kf.py @@ -183,8 +183,8 @@ def run(self, time_grid, x_forecast, P_forecast, P_forecast_inverse, x_forecast, P_forecast, P_forecast_inverse = self.advance( x_analysis, P_analysis, P_analysis_inverse, self.trajectory_model, self.trajectory_uncertainty) - is_first = False + if len(locate_times) == 0: # Just advance the time x_analysis = x_forecast @@ -256,14 +256,22 @@ def do_all_bands(self, timestep, current_data, x_forecast, P_forecast, # Also extract single band information from nice package # this allows us to use the same interface as current # Deferring processing to a new solver method in solvers.py - - H_matrix_= self._create_observation_operator(self.n_params, + + try: + H_matrix_= self._create_observation_operator(self.n_params, data.emulator, data.metadata, data.mask, self.state_mask, x_prev, - band) + band, + calc_hess = False) + except ValueError as e: + if (sum(data.mask[self.state_mask])== 0): + LOG.error("All observations masked out") + raise + + H_matrix.append(H_matrix_) Y.append(data.observations) MASK.append(data.mask)