#!/usr/bin/env python3 import re from concurrent.futures import Future import click import sqlalchemy from NodeGraphQt import BaseNode, NodeBaseWidget from NodeGraphQt import NodeGraph, Port from NodeGraphQt.constants import PortTypeEnum, NodePropWidgetEnum, ViewerEnum from NodeGraphQt.widgets.node_widgets import _NodeGroupBox, NodeLineEdit, NodeCheckBox from PySide2 import QtGui from PySide2.QtCore import Qt, QObject, QThread from PySide2.QtWidgets import QSlider, QLineEdit, QCheckBox, QGraphicsItem, QInputDialog, QDialog, QLabel, QSizePolicy from Qt import QtWidgets from sqlalchemy import create_engine from sqlalchemy.orm import Session from factorygame.data.common import chose_resource, resource_needs_update, chose_recipe from factorygame.data.sfp import SatisfactoryPlus, DEFAULT_IGNORE_FACTORIES from .models import Recipe, Resource, ResourceFlow WORLD_INPUT_PORT_NAME = "World Input" WORLD_OUTPUT_PORT_NAME = "World Output" WORLD_PORT_COLOR = (0, 139, 41) CREATE_MACHINE_PORT_NAME = "Create Machine" INPUT_COLOR = (249, 169, 0) OUTPUT_COLOR = (204, 44, 36) OTHER_COLOR = (0, 83, 135) def generate_fgbg_stylesheet(bg_alpha: float = 1.0) -> str: dark_bg = QtGui.QColor(*ViewerEnum.BACKGROUND_COLOR.value).darker(120) dark_bg.setAlphaF(bg_alpha) bg_color = dark_bg.getRgb() text_color = tuple(map(str, map(lambda i, j: i - j, (255, 255, 255), bg_color))) return ( "* {" " background-color: rgba(" + ",".join(map(str, bg_color)) + ");" " color: rgb(" + ",".join(text_color) + ");" "}" ) def in_amount_name(label: str) -> str: return f"In {label} amount" def out_amount_name(label: str) -> str: return f"Out {label} amount" def port_amount_name(port: Port) -> str: if port.type_() == PortTypeEnum.IN: return in_amount_name(port.name()) elif port.type_() == PortTypeEnum.OUT: return out_amount_name(port.name()) else: raise TypeError(f"Invalid port type {port.type_()}") def add_resource_text( node: BaseNode, name: str, text: str, border_color: tuple, readonly: bool = False ) -> NodeLineEdit: node.add_text_input(name=name, label=name, text=text) widget: NodeLineEdit = node.get_widget(name) line_edit_widget: QLineEdit = widget.get_custom_widget() if readonly: line_edit_widget.setReadOnly(True) group: _NodeGroupBox = widget.widget() group.setMaximumWidth(220) group.adjustSize() if border_color: stylesheet = line_edit_widget.styleSheet() match = Machine.STYLESHEET_BORDER_COLOR_REGEX.match(stylesheet) stylesheet = ( stylesheet[: match.start(1)] + f"rgb({','.join(map(str, border_color))})" + stylesheet[match.end(1) :] ) line_edit_widget.setStyleSheet(stylesheet) line_edit_widget.update() return widget def parse_resource_amount(text: str) -> float: return float(text.strip().rstrip("min").rstrip().rstrip("/").rstrip()) def resource_amount_to_text(amount: float): return f"{amount:.2f} / min" class AsyncResourceFetcher(QThread): def __init__(self, parent, resource_label: str, engine: sqlalchemy.Engine, debug: bool): super().__init__(parent=parent) self.resource_label = resource_label self.engine = engine self.debug = debug def run(self): with Session(self.engine) as session, SatisfactoryPlus( ignore_factories=DEFAULT_IGNORE_FACTORIES, debug=self.debug ) as data_provider: resource = session.scalars(Resource.by_label(self.resource_label)).one() resource = data_provider.update_resource_recipes(session=session, resource=resource) if self.debug: print("Updated resource in separate thread", resource) class NodeSlider(NodeBaseWidget): MIN_VALUE = 1 MAX_VALUE = 250 DEFAULT_VALUE = 100 def __init__(self, name: str, label: str = "", parent: QGraphicsItem = None, readonly: bool = False): super(NodeSlider, self).__init__(parent) self.set_name(name) self.pure_label = label or name slider = QSlider(Qt.Horizontal) slider.setEnabled(not readonly) slider.setRange(self.MIN_VALUE, self.MAX_VALUE) slider.setValue(self.DEFAULT_VALUE) slider.setSingleStep(1) slider.setTickInterval(10) slider.setTickPosition(QSlider.TicksBelow) slider.valueChanged.connect(self.on_value_changed) slider.valueChanged.connect(self._update_label) self.set_custom_widget(slider) self._update_label() def get_custom_widget(self) -> QSlider: widget = super().get_custom_widget() assert isinstance(widget, QSlider), f"Unexpected widget, not a QSlider: {widget}" return widget def get_value(self) -> int: return self.get_custom_widget().value() def set_value(self, value: int): self.get_custom_widget().setValue(max(self.MIN_VALUE, min(self.MAX_VALUE, value))) def _update_label(self): self.set_label(f"{self.pure_label} ({self.get_value()}%)") class GlobalInput(BaseNode): __identifier__ = "factorygame" NODE_NAME = "Global Input" def __init__(self): super().__init__() self.model.width = 240 self.set_port_deletion_allowed(True) self.add_special_ports() self.output_resources: dict[str, float] = {} def add_special_ports(self): self.add_output(WORLD_OUTPUT_PORT_NAME, multi_output=False, color=WORLD_PORT_COLOR) self.add_output(CREATE_MACHINE_PORT_NAME, multi_output=False, color=OTHER_COLOR) def reorder_ports(self): self.delete_output(WORLD_OUTPUT_PORT_NAME) self.delete_output(CREATE_MACHINE_PORT_NAME) self.add_special_ports() def update_x_pos(self, graph: NodeGraph): min_x_pos = min(map(lambda node: node.x_pos(), filter(lambda node: node != self, graph.all_nodes()))) self.set_x_pos(min_x_pos - self.view.width - 150) def create_global_output(self, graph: NodeGraph, resource_label: str, initial_resource_amount: float) -> Port: # FIXME: support multiple resource outputs or add splitters new_output_port = self.add_output(name=resource_label, multi_output=False, color=OUTPUT_COLOR) widget = add_resource_text( node=self, name=out_amount_name(resource_label), text=resource_amount_to_text(initial_resource_amount), border_color=OUTPUT_COLOR, ) self.output_resources[resource_label] = initial_resource_amount widget.value_changed.connect(lambda: self.update_resource_output(resource_label)) self.reorder_ports() self.update_x_pos(graph=graph) return new_output_port def update_resource_output(self, resource_label: str): assert resource_label in self.output_resources widget: NodeLineEdit = self.get_widget(out_amount_name(resource_label)) new_amount = parse_resource_amount(widget.get_value()) if self.output_resources[resource_label] != new_amount: self.output_resources[resource_label] = new_amount widget.set_value(resource_amount_to_text(self.output_resources[resource_label])) resource_port = self.get_output(resource_label) for connected_port in resource_port.connected_ports(): node = connected_port.node() assert isinstance(node, Machine), f"Connected node must be a machine: {node}" node.update_input(resource_label, new_amount) class GlobalOutput(BaseNode): __identifier__ = "factorygame" NODE_NAME = "Global Output" def __init__(self): super().__init__() self.model.width = 240 self.set_port_deletion_allowed(True) self.add_special_ports() self.input_resources: dict[str, float] = {} def add_special_ports(self): self.add_input(WORLD_INPUT_PORT_NAME, multi_input=False, color=WORLD_PORT_COLOR) self.add_input(CREATE_MACHINE_PORT_NAME, multi_input=False, color=OTHER_COLOR) def reorder_ports(self): self.delete_input(WORLD_INPUT_PORT_NAME) self.delete_input(CREATE_MACHINE_PORT_NAME) self.add_special_ports() def update_x_pos(self, graph: NodeGraph): other_nodes = filter(lambda node: node != self, graph.all_nodes()) max_x = max(map(lambda node: node.x_pos() + node.view.width, other_nodes)) self.set_x_pos(max_x + 150) def create_global_input(self, graph: NodeGraph, resource_label: str, initial_resource_amount: float) -> Port: # FIXME: support multiple resource inputs or add mergers new_input_port = self.add_input(name=resource_label, multi_input=False, color=INPUT_COLOR) widget = add_resource_text( node=self, name=in_amount_name(resource_label), text=resource_amount_to_text(initial_resource_amount), border_color=INPUT_COLOR, ) line_edit_widget: QLineEdit = widget.get_custom_widget() line_edit_widget.setReadOnly(True) self.input_resources[resource_label] = initial_resource_amount self.reorder_ports() return new_input_port def update_input(self, resource_label: str, value: float): assert resource_label in self.input_resources widget: NodeLineEdit = self.get_widget(in_amount_name(resource_label)) if self.input_resources[resource_label] != value: self.input_resources[resource_label] = value widget.set_value(resource_amount_to_text(self.input_resources[resource_label])) class Machine(BaseNode): __identifier__ = "factorygame" NODE_NAME = "FactoryGame Machine" MAXIMUM_PERFORMANCE_PROP = "machine max perf" ACTUAL_PERFORMANCE_PROP = "machine actual perf" AUTOMATIC_PERFORMANCE_PROP = "machine automatic" STYLESHEET_BORDER_COLOR_REGEX = re.compile( r"QLineEdit\s*{[^{}]*\s*border:[^;]*(rgb\([^)]+\))", re.MULTILINE | re.DOTALL ) def __init__(self): super().__init__() self.model.width = 240 self.input_resources: dict[str, float] = {} self.possible_input_resources: dict[str, float] = {} self.output_resources: dict[str, float] = {} self.add_checkbox(Machine.AUTOMATIC_PERFORMANCE_PROP, text="Automatic Mode", state=True) widget: NodeCheckBox = self.get_widget(Machine.AUTOMATIC_PERFORMANCE_PROP) widget.value_changed.connect(self.recalculate_factor) checkbox_widget: QCheckBox = widget.get_custom_widget() checkbox_widget.setStyleSheet( checkbox_widget.styleSheet() + "\nQCheckBox {\n" f"background-color: transparent;\n" "}" ) self.add_text_input( name=Machine.ACTUAL_PERFORMANCE_PROP, label="Overall Production", text="100", ) widget: NodeLineEdit = self.get_widget(Machine.ACTUAL_PERFORMANCE_PROP) line_edit_widget: QLineEdit = widget.get_custom_widget() line_edit_widget.setReadOnly(True) def max_performance_changed(self): self.set_property(Machine.AUTOMATIC_PERFORMANCE_PROP, False) self.recalculate_factor() def recalculate_factor(self): max_factor: float = self.get_property(Machine.MAXIMUM_PERFORMANCE_PROP) / 100.0 if max_factor < 1.0 and self.get_property(Machine.AUTOMATIC_PERFORMANCE_PROP): per_machine_factor = 1.0 else: per_machine_factor = max_factor all_machines_factor = None for resource_label, wanted_resource in self.input_resources.items(): if len(self.get_input(resource_label).connected_ports()) != 0: new_factor = self.possible_input_resources[resource_label] / wanted_resource / per_machine_factor if all_machines_factor is None: all_machines_factor = new_factor elif new_factor < all_machines_factor: all_machines_factor = new_factor if all_machines_factor is None: all_machines_factor = per_machine_factor new_perf = int(all_machines_factor * 100.0) if int(self.get_property(Machine.ACTUAL_PERFORMANCE_PROP)) != new_perf: self.get_widget(name=Machine.ACTUAL_PERFORMANCE_PROP).set_value(str(new_perf)) self.actual_performance_changed(name=Machine.ACTUAL_PERFORMANCE_PROP, factor=all_machines_factor) if self.get_property(Machine.AUTOMATIC_PERFORMANCE_PROP): self.set_property(name=Machine.MAXIMUM_PERFORMANCE_PROP, value=100, push_undo=False) def update_input(self, resource_label: str, value: float): self.possible_input_resources[resource_label] = value self.recalculate_factor() def actual_performance_changed(self, name: str, factor: float): if name == Machine.ACTUAL_PERFORMANCE_PROP: for ingredient_label, amount in self.input_resources.items(): resource_text_widget = self.get_widget(in_amount_name(ingredient_label)) assert ( resource_text_widget is not None ), f"No ingredient resource text widget found for {ingredient_label}" resource_text_widget.set_value(resource_amount_to_text(amount * factor)) for result_label, amount in self.output_resources.items(): resource_text_widget = self.get_widget(out_amount_name(result_label)) assert resource_text_widget is not None, f"No result resource text widget found for {result_label}" new_output_value = amount * factor resource_text_widget.set_value(resource_amount_to_text(new_output_value)) for port in self.get_output(result_label).connected_ports(): node = port.node() node.update_input( resource_label=result_label, value=new_output_value, ) def assign_recipe(self, recipe: Recipe): for port_idx in range(len(self._inputs)): self.delete_input(port_idx) for port_idx in range(len(self._outputs)): self.delete_output(port_idx) self.set_property("name", recipe.join_factories(), push_undo=False) for ingredient in recipe.ingredients: resource_label = ingredient.resource.label if resource_label in self.input_resources: self.input_resources[resource_label] += ingredient.amount_per_minute() self.get_widget(in_amount_name(resource_label)).set_value( resource_amount_to_text(self.input_resources[resource_label]) ) else: port = self.add_input(resource_label, multi_input=False, color=INPUT_COLOR) port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine") port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.GlobalInput") port.add_accept_port_type(CREATE_MACHINE_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput") port.add_accept_port_type(WORLD_OUTPUT_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput") self._add_readonly_resource_text( in_amount_name(resource_label), ingredient, self.input_resources, resource_label, border_color=INPUT_COLOR, ) for result in recipe.results: resource_label = result.resource.label if resource_label in self.output_resources: self.output_resources[resource_label] += result.amount_per_minute() self.get_widget(out_amount_name(resource_label)).set_value( resource_amount_to_text(self.output_resources[resource_label]) ) else: port = self.add_output(resource_label, multi_output=False, color=OUTPUT_COLOR) port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine") port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.GlobalOutput") port.add_accept_port_type(CREATE_MACHINE_PORT_NAME, PortTypeEnum.IN.value, "factorygame.GlobalOutput") port.add_accept_port_type(WORLD_INPUT_PORT_NAME, PortTypeEnum.IN.value, "factorygame.GlobalOutput") self._add_readonly_resource_text( out_amount_name(resource_label), result, self.output_resources, resource_label, border_color=OUTPUT_COLOR, ) self.possible_input_resources.update(self.input_resources) slider = NodeSlider(name=Machine.MAXIMUM_PERFORMANCE_PROP, label="Overclocking Performance", parent=self.view) self.add_custom_widget(widget=slider, widget_type=NodePropWidgetEnum.SLIDER.value) slider.value_changed.connect(self.max_performance_changed) def _add_readonly_resource_text( self, name: str, flow: ResourceFlow, resource_amounts: dict[str, float], resource_label: str, border_color: tuple, ) -> NodeLineEdit: resource_amounts[resource_label] = flow.amount_per_minute() text = resource_amount_to_text(resource_amounts[resource_label]) return add_resource_text(node=self, name=name, text=text, border_color=border_color, readonly=True) def get_resource_output(self, resource_label: str) -> float: return self.output_resources[resource_label] * (int(self.get_property(Machine.ACTUAL_PERFORMANCE_PROP)) / 100.0) class GraphController(QObject): def __init__(self, debug: bool, parent=None): super().__init__(parent=parent) self.debug = debug self.engine = create_engine("sqlite:///file.db", echo=debug) self.graph = NodeGraph(parent=self) self.graph.widget.resize(1280, 720) self.graph.register_node(Machine) self.graph.register_node(GlobalInput) self.graph.register_node(GlobalOutput) self.global_input: GlobalInput = self.graph.create_node("factorygame.GlobalInput", push_undo=False) self.global_output: GlobalOutput = self.graph.create_node("factorygame.GlobalOutput", push_undo=False) self.graph.port_connected.connect(self.on_port_connected) def add_machine_from_search(self, search: str): recipe_selected_future = Future() def resource_selected_cb(resource_future: Future[Resource | None]): resource = resource_future.result() if resource is None: # FIXME: use concurrent resource searching/fetching # ret = data_provider.search_for_resource(session=session, search=search) ret = None if ret is None: print("Resource not found") recipe_selected_future.set_result(None) return resource, exists_in_db = ret if not exists_in_db: print("Resource unknown") recipe_selected_future.set_result(None) return assert resource is not None def chose_recipe_async(): with Session(self.engine) as session: chose_recipe(session=session, resource=resource, prompt=self.dialog_prompt).add_done_callback( lambda fut: recipe_selected_future.set_result(fut.result()) ) if resource_needs_update(resource): fetch_future = self.fetch_recipes_async(resource_label=resource.label) fetch_future.add_done_callback(lambda fut: chose_recipe_async) else: chose_recipe_async() def select_recipe_async(): with Session(self.engine) as session: resource_future = chose_resource(session=session, resource_label=search, prompt=self.dialog_prompt) resource_future.add_done_callback(resource_selected_cb) def recipe_selected_cb(recipe_future: Future[Recipe | None]): recipe = recipe_future.result() if recipe is not None: recipe_machine: Machine = self.graph.create_node("factorygame.Machine", push_undo=False) recipe_machine.assign_recipe(recipe) recipe_machine.update() self.graph.auto_layout_nodes([self.global_input, recipe_machine, self.global_output]) self.global_input.set_y_pos(recipe_machine.y_pos()) self.global_output.set_y_pos(recipe_machine.y_pos()) self.global_input.update_x_pos(graph=self.graph) self.global_output.update_x_pos(graph=self.graph) self.graph.center_on([self.global_input, recipe_machine, self.global_output]) recipe_selected_future.add_done_callback(recipe_selected_cb) select_recipe_async() def show(self): self.graph.widget.show() def on_port_connected(self, input_port: Port, output_port: Port): if self.debug: print(f"Port {output_port} connected to {input_port}") output_node = output_port.node() if output_node == self.global_input: resource_label = input_port.name() input_node = input_port.node() if output_port.name() == CREATE_MACHINE_PORT_NAME: output_port.clear_connections(push_undo=False, emit_signal=False) def recipe_selected_cb(recipe_future: Future[Recipe | None]): recipe = recipe_future.result() if recipe is not None: recipe_machine: Machine = self.graph.create_node("factorygame.Machine", push_undo=True) recipe_machine.assign_recipe(recipe) recipe_machine.update() recipe_machine.set_x_pos(input_node.x_pos() - recipe_machine.view.width - 100) recipe_machine.get_output(resource_label).connect_to(input_port) self.global_input.update_x_pos(graph=self.graph) if isinstance(input_node, Machine): input_node.update_input(resource_label, recipe_machine.get_resource_output(resource_label)) with Session(self.engine) as session: resource = session.scalars(Resource.by_label(resource_label)).one_or_none() def chose_recipe_async(): with Session(self.engine) as session: chose_recipe( session=session, resource=resource, prompt=self.dialog_prompt, ).add_done_callback(recipe_selected_cb) if resource_needs_update(resource): fetch_future = self.fetch_recipes_async(resource_label) fetch_future.add_done_callback(lambda fut: chose_recipe_async()) else: chose_recipe_async() elif output_port.name() == WORLD_OUTPUT_PORT_NAME: output_port.clear_connections(push_undo=False, emit_signal=False) for idx, port in enumerate(self.global_input.output_ports()): assert port.name() != resource_label, f"Duplicate output port for {resource_label}" # if port.name() == resource_label: # port.connect_to(input_port, push_undo=True, emit_signal=False) # return if isinstance(input_node, Machine): resource_amount = parse_resource_amount( str(input_node.get_property(in_amount_name(resource_label))) ) new_output_port = self.global_input.create_global_output( graph=self.graph, resource_label=resource_label, initial_resource_amount=resource_amount, ) new_output_port.connect_to(input_port, push_undo=True, emit_signal=False) input_node.update_input(resource_label, resource_amount) elif isinstance(output_node, Machine): input_node = input_port.node() resource_label = output_port.name() if input_node == self.global_output: if input_port.name() == CREATE_MACHINE_PORT_NAME: input_port.clear_connections(push_undo=False, emit_signal=False) def recipe_selected_cb(recipe_future: Future[Recipe | None]): recipe = recipe_future.result() if recipe is not None: recipe_machine: Machine = self.graph.create_node("factorygame.Machine", push_undo=True) recipe_machine.assign_recipe(recipe) recipe_machine.update() recipe_machine.set_x_pos(output_node.x_pos() + output_node.view.width + 100) output_port.connect_to(recipe_machine.get_input(resource_label)) self.global_output.update_x_pos(graph=self.graph) with Session(self.engine) as session: resource = session.scalars(Resource.by_label(resource_label)).one_or_none() def chose_recipe_async(): chose_recipe( session=session, resource=resource, prompt=self.dialog_prompt, ingredient=True ).add_done_callback(recipe_selected_cb) if resource_needs_update(resource): fetch_future = self.fetch_recipes_async(resource_label=resource_label) fetch_future.add_done_callback(lambda fut: chose_recipe_async()) else: chose_recipe_async() elif input_port.name() == WORLD_INPUT_PORT_NAME: input_port.clear_connections(push_undo=False, emit_signal=False) for idx, port in enumerate(self.global_output.output_ports()): assert port.name() != resource_label, f"Duplicate output port for {resource_label}" # if port.name() == resource_label: # port.connect_to(input_port, push_undo=True, emit_signal=False) # return resource_amount = output_node.get_resource_output(resource_label) new_input_port = self.global_output.create_global_input( graph=self.graph, resource_label=resource_label, initial_resource_amount=resource_amount, ) output_port.connect_to(new_input_port, push_undo=True, emit_signal=False) elif isinstance(input_node, Machine): resource_amount = output_node.get_resource_output(resource_label) input_node.update_input( resource_label, resource_amount, ) def fetch_recipes_async(self, resource_label): if self.debug: print("Fetching recipes for resource", resource_label) fetcher_thread = AsyncResourceFetcher( parent=self.graph, resource_label=resource_label, engine=self.engine, debug=self.debug, ) fetch_future = Future() self.dialog_loading(fetch_future) fetcher_thread.finished.connect(lambda: fetch_future.set_result(None)) fetcher_thread.start() return fetch_future def dialog_prompt(self, options: dict[int, str], text: str, default: int) -> Future[int | None]: if self.debug: print("Displaying QInputDialog with options:", ", ".join(options.values())) dialog = QInputDialog(parent=self.graph.widget) dialog.setStyleSheet(generate_fgbg_stylesheet()) dialog.setModal(True) dialog.setLabelText(text) if default in options: dialog.setTextValue(options[default]) reversed_options: dict[str, int] = {value: key for key, value in options.items()} dialog.setComboBoxItems(reversed_options.keys()) ret = Future() dialog.rejected.connect(lambda: ret.set_result(None)) dialog.accepted.connect(lambda: ret.set_result(reversed_options[dialog.textValue()])) dialog.show() return ret def dialog_loading(self, future: Future): dialog = QDialog(parent=self.graph.widget, f=(Qt.Dialog | Qt.ToolTip)) dialog.setStyleSheet(generate_fgbg_stylesheet(0.5)) dialog.setModal(True) dialog.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) label = QLabel("Loading…", dialog) label.setAlignment(Qt.AlignCenter) dialog.show() def remove_dialog(): dialog.hide() dialog.destroy() future.add_done_callback(lambda fut: remove_dialog()) @click.command @click.option("--debug", is_flag=True) @click.argument("search") def main(debug: bool, search: str): app = QtWidgets.QApplication([]) graph_controller = GraphController(debug=debug) graph_controller.show() graph_controller.add_machine_from_search(search=search) app.exec_() if __name__ == "__main__": main()