
MESQUAL Granularity Modules

TimeSeriesGranularityAnalyzer

Analyzes and validates time granularity in DatetimeIndex sequences.

This class provides tools for working with time series data that may have varying granularities (e.g., hourly, quarter-hourly). It's particularly useful for electricity market data analysis where different market products can have different time resolutions.

Features
  • Granularity detection for time series data
  • Support for mixed granularities within the same series
  • Strict mode for validation scenarios
  • Per-day granularity analysis

Parameters:
  • strict_mode (bool, default True): If True, raises GranularityError when multiple granularities are detected. If False, only issues warnings.

Example:

>>> analyzer = TimeSeriesGranularityAnalyzer()
>>> index = pd.date_range('2024-01-01', periods=24, freq='h')
>>> analyzer.get_granularity_as_timedelta(index)
    Timedelta('0 days 01:00:00')
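
The strict_mode flag only matters when more than one granularity appears in the index. A minimal sketch of the difference (assuming pandas is imported as pd and GranularityError comes from the same granularity_analyzer module):

>>> mixed = pd.date_range('2024-01-01', periods=12, freq='h').union(
...     pd.date_range('2024-01-02', periods=8, freq='15min'))
>>> TimeSeriesGranularityAnalyzer(strict_mode=False).get_granularity_as_series_of_hours(mixed).unique()
    # array([1.0, 0.25]) - hourly on the first day, quarter-hourly on the second
>>> TimeSeriesGranularityAnalyzer(strict_mode=True).get_granularity_as_timedelta(mixed)
    # raises GranularityError because the index mixes granularities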
Source code in submodules/mesqual/mesqual/energy_data_handling/granularity_analyzer.py
class TimeSeriesGranularityAnalyzer:
    """Analyzes and validates time granularity in DatetimeIndex sequences.

    This class provides tools for working with time series data that may have varying
    granularities (e.g., hourly, quarter-hourly). It's particularly useful for
    electricity market data analysis where different market products can have
    different time resolutions.

    Features:
        - Granularity detection for time series data
        - Support for mixed granularities within the same series
        - Strict mode for validation scenarios
        - Per-day granularity analysis

    Args:
        strict_mode: If True, raises GranularityError when multiple granularities
            are detected. If False, only issues warnings. Default is True.

    Example:

        >>> analyzer = TimeSeriesGranularityAnalyzer()
        >>> index = pd.date_range('2024-01-01', periods=24, freq='h')
        >>> analyzer.get_granularity_as_timedelta(index)
        Timedelta('0 days 01:00:00')
    """

    def __init__(self, strict_mode: bool = True):
        self._strict_mode = strict_mode

    @property
    def strict_mode(self) -> bool:
        return self._strict_mode

    @strict_mode.setter
    def strict_mode(self, value: bool):
        self._strict_mode = value

    @_validate_dt_index
    def get_granularity_as_series_of_timedeltas(self, dt_index: pd.DatetimeIndex) -> pd.Series:
        s = pd.Series(index=dt_index)
        s = s.groupby(dt_index.date).apply(lambda g: self._get_granularity_for_day(g.index))
        s = s.droplevel(0, axis=0)
        if dt_index.name is not None:
            s.index.name = dt_index.name
        return s

    @_validate_dt_index
    def get_granularity_as_series_of_minutes(self, dt_index: pd.DatetimeIndex) -> pd.Series:
        return self.get_granularity_as_series_of_timedeltas(dt_index).apply(lambda x: x.total_seconds() / 60)

    @_validate_dt_index
    def get_granularity_as_series_of_hours(self, dt_index: pd.DatetimeIndex) -> pd.Series:
        return self.get_granularity_as_series_of_timedeltas(dt_index).apply(lambda x: x.total_seconds() / 3600)

    def _get_granularity_for_day(self, dt_index: pd.DatetimeIndex) -> pd.Series:
        gran = dt_index.to_series().diff().shift(-1)
        if len(gran) > 1:
            gran.iloc[-1] = gran.iloc[-2]

        if gran.nunique() > 1:
            msg = f'Multiple granularities identified within a day: {gran.unique()}'
            if self._strict_mode:
                raise GranularityError(msg)
            warnings.warn(msg)
        return gran

    @_validate_dt_index
    def get_granularity_as_timedelta(self, dt_index: pd.DatetimeIndex) -> pd.Timedelta:
        if len(dt_index) == 0:
            return pd.Timedelta(0)

        gran = self.get_granularity_as_series_of_timedeltas(dt_index)
        first_gran = gran.iloc[0]

        if gran.nunique() > 1:
            msg = (f'Multiple granularities found: {gran.unique()}. '
                   f'Using {first_gran} as the reference granularity.')
            if self._strict_mode:
                raise GranularityError(msg)
            warnings.warn(msg)
        return first_gran

    @_validate_dt_index
    def get_granularity_as_hours(self, dt_index: pd.DatetimeIndex) -> float:
        return self.get_granularity_as_timedelta(dt_index).total_seconds() / 3600

    @_validate_dt_index
    def get_granularity_as_minutes(self, dt_index: pd.DatetimeIndex) -> float:
        return self.get_granularity_as_timedelta(dt_index).total_seconds() / 60

    @_validate_dt_index
    def validate_constant_granularity(self, dt_index: pd.DatetimeIndex, expected_hours: float) -> bool:
        actual_hours = self.get_granularity_as_hours(dt_index)
        return abs(actual_hours - expected_hours) < 1e-10

TimeSeriesGranularityConverter

Converts time series between different granularities while respecting the nature of the quantity.

This class handles the conversion of time series data between different granularities (e.g., hourly to 15-min or vice versa) while properly accounting for the physical nature of the quantity being converted:

  • Intensive quantities (e.g., prices, power levels) are replicated when increasing granularity and averaged when decreasing granularity.

  • Extensive quantities (e.g., volumes, welfare) are split when increasing granularity and summed when decreasing granularity.

Features
  • Automatic granularity detection using TimeSeriesGranularityAnalyzer
  • Per-day processing to handle missing periods properly and prevent incorrect autofilling of missing days
  • Support for both intensive and extensive quantities
  • Timezone-aware processing including daylight saving transitions
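
A minimal sketch of the intensive/extensive distinction when downsampling 15-minute data to hourly (the import locations of TimeSeriesGranularityConverter and QuantityTypeEnum are assumed, not shown here):

>>> converter = TimeSeriesGranularityConverter()
>>> quarter_hourly = pd.Series([10.0, 20.0, 30.0, 40.0],
...                            index=pd.date_range('2024-01-01', freq='15min', periods=4))
>>> # Intensive quantity (e.g. a price): downsampling averages the four 15-min values
>>> converter.convert_to_target_granularity(
...     quarter_hourly, pd.Timedelta(hours=1), QuantityTypeEnum.INTENSIVE)
    # one hourly value: 25.0
>>> # Extensive quantity (e.g. an energy volume): downsampling sums them instead
>>> converter.convert_to_target_granularity(
...     quarter_hourly, pd.Timedelta(hours=1), QuantityTypeEnum.EXTENSIVE)
    # one hourly value: 100.0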
Source code in submodules/mesqual/mesqual/energy_data_handling/granularity_converter.py
class TimeSeriesGranularityConverter:
    """Converts time series between different granularities while respecting the nature of the quantity.

    This class handles the conversion of time series data between different granularities
    (e.g., hourly to 15-min or vice versa) while properly accounting for the physical
    nature of the quantity being converted:

    - Intensive quantities (e.g., prices, power levels) are replicated when increasing
      granularity and averaged when decreasing granularity.

    - Extensive quantities (e.g., volumes, welfare) are split when increasing
      granularity and summed when decreasing granularity.

    Features:
        - Automatic granularity detection using TimeSeriesGranularityAnalyzer
        - Per-day processing to handle missing periods properly and prevent incorrect autofilling of missing days
        - Support for both intensive and extensive quantities
        - Timezone-aware processing including daylight saving transitions
    """
    def __init__(self):
        """Initialize the granularity converter with analyzer instances.

        Creates both strict and non-strict granularity analyzers for different
        validation requirements during conversion operations.
        """
        self._strict_gran_analyzer = TimeSeriesGranularityAnalyzer(strict_mode=True)
        self._non_strict_gran_analyzer = TimeSeriesGranularityAnalyzer(strict_mode=False)

    def _validate_series_format(self, series: pd.Series) -> None:
        """Validate that the input series has the required DatetimeIndex format.

        Args:
            series: Time series to validate

        Raises:
            TypeError: If series index is not a DatetimeIndex
        """
        if not isinstance(series.index, pd.DatetimeIndex):
            raise TypeError(f"Series index must be DatetimeIndex, got {type(series.index)}")

    def upsample_through_fillna(
            self,
            data: pd.DataFrame | pd.Series,
            quantity_type: QuantityTypeEnum
    ) -> pd.DataFrame | pd.Series:
        """Upsample data using forward-fill strategy with quantity-type-aware scaling.

        This method handles upsampling of sparse data where some values are missing.
        It uses forward-fill to propagate values and applies appropriate scaling
        based on the quantity type:

        - For INTENSIVE quantities: Values are replicated without scaling
        - For EXTENSIVE quantities: Values are divided by the number of periods
          they are spread across within each hour-segment group

        The method processes data per day and hour to handle missing periods properly
        and prevent incorrect auto-filling across day boundaries.

        Args:
            data: Time series data to upsample (Series or DataFrame)
            quantity_type: Type of quantity being converted (INTENSIVE or EXTENSIVE)

        Returns:
            Upsampled data with same type as input

        Example:

            >>> # For extensive quantities (energy), values are divided
            >>> series = pd.Series([100, np.nan, np.nan, np.nan, 200, np.nan, np.nan, np.nan],
            ...                   index=pd.date_range('2024-01-01', freq='15min', periods=8))
            >>> converter.upsample_through_fillna(series, QuantityTypeEnum.EXTENSIVE)
                # Results in [25, 25, 25, 25, 50, 50, 50, 50]
        """
        if isinstance(data, pd.Series):
            return self._upsample_series(data, quantity_type)

        tmp = data.copy().sort_index()
        idx = tmp.index.tz_convert('UTC') if tmp.index.tz is not None else tmp.index

        if quantity_type == QuantityTypeEnum.EXTENSIVE:
            segment_patterns = tmp.notna().cumsum()

            # Group columns by their segment pattern
            pattern_to_cols = {}
            for col in tmp.columns:
                pattern = tuple(segment_patterns[col].values)  # Convert to tuple to make it hashable
                pattern_to_cols.setdefault(pattern, []).append(col)

            # Process each group of columns with same pattern
            result_pieces = []
            for pattern, cols in pattern_to_cols.items():
                segments = segment_patterns[cols[0]]  # Take segments from first column (all are same)
                piece = (
                    tmp[cols]
                    .groupby([idx.date, idx.hour, segments])
                    .transform(lambda s: s.ffill() / len(s))
                )
                result_pieces.append(piece)

            return pd.concat(result_pieces, axis=1).loc[data.index].rename_axis(data.columns.names, axis=1)
        else:
            return tmp.groupby([idx.date, idx.hour]).ffill().loc[data.index]

    def _upsample_series(self, series: pd.Series, quantity_type: QuantityTypeEnum) -> pd.Series:
        """Helper method to upsample a Series using DataFrame-based upsampling.

        Args:
            series: Time series to upsample
            quantity_type: Type of quantity being converted

        Returns:
            Upsampled series
        """
        return self.upsample_through_fillna(
            series.to_frame(),
            quantity_type
        ).iloc[:, 0]

    def convert_to_target_index(
            self,
            series: pd.Series,
            target_index: pd.DatetimeIndex,
            quantity_type: QuantityTypeEnum
    ) -> pd.Series:
        """Convert a time series to match a specific target DatetimeIndex.

        This method converts the granularity of a time series to match the granularity
        of a target index. The target index must have consistent granularity within
        each day and consistent granularity across all days.

        Args:
            series: Source time series to convert
            target_index: DatetimeIndex defining the target granularity and timestamps
            quantity_type: Type of quantity (INTENSIVE or EXTENSIVE) for proper scaling

        Returns:
            Series converted to match target index granularity and timestamps

        Raises:
            ValueError: If target index has multiple granularities within days
                       or inconsistent granularity across days

        Example:

            >>> # Convert hourly to 15-min data
            >>> hourly_series = pd.Series([100, 150, 200], 
            ...                          index=pd.date_range('2024-01-01', freq='h', periods=3))
            >>> target_idx = pd.date_range('2024-01-01', freq='15min', periods=12)
            >>> result = converter.convert_to_target_index(hourly_series, target_idx,
            ...                                          QuantityTypeEnum.INTENSIVE)
        """
        self._validate_series_format(series)
        target_gran_series = self._strict_gran_analyzer.get_granularity_as_series_of_timedeltas(target_index)
        _grouped = target_gran_series.groupby(target_gran_series.index.date)
        if _grouped.nunique().max() > 1:
            raise ValueError(f"Found some dates with multiple granularities within same day. Can't handle that!")
        if _grouped.first().nunique() > 1:
            raise ValueError(f"Found multiple granularities. Can't handle that!")
        target_granularity = pd.Timedelta(target_gran_series.values[0])
        return series.groupby(series.index.date).apply(
            lambda x: self._convert_date_to_target_granularity(x, target_granularity, quantity_type)
        ).droplevel(0).rename_axis(series.index.name).rename(series.name)

    def convert_to_target_granularity(
            self,
            series: pd.Series,
            target_granularity: pd.Timedelta,
            quantity_type: QuantityTypeEnum
    ) -> pd.Series:
        """Convert a time series to a specific target granularity.

        This method converts the temporal granularity of a time series while properly
        handling the physical nature of the quantity. The conversion is performed
        day-by-day to prevent incorrect handling of missing days or daylight saving
        time transitions.

        Args:
            series: Source time series to convert
            target_granularity: Target granularity as a pandas Timedelta
                               (e.g., pd.Timedelta(minutes=15) for 15-minute data)
            quantity_type: Type of quantity for proper scaling:
                          - INTENSIVE: Values are averaged/replicated (prices, power)
                          - EXTENSIVE: Values are summed/split (volumes, energy)

        Returns:
            Series with converted granularity, maintaining original naming and metadata

        Raises:
            GranularityConversionError: If conversion cannot be performed due to
                                       unsupported granularities or data issues

        Example:

            >>> # Convert 15-minute to hourly data (downsampling)
            >>> quarter_hourly = pd.Series([25, 30, 35, 40], 
            ...                           index=pd.date_range('2024-01-01', freq='15min', periods=4))
            >>> hourly = converter.convert_to_target_granularity(
            ...     quarter_hourly, pd.Timedelta(hours=1), QuantityTypeEnum.EXTENSIVE)
            >>> print(hourly)  # Result: [130] (25+30+35+40)
        """
        self._validate_series_format(series)
        return series.groupby(series.index.date).apply(
            lambda x: self._convert_date_to_target_granularity(x, target_granularity, quantity_type)
        ).droplevel(0).rename_axis(series.index.name).rename(series.name)

    def _convert_date_to_target_granularity(
        self,
        series: pd.Series,
        target_granularity: pd.Timedelta,
        quantity_type: QuantityTypeEnum
    ) -> pd.Series:
        """Convert granularity for a single date's worth of data.

        This internal method handles the actual conversion logic for data within
        a single date range. It determines whether upsampling, downsampling, or
        no conversion is needed, then applies the appropriate method.

        Args:
            series: Time series data for a single date (may span into next date)
            target_granularity: Target granularity as Timedelta
            quantity_type: Type of quantity for scaling decisions

        Returns:
            Converted series for the date period

        Raises:
            GranularityConversionError: If conversion parameters are invalid or
                                       unsupported granularities are encountered
        """
        if len(set(series.index.date)) > 2:
            raise GranularityConversionError('This method is intended for single-date conversion only.')
        source_gran = self._non_strict_gran_analyzer.get_granularity_as_series_of_minutes(series.index)
        if len(source_gran.unique()) > 1:
            raise GranularityConversionError('Cannot convert data with changing granularity within a single day.')
        source_gran_minutes = source_gran.values[0]
        target_gran_minutes = target_granularity.total_seconds() / 60

        _allowed_granularities = [1, 5, 15, 30, 60, 24*60]
        if target_gran_minutes not in _allowed_granularities:
            raise GranularityConversionError(
                f'Target granularity {target_gran_minutes} minutes not supported. '
                f'Allowed granularities: {_allowed_granularities} minutes'
            )

        if target_gran_minutes > source_gran_minutes:
            sampling = SamplingMethodEnum.DOWNSAMPLING
        elif target_gran_minutes < source_gran_minutes:
            sampling = SamplingMethodEnum.UPSAMPLING
        else:
            sampling = SamplingMethodEnum.KEEP

        if sampling == SamplingMethodEnum.UPSAMPLING:
            scaling_factor = source_gran_minutes / target_gran_minutes
            if (scaling_factor % 1) != 0:
                raise GranularityConversionError(
                    f'Source granularity ({source_gran_minutes} min) is not evenly divisible '
                    f'by target granularity ({target_gran_minutes} min)'
                )
            else:
                scaling_factor = int(scaling_factor)

            new_index = pd.date_range(
                start=series.index[0],
                end=series.index[-1],
                freq=f"{target_gran_minutes}min",
                tz=series.index.tz
            )

            # For intensive quantities, replicate values; for extensive, divide by scaling factor
            if quantity_type == QuantityTypeEnum.INTENSIVE:
                return series.reindex(new_index, method='ffill')
            else:  # EXTENSIVE
                return series.reindex(new_index, method='ffill') / scaling_factor

        elif sampling == SamplingMethodEnum.DOWNSAMPLING:
            groups = series.groupby(pd.Grouper(freq=f"{target_gran_minutes}min"))
            # For extensive quantities, sum the values; for intensive, take the mean
            func = 'sum' if quantity_type == QuantityTypeEnum.EXTENSIVE else 'mean'
            return groups.agg(func)

        else:  # SamplingMethodEnum.KEEP
            return series


__init__

__init__()

Initialize the granularity converter with analyzer instances.

Creates both strict and non-strict granularity analyzers for different validation requirements during conversion operations.

Source code in submodules/mesqual/mesqual/energy_data_handling/granularity_converter.py
def __init__(self):
    """Initialize the granularity converter with analyzer instances.

    Creates both strict and non-strict granularity analyzers for different
    validation requirements during conversion operations.
    """
    self._strict_gran_analyzer = TimeSeriesGranularityAnalyzer(strict_mode=True)
    self._non_strict_gran_analyzer = TimeSeriesGranularityAnalyzer(strict_mode=False)

upsample_through_fillna

upsample_through_fillna(data: DataFrame | Series, quantity_type: QuantityTypeEnum) -> DataFrame | Series

Upsample data using forward-fill strategy with quantity-type-aware scaling.

This method handles upsampling of sparse data where some values are missing. It uses forward-fill to propagate values and applies appropriate scaling based on the quantity type:

  • For INTENSIVE quantities: Values are replicated without scaling
  • For EXTENSIVE quantities: Values are divided by the number of periods they are spread across within each hour-segment group

The method processes data per day and hour to handle missing periods properly and prevent incorrect auto-filling across day boundaries.

Parameters:
  • data (DataFrame | Series, required): Time series data to upsample
  • quantity_type (QuantityTypeEnum, required): Type of quantity being converted (INTENSIVE or EXTENSIVE)

Returns:
  • DataFrame | Series: Upsampled data with the same type as the input

Example:

>>> # For extensive quantities (energy), values are divided
>>> series = pd.Series([100, np.nan, np.nan, np.nan, 200, np.nan, np.nan, np.nan],
...                   index=pd.date_range('2024-01-01', freq='15min', periods=8))
>>> converter.upsample_through_fillna(series, QuantityTypeEnum.EXTENSIVE)
    # Results in [25, 25, 25, 25, 50, 50, 50, 50]
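
The same mechanism works column-wise on DataFrames. A hedged sketch with hypothetical column names, splitting an extensive volume column while forward-filling an intensive price column unchanged:

>>> idx = pd.date_range('2024-01-01', freq='15min', periods=8)
>>> volumes = pd.DataFrame({'volume_mwh': [100, np.nan, np.nan, np.nan,
...                                         200, np.nan, np.nan, np.nan]}, index=idx)
>>> prices = pd.DataFrame({'price_eur_mwh': [50, np.nan, np.nan, np.nan,
...                                           60, np.nan, np.nan, np.nan]}, index=idx)
>>> converter.upsample_through_fillna(volumes, QuantityTypeEnum.EXTENSIVE)
    # volume_mwh becomes [25, 25, 25, 25, 50, 50, 50, 50]
>>> converter.upsample_through_fillna(prices, QuantityTypeEnum.INTENSIVE)
    # price_eur_mwh becomes [50, 50, 50, 50, 60, 60, 60, 60]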
Source code in submodules/mesqual/mesqual/energy_data_handling/granularity_converter.py
def upsample_through_fillna(
        self,
        data: pd.DataFrame | pd.Series,
        quantity_type: QuantityTypeEnum
) -> pd.DataFrame | pd.Series:
    """Upsample data using forward-fill strategy with quantity-type-aware scaling.

    This method handles upsampling of sparse data where some values are missing.
    It uses forward-fill to propagate values and applies appropriate scaling
    based on the quantity type:

    - For INTENSIVE quantities: Values are replicated without scaling
    - For EXTENSIVE quantities: Values are divided by the number of periods
      they are spread across within each hour-segment group

    The method processes data per day and hour to handle missing periods properly
    and prevent incorrect auto-filling across day boundaries.

    Args:
        data: Time series data to upsample (Series or DataFrame)
        quantity_type: Type of quantity being converted (INTENSIVE or EXTENSIVE)

    Returns:
        Upsampled data with same type as input

    Example:

        >>> # For extensive quantities (energy), values are divided
        >>> series = pd.Series([100, np.nan, np.nan, np.nan, 200, np.nan, np.nan, np.nan],
        ...                   index=pd.date_range('2024-01-01', freq='15min', periods=8))
        >>> converter.upsample_through_fillna(series, QuantityTypeEnum.EXTENSIVE)
            # Results in [25, 25, 25, 25, 50, 50, 50, 50]
    """
    if isinstance(data, pd.Series):
        return self._upsample_series(data, quantity_type)

    tmp = data.copy().sort_index()
    idx = tmp.index.tz_convert('UTC') if tmp.index.tz is not None else tmp.index

    if quantity_type == QuantityTypeEnum.EXTENSIVE:
        segment_patterns = tmp.notna().cumsum()

        # Group columns by their segment pattern
        pattern_to_cols = {}
        for col in tmp.columns:
            pattern = tuple(segment_patterns[col].values)  # Convert to tuple to make it hashable
            pattern_to_cols.setdefault(pattern, []).append(col)

        # Process each group of columns with same pattern
        result_pieces = []
        for pattern, cols in pattern_to_cols.items():
            segments = segment_patterns[cols[0]]  # Take segments from first column (all are same)
            piece = (
                tmp[cols]
                .groupby([idx.date, idx.hour, segments])
                .transform(lambda s: s.ffill() / len(s))
            )
            result_pieces.append(piece)

        return pd.concat(result_pieces, axis=1).loc[data.index].rename_axis(data.columns.names, axis=1)
    else:
        return tmp.groupby([idx.date, idx.hour]).ffill().loc[data.index]

convert_to_target_index

convert_to_target_index(series: Series, target_index: DatetimeIndex, quantity_type: QuantityTypeEnum) -> Series

Convert a time series to match a specific target DatetimeIndex.

This method converts the granularity of a time series to match the granularity of a target index. The target index must have consistent granularity within each day and consistent granularity across all days.

Parameters:
  • series (Series, required): Source time series to convert
  • target_index (DatetimeIndex, required): DatetimeIndex defining the target granularity and timestamps
  • quantity_type (QuantityTypeEnum, required): Type of quantity (INTENSIVE or EXTENSIVE) for proper scaling

Returns:
  • Series: Series converted to match the target index granularity and timestamps

Raises:
  • ValueError: If the target index has multiple granularities within a day or inconsistent granularity across days

Example:

>>> # Convert hourly to 15-min data
>>> hourly_series = pd.Series([100, 150, 200], 
...                          index=pd.date_range('2024-01-01', freq='h', periods=3))
>>> target_idx = pd.date_range('2024-01-01', freq='15min', periods=12)
>>> result = converter.convert_to_target_index(hourly_series, target_idx,
...                                          QuantityTypeEnum.INTENSIVE)
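
A minimal sketch of the target-index validation, reusing hourly_series and converter from above: a target index whose granularity differs between days is rejected rather than converted.

>>> day_1 = pd.date_range('2024-01-01', periods=24, freq='h')
>>> day_2 = pd.date_range('2024-01-02', periods=96, freq='15min')
>>> mixed_target = day_1.union(day_2)
>>> converter.convert_to_target_index(hourly_series, mixed_target,
...                                   QuantityTypeEnum.INTENSIVE)
    # raises ValueError: the target index uses different granularities on different days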
Source code in submodules/mesqual/mesqual/energy_data_handling/granularity_converter.py
def convert_to_target_index(
        self,
        series: pd.Series,
        target_index: pd.DatetimeIndex,
        quantity_type: QuantityTypeEnum
) -> pd.Series:
    """Convert a time series to match a specific target DatetimeIndex.

    This method converts the granularity of a time series to match the granularity
    of a target index. The target index must have consistent granularity within
    each day and consistent granularity across all days.

    Args:
        series: Source time series to convert
        target_index: DatetimeIndex defining the target granularity and timestamps
        quantity_type: Type of quantity (INTENSIVE or EXTENSIVE) for proper scaling

    Returns:
        Series converted to match target index granularity and timestamps

    Raises:
        ValueError: If target index has multiple granularities within days
                   or inconsistent granularity across days

    Example:

        >>> # Convert hourly to 15-min data
        >>> hourly_series = pd.Series([100, 150, 200], 
        ...                          index=pd.date_range('2024-01-01', freq='h', periods=3))
        >>> target_idx = pd.date_range('2024-01-01', freq='15min', periods=12)
        >>> result = converter.convert_to_target_index(hourly_series, target_idx,
        ...                                          QuantityTypeEnum.INTENSIVE)
    """
    self._validate_series_format(series)
    target_gran_series = self._strict_gran_analyzer.get_granularity_as_series_of_timedeltas(target_index)
    _grouped = target_gran_series.groupby(target_gran_series.index.date)
    if _grouped.nunique().max() > 1:
        raise ValueError(f"Found some dates with multiple granularities within same day. Can't handle that!")
    if _grouped.first().nunique() > 1:
        raise ValueError(f"Found multiple granularities. Can't handle that!")
    target_granularity = pd.Timedelta(target_gran_series.values[0])
    return series.groupby(series.index.date).apply(
        lambda x: self._convert_date_to_target_granularity(x, target_granularity, quantity_type)
    ).droplevel(0).rename_axis(series.index.name).rename(series.name)

convert_to_target_granularity

convert_to_target_granularity(series: Series, target_granularity: Timedelta, quantity_type: QuantityTypeEnum) -> Series

Convert a time series to a specific target granularity.

This method converts the temporal granularity of a time series while properly handling the physical nature of the quantity. The conversion is performed day-by-day to prevent incorrect handling of missing days or daylight saving time transitions.

Parameters:
  • series (Series, required): Source time series to convert
  • target_granularity (Timedelta, required): Target granularity as a pandas Timedelta (e.g., pd.Timedelta(minutes=15) for 15-minute data)
  • quantity_type (QuantityTypeEnum, required): Type of quantity for proper scaling; INTENSIVE values are averaged/replicated (prices, power), EXTENSIVE values are summed/split (volumes, energy)

Returns:
  • Series: Series with converted granularity, maintaining original naming and metadata

Raises:
  • GranularityConversionError: If conversion cannot be performed due to unsupported granularities or data issues

Example:

>>> # Convert 15-minute to hourly data (downsampling)
>>> quarter_hourly = pd.Series([25, 30, 35, 40], 
...                           index=pd.date_range('2024-01-01', freq='15min', periods=4))
>>> hourly = converter.convert_to_target_granularity(
...     quarter_hourly, pd.Timedelta(hours=1), QuantityTypeEnum.EXTENSIVE)
>>> print(hourly)  # Result: [130] (25+30+35+40)
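
Upsampling works analogously. A minimal sketch for an intensive quantity, where hourly values are replicated onto a 30-minute grid rather than split:

>>> prices = pd.Series([80.0, 90.0, 100.0],
...                    index=pd.date_range('2024-01-01', freq='h', periods=3))
>>> half_hourly = converter.convert_to_target_granularity(
...     prices, pd.Timedelta(minutes=30), QuantityTypeEnum.INTENSIVE)
>>> half_hourly.head(4).tolist()
    # [80.0, 80.0, 90.0, 90.0]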
Source code in submodules/mesqual/mesqual/energy_data_handling/granularity_converter.py
def convert_to_target_granularity(
        self,
        series: pd.Series,
        target_granularity: pd.Timedelta,
        quantity_type: QuantityTypeEnum
) -> pd.Series:
    """Convert a time series to a specific target granularity.

    This method converts the temporal granularity of a time series while properly
    handling the physical nature of the quantity. The conversion is performed
    day-by-day to prevent incorrect handling of missing days or daylight saving
    time transitions.

    Args:
        series: Source time series to convert
        target_granularity: Target granularity as a pandas Timedelta
                           (e.g., pd.Timedelta(minutes=15) for 15-minute data)
        quantity_type: Type of quantity for proper scaling:
                      - INTENSIVE: Values are averaged/replicated (prices, power)
                      - EXTENSIVE: Values are summed/split (volumes, energy)

    Returns:
        Series with converted granularity, maintaining original naming and metadata

    Raises:
        GranularityConversionError: If conversion cannot be performed due to
                                   unsupported granularities or data issues

    Example:

        >>> # Convert 15-minute to hourly data (downsampling)
        >>> quarter_hourly = pd.Series([25, 30, 35, 40], 
        ...                           index=pd.date_range('2024-01-01', freq='15min', periods=4))
        >>> hourly = converter.convert_to_target_granularity(
        ...     quarter_hourly, pd.Timedelta(hours=1), QuantityTypeEnum.EXTENSIVE)
        >>> print(hourly)  # Result: [130] (25+30+35+40)
    """
    self._validate_series_format(series)
    return series.groupby(series.index.date).apply(
        lambda x: self._convert_date_to_target_granularity(x, target_granularity, quantity_type)
    ).droplevel(0).rename_axis(series.index.name).rename(series.name)

SamplingMethodEnum

Bases: Enum

Enumeration of sampling methods for granularity conversion.

Attributes:
  • UPSAMPLING: Converting from coarser to finer granularity (e.g., hourly to 15-min)
  • DOWNSAMPLING: Converting from finer to coarser granularity (e.g., 15-min to hourly)
  • KEEP: No conversion needed; source and target granularities are the same
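
A small sketch of how the converter picks a member, mirroring the comparison it performs internally on the source and target granularities in minutes:

>>> source_minutes, target_minutes = 60, 15
>>> if target_minutes > source_minutes:
...     method = SamplingMethodEnum.DOWNSAMPLING
... elif target_minutes < source_minutes:
...     method = SamplingMethodEnum.UPSAMPLING
... else:
...     method = SamplingMethodEnum.KEEP
>>> method
    # SamplingMethodEnum.UPSAMPLING  (hourly source, 15-minute target)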

Source code in submodules/mesqual/mesqual/energy_data_handling/granularity_converter.py
class SamplingMethodEnum(Enum):
    """Enumeration of sampling methods for granularity conversion.

    Attributes:
        UPSAMPLING: Converting from coarser to finer granularity (e.g., hourly to 15-min)
        DOWNSAMPLING: Converting from finer to coarser granularity (e.g., 15-min to hourly)
        KEEP: No conversion needed - source and target granularities are the same
    """
    UPSAMPLING = 'upsampling'
    DOWNSAMPLING = 'downsampling'
    KEEP = 'keep'