SatisfactoryPlusCalculator/factorygame/data/vis.py

240 lines
9.4 KiB
Python
Executable file

#!/usr/bin/env python3
import re
from datetime import timedelta
import click
from NodeGraphQt import BaseNode, NodeBaseWidget
from NodeGraphQt import NodeGraph, Port
from NodeGraphQt.constants import PortTypeEnum
from NodeGraphQt.widgets.node_widgets import _NodeGroupBox
from PySide2.QtCore import Qt
from PySide2.QtWidgets import QSlider, QLineEdit
from Qt import QtWidgets
from sqlalchemy import create_engine
from sqlalchemy.orm import Session
from factorygame.data.common import chose_resource, resource_needs_update, chose_recipe
from .models import Recipe, Resource
from ..helper import prompt
# RGB colour triples: passed as ``color=`` when ports are created, and the
# first two are also used as the border colour of the amount text fields.
INPUT_COLOR = (249, 169, 0)  # ingredient (input) ports and their amount fields
OUTPUT_COLOR = (204, 44, 36)  # result (output) ports and their amount fields
OTHER_COLOR = (0, 83, 135)  # the "Create Machine" port of GlobalInput
class NodeSlider(NodeBaseWidget):
    """Node widget wrapping a horizontal slider that represents a percentage.

    The slider ranges from 1 to 250 and starts at 100. The widget label is
    kept in sync with the slider and reads ``"<label> (<value>%)"``.
    """

    def __init__(self, name, label="", parent=None):
        """Build the slider and wire its change signal.

        Args:
            name: Widget name registered on the node.
            label: Text shown in front of the percentage; falls back to
                ``name`` when empty.
            parent: Parent item forwarded to ``NodeBaseWidget``.
        """
        super().__init__(parent)
        self.set_name(name)
        self.pure_label = label if label else name

        bar = QSlider(Qt.Horizontal)
        # Tick appearance first, then the value range and starting position.
        bar.setTickPosition(QSlider.TicksBelow)
        bar.setTickInterval(10)
        bar.setSingleStep(1)
        bar.setMinimum(1)
        bar.setMaximum(250)
        bar.setValue(100)

        # Connect only after configuration so the setup above emits nothing;
        # the node-level notification stays ahead of the label refresh.
        for handler in (self.on_value_changed, self._update_label):
            bar.valueChanged.connect(handler)

        self.set_custom_widget(bar)
        self._update_label()

    def get_value(self):
        """Return the slider's current value."""
        return self.get_custom_widget().value()

    def set_value(self, value):
        """Move the slider to ``value``."""
        self.get_custom_widget().setValue(value)

    def _update_label(self):
        """Refresh the label so that it shows the current percentage."""
        percent = int(self.get_value())
        self.set_label(f"{self.pure_label} ({percent}%)")
class GlobalInput(BaseNode):
    """Node that exposes a single "Create Machine" output port."""

    __identifier__ = "factorygame"
    NODE_NAME = "Global Input"

    def __init__(self):
        """Create the node and add its only output port."""
        super(GlobalInput, self).__init__()
        self.add_output(name="Create Machine", color=OTHER_COLOR)
class Machine(BaseNode):
    """Node for one factory machine whose ports and amounts come from a recipe.

    ``assign_recipe`` populates the node: one input port per distinct
    ingredient resource, one output port per distinct result resource, a
    read-only "<amount> / min" text field for each of them, and an
    overclocking slider that rescales every displayed amount.
    """

    __identifier__ = "factorygame"
    NODE_NAME = "FactoryGame Machine"
    # Group 1 captures the ``rgb(...)`` value of the ``border`` property inside
    # the ``QLineEdit { ... }`` rule of a widget stylesheet.
    STYLESHEET_BORDER_COLOR_REGEX = re.compile(
        r"QLineEdit\s*{[^{}]*\s*border:[^;]*(rgb\([^)]+\))", re.MULTILINE | re.DOTALL
    )

    def __init__(self):
        """Create the node and widen it to 240 so the amount fields fit."""
        super().__init__()
        self.model.width = 240

    def assign_recipe(self, recipe: Recipe):
        """Configure ports, amount fields and the performance slider.

        Args:
            recipe: Recipe whose factory label becomes the node name and whose
                ingredients/results define the input/output ports.

        NOTE(review): only the first assignment on a fresh node is exercised by
        this file. Re-assigning may additionally require port deletion to be
        enabled on the node and may clash with already registered widget
        names -- confirm against NodeGraphQt before relying on it.
        """
        # Delete from the highest index down: removing by ascending index while
        # the port list shrinks would skip every second port.
        for port_idx in reversed(range(len(self._inputs))):
            self.delete_input(port_idx)
        for port_idx in reversed(range(len(self._outputs))):
            self.delete_output(port_idx)
        self.set_property("name", recipe.factory.label, push_undo=False)

        def in_amount_name(resource_label: str):
            return f"In {resource_label} amount"

        def out_amount_name(resource_label: str):
            return f"Out {resource_label} amount"

        # Base (100 %) amounts per minute, keyed by resource label.
        input_resources: dict[str, float] = {}
        for ingredient in recipe.ingredients:
            resource_label = ingredient.resource.label
            if resource_label in input_resources:
                # Same resource listed again: accumulate into the existing field.
                input_resources[resource_label] += ingredient.amount_per_minute()
                self.get_widget(in_amount_name(resource_label)).set_value(
                    f"{input_resources[resource_label]:.2f} / min"
                )
            else:
                port = self.add_input(resource_label, color=INPUT_COLOR)
                # Accept a producer of the same resource, or the global
                # "Create Machine" port.
                port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine")
                port.add_accept_port_type("Create Machine", PortTypeEnum.OUT.value, "factorygame.GlobalInput")
                self._add_text_label(
                    in_amount_name(resource_label),
                    ingredient,
                    input_resources,
                    resource_label,
                    border_color=INPUT_COLOR,
                )

        output_resources: dict[str, float] = {}
        for result in recipe.results:
            resource_label = result.resource.label
            if resource_label in output_resources:
                output_resources[resource_label] += result.amount_per_minute()
                self.get_widget(out_amount_name(resource_label)).set_value(
                    f"{output_resources[resource_label]:.2f} / min"
                )
            else:
                port = self.add_output(resource_label, color=OUTPUT_COLOR)
                port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine")
                self._add_text_label(
                    out_amount_name(resource_label), result, output_resources, resource_label, border_color=OUTPUT_COLOR
                )

        performance_slider_name = "machine performance"

        def set_performance(percentage):
            """Show every amount scaled by ``percentage`` / 100."""
            factor = percentage / 100.0
            for ingredient_label, amount in input_resources.items():
                self.get_widget(in_amount_name(ingredient_label)).set_value(f"{amount * factor:.2f} / min")
            for result_label, amount in output_resources.items():
                self.get_widget(out_amount_name(result_label)).set_value(f"{amount * factor:.2f} / min")

        slider = NodeSlider(name=performance_slider_name, label="Overclocking Performance", parent=self.view)
        self.add_custom_widget(slider)
        # Clamp to the slider's own 1..250 range before rescaling.
        slider.value_changed.connect(lambda name, value: set_performance(max(1, min(250, value))))

    def _add_text_label(self, name, flow, resource_amounts, resource_label, border_color):
        """Add a read-only text field showing a resource's amount per minute.

        Args:
            name: Widget name; also used as the visible label.
            flow: Ingredient or result providing ``amount_per_minute()``.
            resource_amounts: Mapping updated in place with the base amount
                stored under ``resource_label``.
            resource_label: Key under which the amount is stored.
            border_color: RGB triple for the field border; falsy keeps the
                widget's default border.
        """
        resource_amounts[resource_label] = flow.amount_per_minute()
        # Two decimals, matching every later update of the same field.
        self.add_text_input(name=name, label=name, text=f"{resource_amounts[resource_label]:.2f} / min")
        widget = self.get_widget(name)
        line_edit_widget: QLineEdit = widget.get_custom_widget()
        line_edit_widget.setReadOnly(True)
        group: _NodeGroupBox = widget.widget()
        group.setMaximumWidth(220)
        group.adjustSize()
        if border_color:
            stylesheet = line_edit_widget.styleSheet()
            # ``search`` also finds the rule when it is not the first thing in
            # the stylesheet; without a match the default border is kept.
            match = Machine.STYLESHEET_BORDER_COLOR_REGEX.search(stylesheet)
            if match is not None:
                stylesheet = (
                    stylesheet[: match.start(1)]
                    + f"rgb({','.join(map(str, border_color))})"
                    + stylesheet[match.end(1) :]
                )
                line_edit_widget.setStyleSheet(stylesheet)
            line_edit_widget.update()
def on_port_connected(input_port: Port, output_port: Port):
    """Spawn a producing machine when a port is wired to "Create Machine".

    Any other connection is only logged (in debug mode). For a connection
    coming from the ``GlobalInput`` "Create Machine" port, the trigger
    connection is removed, a recipe for the resource named like
    ``input_port`` is chosen, and a new ``Machine`` node is created to the
    left of the consuming node and connected to it.

    Relies on the module globals ``debug``, ``engine``, ``graph`` and
    ``global_input`` that ``main`` sets up.

    Args:
        input_port: Input port that received the connection.
        output_port: Output port the connection originates from.
    """
    global debug
    if debug:
        print(f"Port {output_port} connected to {input_port}")
    if not (isinstance(output_port.node(), GlobalInput) and output_port.name() == "Create Machine"):
        return

    # The wire to the global input is just a trigger; remove it again without
    # emitting further signals.
    output_port.clear_connections(push_undo=False, emit_signal=False)
    with Session(engine) as session:
        resource_label = input_port.name()
        resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
        # A resource missing from the database needs fetching just like a
        # stale one.
        if resource is None or resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)):
            print("Please fetch resource", resource_label, "first.")
            return
        # FIXME: use some Qt UI prompt method
        recipe = chose_recipe(session=session, resource=resource, prompt=prompt)
        if recipe is None:
            return
        recipe_machine = graph.create_node("factorygame.Machine", push_undo=True)
        recipe_machine.assign_recipe(recipe)
        recipe_machine.update()
        # Place the new machine 100 units left of the consuming node.
        recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 100)
        recipe_machine.get_output(input_port.name()).connect_to(input_port)
        # Keep at least 150 units between the right edge of the global input
        # and the left edge of the new machine.
        gap = recipe_machine.x_pos() - (global_input.x_pos() + global_input.view.width)
        if gap < 150:
            global_input.set_x_pos(recipe_machine.x_pos() - global_input.view.width - 150)
@click.command
@click.option("--debug", is_flag=True)
@click.argument("search")
def main(debug: bool, search: str):
    """Pick a resource and recipe, then show it as a node graph.

    Args:
        debug: Enables SQL echo and connection logging.
        search: Resource label to look up.
    """
    global engine, graph, global_input
    # The parameter shadows the module-level flag, so publish it explicitly.
    globals()["debug"] = debug
    engine = create_engine("sqlite:///file.db", echo=debug)

    with Session(engine) as session:
        # FIXME: use some Qt UI prompt method
        resource = chose_resource(session=session, resource_label=search, prompt=prompt)
        if resource is None:
            # FIXME: use concurrent resource searching/fetching
            # ret = data_provider.search_for_resource(session=session, search=search)
            ret = None
            if ret is None:
                print("Resource not found")
                return
            resource, exists_in_db = ret
            if not exists_in_db:
                print("Resource not yet fetched, run fetch first")
                return

        max_recipe_age = timedelta(days=365)
        if resource_needs_update(resource, recipe_info_timeout=max_recipe_age):
            print("Please fetch resource", resource.label, "first.")
            return

        # FIXME: use some Qt UI prompt method
        recipe = chose_recipe(session=session, resource=resource, prompt=prompt)
        if recipe is None:
            return

        qt_app = QtWidgets.QApplication([])

        graph = NodeGraph()
        viewer = graph.widget
        viewer.resize(1280, 720)
        for node_class in (Machine, GlobalInput):
            graph.register_node(node_class)
        viewer.show()

        global_input = graph.create_node("factorygame.GlobalInput", push_undo=False)
        root_machine = graph.create_node("factorygame.Machine", push_undo=False)
        root_machine.assign_recipe(recipe)

        # Lay both nodes out, then align the global input with the machine and
        # shift it further left.
        starting_nodes = [global_input, root_machine]
        graph.auto_layout_nodes(starting_nodes)
        global_input.set_y_pos(root_machine.y_pos())
        global_input.set_x_pos(global_input.x_pos() - global_input.model.width - 150)
        graph.center_on(starting_nodes)

        graph.port_connected.connect(on_port_connected)
        qt_app.exec_()


if __name__ == "__main__":
    main()