#!/usr/bin/env python3
"""Node-graph editor for FactoryGame production chains.

Defines two NodeGraphQt node types -- ``GlobalInput`` (resources entering the
graph) and ``Machine`` (a factory running one recipe) -- plus the
``port_connected`` handler and the click entry point that wire them together.
"""
import re
from datetime import timedelta

import click
from NodeGraphQt import BaseNode, NodeBaseWidget
from NodeGraphQt import NodeGraph, Port
from NodeGraphQt.constants import PortTypeEnum, NodePropWidgetEnum
from NodeGraphQt.widgets.node_widgets import _NodeGroupBox, NodeLineEdit, NodeCheckBox
from PySide2.QtCore import Qt
from PySide2.QtWidgets import QSlider, QLineEdit, QCheckBox, QGraphicsItem
from Qt import QtWidgets
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from factorygame.data.common import chose_resource, resource_needs_update, chose_recipe
from .models import Recipe, Resource, ResourceFlow
from ..helper import prompt

WORLD_OUTPUT_PORT_NAME = "World Output"
WORLD_OUTPUT_COLOR = (0, 139, 41)
CREATE_MACHINE_PORT_NAME = "Create Machine"
INPUT_COLOR = (249, 169, 0)
OUTPUT_COLOR = (204, 44, 36)
OTHER_COLOR = (0, 83, 135)

# Module-level state shared between main() and the signal handlers.
# `graph` and `engine` are assigned by main(); `debug` gets a default so that
# on_port_connected() does not fail with NameError when main() has not run.
graph: NodeGraph
debug: bool = False


def in_amount_name(label: str) -> str:
    """Return the widget/property name holding the input amount of *label*."""
    return f"In {label} amount"


def out_amount_name(label: str) -> str:
    """Return the widget/property name holding the output amount of *label*."""
    return f"Out {label} amount"


def port_amount_name(port: Port) -> str:
    """Return the amount widget name matching *port*'s direction.

    Raises:
        TypeError: if the port is neither an input nor an output port.
    """
    port_type = port.type_()
    # The rest of this module passes port types around as
    # ``PortTypeEnum.<X>.value``; accept both the enum member and its value so
    # the comparison works whichever of the two ``type_()`` returns.
    if port_type in (PortTypeEnum.IN, PortTypeEnum.IN.value):
        return in_amount_name(port.name())
    if port_type in (PortTypeEnum.OUT, PortTypeEnum.OUT.value):
        return out_amount_name(port.name())
    raise TypeError(f"Invalid port type {port_type}")


def add_resource_text(
    node: BaseNode, name: str, text: str, border_color: tuple, readonly: bool = False
) -> NodeLineEdit:
    """Add a text input named *name* to *node* and return its node widget.

    Args:
        node: node receiving the text input.
        name: used both as property name and as label.
        text: initial text.
        border_color: RGB tuple substituted into the line edit's stylesheet
            border colour; a falsy value leaves the stylesheet untouched.
        readonly: when true, the line edit is made read-only.
    """
    node.add_text_input(name=name, label=name, text=text)
    widget: NodeLineEdit = node.get_widget(name)
    line_edit_widget: QLineEdit = widget.get_custom_widget()
    if readonly:
        line_edit_widget.setReadOnly(True)

    group: _NodeGroupBox = widget.widget()
    group.setMaximumWidth(220)
    group.adjustSize()

    if border_color:
        stylesheet = line_edit_widget.styleSheet()
        border_match = Machine.STYLESHEET_BORDER_COLOR_REGEX.match(stylesheet)
        # If the stylesheet does not have the expected shape, keep the default
        # border instead of failing on a None match.
        if border_match is not None:
            rgb = f"rgb({','.join(map(str, border_color))})"
            line_edit_widget.setStyleSheet(
                stylesheet[: border_match.start(1)] + rgb + stylesheet[border_match.end(1):]
            )
    line_edit_widget.update()
    return widget


def parse_resource_amount(text: str) -> float:
    """Parse an amount such as ``"12.50 / min"`` (or a bare number) to a float.

    Note that ``rstrip("min")`` removes any trailing run of the characters
    ``m``, ``i`` and ``n`` rather than the literal suffix, so partially typed
    units (``"12 / mi"``) are tolerated as well.

    Raises:
        ValueError: if the remaining text is not a valid float.
    """
    return float(text.strip().rstrip("min").rstrip().rstrip("/").rstrip())


def resource_amount_to_text(amount: float) -> str:
    """Format *amount* with two decimals followed by ``" / min"``."""
    return f"{amount:.2f} / min"


class NodeSlider(NodeBaseWidget):
    """Node widget wrapping a horizontal ``QSlider`` with a percentage label."""

    MIN_VALUE = 1
    MAX_VALUE = 250
    DEFAULT_VALUE = 100

    def __init__(self, name: str, label: str = "", parent: QGraphicsItem = None, readonly: bool = False):
        """Create the slider.

        Args:
            name: widget (property) name.
            label: label prefix; falls back to *name* when empty.
            parent: parent graphics item.
            readonly: when true, the slider is disabled.
        """
        super(NodeSlider, self).__init__(parent)
        self.set_name(name)
        self.pure_label = label or name

        slider = QSlider(Qt.Horizontal)
        slider.setEnabled(not readonly)
        slider.setRange(self.MIN_VALUE, self.MAX_VALUE)
        slider.setValue(self.DEFAULT_VALUE)
        slider.setSingleStep(1)
        slider.setTickInterval(10)
        slider.setTickPosition(QSlider.TicksBelow)
        slider.valueChanged.connect(self.on_value_changed)
        slider.valueChanged.connect(self._update_label)
        self.set_custom_widget(slider)
        self._update_label()

    def get_custom_widget(self) -> QSlider:
        """Return the wrapped widget, asserting that it is a ``QSlider``."""
        widget = super().get_custom_widget()
        assert isinstance(widget, QSlider), f"Unexpected widget, not a QSlider: {widget}"
        return widget

    def get_value(self) -> int:
        """Return the current slider position."""
        return self.get_custom_widget().value()

    def set_value(self, value: int):
        """Set the slider position, clamped to ``[MIN_VALUE, MAX_VALUE]``."""
        clamped = max(self.MIN_VALUE, min(self.MAX_VALUE, value))
        self.get_custom_widget().setValue(clamped)

    def _update_label(self):
        """Refresh the label so that it shows the current value in percent."""
        self.set_label(f"{self.pure_label} ({self.get_value()}%)")


class GlobalInput(BaseNode):
    """Node offering resources to machines from outside the graph.

    Besides one output per provided resource it carries two special outputs,
    ``WORLD_OUTPUT_PORT_NAME`` and ``CREATE_MACHINE_PORT_NAME``, which
    ``on_port_connected`` treats as actions rather than resource flows.
    """

    __identifier__ = "factorygame"
    NODE_NAME = "Global Input"

    def __init__(self):
        super().__init__()
        self.model.width = 240
        # reorder_outputs() deletes and re-adds the special ports.
        self.set_port_deletion_allowed(True)
        self.add_special_ports()
        # Resource label -> amount per minute offered by this node.
        self.output_resources: dict[str, float] = {}

    def add_special_ports(self):
        """Add the "World Output" and "Create Machine" output ports."""
        self.add_output(WORLD_OUTPUT_PORT_NAME, multi_output=False, color=WORLD_OUTPUT_COLOR)
        self.add_output(CREATE_MACHINE_PORT_NAME, multi_output=False, color=OTHER_COLOR)

    def reorder_outputs(self):
        """Delete and re-add the special ports (presumably so they end up listed last)."""
        self.delete_output(WORLD_OUTPUT_PORT_NAME)
        self.delete_output(CREATE_MACHINE_PORT_NAME)
        self.add_special_ports()

    def update_x_pos(self):
        """Place this node 150 units (plus its own width) left of the leftmost other node.

        Does nothing when the graph contains no other node.
        """
        other_x_positions = [node.x_pos() for node in graph.all_nodes() if node != self]
        if not other_x_positions:
            # min() of an empty sequence would raise ValueError.
            return
        self.set_x_pos(min(other_x_positions) - self.view.width - 150)

    def create_global_output(self, resource_label: str, initial_resource_amount: str) -> Port:
        """Add an output port and an editable amount field for *resource_label*.

        Args:
            resource_label: name of the new output port.
            initial_resource_amount: amount text accepted by
                ``parse_resource_amount`` (e.g. ``"30.00 / min"``).

        Returns:
            The newly created output port.
        """
        new_output_port = self.add_output(name=resource_label, multi_output=False, color=OUTPUT_COLOR)
        widget = add_resource_text(
            node=self,
            name=out_amount_name(resource_label),
            text=initial_resource_amount,
            border_color=OUTPUT_COLOR,
        )
        self.output_resources[resource_label] = parse_resource_amount(initial_resource_amount)
        widget.value_changed.connect(lambda: self.update_resource_output(resource_label))
        self.reorder_outputs()
        self.update_x_pos()
        return new_output_port

    def update_resource_output(self, resource_label: str):
        """Re-read the amount field of *resource_label* and propagate a change.

        Unparsable text is replaced by the last valid amount. A changed amount
        is stored, written back in normalised form and forwarded to every
        connected machine.
        """
        widget: NodeLineEdit = self.get_widget(out_amount_name(resource_label))
        assert resource_label in self.output_resources
        try:
            new_amount = parse_resource_amount(widget.get_value())
        except ValueError:
            # Invalid user input: restore the last known-good amount instead
            # of letting the exception escape from the signal handler.
            widget.set_value(resource_amount_to_text(self.output_resources[resource_label]))
            return

        if self.output_resources[resource_label] == new_amount:
            return

        self.output_resources[resource_label] = new_amount
        widget.set_value(resource_amount_to_text(new_amount))
        for connected_port in self.get_output(resource_label).connected_ports():
            node = connected_port.node()
            assert isinstance(node, Machine), f"Connected node must be a machine: {node}"
            node.update_input(resource_label, new_amount)


class Machine(BaseNode):
    """Node representing a factory that runs a single recipe.

    Keeps, per resource label, the nominal input/output amounts of the recipe
    and the input amounts currently offered by upstream nodes, and derives an
    overall production percentage from them.
    """

    __identifier__ = "factorygame"
    NODE_NAME = "FactoryGame Machine"
    MAXIMUM_PERFORMANCE_PROP = "machine max perf"
    ACTUAL_PERFORMANCE_PROP = "machine actual perf"
    AUTOMATIC_PERFORMANCE_PROP = "machine automatic"
    # Captures the ``rgb(...)`` border colour inside a ``QLineEdit { ... }`` rule.
    STYLESHEET_BORDER_COLOR_REGEX = re.compile(
        r"QLineEdit\s*{[^{}]*\s*border:[^;]*(rgb\([^)]+\))", re.MULTILINE | re.DOTALL
    )

    def __init__(self):
        super().__init__()
        self.model.width = 240
        # Nominal amounts per minute required by the recipe.
        self.input_resources: dict[str, float] = {}
        # Amounts per minute currently offered on each input.
        self.possible_input_resources: dict[str, float] = {}
        # Nominal amounts per minute produced by the recipe.
        self.output_resources: dict[str, float] = {}

        self.add_checkbox(Machine.AUTOMATIC_PERFORMANCE_PROP, text="Automatic Mode", state=True)
        automatic_widget: NodeCheckBox = self.get_widget(Machine.AUTOMATIC_PERFORMANCE_PROP)
        automatic_widget.value_changed.connect(self.recalculate_factor)
        checkbox_widget: QCheckBox = automatic_widget.get_custom_widget()
        checkbox_widget.setStyleSheet(
            checkbox_widget.styleSheet() + "\nQCheckBox {\n" "background-color: transparent;\n" "}"
        )

        self.add_text_input(
            name=Machine.ACTUAL_PERFORMANCE_PROP,
            label="Overall Production",
            text="100",
        )
        performance_widget: NodeLineEdit = self.get_widget(Machine.ACTUAL_PERFORMANCE_PROP)
        performance_edit: QLineEdit = performance_widget.get_custom_widget()
        performance_edit.setReadOnly(True)

    def max_performance_changed(self):
        """Handle a slider change: leave automatic mode and recalculate."""
        self.set_property(Machine.AUTOMATIC_PERFORMANCE_PROP, False)
        self.recalculate_factor()

    def recalculate_factor(self):
        """Recompute the overall production percentage and propagate changes."""
        max_performance = self.get_property(Machine.MAXIMUM_PERFORMANCE_PROP)
        if max_performance is None:
            # The slider property is only created by assign_recipe(), but the
            # checkbox signal connected in __init__ can fire before that.
            # NOTE(review): assumes get_property() yields None for an unknown
            # property name -- confirm against the NodeGraphQt version in use.
            return

        max_factor: float = max_performance / 100.0
        if max_factor < 1.0 and self.get_property(Machine.AUTOMATIC_PERFORMANCE_PROP):
            per_machine_factor = 1.0
        else:
            per_machine_factor = max_factor

        # The most limited connected input determines the overall factor.
        all_machines_factor = None
        for resource_label, wanted_resource in self.input_resources.items():
            if len(self.get_input(resource_label).connected_ports()) != 0:
                new_factor = self.possible_input_resources[resource_label] / wanted_resource / per_machine_factor
                if all_machines_factor is None or new_factor < all_machines_factor:
                    all_machines_factor = new_factor
        if all_machines_factor is None:
            # No input is connected, so only the slider limits production.
            all_machines_factor = per_machine_factor

        new_perf = int(all_machines_factor * 100.0)
        if int(self.get_property(Machine.ACTUAL_PERFORMANCE_PROP)) != new_perf:
            self.get_widget(name=Machine.ACTUAL_PERFORMANCE_PROP).set_value(str(new_perf))
            self.actual_performance_changed(name=Machine.ACTUAL_PERFORMANCE_PROP, factor=all_machines_factor)
        if self.get_property(Machine.AUTOMATIC_PERFORMANCE_PROP):
            self.set_property(name=Machine.MAXIMUM_PERFORMANCE_PROP, value=100, push_undo=False)

    def update_input(self, resource_label: str, value: float):
        """Record the amount offered on input *resource_label* and recalculate."""
        self.possible_input_resources[resource_label] = value
        self.recalculate_factor()

    def actual_performance_changed(self, name: str, factor: float):
        """Rescale all displayed amounts by *factor* and notify downstream machines.

        Does nothing unless *name* is ``ACTUAL_PERFORMANCE_PROP``.
        """
        if name != Machine.ACTUAL_PERFORMANCE_PROP:
            return

        for ingredient_label, amount in self.input_resources.items():
            resource_text_widget = self.get_widget(in_amount_name(ingredient_label))
            assert (
                resource_text_widget is not None
            ), f"No ingredient resource text widget found for {ingredient_label}"
            resource_text_widget.set_value(resource_amount_to_text(amount * factor))

        for result_label, amount in self.output_resources.items():
            resource_text_widget = self.get_widget(out_amount_name(result_label))
            assert resource_text_widget is not None, f"No result resource text widget found for {result_label}"
            new_output_value = amount * factor
            resource_text_widget.set_value(resource_amount_to_text(new_output_value))
            for port in self.get_output(result_label).connected_ports():
                node = port.node()
                assert isinstance(node, Machine), f"Connected node must be a machine: {node}"
                node.update_input(
                    resource_label=result_label,
                    value=new_output_value,
                )

    def assign_recipe(self, recipe: Recipe):
        """Configure this machine for *recipe*.

        Removes existing ports, renames the node after the recipe's factory,
        adds one port and one read-only amount field per distinct ingredient
        and result (amounts of repeated resources are summed) and finally adds
        the overclocking slider.
        """
        # Delete by name from a snapshot: deleting by increasing index while
        # the port list shrinks would skip ports.
        for port_name in [port.name() for port in self.input_ports()]:
            self.delete_input(port_name)
        for port_name in [port.name() for port in self.output_ports()]:
            self.delete_output(port_name)

        self.set_property("name", recipe.factory.label, push_undo=False)

        for ingredient in recipe.ingredients:
            resource_label = ingredient.resource.label
            if resource_label in self.input_resources:
                self.input_resources[resource_label] += ingredient.amount_per_minute()
                self.get_widget(in_amount_name(resource_label)).set_value(
                    resource_amount_to_text(self.input_resources[resource_label])
                )
            else:
                port = self.add_input(resource_label, multi_input=False, color=INPUT_COLOR)
                # Accept the same resource from machines and the global input,
                # plus the two special action ports of the global input.
                port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine")
                port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
                port.add_accept_port_type(CREATE_MACHINE_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
                port.add_accept_port_type(WORLD_OUTPUT_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
                self._add_readonly_resource_text(
                    in_amount_name(resource_label),
                    ingredient,
                    self.input_resources,
                    resource_label,
                    border_color=INPUT_COLOR,
                )

        for result in recipe.results:
            resource_label = result.resource.label
            if resource_label in self.output_resources:
                self.output_resources[resource_label] += result.amount_per_minute()
                self.get_widget(out_amount_name(resource_label)).set_value(
                    resource_amount_to_text(self.output_resources[resource_label])
                )
            else:
                port = self.add_output(resource_label, multi_output=False, color=OUTPUT_COLOR)
                port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine")
                self._add_readonly_resource_text(
                    out_amount_name(resource_label),
                    result,
                    self.output_resources,
                    resource_label,
                    border_color=OUTPUT_COLOR,
                )

        # Until something is connected, assume every input is fully supplied.
        self.possible_input_resources.update(self.input_resources)

        slider = NodeSlider(name=Machine.MAXIMUM_PERFORMANCE_PROP, label="Overclocking Performance", parent=self.view)
        self.add_custom_widget(widget=slider, widget_type=NodePropWidgetEnum.SLIDER.value)
        slider.value_changed.connect(self.max_performance_changed)

    def _add_readonly_resource_text(
        self,
        name: str,
        flow: ResourceFlow,
        resource_amounts: dict[str, float],
        resource_label: str,
        border_color: tuple,
    ) -> NodeLineEdit:
        """Store *flow*'s amount per minute in *resource_amounts* and show it read-only."""
        resource_amounts[resource_label] = flow.amount_per_minute()
        text = resource_amount_to_text(resource_amounts[resource_label])
        return add_resource_text(node=self, name=name, text=text, border_color=border_color, readonly=True)

    def get_resource_output(self, resource_label: str) -> float:
        """Return the nominal output of *resource_label* scaled by the displayed performance."""
        performance = int(self.get_property(Machine.ACTUAL_PERFORMANCE_PROP)) / 100.0
        return self.output_resources[resource_label] * performance


def on_port_connected(input_port: Port, output_port: Port):
    """React to a new connection in the graph.

    * ``GlobalInput`` "Create Machine" port: drop the connection, let the user
      choose a recipe producing the wanted resource and insert a new machine
      feeding *input_port*.
    * ``GlobalInput`` "World Output" port: drop the connection and replace it
      with a dedicated resource output on the global input node.
    * ``Machine`` output into a ``Machine`` input: forward the produced amount.
    """
    if debug:
        print(f"Port {output_port} connected to {input_port}")

    output_node = output_port.node()
    if isinstance(output_node, GlobalInput):
        resource_label = input_port.name()
        if output_port.name() == CREATE_MACHINE_PORT_NAME:
            output_port.clear_connections(push_undo=False, emit_signal=False)
            with Session(engine) as session:
                resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
                if resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)):
                    print("Please fetch resource", resource_label, "first.")
                    return
                # FIXME: use some Qt UI prompt method
                recipe = chose_recipe(session=session, resource=resource, prompt=prompt)
                if recipe is None:
                    return
                # The recipe is consumed while the session is still open so
                # that its related objects can still be loaded.
                recipe_machine: Machine = graph.create_node("factorygame.Machine", push_undo=True)
                recipe_machine.assign_recipe(recipe)
                recipe_machine.update()
                recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 100)
                recipe_machine.get_output(resource_label).connect_to(input_port)
                output_node.update_x_pos()
        elif output_port.name() == WORLD_OUTPUT_PORT_NAME:
            output_port.clear_connections(push_undo=False, emit_signal=False)
            for port in output_node.output_ports():
                assert port.name() != resource_label, f"Duplicate output port for {resource_label}"
                # if port.name() == resource_label:
                #     port.connect_to(input_port, push_undo=True, emit_signal=False)
                #     return
            if isinstance(input_port.node(), Machine):
                machine_node: Machine = input_port.node()
                initial_resource_amount = str(machine_node.get_property(in_amount_name(resource_label)))
                new_output_port = output_node.create_global_output(
                    resource_label=resource_label,
                    initial_resource_amount=initial_resource_amount,
                )
                new_output_port.connect_to(input_port, push_undo=True, emit_signal=False)
    elif isinstance(output_node, Machine):
        input_node = input_port.node()
        resource_label = output_port.name()
        if isinstance(input_node, Machine):
            resource_amount = output_node.get_resource_output(resource_label)
            input_node.update_input(
                resource_label,
                resource_amount,
            )


@click.command
@click.option("--debug", is_flag=True)
@click.argument("search")
def main(debug: bool, search: str):
    """Open the production graph editor for the resource matching SEARCH."""
    global engine, graph
    # The parameter shadows the module-level flag, hence the globals() access.
    globals()["debug"] = debug
    engine = create_engine("sqlite:///file.db", echo=debug)

    with Session(engine) as session:
        # FIXME: use some Qt UI prompt method
        resource = chose_resource(session=session, resource_label=search, prompt=prompt)
        if resource is None:
            # FIXME: use concurrent resource searching/fetching
            # ret = data_provider.search_for_resource(session=session, search=search)
            ret = None
            if ret is None:
                print("Resource not found")
                return
            resource, exists_in_db = ret
            if not exists_in_db:
                print("Resource not yet fetched, run fetch first")
                return
        if resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)):
            print("Please fetch resource", resource.label, "first.")
            return
        # FIXME: use some Qt UI prompt method
        recipe = chose_recipe(session=session, resource=resource, prompt=prompt)
        if recipe is None:
            return

        app = QtWidgets.QApplication([])
        graph = NodeGraph()
        graph.widget.resize(1280, 720)
        graph.register_node(Machine)
        graph.register_node(GlobalInput)
        graph_widget = graph.widget
        graph_widget.show()

        global_input = graph.create_node("factorygame.GlobalInput", push_undo=False)
        recipe_machine = graph.create_node("factorygame.Machine", push_undo=False)
        # The recipe is consumed while the session is still open so that its
        # related objects can still be loaded.
        recipe_machine.assign_recipe(recipe)

    graph.auto_layout_nodes([global_input, recipe_machine])
    global_input.set_y_pos(recipe_machine.y_pos())
    global_input.update_x_pos()
    graph.center_on([global_input, recipe_machine])
    graph.port_connected.connect(on_port_connected)
    app.exec_()


if __name__ == "__main__":
    main()