Add ingredient recipe fetching

This commit is contained in:
Ben 2024-02-02 17:44:14 +01:00
parent ada8f8c6bb
commit 45b05167ee
Signed by: ben
GPG key ID: 0F54A7ED232D3319
8 changed files with 182 additions and 147 deletions

View file

@ -26,7 +26,7 @@ pipenv --site-packages sync
Using pipenv, run e.g. Using pipenv, run e.g.
```sh ```sh
pipenv run fetch --result 'Molten Iron' pipenv run fetch 'Molten Iron'
``` ```
@ -39,6 +39,6 @@ Thanks to [NodeGraphQt](https://github.com/jchanvfx/NodeGraphQt) a graph base vi
The visualisation window can be opened with e.g. The visualisation window can be opened with e.g.
```sh ```sh
pipenv run fetch --result 'Plastic' pipenv run fetch 'Plastic'
pipenv run vis 'Plastic' pipenv run vis 'Plastic'
``` ```

View file

@ -40,7 +40,7 @@ def chose_resource(session: AlchemySession, resource_label: str, prompt: Callabl
def chose_recipe(session: AlchemySession, resource: Resource, prompt: Callable) -> Future[Recipe | None]: def chose_recipe(session: AlchemySession, resource: Resource, prompt: Callable) -> Future[Recipe | None]:
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id) stmt = select(Recipe).distinct().join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
recipes = session.scalars(stmt).all() recipes = session.scalars(stmt).all()
ret = Future() ret = Future()
if len(recipes) == 0: if len(recipes) == 0:

View file

@ -1,7 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import click import click
from sqlalchemy import create_engine, select from sqlalchemy import create_engine, select, delete
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from factorygame.data.common import resource_needs_update, chose_resource from factorygame.data.common import resource_needs_update, chose_resource
@ -11,53 +11,65 @@ from ..helper import click_prompt
@click.command() @click.command()
@click.option("--result", is_flag=True, default=True)
@click.option("--debug", is_flag=True) @click.option("--debug", is_flag=True)
@click.option("--refetch", is_flag=True) @click.option("--refetch", is_flag=True)
@click.argument("search") @click.argument("search")
def main(result: bool, debug: bool, refetch: bool, search: str): def main(debug: bool, refetch: bool, search: str):
engine = create_engine("sqlite:///file.db", echo=debug) engine = create_engine("sqlite:///file.db", echo=debug)
Base.metadata.create_all(bind=engine) Base.metadata.create_all(bind=engine)
if result and search: if not search:
with Session(engine) as session: print("Empty search option. Exiting…")
resource = chose_resource(session=session, resource_label=search, prompt=click_prompt).result() return
exists_in_db = resource is not None
with SatisfactoryPlus(debug=debug) as data_provider: with Session(engine) as session:
if resource is None: resource = chose_resource(session=session, resource_label=search, prompt=click_prompt).result()
ret = data_provider.search_for_resource(session=session, search=search) exists_in_db = resource is not None
if ret is None:
return
else:
resource, exists_in_db = ret
refetch |= resource_needs_update(resource) with SatisfactoryPlus(debug=debug) as data_provider:
if refetch: if resource is None:
if exists_in_db: ret = data_provider.search_for_resource(session=session, search=search)
print("Deleting recipes for", resource.label) if ret is None:
with session.begin_nested(): return
for flow in session.scalars( else:
select(ResourceFlow).where(ResourceFlow.resource_id == resource.id) resource, exists_in_db = ret
):
if flow.result_of:
for flow2 in flow.result_of.ingredients:
session.delete(flow2)
for flow2 in flow.result_of.results:
session.delete(flow2)
session.delete(flow.result_of)
print("Refetching recipes for", resource.label)
else:
print("Fetching recipes for new resource", resource.label)
refetch |= resource_needs_update(resource)
if refetch:
if exists_in_db:
print("Deleting recipes for", resource.label)
with session.begin_nested(): with session.begin_nested():
resource = data_provider.update_resource_recipes(session=session, resource=resource) resource.recipes_populated_at = None
flow_ids_to_delete: list[int] = list()
for flow in session.scalars(
select(ResourceFlow).where(ResourceFlow.resource_id == resource.id)
):
if flow.ingredient_in:
flow_ids_to_delete += map(lambda obj: obj.id, flow.ingredient_in.ingredients)
flow_ids_to_delete += map(lambda obj: obj.id, flow.ingredient_in.results)
session.delete(flow.ingredient_in)
if flow.result_of:
flow_ids_to_delete += map(lambda obj: obj.id, flow.result_of.ingredients)
flow_ids_to_delete += map(lambda obj: obj.id, flow.result_of.results)
session.delete(flow.result_of)
stmt = delete(ResourceFlow).where(ResourceFlow.id.in_(flow_ids_to_delete))
session.execute(stmt)
print("Refetching recipes for", resource.label)
else:
print("Fetching recipes for new resource", resource.label)
session.refresh(resource) with session.begin_nested():
assert resource, "Resource must be set at this point" resource = data_provider.update_resource_recipes(session=session, resource=resource)
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id) session.refresh(resource)
for recipe in session.scalars(stmt): assert resource, "Resource must be set at this point"
print("Recipe:", recipe.describe())
stmt = select(Recipe).distinct().join(Recipe.ingredients).filter(ResourceFlow.resource_id == resource.id)
for recipe in session.scalars(stmt):
print("Used in recipe", recipe.describe())
stmt = select(Recipe).distinct().join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
for recipe in session.scalars(stmt):
print("Result of recipe", recipe.describe())
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -5,48 +5,14 @@ from typing import Optional
from sqlalchemy import String, Select, select, Table, Column, ForeignKey from sqlalchemy import String, Select, select, Table, Column, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
amount_number = re.compile(r"^\d*(\.\d+)?")
time_number = re.compile(r"^(\d+) seconds?")
class Base(DeclarativeBase): class Base(DeclarativeBase):
pass pass
class Resource(Base):
__tablename__ = "resources"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str] # FIXME: rename to uri
recipes_populated_at: Mapped[Optional[datetime.datetime]]
flows: Mapped[list["ResourceFlow"]] = relationship(back_populates="resource")
@classmethod
def by_label(cls, search: str) -> Select[tuple["Resource"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return (
f"Resource(id={self.id}, "
f"label={self.label}, "
f"wiki_url={self.wiki_url}, "
f"recipes_populated_at={self.recipes_populated_at})"
)
class Factory(Base):
__tablename__ = "factories"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str] # FIXME: rename to uri
@classmethod
def by_label(cls, search: str) -> Select[tuple["Factory"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return f"Factory(id={self.id}, label={self.label}, wiki_url={self.wiki_url})"
ingredients_table = Table( ingredients_table = Table(
"recipe_ingredients", "recipe_ingredients",
Base.metadata, Base.metadata,
@ -59,9 +25,50 @@ results_table = Table(
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True), Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True), Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
) )
recipe_factories_table = Table(
"recipe_factories",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("factory_id", ForeignKey("factories.id"), primary_key=True),
)
amount_number = re.compile(r"^\d*(\.\d+)?")
time_number = re.compile(r"^(\d+) seconds?") class Resource(Base):
__tablename__ = "resources"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
uri: Mapped[str]
flows: Mapped[list["ResourceFlow"]] = relationship(back_populates="resource")
recipes_populated_at: Mapped[Optional[datetime.datetime]]
@classmethod
def by_label(cls, search: str) -> Select[tuple["Resource"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return (
f"Resource(id={self.id}, "
f"label={self.label}, "
f"uri={self.uri}, "
f"recipes_populated_at={self.recipes_populated_at})"
)
class Factory(Base):
__tablename__ = "factories"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
uri: Mapped[str]
used_in: Mapped[list["Recipe"]] = relationship(secondary=recipe_factories_table, viewonly=True)
@classmethod
def by_label(cls, search: str) -> Select[tuple["Factory"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return f"Factory(id={self.id}, label={self.label}, uri={self.uri})"
class ResourceFlow(Base): class ResourceFlow(Base):
@ -93,22 +100,24 @@ class Recipe(Base):
__tablename__ = "recipes" __tablename__ = "recipes"
id: Mapped[int] = mapped_column(primary_key=True) id: Mapped[int] = mapped_column(primary_key=True)
factory_id: Mapped[int] = mapped_column(ForeignKey("factories.id")) factories: Mapped[list["Factory"]] = relationship(secondary=recipe_factories_table, back_populates="used_in")
factory: Mapped["Factory"] = relationship()
ingredients: Mapped[list["ResourceFlow"]] = relationship( ingredients: Mapped[list["ResourceFlow"]] = relationship(
secondary=ingredients_table, back_populates="ingredient_in" secondary=ingredients_table, back_populates="ingredient_in"
) )
results: Mapped[list["ResourceFlow"]] = relationship(secondary=results_table, back_populates="result_of") results: Mapped[list["ResourceFlow"]] = relationship(secondary=results_table, back_populates="result_of")
def join_factories(self):
return ", ".join(map(lambda factory: factory.label, self.factories))
def describe(self) -> str: def describe(self) -> str:
def list_flows(flows: list["ResourceFlow"]) -> str: def list_flows(flows: list["ResourceFlow"]) -> str:
return ", ".join(map(ResourceFlow.describe, flows)) return ", ".join(map(ResourceFlow.describe, flows))
return ( return (
f"in machine: {self.factory.label}, " f"in machine(s): {self.join_factories()}, "
f"ingredients: {list_flows(self.ingredients)}, " f"ingredient(s): {list_flows(self.ingredients)}, "
f"results: {list_flows(self.results)}" f"result(s): {list_flows(self.results)}"
) )
def __repr__(self): def __repr__(self):
return f"Recipe(id={self.id}, factory={self.factory}, ingredients={self.ingredients}, results={self.results})" return f"Recipe(id={self.id}, factories={self.factories}, ingredients={self.ingredients}, results={self.results})"

View file

@ -6,11 +6,10 @@ from .models import Resource
class RecipeProvider(abc.ABC): class RecipeProvider(abc.ABC):
@abc.abstractmethod @abc.abstractmethod
def search_for_resource(self, session: AlchemySession, search: str) -> tuple[Resource, bool]: def search_for_resource(self, session: AlchemySession, search: str) -> tuple[Resource, bool]:
pass pass
@abc.abstractmethod @abc.abstractmethod
def update_resource_recipes(self, session: AlchemySession, resource: Resource): def update_resource_recipes(self, session: AlchemySession, resource: Resource) -> Resource:
pass pass

View file

@ -1,6 +1,6 @@
from contextlib import AbstractContextManager from contextlib import AbstractContextManager
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional, Callable
from urllib.parse import urljoin from urllib.parse import urljoin
from selenium.webdriver import Firefox from selenium.webdriver import Firefox
@ -79,72 +79,87 @@ class SatisfactoryPlus(RecipeProvider, AbstractContextManager):
return resource, True return resource, True
else: else:
resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href")) resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href"))
return Resource(label=alt_resource_label, wiki_url=resource_fetch_url), False return Resource(label=alt_resource_label, uri=resource_fetch_url), False
def update_resource_recipes(self, session: AlchemySession, resource: Resource) -> Resource: def update_resource_recipes(self, session: AlchemySession, resource: Resource) -> Resource:
assert resource.wiki_url, "Resource wiki url not set" assert resource.uri, "Resource.uri not set"
browser = self._init_browser() browser = self._init_browser()
browser.get(resource.wiki_url) browser.get(resource.uri)
browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click() browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div") recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")
resources: dict[str, Resource] = {} for recipe_html_elem in recipes_html_elems:
new_resources: list[Resource] = [] self.extract_recipe(session, recipe_html_elem)
for recipe_idx in range(len(recipes_html_elems)): browser.find_element(By.CSS_SELECTOR, "button[id$='tab-1']").click()
recipe_html_elem = recipes_html_elems[recipe_idx] ingredient_recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-1'] > div > div")
factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a") for recipe_html_elem in ingredient_recipes_html_elems:
factory_label = factory_html_elem.text.strip() self.extract_recipe(session, recipe_html_elem)
factory_url = urljoin(base=browser.current_url, url=factory_html_elem.get_attribute("href"))
def extract_resource_flow(html_elem): # manual refresh by label, because id might not be set
resource_img = html_elem.find_element(By.TAG_NAME, "img")
resource_label = resource_img.get_attribute("alt").strip()
assert resource_label, "resource label is missing"
if resource_label in resources:
res = resources[resource_label]
else:
res = session.scalars(Resource.by_label(resource_label)).one_or_none()
if not res:
wiki_url = self._normalize_url(
href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"),
)
res = Resource(label=resource_label, wiki_url=wiki_url)
new_resources.append(res)
resources[resource_label] = res
amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
return ResourceFlow(resource=res, amount=amount, time=time)
ingredient_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)"
)
ingredients: list[ResourceFlow] = []
for ingredient_idx in range(len(ingredient_html_elems)):
resource_flow = extract_resource_flow(ingredient_html_elems[ingredient_idx])
ingredients.append(resource_flow)
result_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)"
)
results: list[ResourceFlow] = []
for result_idx in range(len(result_html_elems)):
resource_flow = extract_resource_flow(result_html_elems[result_idx])
results.append(resource_flow)
with session.no_autoflush:
# re-use existing Factory or create new
factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
if not factory:
factory = Factory(label=factory_label, wiki_url=factory_url)
session.add(factory)
session.add_all(new_resources)
session.add_all(ingredients)
session.add_all(results)
session.add(Recipe(factory=factory, ingredients=ingredients, results=results))
session.flush()
# refresh by label, because id might not be set
updated_resource = session.scalars(Resource.by_label(resource.label)).one() updated_resource = session.scalars(Resource.by_label(resource.label)).one()
updated_resource.recipes_populated_at = datetime.utcnow() updated_resource.recipes_populated_at = datetime.utcnow()
session.flush() session.flush()
return updated_resource return updated_resource
def extract_recipe(self, session: AlchemySession, recipe_html_elem):
recipe_factories: list[Factory] = []
factory_html_elements = recipe_html_elem.find_elements(By.CSS_SELECTOR, ".flex-col > span > a")
for factory_html_elem in factory_html_elements:
factory_label = factory_html_elem.text.strip()
assert factory_label, "factory label is missing (a[text])"
# re-use existing Factory or create new
factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
if factory is None:
factory_href = factory_html_elem.get_attribute("href")
assert factory_href, "factory url is missing (a.href)"
factory = Factory(label=factory_label, uri=self._normalize_url(href=factory_href))
session.add(factory)
recipe_factories.append(factory)
def find_or_create_resource(resource_label: str, resource_uri_getter: Callable) -> Resource:
db_resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
if db_resource is not None:
return db_resource
resource = Resource(label=resource_label, uri=resource_uri_getter())
session.add(resource)
return resource
def extract_resource_flow(html_elem):
resource_img = html_elem.find_element(By.TAG_NAME, "img")
resource_label = resource_img.get_attribute("alt").strip()
assert resource_label, "resource label is missing (img.alt)"
resource_href = html_elem.find_element(By.TAG_NAME, "a").get_attribute("href")
assert resource_href, "resource link is missing (a.href)"
with session.no_autoflush:
resource = find_or_create_resource(
resource_label=resource_label,
resource_uri_getter=lambda: self._normalize_url(href=resource_href),
)
amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
return ResourceFlow(resource=resource, amount=amount, time=time)
ingredient_html_elements = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)"
)
ingredients: list[ResourceFlow] = []
for ingredient_idx in range(len(ingredient_html_elements)):
resource_flow = extract_resource_flow(ingredient_html_elements[ingredient_idx])
ingredients.append(resource_flow)
result_html_elements = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)"
)
results: list[ResourceFlow] = []
for result_html_elem in result_html_elements:
resource_flow = extract_resource_flow(result_html_elem)
results.append(resource_flow)
with session.no_autoflush:
session.add_all(ingredients)
session.add_all(results)
session.add(Recipe(factories=recipe_factories, ingredients=ingredients, results=results))
session.flush()

View file

@ -258,7 +258,7 @@ class Machine(BaseNode):
for port_idx in range(len(self._outputs)): for port_idx in range(len(self._outputs)):
self.delete_output(port_idx) self.delete_output(port_idx)
self.set_property("name", recipe.factory.label, push_undo=False) self.set_property("name", recipe.join_factories(), push_undo=False)
for ingredient in recipe.ingredients: for ingredient in recipe.ingredients:
resource_label = ingredient.resource.label resource_label = ingredient.resource.label

BIN
vis.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 69 KiB

After

Width:  |  Height:  |  Size: 75 KiB