"""Recipe provider that scrapes https://wiki.kyrium.space/ with a Selenium-driven Firefox."""

from concurrent.futures import Future
from contextlib import AbstractContextManager
from datetime import datetime, timezone
from typing import Optional, Callable
from urllib.parse import urljoin

from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from sqlalchemy.orm import Session as AlchemySession

from .models import Resource, ResourceFlow, Factory, Recipe
from .provider import RecipeProvider

DEFAULT_IGNORE_FACTORIES = ["A.I. Fluid Packer", "Craft Bench", "Equipment Workshop"]


class SatisfactoryPlus(RecipeProvider, AbstractContextManager):
    """Scrape resources, factories and recipes from the wiki into the database.

    The Firefox instance is created lazily on first use.  Used as a context
    manager, the browser is closed on exit unless ``debug`` is enabled.
    """

    _browser: Optional[Firefox] = None
    # Case-folded factory labels; factories listed here are skipped when
    # collecting the factories of a recipe.  Assigned per instance in __init__.
    ignore_factories: list[str]

    def __init__(self, ignore_factories: Optional[list[str]] = None, debug: bool = False):
        """Create the provider.

        Args:
            ignore_factories: Factory labels to skip (matched case-insensitively).
            debug: When true, Firefox runs with a visible window and is left
                open on cleanup.
        """
        super().__init__()
        # Build a fresh list per instance: appending to a class-level list
        # would leak ignore entries between instances.
        self.ignore_factories = [factory.casefold() for factory in ignore_factories or []]
        self.debug = debug

    def _init_browser(self) -> Firefox:
        """Return the shared Firefox driver, starting it on first call."""
        if self._browser is None:
            firefox_options = Options()
            firefox_options.add_argument("--width=1600")
            firefox_options.add_argument("--height=1015")
            if not self.debug:
                firefox_options.add_argument("--headless")
            self._browser = Firefox(options=firefox_options)
            self._browser.implicitly_wait(5)
        return self._browser

    def _browser_cleanup(self):
        """Quit the browser if one was started; in debug mode it is left open."""
        if self.debug or self._browser is None:
            return
        try:
            self._browser.quit()
        finally:
            # Drop the reference so a later call starts a fresh browser
            # instead of driving a session that has already been quit.
            self._browser = None

    def _normalize_url(self, href: str) -> str:
        """Resolve ``href`` against the URL the browser is currently on."""
        return urljoin(base=self._init_browser().current_url, url=href)

    def __enter__(self):
        return self

    def __exit__(self, __exc_type, __exc_value, __traceback):
        self._browser_cleanup()

    def search_for_resource(
        self, session: AlchemySession, search: str, prompt: Callable
    ) -> Future[tuple[Resource, bool] | None]:
        """Search the wiki for ``search`` and resolve the hit to a ``Resource``.

        Args:
            session: Database session used to look up already known resources.
            search: Text typed into the wiki's search bar.
            prompt: Called as ``prompt(options=..., text=..., default=...)``
                when the search yields several hits.  ``options`` maps 1-based
                indices to item names; the returned object must offer
                ``add_done_callback`` and resolve to the chosen index or
                ``None``.

        Returns:
            A future resolving to ``(resource, True)`` for a resource already
            in the database, ``(resource, False)`` for a newly constructed one
            (not added to the session here), or ``None`` when nothing was
            found or the user made no choice.  If handling the user's choice
            fails, the future carries that exception.
        """
        browser = self._init_browser()
        browser.get("https://wiki.kyrium.space/")
        search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
        search_bar.click()
        search_bar.send_keys(search)
        browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']").click()
        choices = browser.find_elements(By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center")

        ret = Future()

        def process_link_html_elem(link_html_elem):
            alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
            resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
            if resource:
                ret.set_result((resource, True))
            else:
                resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href"))
                ret.set_result((Resource(label=alt_resource_label, uri=resource_fetch_url), False))

        if not choices:
            print("No wiki entries found for this result")
            ret.set_result(None)
        elif len(choices) > 1:
            default_choice = 1
            options: dict[int, str] = {}
            wanted = search.casefold()
            for number, recipe_choice in enumerate(choices, start=1):
                name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
                options[number] = name
                # Pre-select the entry whose name equals the search term.
                if name.casefold() == wanted:
                    default_choice = number

            def user_choice_cb(fut: Future):
                try:
                    user_choice = fut.result()
                    if user_choice is None:
                        ret.set_result(None)
                    else:
                        process_link_html_elem(link_html_elem=choices[user_choice - 1])
                except Exception as exc:
                    # Exceptions escaping a done-callback are only logged, which
                    # would leave ``ret`` pending forever; hand them to the
                    # caller through the future instead.
                    if ret.done():
                        raise
                    ret.set_exception(exc)

            prompt(options=options, text="Chose a recipe to continue…", default=default_choice).add_done_callback(
                user_choice_cb
            )
        else:
            process_link_html_elem(link_html_elem=choices[0])

        return ret

    def _extract_tab_recipes(self, session: AlchemySession, tab_index: int) -> None:
        """Open tab ``tab_index`` on the current page and extract each recipe in its panel."""
        browser = self._init_browser()
        browser.find_element(By.CSS_SELECTOR, f"button[id$='tab-{tab_index}']").click()
        recipe_html_elems = browser.find_elements(By.CSS_SELECTOR, f"div[id$='tabpanel-{tab_index}'] > div > div")
        for recipe_html_elem in recipe_html_elems:
            # One savepoint per recipe.
            with session.begin_nested():
                self.extract_recipe(session, recipe_html_elem)

    def update_resource_recipes(self, session: AlchemySession, resource: Resource) -> Resource:
        """Scrape the recipes listed on the resource's wiki page.

        Opens ``resource.uri``, extracts the recipes of the first two tabs
        and stamps ``recipes_populated_at`` with the current UTC time.

        Returns:
            The resource as re-read from the database by its label.

        Raises:
            AssertionError: If ``resource.uri`` is not set.
        """
        assert resource.uri, "Resource.uri not set"

        self._init_browser().get(resource.uri)

        # Tab 0, then tab 1.
        # NOTE(review): presumably tab 0 lists recipes producing the resource
        # and tab 1 recipes consuming it -- confirm against the wiki.
        for tab_index in (0, 1):
            self._extract_tab_recipes(session, tab_index)

        # manual refresh by label, because id might not be set
        updated_resource = session.scalars(Resource.by_label(resource.label)).one()
        # Naive UTC timestamp, same value as the deprecated datetime.utcnow().
        updated_resource.recipes_populated_at = datetime.now(timezone.utc).replace(tzinfo=None)
        session.flush()
        return updated_resource

    def extract_recipe(self, session: AlchemySession, recipe_html_elem) -> None:
        """Create a ``Recipe`` (with factories, ingredients and results) from one recipe element.

        Factories and resources are re-used when one with the same label
        exists in the database, otherwise created and added to the session.
        Nothing is stored when every factory of the recipe is ignored.

        Raises:
            AssertionError: If a factory or resource label/link is missing.
        """
        recipe_factories: list[Factory] = []
        for factory_html_elem in recipe_html_elem.find_elements(By.CSS_SELECTOR, ".flex-col > span > a"):
            factory_label = factory_html_elem.text.strip()
            assert factory_label, "factory label is missing (a[text])"

            if factory_label.casefold() in self.ignore_factories:
                continue

            # re-use existing Factory or create new
            factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
            if factory is None:
                factory_href = factory_html_elem.get_attribute("href")
                assert factory_href, "factory url is missing (a.href)"
                factory = Factory(label=factory_label, uri=self._normalize_url(href=factory_href))
                session.add(factory)
            recipe_factories.append(factory)

        if not recipe_factories:
            # ignore recipe when no applicable factories exist
            return

        # Resources seen within this recipe, keyed by label, so a resource
        # created here is re-used instead of being created twice.
        cached_resources: dict[str, Resource] = {}

        def find_or_create_resource(resource_label: str, resource_uri_getter: Callable) -> Resource:
            resource = cached_resources.get(resource_label)
            if resource is None:
                resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
                if resource is None:
                    resource = Resource(label=resource_label, uri=resource_uri_getter())
                    session.add(resource)
                cached_resources[resource_label] = resource
            return resource

        def extract_resource_flow(html_elem):
            resource_img = html_elem.find_element(By.TAG_NAME, "img")
            # A missing alt attribute yields None; fall back to "" so the
            # assert below reports it instead of an AttributeError.
            resource_label = (resource_img.get_attribute("alt") or "").strip()
            assert resource_label, "resource label is missing (img.alt)"

            resource_href = html_elem.find_element(By.TAG_NAME, "a").get_attribute("href")
            assert resource_href, "resource link is missing (a.href)"

            with session.no_autoflush:
                resource = find_or_create_resource(
                    resource_label=resource_label,
                    resource_uri_getter=lambda: self._normalize_url(href=resource_href),
                )

            # Amount and time are passed on as the stripped element text.
            amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
            time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
            return ResourceFlow(resource=resource, amount=amount, time=time)

        ingredient_html_elements = recipe_html_elem.find_elements(
            By.CSS_SELECTOR, ".flex-row > div:nth-child(1) > div:has(> a)"
        )
        ingredients: list[ResourceFlow] = [extract_resource_flow(elem) for elem in ingredient_html_elements]

        result_html_elements = recipe_html_elem.find_elements(
            By.CSS_SELECTOR, ".flex-row > div:nth-child(3) > div:has(> a)"
        )
        results: list[ResourceFlow] = [extract_resource_flow(elem) for elem in result_html_elements]

        with session.no_autoflush:
            session.add_all(ingredients)
            session.add_all(results)
            session.add(Recipe(factories=recipe_factories, ingredients=ingredients, results=results))
        session.flush()