from contextlib import AbstractContextManager
from datetime import datetime
from typing import Optional
from urllib.parse import urljoin

# NOTE(review): SQLAlchemy documents ``Session`` under ``sqlalchemy.orm``;
# confirm that the ``sqlalchemy.Session`` annotations below resolve with the
# pinned SQLAlchemy version.
import sqlalchemy
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options

from .models import Resource, ResourceFlow, Factory, Recipe
from .provider import RecipeProvider
from ..helper import prompt


class SatisfactoryPlus(RecipeProvider, AbstractContextManager):
    """Recipe provider that scrapes https://wiki.kyrium.space/ with Firefox.

    The Selenium browser is started lazily on first use. Used as a context
    manager, the browser is cleaned up on exit (see ``_browser_cleanup``).
    """

    # Lazily created Selenium driver; ``None`` while no browser is running.
    _browser: Optional[Firefox] = None

    def __init__(self, debug: bool = False):
        """Create the provider.

        Args:
            debug: When true, Firefox is started with a visible window
                (no ``--headless``) and is left open on cleanup.
        """
        super().__init__()
        self.debug = debug

    def _init_browser(self) -> Firefox:
        """Return the shared Firefox driver, starting it on first use."""
        if self._browser is None:
            firefox_options = Options()
            firefox_options.add_argument("--width=1600")
            firefox_options.add_argument("--height=1015")
            if not self.debug:
                firefox_options.add_argument("--headless")
            self._browser = Firefox(options=firefox_options)
            self._browser.implicitly_wait(5)
        return self._browser

    def _browser_cleanup(self):
        """Quit the browser unless running in debug mode."""
        if not self.debug and self._browser is not None:
            self._browser.quit()
            # Drop the dead handle so a later _init_browser() starts a fresh
            # driver instead of handing out one that was already quit.
            self._browser = None

    def _normalize_url(self, href: str) -> str:
        """Resolve ``href`` against the URL the browser currently shows."""
        return urljoin(base=self._init_browser().current_url, url=href)

    def __enter__(self):
        return self

    def __exit__(self, __exc_type, __exc_value, __traceback):
        self._browser_cleanup()

    def search_for_resource(self, session: sqlalchemy.Session, search: str) -> tuple[Resource, bool] | None:
        """Search the wiki for ``search`` and resolve it to a ``Resource``.

        If several wiki entries match, the user is prompted to pick one; an
        entry whose image ``alt`` text equals ``search`` (case-insensitively)
        is offered as the default.

        Args:
            session: Session used to look up an already stored resource.
            search: Text typed into the wiki's search bar.

        Returns:
            ``(resource, True)`` when a resource with the chosen label is
            found through ``session``; ``(resource, False)`` with a newly
            constructed ``Resource`` (not added to the session here)
            otherwise; ``None`` when the wiki shows no entries or the prompt
            returns ``None``.
        """
        browser = self._init_browser()
        browser.get("https://wiki.kyrium.space/")

        search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
        search_bar.click()
        search_bar.send_keys(search)
        search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']")
        search_button.click()

        choices = browser.find_elements(By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center")
        if not choices:
            print("No wiki entries found for this result")
            return None

        if len(choices) > 1:
            default_choice = 1
            options: dict[int, str] = {}
            # Options are numbered from 1 for display in the prompt.
            for number, recipe_choice in enumerate(choices, start=1):
                # get_attribute() yields None for a missing attribute; fall
                # back to "" so casefold() below cannot raise.
                name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt") or ""
                options[number] = name
                if name.casefold() == search.casefold():
                    default_choice = number
            user_choice = prompt(options=options, text="Chose a recipe to continue…", default=default_choice)
            if user_choice is None:
                return None
            link_html_elem = choices[user_choice - 1]
        else:
            link_html_elem = choices[0]

        alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
        resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
        if resource:
            return resource, True

        resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href"))
        return Resource(label=alt_resource_label, wiki_url=resource_fetch_url), False

    def update_resource_recipes(self, session: sqlalchemy.Session, resource: Resource) -> Resource:
        """Scrape the recipes on ``resource``'s wiki page and add them to ``session``.

        For every recipe found in the page's first tab, the factory, the
        ingredient flows and the result flows are read. Factories and
        resources are looked up by label and only created when the lookup
        finds nothing.

        Args:
            session: Session that receives the scraped objects; it is flushed
                but not committed here.
            resource: Resource whose ``wiki_url`` is opened.

        Returns:
            The resource re-read from ``session`` by label, with
            ``recipes_populated_at`` set to the current (naive) UTC time.

        Raises:
            AssertionError: If ``resource.wiki_url`` is empty, or a scraped
                resource image has no usable ``alt`` label.
        """
        assert resource.wiki_url, "Resource wiki url not set"

        browser = self._init_browser()
        browser.get(resource.wiki_url)
        browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
        recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")

        # Resources seen so far during this call, keyed by label, so that a
        # label appearing in several recipes maps to one object.
        resources: dict[str, Resource] = {}
        # Resources created during this call that still have to be added.
        new_resources: list[Resource] = []

        def extract_resource_flow(html_elem):
            """Build a ``ResourceFlow`` from one ingredient/result element."""
            resource_img = html_elem.find_element(By.TAG_NAME, "img")
            # A missing ``alt`` attribute comes back as None; normalise it to
            # "" so the assertion below reports it instead of .strip()
            # raising AttributeError.
            resource_label = (resource_img.get_attribute("alt") or "").strip()
            assert resource_label, "resource label is missing"

            if resource_label in resources:
                res = resources[resource_label]
            else:
                res = session.scalars(Resource.by_label(resource_label)).one_or_none()
                if not res:
                    wiki_url = self._normalize_url(
                        href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"),
                    )
                    res = Resource(label=resource_label, wiki_url=wiki_url)
                    new_resources.append(res)
                resources[resource_label] = res

            amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
            duration = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
            return ResourceFlow(resource=res, amount=amount, time=duration)

        for recipe_html_elem in recipes_html_elems:
            factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a")
            factory_label = factory_html_elem.text.strip()
            factory_url = self._normalize_url(href=factory_html_elem.get_attribute("href"))

            # The first column of the recipe row is read as ingredients ...
            ingredients: list[ResourceFlow] = [
                extract_resource_flow(elem)
                for elem in recipe_html_elem.find_elements(
                    By.CSS_SELECTOR, ".flex-row > div:nth-child(1) > div:has(> a)"
                )
            ]
            # ... and the third column as results.
            results: list[ResourceFlow] = [
                extract_resource_flow(elem)
                for elem in recipe_html_elem.find_elements(
                    By.CSS_SELECTOR, ".flex-row > div:nth-child(3) > div:has(> a)"
                )
            ]

            with session.no_autoflush:
                # re-use existing Factory or create new
                factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
                if not factory:
                    factory = Factory(label=factory_label, wiki_url=factory_url)
                    session.add(factory)
                session.add_all(new_resources)
                session.add_all(ingredients)
                session.add_all(results)
                session.add(Recipe(factory=factory, ingredients=ingredients, results=results))
            # Autoflush is off for the lookup above, so flush explicitly:
            # the next recipe's by-label query must see what was just added.
            session.flush()

        # refresh by label, because id might not be set
        updated_resource = session.scalars(Resource.by_label(resource.label)).one()
        updated_resource.recipes_populated_at = datetime.utcnow()
        session.flush()
        return updated_resource