Source code for msppy.evaluation

from msppy.utils.statistics import (rand_int,check_random_state,compute_CI,
allocate_jobs)
import pandas
import time
import numpy
import multiprocessing


class _Evaluation(object):
    """Evaluation base class.

    Parameters
    ----------
    MSP: list
        A multi-stage stochastic program object.

    Attributes
    ----------
    db: list
        The deterministic bounds.

    pv: list
        The simulated policy values.

    epv: float
        The exact value of expected policy value (only available for
        approximation model).

    CI: tuple
        The CI of simulated policy values.

    gap: float
        The gap between upper end of the CI and deterministic bound.

    stage_cost: dataframe
        The cost of individual stage models.

    solution: dataframe
        The solution of queried variables.

    n_sample_paths: int
        The number of sample paths to evaluate policy.

    sample_paths_idx: list
        The index list of exhaustive sample paths if simulation is turned off.

    markovian_samples:
        The simulated Markovian type samples.

    markovian_idx: list
        The Markov state that is the closest to the markovian_samples.
    """
    def __init__(self, MSP):
        self.MSP = MSP
        self.db = MSP.db
        self.pv = None
        self.CI = None
        self.epv = None
        self.gap = None
        self.stage_cost = None
        self.solution = None
        self.n_sample_paths = None
        self.sample_path_idx = None
        self.markovian_idx = None
        self.markovian_samples = None
        self.solve_true = False

    def _compute_gap(self):
        if self.MSP.measure != 'risk neutral':
            self.gap = -1
            return
        try:
            MSP = self.MSP
            if self.CI is not None:
                if MSP.sense == 1:
                    self.gap = abs( (self.CI[1]-self.db) / self.db )
                else:
                    self.gap = abs( (self.db-self.CI[0]) / self.db )
            elif self.epv is not None:
                self.gap = abs( (self.epv-self.db) / self.db )
            else:
                self.gap = abs( (self.pv[0]-self.db) / self.db )
        except ZeroDivisionError:
            self.gap = -1

    def _compute_sample_path_idx_and_markovian_path(self):
        pass

    def run(
            self,
            n_simulations,
            percentile=95,
            query=None,
            query_T=None,
            query_dual=None,
            query_stage_cost=False,
            n_processes=1,):
        """Run a Monte Carlo simulation to evaluate the policy.

        Parameters
        ----------
        n_simulations: int/-1
            If int: the number of simulations;
            If -1: exhuastive evaluation.

        percentile: float, optional (default=95)
            The percentile used to compute the confidence interval.

        query: list, optional (default=None)
            The names of variables that are intended to query.

        query_dual: list, optional (default=None)
            The names of constraints whose dual variables are intended to query.

        query_stage_cost: bool, optional (default=False)
            Whether to query values of individual stage costs.

        n_processes: int, optional (default=1)
            The number of processes to run the simulation.

        T: int, optional (default=None)
            For infinite horizon problem, the number stages to evaluate the policy.
        """
        MSP = self.MSP
        query_T = query_T if query_T else MSP.T
        if not MSP._flag_infinity:
            from msppy.solver import SDDP
            self.solver = SDDP(MSP)
        else:
            from msppy.solver import PSDDP
            self.solver = PSDDP(MSP)
            self.solver.forward_T = query_T
        self.n_simulations = n_simulations
        self._compute_sample_path_idx_and_markovian_path(query_T)
        self.pv = numpy.zeros(self.n_sample_paths)
        stage_cost = solution = solution_dual = None
        if query_stage_cost:
            stage_cost = [
                multiprocessing.RawArray("d",[0] * (query_T))
                for _ in range(self.n_sample_paths)
            ]
        if query is not None:
            solution = {
                item: [
                    multiprocessing.RawArray("d",[0] * (query_T))
                    for _ in range(self.n_sample_paths)
                ]
                for item in query
            }
        if query_dual is not None:
            solution_dual = {
                item: [
                    multiprocessing.RawArray("d",[0] * (query_T))
                    for _ in range(self.n_sample_paths)
                ]
                for item in query_dual
            }
        n_processes = min(self.n_sample_paths, n_processes)
        jobs = allocate_jobs(self.n_sample_paths, n_processes)
        pv = multiprocessing.Array("d", [0] * self.n_sample_paths)
        procs = [None] * n_processes
        for p in range(n_processes):
            procs[p] = multiprocessing.Process(
                target=self.run_single,
                args=(pv,jobs[p],query,query_dual,query_stage_cost,stage_cost,
                    solution,solution_dual)
            )
            procs[p].start()
        for proc in procs:
            proc.join()
        if self.n_simulations != 1:
            self.pv = [item for item in pv]
        else:
            self.pv = pv[0]
        if self.n_simulations == -1:
            self.epv = numpy.dot(
                pv,
                [
                    MSP._compute_weight_sample_path(self.sample_path_idx[j])
                    for j in range(self.n_sample_paths)
                ],
            )
        if self.n_simulations not in [-1,1]:
            self.CI = compute_CI(self.pv, percentile)
        self._compute_gap()
        if query is not None:
            self.solution = {
                k: pandas.DataFrame(
                    numpy.array(v)
                ) for k, v in solution.items()
            }
        if query_dual is not None:
            self.solution_dual = {
                k: pandas.DataFrame(
                    numpy.array(v)
                ) for k, v in solution_dual.items()
            }
        if query_stage_cost:
            self.stage_cost = pandas.DataFrame(numpy.array(stage_cost))

    def run_single(self, pv, jobs, query=None, query_dual=None,
            query_stage_cost=False, stage_cost=None,
            solution=None, solution_dual=None):
        random_state = numpy.random.RandomState([2**32-1, jobs[0]])
        MSP = self.MSP
        markovian_samples = markovian_idices = None
        solver = self.solver
        if MSP._type == "Markovian" and self.solve_true:
            markovian_samples = MSP.Markovian_uncertainty(
                random_state,len(jobs))
            markovian_idices = numpy.zeros([len(jobs),solver.forward_T],dtype=int)
            for t in range(1,solver.forward_T):
                idx,_ = solver._compute_idx(t)
                dist = numpy.empty([len(jobs),MSP.n_Markov_states[idx]])
                for i, markov_state in enumerate(MSP.Markov_states[idx]):
                    temp = markovian_samples[:,t,:] - markov_state
                    dist[:,i] = numpy.sum(temp**2, axis=1)
                markovian_idices[:,t] = numpy.argmin(dist,axis=1)

        for idx,j in enumerate(jobs):
            sample_path_idx = (self.sample_path_idx[j]
                if self.sample_path_idx is not None else None)
            markovian_idx = (markovian_idices[idx]
                if markovian_idices is not None else None)
            markovian_sample = (markovian_samples[idx]
                if markovian_samples is not None else None)
            result = self.solver._forward(
                random_state=random_state,
                sample_path_idx=sample_path_idx,
                markovian_idx=markovian_idx,
                markovian_samples=markovian_sample,
                solve_true=self.solve_true,
                query=query,
                query_dual=query_dual,
                query_stage_cost=query_stage_cost
            )
            if query is not None:
                for item in query:
                    for i in range(len(solution[item][0])):
                        solution[item][j][i] = result['solution'][item][i]
            if query_dual is not None:
                for item in solution_dual:
                    for i in range(len(solution_dual[item][0])):
                        solution_dual[item][j][i] = result['solution_dual'][item][i]
            if query_stage_cost:
                for i in range(len(stage_cost[0])):
                    stage_cost[j][i] = result['stage_cost'][i]
            pv[j] = result['pv']

[docs]class Evaluation(_Evaluation): __doc__ = _Evaluation.__doc__
[docs] def run(self, *args, **kwargs): super().run(*args, **kwargs)
def _compute_sample_path_idx_and_markovian_path(self, T): if self.n_simulations == -1: self.n_sample_paths,self.sample_path_idx = self.MSP._enumerate_sample_paths(T-1) else: self.n_sample_paths = self.n_simulations
[docs]class EvaluationTrue(Evaluation): __doc__ = Evaluation.__doc__
[docs] def run(self, *args, **kwargs): MSP = self.MSP if MSP.__class__.__name__ == 'MSIP': MSP._back_binarize() # discrete finite model should call evaluate instead if ( MSP._type in ["stage-wise independent", "Markov chain"] and MSP._individual_type == "original" and not hasattr(MSP,"bin_stage") ): return super().run(*args, **kwargs) return _Evaluation.run(self, *args, **kwargs)
def _compute_sample_path_idx_and_markovian_path(self, T): MSP = self.MSP if ( MSP._type in ["stage-wise independent", "Markov chain"] and MSP._individual_type == "original" and not hasattr(MSP,"bin_stage") ): return super()._compute_sample_path_idx_and_markovian_path(T) self.n_sample_paths = self.n_simulations self.solve_true = True