From ba4aa26be3e4e8b18d8c8483a1547c12bc792f21 Mon Sep 17 00:00:00 2001 From: Benedikt Ziemons Date: Mon, 29 Jan 2024 16:13:02 +0100 Subject: [PATCH] Implement faster recipe lookup without browser Do resource lookup and run regular refetch-code after wiki search. --- .../calculator/SatisfactoryCalculator.py | 335 +++++++++--------- 1 file changed, 158 insertions(+), 177 deletions(-) diff --git a/factorygame/calculator/SatisfactoryCalculator.py b/factorygame/calculator/SatisfactoryCalculator.py index 0e94ff6..d5e767f 100644 --- a/factorygame/calculator/SatisfactoryCalculator.py +++ b/factorygame/calculator/SatisfactoryCalculator.py @@ -5,15 +5,34 @@ from typing import Optional from urllib.parse import urljoin import click -import sqlalchemy from selenium.webdriver import Firefox from selenium.webdriver.common.by import By from selenium.webdriver.firefox.options import Options -from selenium.webdriver.remote.webdriver import WebDriver -from sqlalchemy import String, create_engine, select, ForeignKey, Select, Table, Column, delete +from sqlalchemy import String, create_engine, select, ForeignKey, Select, Table, Column from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session -recipe_info_timeout = datetime.timedelta(days=30) +__debug = False +__recipe_info_timeout = datetime.timedelta(days=30) +__browser: Optional[Firefox] = None + + +def get_browser() -> Firefox: + global __browser, __debug + if __browser is None: + firefox_options = Options() + firefox_options.add_argument("--width=1600") + firefox_options.add_argument("--height=1015") + if not __debug: + firefox_options.add_argument("--headless") + __browser = Firefox(options=firefox_options) + __browser.implicitly_wait(5) + return __browser + + +def browser_cleanup(): + global __browser, __debug + if not __debug and __browser is not None: + __browser.quit() class Base(DeclarativeBase): @@ -103,116 +122,74 @@ class Recipe(Base): return f"Recipe(id={self.id}, factory={self.factory}, ingredients={self.ingredients}, results={self.results})" -def normalize_url(browser: WebDriver, href: str) -> str: - return urljoin(base=browser.current_url, url=href) +def normalize_url(href: str) -> str: + return urljoin(base=get_browser().current_url, url=href) -def populate_recipes(browser: WebDriver, engine: sqlalchemy.Engine, input_resource_label: str) -> int: +def populate_recipes(session: Session, input_resource_label: str) -> Resource: + browser = get_browser() browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click() recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div") - with Session(engine, autoflush=False) as session: - for recipe_idx in range(len(recipes_html_elems)): - recipe_html_elem = recipes_html_elems[recipe_idx] - factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a") - factory_label = factory_html_elem.text.strip() - factory_url = urljoin(base=browser.current_url, url=factory_html_elem.get_attribute("href")) - print("recipe", recipe_idx, "produced in:", factory_label, factory_url) + resources: dict[str, Resource] = {} + new_resources: list[Resource] = [] - def extract_resource_flow(html_elem): - resource_img = html_elem.find_element(By.TAG_NAME, "img") - resource_label = resource_img.get_attribute("alt").strip() - wiki_url = normalize_url( - browser=browser, - href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"), - ) - resource = Resource(label=resource_label, wiki_url=wiki_url) - amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip() - time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip() - return ResourceFlow(resource=resource, amount=amount, time=time) + for recipe_idx in range(len(recipes_html_elems)): + recipe_html_elem = recipes_html_elems[recipe_idx] + factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a") + factory_label = factory_html_elem.text.strip() + factory_url = urljoin(base=browser.current_url, url=factory_html_elem.get_attribute("href")) - ingredient_html_elems = recipe_html_elem.find_elements( - By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)" - ) - ingredients: list[ResourceFlow] = [] - for ingredient_idx in range(len(ingredient_html_elems)): - resource_flow = extract_resource_flow(ingredient_html_elems[ingredient_idx]) - ingredients.append(resource_flow) - print( - "recipe", - recipe_idx, - "ingredient", - ingredient_idx, - "name:", - resource_flow.resource.label, - ) - print( - "recipe", - recipe_idx, - "ingredient", - ingredient_idx, - "count:", - resource_flow.amount, - ) - print( - "recipe", - recipe_idx, - "ingredient", - ingredient_idx, - "time:", - resource_flow.time, - ) - result_html_elems = recipe_html_elem.find_elements( - By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)" - ) - results: list[ResourceFlow] = [] - for result_idx in range(len(result_html_elems)): - resource_flow = extract_resource_flow(result_html_elems[result_idx]) - results.append(resource_flow) - print( - "recipe", - recipe_idx, - "result", - result_idx, - "name:", - resource_flow.resource.label, - ) - print( - "recipe", - recipe_idx, - "result", - result_idx, - "count:", - resource_flow.amount, - ) - print( - "recipe", - recipe_idx, - "result", - result_idx, - "time:", - resource_flow.time, - ) + def extract_resource_flow(html_elem): + resource_img = html_elem.find_element(By.TAG_NAME, "img") + resource_label = resource_img.get_attribute("alt").strip() + assert resource_label, "resource label is missing" + if resource_label in resources: + resource = resources[resource_label] + else: + resource = session.scalars(Resource.by_label(resource_label)).one_or_none() + if not resource: + wiki_url = normalize_url( + href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"), + ) + resource = Resource(label=resource_label, wiki_url=wiki_url) + new_resources.append(resource) + resources[resource_label] = resource + amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip() + time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip() + return ResourceFlow(resource=resource, amount=amount, time=time) + ingredient_html_elems = recipe_html_elem.find_elements( + By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)" + ) + ingredients: list[ResourceFlow] = [] + for ingredient_idx in range(len(ingredient_html_elems)): + resource_flow = extract_resource_flow(ingredient_html_elems[ingredient_idx]) + ingredients.append(resource_flow) + result_html_elems = recipe_html_elem.find_elements( + By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)" + ) + results: list[ResourceFlow] = [] + for result_idx in range(len(result_html_elems)): + resource_flow = extract_resource_flow(result_html_elems[result_idx]) + results.append(resource_flow) + + with session.no_autoflush: + # re-use existing Factory or create new factory = session.scalars(Factory.by_label(factory_label)).one_or_none() if not factory: factory = Factory(label=factory_label, wiki_url=factory_url) session.add(factory) - for flow in ingredients + results: - res = session.scalars(Resource.by_label(flow.resource.label)).one_or_none() - if res: - flow.resource = res - else: - session.add(flow.resource) - session.add(flow) - recipe = Recipe(factory=factory, ingredients=ingredients, results=results) - session.add(recipe) - session.commit() - updated_resource = session.scalars(Resource.by_label(input_resource_label)).one() - updated_resource.recipes_populated_at = datetime.datetime.utcnow() - res_id = updated_resource.id - session.commit() - return res_id + session.add_all(new_resources) + session.add_all(ingredients) + session.add_all(results) + session.add(Recipe(factory=factory, ingredients=ingredients, results=results)) + session.flush() + + updated_resource = session.scalars(Resource.by_label(input_resource_label)).one() + updated_resource.recipes_populated_at = datetime.datetime.utcnow() + session.flush() + return updated_resource @click.command() @@ -221,105 +198,109 @@ def populate_recipes(browser: WebDriver, engine: sqlalchemy.Engine, input_resour @click.option("--refetch", is_flag=True) @click.argument("search") def main(result: bool, debug: bool, refetch: bool, search: str): + global __debug + __debug = debug engine = create_engine("sqlite:///file.db", echo=debug) Base.metadata.create_all(bind=engine) if result and search: wiki_search = True - resource_label = search + resource: Optional[Resource] = None with Session(engine) as session: - resources = session.scalars(Resource.by_label(resource_label)).all() - if len(resources) == 0: + matching_resources = session.scalars(Resource.by_label(search)).all() + if len(matching_resources) == 0: print("Could not find existing resources matching the search string.. starting wiki search") else: - for idx in range(1, len(resources) + 1): - print(f"{idx}: {resources[idx - 1].label}") + for idx in range(1, len(matching_resources) + 1): + print(f"{idx}: {matching_resources[idx - 1].label}") user_choice = click.prompt( "Chose a resource to continue or 0 to continue with a wiki search", default=1 ) if user_choice != 0: - res_id = resources[user_choice - 1].id - resource_label = resources[user_choice - 1].label + resource = matching_resources[user_choice - 1] wiki_search = False - session.commit() - firefox_options = Options() - firefox_options.add_argument("--width=1600") - firefox_options.add_argument("--height=1015") - if not debug: - firefox_options.add_argument("--headless") - browser = Firefox(options=firefox_options) - browser.implicitly_wait(5) - try: - if wiki_search: - browser.get("https://wiki.kyrium.space/") - search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']") - search_bar.click() - search_bar.send_keys(search) - search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']") - search_button.click() - choices = browser.find_elements(By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center") - if not choices: - print("No wiki entries found for this result") - return - elif len(choices) > 1: - default_choice = 1 - choice_names: list[str] = [] - for choice_idx in range(1, len(choices) + 1): - recipe_choice = choices[choice_idx - 1] - name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt") - choice_names.append(name) - if name.casefold() == search.casefold(): - default_choice = choice_idx - print(f"{choice_idx}: {name}") - user_choice = click.prompt("Chose a recipe to continue…", default=default_choice) - if not user_choice: - user_choice = default_choice - else: - user_choice = int(user_choice) - - link_html_elem = choices[user_choice - 1] - else: - link_html_elem = choices[0] - - resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt") - # FIXME: check if resource_label is in database - if debug: - print("resource_label:", resource_label) - link_html_elem.click() - res_id = populate_recipes(browser=browser, engine=engine, input_resource_label=resource_label) - else: - with Session(engine) as session: - input_resource = session.get(Resource, res_id) - input_resource_url = input_resource.wiki_url - resource_label = input_resource.label - refetch = ( - refetch - or input_resource.recipes_populated_at is None - or datetime.datetime.utcnow() - input_resource.recipes_populated_at > recipe_info_timeout + try: + if wiki_search: + browser = get_browser() + browser.get("https://wiki.kyrium.space/") + search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']") + search_bar.click() + search_bar.send_keys(search) + search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']") + search_button.click() + choices = browser.find_elements( + By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center" ) - if refetch: - print("Deleting recipes for", input_resource) - for flow in session.scalars(select(ResourceFlow).where(ResourceFlow.resource_id == res_id)): + if not choices: + print("No wiki entries found for this result") + return + elif len(choices) > 1: + default_choice = 1 + choice_names: list[str] = [] + for choice_idx in range(1, len(choices) + 1): + recipe_choice = choices[choice_idx - 1] + name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt") + choice_names.append(name) + if name.casefold() == search.casefold(): + default_choice = choice_idx + print(f"{choice_idx}: {name}") + user_choice = click.prompt("Chose a recipe to continue…", default=default_choice) + if not user_choice: + user_choice = default_choice + else: + user_choice = int(user_choice) + + link_html_elem = choices[user_choice - 1] + else: + link_html_elem = choices[0] + + alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt") + resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none() + if not resource: + resource_fetch_url = normalize_url(href=link_html_elem.get_attribute("href")) + + refetch = ( + refetch + or resource is None + or resource.recipes_populated_at is None + or datetime.datetime.utcnow() - resource.recipes_populated_at > __recipe_info_timeout + ) + if refetch and resource is not None: + print("Deleting recipes for", resource.label) + with session.begin_nested(): + for flow in session.scalars( + select(ResourceFlow).where(ResourceFlow.resource_id == resource.id) + ): if flow.result_of: for flow2 in flow.result_of.ingredients: session.delete(flow2) for flow2 in flow.result_of.results: session.delete(flow2) session.delete(flow.result_of) - session.commit() - if refetch: - print("Refetching recipes for", resource_label) - browser.get(input_resource_url) - res_id = populate_recipes(browser=browser, engine=engine, input_resource_label=resource_label) - with Session(engine) as session: - stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == res_id) + if refetch: + browser = get_browser() + if resource is None: + print("Fetching recipes for new resource", alt_resource_label) + assert resource_fetch_url, "Resource wiki url not set" + browser.get(resource_fetch_url) + resource_label = alt_resource_label + else: + print("Refetching recipes for", resource.label) + browser.get(resource.wiki_url) + resource_label = resource.label + + with session.begin_nested(): + resource = populate_recipes(session=session, input_resource_label=resource_label) + session.refresh(resource) + + assert resource, "Resource must be set at this point" + stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id) for recipe in session.scalars(stmt): print(recipe) for flow in recipe.ingredients: print("ingredient:", flow.resource, flow) for flow in recipe.results: print("result: ", flow.resource, flow) - finally: - if not debug: - browser.quit() + finally: + browser_cleanup()