Compare commits

...

3 commits

2 changed files with 298 additions and 73 deletions

View file

@ -11,7 +11,7 @@ from ..helper import prompt
@click.command()
@click.option("--result", is_flag=True)
@click.option("--result", is_flag=True, default=True)
@click.option("--debug", is_flag=True)
@click.option("--refetch", is_flag=True)
@click.argument("search")

View file

@ -1,31 +1,96 @@
#!/usr/bin/env python3
import re
from datetime import timedelta
import click
from NodeGraphQt import BaseNode, NodeBaseWidget
from NodeGraphQt import NodeGraph, Port
from NodeGraphQt.constants import PortTypeEnum
from NodeGraphQt.constants import PortTypeEnum, NodePropWidgetEnum
from NodeGraphQt.widgets.node_widgets import _NodeGroupBox, NodeLineEdit, NodeCheckBox
from PySide2.QtCore import Qt
from PySide2.QtWidgets import QSlider
from PySide2.QtWidgets import QSlider, QLineEdit, QCheckBox, QGraphicsItem
from Qt import QtWidgets
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from factorygame.data.common import chose_resource, resource_needs_update, chose_recipe
from .models import Recipe, Resource
from .models import Recipe, Resource, ResourceFlow
from ..helper import prompt
WORLD_OUTPUT_PORT_NAME = "World Output"
WORLD_OUTPUT_COLOR = (0, 139, 41)
CREATE_MACHINE_PORT_NAME = "Create Machine"
INPUT_COLOR = (249, 169, 0)
OUTPUT_COLOR = (204, 44, 36)
OTHER_COLOR = (0, 83, 135)
graph: NodeGraph
def in_amount_name(label: str) -> str:
return f"In {label} amount"
def out_amount_name(label: str) -> str:
return f"Out {label} amount"
def port_amount_name(port: Port) -> str:
if port.type_() == PortTypeEnum.IN:
return in_amount_name(port.name())
elif port.type_() == PortTypeEnum.OUT:
return out_amount_name(port.name())
else:
raise TypeError(f"Invalid port type {port.type_()}")
def add_resource_text(
node: BaseNode, name: str, text: str, border_color: tuple, readonly: bool = False
) -> NodeLineEdit:
node.add_text_input(name=name, label=name, text=text)
widget: NodeLineEdit = node.get_widget(name)
line_edit_widget: QLineEdit = widget.get_custom_widget()
if readonly:
line_edit_widget.setReadOnly(True)
group: _NodeGroupBox = widget.widget()
group.setMaximumWidth(220)
group.adjustSize()
if border_color:
stylesheet = line_edit_widget.styleSheet()
match = Machine.STYLESHEET_BORDER_COLOR_REGEX.match(stylesheet)
stylesheet = (
stylesheet[: match.start(1)] + f"rgb({','.join(map(str, border_color))})" + stylesheet[match.end(1) :]
)
line_edit_widget.setStyleSheet(stylesheet)
line_edit_widget.update()
return widget
def parse_resource_amount(text: str) -> float:
return float(text.strip().rstrip("min").rstrip().rstrip("/").rstrip())
def resource_amount_to_text(amount: float):
return f"{amount:.2f} / min"
class NodeSlider(NodeBaseWidget):
def __init__(self, name, label="", parent=None):
MIN_VALUE = 1
MAX_VALUE = 250
DEFAULT_VALUE = 100
def __init__(self, name: str, label: str = "", parent: QGraphicsItem = None, readonly: bool = False):
super(NodeSlider, self).__init__(parent)
self.set_name(name)
self.pure_label = label or name
slider = QSlider(Qt.Horizontal)
slider.setMinimum(1)
slider.setMaximum(250)
slider.setValue(100)
slider.setEnabled(not readonly)
slider.setRange(self.MIN_VALUE, self.MAX_VALUE)
slider.setValue(self.DEFAULT_VALUE)
slider.setSingleStep(1)
slider.setTickInterval(10)
slider.setTickPosition(QSlider.TicksBelow)
@ -34,16 +99,19 @@ class NodeSlider(NodeBaseWidget):
self.set_custom_widget(slider)
self._update_label()
def get_value(self):
widget: QSlider = self.get_custom_widget()
return widget.value()
def get_custom_widget(self) -> QSlider:
widget = super().get_custom_widget()
assert isinstance(widget, QSlider), f"Unexpected widget, not a QSlider: {widget}"
return widget
def set_value(self, value):
widget: QSlider = self.get_custom_widget()
widget.setValue(value)
def get_value(self) -> int:
return self.get_custom_widget().value()
def set_value(self, value: int):
self.get_custom_widget().setValue(max(self.MIN_VALUE, min(self.MAX_VALUE, value)))
def _update_label(self):
self.set_label(f"{self.pure_label} ({int(self.get_value())}%)")
self.set_label(f"{self.pure_label} ({self.get_value()}%)")
class GlobalInput(BaseNode):
@ -52,13 +120,140 @@ class GlobalInput(BaseNode):
def __init__(self):
super().__init__()
self.add_output("Create Machine")
self.model.width = 240
self.set_port_deletion_allowed(True)
self.add_special_ports()
self.output_resources: dict[str, float] = {}
def add_special_ports(self):
self.add_output(WORLD_OUTPUT_PORT_NAME, multi_output=False, color=WORLD_OUTPUT_COLOR)
self.add_output(CREATE_MACHINE_PORT_NAME, multi_output=False, color=OTHER_COLOR)
def reorder_outputs(self):
self.delete_output(WORLD_OUTPUT_PORT_NAME)
self.delete_output(CREATE_MACHINE_PORT_NAME)
self.add_special_ports()
def update_x_pos(self):
min_x_pos = min(map(lambda node: node.x_pos(), filter(lambda node: node != self, graph.all_nodes())))
self.set_x_pos(min_x_pos - self.view.width - 150)
def create_global_output(self, resource_label: str, initial_resource_amount: str) -> Port:
new_output_port = self.add_output(name=resource_label, multi_output=False, color=OUTPUT_COLOR)
widget = add_resource_text(
node=self,
name=out_amount_name(resource_label),
text=initial_resource_amount,
border_color=OUTPUT_COLOR,
)
self.output_resources[resource_label] = parse_resource_amount(initial_resource_amount)
widget.value_changed.connect(lambda: self.update_resource_output(resource_label))
self.reorder_outputs()
self.update_x_pos()
return new_output_port
def update_resource_output(self, resource_label: str):
widget: NodeLineEdit = self.get_widget(out_amount_name(resource_label))
assert resource_label in self.output_resources
new_amount = parse_resource_amount(widget.get_value())
if self.output_resources[resource_label] != new_amount:
self.output_resources[resource_label] = new_amount
widget.set_value(resource_amount_to_text(self.output_resources[resource_label]))
resource_port = self.get_output(resource_label)
for connected_port in resource_port.connected_ports():
node = connected_port.node()
assert isinstance(node, Machine), f"Connected node must be a machine: {node}"
node.update_input(resource_label, new_amount)
class Machine(BaseNode):
__identifier__ = "factorygame"
NODE_NAME = "FactoryGame Machine"
MAXIMUM_PERFORMANCE_PROP = "machine max perf"
ACTUAL_PERFORMANCE_PROP = "machine actual perf"
AUTOMATIC_PERFORMANCE_PROP = "machine automatic"
STYLESHEET_BORDER_COLOR_REGEX = re.compile(
r"QLineEdit\s*{[^{}]*\s*border:[^;]*(rgb\([^)]+\))", re.MULTILINE | re.DOTALL
)
def __init__(self):
super().__init__()
self.model.width = 240
self.input_resources: dict[str, float] = {}
self.possible_input_resources: dict[str, float] = {}
self.output_resources: dict[str, float] = {}
self.add_checkbox(Machine.AUTOMATIC_PERFORMANCE_PROP, text="Automatic Mode", state=True)
widget: NodeCheckBox = self.get_widget(Machine.AUTOMATIC_PERFORMANCE_PROP)
widget.value_changed.connect(self.recalculate_factor)
checkbox_widget: QCheckBox = widget.get_custom_widget()
checkbox_widget.setStyleSheet(
checkbox_widget.styleSheet() + "\nQCheckBox {\n" f"background-color: transparent;\n" "}"
)
self.actual_performance_slider = NodeSlider(
name=Machine.ACTUAL_PERFORMANCE_PROP,
label="Machine Production Factor",
parent=self.view,
readonly=True,
)
self.add_custom_widget(
widget=self.actual_performance_slider,
widget_type=NodePropWidgetEnum.HIDDEN.value,
)
self.actual_performance_slider.value_changed.connect(self.actual_performance_changed)
def max_performance_changed(self):
self.set_property(Machine.AUTOMATIC_PERFORMANCE_PROP, False)
self.recalculate_factor()
def recalculate_factor(self):
max_factor: float = self.get_property(Machine.MAXIMUM_PERFORMANCE_PROP) / 100.0
if max_factor < 1.0 and self.get_property(Machine.AUTOMATIC_PERFORMANCE_PROP):
new_factor = 1.0
else:
new_factor = max_factor
for resource_label, wanted_resource in self.input_resources.items():
if len(self.get_input(resource_label).connected_ports()) != 0:
input_factor = self.possible_input_resources[resource_label] / wanted_resource
if input_factor < new_factor:
new_factor = input_factor
new_perf = int(new_factor * 100.0)
if self.get_property(Machine.ACTUAL_PERFORMANCE_PROP) != new_perf:
self.set_property(name=Machine.ACTUAL_PERFORMANCE_PROP, value=new_perf, push_undo=False)
if (
self.get_property(Machine.AUTOMATIC_PERFORMANCE_PROP)
and self.get_property(Machine.MAXIMUM_PERFORMANCE_PROP) != new_perf
):
self.set_property(name=Machine.MAXIMUM_PERFORMANCE_PROP, value=new_perf, push_undo=False)
def update_input(self, resource_label: str, value: float):
self.possible_input_resources[resource_label] = value
self.recalculate_factor()
def actual_performance_changed(self, name: str, perf: int):
if name == Machine.ACTUAL_PERFORMANCE_PROP:
factor = perf / 100.0
for ingredient_label, amount in self.input_resources.items():
resource_text_widget = self.get_widget(in_amount_name(ingredient_label))
assert (
resource_text_widget is not None
), f"No ingredient resource text widget found for {ingredient_label}"
resource_text_widget.set_value(resource_amount_to_text(amount * factor))
for result_label, amount in self.output_resources.items():
resource_text_widget = self.get_widget(out_amount_name(result_label))
assert resource_text_widget is not None, f"No result resource text widget found for {result_label}"
new_output_value = amount * factor
resource_text_widget.set_value(resource_amount_to_text(new_output_value))
for port in self.get_output(result_label).connected_ports():
node = port.node()
assert isinstance(node, Machine), f"Connected node must be a machine: {node}"
node.update_input(
resource_label=result_label,
value=new_output_value,
)
def assign_recipe(self, recipe: Recipe):
for port_idx in range(len(self._inputs)):
self.delete_input(port_idx)
@ -68,86 +263,117 @@ class Machine(BaseNode):
self.set_property("name", recipe.factory.label, push_undo=False)
def in_amount_name(resource_label: str):
return f"In {resource_label} amount"
def out_amount_name(resource_label: str):
return f"Out {resource_label} amount"
input_resources: dict[str, float] = {}
for ingredient in recipe.ingredients:
resource_label = ingredient.resource.label
if resource_label in input_resources:
input_resources[resource_label] += ingredient.amount_per_minute()
if resource_label in self.input_resources:
self.input_resources[resource_label] += ingredient.amount_per_minute()
self.get_widget(in_amount_name(resource_label)).set_value(
f"{input_resources[resource_label]:.2f} / min"
resource_amount_to_text(self.input_resources[resource_label])
)
else:
port = self.add_input(resource_label, color=(180, 80, 0))
port = self.add_input(resource_label, multi_input=False, color=INPUT_COLOR)
port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine")
port.add_accept_port_type("Create Machine", PortTypeEnum.OUT.value, "factorygame.GlobalInput")
self._add_text_label(in_amount_name(resource_label), ingredient, input_resources, resource_label)
port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
port.add_accept_port_type(CREATE_MACHINE_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
port.add_accept_port_type(WORLD_OUTPUT_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
self._add_readonly_resource_text(
in_amount_name(resource_label),
ingredient,
self.input_resources,
resource_label,
border_color=INPUT_COLOR,
)
output_resources: dict[str, float] = {}
for result in recipe.results:
resource_label = result.resource.label
if resource_label in output_resources:
output_resources[resource_label] += result.amount_per_minute()
if resource_label in self.output_resources:
self.output_resources[resource_label] += result.amount_per_minute()
self.get_widget(out_amount_name(resource_label)).set_value(
f"{output_resources[resource_label]:.2f} / min"
resource_amount_to_text(self.output_resources[resource_label])
)
else:
port = self.add_output(resource_label, color=(200, 20, 0))
port = self.add_output(resource_label, multi_output=False, color=OUTPUT_COLOR)
port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine")
self._add_text_label(out_amount_name(resource_label), result, output_resources, resource_label)
self._add_readonly_resource_text(
out_amount_name(resource_label),
result,
self.output_resources,
resource_label,
border_color=OUTPUT_COLOR,
)
performance_slider_name = "machine performance"
self.possible_input_resources.update(self.input_resources)
def set_performance(percentage):
factor = percentage / 100.0
for ingredient_label, amount in input_resources.items():
self.get_widget(in_amount_name(ingredient_label)).set_value(f"{amount * factor:.2f} / min")
for result_label, amount in output_resources.items():
self.get_widget(out_amount_name(result_label)).set_value(f"{amount * factor:.2f} / min")
self.get_widget(performance_slider_name).get_value()
slider = NodeSlider(name=Machine.MAXIMUM_PERFORMANCE_PROP, label="Overclocking Performance", parent=self.view)
self.add_custom_widget(widget=slider, widget_type=NodePropWidgetEnum.SLIDER.value)
slider.value_changed.connect(self.max_performance_changed)
slider = NodeSlider(name=performance_slider_name, label="Overclocking Performance", parent=self.view)
self.add_custom_widget(slider)
slider.value_changed.connect(lambda name, value: set_performance(max(1, min(250, value))))
def _add_text_label(self, name, flow, resource_amounts, resource_label):
def _add_readonly_resource_text(
self,
name: str,
flow: ResourceFlow,
resource_amounts: dict[str, float],
resource_label: str,
border_color: tuple,
) -> NodeLineEdit:
resource_amounts[resource_label] = flow.amount_per_minute()
self.add_text_input(name=name, label=name, text=f"{resource_amounts[resource_label]} / min")
widget = self.get_widget(name)
widget.get_custom_widget().setReadOnly(True)
widget.widget().setMaximumWidth(220)
text = resource_amount_to_text(resource_amounts[resource_label])
return add_resource_text(node=self, name=name, text=text, border_color=border_color, readonly=True)
def get_resource_output(self, resource_label: str) -> float:
return self.output_resources[resource_label] * (self.get_property(Machine.ACTUAL_PERFORMANCE_PROP) / 100.0)
def on_port_connected(input_port: Port, output_port: Port):
global debug
if debug:
print(f"Port {output_port} connected to {input_port}")
if isinstance(output_port.node(), GlobalInput) and output_port.name() == "Create Machine":
output_port.clear_connections(push_undo=False, emit_signal=False)
with Session(engine) as session:
resource_label = input_port.name()
resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
if resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)):
print("Please fetch resource", resource_label, "first.")
return
output_node = output_port.node()
if isinstance(output_node, GlobalInput):
resource_label = input_port.name()
if output_port.name() == CREATE_MACHINE_PORT_NAME:
output_port.clear_connections(push_undo=False, emit_signal=False)
with Session(engine) as session:
resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
if resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)):
print("Please fetch resource", resource_label, "first.")
return
# FIXME: use some Qt UI prompt method
recipe = chose_recipe(session=session, resource=resource, prompt=prompt)
if recipe is None:
return
# FIXME: use some Qt UI prompt method
recipe = chose_recipe(session=session, resource=resource, prompt=prompt)
if recipe is None:
return
recipe_machine = graph.create_node("factorygame.Machine", push_undo=True)
recipe_machine.assign_recipe(recipe)
recipe_machine.update()
recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 200)
recipe_machine.get_output(input_port.name()).connect_to(input_port)
if recipe_machine.x_pos() - (global_input.x_pos() - global_input.view.width) < 200:
global_input.set_x_pos(recipe_machine.x_pos() - global_input.view.width - 200)
recipe_machine: Machine = graph.create_node("factorygame.Machine", push_undo=True)
recipe_machine.assign_recipe(recipe)
recipe_machine.update()
recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 100)
recipe_machine.get_output(resource_label).connect_to(input_port)
output_node.update_x_pos()
elif output_port.name() == WORLD_OUTPUT_PORT_NAME:
output_port.clear_connections(push_undo=False, emit_signal=False)
for idx, port in enumerate(output_node.output_ports()):
assert port.name() != resource_label, f"Duplicate output port for {resource_label}"
# if port.name() == resource_label:
# port.connect_to(input_port, push_undo=True, emit_signal=False)
# return
if isinstance(input_port.node(), Machine):
machine_node: Machine = input_port.node()
initial_resource_amount = str(machine_node.get_property(in_amount_name(resource_label)))
new_output_port = output_node.create_global_output(
resource_label=resource_label,
initial_resource_amount=initial_resource_amount,
)
new_output_port.connect_to(input_port, push_undo=True, emit_signal=False)
elif isinstance(output_node, Machine):
input_node = input_port.node()
resource_label = output_port.name()
if isinstance(input_node, Machine):
resource_amount = output_node.get_resource_output(resource_label)
input_node.update_input(
resource_label,
resource_amount,
)
@click.command
@ -190,13 +416,12 @@ def main(debug: bool, search: str):
graph.register_node(GlobalInput)
graph_widget = graph.widget
graph_widget.show()
global global_input
global_input = graph.create_node("factorygame.GlobalInput", push_undo=False)
recipe_machine = graph.create_node("factorygame.Machine", push_undo=False)
recipe_machine.assign_recipe(recipe)
graph.auto_layout_nodes([global_input, recipe_machine])
global_input.set_y_pos(recipe_machine.y_pos())
global_input.set_x_pos(global_input.x_pos() - global_input.model.width - 200)
global_input.update_x_pos()
graph.center_on([global_input, recipe_machine])
graph.port_connected.connect(on_port_connected)
app.exec_()