from contextlib import AbstractContextManager
from datetime import datetime
from typing import Optional, Callable
from urllib.parse import urljoin

from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from sqlalchemy.orm import Session as AlchemySession

from .models import Resource, ResourceFlow, Factory, Recipe
from .provider import RecipeProvider
from ..helper import click_prompt


class SatisfactoryPlus(RecipeProvider, AbstractContextManager):
    """Recipe provider that scrapes https://wiki.kyrium.space/ with Firefox.

    The browser is started lazily on first use. Used as a context manager,
    the browser is shut down on exit unless ``debug`` is enabled.
    """

    # Lazily created WebDriver; ``None`` until ``_init_browser`` is called.
    _browser: Optional[Firefox] = None
    # Class-level default only. It is never mutated: ``__init__`` gives every
    # instance its own list so instances cannot leak entries into each other.
    ignore_factories: list[str] = []

    def __init__(self, ignore_factories: list[str] | None = None, debug: bool = False):
        """Create a provider.

        Args:
            ignore_factories: Factory labels whose recipes are skipped.
                Labels are compared case-insensitively (stored casefolded).
            debug: When true, Firefox runs with a visible window and is
                not closed by ``_browser_cleanup``.
        """
        super().__init__()
        # Start from the class-level defaults and build a fresh, per-instance
        # list instead of appending to the list shared by all instances.
        self.ignore_factories = [
            *self.ignore_factories,
            *(factory.casefold() for factory in ignore_factories or ()),
        ]
        self.debug = debug

    def _init_browser(self) -> Firefox:
        """Return the shared Firefox driver, starting it on first use."""
        if self._browser is None:
            firefox_options = Options()
            firefox_options.add_argument("--width=1600")
            firefox_options.add_argument("--height=1015")
            if not self.debug:
                firefox_options.add_argument("--headless")
            self._browser = Firefox(options=firefox_options)
            self._browser.implicitly_wait(5)
        return self._browser

    def _browser_cleanup(self):
        """Quit the browser, unless in debug mode or no browser was started."""
        if self.debug or self._browser is None:
            return
        self._browser.quit()
        # Drop the reference so a later ``_init_browser`` starts a new driver
        # instead of handing out one that has already been quit.
        self._browser = None

    def _normalize_url(self, href: str) -> str:
        """Resolve ``href`` against the URL the browser is currently on."""
        return urljoin(base=self._init_browser().current_url, url=href)

    def __enter__(self):
        return self

    def __exit__(self, __exc_type, __exc_value, __traceback):
        # Returns None, so exceptions raised inside the ``with`` body propagate.
        self._browser_cleanup()

    def search_for_resource(self, session: AlchemySession, search: str) -> tuple[Resource, bool] | None:
        """Search the wiki for ``search`` and resolve it to a ``Resource``.

        When the search yields several entries the user is prompted to pick
        one; an entry whose label equals ``search`` (case-insensitively) is
        pre-selected.

        Args:
            session: Database session used to look up an existing resource.
            search: Text typed into the wiki search bar.

        Returns:
            ``(resource, True)`` when a resource with the chosen label is
            already stored, ``(resource, False)`` with a new, un-added
            ``Resource`` otherwise, or ``None`` when nothing was found or the
            prompt returned ``None``.
        """
        browser = self._init_browser()
        browser.get("https://wiki.kyrium.space/")

        search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
        search_bar.click()
        search_bar.send_keys(search)
        browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']").click()

        choices = browser.find_elements(By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center")
        if not choices:
            print("No wiki entries found for this result")
            return None

        if len(choices) == 1:
            link_html_elem = choices[0]
        else:
            # Prompt options are numbered from 1.
            default_choice = 1
            options: dict[int, str] = {}
            for number, recipe_choice in enumerate(choices, start=1):
                name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
                options[number] = name
                if name.casefold() == search.casefold():
                    default_choice = number
            user_choice = click_prompt(options=options, text="Chose a recipe to continue…", default=default_choice)
            if user_choice is None:
                return None
            link_html_elem = choices[user_choice - 1]

        alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
        resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
        if resource:
            return resource, True

        resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href"))
        return Resource(label=alt_resource_label, uri=resource_fetch_url), False

    def update_resource_recipes(self, session: AlchemySession, resource: Resource) -> Resource:
        """Scrape every recipe listed on the resource's wiki page into the database.

        Args:
            session: Database session the scraped recipes are written to.
            resource: Resource whose ``uri`` points at its wiki page.

        Returns:
            The resource re-read from the database by label, with
            ``recipes_populated_at`` set to the current time.

        Raises:
            AssertionError: If ``resource.uri`` is not set.
        """
        assert resource.uri, "Resource.uri not set"
        browser = self._init_browser()
        browser.get(resource.uri)

        # The page has two tabs that are scraped the same way; judging by the
        # original variable names, tab 0 lists recipes for the resource and
        # tab 1 lists recipes that use it as an ingredient.
        for tab_index in (0, 1):
            browser.find_element(By.CSS_SELECTOR, f"button[id$='tab-{tab_index}']").click()
            recipes_html_elems = browser.find_elements(
                By.CSS_SELECTOR, f"div[id$='tabpanel-{tab_index}'] > div > div"
            )
            for recipe_html_elem in recipes_html_elems:
                self.extract_recipe(session, recipe_html_elem)

        # manual refresh by label, because id might not be set
        updated_resource = session.scalars(Resource.by_label(resource.label)).one()
        # NOTE(review): utcnow() returns a naive datetime and is deprecated
        # since Python 3.12; kept as-is because the column's timezone handling
        # is not visible here.
        updated_resource.recipes_populated_at = datetime.utcnow()
        session.flush()
        return updated_resource

    def extract_recipe(self, session: AlchemySession, recipe_html_elem):
        """Parse one recipe element and store it with its factories and flows.

        The recipe is skipped entirely when any of its factories is on the
        ignore list, or when it lists no factory at all.

        Args:
            session: Database session used for look-ups and inserts.
            recipe_html_elem: Selenium element wrapping a single recipe.

        Raises:
            AssertionError: If a factory or resource label/link is missing.
        """
        recipe_factories: list[Factory] = []
        factory_html_elements = recipe_html_elem.find_elements(By.CSS_SELECTOR, ".flex-col > span > a")
        for factory_html_elem in factory_html_elements:
            factory_label = factory_html_elem.text.strip()
            assert factory_label, "factory label is missing (a[text])"
            if factory_label.casefold() in self.ignore_factories:
                # One ignored factory disqualifies the whole recipe.
                return

            # re-use existing Factory or create new
            factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
            if factory is None:
                factory_href = factory_html_elem.get_attribute("href")
                assert factory_href, "factory url is missing (a.href)"
                factory = Factory(label=factory_label, uri=self._normalize_url(href=factory_href))
                session.add(factory)
            recipe_factories.append(factory)

        if not recipe_factories:
            # ignore recipe when no applicable factories exist
            return

        def find_or_create_resource(resource_label: str, resource_uri_getter: Callable) -> Resource:
            """Return the stored resource with this label, or add a new one."""
            db_resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
            if db_resource is not None:
                return db_resource
            # The URI is only computed when a new resource is actually needed.
            resource = Resource(label=resource_label, uri=resource_uri_getter())
            session.add(resource)
            return resource

        def extract_resource_flow(html_elem):
            """Build a ``ResourceFlow`` from one ingredient/result element."""
            resource_img = html_elem.find_element(By.TAG_NAME, "img")
            resource_label = resource_img.get_attribute("alt").strip()
            assert resource_label, "resource label is missing (img.alt)"
            resource_href = html_elem.find_element(By.TAG_NAME, "a").get_attribute("href")
            assert resource_href, "resource link is missing (a.href)"

            # The look-up query must not flush objects that are still pending.
            with session.no_autoflush:
                resource = find_or_create_resource(
                    resource_label=resource_label,
                    resource_uri_getter=lambda: self._normalize_url(href=resource_href),
                )
                amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
                time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
                return ResourceFlow(resource=resource, amount=amount, time=time)

        # Ingredients are read from the first column of the recipe row.
        ingredient_html_elements = recipe_html_elem.find_elements(
            By.CSS_SELECTOR, ".flex-row > div:nth-child(1) > div:has(> a)"
        )
        ingredients: list[ResourceFlow] = [
            extract_resource_flow(ingredient_html_elem) for ingredient_html_elem in ingredient_html_elements
        ]

        # Results are read from the third column of the recipe row.
        result_html_elements = recipe_html_elem.find_elements(
            By.CSS_SELECTOR, ".flex-row > div:nth-child(3) > div:has(> a)"
        )
        results: list[ResourceFlow] = [
            extract_resource_flow(result_html_elem) for result_html_elem in result_html_elements
        ]

        with session.no_autoflush:
            session.add_all(ingredients)
            session.add_all(results)
            session.add(Recipe(factories=recipe_factories, ingredients=ingredients, results=results))
        session.flush()