13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class RegionalTradeBalanceCalculator:
    """Aggregate bidirectional line flows into region-to-region trade balances.

    Takes line-level flow data with bidirectional flows (up/down) and losses and
    aggregates it to a higher regional level (e.g. countries, market areas), or
    keeps it at node level. networkx is used to find which regions/nodes are
    connected by at least one line; multiple lines between the same pair of
    regions/nodes are summed together.

    When ``agg_region_column`` is None, each node is treated as its own region,
    allowing node-to-node trade balance analysis without aggregation. The output
    column levels are then named ``primary_node`` / ``partner_node``.

    Direction convention: the code treats a line's "up" direction as running
    from its ``node_from_col`` endpoint to its ``node_to_col`` endpoint, and
    "down" as the reverse.

    Example for regional aggregation:
        >>> line_model_df = pd.DataFrame({
        ...     "node_from": ["DE1", "FR1"],
        ...     "node_to": ["FR1", "BE1"]
        ... })
        >>> node_model_df = pd.DataFrame({
        ...     "country": ["DE", "FR", "BE"]
        ... }, index=["DE1", "FR1", "BE1"])
        >>> aggregator = RegionalTradeBalanceCalculator(
        ...     line_model_df=line_model_df,
        ...     node_model_df=node_model_df,
        ...     agg_region_column="country"
        ... )

    Example for node-level flows:
        >>> aggregator = RegionalTradeBalanceCalculator(
        ...     line_model_df=line_model_df,
        ...     node_model_df=node_model_df,
        ...     agg_region_column=None  # Keep flows at node level
        ... )
    """

    # Names used in the "variable" column level of get_trade_balance's output.
    EXP_VAR = "exp"
    IMP_VAR = "imp"
    NET_EXP_VAR = "net_exp"
    ALL_VARS = [EXP_VAR, IMP_VAR, NET_EXP_VAR]

    def __init__(
        self,
        line_model_df: pd.DataFrame,
        node_model_df: pd.DataFrame,
        agg_region_column: str | None = "country",
        node_from_col: str = "node_from",
        node_to_col: str = "node_to",
    ):
        """Build the node→region map and the region adjacency graph.

        Args:
            line_model_df: One row per line. Its index holds the line ids used to
                select columns of the flow data; it must contain the columns
                ``node_from_col`` and ``node_to_col``.
            node_model_df: One row per node, indexed by node id. It must contain
                ``agg_region_column`` unless that is None.
            agg_region_column: Column of ``node_model_df`` holding each node's
                region, or None to keep every node as its own region.
            node_from_col: Column of ``line_model_df`` holding a line's start node.
            node_to_col: Column of ``line_model_df`` holding a line's end node.

        Raises:
            KeyError: If ``agg_region_column`` is not a column of
                ``node_model_df``, or a line endpoint is not in its index.
        """
        self.line_model_df = line_model_df
        self.node_model_df = node_model_df
        self.agg_region_column = agg_region_column
        self.node_from_col = node_from_col
        self.node_to_col = node_to_col
        self.node_to_agg_region_map = self._create_node_to_region_map()
        # Region of each line's endpoints, computed once and shared by the graph
        # construction and every per-pair lookup in get_trade_balance.
        self._line_region_from, self._line_region_to = self._map_line_endpoints_to_regions()
        self.agg_region_graph = self._create_region_graph()

    def _create_node_to_region_map(self) -> dict:
        """Return ``{node_id: region}``, or an identity map when no aggregation column is set."""
        if self.agg_region_column is None:
            return dict(zip(self.node_model_df.index, self.node_model_df.index))
        return self.node_model_df[self.agg_region_column].to_dict()

    def _map_line_endpoints_to_regions(self) -> tuple[pd.Series, pd.Series]:
        """Return the regions of each line's from- and to-node, aligned to ``line_model_df``'s index."""
        node_from = self.line_model_df[self.node_from_col]
        node_to = self.line_model_df[self.node_to_col]
        # Fail loudly, as a plain dict lookup would, instead of letting
        # Series.map silently turn unknown nodes into NaN regions.
        unknown_nodes = (set(node_from) | set(node_to)).difference(self.node_to_agg_region_map)
        if unknown_nodes:
            raise KeyError(
                f"Line endpoints not found in node_model_df: {sorted(map(str, unknown_nodes))}"
            )
        return (
            node_from.map(self.node_to_agg_region_map),
            node_to.map(self.node_to_agg_region_map),
        )

    def _create_region_graph(self) -> nx.Graph:
        """Build an undirected graph linking every pair of distinct regions joined by a line.

        Lines whose endpoints lie in the same region add no edge. nx.Graph
        collapses parallel edges, so several lines between one pair give a single edge.
        """
        graph = nx.Graph()
        crosses_border = self._line_region_from != self._line_region_to
        graph.add_edges_from(
            zip(self._line_region_from[crosses_border], self._line_region_to[crosses_border])
        )
        return graph

    def _get_net_exp_for_couple(
        self,
        primary: str,
        secondary: str,
        flow_data: NetworkLineFlowsData,
        flow_type: FlowType,
    ) -> pd.Series:
        """Return the net export from ``primary`` to ``secondary`` summed over all joining lines.

        Positive values mean ``primary`` exports to ``secondary``. Exports are
        always counted as the flow ``primary`` sends into the lines. Imports are
        counted as follows:
          - PRE_LOSS: what ``secondary`` sent.
          - any other flow type (POST_LOSS): what ``primary`` received after losses.
        """
        from_primary = self._line_region_from == primary
        to_primary = self._line_region_to == primary
        from_secondary = self._line_region_from == secondary
        to_secondary = self._line_region_to == secondary
        # "Forward" lines run primary -> secondary; "backward" lines run secondary -> primary.
        lines_forward = self.line_model_df.index[(from_primary & to_secondary).to_numpy()]
        lines_backward = self.line_model_df.index[(from_secondary & to_primary).to_numpy()]

        # On forward lines, imports travel "down"; on backward lines, they travel "up".
        if flow_type == FlowType.PRE_LOSS:
            imports_forward, imports_backward = flow_data.sent_down, flow_data.sent_up
        else:  # POST_LOSS
            imports_forward, imports_backward = flow_data.received_down, flow_data.received_up

        return (
            flow_data.sent_up[lines_forward].sum(axis=1)
            - imports_forward[lines_forward].sum(axis=1)
            + flow_data.sent_down[lines_backward].sum(axis=1)
            - imports_backward[lines_backward].sum(axis=1)
        )

    def get_trade_balance(
        self,
        flow_data: NetworkLineFlowsData,
        flow_type: FlowType = FlowType.POST_LOSS,
    ) -> pd.DataFrame:
        """Compute net export, export and import for every ordered pair of neighbouring regions.

        Args:
            flow_data: Line flows whose ``sent_up`` / ``sent_down`` /
                ``received_up`` / ``received_down`` frames have one column per
                line id of ``line_model_df``. The output row index is taken from
                these frames (presumably timestamps — confirm against caller).
            flow_type: Whether imports are measured before or after line losses.

        Returns:
            DataFrame with three column levels ``(primary_name, partner_name,
            "variable")``. Each neighbour pair appears in both orientations, and
            "variable" takes the values NET_EXP_VAR, EXP_VAR (positive part of
            net export) and IMP_VAR (magnitude of the negative part). When no two
            regions are connected, an empty-column frame with the same level
            names is returned.
        """
        column_level_names = [self.primary_name, self.partner_name, "variable"]
        flows_list = []
        # Walk the graph in insertion order rather than over sets. Hash
        # randomization would otherwise make the column order of string-labelled
        # regions differ between interpreter runs.
        for primary in self.agg_region_graph.nodes:
            for secondary in self.agg_region_graph.neighbors(primary):
                net_exp = self._get_net_exp_for_couple(primary, secondary, flow_data, flow_type)
                flows_list.append(
                    pd.concat(
                        {
                            (primary, secondary, self.NET_EXP_VAR): net_exp,
                            (primary, secondary, self.EXP_VAR): net_exp.clip(0),
                            (primary, secondary, self.IMP_VAR): net_exp.clip(None, 0).abs(),
                        },
                        axis=1,
                        names=column_level_names,
                    )
                )
        if not flows_list:
            return pd.DataFrame(
                index=flow_data.sent_up.index,
                columns=pd.MultiIndex.from_tuples([], names=column_level_names),
            )
        return pd.concat(flows_list, axis=1)

    def get_region_neighbors(self, region: str) -> set:
        """Return the regions directly connected to ``region`` by at least one line."""
        return set(self.agg_region_graph.neighbors(region))

    def get_all_regions(self) -> set:
        """Return every region that has at least one cross-region line."""
        return set(self.agg_region_graph.nodes())

    @property
    def _region_label(self) -> str:
        """Label used in column-level names: the aggregation column, or "node" at node level."""
        return "node" if self.agg_region_column is None else str(self.agg_region_column)

    @property
    def primary_name(self) -> str:
        """Name of the column level holding the exporting/reporting region."""
        return f"primary_{self._region_label}"

    @property
    def partner_name(self) -> str:
        """Name of the column level holding the counterpart region."""
        return f"partner_{self._region_label}"

    def aggregate_trade_balance_to_primary_level(self, trade_balance_df: pd.DataFrame) -> pd.DataFrame:
        """Sum a three-level trade balance over partners, keeping only primary region and variable.

        Args:
            trade_balance_df: Output of ``get_trade_balance`` from this instance.

        Returns:
            DataFrame with column levels ``(primary_name, "variable")``. Each value
            is the sum over all partners of the corresponding bilateral value.

        Raises:
            ValueError: If the column levels do not match ``get_trade_balance``'s format.
        """
        expected_names = [self.primary_name, self.partner_name, "variable"]
        if trade_balance_df.columns.nlevels != 3:
            raise ValueError("Input DataFrame must have three column levels")
        actual_names = list(trade_balance_df.columns.names)
        if actual_names != expected_names:
            raise ValueError(
                "Input DataFrame must be in format from get_trade_balance "
                f"(expected column levels {expected_names}, got {actual_names})"
            )
        return trade_balance_df.T.groupby(level=[self.primary_name, "variable"]).sum().T
|