Compare commits

...

2 commits

Author SHA1 Message Date
Ben 91a0edda5b
Add first visualisation revision
Caveats:
UI hangs while selecting recipes
No calculation is done based on input resource flow
2024-01-30 01:32:04 +01:00
Ben 71e2c46949
Refactor code 2024-01-29 18:37:30 +01:00
13 changed files with 676 additions and 312 deletions

View file

@ -7,6 +7,7 @@ name = "pypi"
selenium = "*"
click = "*"
sqlalchemy = "*"
nodegraphqt = "*"
[dev-packages]
@ -14,4 +15,5 @@ sqlalchemy = "*"
python_version = "3.11"
[scripts]
calc = {call = "factorygame.calculator.SatisfactoryCalculator:main()"}
fetch = {call = "factorygame.data.fetch:main()"}
vis = {call = "factorygame.data.vis:main()"}

24
Pipfile.lock generated
View file

@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "45cb685b915b5e5bab1ba5339a033373d210e09d6f0b4d9ac9acbe8fe15b5402"
"sha256": "0d398759a0afc9f55cbd2e892bf1f819b572167c2396e2b978515124ada547a9"
},
"pipfile-spec": 6,
"requires": {
@ -121,6 +121,15 @@
"markers": "python_version >= '3.5'",
"version": "==3.6"
},
"nodegraphqt": {
"hashes": [
"sha256:2f0e4b0a2c1a3360deaa2fb6cc42cbf0dce25a7076d036473773d58b0f4fae31",
"sha256:97796bf36845c7e0413e72608507401b04e92106de620b804a234d0cad26e24f"
],
"index": "pypi",
"markers": "python_version >= '3.6'",
"version": "==0.6.35"
},
"outcome": {
"hashes": [
"sha256:9dcf02e65f2971b80047b377468e72a268e15c0af3cf1238e6ff14f7f91143b8",
@ -137,6 +146,12 @@
],
"version": "==1.7.1"
},
"qt.py": {
"hashes": [
"sha256:ab072ac955bdc9318966c078f1bb5941e77166f9e92df3db5726b1eb5afea83b"
],
"version": "==1.3.9"
},
"selenium": {
"hashes": [
"sha256:5aee79026c07985dc1b0c909f34084aa996dfe5b307602de9016d7a621a473f2",
@ -233,6 +248,13 @@
"markers": "python_version >= '3.7'",
"version": "==0.11.1"
},
"types-pyside2": {
"hashes": [
"sha256:5bc2763bc6b595b2c5fc1191ce5147dcc6f44d56efdc2b264b6c082282cc1c57",
"sha256:de4b575e57fdb9e5fd8507537cdd770866fdbe1972eadc3793b2026eaec92875"
],
"version": "==5.15.2.1.6"
},
"typing-extensions": {
"hashes": [
"sha256:23478f88c37f27d76ac8aee6c905017a143b0b1b886c3c9f66bc2fd94f9f5783",

View file

@ -3,5 +3,18 @@
Using pipenv, run e.g.
```sh
pipenv run calc --result 'Molten Iron'
pipenv run fetch --result 'Molten Iron'
```
## Visualisation
Thanks to [NodeGraphQt](https://github.com/jchanvfx/NodeGraphQt) a graph base visualisation is available, which looks like the following:
[!Graph Visualisation Example](vis.png)
The visualisation window can be opened with e.g.
```sh
pipenv run fetch --result 'Plastic'
pipenv run vis 'Plastic'
```

View file

@ -1,306 +0,0 @@
#!/usr/bin/env python3
import datetime
from typing import Optional
from urllib.parse import urljoin
import click
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from sqlalchemy import String, create_engine, select, ForeignKey, Select, Table, Column
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session
__debug = False
__recipe_info_timeout = datetime.timedelta(days=30)
__browser: Optional[Firefox] = None
def get_browser() -> Firefox:
global __browser, __debug
if __browser is None:
firefox_options = Options()
firefox_options.add_argument("--width=1600")
firefox_options.add_argument("--height=1015")
if not __debug:
firefox_options.add_argument("--headless")
__browser = Firefox(options=firefox_options)
__browser.implicitly_wait(5)
return __browser
def browser_cleanup():
global __browser, __debug
if not __debug and __browser is not None:
__browser.quit()
class Base(DeclarativeBase):
pass
class Resource(Base):
__tablename__ = "resources"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str]
recipes_populated_at: Mapped[Optional[datetime.datetime]]
flows: Mapped[list["ResourceFlow"]] = relationship(back_populates="resource")
@classmethod
def by_label(cls, search: str) -> Select[tuple["Resource"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return (
f"Resource(id={self.id}, "
f"label={self.label}, "
f"wiki_url={self.wiki_url}, "
f"recipes_populated_at={self.recipes_populated_at})"
)
class Factory(Base):
__tablename__ = "factories"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str]
@classmethod
def by_label(cls, search: str) -> Select[tuple["Factory"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return f"Factory(id={self.id}, label={self.label}, wiki_url={self.wiki_url})"
ingredients_table = Table(
"recipe_ingredients",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
results_table = Table(
"recipe_results",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
class ResourceFlow(Base):
__tablename__ = "resource_flows"
id: Mapped[int] = mapped_column(primary_key=True)
ingredient_in: Mapped[Optional["Recipe"]] = relationship(secondary=ingredients_table, back_populates="ingredients")
result_of: Mapped[Optional["Recipe"]] = relationship(secondary=results_table, back_populates="results")
resource_id: Mapped[int] = mapped_column(ForeignKey("resources.id"))
resource: Mapped["Resource"] = relationship(back_populates="flows")
amount: Mapped[str]
time: Mapped[str]
def __repr__(self):
return f"ResourceFlow(id={self.id}, resource_id={self.resource_id}, amount={self.amount}, time={self.time})"
class Recipe(Base):
__tablename__ = "recipes"
id: Mapped[int] = mapped_column(primary_key=True)
factory_id: Mapped[int] = mapped_column(ForeignKey("factories.id"))
factory: Mapped["Factory"] = relationship()
ingredients: Mapped[list["ResourceFlow"]] = relationship(
secondary=ingredients_table, back_populates="ingredient_in"
)
results: Mapped[list["ResourceFlow"]] = relationship(secondary=results_table, back_populates="result_of")
def __repr__(self):
return f"Recipe(id={self.id}, factory={self.factory}, ingredients={self.ingredients}, results={self.results})"
def normalize_url(href: str) -> str:
return urljoin(base=get_browser().current_url, url=href)
def populate_recipes(session: Session, input_resource_label: str) -> Resource:
browser = get_browser()
browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")
resources: dict[str, Resource] = {}
new_resources: list[Resource] = []
for recipe_idx in range(len(recipes_html_elems)):
recipe_html_elem = recipes_html_elems[recipe_idx]
factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a")
factory_label = factory_html_elem.text.strip()
factory_url = urljoin(base=browser.current_url, url=factory_html_elem.get_attribute("href"))
def extract_resource_flow(html_elem):
resource_img = html_elem.find_element(By.TAG_NAME, "img")
resource_label = resource_img.get_attribute("alt").strip()
assert resource_label, "resource label is missing"
if resource_label in resources:
resource = resources[resource_label]
else:
resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
if not resource:
wiki_url = normalize_url(
href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"),
)
resource = Resource(label=resource_label, wiki_url=wiki_url)
new_resources.append(resource)
resources[resource_label] = resource
amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
return ResourceFlow(resource=resource, amount=amount, time=time)
ingredient_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)"
)
ingredients: list[ResourceFlow] = []
for ingredient_idx in range(len(ingredient_html_elems)):
resource_flow = extract_resource_flow(ingredient_html_elems[ingredient_idx])
ingredients.append(resource_flow)
result_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)"
)
results: list[ResourceFlow] = []
for result_idx in range(len(result_html_elems)):
resource_flow = extract_resource_flow(result_html_elems[result_idx])
results.append(resource_flow)
with session.no_autoflush:
# re-use existing Factory or create new
factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
if not factory:
factory = Factory(label=factory_label, wiki_url=factory_url)
session.add(factory)
session.add_all(new_resources)
session.add_all(ingredients)
session.add_all(results)
session.add(Recipe(factory=factory, ingredients=ingredients, results=results))
session.flush()
updated_resource = session.scalars(Resource.by_label(input_resource_label)).one()
updated_resource.recipes_populated_at = datetime.datetime.utcnow()
session.flush()
return updated_resource
@click.command()
@click.option("--result", is_flag=True)
@click.option("--debug", is_flag=True)
@click.option("--refetch", is_flag=True)
@click.argument("search")
def main(result: bool, debug: bool, refetch: bool, search: str):
global __debug
__debug = debug
engine = create_engine("sqlite:///file.db", echo=debug)
Base.metadata.create_all(bind=engine)
if result and search:
wiki_search = True
resource: Optional[Resource] = None
with Session(engine) as session:
matching_resources = session.scalars(Resource.by_label(search)).all()
if len(matching_resources) == 0:
print("Could not find existing resources matching the search string.. starting wiki search")
else:
for idx in range(1, len(matching_resources) + 1):
print(f"{idx}: {matching_resources[idx - 1].label}")
user_choice = click.prompt(
"Chose a resource to continue or 0 to continue with a wiki search", default=1
)
if user_choice != 0:
resource = matching_resources[user_choice - 1]
wiki_search = False
try:
if wiki_search:
browser = get_browser()
browser.get("https://wiki.kyrium.space/")
search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
search_bar.click()
search_bar.send_keys(search)
search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']")
search_button.click()
choices = browser.find_elements(
By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center"
)
if not choices:
print("No wiki entries found for this result")
return
elif len(choices) > 1:
default_choice = 1
choice_names: list[str] = []
for choice_idx in range(1, len(choices) + 1):
recipe_choice = choices[choice_idx - 1]
name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
choice_names.append(name)
if name.casefold() == search.casefold():
default_choice = choice_idx
print(f"{choice_idx}: {name}")
user_choice = click.prompt("Chose a recipe to continue…", default=default_choice)
if not user_choice:
user_choice = default_choice
else:
user_choice = int(user_choice)
link_html_elem = choices[user_choice - 1]
else:
link_html_elem = choices[0]
alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
if not resource:
resource_fetch_url = normalize_url(href=link_html_elem.get_attribute("href"))
refetch = (
refetch
or resource is None
or resource.recipes_populated_at is None
or datetime.datetime.utcnow() - resource.recipes_populated_at > __recipe_info_timeout
)
if refetch and resource is not None:
print("Deleting recipes for", resource.label)
with session.begin_nested():
for flow in session.scalars(
select(ResourceFlow).where(ResourceFlow.resource_id == resource.id)
):
if flow.result_of:
for flow2 in flow.result_of.ingredients:
session.delete(flow2)
for flow2 in flow.result_of.results:
session.delete(flow2)
session.delete(flow.result_of)
if refetch:
browser = get_browser()
if resource is None:
print("Fetching recipes for new resource", alt_resource_label)
assert resource_fetch_url, "Resource wiki url not set"
browser.get(resource_fetch_url)
resource_label = alt_resource_label
else:
print("Refetching recipes for", resource.label)
browser.get(resource.wiki_url)
resource_label = resource.label
with session.begin_nested():
resource = populate_recipes(session=session, input_resource_label=resource_label)
session.refresh(resource)
assert resource, "Resource must be set at this point"
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
for recipe in session.scalars(stmt):
print(recipe)
for flow in recipe.ingredients:
print("ingredient:", flow.resource, flow)
for flow in recipe.results:
print("result: ", flow.resource, flow)
finally:
browser_cleanup()

View file

@ -1,3 +0,0 @@
from .SatisfactoryCalculator import main
main()

89
factorygame/data/fetch.py Executable file
View file

@ -0,0 +1,89 @@
#!/usr/bin/env python3
import datetime
from typing import Optional
import click
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from .models import Base, Resource, ResourceFlow, Recipe
from .sfp import SatisfactoryPlus
from ..helper import prompt
__recipe_info_timeout = datetime.timedelta(days=30)
@click.command()
@click.option("--result", is_flag=True)
@click.option("--debug", is_flag=True)
@click.option("--refetch", is_flag=True)
@click.argument("search")
def main(result: bool, debug: bool, refetch: bool, search: str):
engine = create_engine("sqlite:///file.db", echo=debug)
Base.metadata.create_all(bind=engine)
if result and search:
do_provider_search = True
resource: Optional[Resource] = None
with Session(engine) as session:
matching_resources = session.scalars(Resource.by_label(search)).all()
if len(matching_resources) == 0:
print("Could not find existing resources matching the search string.. starting wiki search")
else:
options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))}
options[0] = ""
selected_res = prompt(
options=options, text="Chose a resource to continue or 0 to continue with a wiki search", default=1
)
if selected_res is None:
return
if selected_res != 0:
resource = matching_resources[selected_res - 1]
do_provider_search = False
exists_in_db = True
with SatisfactoryPlus(debug=debug) as data_provider:
if do_provider_search:
ret = data_provider.search_for_resource(session=session, search=search)
if ret is None:
return
else:
resource, exists_in_db = ret
refetch = (
refetch
or resource.recipes_populated_at is None
or datetime.datetime.utcnow() - resource.recipes_populated_at > __recipe_info_timeout
)
if refetch and exists_in_db:
print("Deleting recipes for", resource.label)
with session.begin_nested():
for flow in session.scalars(
select(ResourceFlow).where(ResourceFlow.resource_id == resource.id)
):
if flow.result_of:
for flow2 in flow.result_of.ingredients:
session.delete(flow2)
for flow2 in flow.result_of.results:
session.delete(flow2)
session.delete(flow.result_of)
if refetch:
if exists_in_db:
print("Refetching recipes for", resource.label)
else:
print("Fetching recipes for new resource", resource.label)
with session.begin_nested():
resource = data_provider.update_resource_recipes(session=session, resource=resource)
session.refresh(resource)
assert resource, "Resource must be set at this point"
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
for recipe in session.scalars(stmt):
print("Recipe:", recipe.describe())
if __name__ == "__main__":
main()

114
factorygame/data/models.py Normal file
View file

@ -0,0 +1,114 @@
import datetime
import re
from typing import Optional
from sqlalchemy import String, Select, select, Table, Column, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class Resource(Base):
__tablename__ = "resources"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str] # FIXME: rename to uri
recipes_populated_at: Mapped[Optional[datetime.datetime]]
flows: Mapped[list["ResourceFlow"]] = relationship(back_populates="resource")
@classmethod
def by_label(cls, search: str) -> Select[tuple["Resource"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return (
f"Resource(id={self.id}, "
f"label={self.label}, "
f"wiki_url={self.wiki_url}, "
f"recipes_populated_at={self.recipes_populated_at})"
)
class Factory(Base):
__tablename__ = "factories"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str] = mapped_column(String(127))
wiki_url: Mapped[str] # FIXME: rename to uri
@classmethod
def by_label(cls, search: str) -> Select[tuple["Factory"]]:
return select(cls).where(cls.label.ilike(search))
def __repr__(self):
return f"Factory(id={self.id}, label={self.label}, wiki_url={self.wiki_url})"
ingredients_table = Table(
"recipe_ingredients",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
results_table = Table(
"recipe_results",
Base.metadata,
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
)
amount_number = re.compile(r"^\d*(\.\d+)?")
time_number = re.compile(r"^(\d+) seconds?")
class ResourceFlow(Base):
__tablename__ = "resource_flows"
id: Mapped[int] = mapped_column(primary_key=True)
ingredient_in: Mapped[Optional["Recipe"]] = relationship(secondary=ingredients_table, back_populates="ingredients")
result_of: Mapped[Optional["Recipe"]] = relationship(secondary=results_table, back_populates="results")
resource_id: Mapped[int] = mapped_column(ForeignKey("resources.id"))
resource: Mapped["Resource"] = relationship(back_populates="flows")
amount: Mapped[str]
time: Mapped[str]
def amount_per_minute(self) -> float:
per_crafting_step_str = amount_number.match(self.amount).group(0)
per_crafting_step = float(per_crafting_step_str)
crafting_time_seconds_str = time_number.match(self.time).group(1)
crafting_time_seconds = float(crafting_time_seconds_str)
return per_crafting_step * (60.0 / crafting_time_seconds)
def describe(self) -> str:
return f"{repr(str(self.resource.label))}x{self.amount}"
def __repr__(self):
return f"ResourceFlow(id={self.id}, resource_id={self.resource_id}, amount={self.amount}, time={self.time})"
class Recipe(Base):
__tablename__ = "recipes"
id: Mapped[int] = mapped_column(primary_key=True)
factory_id: Mapped[int] = mapped_column(ForeignKey("factories.id"))
factory: Mapped["Factory"] = relationship()
ingredients: Mapped[list["ResourceFlow"]] = relationship(
secondary=ingredients_table, back_populates="ingredient_in"
)
results: Mapped[list["ResourceFlow"]] = relationship(secondary=results_table, back_populates="result_of")
def describe(self) -> str:
def list_flows(flows: list["ResourceFlow"]) -> str:
return ", ".join(map(ResourceFlow.describe, flows))
return (
f"in machine: {self.factory.label}, "
f"ingredients: {list_flows(self.ingredients)}, "
f"results: {list_flows(self.results)}"
)
def __repr__(self):
return f"Recipe(id={self.id}, factory={self.factory}, ingredients={self.ingredients}, results={self.results})"

View file

@ -0,0 +1,16 @@
import abc
from sqlalchemy.orm import Session
from .models import Resource
class RecipeProvider(abc.ABC):
@abc.abstractmethod
def search_for_resource(self, session: Session, search: str) -> tuple[Resource, bool]:
pass
@abc.abstractmethod
def update_resource_recipes(self, session: Session, resource: Resource):
pass

150
factorygame/data/sfp.py Normal file
View file

@ -0,0 +1,150 @@
from contextlib import AbstractContextManager
from datetime import datetime
from typing import Optional
from urllib.parse import urljoin
from selenium.webdriver import Firefox
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from sqlalchemy.orm import Session
from .models import Resource, ResourceFlow, Factory, Recipe
from .provider import RecipeProvider
from ..helper import prompt
class SatisfactoryPlus(RecipeProvider, AbstractContextManager):
_browser: Optional[Firefox] = None
def __init__(self, debug: bool = False):
super().__init__()
self.debug = debug
def _init_browser(self) -> Firefox:
if self._browser is None:
firefox_options = Options()
firefox_options.add_argument("--width=1600")
firefox_options.add_argument("--height=1015")
if not self.debug:
firefox_options.add_argument("--headless")
self._browser = Firefox(options=firefox_options)
self._browser.implicitly_wait(5)
return self._browser
def _browser_cleanup(self):
if not self.debug and self._browser is not None:
self._browser.quit()
def _normalize_url(self, href: str) -> str:
return urljoin(base=self._init_browser().current_url, url=href)
def __enter__(self):
return self
def __exit__(self, __exc_type, __exc_value, __traceback):
self._browser_cleanup()
def search_for_resource(self, session: Session, search: str) -> tuple[Resource, bool] | None:
browser = self._init_browser()
browser.get("https://wiki.kyrium.space/")
search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
search_bar.click()
search_bar.send_keys(search)
search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']")
search_button.click()
choices = browser.find_elements(By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center")
if not choices:
print("No wiki entries found for this result")
return None
elif len(choices) > 1:
default_choice = 1
options: dict[int, str] = {}
for idx in range(len(choices)):
recipe_choice = choices[idx]
name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
options[idx + 1] = name
if name.casefold() == search.casefold():
default_choice = idx + 1
user_choice = prompt(options=options, text="Chose a recipe to continue…", default=default_choice)
if user_choice is None:
return None
link_html_elem = choices[user_choice - 1]
else:
link_html_elem = choices[0]
alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
if resource:
return resource, True
else:
resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href"))
return Resource(label=alt_resource_label, wiki_url=resource_fetch_url), False
def update_resource_recipes(self, session: Session, resource: Resource) -> Resource:
assert resource.wiki_url, "Resource wiki url not set"
browser = self._init_browser()
browser.get(resource.wiki_url)
browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")
resources: dict[str, Resource] = {}
new_resources: list[Resource] = []
for recipe_idx in range(len(recipes_html_elems)):
recipe_html_elem = recipes_html_elems[recipe_idx]
factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a")
factory_label = factory_html_elem.text.strip()
factory_url = urljoin(base=browser.current_url, url=factory_html_elem.get_attribute("href"))
def extract_resource_flow(html_elem):
resource_img = html_elem.find_element(By.TAG_NAME, "img")
resource_label = resource_img.get_attribute("alt").strip()
assert resource_label, "resource label is missing"
if resource_label in resources:
res = resources[resource_label]
else:
res = session.scalars(Resource.by_label(resource_label)).one_or_none()
if not res:
wiki_url = self._normalize_url(
href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"),
)
res = Resource(label=resource_label, wiki_url=wiki_url)
new_resources.append(res)
resources[resource_label] = res
amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
return ResourceFlow(resource=res, amount=amount, time=time)
ingredient_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)"
)
ingredients: list[ResourceFlow] = []
for ingredient_idx in range(len(ingredient_html_elems)):
resource_flow = extract_resource_flow(ingredient_html_elems[ingredient_idx])
ingredients.append(resource_flow)
result_html_elems = recipe_html_elem.find_elements(
By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)"
)
results: list[ResourceFlow] = []
for result_idx in range(len(result_html_elems)):
resource_flow = extract_resource_flow(result_html_elems[result_idx])
results.append(resource_flow)
with session.no_autoflush:
# re-use existing Factory or create new
factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
if not factory:
factory = Factory(label=factory_label, wiki_url=factory_url)
session.add(factory)
session.add_all(new_resources)
session.add_all(ingredients)
session.add_all(results)
session.add(Recipe(factory=factory, ingredients=ingredients, results=results))
session.flush()
# refresh by label, because id might not be set
updated_resource = session.scalars(Resource.by_label(resource.label)).one()
updated_resource.recipes_populated_at = datetime.utcnow()
session.flush()
return updated_resource

254
factorygame/data/vis.py Executable file
View file

@ -0,0 +1,254 @@
#!/usr/bin/env python3
import click
from NodeGraphQt import BaseNode, NodeBaseWidget
from NodeGraphQt import NodeGraph, Port
from NodeGraphQt.constants import PortTypeEnum
from PySide2.QtCore import Qt
from PySide2.QtWidgets import QSlider
from Qt import QtWidgets
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session
from .models import Recipe, ResourceFlow, Resource
from .sfp import SatisfactoryPlus
from ..helper import prompt
class NodeSlider(NodeBaseWidget):
def __init__(self, name, label="", parent=None):
super(NodeSlider, self).__init__(parent)
self.set_name(name)
self.pure_label = label or name
slider = QSlider(Qt.Horizontal)
slider.setMinimum(1)
slider.setMaximum(250)
slider.setValue(100)
slider.setSingleStep(1)
slider.setTickInterval(10)
slider.setTickPosition(QSlider.TicksBelow)
slider.valueChanged.connect(self.on_value_changed)
slider.valueChanged.connect(self._update_label)
self.set_custom_widget(slider)
self._update_label()
def get_value(self):
widget: QSlider = self.get_custom_widget()
return widget.value()
def set_value(self, value):
widget: QSlider = self.get_custom_widget()
widget.setValue(value)
def _update_label(self):
self.set_label(f"{self.pure_label} ({int(self.get_value())}%)")
class GlobalInput(BaseNode):
__identifier__ = "factorygame"
NODE_NAME = "Global Input"
def __init__(self):
super().__init__()
self.add_output("Create Machine")
class Machine(BaseNode):
__identifier__ = "factorygame"
NODE_NAME = "FactoryGame Machine"
def assign_recipe(self, recipe: Recipe):
for port_idx in range(len(self._inputs)):
self.delete_input(port_idx)
for port_idx in range(len(self._outputs)):
self.delete_output(port_idx)
self.set_property("name", recipe.factory.label, push_undo=False)
def in_amount_name(resource_label: str):
return f"In {resource_label} amount"
def out_amount_name(resource_label: str):
return f"Out {resource_label} amount"
input_resources: dict[str, float] = {}
for ingredient in recipe.ingredients:
resource_label = ingredient.resource.label
if resource_label in input_resources:
input_resources[resource_label] += ingredient.amount_per_minute()
self.get_widget(in_amount_name(resource_label)).set_value(
f"{input_resources[resource_label]:.2f} / min"
)
else:
port = self.add_input(resource_label, color=(180, 80, 0))
port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine")
port.add_accept_port_type("Create Machine", PortTypeEnum.OUT.value, "factorygame.GlobalInput")
self._add_text_label(in_amount_name(resource_label), ingredient, input_resources, resource_label)
output_resources: dict[str, float] = {}
for result in recipe.results:
resource_label = result.resource.label
if resource_label in output_resources:
output_resources[resource_label] += result.amount_per_minute()
self.get_widget(out_amount_name(resource_label)).set_value(
f"{output_resources[resource_label]:.2f} / min"
)
else:
port = self.add_output(resource_label, color=(200, 20, 0))
port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine")
self._add_text_label(out_amount_name(resource_label), result, output_resources, resource_label)
performance_slider_name = "machine performance"
def set_performance(percentage):
factor = percentage / 100.0
for ingredient_label, amount in input_resources.items():
self.get_widget(in_amount_name(ingredient_label)).set_value(f"{amount * factor:.2f} / min")
for result_label, amount in output_resources.items():
self.get_widget(out_amount_name(result_label)).set_value(f"{amount * factor:.2f} / min")
self.get_widget(performance_slider_name).get_value()
slider = NodeSlider(name=performance_slider_name, label="Overclocking Performance", parent=self.view)
self.add_custom_widget(slider)
slider.value_changed.connect(lambda name, value: set_performance(max(1, min(250, value))))
def _add_text_label(self, name, flow, resource_amounts, resource_label):
resource_amounts[resource_label] = flow.amount_per_minute()
self.add_text_input(name=name, label=name, text=f"{resource_amounts[resource_label]} / min")
widget = self.get_widget(name)
widget.get_custom_widget().setReadOnly(True)
widget.widget().setMaximumWidth(220)
def on_port_connected(input_port: Port, output_port: Port):
global debug
if debug:
print(f"Port {output_port} connected to {input_port}")
if isinstance(output_port.node(), GlobalInput) and output_port.name() == "Create Machine":
output_port.clear_connections(push_undo=False, emit_signal=False)
with Session(engine) as session, SatisfactoryPlus() as data_provider:
do_provider_search = False
resource_label = input_port.name()
matching_resources = session.scalars(Resource.by_label(resource_label)).all()
if len(matching_resources) == 0:
print("Could not find existing resources matching the search string.. starting wiki search")
else:
options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))}
options[0] = ""
selected_res = prompt(
options=options,
text="Chose a resource to continue or 0 to continue with a wiki search",
default=1,
)
if selected_res is None:
return
if selected_res != 0:
resource = matching_resources[selected_res - 1]
do_provider_search = False
if do_provider_search:
ret = data_provider.search_for_resource(session=session, search=resource_label)
if ret is None:
print("Resource not found")
return
resource, exists_in_db = ret
if not exists_in_db:
print("Resource not yet fetched, run fetch first")
return
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
recipes = session.scalars(stmt).all()
if len(recipes) == 0:
print("No recipes found for resource")
return
elif len(recipes) > 1:
options = {(idx + 1): recipes[idx].describe() for idx in range(len(recipes))}
user_choice = prompt(options=options, text="Select recipe", default=1)
if user_choice is None:
return
recipe = recipes[user_choice - 1]
else:
recipe = recipes[0]
recipe_machine = graph.create_node("factorygame.Machine", push_undo=True)
recipe_machine.assign_recipe(recipe)
recipe_machine.update()
recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 200)
recipe_machine.get_output(input_port.name()).connect_to(input_port)
if recipe_machine.x_pos() - (global_input.x_pos() - global_input.view.width) < 200:
global_input.set_x_pos(recipe_machine.x_pos() - global_input.view.width - 200)
@click.command
@click.option("--debug", is_flag=True)
@click.argument("search")
def main(debug: bool, search: str):
global engine
globals()["debug"] = debug
engine = create_engine("sqlite:///file.db", echo=debug)
with Session(engine) as session, SatisfactoryPlus(debug=debug) as data_provider:
do_provider_search = False
matching_resources = session.scalars(Resource.by_label(search)).all()
if len(matching_resources) == 0:
print("Could not find existing resources matching the search string.. starting wiki search")
else:
options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))}
options[0] = ""
selected_res = prompt(
options=options, text="Chose a resource to continue or 0 to continue with a wiki search", default=1
)
if selected_res is None:
return
if selected_res != 0:
resource = matching_resources[selected_res - 1]
do_provider_search = False
if do_provider_search:
ret = data_provider.search_for_resource(session=session, search=search)
if ret is None:
print("Resource not found")
return
resource, exists_in_db = ret
if not exists_in_db:
print("Resource not yet fetched, run fetch first")
return
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
recipes = session.scalars(stmt).all()
if len(recipes) == 0:
print("No recipes found for resource")
return
elif len(recipes) > 1:
options = {(idx + 1): recipes[idx].describe() for idx in range(len(recipes))}
user_choice = prompt(options=options, text="Select recipe", default=1)
if user_choice is None:
return
recipe = recipes[user_choice - 1]
else:
recipe = recipes[0]
app = QtWidgets.QApplication([])
global graph
graph = NodeGraph()
graph.widget.resize(1280, 720)
graph.register_node(Machine)
graph.register_node(GlobalInput)
graph_widget = graph.widget
graph_widget.show()
global global_input
global_input = graph.create_node("factorygame.GlobalInput", push_undo=False)
recipe_machine = graph.create_node("factorygame.Machine", push_undo=False)
recipe_machine.assign_recipe(recipe)
graph.auto_layout_nodes([global_input, recipe_machine])
global_input.set_y_pos(recipe_machine.y_pos())
global_input.set_x_pos(global_input.x_pos() - global_input.model.width - 200)
graph.center_on([global_input, recipe_machine])
graph.port_connected.connect(on_port_connected)
app.exec_()
if __name__ == "__main__":
main()

13
factorygame/helper.py Normal file
View file

@ -0,0 +1,13 @@
import click
def prompt(options: dict[int, str], **kwargs) -> int | None:
for idx, label in options.items():
if label:
click.echo(f"{idx}: {label}")
ret = click.prompt(**kwargs)
ret = int(ret)
if ret not in options:
click.echo("Invalid choice.")
return None
return ret

BIN
vis.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 69 KiB