#!/usr/bin/env python3
"""Interactive node-graph view of recipes and the resources they consume.

A recipe is shown as a ``Machine`` node with one input port per ingredient and
one output port per result.  A single ``GlobalInput`` node offers two special
output ports: connecting "Create Machine" to an ingredient input spawns a
machine for that ingredient, connecting "World Output" adds (or reuses) a
resource output on the global input node.
"""
import itertools
import re
from datetime import timedelta

import click
from NodeGraphQt import BaseNode, NodeBaseWidget
from NodeGraphQt import NodeGraph, Port
from NodeGraphQt.constants import PortTypeEnum
from NodeGraphQt.widgets.node_widgets import _NodeGroupBox
from PySide2.QtCore import Qt
from PySide2.QtWidgets import QSlider, QLineEdit
from Qt import QtWidgets
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from factorygame.data.common import chose_resource, resource_needs_update, chose_recipe
from .models import Recipe, Resource, ResourceFlow
from ..helper import prompt

WORLD_OUTPUT_PORT_NAME = "World Output"
WORLD_OUTPUT_COLOR = (0, 139, 41)
CREATE_MACHINE_PORT_NAME = "Create Machine"
INPUT_COLOR = (249, 169, 0)
OUTPUT_COLOR = (204, 44, 36)
OTHER_COLOR = (0, 83, 135)

# Module-level state shared between ``main`` and the signal handler.
# ``main`` overwrites all three; the defaults only make sure the names exist
# when the handler is used before ``main`` ran.
graph = None
engine = None
debug = False


class NodeSlider(NodeBaseWidget):
    """Node widget embedding a horizontal percentage slider (1-250, start 100)."""

    def __init__(self, name, label="", parent=None):
        """Create the slider widget.

        Args:
            name: Widget/property name registered on the node.
            label: Text shown in front of the percentage; falls back to *name*.
            parent: Parent item handed to ``NodeBaseWidget``.
        """
        super().__init__(parent)
        self.set_name(name)
        # The visible label is re-rendered with the current value appended,
        # so the undecorated text has to be remembered separately.
        self.pure_label = label or name

        slider = QSlider(Qt.Horizontal)
        slider.setMinimum(1)
        slider.setMaximum(250)
        slider.setValue(100)
        slider.setSingleStep(1)
        slider.setTickInterval(10)
        slider.setTickPosition(QSlider.TicksBelow)
        slider.valueChanged.connect(self.on_value_changed)
        slider.valueChanged.connect(self._update_label)
        self.set_custom_widget(slider)
        self._update_label()

    def get_value(self):
        """Return the slider's current value."""
        widget: QSlider = self.get_custom_widget()
        return widget.value()

    def set_value(self, value):
        """Move the slider to *value*."""
        widget: QSlider = self.get_custom_widget()
        widget.setValue(value)

    def _update_label(self):
        """Re-render the label as ``"<label> (<value>%)"``."""
        self.set_label(f"{self.pure_label} ({int(self.get_value())}%)")


class GlobalInput(BaseNode):
    """Node carrying the two special ports plus resource outputs added at runtime."""

    __identifier__ = "factorygame"
    NODE_NAME = "Global Input"

    def __init__(self):
        super().__init__()
        self.model.width = 240
        # reorder_outputs() deletes ports, which has to be enabled explicitly.
        self.set_port_deletion_allowed(True)
        self.add_special_ports()

    def add_special_ports(self):
        """Add the "World Output" and "Create Machine" output ports."""
        self.add_output(WORLD_OUTPUT_PORT_NAME, color=WORLD_OUTPUT_COLOR)
        self.add_output(CREATE_MACHINE_PORT_NAME, color=OTHER_COLOR)

    def reorder_outputs(self):
        """Delete and re-add the special ports.

        Presumably this moves them behind resource outputs that were added in
        the meantime (assumes ``add_output`` appends -- confirm against
        NodeGraphQt).
        """
        self.delete_output(WORLD_OUTPUT_PORT_NAME)
        self.delete_output(CREATE_MACHINE_PORT_NAME)
        self.add_special_ports()

    def update_x_pos(self):
        """Place this node 150 units plus its own width left of the leftmost other node.

        Does nothing when the graph contains no other node; ``min()`` over an
        empty sequence would raise ``ValueError`` otherwise.
        """
        other_x_positions = [node.x_pos() for node in graph.all_nodes() if node != self]
        if not other_x_positions:
            return
        self.set_x_pos(min(other_x_positions) - self.view.width - 150)


class Machine(BaseNode):
    """Node representing one recipe: ingredient inputs, result outputs, a performance slider."""

    __identifier__ = "factorygame"
    NODE_NAME = "FactoryGame Machine"

    # Captures the ``rgb(...)`` value of the ``border`` property inside a
    # ``QLineEdit { ... }`` stylesheet rule (group 1).
    STYLESHEET_BORDER_COLOR_REGEX = re.compile(
        r"QLineEdit\s*{[^{}]*\s*border:[^;]*(rgb\([^)]+\))", re.MULTILINE | re.DOTALL
    )

    def __init__(self):
        super().__init__()
        self.model.width = 240
        # assign_recipe() deletes existing ports; same opt-in as GlobalInput.
        self.set_port_deletion_allowed(True)

    def assign_recipe(self, recipe: Recipe):
        """Populate the node with the ports and widgets of *recipe*.

        Existing ports are removed, the node is renamed to the recipe's factory
        label, one input/output port plus a read-only amount field is created
        per distinct resource (amounts of repeated resources are summed), and a
        slider is added that scales all displayed amounts.

        Args:
            recipe: Recipe providing ``factory.label``, ``ingredients`` and
                ``results``.
        """
        # Iterate over snapshots: deleting by running index while the port
        # list shrinks would skip every second port.
        for port in list(self.input_ports()):
            self.delete_input(port.name())
        for port in list(self.output_ports()):
            self.delete_output(port.name())

        self.set_property("name", recipe.factory.label, push_undo=False)

        # Resource label -> summed amount per minute at 100 % performance.
        input_resources: dict[str, float] = {}
        for ingredient in recipe.ingredients:
            resource_label = ingredient.resource.label
            if resource_label in input_resources:
                input_resources[resource_label] += ingredient.amount_per_minute()
                self.get_widget(in_amount_name(resource_label)).set_value(
                    f"{input_resources[resource_label]:.2f} / min"
                )
                continue
            port = self.add_input(resource_label, multi_input=True, color=INPUT_COLOR)
            port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine")
            port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
            port.add_accept_port_type(CREATE_MACHINE_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
            port.add_accept_port_type(WORLD_OUTPUT_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
            self._add_readonly_resource_text(
                in_amount_name(resource_label),
                ingredient,
                input_resources,
                resource_label,
                border_color=INPUT_COLOR,
            )

        output_resources: dict[str, float] = {}
        for result in recipe.results:
            resource_label = result.resource.label
            if resource_label in output_resources:
                output_resources[resource_label] += result.amount_per_minute()
                self.get_widget(out_amount_name(resource_label)).set_value(
                    f"{output_resources[resource_label]:.2f} / min"
                )
                continue
            port = self.add_output(resource_label, color=OUTPUT_COLOR)
            port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine")
            self._add_readonly_resource_text(
                out_amount_name(resource_label), result, output_resources, resource_label, border_color=OUTPUT_COLOR
            )

        performance_slider_name = "machine performance"

        def set_performance(percentage):
            """Show every amount scaled by *percentage* (100 = base amounts)."""
            factor = percentage / 100.0
            for ingredient_label, amount in input_resources.items():
                self.get_widget(in_amount_name(ingredient_label)).set_value(f"{amount * factor:.2f} / min")
            for result_label, amount in output_resources.items():
                self.get_widget(out_amount_name(result_label)).set_value(f"{amount * factor:.2f} / min")

        slider = NodeSlider(name=performance_slider_name, label="Overclocking Performance", parent=self.view)
        self.add_custom_widget(slider)
        # Clamp to the slider's own range before scaling.
        slider.value_changed.connect(lambda name, value: set_performance(max(1, min(250, value))))

    def _add_readonly_resource_text(
        self,
        name: str,
        flow: ResourceFlow,
        resource_amounts: dict[str, float],
        resource_label: str,
        border_color: tuple,
    ):
        """Record the flow's amount and add a read-only text field showing it.

        Args:
            name: Widget name of the new text field.
            flow: Ingredient or result whose ``amount_per_minute()`` is shown.
            resource_amounts: Mapping updated in place with the amount under
                *resource_label*.
            resource_label: Key to store the amount under.
            border_color: RGB tuple for the field's border.
        """
        resource_amounts[resource_label] = flow.amount_per_minute()
        # Same two-decimal format as every later update of this field.
        text = f"{resource_amounts[resource_label]:.2f} / min"
        add_resource_text(node=self, name=name, text=text, border_color=border_color, readonly=True)


def in_amount_name(label: str) -> str:
    """Return the widget name used for the input amount of resource *label*."""
    return f"In {label} amount"


def out_amount_name(label: str) -> str:
    """Return the widget name used for the output amount of resource *label*."""
    return f"Out {label} amount"


def add_resource_text(node: BaseNode, name: str, text: str, border_color: tuple, readonly: bool = False):
    """Add a text input to *node* and optionally recolor its border.

    Args:
        node: Node receiving the text input.
        name: Name and label of the new widget.
        text: Initial text.
        border_color: RGB tuple substituted into the line edit's stylesheet;
            a falsy value leaves the stylesheet untouched.
        readonly: Make the line edit read-only.
    """
    node.add_text_input(name=name, label=name, text=text)
    widget = node.get_widget(name)
    line_edit_widget: QLineEdit = widget.get_custom_widget()
    if readonly:
        line_edit_widget.setReadOnly(True)

    # NOTE(review): the collapsed source does not show whether the sizing
    # below belonged to the ``if readonly`` branch; applied to every field here.
    group: _NodeGroupBox = widget.widget()
    group.setMaximumWidth(220)
    group.adjustSize()

    if not border_color:
        return
    stylesheet = line_edit_widget.styleSheet()
    # search() instead of match(): the QLineEdit rule need not be the very
    # first text of the stylesheet.  Without a rule to patch, keep the default.
    match = Machine.STYLESHEET_BORDER_COLOR_REGEX.search(stylesheet)
    if match is None:
        return
    stylesheet = (
        stylesheet[: match.start(1)] + f"rgb({','.join(map(str, border_color))})" + stylesheet[match.end(1) :]
    )
    line_edit_widget.setStyleSheet(stylesheet)
    line_edit_widget.update()


def on_port_connected(input_port: Port, output_port: Port):
    """React to a new connection that originates from the ``GlobalInput`` node.

    * "Create Machine" port: drop the connection, let the user choose a recipe
      for the resource named by *input_port* and insert a machine for it in
      front of the consuming node.
    * "World Output" port: drop the connection and connect *input_port* to the
      global input's output of the same resource, creating that output (plus
      an amount field) first if it does not exist yet.

    Connections from any other node or port are ignored.

    Args:
        input_port: Input side of the new connection.
        output_port: Output side of the new connection.
    """
    global debug
    if debug:
        print(f"Port {output_port} connected to {input_port}")

    if not isinstance(output_port.node(), GlobalInput):
        return
    global_input_node: GlobalInput = output_port.node()
    resource_label = input_port.name()

    if output_port.name() == CREATE_MACHINE_PORT_NAME:
        output_port.clear_connections(push_undo=False, emit_signal=False)
        with Session(engine) as session:
            resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
            if resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)):
                print("Please fetch resource", resource_label, "first.")
                return
            # FIXME: use some Qt UI prompt method
            recipe = chose_recipe(session=session, resource=resource, prompt=prompt)
            if recipe is None:
                return
            recipe_machine: Machine = graph.create_node("factorygame.Machine", push_undo=True)
            recipe_machine.assign_recipe(recipe)
            recipe_machine.update()
            recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 100)
            recipe_machine.get_output(resource_label).connect_to(input_port)
            global_input_node.update_x_pos()
    elif output_port.name() == WORLD_OUTPUT_PORT_NAME:
        output_port.clear_connections(push_undo=False, emit_signal=False)
        # Reuse an existing output for this resource if there is one.
        for port in global_input_node.output_ports():
            if port.name() == resource_label:
                port.connect_to(input_port, push_undo=True, emit_signal=False)
                # FIXME: recalculate
                return
        if isinstance(input_port.node(), Machine):
            machine_node: Machine = input_port.node()
            new_output_port = global_input_node.add_output(name=resource_label, color=OUTPUT_COLOR)
            add_resource_text(
                node=global_input_node,
                name=out_amount_name(resource_label),
                text=str(machine_node.get_property(in_amount_name(resource_label))),
                border_color=OUTPUT_COLOR,
            )
            global_input_node.reorder_outputs()
            global_input_node.update_x_pos()
            new_output_port.connect_to(input_port, push_undo=True, emit_signal=False)
            # FIXME: recalculate


# ``click.command()`` with parentheses also works on click releases that do
# not accept the bare decorator form.
@click.command()
@click.option("--debug", is_flag=True)
@click.argument("search")
def main(debug: bool, search: str):
    """Look up a resource, let the user choose a recipe and show it in the graph UI.

    Args:
        debug: Enable SQL echoing and connection logging.
        search: Resource label to look up.
    """
    global engine, graph
    # The parameter shadows the module-level flag, so write it via globals().
    globals()["debug"] = debug
    engine = create_engine("sqlite:///file.db", echo=debug)

    with Session(engine) as session:
        # FIXME: use some Qt UI prompt method
        resource = chose_resource(session=session, resource_label=search, prompt=prompt)
        if resource is None:
            # FIXME: use concurrent resource searching/fetching
            # ret = data_provider.search_for_resource(session=session, search=search)
            ret = None
            if ret is None:
                print("Resource not found")
                return
            # Unreachable until the search above is implemented.
            resource, exists_in_db = ret
            if not exists_in_db:
                print("Resource not yet fetched, run fetch first")
                return
        if resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)):
            print("Please fetch resource", resource.label, "first.")
            return
        # FIXME: use some Qt UI prompt method
        recipe = chose_recipe(session=session, resource=resource, prompt=prompt)
        if recipe is None:
            return

        # NOTE(review): the UI is built while the session is still open so
        # that attributes of ``recipe`` can still be loaded in assign_recipe()
        # (assumes lazily loaded relationships -- confirm against the models).
        app = QtWidgets.QApplication([])
        graph = NodeGraph()
        graph.widget.resize(1280, 720)
        graph.register_node(Machine)
        graph.register_node(GlobalInput)
        graph_widget = graph.widget
        graph_widget.show()

        global_input = graph.create_node("factorygame.GlobalInput", push_undo=False)
        recipe_machine = graph.create_node("factorygame.Machine", push_undo=False)
        recipe_machine.assign_recipe(recipe)

        graph.auto_layout_nodes([global_input, recipe_machine])
        global_input.set_y_pos(recipe_machine.y_pos())
        global_input.update_x_pos()
        graph.center_on([global_input, recipe_machine])

        graph.port_connected.connect(on_port_connected)
        app.exec_()


if __name__ == "__main__":
    main()