Skip to content

MESQUAL Regional Trade Balance Calculator

RegionalTradeBalanceCalculator

Aggregates bidirectional power flows between regions based on node-level flow data.

Takes line-level flow data with bidirectional flows (up/down) and losses, and aggregates them to a higher regional level (e.g., countries, market areas) or keeps them at node level. Uses networkx for identifying region/node connections and handles multiple lines between the same region/node pairs.

When agg_region_column is None, each node is treated as its own region, allowing for node-to-node trade balance analysis without aggregation.

Example for regional aggregation:

>>> line_model_df = pd.DataFrame({
...     "node_from": ["DE1", "FR1"],
...     "node_to": ["FR1", "BE1"]
... })
>>> node_model_df = pd.DataFrame({
...     "country": ["DE", "FR", "BE"]
... }, index=["DE1", "FR1", "BE1"])
>>> aggregator = RegionalTradeBalanceCalculator(
...     line_model_df=line_model_df,
...     node_model_df=node_model_df,
...     agg_region_column="country"
... )

Example for node-level flows:

>>> aggregator = RegionalTradeBalanceCalculator(
...     line_model_df=line_model_df,
...     node_model_df=node_model_df,
...     agg_region_column=None  # Keep flows at node level
... )
Source code in submodules/mesqual/mesqual/energy_data_handling/variable_utils/regional_trade_balance_calculator.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class RegionalTradeBalanceCalculator:
    """Aggregates bidirectional power flows between regions based on node-level flow data.

    Takes line-level flow data with bidirectional flows (up/down) and losses, and aggregates
    them to a higher regional level (e.g., countries, market areas) or keeps them at node level.
    Uses networkx for identifying region/node connections and handles multiple lines between
    the same region/node pairs.

    When agg_region_column is None, each node is treated as its own region, allowing for
    node-to-node trade balance analysis without aggregation.

    Example for regional aggregation:

        >>> line_model_df = pd.DataFrame({
        ...     "node_from": ["DE1", "FR1"],
        ...     "node_to": ["FR1", "BE1"]
        ... })
        >>> node_model_df = pd.DataFrame({
        ...     "country": ["DE", "FR", "BE"]
        ... }, index=["DE1", "FR1", "BE1"])
        >>> aggregator = RegionalTradeBalanceCalculator(
        ...     line_model_df=line_model_df,
        ...     node_model_df=node_model_df,
        ...     agg_region_column="country"
        ... )

    Example for node-level flows:

        >>> aggregator = RegionalTradeBalanceCalculator(
        ...     line_model_df=line_model_df,
        ...     node_model_df=node_model_df,
        ...     agg_region_column=None  # Keep flows at node level
        ... )
    """
    EXP_VAR = "exp"
    IMP_VAR = "imp"
    NET_EXP_VAR = "net_exp"
    ALL_VARS = [EXP_VAR, IMP_VAR, NET_EXP_VAR]

    def __init__(
            self,
            line_model_df: pd.DataFrame,
            node_model_df: pd.DataFrame,
            agg_region_column: str | None = "country",
            node_from_col: str = "node_from",
            node_to_col: str = "node_to"
    ):
        self.line_model_df = line_model_df
        self.node_model_df = node_model_df
        self.agg_region_column = agg_region_column
        self.node_from_col = node_from_col
        self.node_to_col = node_to_col
        self.node_to_agg_region_map = self._create_node_to_region_map()
        self.agg_region_graph = self._create_region_graph()

    def _create_node_to_region_map(self) -> dict:
        if self.agg_region_column is None:
            return {node: node for node in self.node_model_df.index}
        return self.node_model_df[self.agg_region_column].to_dict()

    def _create_region_graph(self) -> nx.Graph:
        graph = nx.Graph()

        for _, line in self.line_model_df.iterrows():
            region_from = self.node_to_agg_region_map[line[self.node_from_col]]
            region_to = self.node_to_agg_region_map[line[self.node_to_col]]

            if region_from != region_to:
                if not graph.has_edge(region_from, region_to):
                    graph.add_edge(region_from, region_to)

        return graph

    def _get_net_exp_for_couple(self, primary, secondary, flow_data: NetworkLineFlowsData, flow_type: FlowType) -> pd.Series:
        mask_forward = (
                (self.line_model_df[self.node_from_col].map(self.node_to_agg_region_map) == primary) &
                (self.line_model_df[self.node_to_col].map(self.node_to_agg_region_map) == secondary)
        )
        mask_backward = (
                (self.line_model_df[self.node_from_col].map(self.node_to_agg_region_map) == secondary) &
                (self.line_model_df[self.node_to_col].map(self.node_to_agg_region_map) == primary)
        )

        lines_forward = self.line_model_df[mask_forward].index
        lines_backward = self.line_model_df[mask_backward].index

        if flow_type == FlowType.PRE_LOSS:
            return (
                    flow_data.sent_up[lines_forward].sum(axis=1) -
                    flow_data.sent_down[lines_forward].sum(axis=1) +
                    flow_data.sent_down[lines_backward].sum(axis=1) -
                    flow_data.sent_up[lines_backward].sum(axis=1)
            )
        else:  # POST_LOSS
            return (
                    flow_data.sent_up[lines_forward].sum(axis=1) -
                    flow_data.received_down[lines_forward].sum(axis=1) +
                    flow_data.sent_down[lines_backward].sum(axis=1) -
                    flow_data.received_up[lines_backward].sum(axis=1)
            )

    def get_trade_balance(
            self,
            flow_data: NetworkLineFlowsData,
            flow_type: FlowType = FlowType.POST_LOSS
    ) -> pd.DataFrame:
        flows_list = []
        column_level_names = [self.primary_name, self.partner_name, "variable"]

        for primary in self.get_all_regions():
            for secondary in self.get_region_neighbors(primary):
                net_exp = self._get_net_exp_for_couple(primary, secondary, flow_data, flow_type)
                df = pd.concat(
                    {
                        (primary, secondary, self.NET_EXP_VAR): net_exp,
                        (primary, secondary, self.EXP_VAR): net_exp.clip(0),
                        (primary, secondary, self.IMP_VAR): net_exp.clip(None, 0).abs(),
                    },
                    axis=1,
                    names=column_level_names,
                )
                flows_list.append(df)

        if not flows_list:
            return pd.DataFrame(
                index=flow_data.sent_up.index,
                columns=pd.MultiIndex.from_tuples([], names=column_level_names)
            )

        return pd.concat(flows_list, axis=1)

    def get_region_neighbors(self, region: str) -> set:
        return set(self.agg_region_graph.neighbors(region))

    def get_all_regions(self) -> set:
        return set(self.agg_region_graph.nodes())

    @property
    def primary_name(self) -> str:
        return f"primary_{self.agg_region_column}"

    @property
    def partner_name(self) -> str:
        return f"partner_{self.agg_region_column}"

    def aggregate_trade_balance_to_primary_level(self, trade_balance_df: pd.DataFrame) -> pd.DataFrame:
        """Reduces three-level trade balance DataFrame to primary region and variable only."""
        if trade_balance_df.columns.nlevels != 3:
            raise ValueError("Input DataFrame must have three column levels")
        if trade_balance_df.columns.names != [self.primary_name, self.partner_name, "variable"]:
            raise ValueError("Input DataFrame must be in format from aggregate_flows")

        return trade_balance_df.T.groupby(level=[self.primary_name, "variable"]).sum().T

aggregate_trade_balance_to_primary_level

aggregate_trade_balance_to_primary_level(trade_balance_df: DataFrame) -> DataFrame

Reduces three-level trade balance DataFrame to primary region and variable only.

Source code in submodules/mesqual/mesqual/energy_data_handling/variable_utils/regional_trade_balance_calculator.py
158
159
160
161
162
163
164
165
def aggregate_trade_balance_to_primary_level(self, trade_balance_df: pd.DataFrame) -> pd.DataFrame:
    """Reduces three-level trade balance DataFrame to primary region and variable only."""
    if trade_balance_df.columns.nlevels != 3:
        raise ValueError("Input DataFrame must have three column levels")
    if trade_balance_df.columns.names != [self.primary_name, self.partner_name, "variable"]:
        raise ValueError("Input DataFrame must be in format from aggregate_flows")

    return trade_balance_df.T.groupby(level=[self.primary_name, "variable"]).sum().T