Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/batcontrol/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ def handle_forecast_error(self):

def run(self):
""" Main calculation & control loop """
logger.debug('Timeslots are in %d-minute intervals', self.time_resolution)

# Reset some values
self.__reset_run_data()

Expand Down
17 changes: 9 additions & 8 deletions src/batcontrol/logic/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def get_always_allow_discharge_limit(self) -> float:
return self.always_allow_discharge_limit

def is_discharge_always_allowed_soc(self, soc: float) -> bool:
""" Check if discharge is always allowed based on the state of charge (SOC).
""" Check if discharge is always allowed based on the state of charge (SOC).
Args:
soc (float): State of charge as a percentage (0-100).
Returns:
Expand All @@ -92,11 +92,11 @@ def is_discharge_always_allowed_capacity(self, capacity: float) -> bool:

if capacity >= self.max_capacity * self.always_allow_discharge_limit:
logger.debug(
'Discharge is \'always allowed\' for current capacity: %s Wh', round(capacity,0))
'Discharge is \'always allowed\' for current capacity: %.0f Wh', round(capacity,0))
return True

logger.debug(
'Discharge is NOT \'always allowed\' for current capacity: %s Wh', round(capacity,0))
'Discharge is NOT \'always allowed\' for current capacity: %.0f Wh', round(capacity,0))
return False

def is_charging_above_minimum(self, needed_energy: float) -> bool:
Expand All @@ -109,16 +109,17 @@ def is_charging_above_minimum(self, needed_energy: float) -> bool:
return True

logger.debug(
'Charging needed recharge energy is below threshold(%s): %s Wh', round(self.min_charge_energy,0),
round(needed_energy,0))
'Charging needed recharge energy is below threshold(%.0f): %.0f Wh',
round(self.min_charge_energy, 0),
round(needed_energy, 0))
return False

def calculate_charge_rate(self, charge_rate: float) -> int:
""" Calculate the charge rate based on the charge rate multiplier.
Args:
charge_rate (float): The initial charge rate in W.
Returns:
float: The adjusted charge rate in W."""
int: The adjusted charge rate in W."""
logger.debug('Calculating charge rate: %s', charge_rate)
adjusted_charge_rate = charge_rate * self.charge_rate_multiplier
if adjusted_charge_rate < MIN_CHARGE_RATE:
Expand All @@ -127,6 +128,6 @@ def calculate_charge_rate(self, charge_rate: float) -> int:
MIN_CHARGE_RATE, adjusted_charge_rate)
adjusted_charge_rate = MIN_CHARGE_RATE

adjusted_charge_rate = int(round(adjusted_charge_rate, 0))
logger.debug('Adjusted charge rate: %s W', adjusted_charge_rate)
adjusted_charge_rate = int(round(adjusted_charge_rate, 0))
logger.debug('Adjusted charge rate: %d W', adjusted_charge_rate)
return adjusted_charge_rate
152 changes: 82 additions & 70 deletions src/batcontrol/logic/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def calculate_inverter_mode(self, calc_input: CalculationInput,
calc_timestamp = datetime.datetime.now().astimezone(self.timezone)

# ensure availability of data
max_hour = min(len(net_consumption), len(prices))
max_slot = min(len(net_consumption), len(prices))

if self.__is_discharge_allowed(calc_input, net_consumption, prices, calc_timestamp):
inverter_control_settings.allow_discharge = True
Expand All @@ -126,7 +126,7 @@ def calculate_inverter_mode(self, calc_input: CalculationInput,
)
required_recharge_energy = self.__get_required_recharge_energy(
calc_input,
net_consumption[:max_hour],
net_consumption[:max_slot],
prices
)
else:
Expand Down Expand Up @@ -220,31 +220,36 @@ def __is_discharge_allowed(self, calc_input: CalculationInput,

self.calculation_output.min_dynamic_price_difference = min_dynamic_price_difference

max_hour = len(net_consumption)
max_slots = len(net_consumption)
# relevant time range : until next recharge possibility
for h in range(1, max_hour):
future_price = prices[h]
for slot in range(1, max_slots):
future_price = prices[slot]
if future_price <= current_price-min_dynamic_price_difference:
max_hour = h
max_slots = slot
logger.debug(
"[Rule] Recharge possible in %d hours, limiting evaluation window.",
h)
"[Rule] Recharge possible in %d slots, limiting evaluation window.",
slot)
logger.debug(
"[Rule] Future price: %.3f < Current price: %.3f - dyn_price_diff. %.3f ",
future_price,
current_price,
min_dynamic_price_difference
)
break
dt = datetime.timedelta(hours=max_hour-1)
t0 = calc_timestamp
t1 = t0+dt
last_hour = t1.astimezone(self.timezone).strftime("%H:59")

slot_start = calc_timestamp.replace(
minute=(calc_timestamp.minute // self.interval_minutes) * self.interval_minutes,
second=0,
microsecond=0
)
last_time = (slot_start + datetime.timedelta(
minutes=max_slots * self.interval_minutes
)).astimezone(self.timezone).strftime("%H:%M")

logger.debug(
'Evaluating next %d hours until %s',
max_hour,
last_hour
'Evaluating next %d slots until %s',
max_slots,
last_time
)
# distribute remaining energy
consumption = np.array(net_consumption)
Expand All @@ -253,53 +258,53 @@ def __is_discharge_allowed(self, calc_input: CalculationInput,
production = -np.array(net_consumption)
production[production < 0] = 0

# get hours with higher price
higher_price_hours = []
for h in range(max_hour):
future_price = prices[h]
# !!! different formula compared to detect relevant hours
# get slots with higher price
higher_price_slots = []
for slot in range(max_slots):
future_price = prices[slot]
# !!! different formula compared to detect relevant slots
if future_price > current_price:
higher_price_hours.append(h)
higher_price_slots.append(slot)

higher_price_hours.sort()
higher_price_hours.reverse()
higher_price_slots.sort()
higher_price_slots.reverse()

reserved_storage = 0
for higher_price_hour in higher_price_hours:
if consumption[higher_price_hour] == 0:
for higher_price_slot in higher_price_slots:
if consumption[higher_price_slot] == 0:
continue
required_energy = consumption[higher_price_hour]
required_energy = consumption[higher_price_slot]

# correct reserved_storage with potential production
# start with latest hour
for hour in list(range(higher_price_hour))[::-1]:
if production[hour] == 0:
# start with latest slot
for slot in list(range(higher_price_slot))[::-1]:
if production[slot] == 0:
continue
if production[hour] >= required_energy:
production[hour] -= required_energy
if production[slot] >= required_energy:
production[slot] -= required_energy
required_energy = 0
break
else:
required_energy -= production[hour]
production[hour] = 0
required_energy -= production[slot]
production[slot] = 0
# add_remaining required_energy to reserved_storage
reserved_storage += required_energy

self.calculation_output.reserved_energy = reserved_storage

if len(higher_price_hours) > 0:
# This message is somehow confusing, because we are working with an
# hour offset "the next 2 hours", but people may read "2 o'clock".
logger.debug("[Rule] Reserved Energy will be used in the next hours: %s",
higher_price_hours[::-1])
if len(higher_price_slots) > 0:
# This message refers to relative slots (e.g. "next 2 slots"),
# not specific clock times (e.g. "at 2 o'clock").
logger.debug("[Rule] Reserved Energy will be used in the next slots: %s",
higher_price_slots[::-1])
logger.debug(
"[Rule] Reserved Energy: %0.1f Wh. Usable in Battery: %0.1f Wh",
reserved_storage,
calc_input.stored_usable_energy
)
else:
logger.debug("[Rule] No reserved energy required, because no "
"'high price' hours in evaluation window.")
"'high price' slots in evaluation window.")


if calc_input.stored_usable_energy > reserved_storage:
Expand All @@ -322,17 +327,17 @@ def __is_discharge_allowed(self, calc_input: CalculationInput,
return False

# %%
def __get_required_recharge_energy(self, calc_input: CalculationInput ,
def __get_required_recharge_energy(self, calc_input: CalculationInput,
net_consumption: list, prices: dict) -> float:
""" Calculate the required energy to shift toward high price hours.
""" Calculate the required energy to shift toward high price slots.

If a recharge price window is detected, the energy required to
recharge the battery to the next high price hours is calculated.
recharge the battery to the next high price slots is calculated.

return: float (Energy in Wh)
"""
current_price = prices[0]
max_hour = len(net_consumption)
max_slot = len(net_consumption)
consumption = np.array(net_consumption)
consumption[consumption < 0] = 0

Expand All @@ -343,8 +348,8 @@ def __get_required_recharge_energy(self, calc_input: CalculationInput ,
current_price)

# evaluation period until price is first time lower then current price
for h in range(1, max_hour):
future_price = prices[h]
for slot in range(1, max_slot):
future_price = prices[slot]
found_lower_price = False
# Soften the price difference to avoid too early charging
if self.soften_price_difference_on_charging:
Expand All @@ -355,56 +360,62 @@ def __get_required_recharge_energy(self, calc_input: CalculationInput ,
found_lower_price = future_price <= current_price

if found_lower_price:
max_hour = h
max_slot = slot
break

# get high price hours
high_price_hours = []
for h in range(max_hour):
future_price = prices[h]
# As max_slot is 1 at minimum, no protection against out of range needed.
logger.debug(
"[Rule] Evaluation window for recharge energy until slot %d with price %0.3f",
max_slot-1,
prices[max_slot-1]
)

# get high price slots
high_price_slots = []
for slot in range(max_slot):
future_price = prices[slot]
if future_price > current_price+min_dynamic_price_difference:
high_price_hours.append(h)
high_price_slots.append(slot)

# start with nearest hour
high_price_hours.sort()
# start with nearest slot
high_price_slots.sort()
required_energy = 0.0
for high_price_hour in high_price_hours:
energy_to_shift = consumption[high_price_hour]
for high_price_slot in high_price_slots:
energy_to_shift = consumption[high_price_slot]

# correct energy to shift with potential production
# start with nearest hour
for hour in range(1, high_price_hour):
if production[hour] == 0:
# start with nearest slot
for slot in range(1, high_price_slot):
if production[slot] == 0:
continue
if production[hour] >= energy_to_shift:
production[hour] -= energy_to_shift
if production[slot] >= energy_to_shift:
production[slot] -= energy_to_shift
energy_to_shift = 0
else:
energy_to_shift -= production[hour]
production[hour] = 0
energy_to_shift -= production[slot]
production[slot] = 0
# add_remaining energy to shift to recharge amount
required_energy += energy_to_shift

if required_energy > 0.0:
logger.debug("[Rule] Required Energy: %0.1f Wh is based on next 'high price' hours %s",
logger.debug("[Rule] Required Energy: %0.1f Wh is based on next 'high price' slots %s",
required_energy,
high_price_hours
high_price_slots
)
recharge_energy = required_energy-calc_input.stored_usable_energy
logger.debug("[Rule] Stored usable Energy: %0.1f , Recharge Energy: %0.1f Wh",
calc_input.stored_usable_energy,
recharge_energy
)
else:
recharge_energy = 0.0

free_capacity = calc_input.free_capacity

if recharge_energy <= 0.0:
logger.debug(
"[Rule] No additional energy required, because stored energy is sufficient."
)
recharge_energy = 0.0
self.calculation_output.required_recharge_energy = recharge_energy
return recharge_energy

free_capacity = calc_input.free_capacity

if recharge_energy > free_capacity:
recharge_energy = free_capacity
Expand All @@ -414,7 +425,8 @@ def __get_required_recharge_energy(self, calc_input: CalculationInput ,
if not self.common.is_charging_above_minimum(recharge_energy):
recharge_energy = 0.0
else:
# We are adding that minimum charge energy here, so that we are not stuck between limits.
# We are adding that minimum charge energy here, so that we are not stuck
# between limits.
recharge_energy = recharge_energy + self.common.min_charge_energy

self.calculation_output.required_recharge_energy = recharge_energy
Expand Down