in src/energy_storage_system/envs.py [0:0]
def step(self, action: int):
    """Advance the environment by one time step.

    Applies ``action`` at the price found at ``self.index``, updates the
    stored energy level (and, when charging, the average cost of the stored
    energy), builds the observation and finally advances ``self.index`` and
    ``self.counter`` by one.

    Args:
        action: Action identifier; must equal ``self.DISCHARGE``,
            ``self.CHARGE`` or ``self.HOLD``.

    Returns:
        A ``(state, reward, done, info)`` tuple:

        * ``state`` -- ``[energy_level, cost, price]`` followed by the
          ``HIST_PRICE_HORIZON`` prices preceding ``self.index``, most
          recent first. ``price`` is the one at the index the action was
          applied to (i.e. before ``self.index`` is advanced).
        * ``reward`` -- discharge: ``(price * EFF - cost) * energy sold``
          minus the fixed cost ``BETA * power``; charge: minus the fixed
          cost ``BETA * power``; hold: ``0``.
        * ``done`` -- ``True`` when ``self.counter`` (checked before it is
          incremented) is at least ``MAX_STEPS_PER_EPISODE``.
        * ``info`` -- an empty dict.

    Raises:
        AssertionError: If the environment is not initialized or ``action``
            is not one of the known actions.
    """
    # Raise explicitly rather than via ``assert`` so the checks survive
    # ``python -O``; AssertionError is kept so existing callers see the same
    # exception type as before.
    if not self.initialized:
        raise AssertionError("Environmet is not initialized")
    if action not in (self.DISCHARGE, self.CHARGE, self.HOLD):
        raise AssertionError("Invalid action")

    # Price at the current index; used for both the reward and the state.
    price = self.df_price["price"].iloc[self.index]

    if action == self.DISCHARGE:
        # Sell: power is capped by the rating and by the energy still
        # available above the minimum level.
        discharge_pwr = min(
            self.MAX_DISCHARGE_PWR,
            (self.energy_level - self.ENERGY_MIN) / self.DURATION,
        )
        # Update energy level
        self.energy_level = self.energy_level - discharge_pwr * self.DURATION
        # Fixed cost = rate ($/MW) * power (MW)
        discharge_cost = self.BETA * discharge_pwr
        # Margin depends on the current market price ($/MWh * MWh)
        reward = (
            (price * self.EFF - self.cost) * (discharge_pwr * self.DURATION)
        ) - discharge_cost
    elif action == self.CHARGE:
        # Buy: power is capped by the rating and by the remaining headroom
        # below the maximum level.
        charge_pwr = min(
            self.MAX_CHARGE_PWR,
            (self.ENERGY_MAX - self.energy_level) / self.DURATION,
        )
        total_energy = self.energy_level + charge_pwr * self.DURATION
        # Average cost only changes while charging:
        # ($/MWh) = total cost (current + new) / total energy (current + new).
        # Skip the update when nothing is stored and nothing can be added,
        # which would otherwise divide by zero.
        if total_energy != 0:
            total_energy_cost = (self.cost * self.energy_level) + (
                price * charge_pwr * self.DURATION / self.EFF
            )
            self.cost = total_energy_cost / total_energy
        # Update energy level
        self.energy_level = total_energy
        # Fixed cost = rate ($/MW) * power (MW)
        charge_cost = self.BETA * charge_pwr
        reward = -1 * charge_cost
    else:
        # Hold: no change in energy level
        reward = 0

    # Include historical price in state (most recent first).
    # NOTE(review): if self.index < self.HIST_PRICE_HORIZON the slice start
    # is negative and the window will not have the expected length --
    # presumably the reset logic guarantees index >= horizon; confirm.
    historical_price = (
        self.df_price["price"]
        .iloc[self.index - self.HIST_PRICE_HORIZON : self.index][::-1]
        .to_list()
    )
    # NOTE(review): the observation carries the price at the index the action
    # was applied to, not the one after the increment below -- confirm this
    # is the intended observation.
    state = [self.energy_level, self.cost, price] + historical_price

    # One trajectory (episode) is bounded by MAX_STEPS_PER_EPISODE steps.
    # NOTE(review): counter is compared before being incremented -- confirm
    # the resulting episode length against the reset logic.
    done = bool(self.counter >= self.MAX_STEPS_PER_EPISODE)
    info = {}

    self.index += 1
    self.counter += 1
    return state, reward, done, info