Source code for ruspy.estimation.nfxp

"""
This module contains functions for estimating the parameters shaping the cost
function. Therefore it also contains the heart of this project. The python
implementation of fix point algorithm developed by John Rust.
"""
import numba
import numpy as np

from ruspy.estimation import config
from ruspy.model_code.choice_probabilities import choice_prob_gumbel
from ruspy.model_code.cost_functions import calc_obs_costs
from ruspy.model_code.fix_point_alg import calc_fixp
from ruspy.model_code.fix_point_alg import contr_op_dev_wrt_params
from ruspy.model_code.fix_point_alg import contr_op_dev_wrt_rc
from ruspy.model_code.fix_point_alg import solve_equ_system_fixp

ev_intermed = None
current_params = None


[docs]def loglike_cost_params_individual( params, maint_func, maint_func_dev, num_states, trans_mat, state_mat, decision_mat, disc_fac, scale, alg_details, ): """ This is the individual logliklihood function for the estimation of the cost parameters needed for the BHHH optimizer. Parameters ---------- params : numpy.ndarray see :ref:`params` maint_func: func see :ref:`maint_func` num_states : int The size of the state space. disc_fac : numpy.float see :ref:`disc_fac` trans_mat : numpy.ndarray see :ref:`trans_mat` state_mat : numpy.ndarray see :ref:`state_mat` decision_mat : numpy.ndarray see :ref:`decision_mat` Returns ------- log_like : numpy.ndarray A num_buses times num_periods dimensional array containing the negative log-likelihood contributions of the individuals. """ costs = calc_obs_costs(num_states, maint_func, params, scale) ev, contr_step_count, newt_kant_step_count = get_ev( params, trans_mat, costs, disc_fac, alg_details ) config.total_contr_count += contr_step_count config.total_newt_kant_count += newt_kant_step_count p_choice = choice_prob_gumbel(ev, costs, disc_fac) log_like = like_hood_data_individual(np.log(p_choice), decision_mat, state_mat) return log_like
[docs]def loglike_cost_params( params, maint_func, maint_func_dev, num_states, trans_mat, state_mat, decision_mat, disc_fac, scale, alg_details, ): """ sums the individual negative log likelihood contributions for algorithms such as the L-BFGS-B. Parameters ---------- params : numpy.ndarray see :ref:`params` maint_func: func see :ref:`maint_func` maint_func_dev: func see :ref:`maint_func` num_states : int The size of the state space. trans_mat : numpy.ndarray see :ref:`trans_mat` state_mat : numpy.ndarray see :ref:`state_mat` decision_mat : numpy.ndarray see :ref:`decision_mat` disc_fac : numpy.float see :ref:`disc_fac` scale : numpy.float see :ref:`scale` alg_details : dict see :ref: `alg_details` Returns ------- log_like_sum : float The negative log likelihood based on the data. """ log_like_sum = loglike_cost_params_individual( params, maint_func, maint_func_dev, num_states, trans_mat, state_mat, decision_mat, disc_fac, scale, alg_details, ).sum() return log_like_sum
[docs]def derivative_loglike_cost_params_individual( params, maint_func, maint_func_dev, num_states, trans_mat, state_mat, decision_mat, disc_fac, scale, alg_details, ): """ This is the Jacobian of the individual log likelihood function of the cost parameter estimation with respect to all cost parameters needed for the BHHH. Parameters ---------- params : numpy.ndarray see :ref:`params` maint_func: func see :ref:`maint_func` num_states : int The size of the state space. disc_fac : numpy.float see :ref:`disc_fac` trans_mat : numpy.ndarray see :ref:`trans_mat` state_mat : numpy.ndarray see :ref:`state_mat` decision_mat : numpy.ndarray see :ref:`decision_mat` Returns ------- dev : numpy.ndarray A num_buses + num_periods x dim(params) matrix in form of numpy array containing the derivative of the individual log-likelihood function for every cost parameter. """ # params = params["value"].to_numpy() dev = np.zeros((decision_mat.shape[1], len(params))) obs_costs = calc_obs_costs(num_states, maint_func, params, scale) ev = get_ev(params, trans_mat, obs_costs, disc_fac, alg_details)[0] p_choice = choice_prob_gumbel(ev, obs_costs, disc_fac) maint_cost_dev = maint_func_dev(num_states, scale) lh_values_rc = like_hood_vaules_rc(ev, obs_costs, p_choice, trans_mat, disc_fac) like_dev_rc = like_hood_data_individual(lh_values_rc, decision_mat, state_mat) dev[:, 0] = like_dev_rc for i in range(len(params) - 1): if len(params) == 2: cost_dev_param = maint_cost_dev else: cost_dev_param = maint_cost_dev[:, i] log_like_values_params = log_like_values_param( ev, obs_costs, p_choice, trans_mat, cost_dev_param, disc_fac ) dev[:, i + 1] = like_hood_data_individual( log_like_values_params, decision_mat, state_mat ) return dev
[docs]def derivative_loglike_cost_params( params, maint_func, maint_func_dev, num_states, trans_mat, state_mat, decision_mat, disc_fac, scale, alg_details, ): """ sums up the Jacobian to obtain the gradient of the negative log likelihood function needed for algorithm such as the L-BFGS-B. Parameters ---------- params : numpy.ndarray see :ref:`params` maint_func: func see :ref:`maint_func` maint_func_dev: func see :ref:`maint_func` num_states : int The size of the state space. trans_mat : numpy.ndarray see :ref:`trans_mat` state_mat : numpy.ndarray see :ref:`state_mat` decision_mat : numpy.ndarray see :ref:`decision_mat` disc_fac : numpy.float see :ref:`disc_fac` scale : numpy.float see :ref:`scale` alg_details : dict see :ref: `alg_details` Returns ------- dev numpy.ndarray A dimension(params) sized vector containing the gradient of the negative likelihood function. """ dev = derivative_loglike_cost_params_individual( params, maint_func, maint_func_dev, num_states, trans_mat, state_mat, decision_mat, disc_fac, scale, alg_details, ).sum(axis=0) return dev
[docs]def log_like_values_param(ev, costs, p_choice, trans_mat, cost_dev, disc_fac): dev_contr_op_params = contr_op_dev_wrt_params(trans_mat, p_choice[:, 0], cost_dev) dev_ev_params = solve_equ_system_fixp( dev_contr_op_params, ev, trans_mat, costs, disc_fac ) dev_value_maint_params = chain_rule_param(cost_dev, dev_ev_params, disc_fac) lh_values_param = like_hood_dev_values(p_choice, dev_value_maint_params) return lh_values_param
[docs]def like_hood_vaules_rc(ev, costs, p_choice, trans_mat, disc_fac): dev_contr_op_rc = contr_op_dev_wrt_rc(trans_mat, p_choice[:, 0]) dev_ev_rc = solve_equ_system_fixp(dev_contr_op_rc, ev, trans_mat, costs, disc_fac) dev_value_maint_rc = 1 + disc_fac * dev_ev_rc - disc_fac * dev_ev_rc[0] lh_values_rc = like_hood_dev_values(p_choice, dev_value_maint_rc) return lh_values_rc
[docs]def chain_rule_param(cost_dev, dev_ev_param, disc_fac): chain_value = ( cost_dev[0] - disc_fac * dev_ev_param[0] + disc_fac * dev_ev_param - cost_dev ) return chain_value
[docs]def like_hood_data_individual(l_values, decision_mat, state_mat): """ generates the individual likelihood contribution based on the model. Parameters ---------- l_values numpy.ndarray the raw log likelihood per state. decision_mat : numpy.ndarray see :ref:`decision_mat` state_mat : numpy.ndarray see :ref:`state_mat` Returns ------- np.array the individual likelihood contribution depending on the exact model. """ return -(decision_mat * np.dot(l_values.T, state_mat)).sum(axis=0)
[docs]def like_hood_data(l_values, decision_mat, state_mat): return -np.sum(decision_mat * np.dot(l_values.T, state_mat))
[docs]def like_hood_dev_values(p_choice, dev_values): l_values = np.empty_like(p_choice) l_values[:, 0] = np.multiply(1 - p_choice[:, 0], dev_values) l_values[:, 1] = np.multiply(1 - p_choice[:, 1], -dev_values) return l_values
[docs]@numba.jit(nopython=True) def create_state_matrix(states, num_states): """ This function constructs a auxiliary matrix for the log-likelihood of the cost parameters. Parameters ---------- states : numpy.ndarray All mileage state observations. num_states : int The size of the state space. Returns ------- state_mat : numpy.ndarray see :ref:`state_mat` """ num_obs = states.shape[0] state_mat = np.full((num_states, num_obs), 0.0) for i, value in enumerate(states): state_mat[value, i] = 1.0 return state_mat
[docs]def get_ev(params, trans_mat, obs_costs, disc_fac, alg_details): """ A auxiliary function, which allows the log-likelihood function as well as its derivative to share the same fixed point and to avoid the need to execute the computation double. Parameters ---------- params : numpy.ndarray see :ref:`params` trans_mat : numpy.ndarray see :ref:`trans_mat` obs_costs : numpy.ndarray see :ref:`costs` disc_fac : numpy.float see :ref:`disc_fac` Returns ------- ev : numpy.ndarray see :ref:`ev` """ global ev_intermed global current_params if (ev_intermed is not None) & np.array_equal(current_params, params): ev = ev_intermed ev_intermed = None else: ev = calc_fixp(trans_mat, obs_costs, disc_fac, **alg_details) ev_intermed = ev current_params = params return ev