SatisfactoryPlusCalculator/factorygame/calculator/SatisfactoryCalculator.py
Ben 82e7f26c89
Show recipes from database, if available
Add refetch option.
Strip html text before inserting into columns.
2024-01-29 03:23:36 +01:00

326 lines
14 KiB
Python

#!/usr/bin/env python3
import datetime
from typing import Optional
from urllib.parse import urljoin
import click
import sqlalchemy
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.remote.webdriver import WebDriver
from sqlalchemy import String, create_engine, select, ForeignKey, Select, Table, Column, delete
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session
# Maximum age of a resource's stored recipe data: `main` re-scrapes the wiki once
# `Resource.recipes_populated_at` is older than this.
recipe_info_timeout = datetime.timedelta(days=30)
class Base(DeclarativeBase):
    """Declarative base class shared by every ORM model in this module."""
class Resource(Base):
    """A resource known to the calculator, stored in the ``resources`` table."""

    __tablename__ = "resources"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Display name; `by_label` matches against this column.
    label: Mapped[str] = mapped_column(String(127))
    # URL of the resource's wiki page.
    wiki_url: Mapped[str]
    # When the recipes for this resource were last populated; None if that never happened.
    recipes_populated_at: Mapped[Optional[datetime.datetime]]
    # Every flow (ingredient or result entry) that refers to this resource.
    flows: Mapped[list["ResourceFlow"]] = relationship(back_populates="resource")

    @classmethod
    def by_label(cls, search: str) -> Select[tuple["Resource"]]:
        """Build a SELECT for the resources whose label matches ``search``.

        The comparison is a case-insensitive LIKE, so ``%`` and ``_`` inside
        ``search`` act as wildcards.
        """
        label_matches = cls.label.ilike(search)
        query = select(cls)
        return query.where(label_matches)

    def __repr__(self):
        fields = ", ".join(
            (
                f"id={self.id}",
                f"label={self.label}",
                f"wiki_url={self.wiki_url}",
                f"recipes_populated_at={self.recipes_populated_at}",
            )
        )
        return f"Resource({fields})"
class Factory(Base):
    """A building in which recipes are produced, stored in the ``factories`` table."""

    __tablename__ = "factories"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Display name; `by_label` matches against this column.
    label: Mapped[str] = mapped_column(String(127))
    # URL of the factory's wiki page.
    wiki_url: Mapped[str]

    @classmethod
    def by_label(cls, search: str) -> Select[tuple["Factory"]]:
        """Build a SELECT for the factories whose label matches ``search``.

        The comparison is a case-insensitive LIKE, so ``%`` and ``_`` inside
        ``search`` act as wildcards.
        """
        label_matches = cls.label.ilike(search)
        query = select(cls)
        return query.where(label_matches)

    def __repr__(self):
        fields = ", ".join(
            (
                f"id={self.id}",
                f"label={self.label}",
                f"wiki_url={self.wiki_url}",
            )
        )
        return f"Factory({fields})"
# Association table linking each recipe to the resource flows it consumes
# (used as `secondary` by Recipe.ingredients / ResourceFlow.ingredient_in).
ingredients_table = Table(
    "recipe_ingredients",
    Base.metadata,
    Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
    Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
# Association table linking each recipe to the resource flows it produces
# (used as `secondary` by Recipe.results / ResourceFlow.result_of).
results_table = Table(
    "recipe_results",
    Base.metadata,
    Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
    Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
class ResourceFlow(Base):
    """One ingredient or result entry of a recipe: a resource plus its amount and time."""

    __tablename__ = "resource_flows"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Recipe that lists this flow as an ingredient, if any.
    ingredient_in: Mapped[Optional["Recipe"]] = relationship(
        secondary=ingredients_table, back_populates="ingredients"
    )
    # Recipe that lists this flow as a result, if any.
    result_of: Mapped[Optional["Recipe"]] = relationship(secondary=results_table, back_populates="results")
    resource_id: Mapped[int] = mapped_column(ForeignKey("resources.id"))
    resource: Mapped["Resource"] = relationship(back_populates="flows")
    # Both values are stored as the text scraped from the wiki page.
    amount: Mapped[str]
    time: Mapped[str]

    def __repr__(self):
        fields = ", ".join(
            (
                f"id={self.id}",
                f"resource_id={self.resource_id}",
                f"amount={self.amount}",
                f"time={self.time}",
            )
        )
        return f"ResourceFlow({fields})"
class Recipe(Base):
    """A recipe: the factory it runs in plus its ingredient and result flows."""

    __tablename__ = "recipes"

    id: Mapped[int] = mapped_column(primary_key=True)
    factory_id: Mapped[int] = mapped_column(ForeignKey("factories.id"))
    factory: Mapped["Factory"] = relationship()
    # Flows consumed by the recipe (linked through `ingredients_table`).
    ingredients: Mapped[list["ResourceFlow"]] = relationship(
        secondary=ingredients_table, back_populates="ingredient_in"
    )
    # Flows produced by the recipe (linked through `results_table`).
    results: Mapped[list["ResourceFlow"]] = relationship(secondary=results_table, back_populates="result_of")

    def __repr__(self):
        fields = ", ".join(
            (
                f"id={self.id}",
                f"factory={self.factory}",
                f"ingredients={self.ingredients}",
                f"results={self.results}",
            )
        )
        return f"Recipe({fields})"
def normalize_url(browser: WebDriver, href: str) -> str:
    """Resolve ``href`` against the page the browser is currently showing.

    Args:
        browser: Driver whose ``current_url`` serves as the base URL.
        href: Link target as found in the page; may be relative or absolute.

    Returns:
        The result of joining ``href`` onto the browser's current URL.
    """
    current_page = browser.current_url
    return urljoin(current_page, href)
def _extract_resource_flow(browser: WebDriver, html_elem) -> ResourceFlow:
    """Build an unsaved ``ResourceFlow`` (with a fresh ``Resource``) from one ingredient/result element."""
    resource_img = html_elem.find_element(By.TAG_NAME, "img")
    resource_label = resource_img.get_attribute("alt").strip()
    wiki_url = normalize_url(
        browser=browser,
        href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"),
    )
    amount_text = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
    time_text = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
    resource = Resource(label=resource_label, wiki_url=wiki_url)
    return ResourceFlow(resource=resource, amount=amount_text, time=time_text)


def _scrape_flows(browser: WebDriver, recipe_html_elem, recipe_idx: int, column: int, kind: str) -> list[ResourceFlow]:
    """Scrape the flows in the given column of a recipe element, printing each one as it is read."""
    flow_html_elems = recipe_html_elem.find_elements(
        By.CSS_SELECTOR, f".flex-row > div:nth-child({column}) > div:has(> a)"
    )
    flows: list[ResourceFlow] = []
    for flow_idx, flow_html_elem in enumerate(flow_html_elems):
        flow = _extract_resource_flow(browser, flow_html_elem)
        flows.append(flow)
        print("recipe", recipe_idx, kind, flow_idx, "name:", flow.resource.label)
        print("recipe", recipe_idx, kind, flow_idx, "count:", flow.amount)
        print("recipe", recipe_idx, kind, flow_idx, "time:", flow.time)
    return flows


def populate_recipes(browser: WebDriver, engine: sqlalchemy.Engine, input_resource_label: str) -> int:
    """Scrape the recipes on the browser's current page and store them in the database.

    The browser must already show a resource page. The first tab is opened,
    every recipe in its panel is read (factory, ingredients, results) and
    persisted, and the resource matching ``input_resource_label`` gets its
    ``recipes_populated_at`` timestamp set to the current UTC time.

    Args:
        browser: Driver positioned on the resource's wiki page.
        engine: Database engine the recipes are written to.
        input_resource_label: Label of the resource whose page is being scraped.

    Returns:
        The database id of the resource matching ``input_resource_label``.

    Raises:
        sqlalchemy.exc.NoResultFound: No stored resource matches the label
            after scraping.
        sqlalchemy.exc.MultipleResultsFound: More than one stored factory or
            resource matches a label.
    """
    browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
    recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")

    # The session does not autoflush, so objects added below are invisible to the
    # by_label() queries until they are flushed. These caches make sure a label that
    # shows up several times on the page maps to one object instead of being
    # inserted once per occurrence.
    factories_by_label: dict[str, Factory] = {}
    resources_by_label: dict[str, Resource] = {}

    with Session(engine, autoflush=False) as session:
        for recipe_idx, recipe_html_elem in enumerate(recipes_html_elems):
            factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a")
            factory_label = factory_html_elem.text.strip()
            factory_url = normalize_url(browser=browser, href=factory_html_elem.get_attribute("href"))
            print("recipe", recipe_idx, "produced in:", factory_label, factory_url)

            # Ingredients are in the first column of the recipe row, results in the third.
            ingredients = _scrape_flows(browser, recipe_html_elem, recipe_idx, column=1, kind="ingredient")
            results = _scrape_flows(browser, recipe_html_elem, recipe_idx, column=3, kind="result")

            factory_key = factory_label.casefold()
            factory = factories_by_label.get(factory_key)
            if factory is None:
                factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
                if factory is None:
                    factory = Factory(label=factory_label, wiki_url=factory_url)
                    session.add(factory)
                factories_by_label[factory_key] = factory

            for flow in ingredients + results:
                resource_key = flow.resource.label.casefold()
                resource = resources_by_label.get(resource_key)
                if resource is None:
                    resource = session.scalars(Resource.by_label(flow.resource.label)).one_or_none()
                    if resource is None:
                        # First sighting anywhere: keep the freshly scraped object.
                        resource = flow.resource
                        session.add(resource)
                    resources_by_label[resource_key] = resource
                if flow.resource is not resource:
                    flow.resource = resource
                session.add(flow)

            session.add(Recipe(factory=factory, ingredients=ingredients, results=results))

        # Write everything out before the lookup below; without autoflush the query
        # would not see the rows added above.
        session.commit()

        updated_resource = session.scalars(Resource.by_label(input_resource_label)).one()
        # Naive UTC timestamp, matching how `main` compares it against the timeout.
        updated_resource.recipes_populated_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        res_id = updated_resource.id
        session.commit()
    return res_id
@click.command()
@click.option("--result", is_flag=True)
@click.option("--debug", is_flag=True)
@click.option("--refetch", is_flag=True)
@click.argument("search")
def main(result: bool, debug: bool, refetch: bool, search: str):
    """Print the recipes that produce the resource matching SEARCH.

    Resources already stored in the local database are offered first;
    otherwise, or when 0 is chosen at the prompt, the wiki is searched and the
    recipes of the chosen entry are scraped and stored. Stored recipes are
    scraped again when --refetch is given, when they were never populated, or
    when they are older than the cache timeout.

    Nothing happens unless --result is passed. With --debug the SQL is echoed,
    the browser window is shown and it is left open on exit.
    """
    engine = create_engine("sqlite:///file.db", echo=debug)
    Base.metadata.create_all(bind=engine)

    # Only the "--result" lookup exists; without it there is nothing to do.
    if not (result and search):
        return

    wiki_search = True
    resource_label = search
    with Session(engine) as session:
        resources = session.scalars(Resource.by_label(resource_label)).all()
        if not resources:
            print("Could not find existing resources matching the search string.. starting wiki search")
        else:
            for idx, known_resource in enumerate(resources, start=1):
                print(f"{idx}: {known_resource.label}")
            # IntRange rejects numbers outside the menu, which would otherwise raise
            # IndexError or (for negatives) silently pick the wrong entry.
            user_choice = click.prompt(
                "Chose a resource to continue or 0 to continue with a wiki search",
                default=1,
                type=click.IntRange(0, len(resources)),
            )
            if user_choice != 0:
                chosen_resource = resources[user_choice - 1]
                res_id = chosen_resource.id
                resource_label = chosen_resource.label
                wiki_search = False
        session.commit()

    firefox_options = Options()
    firefox_options.add_argument("--width=1600")
    firefox_options.add_argument("--height=1015")
    if not debug:
        firefox_options.add_argument("--headless")
    browser = Firefox(options=firefox_options)
    browser.implicitly_wait(5)
    try:
        if wiki_search:
            browser.get("https://wiki.kyrium.space/")
            search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
            search_bar.click()
            search_bar.send_keys(search)
            search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']")
            search_button.click()
            choices = browser.find_elements(By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center")
            if not choices:
                print("No wiki entries found for this result")
                return
            elif len(choices) > 1:
                default_choice = 1
                for choice_idx, recipe_choice in enumerate(choices, start=1):
                    name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
                    # An exact (case-insensitive) match becomes the pre-selected answer.
                    if name and name.casefold() == search.casefold():
                        default_choice = choice_idx
                    print(f"{choice_idx}: {name}")
                user_choice = click.prompt(
                    "Chose a recipe to continue…",
                    default=default_choice,
                    type=click.IntRange(0, len(choices)),
                )
                # 0 is not a menu entry; treat it like accepting the default.
                if not user_choice:
                    user_choice = default_choice
                link_html_elem = choices[user_choice - 1]
            else:
                link_html_elem = choices[0]
            # Stripped so it compares equal to the labels populate_recipes stores.
            resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt").strip()
            # FIXME: check if resource_label is in database (populating an already
            # stored resource adds its recipes a second time)
            if debug:
                print("resource_label:", resource_label)
            link_html_elem.click()
            res_id = populate_recipes(browser=browser, engine=engine, input_resource_label=resource_label)
        else:
            with Session(engine) as session:
                input_resource = session.get(Resource, res_id)
                input_resource_url = input_resource.wiki_url
                resource_label = input_resource.label
                # Naive UTC, like the timestamp written by populate_recipes.
                now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
                refetch = (
                    refetch
                    or input_resource.recipes_populated_at is None
                    or now - input_resource.recipes_populated_at > recipe_info_timeout
                )
                if refetch:
                    print("Deleting recipes for", input_resource)
                    # Materialise the flows up front so rows are not deleted while the
                    # query result is still being iterated.
                    resource_flows = session.scalars(
                        select(ResourceFlow).where(ResourceFlow.resource_id == res_id)
                    ).all()
                    for flow in resource_flows:
                        # Only recipes that produce the resource are replaced.
                        if flow.result_of:
                            for flow2 in flow.result_of.ingredients:
                                session.delete(flow2)
                            for flow2 in flow.result_of.results:
                                session.delete(flow2)
                            session.delete(flow.result_of)
                session.commit()
            if refetch:
                print("Refetching recipes for", resource_label)
                browser.get(input_resource_url)
                res_id = populate_recipes(browser=browser, engine=engine, input_resource_label=resource_label)

        with Session(engine) as session:
            stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == res_id)
            for recipe in session.scalars(stmt):
                print(recipe)
                for flow in recipe.ingredients:
                    print("ingredient:", flow.resource, flow)
                for flow in recipe.results:
                    print("result: ", flow.resource, flow)
    finally:
        # In debug mode the browser is deliberately left open for inspection.
        if not debug:
            browser.quit()