SatisfactoryPlusCalculator/factorygame/data/sfp.py

201 lines
8.8 KiB
Python
Raw Permalink Normal View History

from concurrent.futures import Future
2024-01-29 17:37:30 +00:00
from contextlib import AbstractContextManager
from datetime import datetime
2024-02-02 16:44:14 +00:00
from typing import Optional, Callable
2024-01-29 17:37:30 +00:00
from urllib.parse import urljoin
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from sqlalchemy.orm import Session as AlchemySession
2024-01-29 17:37:30 +00:00
from .models import Resource, ResourceFlow, Factory, Recipe
from .provider import RecipeProvider
2024-02-02 20:48:57 +00:00
DEFAULT_IGNORE_FACTORIES = ["A.I. Fluid Packer", "Craft Bench", "Equipment Workshop"]
2024-01-29 17:37:30 +00:00
class SatisfactoryPlus(RecipeProvider, AbstractContextManager):
_browser: Optional[Firefox] = None
ignore_factories: list[str] = []
2024-01-29 17:37:30 +00:00
def __init__(self, ignore_factories: list[str] = None, debug: bool = False):
2024-01-29 17:37:30 +00:00
super().__init__()
if ignore_factories:
for factory in ignore_factories:
self.ignore_factories.append(factory.casefold())
2024-01-29 17:37:30 +00:00
self.debug = debug
def _init_browser(self) -> Firefox:
if self._browser is None:
firefox_options = Options()
firefox_options.add_argument("--width=1600")
firefox_options.add_argument("--height=1015")
if not self.debug:
firefox_options.add_argument("--headless")
self._browser = Firefox(options=firefox_options)
self._browser.implicitly_wait(5)
return self._browser
def _browser_cleanup(self):
if not self.debug and self._browser is not None:
self._browser.quit()
def _normalize_url(self, href: str) -> str:
return urljoin(base=self._init_browser().current_url, url=href)
def __enter__(self):
return self
def __exit__(self, __exc_type, __exc_value, __traceback):
self._browser_cleanup()
2024-02-02 20:48:57 +00:00
def search_for_resource(
self, session: AlchemySession, search: str, prompt: Callable
) -> Future[tuple[Resource, bool] | None]:
2024-01-29 17:37:30 +00:00
browser = self._init_browser()
browser.get("https://wiki.kyrium.space/")
search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
search_bar.click()
search_bar.send_keys(search)
search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']")
search_button.click()
choices = browser.find_elements(By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center")
ret = Future()
def process_link_html_elem(link_html_elem):
alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
if resource:
ret.set_result((resource, True))
else:
resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href"))
ret.set_result((Resource(label=alt_resource_label, uri=resource_fetch_url), False))
2024-01-29 17:37:30 +00:00
if not choices:
print("No wiki entries found for this result")
ret.set_result(None)
2024-01-29 17:37:30 +00:00
elif len(choices) > 1:
default_choice = 1
options: dict[int, str] = {}
for idx in range(len(choices)):
recipe_choice = choices[idx]
2024-01-29 17:37:30 +00:00
name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
options[idx + 1] = name
2024-01-29 17:37:30 +00:00
if name.casefold() == search.casefold():
default_choice = idx + 1
2024-01-29 17:37:30 +00:00
def user_choice_cb(fut: Future):
user_choice = fut.result()
if user_choice is None:
ret.set_result(None)
else:
process_link_html_elem(link_html_elem=choices[user_choice - 1])
prompt(options=options, text="Chose a recipe to continue…", default=default_choice).add_done_callback(
user_choice_cb
)
2024-01-29 17:37:30 +00:00
else:
process_link_html_elem(link_html_elem=choices[0])
return ret
2024-01-29 17:37:30 +00:00
def update_resource_recipes(self, session: AlchemySession, resource: Resource) -> Resource:
2024-02-02 16:44:14 +00:00
assert resource.uri, "Resource.uri not set"
2024-01-29 17:37:30 +00:00
browser = self._init_browser()
2024-02-02 16:44:14 +00:00
browser.get(resource.uri)
2024-01-29 17:37:30 +00:00
browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")
2024-02-02 16:44:14 +00:00
for recipe_html_elem in recipes_html_elems:
2024-02-02 20:48:57 +00:00
with session.begin_nested():
self.extract_recipe(session, recipe_html_elem)
2024-01-29 17:37:30 +00:00
2024-02-02 16:44:14 +00:00
browser.find_element(By.CSS_SELECTOR, "button[id$='tab-1']").click()
ingredient_recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-1'] > div > div")
for recipe_html_elem in ingredient_recipes_html_elems:
2024-02-02 20:48:57 +00:00
with session.begin_nested():
self.extract_recipe(session, recipe_html_elem)
2024-01-29 17:37:30 +00:00
2024-02-02 16:44:14 +00:00
# manual refresh by label, because id might not be set
2024-01-29 17:37:30 +00:00
updated_resource = session.scalars(Resource.by_label(resource.label)).one()
updated_resource.recipes_populated_at = datetime.utcnow()
session.flush()
return updated_resource
2024-02-02 16:44:14 +00:00
def extract_recipe(self, session: AlchemySession, recipe_html_elem):
recipe_factories: list[Factory] = []
factory_html_elements = recipe_html_elem.find_elements(By.CSS_SELECTOR, ".flex-col > span > a")
for factory_html_elem in factory_html_elements:
factory_label = factory_html_elem.text.strip()
assert factory_label, "factory label is missing (a[text])"
if factory_label.casefold() in self.ignore_factories:
2024-02-02 20:48:57 +00:00
continue
2024-02-02 16:44:14 +00:00
# re-use existing Factory or create new
factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
if factory is None:
factory_href = factory_html_elem.get_attribute("href")
assert factory_href, "factory url is missing (a.href)"
factory = Factory(label=factory_label, uri=self._normalize_url(href=factory_href))
session.add(factory)
recipe_factories.append(factory)
if not recipe_factories:
# ignore recipe when no applicable factories exist
return
2024-02-02 16:44:14 +00:00
2024-02-02 20:48:57 +00:00
cached_resources: dict[str, Resource] = {}
2024-02-02 16:44:14 +00:00
def find_or_create_resource(resource_label: str, resource_uri_getter: Callable) -> Resource:
2024-02-02 20:48:57 +00:00
if resource_label in cached_resources:
return cached_resources[resource_label]
2024-02-02 16:44:14 +00:00
db_resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
if db_resource is not None:
2024-02-02 20:48:57 +00:00
cached_resources[resource_label] = db_resource
2024-02-02 16:44:14 +00:00
return db_resource
resource = Resource(label=resource_label, uri=resource_uri_getter())
session.add(resource)
2024-02-02 20:48:57 +00:00
cached_resources[resource_label] = resource
2024-02-02 16:44:14 +00:00
return resource
def extract_resource_flow(html_elem):
resource_img = html_elem.find_element(By.TAG_NAME, "img")
resource_label = resource_img.get_attribute("alt").strip()
assert resource_label, "resource label is missing (img.alt)"
resource_href = html_elem.find_element(By.TAG_NAME, "a").get_attribute("href")
assert resource_href, "resource link is missing (a.href)"
with session.no_autoflush:
resource = find_or_create_resource(
resource_label=resource_label,
resource_uri_getter=lambda: self._normalize_url(href=resource_href),
)
amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
return ResourceFlow(resource=resource, amount=amount, time=time)
ingredient_html_elements = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)"
)
ingredients: list[ResourceFlow] = []
for ingredient_idx in range(len(ingredient_html_elements)):
resource_flow = extract_resource_flow(ingredient_html_elements[ingredient_idx])
ingredients.append(resource_flow)
result_html_elements = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)"
)
results: list[ResourceFlow] = []
for result_html_elem in result_html_elements:
resource_flow = extract_resource_flow(result_html_elem)
results.append(resource_flow)
with session.no_autoflush:
session.add_all(ingredients)
session.add_all(results)
session.add(Recipe(factories=recipe_factories, ingredients=ingredients, results=results))
session.flush()