Ben
91a0edda5b
Caveats: the UI hangs while selecting recipes, and no calculation is done based on input resource flow.
255 lines
11 KiB
Python
Executable file
255 lines
11 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import click
|
|
from NodeGraphQt import BaseNode, NodeBaseWidget
|
|
from NodeGraphQt import NodeGraph, Port
|
|
from NodeGraphQt.constants import PortTypeEnum
|
|
from PySide2.QtCore import Qt
|
|
from PySide2.QtWidgets import QSlider
|
|
from Qt import QtWidgets
|
|
from sqlalchemy import create_engine, select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from .models import Recipe, ResourceFlow, Resource
|
|
from .sfp import SatisfactoryPlus
|
|
from ..helper import prompt
|
|
|
|
|
|
class NodeSlider(NodeBaseWidget):
    """Node widget that embeds a horizontal ``QSlider`` showing a percentage.

    The slider is configured for the range 1..250 with an initial value of
    100, and the widget label is refreshed with the current value on every
    change.
    """

    def __init__(self, name, label="", parent=None):
        """Build the slider and wire its change signal.

        Args:
            name: Widget name registered on the node.
            label: Text shown in front of the percentage; when empty the
                widget name is used instead.
            parent: Parent item forwarded to ``NodeBaseWidget``.
        """
        super().__init__(parent)
        self.set_name(name)
        # Label text without the "(NN%)" suffix that _update_label appends.
        self.pure_label = label if label else name

        bar = QSlider(Qt.Horizontal)
        bar.setMinimum(1)
        bar.setMaximum(250)
        bar.setValue(100)
        bar.setSingleStep(1)
        bar.setTickInterval(10)
        bar.setTickPosition(QSlider.TicksBelow)
        # Forward the change through the base widget first, then refresh the label.
        bar.valueChanged.connect(self.on_value_changed)
        bar.valueChanged.connect(self._update_label)

        self.set_custom_widget(bar)
        self._update_label()

    def get_value(self):
        """Return the current slider position."""
        return self.get_custom_widget().value()

    def set_value(self, value):
        """Move the slider to ``value``."""
        self.get_custom_widget().setValue(value)

    def _update_label(self):
        """Rewrite the widget label as ``"<label> (<value>%)"``."""
        percent = int(self.get_value())
        self.set_label(f"{self.pure_label} ({percent}%)")
|
|
|
|
|
|
class GlobalInput(BaseNode):
    """Graph node that exposes a single ``"Create Machine"`` output port."""

    # Together with the class name this forms the node type "factorygame.GlobalInput".
    __identifier__ = "factorygame"
    NODE_NAME = "Global Input"

    def __init__(self):
        """Create the node and add its only output port."""
        super(GlobalInput, self).__init__()
        self.add_output("Create Machine")
|
|
|
|
|
|
class Machine(BaseNode):
    """Graph node representing a machine that runs one recipe.

    After :meth:`assign_recipe` the node has one input port per distinct
    ingredient resource, one output port per distinct result resource, a
    read-only text field per port showing the amount per minute, and a
    slider that scales all displayed amounts.
    """

    __identifier__ = "factorygame"
    NODE_NAME = "FactoryGame Machine"

    def assign_recipe(self, recipe: Recipe):
        """Configure ports and widgets of this node from ``recipe``.

        Args:
            recipe: Recipe whose ``factory.label`` becomes the node name and
                whose ``ingredients`` / ``results`` become input / output
                ports.

        NOTE(review): widgets created by an earlier call are not removed
        here, so calling this twice on the same node may clash on widget
        names - confirm before reusing nodes.
        """
        # Delete from the highest index down: removing a port shifts the
        # indices of the ports behind it, so an ascending loop skips ports.
        for port_idx in reversed(range(len(self._inputs))):
            self.delete_input(port_idx)
        for port_idx in reversed(range(len(self._outputs))):
            self.delete_output(port_idx)

        self.set_property("name", recipe.factory.label, push_undo=False)

        def in_amount_name(resource_label: str) -> str:
            return f"In {resource_label} amount"

        def out_amount_name(resource_label: str) -> str:
            return f"Out {resource_label} amount"

        # Base (100 %) amount per minute, keyed by resource label.
        input_resources: dict[str, float] = {}
        for ingredient in recipe.ingredients:
            resource_label = ingredient.resource.label
            if resource_label in input_resources:
                # Same resource listed more than once: sum it up on the existing port.
                input_resources[resource_label] += ingredient.amount_per_minute()
                self.get_widget(in_amount_name(resource_label)).set_value(
                    f"{input_resources[resource_label]:.2f} / min"
                )
            else:
                port = self.add_input(resource_label, color=(180, 80, 0))
                port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine")
                port.add_accept_port_type("Create Machine", PortTypeEnum.OUT.value, "factorygame.GlobalInput")
                self._add_text_label(in_amount_name(resource_label), ingredient, input_resources, resource_label)

        output_resources: dict[str, float] = {}
        for result in recipe.results:
            resource_label = result.resource.label
            if resource_label in output_resources:
                output_resources[resource_label] += result.amount_per_minute()
                self.get_widget(out_amount_name(resource_label)).set_value(
                    f"{output_resources[resource_label]:.2f} / min"
                )
            else:
                port = self.add_output(resource_label, color=(200, 20, 0))
                port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine")
                self._add_text_label(out_amount_name(resource_label), result, output_resources, resource_label)

        performance_slider_name = "machine performance"

        def set_performance(percentage):
            """Rescale every displayed amount to ``percentage`` percent of its base value."""
            factor = percentage / 100.0
            for ingredient_label, amount in input_resources.items():
                self.get_widget(in_amount_name(ingredient_label)).set_value(f"{amount * factor:.2f} / min")
            for result_label, amount in output_resources.items():
                self.get_widget(out_amount_name(result_label)).set_value(f"{amount * factor:.2f} / min")

        slider = NodeSlider(name=performance_slider_name, label="Overclocking Performance", parent=self.view)
        self.add_custom_widget(slider)
        # Clamp to the slider's own 1..250 range before rescaling.
        slider.value_changed.connect(lambda name, value: set_performance(max(1, min(250, value))))

    def _add_text_label(self, name, flow, resource_amounts, resource_label):
        """Record a flow's base amount and add a read-only text field for it.

        Args:
            name: Name (and label) of the text widget to create.
            flow: Object providing ``amount_per_minute()``.
            resource_amounts: Dict updated in place with the base amount.
            resource_label: Key under which the amount is stored.
        """
        resource_amounts[resource_label] = flow.amount_per_minute()
        # Same two-decimal format as every later update of this field.
        self.add_text_input(name=name, label=name, text=f"{resource_amounts[resource_label]:.2f} / min")
        widget = self.get_widget(name)
        widget.get_custom_widget().setReadOnly(True)
        widget.widget().setMaximumWidth(220)
|
|
|
|
|
|
def on_port_connected(input_port: Port, output_port: Port):
    """React to a new connection; create a producing machine when requested.

    Only connections coming from the ``"Create Machine"`` output of a
    :class:`GlobalInput` node are handled. That connection is removed again,
    the resource named by ``input_port`` is resolved (local database first,
    data provider search otherwise), the user picks a recipe producing it,
    and a new ``Machine`` node for that recipe is placed to the left of the
    consuming node and wired to ``input_port``.

    Args:
        input_port: Port that received the connection; its name is used as
            the resource search string.
        output_port: Port the connection originates from.

    Relies on the module globals ``debug``, ``engine``, ``graph`` and
    ``global_input`` set up by ``main``.
    """
    global debug
    if debug:
        print(f"Port {output_port} connected to {input_port}")
    if not (isinstance(output_port.node(), GlobalInput) and output_port.name() == "Create Machine"):
        return

    # The connection is only a trigger; drop it without emitting further signals.
    output_port.clear_connections(push_undo=False, emit_signal=False)
    # Pass the debug flag to the provider the same way main() does.
    with Session(engine) as session, SatisfactoryPlus(debug=debug) as data_provider:
        resource_label = input_port.name()
        # Search the provider unless the user picks a resource that is already
        # known. Starting from False left `resource` unbound whenever nothing
        # matched or the user chose 0.
        do_provider_search = True
        matching_resources = session.scalars(Resource.by_label(resource_label)).all()
        if not matching_resources:
            print("Could not find existing resources matching the search string.. starting wiki search")
        else:
            options = {idx: str(res.label) for idx, res in enumerate(matching_resources, start=1)}
            options[0] = ""
            selected_res = prompt(
                options=options,
                text="Chose a resource to continue or 0 to continue with a wiki search",
                default=1,
            )
            if selected_res is None:
                return
            if selected_res != 0:
                resource = matching_resources[selected_res - 1]
                do_provider_search = False

        if do_provider_search:
            ret = data_provider.search_for_resource(session=session, search=resource_label)
            if ret is None:
                print("Resource not found")
                return

            resource, exists_in_db = ret
            if not exists_in_db:
                print("Resource not yet fetched, run fetch first")
                return

        stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
        recipes = session.scalars(stmt).all()
        if not recipes:
            print("No recipes found for resource")
            return
        if len(recipes) > 1:
            options = {idx: candidate.describe() for idx, candidate in enumerate(recipes, start=1)}
            user_choice = prompt(options=options, text="Select recipe", default=1)
            if user_choice is None:
                return
            recipe = recipes[user_choice - 1]
        else:
            recipe = recipes[0]

        recipe_machine = graph.create_node("factorygame.Machine", push_undo=True)
        recipe_machine.assign_recipe(recipe)
        recipe_machine.update()
        # Place the new machine 200 units left of the node it feeds.
        recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 200)

        # The chosen resource may be labelled differently from the port that
        # triggered the search, in which case no matching output exists.
        # NOTE(review): assumes get_output() yields None for an unknown name - confirm.
        producer_port = recipe_machine.get_output(input_port.name())
        if producer_port is None:
            print(f"Created machine has no output named {input_port.name()}, leaving it unconnected")
        else:
            producer_port.connect_to(input_port)

        # Keep the global input node to the left of the newly created machine.
        if recipe_machine.x_pos() - (global_input.x_pos() - global_input.view.width) < 200:
            global_input.set_x_pos(recipe_machine.x_pos() - global_input.view.width - 200)
|
|
|
|
|
|
@click.command
@click.option("--debug", is_flag=True)
@click.argument("search")
def main(debug: bool, search: str):
    """Open a node graph showing a machine that produces the resource SEARCH.

    The resource is looked up in the local database first; if nothing
    matches, or 0 is chosen at the prompt, the data provider is searched
    instead. When several recipes produce the resource, one is chosen
    interactively.
    """
    global engine, graph, global_input
    # `debug` is a parameter of this function and therefore cannot be declared
    # `global`; publish it for on_port_connected through globals() instead.
    globals()["debug"] = debug
    engine = create_engine("sqlite:///file.db", echo=debug)
    with Session(engine) as session, SatisfactoryPlus(debug=debug) as data_provider:
        # Search the provider unless the user picks a resource that is already
        # known. Starting from False left `resource` unbound whenever nothing
        # matched or the user chose 0.
        do_provider_search = True
        matching_resources = session.scalars(Resource.by_label(search)).all()
        if not matching_resources:
            print("Could not find existing resources matching the search string.. starting wiki search")
        else:
            options = {idx: str(res.label) for idx, res in enumerate(matching_resources, start=1)}
            options[0] = ""
            selected_res = prompt(
                options=options, text="Chose a resource to continue or 0 to continue with a wiki search", default=1
            )
            if selected_res is None:
                return
            if selected_res != 0:
                resource = matching_resources[selected_res - 1]
                do_provider_search = False

        if do_provider_search:
            ret = data_provider.search_for_resource(session=session, search=search)
            if ret is None:
                print("Resource not found")
                return

            resource, exists_in_db = ret
            if not exists_in_db:
                print("Resource not yet fetched, run fetch first")
                return

        stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
        recipes = session.scalars(stmt).all()
        if not recipes:
            print("No recipes found for resource")
            return
        if len(recipes) > 1:
            options = {idx: candidate.describe() for idx, candidate in enumerate(recipes, start=1)}
            user_choice = prompt(options=options, text="Select recipe", default=1)
            if user_choice is None:
                return
            recipe = recipes[user_choice - 1]
        else:
            recipe = recipes[0]

        # Build the graph while the session is still open: assign_recipe reads
        # recipe.factory / ingredients / results from the ORM object.
        app = QtWidgets.QApplication([])
        graph = NodeGraph()
        graph.widget.resize(1280, 720)
        graph.register_node(Machine)
        graph.register_node(GlobalInput)
        graph_widget = graph.widget
        graph_widget.show()

        global_input = graph.create_node("factorygame.GlobalInput", push_undo=False)
        recipe_machine = graph.create_node("factorygame.Machine", push_undo=False)
        recipe_machine.assign_recipe(recipe)

        graph.auto_layout_nodes([global_input, recipe_machine])
        # Align the global input with the machine and move it 200 units further left.
        global_input.set_y_pos(recipe_machine.y_pos())
        global_input.set_x_pos(global_input.x_pos() - global_input.model.width - 200)
        graph.center_on([global_input, recipe_machine])
        graph.port_connected.connect(on_port_connected)

    # Run the event loop after the session and provider are released;
    # on_port_connected opens its own session for every connection.
    app.exec_()
|
|
|
|
|
|
# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    main()
|