import numba
import numpy as np
from ruspy.simulation.simulation_model import decide
from ruspy.simulation.simulation_model import draw_increment
[docs]@numba.jit(nopython=True)
def simulate_strategy(
num_periods,
num_buses,
costs,
ev,
trans_mat,
disc_fac,
seed,
):
"""
Simulating the decision process.
This function simulates the decision strategy, as long as the current period is
below the number of periods and the current highest state of a bus is in the
first half of the state space.
Parameters
----------
num_periods : int
The number of periods to be simulated.
num_buses : int
The number of buses to be simulated.
costs : numpy.ndarray
see :ref:`costs`
ev : numpy.ndarray
see :ref:`ev`
trans_mat : numpy.ndarray
see :ref:`trans_mat`
disc_fac : float
see :ref:`disc_fac`
seed : int
A positive integer setting the random seed for drawing random numbers.
Returns
-------
states : numpy.ndarray
A two dimensional numpy array containing for each bus in each period the
state as an integer.
decisions : numpy.ndarray
A two dimensional numpy array containing for each bus in each period the
decision as an integer.
utilities : numpy.ndarray
A two dimensional numpy array containing for each bus in each period the
utility as a float.
usage : numpy.ndarray
A two dimensional numpy array containing for each bus in each period the
mileage usage of last period as integer.
"""
np.random.seed(seed)
num_states = ev.shape[0]
states = np.zeros((num_buses, num_periods), dtype=numba.u2)
decisions = np.zeros((num_buses, num_periods), dtype=numba.b1)
utilities = np.zeros((num_buses, num_periods), dtype=numba.float32)
usage = np.zeros((num_buses, num_periods), dtype=numba.u1)
absorbing_state = 0
for bus in range(num_buses):
for period in range(num_periods):
old_state = states[bus, period]
intermediate_state, decision, utility = decide(
old_state,
costs,
disc_fac,
ev,
)
state_increase = draw_increment(intermediate_state, trans_mat)
decisions[bus, period] = decision
utilities[bus, period] = utility
new_state = intermediate_state + state_increase
if new_state > num_states:
new_state = num_states
state_increase = num_states - intermediate_state
usage[bus, period] = state_increase
if period < num_periods - 1:
states[bus, period + 1] = new_state
if new_state == num_states:
absorbing_state = 1
return states, decisions, utilities, usage, absorbing_state
[docs]@numba.jit(nopython=True)
def simulate_strategy_reduced_data_utilities(
num_periods,
num_buses,
costs,
ev,
trans_mat,
disc_fac,
seed,
):
"""
Simulating the decision process with reduced data usage.
This function simulates the decision strategy, as long as the current period is
below the number of periods and the current highest state of a bus is in the
first half of the state space.
Parameters
----------
num_periods : int
The number of periods to be simulated.
num_buses : int
The number of buses to be simulated.
costs : numpy.ndarray
see :ref:`costs`
ev : numpy.ndarray
see :ref:`ev`
trans_mat : numpy.ndarray
see :ref:`trans_mat`
disc_fac : float
see :ref:`disc_fac`
seed : int
A positive integer setting the random seed for drawing random numbers.
Returns
-------
utilities : numpy.ndarray
A two dimensional numpy array containing for each bus in each period the
utility as a float.
"""
np.random.seed(seed)
num_states = ev.shape[0]
utilities = np.zeros((num_buses, num_periods), dtype=numba.float32)
absorbing_state = 0
for bus in range(num_buses):
new_state = 0
for period in range(num_periods):
old_state = new_state
intermediate_state, decision, utility = decide(
old_state,
costs,
disc_fac,
ev,
)
state_increase = draw_increment(intermediate_state, trans_mat)
utilities[bus, period] = utility
new_state = intermediate_state + state_increase
if new_state > num_states:
new_state = num_states
if new_state == num_states:
absorbing_state = 1
return utilities, absorbing_state
[docs]@numba.jit(nopython=True)
def simulate_strategy_reduced_data_disc_utility(
num_periods,
num_buses,
costs,
ev,
trans_mat,
disc_fac,
seed,
):
"""
Simulating the decision process with reduced data usage.
This function simulates the decision strategy, as long as the current period is
below the number of periods and the current highest state of a bus is in the
first half of the state space.
Parameters
----------
num_periods : int
The number of periods to be simulated.
num_buses : int
The number of buses to be simulated.
costs : numpy.ndarray
see :ref:`costs`
ev : numpy.ndarray
see :ref:`ev`
trans_mat : numpy.ndarray
see :ref:`trans_mat`
disc_fac : float
see :ref:`disc_fac`
seed : int
A positive integer setting the random seed for drawing random numbers.
Returns
-------
utilities : numpy.ndarray
A two dimensional numpy array containing for each bus in each period the
utility as a float.
"""
np.random.seed(seed)
num_states = ev.shape[0]
disc_utility = 0.0
absorbing_state = 0
for _ in range(num_buses):
new_state = 0
for period in range(num_periods):
old_state = new_state
intermediate_state, decision, utility = decide(
old_state,
costs,
disc_fac,
ev,
)
state_increase = draw_increment(intermediate_state, trans_mat)
disc_utility += disc_fac**period * utility
new_state = intermediate_state + state_increase
if new_state > num_states:
new_state = num_states
if new_state == num_states:
absorbing_state = 1
disc_utility /= num_buses
return disc_utility, absorbing_state
# This was an old attempt to implement more shocks than the standard gumbel. Would do
# this much different now!!!! Just keep it for further work!
# def get_unobs_data(shock):
# # If no specification on the shocks is given. A right skewed gumbel distribution
# # with mean 0 and scale pi^2/6 is assumed for each shock component.
# shock = (
# (
# pd.Series(index=["loc"], data=[], name="gumbel"),
# pd.Series(index=["loc"], data=[-np.euler_gamma], name="gumbel"),
# )
# if shock is None
# else shock
# )
#
# loc_scale = np.zeros((2, 2), dtype=float)
# for i, params in enumerate(shock):
# if "loc" in params.index:
# loc_scale[i, 0] = params["loc"]
# else:
# loc_scale[i, 0] = 0
# if "scale" in params.index:
# loc_scale[i, 1] = params["scale"]
# else:
# loc_scale[i, 1] = 1
#
# return shock[0].name, shock[1].name, loc_scale
#
#
# @numba.jit(nopython=True)
# def draw_unob(dist_name, loc, scale):
# if dist_name == "gumbel":
# return np.random.gumbel(loc, scale)
# elif dist_name == "normal":
# return np.random.normal(loc, scale)
# else:
# raise ValueError