Refactor code

This commit is contained in:
Ben 2024-01-29 18:37:30 +01:00
parent ba4aa26be3
commit 71e2c46949
Signed by: ben
GPG key ID: 0F54A7ED232D3319
9 changed files with 345 additions and 311 deletions

View file

@ -14,4 +14,4 @@ sqlalchemy = "*"
python_version = "3.11"
[scripts]
calc = {call = "factorygame.calculator.SatisfactoryCalculator:main()"}
fetch = {call = "factorygame.data.fetch:main()"}

View file

@ -3,5 +3,5 @@
Using pipenv, run e.g.
```sh
pipenv run calc --result 'Molten Iron'
pipenv run fetch --result 'Molten Iron'
```

View file

@ -1,306 +0,0 @@
#!/usr/bin/env python3
import datetime
from typing import Optional
from urllib.parse import urljoin
import click
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from sqlalchemy import String, create_engine, select, ForeignKey, Select, Table, Column
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session
__debug = False
__recipe_info_timeout = datetime.timedelta(days=30)
__browser: Optional[Firefox] = None
def get_browser() -> Firefox:
global __browser, __debug
if __browser is None:
firefox_options = Options()
firefox_options.add_argument("--width=1600")
firefox_options.add_argument("--height=1015")
if not __debug:
firefox_options.add_argument("--headless")
__browser = Firefox(options=firefox_options)
__browser.implicitly_wait(5)
return __browser
def browser_cleanup():
global __browser, __debug
if not __debug and __browser is not None:
__browser.quit()
class Base(DeclarativeBase):
pass
class Resource(Base):
__tablename__ = "resources"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str]
recipes_populated_at: Mapped[Optional[datetime.datetime]]
flows: Mapped[list["ResourceFlow"]] = relationship(back_populates="resource")
@classmethod
def by_label(cls, search: str) -> Select[tuple["Resource"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return (
f"Resource(id={self.id}, "
f"label={self.label}, "
f"wiki_url={self.wiki_url}, "
f"recipes_populated_at={self.recipes_populated_at})"
)
class Factory(Base):
__tablename__ = "factories"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str]
@classmethod
def by_label(cls, search: str) -> Select[tuple["Factory"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return f"Factory(id={self.id}, label={self.label}, wiki_url={self.wiki_url})"
ingredients_table = Table(
"recipe_ingredients",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
results_table = Table(
"recipe_results",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
class ResourceFlow(Base):
__tablename__ = "resource_flows"
id: Mapped[int] = mapped_column(primary_key=True)
ingredient_in: Mapped[Optional["Recipe"]] = relationship(secondary=ingredients_table, back_populates="ingredients")
result_of: Mapped[Optional["Recipe"]] = relationship(secondary=results_table, back_populates="results")
resource_id: Mapped[int] = mapped_column(ForeignKey("resources.id"))
resource: Mapped["Resource"] = relationship(back_populates="flows")
amount: Mapped[str]
time: Mapped[str]
def __repr__(self):
return f"ResourceFlow(id={self.id}, resource_id={self.resource_id}, amount={self.amount}, time={self.time})"
class Recipe(Base):
__tablename__ = "recipes"
id: Mapped[int] = mapped_column(primary_key=True)
factory_id: Mapped[int] = mapped_column(ForeignKey("factories.id"))
factory: Mapped["Factory"] = relationship()
ingredients: Mapped[list["ResourceFlow"]] = relationship(
secondary=ingredients_table, back_populates="ingredient_in"
)
results: Mapped[list["ResourceFlow"]] = relationship(secondary=results_table, back_populates="result_of")
def __repr__(self):
return f"Recipe(id={self.id}, factory={self.factory}, ingredients={self.ingredients}, results={self.results})"
def normalize_url(href: str) -> str:
return urljoin(base=get_browser().current_url, url=href)
def populate_recipes(session: Session, input_resource_label: str) -> Resource:
browser = get_browser()
browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")
resources: dict[str, Resource] = {}
new_resources: list[Resource] = []
for recipe_idx in range(len(recipes_html_elems)):
recipe_html_elem = recipes_html_elems[recipe_idx]
factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a")
factory_label = factory_html_elem.text.strip()
factory_url = urljoin(base=browser.current_url, url=factory_html_elem.get_attribute("href"))
def extract_resource_flow(html_elem):
resource_img = html_elem.find_element(By.TAG_NAME, "img")
resource_label = resource_img.get_attribute("alt").strip()
assert resource_label, "resource label is missing"
if resource_label in resources:
resource = resources[resource_label]
else:
resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
if not resource:
wiki_url = normalize_url(
href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"),
)
resource = Resource(label=resource_label, wiki_url=wiki_url)
new_resources.append(resource)
resources[resource_label] = resource
amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
return ResourceFlow(resource=resource, amount=amount, time=time)
ingredient_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)"
)
ingredients: list[ResourceFlow] = []
for ingredient_idx in range(len(ingredient_html_elems)):
resource_flow = extract_resource_flow(ingredient_html_elems[ingredient_idx])
ingredients.append(resource_flow)
result_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)"
)
results: list[ResourceFlow] = []
for result_idx in range(len(result_html_elems)):
resource_flow = extract_resource_flow(result_html_elems[result_idx])
results.append(resource_flow)
with session.no_autoflush:
# re-use existing Factory or create new
factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
if not factory:
factory = Factory(label=factory_label, wiki_url=factory_url)
session.add(factory)
session.add_all(new_resources)
session.add_all(ingredients)
session.add_all(results)
session.add(Recipe(factory=factory, ingredients=ingredients, results=results))
session.flush()
updated_resource = session.scalars(Resource.by_label(input_resource_label)).one()
updated_resource.recipes_populated_at = datetime.datetime.utcnow()
session.flush()
return updated_resource
@click.command()
@click.option("--result", is_flag=True)
@click.option("--debug", is_flag=True)
@click.option("--refetch", is_flag=True)
@click.argument("search")
def main(result: bool, debug: bool, refetch: bool, search: str):
global __debug
__debug = debug
engine = create_engine("sqlite:///file.db", echo=debug)
Base.metadata.create_all(bind=engine)
if result and search:
wiki_search = True
resource: Optional[Resource] = None
with Session(engine) as session:
matching_resources = session.scalars(Resource.by_label(search)).all()
if len(matching_resources) == 0:
print("Could not find existing resources matching the search string.. starting wiki search")
else:
for idx in range(1, len(matching_resources) + 1):
print(f"{idx}: {matching_resources[idx - 1].label}")
user_choice = click.prompt(
"Chose a resource to continue or 0 to continue with a wiki search", default=1
)
if user_choice != 0:
resource = matching_resources[user_choice - 1]
wiki_search = False
try:
if wiki_search:
browser = get_browser()
browser.get("https://wiki.kyrium.space/")
search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
search_bar.click()
search_bar.send_keys(search)
search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']")
search_button.click()
choices = browser.find_elements(
By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center"
)
if not choices:
print("No wiki entries found for this result")
return
elif len(choices) > 1:
default_choice = 1
choice_names: list[str] = []
for choice_idx in range(1, len(choices) + 1):
recipe_choice = choices[choice_idx - 1]
name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
choice_names.append(name)
if name.casefold() == search.casefold():
default_choice = choice_idx
print(f"{choice_idx}: {name}")
user_choice = click.prompt("Chose a recipe to continue…", default=default_choice)
if not user_choice:
user_choice = default_choice
else:
user_choice = int(user_choice)
link_html_elem = choices[user_choice - 1]
else:
link_html_elem = choices[0]
alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
if not resource:
resource_fetch_url = normalize_url(href=link_html_elem.get_attribute("href"))
refetch = (
refetch
or resource is None
or resource.recipes_populated_at is None
or datetime.datetime.utcnow() - resource.recipes_populated_at > __recipe_info_timeout
)
if refetch and resource is not None:
print("Deleting recipes for", resource.label)
with session.begin_nested():
for flow in session.scalars(
select(ResourceFlow).where(ResourceFlow.resource_id == resource.id)
):
if flow.result_of:
for flow2 in flow.result_of.ingredients:
session.delete(flow2)
for flow2 in flow.result_of.results:
session.delete(flow2)
session.delete(flow.result_of)
if refetch:
browser = get_browser()
if resource is None:
print("Fetching recipes for new resource", alt_resource_label)
assert resource_fetch_url, "Resource wiki url not set"
browser.get(resource_fetch_url)
resource_label = alt_resource_label
else:
print("Refetching recipes for", resource.label)
browser.get(resource.wiki_url)
resource_label = resource.label
with session.begin_nested():
resource = populate_recipes(session=session, input_resource_label=resource_label)
session.refresh(resource)
assert resource, "Resource must be set at this point"
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
for recipe in session.scalars(stmt):
print(recipe)
for flow in recipe.ingredients:
print("ingredient:", flow.resource, flow)
for flow in recipe.results:
print("result: ", flow.resource, flow)
finally:
browser_cleanup()

View file

@ -1,3 +0,0 @@
from .SatisfactoryCalculator import main
main()

83
factorygame/data/fetch.py Normal file
View file

@ -0,0 +1,83 @@
import datetime
from typing import Optional
import click
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from .models import Base, Resource, ResourceFlow, Recipe
from .sfp import SatisfactoryPlus
__recipe_info_timeout = datetime.timedelta(days=30)
@click.command()
@click.option("--result", is_flag=True)
@click.option("--debug", is_flag=True)
@click.option("--refetch", is_flag=True)
@click.argument("search")
def main(result: bool, debug: bool, refetch: bool, search: str):
engine = create_engine("sqlite:///file.db", echo=debug)
Base.metadata.create_all(bind=engine)
if result and search:
do_provider_search = True
resource: Optional[Resource] = None
with Session(engine) as session:
matching_resources = session.scalars(Resource.by_label(search)).all()
if len(matching_resources) == 0:
print("Could not find existing resources matching the search string.. starting wiki search")
else:
for idx in range(1, len(matching_resources) + 1):
print(f"{idx}: {matching_resources[idx - 1].label}")
user_choice = click.prompt(
"Chose a resource to continue or 0 to continue with a wiki search", default=1
)
if user_choice != 0:
resource = matching_resources[user_choice - 1]
do_provider_search = False
with SatisfactoryPlus(debug=debug) as data_provider:
if do_provider_search:
ret = data_provider.search_for_resource(session=session, search=search)
if ret is None:
return
else:
resource, exists_in_db = ret
refetch = (
refetch
or resource.recipes_populated_at is None
or datetime.datetime.utcnow() - resource.recipes_populated_at > __recipe_info_timeout
)
if refetch and exists_in_db:
print("Deleting recipes for", resource.label)
with session.begin_nested():
for flow in session.scalars(
select(ResourceFlow).where(ResourceFlow.resource_id == resource.id)
):
if flow.result_of:
for flow2 in flow.result_of.ingredients:
session.delete(flow2)
for flow2 in flow.result_of.results:
session.delete(flow2)
session.delete(flow.result_of)
if refetch:
if exists_in_db:
print("Refetching recipes for", resource.label)
else:
print("Fetching recipes for new resource", resource.label)
with session.begin_nested():
resource = data_provider.update_resource_recipes(session=session, resource=resource)
session.refresh(resource)
assert resource, "Resource must be set at this point"
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
for recipe in session.scalars(stmt):
print(recipe)
for flow in recipe.ingredients:
print("ingredient:", flow.resource, flow)
for flow in recipe.results:
print("result: ", flow.resource, flow)

View file

@ -0,0 +1,90 @@
import datetime
from typing import Optional
from sqlalchemy import String, Select, select, Table, Column, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class Resource(Base):
__tablename__ = "resources"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str] # FIXME: rename to uri
recipes_populated_at: Mapped[Optional[datetime.datetime]]
flows: Mapped[list["ResourceFlow"]] = relationship(back_populates="resource")
@classmethod
def by_label(cls, search: str) -> Select[tuple["Resource"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return (
f"Resource(id={self.id}, "
f"label={self.label}, "
f"wiki_url={self.wiki_url}, "
f"recipes_populated_at={self.recipes_populated_at})"
)
class Factory(Base):
__tablename__ = "factories"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str] # FIXME: rename to uri
@classmethod
def by_label(cls, search: str) -> Select[tuple["Factory"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return f"Factory(id={self.id}, label={self.label}, wiki_url={self.wiki_url})"
ingredients_table = Table(
"recipe_ingredients",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
results_table = Table(
"recipe_results",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
class ResourceFlow(Base):
__tablename__ = "resource_flows"
id: Mapped[int] = mapped_column(primary_key=True)
ingredient_in: Mapped[Optional["Recipe"]] = relationship(secondary=ingredients_table, back_populates="ingredients")
result_of: Mapped[Optional["Recipe"]] = relationship(secondary=results_table, back_populates="results")
resource_id: Mapped[int] = mapped_column(ForeignKey("resources.id"))
resource: Mapped["Resource"] = relationship(back_populates="flows")
amount: Mapped[str]
time: Mapped[str]
def __repr__(self):
return f"ResourceFlow(id={self.id}, resource_id={self.resource_id}, amount={self.amount}, time={self.time})"
class Recipe(Base):
__tablename__ = "recipes"
id: Mapped[int] = mapped_column(primary_key=True)
factory_id: Mapped[int] = mapped_column(ForeignKey("factories.id"))
factory: Mapped["Factory"] = relationship()
ingredients: Mapped[list["ResourceFlow"]] = relationship(
secondary=ingredients_table, back_populates="ingredient_in"
)
results: Mapped[list["ResourceFlow"]] = relationship(secondary=results_table, back_populates="result_of")
def __repr__(self):
return f"Recipe(id={self.id}, factory={self.factory}, ingredients={self.ingredients}, results={self.results})"

View file

@ -0,0 +1,16 @@
import abc
from sqlalchemy.orm import Session
from .models import Resource
class RecipeProvider(abc.ABC):
@abc.abstractmethod
def search_for_resource(self, session: Session, search: str) -> tuple[Resource, bool]:
pass
@abc.abstractmethod
def update_resource_recipes(self, session: Session, resource: Resource):
pass

154
factorygame/data/sfp.py Normal file
View file

@ -0,0 +1,154 @@
from contextlib import AbstractContextManager
from datetime import datetime
from typing import Optional
from urllib.parse import urljoin
import click
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from sqlalchemy.orm import Session
from .models import Resource, ResourceFlow, Factory, Recipe
from .provider import RecipeProvider
class SatisfactoryPlus(RecipeProvider, AbstractContextManager):
_browser: Optional[Firefox] = None
def __init__(self, debug: bool = False):
super().__init__()
self.debug = debug
def _init_browser(self) -> Firefox:
if self._browser is None:
firefox_options = Options()
firefox_options.add_argument("--width=1600")
firefox_options.add_argument("--height=1015")
if not self.debug:
firefox_options.add_argument("--headless")
self._browser = Firefox(options=firefox_options)
self._browser.implicitly_wait(5)
return self._browser
def _browser_cleanup(self):
if not self.debug and self._browser is not None:
self._browser.quit()
def _normalize_url(self, href: str) -> str:
return urljoin(base=self._init_browser().current_url, url=href)
def __enter__(self):
return self
def __exit__(self, __exc_type, __exc_value, __traceback):
self._browser_cleanup()
def search_for_resource(self, session: Session, search: str) -> tuple[Resource, bool] | None:
browser = self._init_browser()
browser.get("https://wiki.kyrium.space/")
search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
search_bar.click()
search_bar.send_keys(search)
search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']")
search_button.click()
choices = browser.find_elements(By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center")
if not choices:
print("No wiki entries found for this result")
return None
elif len(choices) > 1:
default_choice = 1
choice_names: list[str] = []
for choice_idx in range(1, len(choices) + 1):
recipe_choice = choices[choice_idx - 1]
name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
choice_names.append(name)
if name.casefold() == search.casefold():
default_choice = choice_idx
print(f"{choice_idx}: {name}")
user_choice = click.prompt("Chose a recipe to continue…", default=default_choice)
if not user_choice:
user_choice = default_choice
else:
user_choice = int(user_choice)
link_html_elem = choices[user_choice - 1]
else:
link_html_elem = choices[0]
alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
if resource:
return resource, True
else:
resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href"))
return Resource(label=alt_resource_label, wiki_url=resource_fetch_url), False
def update_resource_recipes(self, session: Session, resource: Resource) -> Resource:
assert resource.wiki_url, "Resource wiki url not set"
browser = self._init_browser()
browser.get(resource.wiki_url)
browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")
resources: dict[str, Resource] = {}
new_resources: list[Resource] = []
for recipe_idx in range(len(recipes_html_elems)):
recipe_html_elem = recipes_html_elems[recipe_idx]
factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a")
factory_label = factory_html_elem.text.strip()
factory_url = urljoin(base=browser.current_url, url=factory_html_elem.get_attribute("href"))
def extract_resource_flow(html_elem):
resource_img = html_elem.find_element(By.TAG_NAME, "img")
resource_label = resource_img.get_attribute("alt").strip()
assert resource_label, "resource label is missing"
if resource_label in resources:
res = resources[resource_label]
else:
res = session.scalars(Resource.by_label(resource_label)).one_or_none()
if not res:
wiki_url = self._normalize_url(
href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"),
)
res = Resource(label=resource_label, wiki_url=wiki_url)
new_resources.append(res)
resources[resource_label] = res
amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
return ResourceFlow(resource=res, amount=amount, time=time)
ingredient_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)"
)
ingredients: list[ResourceFlow] = []
for ingredient_idx in range(len(ingredient_html_elems)):
resource_flow = extract_resource_flow(ingredient_html_elems[ingredient_idx])
ingredients.append(resource_flow)
result_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)"
)
results: list[ResourceFlow] = []
for result_idx in range(len(result_html_elems)):
resource_flow = extract_resource_flow(result_html_elems[result_idx])
results.append(resource_flow)
with session.no_autoflush:
# re-use existing Factory or create new
factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
if not factory:
factory = Factory(label=factory_label, wiki_url=factory_url)
session.add(factory)
session.add_all(new_resources)
session.add_all(ingredients)
session.add_all(results)
session.add(Recipe(factory=factory, ingredients=ingredients, results=results))
session.flush()
# refresh by label, because id might not be set
updated_resource = session.scalars(Resource.by_label(resource.label)).one()
updated_resource.recipes_populated_at = datetime.utcnow()
session.flush()
return updated_resource