SatisfactoryPlusCalculator/factorygame/data/sfp.py

187 lines
8.4 KiB
Python

from contextlib import AbstractContextManager
from datetime import datetime
from typing import Optional, Callable
from urllib.parse import urljoin
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from sqlalchemy.orm import Session as AlchemySession
from .models import Resource, ResourceFlow, Factory, Recipe
from .provider import RecipeProvider
# Default list of factory labels to ignore (going by the constant's name).
# NOTE(review): nothing in the visible part of this module references this constant;
# presumably callers pass it as `ignore_factories` to SatisfactoryPlus -- confirm at call sites.
DEFAULT_IGNORE_FACTORIES = ["A.I. Fluid Packer", "Craft Bench", "Equipment Workshop"]
class SatisfactoryPlus(RecipeProvider, AbstractContextManager):
    """Recipe provider that scrapes https://wiki.kyrium.space/ through a Firefox WebDriver.

    The browser is started lazily on first use. Used as a context manager, the
    browser is shut down on exit unless ``debug`` is enabled.
    """

    # Lazily created WebDriver instance; ``None`` until first use and after cleanup.
    _browser: Optional[Firefox] = None
    # Case-folded factory labels whose recipes are skipped. Bound per instance in
    # ``__init__`` so instances never share (and accumulate into) one list.
    ignore_factories: list[str]

    def __init__(self, ignore_factories: Optional[list[str]] = None, debug: bool = False):
        """Create the provider.

        Args:
            ignore_factories: Factory labels to skip; compared case-insensitively.
            debug: When true the browser runs with a visible window and is left
                open on cleanup.
        """
        super().__init__()
        self.ignore_factories = [label.casefold() for label in ignore_factories or ()]
        self.debug = debug

    def _init_browser(self) -> Firefox:
        """Return the shared Firefox driver, starting it on first call."""
        if self._browser is None:
            firefox_options = Options()
            firefox_options.add_argument("--width=1600")
            firefox_options.add_argument("--height=1015")
            if not self.debug:
                firefox_options.add_argument("--headless")
            self._browser = Firefox(options=firefox_options)
            self._browser.implicitly_wait(5)
        return self._browser

    def _browser_cleanup(self):
        """Quit the browser, unless running in debug mode or no browser was started."""
        if self.debug or self._browser is None:
            return
        try:
            self._browser.quit()
        finally:
            # Drop the reference so a later _init_browser() starts a fresh driver
            # instead of handing out one that has already been quit.
            self._browser = None

    def _normalize_url(self, href: str) -> str:
        """Resolve ``href`` against the URL the browser is currently showing."""
        return urljoin(base=self._init_browser().current_url, url=href)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Clean up the browser; returns ``None`` so exceptions propagate."""
        self._browser_cleanup()

    def search_for_resource(
        self, session: AlchemySession, search: str, prompt: Callable
    ) -> tuple[Resource, bool] | None:
        """Search the wiki for ``search`` and resolve the hit to a ``Resource``.

        When the search yields several hits, ``prompt`` is called with keyword
        arguments ``options`` (1-based number -> image alt text), ``text`` and
        ``default`` and must return the chosen number or ``None`` to abort.

        Returns:
            ``(resource, True)`` when a resource with that label is already in the
            database, ``(resource, False)`` for a new, not yet added ``Resource``,
            or ``None`` when nothing was found or the prompt was aborted.
        """
        browser = self._init_browser()
        browser.get("https://wiki.kyrium.space/")
        search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
        search_bar.click()
        search_bar.send_keys(search)
        browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']").click()

        choices = browser.find_elements(By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center")
        if not choices:
            print("No wiki entries found for this result")
            return None

        if len(choices) > 1:
            default_choice = 1
            options: dict[int, str] = {}
            wanted = search.casefold()
            for number, choice in enumerate(choices, start=1):
                # get_attribute() returns None for a missing attribute; fall back to ""
                # so the comparison below cannot fail on it.
                name = choice.find_element(By.TAG_NAME, "img").get_attribute("alt") or ""
                options[number] = name
                if name.casefold() == wanted:
                    # pre-select an exact (case-insensitive) match
                    default_choice = number
            user_choice = prompt(options=options, text="Chose a recipe to continue…", default=default_choice)
            if user_choice is None:
                return None
            link_html_elem = choices[user_choice - 1]
        else:
            link_html_elem = choices[0]

        alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
        resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
        if resource:
            return resource, True
        resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href"))
        return Resource(label=alt_resource_label, uri=resource_fetch_url), False

    def _extract_recipes_from_tab(self, session: AlchemySession, tab_index: int) -> None:
        """Open tab ``tab_index`` on the current page and store every recipe listed in it.

        Each recipe is extracted inside its own nested transaction (savepoint).
        """
        browser = self._init_browser()
        browser.find_element(By.CSS_SELECTOR, f"button[id$='tab-{tab_index}']").click()
        recipe_html_elems = browser.find_elements(
            By.CSS_SELECTOR, f"div[id$='tabpanel-{tab_index}'] > div > div"
        )
        for recipe_html_elem in recipe_html_elems:
            with session.begin_nested():
                self.extract_recipe(session, recipe_html_elem)

    def update_resource_recipes(self, session: AlchemySession, resource: Resource) -> Resource:
        """Scrape the recipes on the resource's wiki page and stamp the resource as populated.

        Returns:
            The database-backed resource (looked up by label) with
            ``recipes_populated_at`` set to the current naive UTC time.

        Raises:
            AssertionError: if ``resource.uri`` is not set.
        """
        assert resource.uri, "Resource.uri not set"
        browser = self._init_browser()
        browser.get(resource.uri)

        # NOTE(review): presumably tab 0 lists recipes for the resource and tab 1
        # recipes using it as an ingredient -- confirm against the wiki page.
        for tab_index in (0, 1):
            self._extract_recipes_from_tab(session, tab_index)

        # manual refresh by label, because id might not be set
        updated_resource = session.scalars(Resource.by_label(resource.label)).one()
        updated_resource.recipes_populated_at = datetime.utcnow()
        session.flush()
        return updated_resource

    def extract_recipe(self, session: AlchemySession, recipe_html_elem) -> None:
        """Parse one recipe element and add the recipe, its factories and flows to ``session``.

        Recipes whose factories are all listed in ``ignore_factories`` are skipped.
        Existing ``Factory``/``Resource`` rows are re-used by label; missing ones
        are created.
        """
        recipe_factories: list[Factory] = []
        factory_html_elements = recipe_html_elem.find_elements(By.CSS_SELECTOR, ".flex-col > span > a")
        for factory_html_elem in factory_html_elements:
            factory_label = factory_html_elem.text.strip()
            assert factory_label, "factory label is missing (a[text])"
            if factory_label.casefold() in self.ignore_factories:
                continue
            # re-use existing Factory or create new
            factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
            if factory is None:
                factory_href = factory_html_elem.get_attribute("href")
                assert factory_href, "factory url is missing (a.href)"
                factory = Factory(label=factory_label, uri=self._normalize_url(href=factory_href))
                session.add(factory)
            recipe_factories.append(factory)

        if not recipe_factories:
            # ignore recipe when no applicable factories exist
            return

        # Resources seen while parsing this recipe, keyed by label, so a resource
        # that appears several times maps to a single object.
        cached_resources: dict[str, Resource] = {}

        def find_or_create_resource(resource_label: str, resource_uri_getter: Callable) -> Resource:
            """Return the cached, stored or newly created Resource for ``resource_label``."""
            if resource_label in cached_resources:
                return cached_resources[resource_label]
            db_resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
            if db_resource is not None:
                cached_resources[resource_label] = db_resource
                return db_resource
            # the URI is only computed when a new Resource really has to be created
            resource = Resource(label=resource_label, uri=resource_uri_getter())
            session.add(resource)
            cached_resources[resource_label] = resource
            return resource

        def extract_resource_flow(html_elem) -> ResourceFlow:
            """Build a ResourceFlow (resource, amount text, time text) from one element."""
            resource_img = html_elem.find_element(By.TAG_NAME, "img")
            # A missing alt attribute yields None; normalise to "" so the assert
            # below reports it instead of an AttributeError on .strip().
            resource_label = (resource_img.get_attribute("alt") or "").strip()
            assert resource_label, "resource label is missing (img.alt)"
            resource_href = html_elem.find_element(By.TAG_NAME, "a").get_attribute("href")
            assert resource_href, "resource link is missing (a.href)"
            with session.no_autoflush:
                resource = find_or_create_resource(
                    resource_label=resource_label,
                    resource_uri_getter=lambda: self._normalize_url(href=resource_href),
                )
            amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
            duration = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
            return ResourceFlow(resource=resource, amount=amount, time=duration)

        ingredient_html_elements = recipe_html_elem.find_elements(
            By.CSS_SELECTOR, ".flex-row > div:nth-child(1) > div:has(> a)"
        )
        ingredients: list[ResourceFlow] = [
            extract_resource_flow(ingredient_html_elem) for ingredient_html_elem in ingredient_html_elements
        ]

        result_html_elements = recipe_html_elem.find_elements(
            By.CSS_SELECTOR, ".flex-row > div:nth-child(3) > div:has(> a)"
        )
        results: list[ResourceFlow] = [
            extract_resource_flow(result_html_elem) for result_html_elem in result_html_elements
        ]

        with session.no_autoflush:
            session.add_all(ingredients)
            session.add_all(results)
            session.add(Recipe(factories=recipe_factories, ingredients=ingredients, results=results))
        session.flush()