SatisfactoryPlusCalculator/factorygame/calculator/SatisfactoryCalculator.py
Ben ba4aa26be3
Implement faster recipe lookup without browser
Do resource lookup and run regular refetch-code after wiki search.
2024-01-29 16:13:02 +01:00

307 lines
13 KiB
Python

#!/usr/bin/env python3
import datetime
from typing import Optional
from urllib.parse import urljoin
import click
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from sqlalchemy import String, create_engine, select, ForeignKey, Select, Table, Column
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session
__debug = False
__recipe_info_timeout = datetime.timedelta(days=30)
__browser: Optional[Firefox] = None
def get_browser() -> Firefox:
global __browser, __debug
if __browser is None:
firefox_options = Options()
firefox_options.add_argument("--width=1600")
firefox_options.add_argument("--height=1015")
if not __debug:
firefox_options.add_argument("--headless")
__browser = Firefox(options=firefox_options)
__browser.implicitly_wait(5)
return __browser
def browser_cleanup():
global __browser, __debug
if not __debug and __browser is not None:
__browser.quit()
class Base(DeclarativeBase):
pass
class Resource(Base):
__tablename__ = "resources"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str]
recipes_populated_at: Mapped[Optional[datetime.datetime]]
flows: Mapped[list["ResourceFlow"]] = relationship(back_populates="resource")
@classmethod
def by_label(cls, search: str) -> Select[tuple["Resource"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return (
f"Resource(id={self.id}, "
f"label={self.label}, "
f"wiki_url={self.wiki_url}, "
f"recipes_populated_at={self.recipes_populated_at})"
)
class Factory(Base):
__tablename__ = "factories"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str]
@classmethod
def by_label(cls, search: str) -> Select[tuple["Factory"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return f"Factory(id={self.id}, label={self.label}, wiki_url={self.wiki_url})"
ingredients_table = Table(
"recipe_ingredients",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
results_table = Table(
"recipe_results",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
class ResourceFlow(Base):
__tablename__ = "resource_flows"
id: Mapped[int] = mapped_column(primary_key=True)
ingredient_in: Mapped[Optional["Recipe"]] = relationship(secondary=ingredients_table, back_populates="ingredients")
result_of: Mapped[Optional["Recipe"]] = relationship(secondary=results_table, back_populates="results")
resource_id: Mapped[int] = mapped_column(ForeignKey("resources.id"))
resource: Mapped["Resource"] = relationship(back_populates="flows")
amount: Mapped[str]
time: Mapped[str]
def __repr__(self):
return f"ResourceFlow(id={self.id}, resource_id={self.resource_id}, amount={self.amount}, time={self.time})"
class Recipe(Base):
__tablename__ = "recipes"
id: Mapped[int] = mapped_column(primary_key=True)
factory_id: Mapped[int] = mapped_column(ForeignKey("factories.id"))
factory: Mapped["Factory"] = relationship()
ingredients: Mapped[list["ResourceFlow"]] = relationship(
secondary=ingredients_table, back_populates="ingredient_in"
)
results: Mapped[list["ResourceFlow"]] = relationship(secondary=results_table, back_populates="result_of")
def __repr__(self):
return f"Recipe(id={self.id}, factory={self.factory}, ingredients={self.ingredients}, results={self.results})"
def normalize_url(href: str) -> str:
return urljoin(base=get_browser().current_url, url=href)
def populate_recipes(session: Session, input_resource_label: str) -> Resource:
browser = get_browser()
browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")
resources: dict[str, Resource] = {}
new_resources: list[Resource] = []
for recipe_idx in range(len(recipes_html_elems)):
recipe_html_elem = recipes_html_elems[recipe_idx]
factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a")
factory_label = factory_html_elem.text.strip()
factory_url = urljoin(base=browser.current_url, url=factory_html_elem.get_attribute("href"))
def extract_resource_flow(html_elem):
resource_img = html_elem.find_element(By.TAG_NAME, "img")
resource_label = resource_img.get_attribute("alt").strip()
assert resource_label, "resource label is missing"
if resource_label in resources:
resource = resources[resource_label]
else:
resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
if not resource:
wiki_url = normalize_url(
href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"),
)
resource = Resource(label=resource_label, wiki_url=wiki_url)
new_resources.append(resource)
resources[resource_label] = resource
amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
return ResourceFlow(resource=resource, amount=amount, time=time)
ingredient_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)"
)
ingredients: list[ResourceFlow] = []
for ingredient_idx in range(len(ingredient_html_elems)):
resource_flow = extract_resource_flow(ingredient_html_elems[ingredient_idx])
ingredients.append(resource_flow)
result_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)"
)
results: list[ResourceFlow] = []
for result_idx in range(len(result_html_elems)):
resource_flow = extract_resource_flow(result_html_elems[result_idx])
results.append(resource_flow)
with session.no_autoflush:
# re-use existing Factory or create new
factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
if not factory:
factory = Factory(label=factory_label, wiki_url=factory_url)
session.add(factory)
session.add_all(new_resources)
session.add_all(ingredients)
session.add_all(results)
session.add(Recipe(factory=factory, ingredients=ingredients, results=results))
session.flush()
updated_resource = session.scalars(Resource.by_label(input_resource_label)).one()
updated_resource.recipes_populated_at = datetime.datetime.utcnow()
session.flush()
return updated_resource
@click.command()
@click.option("--result", is_flag=True)
@click.option("--debug", is_flag=True)
@click.option("--refetch", is_flag=True)
@click.argument("search")
def main(result: bool, debug: bool, refetch: bool, search: str):
global __debug
__debug = debug
engine = create_engine("sqlite:///file.db", echo=debug)
Base.metadata.create_all(bind=engine)
if result and search:
wiki_search = True
resource: Optional[Resource] = None
with Session(engine) as session:
matching_resources = session.scalars(Resource.by_label(search)).all()
if len(matching_resources) == 0:
print("Could not find existing resources matching the search string.. starting wiki search")
else:
for idx in range(1, len(matching_resources) + 1):
print(f"{idx}: {matching_resources[idx - 1].label}")
user_choice = click.prompt(
"Chose a resource to continue or 0 to continue with a wiki search", default=1
)
if user_choice != 0:
resource = matching_resources[user_choice - 1]
wiki_search = False
try:
if wiki_search:
browser = get_browser()
browser.get("https://wiki.kyrium.space/")
search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
search_bar.click()
search_bar.send_keys(search)
search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']")
search_button.click()
choices = browser.find_elements(
By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center"
)
if not choices:
print("No wiki entries found for this result")
return
elif len(choices) > 1:
default_choice = 1
choice_names: list[str] = []
for choice_idx in range(1, len(choices) + 1):
recipe_choice = choices[choice_idx - 1]
name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
choice_names.append(name)
if name.casefold() == search.casefold():
default_choice = choice_idx
print(f"{choice_idx}: {name}")
user_choice = click.prompt("Chose a recipe to continue…", default=default_choice)
if not user_choice:
user_choice = default_choice
else:
user_choice = int(user_choice)
link_html_elem = choices[user_choice - 1]
else:
link_html_elem = choices[0]
alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
if not resource:
resource_fetch_url = normalize_url(href=link_html_elem.get_attribute("href"))
refetch = (
refetch
or resource is None
or resource.recipes_populated_at is None
or datetime.datetime.utcnow() - resource.recipes_populated_at > __recipe_info_timeout
)
if refetch and resource is not None:
print("Deleting recipes for", resource.label)
with session.begin_nested():
for flow in session.scalars(
select(ResourceFlow).where(ResourceFlow.resource_id == resource.id)
):
if flow.result_of:
for flow2 in flow.result_of.ingredients:
session.delete(flow2)
for flow2 in flow.result_of.results:
session.delete(flow2)
session.delete(flow.result_of)
if refetch:
browser = get_browser()
if resource is None:
print("Fetching recipes for new resource", alt_resource_label)
assert resource_fetch_url, "Resource wiki url not set"
browser.get(resource_fetch_url)
resource_label = alt_resource_label
else:
print("Refetching recipes for", resource.label)
browser.get(resource.wiki_url)
resource_label = resource.label
with session.begin_nested():
resource = populate_recipes(session=session, input_resource_label=resource_label)
session.refresh(resource)
assert resource, "Resource must be set at this point"
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
for recipe in session.scalars(stmt):
print(recipe)
for flow in recipe.ingredients:
print("ingredient:", flow.resource, flow)
for flow in recipe.results:
print("result: ", flow.resource, flow)
finally:
browser_cleanup()