SatisfactoryPlusCalculator/factorygame/data/vis.py

484 lines
21 KiB
Python
Executable file

#!/usr/bin/env python3
import re
from concurrent.futures import Future
from datetime import timedelta
import click
from NodeGraphQt import BaseNode, NodeBaseWidget
from NodeGraphQt import NodeGraph, Port
from NodeGraphQt.constants import PortTypeEnum, NodePropWidgetEnum, ViewerEnum
from NodeGraphQt.widgets.node_widgets import _NodeGroupBox, NodeLineEdit, NodeCheckBox
from PySide2 import QtGui
from PySide2.QtCore import Qt, QObject
from PySide2.QtWidgets import QSlider, QLineEdit, QCheckBox, QGraphicsItem, QInputDialog
from Qt import QtWidgets
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from factorygame.data.common import chose_resource, resource_needs_update, chose_recipe
from .models import Recipe, Resource, ResourceFlow
# Name of the special GlobalInput output port that, when connected to a machine
# input, creates a dedicated global output for that resource.
WORLD_OUTPUT_PORT_NAME = "World Output"
# RGB colour of the "World Output" port.
WORLD_OUTPUT_COLOR = (0, 139, 41)
# Name of the special GlobalInput output port that, when connected to a machine
# input, creates a new machine producing that resource.
CREATE_MACHINE_PORT_NAME = "Create Machine"
# RGB colours used for resource ports and for the border of their amount text fields.
INPUT_COLOR = (249, 169, 0)
OUTPUT_COLOR = (204, 44, 36)
# RGB colour of the "Create Machine" port.
OTHER_COLOR = (0, 83, 135)
def in_amount_name(label: str) -> str:
    """Build the widget/property name used for the input amount of resource *label*."""
    widget_name = f"In {label} amount"
    return widget_name
def out_amount_name(label: str) -> str:
    """Build the widget/property name used for the output amount of resource *label*."""
    widget_name = f"Out {label} amount"
    return widget_name
def port_amount_name(port: Port) -> str:
    """Return the name of the amount text widget that belongs to *port*.

    Input ports map to ``in_amount_name`` and output ports to ``out_amount_name``,
    both derived from the port's name.

    Args:
        port: The port whose amount widget name is wanted.

    Returns:
        The widget/property name for the port's resource amount.

    Raises:
        TypeError: If the port type is neither input nor output.
    """
    port_type = port.type_()
    # NOTE(review): the rest of this file hands ``PortTypeEnum.<X>.value`` to the
    # port API, which suggests ``Port.type_()`` yields the plain value rather than
    # the enum member - confirm against the installed NodeGraphQt version.
    # Accepting both forms keeps the function working either way.
    if port_type in (PortTypeEnum.IN, PortTypeEnum.IN.value):
        return in_amount_name(port.name())
    if port_type in (PortTypeEnum.OUT, PortTypeEnum.OUT.value):
        return out_amount_name(port.name())
    raise TypeError(f"Invalid port type {port_type}")
def add_resource_text(
    node: BaseNode, name: str, text: str, border_color: tuple, readonly: bool = False
) -> NodeLineEdit:
    """Add a text input widget for a resource amount to *node*.

    The widget is created through ``node.add_text_input`` using *name* as both
    the property name and the label. Its group box is limited to a width of
    220, and the ``rgb(...)`` border colour in the line edit's stylesheet is
    replaced with *border_color* when one is given.

    Args:
        node: Node that receives the new text widget.
        name: Property name and label of the widget.
        text: Initial text of the widget.
        border_color: Colour components joined into ``rgb(...)``; a falsy value
            leaves the stylesheet untouched.
        readonly: When true, the line edit is made read-only.

    Returns:
        The node widget wrapping the line edit.
    """
    node.add_text_input(name=name, label=name, text=text)
    widget: NodeLineEdit = node.get_widget(name)
    line_edit_widget: QLineEdit = widget.get_custom_widget()
    if readonly:
        line_edit_widget.setReadOnly(True)

    group: _NodeGroupBox = widget.widget()
    group.setMaximumWidth(220)
    group.adjustSize()

    if border_color:
        stylesheet = line_edit_widget.styleSheet()
        border_match = Machine.STYLESHEET_BORDER_COLOR_REGEX.match(stylesheet)
        # The pattern only matches when the stylesheet starts with a QLineEdit
        # block containing an rgb() border colour. Without a match there is
        # nothing to substitute, so keep the default style instead of failing
        # on ``None.start``.
        if border_match is not None:
            new_color = f"rgb({','.join(map(str, border_color))})"
            stylesheet = stylesheet[: border_match.start(1)] + new_color + stylesheet[border_match.end(1) :]
            line_edit_widget.setStyleSheet(stylesheet)
        line_edit_widget.update()
    return widget
def parse_resource_amount(text: str) -> float:
    """Parse an amount such as ``"12.50 / min"`` (or a bare number) into a float.

    Raises:
        ValueError: If what remains after removing the unit is not a valid float.
    """
    remainder = text.strip()
    # str.rstrip takes a *set* of characters, so this removes any trailing run
    # of "m", "i" and "n" - not only the exact suffix "min".
    remainder = remainder.rstrip("min").rstrip()
    remainder = remainder.rstrip("/").rstrip()
    return float(remainder)
def resource_amount_to_text(amount: float):
    """Format *amount* with two decimals followed by the ``/ min`` unit."""
    formatted = f"{amount:.2f} / min"
    return formatted
class NodeSlider(NodeBaseWidget):
    """Node widget wrapping a horizontal ``QSlider``.

    The widget label is kept in sync with the slider and shows the current
    value as a percentage behind the plain label text.
    """

    MIN_VALUE = 1
    MAX_VALUE = 250
    DEFAULT_VALUE = 100

    def __init__(self, name: str, label: str = "", parent: QGraphicsItem = None, readonly: bool = False):
        """Create the slider widget.

        Args:
            name: Name of the node widget.
            label: Label text; falls back to *name* when empty.
            parent: Parent graphics item handed to the base widget.
            readonly: When true, the slider is disabled.
        """
        super().__init__(parent)
        self.set_name(name)
        self.pure_label = label if label else name

        slider_widget = QSlider(Qt.Horizontal)
        slider_widget.setEnabled(not readonly)
        slider_widget.setRange(self.MIN_VALUE, self.MAX_VALUE)
        slider_widget.setValue(self.DEFAULT_VALUE)
        slider_widget.setSingleStep(1)
        slider_widget.setTickInterval(10)
        slider_widget.setTickPosition(QSlider.TicksBelow)

        # Connected only after the initial value is set, so construction itself
        # does not emit a change.
        slider_widget.valueChanged.connect(self.on_value_changed)
        slider_widget.valueChanged.connect(self._update_label)

        self.set_custom_widget(slider_widget)
        self._update_label()

    def get_custom_widget(self) -> QSlider:
        """Return the wrapped slider, asserting that it really is a ``QSlider``."""
        slider_widget = super().get_custom_widget()
        assert isinstance(slider_widget, QSlider), f"Unexpected widget, not a QSlider: {slider_widget}"
        return slider_widget

    def get_value(self) -> int:
        """Return the current slider value."""
        slider_widget = self.get_custom_widget()
        return slider_widget.value()

    def set_value(self, value: int):
        """Set the slider value, clamped to ``[MIN_VALUE, MAX_VALUE]``."""
        upper_bounded = min(self.MAX_VALUE, value)
        clamped = max(self.MIN_VALUE, upper_bounded)
        self.get_custom_widget().setValue(clamped)

    def _update_label(self):
        """Refresh the label so it shows the current value as a percentage."""
        current_value = self.get_value()
        self.set_label(f"{self.pure_label} ({current_value}%)")
class GlobalInput(BaseNode):
    """Node that supplies resources to machines from outside the graph.

    It always carries the two special output ports "World Output" and
    "Create Machine". Every resource added through ``create_global_output``
    gets its own output port plus an editable amount text field.
    """

    __identifier__ = "factorygame"
    NODE_NAME = "Global Input"

    def __init__(self):
        super().__init__()
        self.model.width = 240
        # Ports are deleted and re-added in reorder_outputs().
        self.set_port_deletion_allowed(True)
        self.add_special_ports()
        # Amount per minute offered on each resource output, keyed by resource label.
        self.output_resources: dict[str, float] = {}

    def add_special_ports(self):
        """Add the "World Output" and "Create Machine" output ports."""
        self.add_output(WORLD_OUTPUT_PORT_NAME, multi_output=False, color=WORLD_OUTPUT_COLOR)
        self.add_output(CREATE_MACHINE_PORT_NAME, multi_output=False, color=OTHER_COLOR)

    def reorder_outputs(self):
        """Delete and re-add the special ports so they come after all other outputs."""
        self.delete_output(WORLD_OUTPUT_PORT_NAME)
        self.delete_output(CREATE_MACHINE_PORT_NAME)
        self.add_special_ports()

    def update_x_pos(self, graph: NodeGraph):
        """Move this node to the left of the left-most other node in *graph*.

        Does nothing when the graph holds no node besides this one, because
        there is then nothing to position against.
        """
        other_x_positions = [node.x_pos() for node in graph.all_nodes() if node != self]
        if not other_x_positions:
            return
        self.set_x_pos(min(other_x_positions) - self.view.width - 150)

    def create_global_output(self, graph: NodeGraph, resource_label: str, initial_resource_amount: str) -> Port:
        """Add an output port and an editable amount field for *resource_label*.

        Args:
            graph: Graph used to reposition this node afterwards.
            resource_label: Name of the new port and key of the stored amount.
            initial_resource_amount: Initial text of the amount field; it must
                be parsable by ``parse_resource_amount``.

        Returns:
            The newly created output port.
        """
        new_output_port = self.add_output(name=resource_label, multi_output=False, color=OUTPUT_COLOR)
        widget = add_resource_text(
            node=self,
            name=out_amount_name(resource_label),
            text=initial_resource_amount,
            border_color=OUTPUT_COLOR,
        )
        self.output_resources[resource_label] = parse_resource_amount(initial_resource_amount)
        widget.value_changed.connect(lambda: self.update_resource_output(resource_label))
        self.reorder_outputs()
        self.update_x_pos(graph=graph)
        return new_output_port

    def update_resource_output(self, resource_label: str):
        """React to an edit of the amount field of *resource_label*.

        A changed amount is stored, the field is rewritten in the canonical
        ``"<amount> / min"`` format and every connected machine is told the new
        amount. Text that cannot be parsed is replaced by the last stored amount.
        """
        widget: NodeLineEdit = self.get_widget(out_amount_name(resource_label))
        assert resource_label in self.output_resources
        try:
            new_amount = parse_resource_amount(widget.get_value())
        except ValueError:
            # The field is user-editable; restore the last valid amount rather
            # than leaving unparsable text in place and raising inside the slot.
            widget.set_value(resource_amount_to_text(self.output_resources[resource_label]))
            return

        if self.output_resources[resource_label] != new_amount:
            self.output_resources[resource_label] = new_amount
            widget.set_value(resource_amount_to_text(new_amount))
            # NOTE(review): set_value() presumably emits value_changed and thereby
            # re-enters this method with the rounded text - confirm. Reading the
            # stored amount back keeps what is propagated equal to what is stored.
            current_amount = self.output_resources[resource_label]
            resource_port = self.get_output(resource_label)
            for connected_port in resource_port.connected_ports():
                node = connected_port.node()
                assert isinstance(node, Machine), f"Connected node must be a machine: {node}"
                node.update_input(resource_label, current_amount)
class Machine(BaseNode):
    """Graph node for a machine that runs one recipe.

    ``assign_recipe`` creates one input port per ingredient and one output port
    per result, each paired with a read-only text field showing an amount per
    minute. A performance factor scales those amounts, and the resulting output
    amounts are pushed to machines connected to the output ports.
    """

    __identifier__ = "factorygame"
    NODE_NAME = "FactoryGame Machine"
    # Property names of the performance widgets on the node.
    MAXIMUM_PERFORMANCE_PROP = "machine max perf"
    ACTUAL_PERFORMANCE_PROP = "machine actual perf"
    AUTOMATIC_PERFORMANCE_PROP = "machine automatic"
    # Group 1 captures the ``rgb(...)`` colour of the ``border`` rule inside a
    # ``QLineEdit { ... }`` stylesheet block.
    STYLESHEET_BORDER_COLOR_REGEX = re.compile(
        r"QLineEdit\s*{[^{}]*\s*border:[^;]*(rgb\([^)]+\))", re.MULTILINE | re.DOTALL
    )

    def __init__(self):
        super().__init__()
        self.model.width = 240
        # Recipe amounts per minute at a factor of 1.0, keyed by resource label.
        self.input_resources: dict[str, float] = {}
        # Amounts per minute last reported for each input via update_input().
        self.possible_input_resources: dict[str, float] = {}
        # Recipe output amounts per minute at a factor of 1.0, keyed by resource label.
        self.output_resources: dict[str, float] = {}

        self.add_checkbox(Machine.AUTOMATIC_PERFORMANCE_PROP, text="Automatic Mode", state=True)
        automatic_widget: NodeCheckBox = self.get_widget(Machine.AUTOMATIC_PERFORMANCE_PROP)
        automatic_widget.value_changed.connect(self.recalculate_factor)
        checkbox_widget: QCheckBox = automatic_widget.get_custom_widget()
        checkbox_widget.setStyleSheet(
            checkbox_widget.styleSheet() + "\nQCheckBox {\nbackground-color: transparent;\n}"
        )

        self.add_text_input(
            name=Machine.ACTUAL_PERFORMANCE_PROP,
            label="Overall Production",
            text="100",
        )
        performance_widget: NodeLineEdit = self.get_widget(Machine.ACTUAL_PERFORMANCE_PROP)
        line_edit_widget: QLineEdit = performance_widget.get_custom_widget()
        line_edit_widget.setReadOnly(True)

    def max_performance_changed(self):
        """Handle a slider change: switch automatic mode off and recalculate."""
        self.set_property(Machine.AUTOMATIC_PERFORMANCE_PROP, False)
        self.recalculate_factor()

    def recalculate_factor(self):
        """Recompute the overall performance factor and refresh dependent widgets.

        The per-machine factor is the slider value divided by 100, except that
        in automatic mode a slider value below 100 counts as 1.0. For every
        connected input, the offered amount divided by the recipe amount and by
        the per-machine factor gives a candidate; the smallest candidate wins.
        Without connected inputs, the per-machine factor itself is used.
        """
        max_factor: float = self.get_property(Machine.MAXIMUM_PERFORMANCE_PROP) / 100.0
        automatic = self.get_property(Machine.AUTOMATIC_PERFORMANCE_PROP)
        per_machine_factor = 1.0 if (max_factor < 1.0 and automatic) else max_factor

        connected_factors = [
            self.possible_input_resources[resource_label] / wanted_resource / per_machine_factor
            for resource_label, wanted_resource in self.input_resources.items()
            if len(self.get_input(resource_label).connected_ports()) != 0
        ]
        all_machines_factor = min(connected_factors, default=per_machine_factor)

        new_perf = int(all_machines_factor * 100.0)
        if int(self.get_property(Machine.ACTUAL_PERFORMANCE_PROP)) != new_perf:
            self.get_widget(name=Machine.ACTUAL_PERFORMANCE_PROP).set_value(str(new_perf))
            self.actual_performance_changed(name=Machine.ACTUAL_PERFORMANCE_PROP, factor=all_machines_factor)
        if self.get_property(Machine.AUTOMATIC_PERFORMANCE_PROP):
            # In automatic mode, the slider is put back to 100 without an undo entry.
            self.set_property(name=Machine.MAXIMUM_PERFORMANCE_PROP, value=100, push_undo=False)

    def update_input(self, resource_label: str, value: float):
        """Record the amount offered for *resource_label* and recalculate the factor."""
        self.possible_input_resources[resource_label] = value
        self.recalculate_factor()

    def actual_performance_changed(self, name: str, factor: float):
        """Rewrite all amount fields for *factor* and notify connected machines.

        Args:
            name: Property that changed; anything other than
                ``ACTUAL_PERFORMANCE_PROP`` is ignored.
            factor: Factor applied to every recipe input and output amount.
        """
        if name != Machine.ACTUAL_PERFORMANCE_PROP:
            return

        for ingredient_label, amount in self.input_resources.items():
            resource_text_widget = self.get_widget(in_amount_name(ingredient_label))
            assert (
                resource_text_widget is not None
            ), f"No ingredient resource text widget found for {ingredient_label}"
            resource_text_widget.set_value(resource_amount_to_text(amount * factor))

        for result_label, amount in self.output_resources.items():
            resource_text_widget = self.get_widget(out_amount_name(result_label))
            assert resource_text_widget is not None, f"No result resource text widget found for {result_label}"
            new_output_value = amount * factor
            resource_text_widget.set_value(resource_amount_to_text(new_output_value))
            # Pass the new output amount on to every machine fed by this port.
            for port in self.get_output(result_label).connected_ports():
                node = port.node()
                assert isinstance(node, Machine), f"Connected node must be a machine: {node}"
                node.update_input(
                    resource_label=result_label,
                    value=new_output_value,
                )

    def assign_recipe(self, recipe: Recipe):
        """Configure the node's ports and widgets for *recipe*.

        The node is renamed to the label of the recipe's factory. Each distinct
        ingredient gets an input port and each distinct result an output port,
        both with a read-only amount field; repeated resources are summed.
        Finally the overclocking slider is added.

        NOTE(review): the resource dicts and existing widgets are not reset, so
        this looks intended to be called once per node - confirm before reusing
        a node for a second recipe.
        """
        # Delete from the highest index down. Walking upwards would skip ports,
        # because each deletion shifts the remaining ports to lower indices.
        for port_idx in reversed(range(len(self._inputs))):
            self.delete_input(port_idx)
        for port_idx in reversed(range(len(self._outputs))):
            self.delete_output(port_idx)

        self.set_property("name", recipe.factory.label, push_undo=False)

        for ingredient in recipe.ingredients:
            resource_label = ingredient.resource.label
            if resource_label in self.input_resources:
                # Same resource listed again: add to the existing amount.
                self.input_resources[resource_label] += ingredient.amount_per_minute()
                self.get_widget(in_amount_name(resource_label)).set_value(
                    resource_amount_to_text(self.input_resources[resource_label])
                )
            else:
                port = self.add_input(resource_label, multi_input=False, color=INPUT_COLOR)
                # Accept the same-named output of a machine or of the global input,
                # plus the two special global input ports.
                port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine")
                port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
                port.add_accept_port_type(CREATE_MACHINE_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
                port.add_accept_port_type(WORLD_OUTPUT_PORT_NAME, PortTypeEnum.OUT.value, "factorygame.GlobalInput")
                self._add_readonly_resource_text(
                    in_amount_name(resource_label),
                    ingredient,
                    self.input_resources,
                    resource_label,
                    border_color=INPUT_COLOR,
                )

        for result in recipe.results:
            resource_label = result.resource.label
            if resource_label in self.output_resources:
                self.output_resources[resource_label] += result.amount_per_minute()
                self.get_widget(out_amount_name(resource_label)).set_value(
                    resource_amount_to_text(self.output_resources[resource_label])
                )
            else:
                port = self.add_output(resource_label, multi_output=False, color=OUTPUT_COLOR)
                port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine")
                self._add_readonly_resource_text(
                    out_amount_name(resource_label),
                    result,
                    self.output_resources,
                    resource_label,
                    border_color=OUTPUT_COLOR,
                )

        # Until told otherwise, assume every input is offered exactly what the recipe wants.
        self.possible_input_resources.update(self.input_resources)

        slider = NodeSlider(name=Machine.MAXIMUM_PERFORMANCE_PROP, label="Overclocking Performance", parent=self.view)
        self.add_custom_widget(widget=slider, widget_type=NodePropWidgetEnum.SLIDER.value)
        slider.value_changed.connect(self.max_performance_changed)

    def _add_readonly_resource_text(
        self,
        name: str,
        flow: ResourceFlow,
        resource_amounts: dict[str, float],
        resource_label: str,
        border_color: tuple,
    ) -> NodeLineEdit:
        """Store the per-minute amount of *flow* and add a read-only field showing it.

        Args:
            name: Name of the text widget to create.
            flow: Ingredient or result whose ``amount_per_minute()`` is used.
            resource_amounts: Dict that receives the amount under *resource_label*.
            resource_label: Key used in *resource_amounts*.
            border_color: Border colour handed to ``add_resource_text``.

        Returns:
            The created text widget.
        """
        resource_amounts[resource_label] = flow.amount_per_minute()
        text = resource_amount_to_text(resource_amounts[resource_label])
        return add_resource_text(node=self, name=name, text=text, border_color=border_color, readonly=True)

    def get_resource_output(self, resource_label: str) -> float:
        """Return the recipe output of *resource_label* scaled by the displayed performance percentage."""
        performance_percent = int(self.get_property(Machine.ACTUAL_PERFORMANCE_PROP))
        return self.output_resources[resource_label] * (performance_percent / 100.0)
class GraphController(QObject):
    """Owns the node graph, the database engine and the dialogs of the visualiser.

    It registers the ``Machine`` and ``GlobalInput`` node types, creates the
    single global input node and reacts to port connections.
    """

    def __init__(self, debug: bool, parent=None):
        """Set up the graph, the node types and the SQLite engine.

        Args:
            debug: Enables SQL echoing and debug prints.
            parent: Optional parent passed to ``QObject``.
        """
        super().__init__(parent=parent)
        self.debug = debug

        # Dialog colours: a darkened viewer background with the inverted colour as text.
        bg_color = QtGui.QColor(*ViewerEnum.BACKGROUND_COLOR.value).darker(120).getRgb()
        text_color = tuple(str(white - channel) for white, channel in zip((255, 255, 255), bg_color))
        self.fgbg_color_stylesheet = (
            "* {"
            " background-color: rgba(" + ",".join(map(str, bg_color)) + ") ;"
            " color: rgb(" + ",".join(text_color) + ");"
            "}"
        )

        self.engine = create_engine("sqlite:///file.db", echo=debug)

        self.graph = NodeGraph(parent=self)
        self.graph.widget.resize(1280, 720)
        self.graph.register_node(Machine)
        self.graph.register_node(GlobalInput)
        self.global_input: GlobalInput = self.graph.create_node("factorygame.GlobalInput", push_undo=False)
        self.graph.port_connected.connect(self.on_port_connected)

    def add_machine_from_search(self, search: str):
        """Let the user pick a resource and recipe for *search* and add a machine for it.

        The selection is chained through futures: resource choice, then recipe
        choice, then creation of the machine node next to the global input.
        """
        recipe_selected_future = Future()

        def resource_selected_cb(resource_future: Future[Resource | None]):
            resource = resource_future.result()
            if resource is None:
                # FIXME: use concurrent resource searching/fetching, e.g.
                # data_provider.search_for_resource(session=session, search=search),
                # and report "Resource not yet fetched, run fetch first" when the
                # found resource is not in the database yet.
                print("Resource not found")
                recipe_selected_future.set_result(None)
                return
            if resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)):
                print("Please fetch resource", resource.label, "first.")
                recipe_selected_future.set_result(None)
                return
            with Session(self.engine) as session:
                chose_recipe(session=session, resource=resource, prompt=self.dialog_prompt).add_done_callback(
                    lambda fut: recipe_selected_future.set_result(fut.result())
                )

        def select_recipe_async():
            with Session(self.engine) as session:
                resource_future = chose_resource(session=session, resource_label=search, prompt=self.dialog_prompt)
                resource_future.add_done_callback(resource_selected_cb)

        def recipe_selections_cb(recipe_future: Future[Recipe | None]):
            recipe = recipe_future.result()
            if recipe is None:
                return
            recipe_machine: Machine = self.graph.create_node("factorygame.Machine", push_undo=False)
            recipe_machine.assign_recipe(recipe)
            self.graph.auto_layout_nodes([self.global_input, recipe_machine])
            self.global_input.set_y_pos(recipe_machine.y_pos())
            self.global_input.update_x_pos(graph=self.graph)
            self.graph.center_on([self.global_input, recipe_machine])

        recipe_selected_future.add_done_callback(recipe_selections_cb)
        select_recipe_async()

    def show(self):
        """Show the graph widget."""
        self.graph.widget.show()

    def on_port_connected(self, input_port: Port, output_port: Port):
        """Handle a new connection between *output_port* and *input_port*.

        * "Create Machine" port of the global input: the connection is dropped
          and, after a recipe is chosen, a new machine producing the input's
          resource is created and connected instead.
        * "World Output" port of the global input: the connection is dropped
          and a dedicated global output for the resource is created and connected.
        * Output of a machine connected to a machine: the consumer is told the
          producer's current output amount.
        """
        if self.debug:
            print(f"Port {output_port} connected to {input_port}")

        output_node = output_port.node()
        if output_node == self.global_input:
            resource_label = input_port.name()
            if output_port.name() == CREATE_MACHINE_PORT_NAME:
                output_port.clear_connections(push_undo=False, emit_signal=False)

                def recipe_selected_cb(recipe_future: Future[Recipe | None]):
                    recipe = recipe_future.result()
                    if recipe is None:
                        return
                    recipe_machine: Machine = self.graph.create_node("factorygame.Machine", push_undo=True)
                    recipe_machine.assign_recipe(recipe)
                    recipe_machine.update()
                    recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 100)
                    recipe_machine.get_output(resource_label).connect_to(input_port)
                    self.global_input.update_x_pos(graph=self.graph)

                with Session(self.engine) as session:
                    resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
                    # one_or_none() yields None for an unknown label; treat that
                    # like a resource that still has to be fetched.
                    if resource is None or resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)):
                        print("Please fetch resource", resource_label, "first.")
                        return
                    chose_recipe(session=session, resource=resource, prompt=self.dialog_prompt).add_done_callback(
                        recipe_selected_cb
                    )
            elif output_port.name() == WORLD_OUTPUT_PORT_NAME:
                output_port.clear_connections(push_undo=False, emit_signal=False)
                # Only one global output per resource is supported so far;
                # reusing an existing port for a second consumer is not implemented.
                for port in self.global_input.output_ports():
                    assert port.name() != resource_label, f"Duplicate output port for {resource_label}"
                if isinstance(input_port.node(), Machine):
                    machine_node: Machine = input_port.node()
                    initial_resource_amount = str(machine_node.get_property(in_amount_name(resource_label)))
                    new_output_port = self.global_input.create_global_output(
                        graph=self.graph,
                        resource_label=resource_label,
                        initial_resource_amount=initial_resource_amount,
                    )
                    new_output_port.connect_to(input_port, push_undo=True, emit_signal=False)
        elif isinstance(output_node, Machine):
            input_node = input_port.node()
            resource_label = output_port.name()
            if isinstance(input_node, Machine):
                resource_amount = output_node.get_resource_output(resource_label)
                input_node.update_input(
                    resource_label,
                    resource_amount,
                )

    def dialog_prompt(self, options: dict[int, str], text: str, default: int) -> Future[int | None]:
        """Ask the user to choose one of *options* in a modal input dialog.

        Args:
            options: Mapping of option key to the label shown in the combo box.
            text: Label text of the dialog.
            default: Key of the option to preselect, if it is in *options*.

        Returns:
            A future resolved with the key of the chosen option, or with
            ``None`` when the dialog is rejected.
        """
        if self.debug:
            print("Displaying QInputDialog with options:", ", ".join(options.values()))
        dialog = QInputDialog(parent=self.graph.widget)
        dialog.setStyleSheet(self.fgbg_color_stylesheet)
        dialog.setModal(True)
        dialog.setLabelText(text)
        if default in options:
            dialog.setTextValue(options[default])
        # Labels are what the dialog reports back, so map them to their keys.
        reversed_options: dict[str, int] = {value: key for key, value in options.items()}
        # Hand Qt a real list: a dict keys view is iterable but not a sequence.
        dialog.setComboBoxItems(list(reversed_options))
        ret = Future()
        dialog.rejected.connect(lambda: ret.set_result(None))
        dialog.accepted.connect(lambda: ret.set_result(reversed_options[dialog.textValue()]))
        dialog.show()
        return ret
@click.command
@click.option("--debug", is_flag=True)
@click.argument("search")
def main(debug: bool, search: str):
    # Command line entry point: start the Qt application, show the graph and
    # open the resource/recipe selection for SEARCH. Kept as a comment rather
    # than a docstring so that click's --help output stays as it was.
    application = QtWidgets.QApplication([])
    controller = GraphController(debug=debug)
    controller.show()
    controller.add_machine_from_search(search=search)
    # Blocks until the Qt event loop ends.
    application.exec_()
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()