From 3dcf0f4430f6347b7eb9c286ec896e505102c386 Mon Sep 17 00:00:00 2001 From: Benedikt Ziemons Date: Tue, 30 Jan 2024 18:07:28 +0100 Subject: [PATCH] Refactor, deduplicate some code --- factorygame/data/common.py | 48 ++++++++++++++++ factorygame/data/fetch.py | 59 ++++++-------------- factorygame/data/provider.py | 6 +- factorygame/data/sfp.py | 6 +- factorygame/data/vis.py | 104 ++++++++++------------------------- 5 files changed, 99 insertions(+), 124 deletions(-) create mode 100644 factorygame/data/common.py diff --git a/factorygame/data/common.py b/factorygame/data/common.py new file mode 100644 index 0000000..9a4b518 --- /dev/null +++ b/factorygame/data/common.py @@ -0,0 +1,48 @@ +from datetime import timedelta, datetime +from typing import Callable, Optional + +import sqlalchemy +from sqlalchemy import select + +from .models import Resource, Recipe, ResourceFlow + + +def resource_needs_update(resource: Resource | None, recipe_info_timeout: Optional[timedelta] = None) -> bool: + if recipe_info_timeout is None: + recipe_info_timeout = timedelta(days=30) + return ( + resource is None + or resource.recipes_populated_at is None + or datetime.utcnow() - resource.recipes_populated_at > recipe_info_timeout + ) + + +def chose_resource(session: sqlalchemy.Session, resource_label: str, prompt: Callable) -> Resource | None: + matching_resources = session.scalars(Resource.by_label(resource_label)).all() + if len(matching_resources) == 0: + print("Could not find existing resources matching the search string.. starting wiki search") + else: + options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))} + options[0] = "" + selected_res = prompt( + options=options, text="Chose a resource to continue or 0 to continue with a wiki search", default=1 + ) + if selected_res is not None and selected_res != 0: + return matching_resources[selected_res - 1] + return None + + +def chose_recipe(session: sqlalchemy.Session, resource: Resource, prompt: Callable) -> Recipe | None: + stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id) + recipes = session.scalars(stmt).all() + if len(recipes) == 0: + print("No recipes found for resource") + return None + elif len(recipes) > 1: + options = {(idx + 1): recipes[idx].describe() for idx in range(len(recipes))} + user_choice = prompt(options=options, text="Select recipe", default=1) + if user_choice is None: + return None + return recipes[user_choice - 1] + else: + return recipes[0] diff --git a/factorygame/data/fetch.py b/factorygame/data/fetch.py index 15dc908..87fbe7c 100755 --- a/factorygame/data/fetch.py +++ b/factorygame/data/fetch.py @@ -1,18 +1,14 @@ #!/usr/bin/env python3 -import datetime -from typing import Optional - import click from sqlalchemy import create_engine, select from sqlalchemy.orm import Session -from .models import Base, Resource, ResourceFlow, Recipe +from factorygame.data.common import resource_needs_update, chose_resource +from .models import Base, ResourceFlow, Recipe from .sfp import SatisfactoryPlus from ..helper import prompt -__recipe_info_timeout = datetime.timedelta(days=30) - @click.command() @click.option("--result", is_flag=True) @@ -23,53 +19,32 @@ def main(result: bool, debug: bool, refetch: bool, search: str): engine = create_engine("sqlite:///file.db", echo=debug) Base.metadata.create_all(bind=engine) if result and search: - do_provider_search = True - resource: Optional[Resource] = None with Session(engine) as session: - matching_resources = session.scalars(Resource.by_label(search)).all() - if len(matching_resources) == 0: - print("Could not find existing resources matching the search string.. starting wiki search") - else: - options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))} - options[0] = "" - selected_res = prompt( - options=options, text="Chose a resource to continue or 0 to continue with a wiki search", default=1 - ) - if selected_res is None: - return - if selected_res != 0: - resource = matching_resources[selected_res - 1] - do_provider_search = False - exists_in_db = True + resource = chose_resource(session=session, resource_label=search, prompt=prompt) + exists_in_db = resource is not None with SatisfactoryPlus(debug=debug) as data_provider: - if do_provider_search: + if resource is None: ret = data_provider.search_for_resource(session=session, search=search) if ret is None: return else: resource, exists_in_db = ret - refetch = ( - refetch - or resource.recipes_populated_at is None - or datetime.datetime.utcnow() - resource.recipes_populated_at > __recipe_info_timeout - ) - if refetch and exists_in_db: - print("Deleting recipes for", resource.label) - with session.begin_nested(): - for flow in session.scalars( - select(ResourceFlow).where(ResourceFlow.resource_id == resource.id) - ): - if flow.result_of: - for flow2 in flow.result_of.ingredients: - session.delete(flow2) - for flow2 in flow.result_of.results: - session.delete(flow2) - session.delete(flow.result_of) - + refetch |= resource_needs_update(resource) if refetch: if exists_in_db: + print("Deleting recipes for", resource.label) + with session.begin_nested(): + for flow in session.scalars( + select(ResourceFlow).where(ResourceFlow.resource_id == resource.id) + ): + if flow.result_of: + for flow2 in flow.result_of.ingredients: + session.delete(flow2) + for flow2 in flow.result_of.results: + session.delete(flow2) + session.delete(flow.result_of) print("Refetching recipes for", resource.label) else: print("Fetching recipes for new resource", resource.label) diff --git a/factorygame/data/provider.py b/factorygame/data/provider.py index ff1f885..4b7b135 100644 --- a/factorygame/data/provider.py +++ b/factorygame/data/provider.py @@ -1,6 +1,6 @@ import abc -from sqlalchemy.orm import Session +import sqlalchemy from .models import Resource @@ -8,9 +8,9 @@ from .models import Resource class RecipeProvider(abc.ABC): @abc.abstractmethod - def search_for_resource(self, session: Session, search: str) -> tuple[Resource, bool]: + def search_for_resource(self, session: sqlalchemy.Session, search: str) -> tuple[Resource, bool]: pass @abc.abstractmethod - def update_resource_recipes(self, session: Session, resource: Resource): + def update_resource_recipes(self, session: sqlalchemy.Session, resource: Resource): pass diff --git a/factorygame/data/sfp.py b/factorygame/data/sfp.py index 07beff1..9bee773 100644 --- a/factorygame/data/sfp.py +++ b/factorygame/data/sfp.py @@ -3,10 +3,10 @@ from datetime import datetime from typing import Optional from urllib.parse import urljoin +import sqlalchemy from selenium.webdriver import Firefox from selenium.webdriver.common.by import By from selenium.webdriver.firefox.options import Options -from sqlalchemy.orm import Session from .models import Resource, ResourceFlow, Factory, Recipe from .provider import RecipeProvider @@ -44,7 +44,7 @@ class SatisfactoryPlus(RecipeProvider, AbstractContextManager): def __exit__(self, __exc_type, __exc_value, __traceback): self._browser_cleanup() - def search_for_resource(self, session: Session, search: str) -> tuple[Resource, bool] | None: + def search_for_resource(self, session: sqlalchemy.Session, search: str) -> tuple[Resource, bool] | None: browser = self._init_browser() browser.get("https://wiki.kyrium.space/") search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']") @@ -81,7 +81,7 @@ class SatisfactoryPlus(RecipeProvider, AbstractContextManager): resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href")) return Resource(label=alt_resource_label, wiki_url=resource_fetch_url), False - def update_resource_recipes(self, session: Session, resource: Resource) -> Resource: + def update_resource_recipes(self, session: sqlalchemy.Session, resource: Resource) -> Resource: assert resource.wiki_url, "Resource wiki url not set" browser = self._init_browser() browser.get(resource.wiki_url) diff --git a/factorygame/data/vis.py b/factorygame/data/vis.py index 987a9c0..1f52604 100755 --- a/factorygame/data/vis.py +++ b/factorygame/data/vis.py @@ -1,5 +1,7 @@ #!/usr/bin/env python3 +from datetime import timedelta + import click from NodeGraphQt import BaseNode, NodeBaseWidget from NodeGraphQt import NodeGraph, Port @@ -7,11 +9,11 @@ from NodeGraphQt.constants import PortTypeEnum from PySide2.QtCore import Qt from PySide2.QtWidgets import QSlider from Qt import QtWidgets -from sqlalchemy import create_engine, select +from sqlalchemy import create_engine from sqlalchemy.orm import Session -from .models import Recipe, ResourceFlow, Resource -from .sfp import SatisfactoryPlus +from factorygame.data.common import chose_resource, resource_needs_update, chose_recipe +from .models import Recipe, Resource from ..helper import prompt @@ -127,50 +129,17 @@ def on_port_connected(input_port: Port, output_port: Port): print(f"Port {output_port} connected to {input_port}") if isinstance(output_port.node(), GlobalInput) and output_port.name() == "Create Machine": output_port.clear_connections(push_undo=False, emit_signal=False) - with Session(engine) as session, SatisfactoryPlus() as data_provider: - do_provider_search = False + with Session(engine) as session: resource_label = input_port.name() - matching_resources = session.scalars(Resource.by_label(resource_label)).all() - if len(matching_resources) == 0: - print("Could not find existing resources matching the search string.. starting wiki search") - else: - options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))} - options[0] = "" - selected_res = prompt( - options=options, - text="Chose a resource to continue or 0 to continue with a wiki search", - default=1, - ) - if selected_res is None: - return - if selected_res != 0: - resource = matching_resources[selected_res - 1] - do_provider_search = False - - if do_provider_search: - ret = data_provider.search_for_resource(session=session, search=resource_label) - if ret is None: - print("Resource not found") - return - - resource, exists_in_db = ret - if not exists_in_db: - print("Resource not yet fetched, run fetch first") - return - - stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id) - recipes = session.scalars(stmt).all() - if len(recipes) == 0: - print("No recipes found for resource") + resource = session.scalars(Resource.by_label(resource_label)).one_or_none() + if resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)): + print("Please fetch resource", resource_label, "first.") + return + + # FIXME: use some Qt UI prompt method + recipe = chose_recipe(session=session, resource=resource, prompt=prompt) + if recipe is None: return - elif len(recipes) > 1: - options = {(idx + 1): recipes[idx].describe() for idx in range(len(recipes))} - user_choice = prompt(options=options, text="Select recipe", default=1) - if user_choice is None: - return - recipe = recipes[user_choice - 1] - else: - recipe = recipes[0] recipe_machine = graph.create_node("factorygame.Machine", push_undo=True) recipe_machine.assign_recipe(recipe) @@ -188,25 +157,14 @@ def main(debug: bool, search: str): global engine globals()["debug"] = debug engine = create_engine("sqlite:///file.db", echo=debug) - with Session(engine) as session, SatisfactoryPlus(debug=debug) as data_provider: - do_provider_search = False - matching_resources = session.scalars(Resource.by_label(search)).all() - if len(matching_resources) == 0: - print("Could not find existing resources matching the search string.. starting wiki search") - else: - options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))} - options[0] = "" - selected_res = prompt( - options=options, text="Chose a resource to continue or 0 to continue with a wiki search", default=1 - ) - if selected_res is None: - return - if selected_res != 0: - resource = matching_resources[selected_res - 1] - do_provider_search = False + with Session(engine) as session: + # FIXME: use some Qt UI prompt method + resource = chose_resource(session=session, resource_label=search, prompt=prompt) - if do_provider_search: - ret = data_provider.search_for_resource(session=session, search=search) + if resource is None: + # FIXME: use concurrent resource searching/fetching + # ret = data_provider.search_for_resource(session=session, search=search) + ret = None if ret is None: print("Resource not found") return @@ -215,20 +173,14 @@ def main(debug: bool, search: str): if not exists_in_db: print("Resource not yet fetched, run fetch first") return - - stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id) - recipes = session.scalars(stmt).all() - if len(recipes) == 0: - print("No recipes found for resource") + if resource_needs_update(resource, recipe_info_timeout=timedelta(days=365)): + print("Please fetch resource", resource.label, "first.") + return + + # FIXME: use some Qt UI prompt method + recipe = chose_recipe(session=session, resource=resource, prompt=prompt) + if recipe is None: return - elif len(recipes) > 1: - options = {(idx + 1): recipes[idx].describe() for idx in range(len(recipes))} - user_choice = prompt(options=options, text="Select recipe", default=1) - if user_choice is None: - return - recipe = recipes[user_choice - 1] - else: - recipe = recipes[0] app = QtWidgets.QApplication([]) global graph