Add calculating machine production factor from resource inputs

This commit is contained in:
Ben 2024-02-01 23:49:13 +01:00
parent aa55a34c1c
commit e106fe6a03
Signed by: ben
GPG key ID: 0F54A7ED232D3319

View file

@ -1,15 +1,15 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import itertools
import re import re
from datetime import timedelta from datetime import timedelta
import click import click
from NodeGraphQt import BaseNode, NodeBaseWidget from NodeGraphQt import BaseNode, NodeBaseWidget
from NodeGraphQt import NodeGraph, Port from NodeGraphQt import NodeGraph, Port
from NodeGraphQt.constants import PortTypeEnum from NodeGraphQt.constants import PortTypeEnum, NodePropWidgetEnum
from NodeGraphQt.widgets.node_widgets import _NodeGroupBox from NodeGraphQt.widgets.node_widgets import _NodeGroupBox, NodeLineEdit, NodeCheckBox
from PySide2.QtCore import Qt from PySide2.QtCore import Qt
from PySide2.QtWidgets import QSlider, QLineEdit from PySide2.QtWidgets import QSlider, QLineEdit, QCheckBox, QGraphicsItem
from Qt import QtWidgets from Qt import QtWidgets
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@ -28,158 +28,31 @@ OUTPUT_COLOR = (204, 44, 36)
OTHER_COLOR = (0, 83, 135) OTHER_COLOR = (0, 83, 135)
graph = None graph: NodeGraph
class NodeSlider(NodeBaseWidget): def in_amount_name(label: str) -> str:
def __init__(self, name, label="", parent=None):
super(NodeSlider, self).__init__(parent)
self.set_name(name)
self.pure_label = label or name
slider = QSlider(Qt.Horizontal)
slider.setMinimum(1)
slider.setMaximum(250)
slider.setValue(100)
slider.setSingleStep(1)
slider.setTickInterval(10)
slider.setTickPosition(QSlider.TicksBelow)
slider.valueChanged.connect(self.on_value_changed)
slider.valueChanged.connect(self._update_label)
self.set_custom_widget(slider)
self._update_label()
def get_value(self):
widget: QSlider = self.get_custom_widget()
return widget.value()
def set_value(self, value):
widget: QSlider = self.get_custom_widget()
widget.setValue(value)
def _update_label(self):
self.set_label(f"{self.pure_label} ({int(self.get_value())}%)")
class GlobalInput(BaseNode):
__identifier__ = "factorygame"
NODE_NAME = "Global Input"
def __init__(self):
super().__init__()
self.model.width = 240
self.set_port_deletion_allowed(True)
self.add_special_ports()
def add_special_ports(self):
self.add_output(WORLD_OUTPUT_PORT_NAME, color=WORLD_OUTPUT_COLOR)
self.add_output(CREATE_MACHINE_PORT_NAME, color=OTHER_COLOR)
def reorder_outputs(self):
self.delete_output(WORLD_OUTPUT_PORT_NAME)
self.delete_output(CREATE_MACHINE_PORT_NAME)
self.add_special_ports()
def update_x_pos(self):
min_x_pos = min(map(lambda node: node.x_pos(), filter(lambda node: node != self, graph.all_nodes())))
self.set_x_pos(min_x_pos - self.view.width - 150)
class Machine(BaseNode):
__identifier__ = "factorygame"
NODE_NAME = "FactoryGame Machine"
STYLESHEET_BORDER_COLOR_REGEX = re.compile(
r"QLineEdit\s*{[^{}]*\s*border:[^;]*(rgb\([^)]+\))", re.MULTILINE | re.DOTALL
)
def __init__(self):
super().__init__()
self.model.width = 240
def assign_recipe(self, recipe: Recipe):
for port_idx in range(len(self._inputs)):
self.delete_input(port_idx)
for port_idx in range(len(self._outputs)):
self.delete_output(port_idx)
self.set_property("name", recipe.factory.label, push_undo=False)
input_resources: dict[str, float] = {}
for ingredient in recipe.ingredients:
resource_label = ingredient.resource.label
if resource_label in input_resources:
input_resources[resource_label] += ingredient.amount_per_minute()
self.get_widget(in_amount_name(resource_label)).set_value(
f"{input_resources[resource_label]:.2f} / min"
)
else:
port = self.add_input(resource_label, multi_input=True, color=INPUT_COLOR)
port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine")
port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
port.add_accept_port_type(CREATE_MACHINE_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
port.add_accept_port_type(WORLD_OUTPUT_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
self._add_readonly_resource_text(
in_amount_name(resource_label),
ingredient,
input_resources,
resource_label,
border_color=INPUT_COLOR,
)
output_resources: dict[str, float] = {}
for result in recipe.results:
resource_label = result.resource.label
if resource_label in output_resources:
output_resources[resource_label] += result.amount_per_minute()
self.get_widget(out_amount_name(resource_label)).set_value(
f"{output_resources[resource_label]:.2f} / min"
)
else:
port = self.add_output(resource_label, color=OUTPUT_COLOR)
port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine")
self._add_readonly_resource_text(
out_amount_name(resource_label), result, output_resources, resource_label, border_color=OUTPUT_COLOR
)
performance_slider_name = "machine performance"
def set_performance(percentage):
factor = percentage / 100.0
for ingredient_label, amount in input_resources.items():
self.get_widget(in_amount_name(ingredient_label)).set_value(f"{amount * factor:.2f} / min")
for result_label, amount in output_resources.items():
self.get_widget(out_amount_name(result_label)).set_value(f"{amount * factor:.2f} / min")
self.get_widget(performance_slider_name).get_value()
slider = NodeSlider(name=performance_slider_name, label="Overclocking Performance", parent=self.view)
self.add_custom_widget(slider)
slider.value_changed.connect(lambda name, value: set_performance(max(1, min(250, value))))
def _add_readonly_resource_text(
self,
name: str,
flow: ResourceFlow,
resource_amounts: dict[str, float],
resource_label: str,
border_color: tuple,
):
resource_amounts[resource_label] = flow.amount_per_minute()
text = f"{resource_amounts[resource_label]} / min"
add_resource_text(node=self, name=name, text=text, border_color=border_color, readonly=True)
def in_amount_name(label: str):
return f"In {label} amount" return f"In {label} amount"
def out_amount_name(label: str): def out_amount_name(label: str) -> str:
return f"Out {label} amount" return f"Out {label} amount"
def add_resource_text(node: BaseNode, name: str, text: str, border_color: tuple, readonly: bool = False): def port_amount_name(port: Port) -> str:
if port.type_() == PortTypeEnum.IN:
return in_amount_name(port.name())
elif port.type_() == PortTypeEnum.OUT:
return out_amount_name(port.name())
else:
raise TypeError(f"Invalid port type {port.type_()}")
def add_resource_text(
node: BaseNode, name: str, text: str, border_color: tuple, readonly: bool = False
) -> NodeLineEdit:
node.add_text_input(name=name, label=name, text=text) node.add_text_input(name=name, label=name, text=text)
widget = node.get_widget(name) widget: NodeLineEdit = node.get_widget(name)
line_edit_widget: QLineEdit = widget.get_custom_widget() line_edit_widget: QLineEdit = widget.get_custom_widget()
if readonly: if readonly:
line_edit_widget.setReadOnly(True) line_edit_widget.setReadOnly(True)
@ -194,14 +67,269 @@ def add_resource_text(node: BaseNode, name: str, text: str, border_color: tuple,
) )
line_edit_widget.setStyleSheet(stylesheet) line_edit_widget.setStyleSheet(stylesheet)
line_edit_widget.update() line_edit_widget.update()
return widget
def parse_resource_amount(text: str) -> float:
return float(text.strip().rstrip("min").rstrip().rstrip("/").rstrip())
def resource_amount_to_text(amount: float):
return f"{amount:.2f} / min"
class NodeSlider(NodeBaseWidget):
MIN_VALUE = 1
MAX_VALUE = 250
DEFAULT_VALUE = 100
def __init__(self, name: str, label: str = "", parent: QGraphicsItem = None, readonly: bool = False):
super(NodeSlider, self).__init__(parent)
self.set_name(name)
self.pure_label = label or name
slider = QSlider(Qt.Horizontal)
slider.setEnabled(not readonly)
slider.setRange(self.MIN_VALUE, self.MAX_VALUE)
slider.setValue(self.DEFAULT_VALUE)
slider.setSingleStep(1)
slider.setTickInterval(10)
slider.setTickPosition(QSlider.TicksBelow)
slider.valueChanged.connect(self.on_value_changed)
slider.valueChanged.connect(self._update_label)
self.set_custom_widget(slider)
self._update_label()
def get_custom_widget(self) -> QSlider:
widget = super().get_custom_widget()
assert isinstance(widget, QSlider), f"Unexpected widget, not a QSlider: {widget}"
return widget
def get_value(self) -> int:
return self.get_custom_widget().value()
def set_value(self, value: int):
self.get_custom_widget().setValue(max(self.MIN_VALUE, min(self.MAX_VALUE, value)))
def _update_label(self):
self.set_label(f"{self.pure_label} ({self.get_value()}%)")
class GlobalInput(BaseNode):
__identifier__ = "factorygame"
NODE_NAME = "Global Input"
def __init__(self):
super().__init__()
self.model.width = 240
self.set_port_deletion_allowed(True)
self.add_special_ports()
self.output_resources: dict[str, float] = {}
def add_special_ports(self):
self.add_output(WORLD_OUTPUT_PORT_NAME, multi_output=False, color=WORLD_OUTPUT_COLOR)
self.add_output(CREATE_MACHINE_PORT_NAME, multi_output=False, color=OTHER_COLOR)
def reorder_outputs(self):
self.delete_output(WORLD_OUTPUT_PORT_NAME)
self.delete_output(CREATE_MACHINE_PORT_NAME)
self.add_special_ports()
def update_x_pos(self):
min_x_pos = min(map(lambda node: node.x_pos(), filter(lambda node: node != self, graph.all_nodes())))
self.set_x_pos(min_x_pos - self.view.width - 150)
def create_global_output(self, resource_label: str, initial_resource_amount: str) -> Port:
new_output_port = self.add_output(name=resource_label, multi_output=False, color=OUTPUT_COLOR)
widget = add_resource_text(
node=self,
name=out_amount_name(resource_label),
text=initial_resource_amount,
border_color=OUTPUT_COLOR,
)
self.output_resources[resource_label] = parse_resource_amount(initial_resource_amount)
widget.value_changed.connect(lambda: self.update_resource_output(resource_label))
self.reorder_outputs()
self.update_x_pos()
return new_output_port
def update_resource_output(self, resource_label: str):
widget: NodeLineEdit = self.get_widget(out_amount_name(resource_label))
assert resource_label in self.output_resources
new_amount = parse_resource_amount(widget.get_value())
if self.output_resources[resource_label] != new_amount:
self.output_resources[resource_label] = new_amount
widget.set_value(resource_amount_to_text(self.output_resources[resource_label]))
resource_port = self.get_output(resource_label)
for connected_port in resource_port.connected_ports():
node = connected_port.node()
assert isinstance(node, Machine), f"Connected node must be a machine: {node}"
node.update_input(resource_label, new_amount)
class Machine(BaseNode):
__identifier__ = "factorygame"
NODE_NAME = "FactoryGame Machine"
MAXIMUM_PERFORMANCE_PROP = "machine max perf"
ACTUAL_PERFORMANCE_PROP = "machine actual perf"
AUTOMATIC_PERFORMANCE_PROP = "machine automatic"
STYLESHEET_BORDER_COLOR_REGEX = re.compile(
r"QLineEdit\s*{[^{}]*\s*border:[^;]*(rgb\([^)]+\))", re.MULTILINE | re.DOTALL
)
def __init__(self):
super().__init__()
self.model.width = 240
self.input_resources: dict[str, float] = {}
self.possible_input_resources: dict[str, float] = {}
self.output_resources: dict[str, float] = {}
self.add_checkbox(Machine.AUTOMATIC_PERFORMANCE_PROP, text="Automatic Mode", state=True)
widget: NodeCheckBox = self.get_widget(Machine.AUTOMATIC_PERFORMANCE_PROP)
widget.value_changed.connect(self.recalculate_factor)
checkbox_widget: QCheckBox = widget.get_custom_widget()
checkbox_widget.setStyleSheet(
checkbox_widget.styleSheet() + "\nQCheckBox {\n" f"background-color: transparent;\n" "}"
)
self.actual_performance_slider = NodeSlider(
name=Machine.ACTUAL_PERFORMANCE_PROP,
label="Machine Production Factor",
parent=self.view,
readonly=True,
)
self.add_custom_widget(
widget=self.actual_performance_slider,
widget_type=NodePropWidgetEnum.HIDDEN.value,
)
self.actual_performance_slider.value_changed.connect(self.actual_performance_changed)
def max_performance_changed(self):
self.set_property(Machine.AUTOMATIC_PERFORMANCE_PROP, False)
self.recalculate_factor()
def recalculate_factor(self):
max_factor: float = self.get_property(Machine.MAXIMUM_PERFORMANCE_PROP) / 100.0
if max_factor < 1.0 and self.get_property(Machine.AUTOMATIC_PERFORMANCE_PROP):
new_factor = 1.0
else:
new_factor = max_factor
for resource_label, wanted_resource in self.input_resources.items():
if len(self.get_input(resource_label).connected_ports()) != 0:
input_factor = self.possible_input_resources[resource_label] / wanted_resource
if input_factor < new_factor:
new_factor = input_factor
new_perf = int(new_factor * 100.0)
if self.get_property(Machine.ACTUAL_PERFORMANCE_PROP) != new_perf:
self.set_property(name=Machine.ACTUAL_PERFORMANCE_PROP, value=new_perf, push_undo=False)
if (
self.get_property(Machine.AUTOMATIC_PERFORMANCE_PROP)
and self.get_property(Machine.MAXIMUM_PERFORMANCE_PROP) != new_perf
):
self.set_property(name=Machine.MAXIMUM_PERFORMANCE_PROP, value=new_perf, push_undo=False)
def update_input(self, resource_label: str, value: float):
self.possible_input_resources[resource_label] = value
self.recalculate_factor()
def actual_performance_changed(self, name: str, perf: int):
if name == Machine.ACTUAL_PERFORMANCE_PROP:
factor = perf / 100.0
for ingredient_label, amount in self.input_resources.items():
resource_text_widget = self.get_widget(in_amount_name(ingredient_label))
assert (
resource_text_widget is not None
), f"No ingredient resource text widget found for {ingredient_label}"
resource_text_widget.set_value(resource_amount_to_text(amount * factor))
for result_label, amount in self.output_resources.items():
resource_text_widget = self.get_widget(out_amount_name(result_label))
assert resource_text_widget is not None, f"No result resource text widget found for {result_label}"
new_output_value = amount * factor
resource_text_widget.set_value(resource_amount_to_text(new_output_value))
for port in self.get_output(result_label).connected_ports():
node = port.node()
assert isinstance(node, Machine), f"Connected node must be a machine: {node}"
node.update_input(
resource_label=result_label,
value=new_output_value,
)
def assign_recipe(self, recipe: Recipe):
for port_idx in range(len(self._inputs)):
self.delete_input(port_idx)
for port_idx in range(len(self._outputs)):
self.delete_output(port_idx)
self.set_property("name", recipe.factory.label, push_undo=False)
for ingredient in recipe.ingredients:
resource_label = ingredient.resource.label
if resource_label in self.input_resources:
self.input_resources[resource_label] += ingredient.amount_per_minute()
self.get_widget(in_amount_name(resource_label)).set_value(
resource_amount_to_text(self.input_resources[resource_label])
)
else:
port = self.add_input(resource_label, multi_input=False, color=INPUT_COLOR)
port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine")
port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
port.add_accept_port_type(CREATE_MACHINE_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
port.add_accept_port_type(WORLD_OUTPUT_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
self._add_readonly_resource_text(
in_amount_name(resource_label),
ingredient,
self.input_resources,
resource_label,
border_color=INPUT_COLOR,
)
for result in recipe.results:
resource_label = result.resource.label
if resource_label in self.output_resources:
self.output_resources[resource_label] += result.amount_per_minute()
self.get_widget(out_amount_name(resource_label)).set_value(
resource_amount_to_text(self.output_resources[resource_label])
)
else:
port = self.add_output(resource_label, multi_output=False, color=OUTPUT_COLOR)
port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine")
self._add_readonly_resource_text(
out_amount_name(resource_label),
result,
self.output_resources,
resource_label,
border_color=OUTPUT_COLOR,
)
self.possible_input_resources.update(self.input_resources)
slider = NodeSlider(name=Machine.MAXIMUM_PERFORMANCE_PROP, label="Overclocking Performance", parent=self.view)
self.add_custom_widget(widget=slider, widget_type=NodePropWidgetEnum.SLIDER.value)
slider.value_changed.connect(self.max_performance_changed)
def _add_readonly_resource_text(
self,
name: str,
flow: ResourceFlow,
resource_amounts: dict[str, float],
resource_label: str,
border_color: tuple,
) -> NodeLineEdit:
resource_amounts[resource_label] = flow.amount_per_minute()
text = resource_amount_to_text(resource_amounts[resource_label])
return add_resource_text(node=self, name=name, text=text, border_color=border_color, readonly=True)
def get_resource_output(self, resource_label: str) -> float:
return self.output_resources[resource_label] * (self.get_property(Machine.ACTUAL_PERFORMANCE_PROP) / 100.0)
def on_port_connected(input_port: Port, output_port: Port): def on_port_connected(input_port: Port, output_port: Port):
global debug global debug
if debug: if debug:
print(f"Port {output_port} connected to {input_port}") print(f"Port {output_port} connected to {input_port}")
if isinstance(output_port.node(), GlobalInput): output_node = output_port.node()
global_input_node: GlobalInput = output_port.node() if isinstance(output_node, GlobalInput):
resource_label = input_port.name() resource_label = input_port.name()
if output_port.name() == CREATE_MACHINE_PORT_NAME: if output_port.name() == CREATE_MACHINE_PORT_NAME:
output_port.clear_connections(push_undo=False, emit_signal=False) output_port.clear_connections(push_undo=False, emit_signal=False)
@ -221,27 +349,31 @@ def on_port_connected(input_port: Port, output_port: Port):
recipe_machine.update() recipe_machine.update()
recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 100) recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 100)
recipe_machine.get_output(resource_label).connect_to(input_port) recipe_machine.get_output(resource_label).connect_to(input_port)
global_input_node.update_x_pos() output_node.update_x_pos()
elif output_port.name() == WORLD_OUTPUT_PORT_NAME: elif output_port.name() == WORLD_OUTPUT_PORT_NAME:
output_port.clear_connections(push_undo=False, emit_signal=False) output_port.clear_connections(push_undo=False, emit_signal=False)
for idx, port in enumerate(global_input_node.output_ports()): for idx, port in enumerate(output_node.output_ports()):
if port.name() == resource_label: assert port.name() != resource_label, f"Duplicate output port for {resource_label}"
port.connect_to(input_port, push_undo=True, emit_signal=False) # if port.name() == resource_label:
# FIXME: recalculate # port.connect_to(input_port, push_undo=True, emit_signal=False)
return # return
if isinstance(input_port.node(), Machine): if isinstance(input_port.node(), Machine):
machine_node: Machine = input_port.node() machine_node: Machine = input_port.node()
new_output_port = global_input_node.add_output(name=resource_label, color=OUTPUT_COLOR) initial_resource_amount = str(machine_node.get_property(in_amount_name(resource_label)))
add_resource_text( new_output_port = output_node.create_global_output(
node=global_input_node, resource_label=resource_label,
name=out_amount_name(resource_label), initial_resource_amount=initial_resource_amount,
text=str(machine_node.get_property(in_amount_name(resource_label))),
border_color=OUTPUT_COLOR,
) )
global_input_node.reorder_outputs()
global_input_node.update_x_pos()
new_output_port.connect_to(input_port, push_undo=True, emit_signal=False) new_output_port.connect_to(input_port, push_undo=True, emit_signal=False)
# FIXME: recalculate elif isinstance(output_node, Machine):
input_node = input_port.node()
resource_label = output_port.name()
if isinstance(input_node, Machine):
resource_amount = output_node.get_resource_output(resource_label)
input_node.update_input(
resource_label,
resource_amount,
)
@click.command @click.command