reskit.workflow_manager
=======================

.. py:module:: reskit.workflow_manager


Classes
-------

.. autoapisummary::

   reskit.workflow_manager.WorkflowManager
   reskit.workflow_manager.WorkflowQueue


Functions
---------

.. autoapisummary::

   reskit.workflow_manager._split_locs
   reskit.workflow_manager.distribute_workflow
   reskit.workflow_manager.load_workflow_result
   reskit.workflow_manager.execute_workflow_iteratively


Module Contents
---------------

.. py:class:: WorkflowManager(placements)

   The WorkflowManager class assists with the construction of more specialized WorkflowManagers,
   such as the WindWorkflowManager or the SolarWorkflowManager. In addition to providing the
   general structure for simulation workflow management, the WorkflowManager also defines
   functionalities which should be common across all WorkflowManagers.

   This includes:
     - Basic initialization
     - Time domain management
     - Reading weather data
     - Adjusting variables by a long-run-average value
     - Applying simple loss factors
     - Saving the state of WorkflowManagers to XArray datasets, either in memory or on disc

   Initialization:
   ---------------

   WorkflowManager( placements )



   .. py:attribute:: placements


   .. py:attribute:: locs
      :value: None



   .. py:attribute:: ext


   .. py:attribute:: sim_data


   .. py:attribute:: time_index
      :value: None



   .. py:attribute:: workflow_parameters


   .. py:method:: set_time_index(times)

      Sets the time index of the WorkflowManager

      :param times: The timesteps to use throughout the WorkflowManager's life cycle. The
                    length of this dataset must match the shape of data which is loaded into
                    the WorkflowManager.sim_data member.
      :type times: pd.DatetimeIndex



   .. py:method:: _set_sim_shape()


   .. py:method:: extract_raster_values_at_placements(raster, **kwargs)

      Extracts pixel values at each of the configured placements from the specified raster file



   .. py:method:: read(variables, source_type, source, set_time_index = False, spatial_interpolation_mode = 'bilinear', temporal_reindex_method = 'nearest', time_index_from=None, **kwargs)

      Reads the specified variables from the NetCDF4-style weather dataset, and then extracts
      those variables for each of the coordinates configured in `.placements`. The resulting
      data is then available in `.sim_data`.

      :param variables: The variable (or variables) to be read from the specified source
                        - If a path to a weather source is given, then only the 'standard' variables
                        configured for that source type are available (see the doc string for the
                        weather source you are interested in)
                        - If either 'elevated_wind_speed' or 'surface_wind_speed' is included in the
                        variable list, then the members `.elevated_wind_speed_height` and
                        `.surface_wind_speed_height`, respectively, are also added. These are constants
                        which specify what the 'native' wind speed height is, which depends on the source
                        - A pre-loaded NCSource can also be given, thus allowing for any variable in the
                        source to be specified in the `variables` list. But the user needs to take care
                        of initializing the NCSource and loading the data they want
      :type variables: str or list of strings
      :param source_type: The type of weather datasource which is to be loaded. Can be one of:
                          "ERA5", "SARAH", "MERRA", or 'user'
                          - If a pre-loaded NCSource is given for the `source` object, then the `source_type`
                          should be "user"
      :type source_type: str
      :param source: The source to read weather variables from
      :type source: str or rk.weather.NCSource
      :param set_time_index: If True, instructs the workflow manager to set the time index to that which is read
                             from the weather source
                             - By default False
      :type set_time_index: bool, optional
      :param spatial_interpolation_mode: The spatial interpolation mode to use while reading data from the weather source at
                                         each of the placement coordinates
                                         - By default "bilinear"
      :type spatial_interpolation_mode: str, optional
      :param temporal_reindex_method: In the event of missing data, this algorithm is used to fill in the missing data.
                                      - Can be, for example, "nearest", "ffill", "bfill", "interpolate"
                                      - By default "nearest"
      :type temporal_reindex_method: str, optional

      :returns: Returns the invoking WorkflowManager (for chaining)
      :rtype: WorkflowManager

      :raises RuntimeError: If set_time_index is False but no `.time_index` exists
      :raises RuntimeError: If source_type is unknown



   .. py:method:: get_scalar_values_from_raster(fp, spatial_interpolation, points=None)

      Auxiliary function to extract raster values with NaN fallback options.



   .. py:method:: adjust_variable_to_long_run_average(variable, source_long_run_average, real_long_run_average, real_lra_scaling = 1, spatial_interpolation = 'linear-spline', nodata_fallback = 'nan', nodata_fallback_scaling = 1, allow_nans = True)

      Adjusts the mean of the specified variable to a known long-run-average

      Note:
      -----
      uses the equation: variable[t] = variable[t] * real_long_run_average / source_long_run_average

      :param variable: The variable to be adjusted
      :type variable: str
      :param source_long_run_average: The variable's native long run average (the average in the weather file)
                                      - If a string is given, it is expected to be a path to a raster file which can be
                                      used to look up the average values from using the coordinates in `.placements`
                                      - If a numpy ndarray (or derivative) is given, the shape must be one of (time, placements)
                                      or at least (placements)
      :type source_long_run_average: Union[str, float, np.ndarray]
      :param real_long_run_average: The variable's 'true' long run average
                                    - If a string is given, it is expected to be a path to a raster file which can be
                                    used to look up the average values from using the coordinates in `.placements`
                                    - If a numpy ndarray (or derivative) is given, the shape must be one of (time, placements)
                                    or at least (placements)
      :type real_long_run_average: Union[str, float, np.ndarray]
      :param real_lra_scaling: An optional scaling factor to apply to the values derived from `real_long_run_average`.
                               - This is primarily useful when `real_long_run_average` is a path to a raster file
                               - By default 1
      :type real_lra_scaling: float, optional
      :param spatial_interpolation: When either `source_long_run_average` or `real_long_run_average` is a path to a raster
                                    file, this input specifies which interpolation algorithm should be used
                                    - Options are: "near", "linear-spline", "cubic-spline", "average"
                                    - By default "linear-spline"
                                    - See for more info: geokit.raster.interpolateValues
      :type spatial_interpolation: str, optional
      :param nodata_fallback: When real_long_run_average has no data, one can decide between different fallback options, by default np.nan:
                              - np.nan or None : return np.nan for missing values in real_long_run_average
                              - float : Apply this float value as a scaling factor for all no-data locations only: source_long_run_average * nodata_fallback.
                              NOTE: A value of 1.0 will return the source lra value in case of missing real lra values (no additional nodata_fallback_scaling applied).
                              - str : Will be interpreted as a filepath to a raster with alternative real_long_run_average values, scaled by nodata_fallback_scaling.
                              - callable : any callable method taking the arguments (all iterables): 'locs' and 'source_long_run_average_value'
                              (the locations as gk.geom.point objects and original value from source data). The output values will be considered as
                              the new real_long_run_average for missing locations only (absolute data, no additional nodata_fallback_scaling applied).

                              NOTE: np.nan will also be returned in case that the nodata fallback does not yield values either.
      :type nodata_fallback: float, str, callable, optional
      :param nodata_fallback_scaling: An optional scaling factor to apply to the values derived from `nodata_fallback`.
                                      - This is primarily useful when `nodata_fallback` is a path to a raster file
                                      - By default 1
      :type nodata_fallback_scaling: float
      :param allow_nans: If True, NaN values may remain after scaling, else an error will be raised. By default True.
      :type allow_nans: boolean, optional

      :returns: Returns the invoking WorkflowManager (for chaining)
      :rtype: WorkflowManager
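
      The scaling in the note above can be illustrated with plain Python lists standing in for the
      ``(time, placements)`` array held in ``.sim_data``. This is a minimal sketch of the arithmetic
      only, not the method's implementation. The helper name ``scale_to_long_run_average`` and the
      sample values are hypothetical, and raster lookups and no-data fallbacks are left out.

      .. code-block:: python

         def scale_to_long_run_average(series, source_lra, real_lra):
             """Scale each placement's time series by real_lra / source_lra."""
             factors = [real / source for real, source in zip(real_lra, source_lra)]
             return [
                 [value * factor for value, factor in zip(row, factors)]
                 for row in series
             ]

         # Two time steps (rows) for two placements (columns)
         wind_speed = [[4.0, 6.0], [8.0, 10.0]]
         source_lra = [6.0, 8.0]  # long-run averages found in the weather source
         real_lra = [9.0, 8.0]    # 'true' long-run averages at the placements

         adjusted = scale_to_long_run_average(wind_speed, source_lra, real_lra)

      Here the first placement is scaled up by a factor of 1.5, while the second is left unchanged
      because its two averages agree.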



   .. py:method:: spatial_disaggregation(variable, source_high_resolution, source_low_resolution, real_lra_scaling = 1, spatial_interpolation = 'linear-spline')

      [summary]

      :param variable: [description]
      :type variable: str
      :param source_long_run_average: [description]
      :type source_long_run_average: Union[str, float, np.ndarray]
      :param real_long_run_average: [description]
      :type real_long_run_average: Union[str, float, np.ndarray]
      :param real_lra_scaling: [description], by default 1
      :type real_lra_scaling: float, optional
      :param spatial_interpolation: [description], by default "linear-spline"
      :type spatial_interpolation: str, optional



   .. py:method:: apply_loss_factor(loss, variables = ['capacity_factor'])

      Applies a loss factor onto a specified variable

      :param loss: The loss factor(s) to be applied
                   - If a float or a numpy ndarray is given, then the following operation is performed:
                   > variable = variable * (1 - loss)
                   - If a function is given, then the following operation is performed:
                   > variable = variable * (1 - loss(variable) )
                   - If a numpy ndarray is given, it must be broadcastable to the variable's shape in
                   `.sim_data`
      :type loss: Union[float, np.ndarray, FunctionType]
      :param variables: The variable or variables to apply the loss factor to
                        - By default ["capacity_factor"]
      :type variables: Union[str, List[str]], optional

      :returns: Returns the invoking WorkflowManager (for chaining)
      :rtype: WorkflowManager
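
      The two operations described for ``loss`` can be sketched with a plain list in place of the
      array in ``.sim_data``. The helper ``apply_loss`` and the loss curve are hypothetical and only
      illustrate the arithmetic. As a simplification, the sketch evaluates a callable loss element
      by element.

      .. code-block:: python

         def apply_loss(values, loss):
             """Return values * (1 - loss), calling loss on each value if it is callable."""
             if callable(loss):
                 return [v * (1 - loss(v)) for v in values]
             return [v * (1 - loss) for v in values]

         capacity_factor = [0.0, 0.5, 1.0]

         # Constant loss factor: variable * (1 - loss)
         constant_loss = apply_loss(capacity_factor, 0.5)

         # Hypothetical loss curve: losses shrink as the capacity factor grows
         variable_loss = apply_loss(capacity_factor, lambda cf: 0.5 * (1 - cf))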



   .. py:method:: register_workflow_parameter(key, value)

      Add a parameter to the WorkflowManager which will be included in the output XArray dataset

      :param key: The workflow parameter's access key
      :type key: str
      :param value: The workflow parameter's value. Only strings and floats are allowed
      :type value: Union[str,float]



   .. py:method:: to_xarray(output_netcdf_path = None, output_variables = None, custom_attributes = None, _intermediate_dict=False)

      Generates an XArray dataset from the data currently contained in the WorkflowManager

      Note:
      - The `.placements` data is automatically added to the XArray dataset along the 'locations' dimension
      - The `workflow_parameters` data is automatically added as dimensionless variables
      - The `.sim_data` is automatically added along the dimensions (time, locations)
      - The `.time_index` is automatically added along the dimension 'time'

      :param output_netcdf_path: If given, the XArray dataset will be written to disc at the specified path
                                 - By default None
      :type output_netcdf_path: str, optional
      :param output_variables: If given, specifies the variables which should be included in the resulting
                               dataset. Otherwise all suitable variables found in `.placements`, `.workflow_parameters`,
                               `.sim_data`, and `.time_index` will be included
                               - Only variables of numeric or string type are suitable due to NetCDF4 limitations
                               - By default None
      :type output_variables: List[str], optional
      :param custom_attributes: If given, adds the key-value pairs as attributes to the XArray dataset
                                - These will be added in addition to the workflow_parameters
                                - By default None
      :type custom_attributes: dict, optional

      :returns: The resulting XArray dataset
      :rtype: xarray.Dataset



   .. py:method:: to_netcdf(xds, output_netcdf_path = None, output_variables = None, custom_attributes = None, _intermediate_dict=False)

      Saves an XArray dataset to netCDF4 format

      Note:
      - The `.placements` data is automatically added to the XArray dataset along the 'locations' dimension
      - The `workflow_parameters` data is automatically added as dimensionless variables
      - The `.sim_data` is automatically added along the dimensions (time, locations)
      - The `.time_index` is automatically added along the dimension 'time'

      :param xds: The XArray dataset to save
      :type xds: xarray.Dataset
      :param output_netcdf_path: If given, the XArray dataset will be written to disc at the specified path
                                 - By default None
      :type output_netcdf_path: str
      :param output_variables: If given, specifies the variables which should be included in the resulting
                               dataset. Otherwise all suitable variables found in `.placements`, `.workflow_parameters`,
                               `.sim_data`, and `.time_index` will be included
                               - Only variables of numeric or string type are suitable due to NetCDF4 limitations
                               - By default None
      :type output_variables: List[str], optional
      :param custom_attributes: If given, adds the key-value pairs as attributes to the XArray dataset before saving
                                - These will be added in addition to existing attributes
                                - By default None
      :type custom_attributes: dict, optional

      :returns: The resulting output_netcdf_path
      :rtype: str



.. py:function:: _split_locs(placements, groups)

.. py:function:: distribute_workflow(workflow_function, placements, jobs = 2, max_batch_size = None, intermediate_output_dir = None, **kwargs)

   Distributes a RESKit simulation workflow across multiple CPUs

   Parallelism is achieved by breaking up the placements DataFrame into placement groups via
   KMeans grouping.

   :param workflow_function: The workflow function to be parallelized
                             - All RESKit workflow functions should be suitable here
                             - If you want to make your own function, the only requirement is that its first argument
                             should be a pandas DataFrame in the form of a placements table (i.e. has a 'lat' and
                             'lon' column)
                             - Don't forget that all inputs required for the workflow function are still required,
                             and are passed on as constants through any specified `kwargs`
   :type workflow_function: FunctionType
   :param placements: A DataFrame describing the placements to be simulated
                      For example, if you are simulating wind turbines, the following columns are likely required:
                      ['lon','lat','capacity','hub_height','rotor_diam',]
   :type placements: pandas.DataFrame
   :param jobs: The number of parallel jobs
                - By default 2
   :type jobs: int, optional
   :param max_batch_size: If given, limits the maximum number of total placements which are simulated in parallel
                          - Use this to reduce the memory requirements of the simulations (in turn increasing
                          overall simulation time)
                          - By default None
   :type max_batch_size: int, optional
   :param intermediate_output_dir: In case of very large outputs (which are too large to be joined into a singular XArray dataset),
                                   use this to write the individual simulation results to the specified directory
                                   - By default None
   :type intermediate_output_dir: str, optional
   :param \*\*kwargs: All keyword arguments are passed on as constants to each simulation
                      - Use these to set the required arguments for the given ``workflow_function``

   :returns: An XArray Dataset which contains the combined results of the distributed simulations
   :rtype: xarray.Dataset
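
   The calling pattern described above (placements as the first argument, everything else passed
   on as constant keyword arguments) can be sketched with the standard library alone. This is not
   how the function is implemented: contiguous chunks stand in for the KMeans grouping, threads
   stand in for separate CPUs, and a list of dicts stands in for the placements DataFrame. The
   names ``toy_workflow`` and ``run_in_groups`` are hypothetical.

   .. code-block:: python

      from concurrent.futures import ThreadPoolExecutor

      def toy_workflow(placements, efficiency):
          """Hypothetical workflow: its first argument is the placements table."""
          return [p["capacity"] * efficiency for p in placements]

      def run_in_groups(workflow_function, placements, jobs=2, **kwargs):
          # Contiguous chunks stand in for the KMeans grouping
          size = -(-len(placements) // jobs)  # ceiling division
          groups = [placements[i:i + size] for i in range(0, len(placements), size)]
          with ThreadPoolExecutor(max_workers=jobs) as pool:
              # Every group receives the same keyword arguments as constants
              futures = [pool.submit(workflow_function, g, **kwargs) for g in groups]
              results = [f.result() for f in futures]
          # Join the per-group results in their original order
          return [value for group_result in results for value in group_result]

      placements = [
          {"lat": 50.0 + i, "lon": 6.0, "capacity": 1000.0 * (i + 1)}
          for i in range(5)
      ]
      output = run_in_groups(toy_workflow, placements, jobs=2, efficiency=0.5)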


.. py:function:: load_workflow_result(datasets, loader=xarray.load_dataset, sortby='location')

.. py:function:: execute_workflow_iteratively(workflow, weather_path_varname, zoom=None, **workflow_args)

   The function executes the indicated workflow iteratively, iterating over weather tiles. The appropriate weather
   tile per placement is extracted automatically and placements are batched together based on weather tile.

   :param workflow: Callable workflow function, e.g. reskit.wind.wind_era5_2023
   :type workflow: RESKit workflow
   :param weather_path_varname: The name of the weather path variable in this workflow, e.g. 'era5_path' for
                                reskit.wind.wind_era5_2023. Must be a key of ``workflow_args``.
   :type weather_path_varname: str
   :param zoom: The zoom level of the weather tiles. Required only if the weather path contains
                <X-TILE> or <Y-TILE>.
   :type zoom: int, optional
   :param \*\*workflow_args: Passed on to the workflow specified above. Must contain ``placements`` and the
                             name given in ``weather_path_varname`` as keys.
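
   Batching placements by weather tile can be sketched as a simple grouping step. The tiling
   scheme is an assumption: the sketch uses the common slippy-map (Web Mercator) tile formula,
   which is not confirmed to be the scheme RESKit uses. The helpers ``tile_of`` and
   ``batch_by_tile`` are hypothetical.

   .. code-block:: python

      import math
      from collections import defaultdict

      def tile_of(lon, lat, zoom):
          """Slippy-map tile indices (assumed scheme, not confirmed for RESKit)."""
          n = 2 ** zoom
          x = int((lon + 180.0) / 360.0 * n)
          y = int((1.0 - math.asinh(math.tan(math.radians(lat))) / math.pi) / 2.0 * n)
          return x, y

      def batch_by_tile(placements, zoom):
          """Group (lon, lat) placements by the tile they fall into."""
          batches = defaultdict(list)
          for lon, lat in placements:
              batches[tile_of(lon, lat, zoom)].append((lon, lat))
          return dict(batches)

      placements = [(6.0, 50.0), (6.5, 50.5), (-74.0, 40.7)]
      batches = batch_by_tile(placements, zoom=2)

   Each batch could then be simulated with the weather data belonging to its own tile, one tile
   after another.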


.. py:class:: WorkflowQueue(workflow, **kwargs)

   The WorkflowQueue object allows for the queueing of multiple RESKit workflow simulations
   which are then executed in parallel

   Initialize:
   -----------
   WorkflowQueue( workflow:FunctionType, **kwargs )

   :param workflow: The workflow function to be parallelized
                    - All RESKit workflow functions should be suitable here
                    - Don't forget that all inputs required for the workflow function are still required,
                    and are passed on either as constants through ``kwargs`` specified in the initializer,
                    or else in the subsequent ``.append(...)`` calls
   :type workflow: FunctionType
   :param \*\*kwargs: All keyword arguments are passed on as constants to each simulation
                      Use these to set the required arguments for the given ``workflow``


   .. py:attribute:: workflow


   .. py:attribute:: constants


   .. py:attribute:: queue


   .. py:method:: append(key, **kwargs)

      Appends a simulation set to the current queue

      :param key: The access key to use for this simulation set
      :type key: str
      :param \*\*kwargs: All other keyword arguments are passed on to the simulation
                         for only this simulation



   .. py:method:: execute(jobs = 1)

      Executes all of the simulation sets that are currently in the queue

      :param jobs: The number of parallel jobs, by default 1
      :type jobs: int, optional

      :returns: The results of each simulation set, accessible via their access keys
      :rtype: OrderedDict[xarray.Dataset]
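
      The queueing pattern (constants given at initialization, per-simulation arguments given in
      ``append``, results keyed by access key) can be sketched with a small serial stand-in. The
      class ``MiniQueue`` and the function ``toy_workflow`` are hypothetical. The sketch runs the
      simulations one after another rather than in parallel, and it does not handle an argument
      that appears both as a constant and in ``append``.

      .. code-block:: python

         from collections import OrderedDict

         class MiniQueue:
             """Hypothetical, serial stand-in for the queueing pattern."""

             def __init__(self, workflow, **kwargs):
                 self.workflow = workflow
                 self.constants = kwargs
                 self.queue = OrderedDict()

             def append(self, key, **kwargs):
                 # Per-simulation arguments are stored under the access key
                 self.queue[key] = kwargs

             def execute(self):
                 # Constants are combined with each simulation set's own arguments
                 return OrderedDict(
                     (key, self.workflow(**self.constants, **kwargs))
                     for key, kwargs in self.queue.items()
                 )

         def toy_workflow(capacity, efficiency):
             return capacity * efficiency

         queue = MiniQueue(toy_workflow, efficiency=0.5)
         queue.append("small", capacity=1000.0)
         queue.append("large", capacity=3000.0)
         results = queue.execute()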



