Compare commits
2 commits
ba4aa26be3
...
91a0edda5b
Author | SHA1 | Date | |
---|---|---|---|
Ben | 91a0edda5b | ||
Ben | 71e2c46949 |
4
Pipfile
4
Pipfile
|
@ -7,6 +7,7 @@ name = "pypi"
|
||||||
selenium = "*"
|
selenium = "*"
|
||||||
click = "*"
|
click = "*"
|
||||||
sqlalchemy = "*"
|
sqlalchemy = "*"
|
||||||
|
nodegraphqt = "*"
|
||||||
|
|
||||||
[dev-packages]
|
[dev-packages]
|
||||||
|
|
||||||
|
@ -14,4 +15,5 @@ sqlalchemy = "*"
|
||||||
python_version = "3.11"
|
python_version = "3.11"
|
||||||
|
|
||||||
[scripts]
|
[scripts]
|
||||||
calc = {call = "factorygame.calculator.SatisfactoryCalculator:main()"}
|
fetch = {call = "factorygame.data.fetch:main()"}
|
||||||
|
vis = {call = "factorygame.data.vis:main()"}
|
||||||
|
|
24
Pipfile.lock
generated
24
Pipfile.lock
generated
|
@ -1,7 +1,7 @@
|
||||||
{
|
{
|
||||||
"_meta": {
|
"_meta": {
|
||||||
"hash": {
|
"hash": {
|
||||||
"sha256": "45cb685b915b5e5bab1ba5339a033373d210e09d6f0b4d9ac9acbe8fe15b5402"
|
"sha256": "0d398759a0afc9f55cbd2e892bf1f819b572167c2396e2b978515124ada547a9"
|
||||||
},
|
},
|
||||||
"pipfile-spec": 6,
|
"pipfile-spec": 6,
|
||||||
"requires": {
|
"requires": {
|
||||||
|
@ -121,6 +121,15 @@
|
||||||
"markers": "python_version >= '3.5'",
|
"markers": "python_version >= '3.5'",
|
||||||
"version": "==3.6"
|
"version": "==3.6"
|
||||||
},
|
},
|
||||||
|
"nodegraphqt": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:2f0e4b0a2c1a3360deaa2fb6cc42cbf0dce25a7076d036473773d58b0f4fae31",
|
||||||
|
"sha256:97796bf36845c7e0413e72608507401b04e92106de620b804a234d0cad26e24f"
|
||||||
|
],
|
||||||
|
"index": "pypi",
|
||||||
|
"markers": "python_version >= '3.6'",
|
||||||
|
"version": "==0.6.35"
|
||||||
|
},
|
||||||
"outcome": {
|
"outcome": {
|
||||||
"hashes": [
|
"hashes": [
|
||||||
"sha256:9dcf02e65f2971b80047b377468e72a268e15c0af3cf1238e6ff14f7f91143b8",
|
"sha256:9dcf02e65f2971b80047b377468e72a268e15c0af3cf1238e6ff14f7f91143b8",
|
||||||
|
@ -137,6 +146,12 @@
|
||||||
],
|
],
|
||||||
"version": "==1.7.1"
|
"version": "==1.7.1"
|
||||||
},
|
},
|
||||||
|
"qt.py": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:ab072ac955bdc9318966c078f1bb5941e77166f9e92df3db5726b1eb5afea83b"
|
||||||
|
],
|
||||||
|
"version": "==1.3.9"
|
||||||
|
},
|
||||||
"selenium": {
|
"selenium": {
|
||||||
"hashes": [
|
"hashes": [
|
||||||
"sha256:5aee79026c07985dc1b0c909f34084aa996dfe5b307602de9016d7a621a473f2",
|
"sha256:5aee79026c07985dc1b0c909f34084aa996dfe5b307602de9016d7a621a473f2",
|
||||||
|
@ -233,6 +248,13 @@
|
||||||
"markers": "python_version >= '3.7'",
|
"markers": "python_version >= '3.7'",
|
||||||
"version": "==0.11.1"
|
"version": "==0.11.1"
|
||||||
},
|
},
|
||||||
|
"types-pyside2": {
|
||||||
|
"hashes": [
|
||||||
|
"sha256:5bc2763bc6b595b2c5fc1191ce5147dcc6f44d56efdc2b264b6c082282cc1c57",
|
||||||
|
"sha256:de4b575e57fdb9e5fd8507537cdd770866fdbe1972eadc3793b2026eaec92875"
|
||||||
|
],
|
||||||
|
"version": "==5.15.2.1.6"
|
||||||
|
},
|
||||||
"typing-extensions": {
|
"typing-extensions": {
|
||||||
"hashes": [
|
"hashes": [
|
||||||
"sha256:23478f88c37f27d76ac8aee6c905017a143b0b1b886c3c9f66bc2fd94f9f5783",
|
"sha256:23478f88c37f27d76ac8aee6c905017a143b0b1b886c3c9f66bc2fd94f9f5783",
|
||||||
|
|
15
README.md
15
README.md
|
@ -3,5 +3,18 @@
|
||||||
Using pipenv, run e.g.
|
Using pipenv, run e.g.
|
||||||
|
|
||||||
```sh
|
```sh
|
||||||
pipenv run calc --result 'Molten Iron'
|
pipenv run fetch --result 'Molten Iron'
|
||||||
|
```
|
||||||
|
|
||||||
|
## Visualisation
|
||||||
|
|
||||||
|
Thanks to [NodeGraphQt](https://github.com/jchanvfx/NodeGraphQt) a graph base visualisation is available, which looks like the following:
|
||||||
|
|
||||||
|
[!Graph Visualisation Example](vis.png)
|
||||||
|
|
||||||
|
The visualisation window can be opened with e.g.
|
||||||
|
|
||||||
|
```sh
|
||||||
|
pipenv run fetch --result 'Plastic'
|
||||||
|
pipenv run vis 'Plastic'
|
||||||
```
|
```
|
||||||
|
|
|
@ -1,306 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
import datetime
|
|
||||||
from typing import Optional
|
|
||||||
from urllib.parse import urljoin
|
|
||||||
|
|
||||||
import click
|
|
||||||
from selenium.webdriver import Firefox
|
|
||||||
from selenium.webdriver.common.by import By
|
|
||||||
from selenium.webdriver.firefox.options import Options
|
|
||||||
from sqlalchemy import String, create_engine, select, ForeignKey, Select, Table, Column
|
|
||||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session
|
|
||||||
|
|
||||||
__debug = False
|
|
||||||
__recipe_info_timeout = datetime.timedelta(days=30)
|
|
||||||
__browser: Optional[Firefox] = None
|
|
||||||
|
|
||||||
|
|
||||||
def get_browser() -> Firefox:
|
|
||||||
global __browser, __debug
|
|
||||||
if __browser is None:
|
|
||||||
firefox_options = Options()
|
|
||||||
firefox_options.add_argument("--width=1600")
|
|
||||||
firefox_options.add_argument("--height=1015")
|
|
||||||
if not __debug:
|
|
||||||
firefox_options.add_argument("--headless")
|
|
||||||
__browser = Firefox(options=firefox_options)
|
|
||||||
__browser.implicitly_wait(5)
|
|
||||||
return __browser
|
|
||||||
|
|
||||||
|
|
||||||
def browser_cleanup():
|
|
||||||
global __browser, __debug
|
|
||||||
if not __debug and __browser is not None:
|
|
||||||
__browser.quit()
|
|
||||||
|
|
||||||
|
|
||||||
class Base(DeclarativeBase):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class Resource(Base):
|
|
||||||
__tablename__ = "resources"
|
|
||||||
|
|
||||||
id: Mapped[int] = mapped_column(primary_key=True)
|
|
||||||
label: Mapped[str] = mapped_column(String(127))
|
|
||||||
wiki_url: Mapped[str]
|
|
||||||
recipes_populated_at: Mapped[Optional[datetime.datetime]]
|
|
||||||
flows: Mapped[list["ResourceFlow"]] = relationship(back_populates="resource")
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def by_label(cls, search: str) -> Select[tuple["Resource"]]:
|
|
||||||
return select(cls).where(cls.label.ilike(search))
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return (
|
|
||||||
f"Resource(id={self.id}, "
|
|
||||||
f"label={self.label}, "
|
|
||||||
f"wiki_url={self.wiki_url}, "
|
|
||||||
f"recipes_populated_at={self.recipes_populated_at})"
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class Factory(Base):
|
|
||||||
__tablename__ = "factories"
|
|
||||||
|
|
||||||
id: Mapped[int] = mapped_column(primary_key=True)
|
|
||||||
label: Mapped[str] = mapped_column(String(127))
|
|
||||||
wiki_url: Mapped[str]
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def by_label(cls, search: str) -> Select[tuple["Factory"]]:
|
|
||||||
return select(cls).where(cls.label.ilike(search))
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return f"Factory(id={self.id}, label={self.label}, wiki_url={self.wiki_url})"
|
|
||||||
|
|
||||||
|
|
||||||
ingredients_table = Table(
|
|
||||||
"recipe_ingredients",
|
|
||||||
Base.metadata,
|
|
||||||
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
|
|
||||||
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
results_table = Table(
|
|
||||||
"recipe_results",
|
|
||||||
Base.metadata,
|
|
||||||
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
|
|
||||||
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class ResourceFlow(Base):
|
|
||||||
__tablename__ = "resource_flows"
|
|
||||||
|
|
||||||
id: Mapped[int] = mapped_column(primary_key=True)
|
|
||||||
ingredient_in: Mapped[Optional["Recipe"]] = relationship(secondary=ingredients_table, back_populates="ingredients")
|
|
||||||
result_of: Mapped[Optional["Recipe"]] = relationship(secondary=results_table, back_populates="results")
|
|
||||||
resource_id: Mapped[int] = mapped_column(ForeignKey("resources.id"))
|
|
||||||
resource: Mapped["Resource"] = relationship(back_populates="flows")
|
|
||||||
amount: Mapped[str]
|
|
||||||
time: Mapped[str]
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return f"ResourceFlow(id={self.id}, resource_id={self.resource_id}, amount={self.amount}, time={self.time})"
|
|
||||||
|
|
||||||
|
|
||||||
class Recipe(Base):
|
|
||||||
__tablename__ = "recipes"
|
|
||||||
|
|
||||||
id: Mapped[int] = mapped_column(primary_key=True)
|
|
||||||
factory_id: Mapped[int] = mapped_column(ForeignKey("factories.id"))
|
|
||||||
factory: Mapped["Factory"] = relationship()
|
|
||||||
ingredients: Mapped[list["ResourceFlow"]] = relationship(
|
|
||||||
secondary=ingredients_table, back_populates="ingredient_in"
|
|
||||||
)
|
|
||||||
results: Mapped[list["ResourceFlow"]] = relationship(secondary=results_table, back_populates="result_of")
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return f"Recipe(id={self.id}, factory={self.factory}, ingredients={self.ingredients}, results={self.results})"
|
|
||||||
|
|
||||||
|
|
||||||
def normalize_url(href: str) -> str:
|
|
||||||
return urljoin(base=get_browser().current_url, url=href)
|
|
||||||
|
|
||||||
|
|
||||||
def populate_recipes(session: Session, input_resource_label: str) -> Resource:
|
|
||||||
browser = get_browser()
|
|
||||||
browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
|
|
||||||
recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")
|
|
||||||
resources: dict[str, Resource] = {}
|
|
||||||
new_resources: list[Resource] = []
|
|
||||||
|
|
||||||
for recipe_idx in range(len(recipes_html_elems)):
|
|
||||||
recipe_html_elem = recipes_html_elems[recipe_idx]
|
|
||||||
factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a")
|
|
||||||
factory_label = factory_html_elem.text.strip()
|
|
||||||
factory_url = urljoin(base=browser.current_url, url=factory_html_elem.get_attribute("href"))
|
|
||||||
|
|
||||||
def extract_resource_flow(html_elem):
|
|
||||||
resource_img = html_elem.find_element(By.TAG_NAME, "img")
|
|
||||||
resource_label = resource_img.get_attribute("alt").strip()
|
|
||||||
assert resource_label, "resource label is missing"
|
|
||||||
if resource_label in resources:
|
|
||||||
resource = resources[resource_label]
|
|
||||||
else:
|
|
||||||
resource = session.scalars(Resource.by_label(resource_label)).one_or_none()
|
|
||||||
if not resource:
|
|
||||||
wiki_url = normalize_url(
|
|
||||||
href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"),
|
|
||||||
)
|
|
||||||
resource = Resource(label=resource_label, wiki_url=wiki_url)
|
|
||||||
new_resources.append(resource)
|
|
||||||
resources[resource_label] = resource
|
|
||||||
amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
|
|
||||||
time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
|
|
||||||
return ResourceFlow(resource=resource, amount=amount, time=time)
|
|
||||||
|
|
||||||
ingredient_html_elems = recipe_html_elem.find_elements(
|
|
||||||
By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)"
|
|
||||||
)
|
|
||||||
ingredients: list[ResourceFlow] = []
|
|
||||||
for ingredient_idx in range(len(ingredient_html_elems)):
|
|
||||||
resource_flow = extract_resource_flow(ingredient_html_elems[ingredient_idx])
|
|
||||||
ingredients.append(resource_flow)
|
|
||||||
result_html_elems = recipe_html_elem.find_elements(
|
|
||||||
By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)"
|
|
||||||
)
|
|
||||||
results: list[ResourceFlow] = []
|
|
||||||
for result_idx in range(len(result_html_elems)):
|
|
||||||
resource_flow = extract_resource_flow(result_html_elems[result_idx])
|
|
||||||
results.append(resource_flow)
|
|
||||||
|
|
||||||
with session.no_autoflush:
|
|
||||||
# re-use existing Factory or create new
|
|
||||||
factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
|
|
||||||
if not factory:
|
|
||||||
factory = Factory(label=factory_label, wiki_url=factory_url)
|
|
||||||
session.add(factory)
|
|
||||||
|
|
||||||
session.add_all(new_resources)
|
|
||||||
session.add_all(ingredients)
|
|
||||||
session.add_all(results)
|
|
||||||
session.add(Recipe(factory=factory, ingredients=ingredients, results=results))
|
|
||||||
session.flush()
|
|
||||||
|
|
||||||
updated_resource = session.scalars(Resource.by_label(input_resource_label)).one()
|
|
||||||
updated_resource.recipes_populated_at = datetime.datetime.utcnow()
|
|
||||||
session.flush()
|
|
||||||
return updated_resource
|
|
||||||
|
|
||||||
|
|
||||||
@click.command()
|
|
||||||
@click.option("--result", is_flag=True)
|
|
||||||
@click.option("--debug", is_flag=True)
|
|
||||||
@click.option("--refetch", is_flag=True)
|
|
||||||
@click.argument("search")
|
|
||||||
def main(result: bool, debug: bool, refetch: bool, search: str):
|
|
||||||
global __debug
|
|
||||||
__debug = debug
|
|
||||||
engine = create_engine("sqlite:///file.db", echo=debug)
|
|
||||||
Base.metadata.create_all(bind=engine)
|
|
||||||
if result and search:
|
|
||||||
wiki_search = True
|
|
||||||
resource: Optional[Resource] = None
|
|
||||||
with Session(engine) as session:
|
|
||||||
matching_resources = session.scalars(Resource.by_label(search)).all()
|
|
||||||
if len(matching_resources) == 0:
|
|
||||||
print("Could not find existing resources matching the search string.. starting wiki search")
|
|
||||||
else:
|
|
||||||
for idx in range(1, len(matching_resources) + 1):
|
|
||||||
print(f"{idx}: {matching_resources[idx - 1].label}")
|
|
||||||
user_choice = click.prompt(
|
|
||||||
"Chose a resource to continue or 0 to continue with a wiki search", default=1
|
|
||||||
)
|
|
||||||
if user_choice != 0:
|
|
||||||
resource = matching_resources[user_choice - 1]
|
|
||||||
wiki_search = False
|
|
||||||
|
|
||||||
try:
|
|
||||||
if wiki_search:
|
|
||||||
browser = get_browser()
|
|
||||||
browser.get("https://wiki.kyrium.space/")
|
|
||||||
search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
|
|
||||||
search_bar.click()
|
|
||||||
search_bar.send_keys(search)
|
|
||||||
search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']")
|
|
||||||
search_button.click()
|
|
||||||
choices = browser.find_elements(
|
|
||||||
By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center"
|
|
||||||
)
|
|
||||||
if not choices:
|
|
||||||
print("No wiki entries found for this result")
|
|
||||||
return
|
|
||||||
elif len(choices) > 1:
|
|
||||||
default_choice = 1
|
|
||||||
choice_names: list[str] = []
|
|
||||||
for choice_idx in range(1, len(choices) + 1):
|
|
||||||
recipe_choice = choices[choice_idx - 1]
|
|
||||||
name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
|
|
||||||
choice_names.append(name)
|
|
||||||
if name.casefold() == search.casefold():
|
|
||||||
default_choice = choice_idx
|
|
||||||
print(f"{choice_idx}: {name}")
|
|
||||||
user_choice = click.prompt("Chose a recipe to continue…", default=default_choice)
|
|
||||||
if not user_choice:
|
|
||||||
user_choice = default_choice
|
|
||||||
else:
|
|
||||||
user_choice = int(user_choice)
|
|
||||||
|
|
||||||
link_html_elem = choices[user_choice - 1]
|
|
||||||
else:
|
|
||||||
link_html_elem = choices[0]
|
|
||||||
|
|
||||||
alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
|
|
||||||
resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
|
|
||||||
if not resource:
|
|
||||||
resource_fetch_url = normalize_url(href=link_html_elem.get_attribute("href"))
|
|
||||||
|
|
||||||
refetch = (
|
|
||||||
refetch
|
|
||||||
or resource is None
|
|
||||||
or resource.recipes_populated_at is None
|
|
||||||
or datetime.datetime.utcnow() - resource.recipes_populated_at > __recipe_info_timeout
|
|
||||||
)
|
|
||||||
if refetch and resource is not None:
|
|
||||||
print("Deleting recipes for", resource.label)
|
|
||||||
with session.begin_nested():
|
|
||||||
for flow in session.scalars(
|
|
||||||
select(ResourceFlow).where(ResourceFlow.resource_id == resource.id)
|
|
||||||
):
|
|
||||||
if flow.result_of:
|
|
||||||
for flow2 in flow.result_of.ingredients:
|
|
||||||
session.delete(flow2)
|
|
||||||
for flow2 in flow.result_of.results:
|
|
||||||
session.delete(flow2)
|
|
||||||
session.delete(flow.result_of)
|
|
||||||
|
|
||||||
if refetch:
|
|
||||||
browser = get_browser()
|
|
||||||
if resource is None:
|
|
||||||
print("Fetching recipes for new resource", alt_resource_label)
|
|
||||||
assert resource_fetch_url, "Resource wiki url not set"
|
|
||||||
browser.get(resource_fetch_url)
|
|
||||||
resource_label = alt_resource_label
|
|
||||||
else:
|
|
||||||
print("Refetching recipes for", resource.label)
|
|
||||||
browser.get(resource.wiki_url)
|
|
||||||
resource_label = resource.label
|
|
||||||
|
|
||||||
with session.begin_nested():
|
|
||||||
resource = populate_recipes(session=session, input_resource_label=resource_label)
|
|
||||||
session.refresh(resource)
|
|
||||||
|
|
||||||
assert resource, "Resource must be set at this point"
|
|
||||||
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
|
|
||||||
for recipe in session.scalars(stmt):
|
|
||||||
print(recipe)
|
|
||||||
for flow in recipe.ingredients:
|
|
||||||
print("ingredient:", flow.resource, flow)
|
|
||||||
for flow in recipe.results:
|
|
||||||
print("result: ", flow.resource, flow)
|
|
||||||
finally:
|
|
||||||
browser_cleanup()
|
|
|
@ -1,3 +0,0 @@
|
||||||
from .SatisfactoryCalculator import main
|
|
||||||
|
|
||||||
main()
|
|
89
factorygame/data/fetch.py
Executable file
89
factorygame/data/fetch.py
Executable file
|
@ -0,0 +1,89 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import datetime
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import click
|
||||||
|
from sqlalchemy import create_engine, select
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from .models import Base, Resource, ResourceFlow, Recipe
|
||||||
|
from .sfp import SatisfactoryPlus
|
||||||
|
from ..helper import prompt
|
||||||
|
|
||||||
|
__recipe_info_timeout = datetime.timedelta(days=30)
|
||||||
|
|
||||||
|
|
||||||
|
@click.command()
|
||||||
|
@click.option("--result", is_flag=True)
|
||||||
|
@click.option("--debug", is_flag=True)
|
||||||
|
@click.option("--refetch", is_flag=True)
|
||||||
|
@click.argument("search")
|
||||||
|
def main(result: bool, debug: bool, refetch: bool, search: str):
|
||||||
|
engine = create_engine("sqlite:///file.db", echo=debug)
|
||||||
|
Base.metadata.create_all(bind=engine)
|
||||||
|
if result and search:
|
||||||
|
do_provider_search = True
|
||||||
|
resource: Optional[Resource] = None
|
||||||
|
with Session(engine) as session:
|
||||||
|
matching_resources = session.scalars(Resource.by_label(search)).all()
|
||||||
|
if len(matching_resources) == 0:
|
||||||
|
print("Could not find existing resources matching the search string.. starting wiki search")
|
||||||
|
else:
|
||||||
|
options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))}
|
||||||
|
options[0] = ""
|
||||||
|
selected_res = prompt(
|
||||||
|
options=options, text="Chose a resource to continue or 0 to continue with a wiki search", default=1
|
||||||
|
)
|
||||||
|
if selected_res is None:
|
||||||
|
return
|
||||||
|
if selected_res != 0:
|
||||||
|
resource = matching_resources[selected_res - 1]
|
||||||
|
do_provider_search = False
|
||||||
|
exists_in_db = True
|
||||||
|
|
||||||
|
with SatisfactoryPlus(debug=debug) as data_provider:
|
||||||
|
if do_provider_search:
|
||||||
|
ret = data_provider.search_for_resource(session=session, search=search)
|
||||||
|
if ret is None:
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
resource, exists_in_db = ret
|
||||||
|
|
||||||
|
refetch = (
|
||||||
|
refetch
|
||||||
|
or resource.recipes_populated_at is None
|
||||||
|
or datetime.datetime.utcnow() - resource.recipes_populated_at > __recipe_info_timeout
|
||||||
|
)
|
||||||
|
if refetch and exists_in_db:
|
||||||
|
print("Deleting recipes for", resource.label)
|
||||||
|
with session.begin_nested():
|
||||||
|
for flow in session.scalars(
|
||||||
|
select(ResourceFlow).where(ResourceFlow.resource_id == resource.id)
|
||||||
|
):
|
||||||
|
if flow.result_of:
|
||||||
|
for flow2 in flow.result_of.ingredients:
|
||||||
|
session.delete(flow2)
|
||||||
|
for flow2 in flow.result_of.results:
|
||||||
|
session.delete(flow2)
|
||||||
|
session.delete(flow.result_of)
|
||||||
|
|
||||||
|
if refetch:
|
||||||
|
if exists_in_db:
|
||||||
|
print("Refetching recipes for", resource.label)
|
||||||
|
else:
|
||||||
|
print("Fetching recipes for new resource", resource.label)
|
||||||
|
|
||||||
|
with session.begin_nested():
|
||||||
|
resource = data_provider.update_resource_recipes(session=session, resource=resource)
|
||||||
|
|
||||||
|
session.refresh(resource)
|
||||||
|
assert resource, "Resource must be set at this point"
|
||||||
|
|
||||||
|
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
|
||||||
|
for recipe in session.scalars(stmt):
|
||||||
|
print("Recipe:", recipe.describe())
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
114
factorygame/data/models.py
Normal file
114
factorygame/data/models.py
Normal file
|
@ -0,0 +1,114 @@
|
||||||
|
import datetime
|
||||||
|
import re
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from sqlalchemy import String, Select, select, Table, Column, ForeignKey
|
||||||
|
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
||||||
|
|
||||||
|
|
||||||
|
class Base(DeclarativeBase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Resource(Base):
|
||||||
|
__tablename__ = "resources"
|
||||||
|
|
||||||
|
id: Mapped[int] = mapped_column(primary_key=True)
|
||||||
|
label: Mapped[str] = mapped_column(String(127))
|
||||||
|
wiki_url: Mapped[str] # FIXME: rename to uri
|
||||||
|
recipes_populated_at: Mapped[Optional[datetime.datetime]]
|
||||||
|
flows: Mapped[list["ResourceFlow"]] = relationship(back_populates="resource")
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def by_label(cls, search: str) -> Select[tuple["Resource"]]:
|
||||||
|
return select(cls).where(cls.label.ilike(search))
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return (
|
||||||
|
f"Resource(id={self.id}, "
|
||||||
|
f"label={self.label}, "
|
||||||
|
f"wiki_url={self.wiki_url}, "
|
||||||
|
f"recipes_populated_at={self.recipes_populated_at})"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class Factory(Base):
|
||||||
|
__tablename__ = "factories"
|
||||||
|
|
||||||
|
id: Mapped[int] = mapped_column(primary_key=True)
|
||||||
|
label: Mapped[str] = mapped_column(String(127))
|
||||||
|
wiki_url: Mapped[str] # FIXME: rename to uri
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def by_label(cls, search: str) -> Select[tuple["Factory"]]:
|
||||||
|
return select(cls).where(cls.label.ilike(search))
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"Factory(id={self.id}, label={self.label}, wiki_url={self.wiki_url})"
|
||||||
|
|
||||||
|
|
||||||
|
ingredients_table = Table(
|
||||||
|
"recipe_ingredients",
|
||||||
|
Base.metadata,
|
||||||
|
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
|
||||||
|
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
|
||||||
|
)
|
||||||
|
results_table = Table(
|
||||||
|
"recipe_results",
|
||||||
|
Base.metadata,
|
||||||
|
Column("recipe_id", ForeignKey("recipes.id"), primary_key=True),
|
||||||
|
Column("resource_flow_id", ForeignKey("resource_flows.id"), primary_key=True),
|
||||||
|
)
|
||||||
|
|
||||||
|
amount_number = re.compile(r"^\d*(\.\d+)?")
|
||||||
|
time_number = re.compile(r"^(\d+) seconds?")
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceFlow(Base):
|
||||||
|
__tablename__ = "resource_flows"
|
||||||
|
|
||||||
|
id: Mapped[int] = mapped_column(primary_key=True)
|
||||||
|
ingredient_in: Mapped[Optional["Recipe"]] = relationship(secondary=ingredients_table, back_populates="ingredients")
|
||||||
|
result_of: Mapped[Optional["Recipe"]] = relationship(secondary=results_table, back_populates="results")
|
||||||
|
resource_id: Mapped[int] = mapped_column(ForeignKey("resources.id"))
|
||||||
|
resource: Mapped["Resource"] = relationship(back_populates="flows")
|
||||||
|
amount: Mapped[str]
|
||||||
|
time: Mapped[str]
|
||||||
|
|
||||||
|
def amount_per_minute(self) -> float:
|
||||||
|
per_crafting_step_str = amount_number.match(self.amount).group(0)
|
||||||
|
per_crafting_step = float(per_crafting_step_str)
|
||||||
|
crafting_time_seconds_str = time_number.match(self.time).group(1)
|
||||||
|
crafting_time_seconds = float(crafting_time_seconds_str)
|
||||||
|
return per_crafting_step * (60.0 / crafting_time_seconds)
|
||||||
|
|
||||||
|
def describe(self) -> str:
|
||||||
|
return f"{repr(str(self.resource.label))}x{self.amount}"
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"ResourceFlow(id={self.id}, resource_id={self.resource_id}, amount={self.amount}, time={self.time})"
|
||||||
|
|
||||||
|
|
||||||
|
class Recipe(Base):
|
||||||
|
__tablename__ = "recipes"
|
||||||
|
|
||||||
|
id: Mapped[int] = mapped_column(primary_key=True)
|
||||||
|
factory_id: Mapped[int] = mapped_column(ForeignKey("factories.id"))
|
||||||
|
factory: Mapped["Factory"] = relationship()
|
||||||
|
ingredients: Mapped[list["ResourceFlow"]] = relationship(
|
||||||
|
secondary=ingredients_table, back_populates="ingredient_in"
|
||||||
|
)
|
||||||
|
results: Mapped[list["ResourceFlow"]] = relationship(secondary=results_table, back_populates="result_of")
|
||||||
|
|
||||||
|
def describe(self) -> str:
|
||||||
|
def list_flows(flows: list["ResourceFlow"]) -> str:
|
||||||
|
return ", ".join(map(ResourceFlow.describe, flows))
|
||||||
|
|
||||||
|
return (
|
||||||
|
f"in machine: {self.factory.label}, "
|
||||||
|
f"ingredients: {list_flows(self.ingredients)}, "
|
||||||
|
f"results: {list_flows(self.results)}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"Recipe(id={self.id}, factory={self.factory}, ingredients={self.ingredients}, results={self.results})"
|
16
factorygame/data/provider.py
Normal file
16
factorygame/data/provider.py
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
import abc
|
||||||
|
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from .models import Resource
|
||||||
|
|
||||||
|
|
||||||
|
class RecipeProvider(abc.ABC):
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def search_for_resource(self, session: Session, search: str) -> tuple[Resource, bool]:
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def update_resource_recipes(self, session: Session, resource: Resource):
|
||||||
|
pass
|
150
factorygame/data/sfp.py
Normal file
150
factorygame/data/sfp.py
Normal file
|
@ -0,0 +1,150 @@
|
||||||
|
from contextlib import AbstractContextManager
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional
|
||||||
|
from urllib.parse import urljoin
|
||||||
|
|
||||||
|
from selenium.webdriver import Firefox
|
||||||
|
from selenium.webdriver.common.by import By
|
||||||
|
from selenium.webdriver.firefox.options import Options
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from .models import Resource, ResourceFlow, Factory, Recipe
|
||||||
|
from .provider import RecipeProvider
|
||||||
|
from ..helper import prompt
|
||||||
|
|
||||||
|
|
||||||
|
class SatisfactoryPlus(RecipeProvider, AbstractContextManager):
|
||||||
|
_browser: Optional[Firefox] = None
|
||||||
|
|
||||||
|
def __init__(self, debug: bool = False):
|
||||||
|
super().__init__()
|
||||||
|
self.debug = debug
|
||||||
|
|
||||||
|
def _init_browser(self) -> Firefox:
|
||||||
|
if self._browser is None:
|
||||||
|
firefox_options = Options()
|
||||||
|
firefox_options.add_argument("--width=1600")
|
||||||
|
firefox_options.add_argument("--height=1015")
|
||||||
|
if not self.debug:
|
||||||
|
firefox_options.add_argument("--headless")
|
||||||
|
self._browser = Firefox(options=firefox_options)
|
||||||
|
self._browser.implicitly_wait(5)
|
||||||
|
return self._browser
|
||||||
|
|
||||||
|
def _browser_cleanup(self):
|
||||||
|
if not self.debug and self._browser is not None:
|
||||||
|
self._browser.quit()
|
||||||
|
|
||||||
|
def _normalize_url(self, href: str) -> str:
|
||||||
|
return urljoin(base=self._init_browser().current_url, url=href)
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, __exc_type, __exc_value, __traceback):
|
||||||
|
self._browser_cleanup()
|
||||||
|
|
||||||
|
def search_for_resource(self, session: Session, search: str) -> tuple[Resource, bool] | None:
|
||||||
|
browser = self._init_browser()
|
||||||
|
browser.get("https://wiki.kyrium.space/")
|
||||||
|
search_bar = browser.find_element(By.CSS_SELECTOR, "nav input[placeholder='Search for an item...']")
|
||||||
|
search_bar.click()
|
||||||
|
search_bar.send_keys(search)
|
||||||
|
search_button = browser.find_element(By.CSS_SELECTOR, "nav button[type='submit']")
|
||||||
|
search_button.click()
|
||||||
|
choices = browser.find_elements(By.CSS_SELECTOR, "body > div > .container:nth-child(1) a.items-center")
|
||||||
|
if not choices:
|
||||||
|
print("No wiki entries found for this result")
|
||||||
|
return None
|
||||||
|
|
||||||
|
elif len(choices) > 1:
|
||||||
|
default_choice = 1
|
||||||
|
options: dict[int, str] = {}
|
||||||
|
for idx in range(len(choices)):
|
||||||
|
recipe_choice = choices[idx]
|
||||||
|
name = recipe_choice.find_element(By.TAG_NAME, "img").get_attribute("alt")
|
||||||
|
options[idx + 1] = name
|
||||||
|
if name.casefold() == search.casefold():
|
||||||
|
default_choice = idx + 1
|
||||||
|
user_choice = prompt(options=options, text="Chose a recipe to continue…", default=default_choice)
|
||||||
|
if user_choice is None:
|
||||||
|
return None
|
||||||
|
link_html_elem = choices[user_choice - 1]
|
||||||
|
else:
|
||||||
|
link_html_elem = choices[0]
|
||||||
|
|
||||||
|
alt_resource_label = link_html_elem.find_element(By.TAG_NAME, "img").get_attribute("alt")
|
||||||
|
resource = session.scalars(Resource.by_label(alt_resource_label)).one_or_none()
|
||||||
|
if resource:
|
||||||
|
return resource, True
|
||||||
|
else:
|
||||||
|
resource_fetch_url = self._normalize_url(href=link_html_elem.get_attribute("href"))
|
||||||
|
return Resource(label=alt_resource_label, wiki_url=resource_fetch_url), False
|
||||||
|
|
||||||
|
def update_resource_recipes(self, session: Session, resource: Resource) -> Resource:
|
||||||
|
assert resource.wiki_url, "Resource wiki url not set"
|
||||||
|
browser = self._init_browser()
|
||||||
|
browser.get(resource.wiki_url)
|
||||||
|
browser.find_element(By.CSS_SELECTOR, "button[id$='tab-0']").click()
|
||||||
|
recipes_html_elems = browser.find_elements(By.CSS_SELECTOR, "div[id$='tabpanel-0'] > div > div")
|
||||||
|
resources: dict[str, Resource] = {}
|
||||||
|
new_resources: list[Resource] = []
|
||||||
|
|
||||||
|
for recipe_idx in range(len(recipes_html_elems)):
|
||||||
|
recipe_html_elem = recipes_html_elems[recipe_idx]
|
||||||
|
factory_html_elem = recipe_html_elem.find_element(By.CSS_SELECTOR, ".flex-col > span > a")
|
||||||
|
factory_label = factory_html_elem.text.strip()
|
||||||
|
factory_url = urljoin(base=browser.current_url, url=factory_html_elem.get_attribute("href"))
|
||||||
|
|
||||||
|
def extract_resource_flow(html_elem):
|
||||||
|
resource_img = html_elem.find_element(By.TAG_NAME, "img")
|
||||||
|
resource_label = resource_img.get_attribute("alt").strip()
|
||||||
|
assert resource_label, "resource label is missing"
|
||||||
|
if resource_label in resources:
|
||||||
|
res = resources[resource_label]
|
||||||
|
else:
|
||||||
|
res = session.scalars(Resource.by_label(resource_label)).one_or_none()
|
||||||
|
if not res:
|
||||||
|
wiki_url = self._normalize_url(
|
||||||
|
href=html_elem.find_element(By.TAG_NAME, "a").get_attribute("href"),
|
||||||
|
)
|
||||||
|
res = Resource(label=resource_label, wiki_url=wiki_url)
|
||||||
|
new_resources.append(res)
|
||||||
|
resources[resource_label] = res
|
||||||
|
amount = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(2)").text.strip()
|
||||||
|
time = html_elem.find_element(By.CSS_SELECTOR, ".text-xs:nth-child(3)").text.strip()
|
||||||
|
return ResourceFlow(resource=res, amount=amount, time=time)
|
||||||
|
|
||||||
|
ingredient_html_elems = recipe_html_elem.find_elements(
|
||||||
|
By.CSS_SELECTOR, f".flex-row > div:nth-child(1) > div:has(> a)"
|
||||||
|
)
|
||||||
|
ingredients: list[ResourceFlow] = []
|
||||||
|
for ingredient_idx in range(len(ingredient_html_elems)):
|
||||||
|
resource_flow = extract_resource_flow(ingredient_html_elems[ingredient_idx])
|
||||||
|
ingredients.append(resource_flow)
|
||||||
|
result_html_elems = recipe_html_elem.find_elements(
|
||||||
|
By.CSS_SELECTOR, f".flex-row > div:nth-child(3) > div:has(> a)"
|
||||||
|
)
|
||||||
|
results: list[ResourceFlow] = []
|
||||||
|
for result_idx in range(len(result_html_elems)):
|
||||||
|
resource_flow = extract_resource_flow(result_html_elems[result_idx])
|
||||||
|
results.append(resource_flow)
|
||||||
|
|
||||||
|
with session.no_autoflush:
|
||||||
|
# re-use existing Factory or create new
|
||||||
|
factory = session.scalars(Factory.by_label(factory_label)).one_or_none()
|
||||||
|
if not factory:
|
||||||
|
factory = Factory(label=factory_label, wiki_url=factory_url)
|
||||||
|
session.add(factory)
|
||||||
|
|
||||||
|
session.add_all(new_resources)
|
||||||
|
session.add_all(ingredients)
|
||||||
|
session.add_all(results)
|
||||||
|
session.add(Recipe(factory=factory, ingredients=ingredients, results=results))
|
||||||
|
session.flush()
|
||||||
|
|
||||||
|
# refresh by label, because id might not be set
|
||||||
|
updated_resource = session.scalars(Resource.by_label(resource.label)).one()
|
||||||
|
updated_resource.recipes_populated_at = datetime.utcnow()
|
||||||
|
session.flush()
|
||||||
|
return updated_resource
|
254
factorygame/data/vis.py
Executable file
254
factorygame/data/vis.py
Executable file
|
@ -0,0 +1,254 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import click
|
||||||
|
from NodeGraphQt import BaseNode, NodeBaseWidget
|
||||||
|
from NodeGraphQt import NodeGraph, Port
|
||||||
|
from NodeGraphQt.constants import PortTypeEnum
|
||||||
|
from PySide2.QtCore import Qt
|
||||||
|
from PySide2.QtWidgets import QSlider
|
||||||
|
from Qt import QtWidgets
|
||||||
|
from sqlalchemy import create_engine, select
|
||||||
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
|
from .models import Recipe, ResourceFlow, Resource
|
||||||
|
from .sfp import SatisfactoryPlus
|
||||||
|
from ..helper import prompt
|
||||||
|
|
||||||
|
|
||||||
|
class NodeSlider(NodeBaseWidget):
|
||||||
|
def __init__(self, name, label="", parent=None):
|
||||||
|
super(NodeSlider, self).__init__(parent)
|
||||||
|
self.set_name(name)
|
||||||
|
self.pure_label = label or name
|
||||||
|
slider = QSlider(Qt.Horizontal)
|
||||||
|
slider.setMinimum(1)
|
||||||
|
slider.setMaximum(250)
|
||||||
|
slider.setValue(100)
|
||||||
|
slider.setSingleStep(1)
|
||||||
|
slider.setTickInterval(10)
|
||||||
|
slider.setTickPosition(QSlider.TicksBelow)
|
||||||
|
slider.valueChanged.connect(self.on_value_changed)
|
||||||
|
slider.valueChanged.connect(self._update_label)
|
||||||
|
self.set_custom_widget(slider)
|
||||||
|
self._update_label()
|
||||||
|
|
||||||
|
def get_value(self):
|
||||||
|
widget: QSlider = self.get_custom_widget()
|
||||||
|
return widget.value()
|
||||||
|
|
||||||
|
def set_value(self, value):
|
||||||
|
widget: QSlider = self.get_custom_widget()
|
||||||
|
widget.setValue(value)
|
||||||
|
|
||||||
|
def _update_label(self):
|
||||||
|
self.set_label(f"{self.pure_label} ({int(self.get_value())}%)")
|
||||||
|
|
||||||
|
|
||||||
|
class GlobalInput(BaseNode):
|
||||||
|
__identifier__ = "factorygame"
|
||||||
|
NODE_NAME = "Global Input"
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__()
|
||||||
|
self.add_output("Create Machine")
|
||||||
|
|
||||||
|
|
||||||
|
class Machine(BaseNode):
|
||||||
|
__identifier__ = "factorygame"
|
||||||
|
NODE_NAME = "FactoryGame Machine"
|
||||||
|
|
||||||
|
def assign_recipe(self, recipe: Recipe):
|
||||||
|
for port_idx in range(len(self._inputs)):
|
||||||
|
self.delete_input(port_idx)
|
||||||
|
|
||||||
|
for port_idx in range(len(self._outputs)):
|
||||||
|
self.delete_output(port_idx)
|
||||||
|
|
||||||
|
self.set_property("name", recipe.factory.label, push_undo=False)
|
||||||
|
|
||||||
|
def in_amount_name(resource_label: str):
|
||||||
|
return f"In {resource_label} amount"
|
||||||
|
|
||||||
|
def out_amount_name(resource_label: str):
|
||||||
|
return f"Out {resource_label} amount"
|
||||||
|
|
||||||
|
input_resources: dict[str, float] = {}
|
||||||
|
for ingredient in recipe.ingredients:
|
||||||
|
resource_label = ingredient.resource.label
|
||||||
|
if resource_label in input_resources:
|
||||||
|
input_resources[resource_label] += ingredient.amount_per_minute()
|
||||||
|
self.get_widget(in_amount_name(resource_label)).set_value(
|
||||||
|
f"{input_resources[resource_label]:.2f} / min"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
port = self.add_input(resource_label, color=(180, 80, 0))
|
||||||
|
port.add_accept_port_type(resource_label, PortTypeEnum.OUT.value, "factorygame.Machine")
|
||||||
|
port.add_accept_port_type("Create Machine", PortTypeEnum.OUT.value, "factorygame.GlobalInput")
|
||||||
|
self._add_text_label(in_amount_name(resource_label), ingredient, input_resources, resource_label)
|
||||||
|
|
||||||
|
output_resources: dict[str, float] = {}
|
||||||
|
for result in recipe.results:
|
||||||
|
resource_label = result.resource.label
|
||||||
|
if resource_label in output_resources:
|
||||||
|
output_resources[resource_label] += result.amount_per_minute()
|
||||||
|
self.get_widget(out_amount_name(resource_label)).set_value(
|
||||||
|
f"{output_resources[resource_label]:.2f} / min"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
port = self.add_output(resource_label, color=(200, 20, 0))
|
||||||
|
port.add_accept_port_type(resource_label, PortTypeEnum.IN.value, "factorygame.Machine")
|
||||||
|
self._add_text_label(out_amount_name(resource_label), result, output_resources, resource_label)
|
||||||
|
|
||||||
|
performance_slider_name = "machine performance"
|
||||||
|
|
||||||
|
def set_performance(percentage):
|
||||||
|
factor = percentage / 100.0
|
||||||
|
for ingredient_label, amount in input_resources.items():
|
||||||
|
self.get_widget(in_amount_name(ingredient_label)).set_value(f"{amount * factor:.2f} / min")
|
||||||
|
for result_label, amount in output_resources.items():
|
||||||
|
self.get_widget(out_amount_name(result_label)).set_value(f"{amount * factor:.2f} / min")
|
||||||
|
self.get_widget(performance_slider_name).get_value()
|
||||||
|
|
||||||
|
slider = NodeSlider(name=performance_slider_name, label="Overclocking Performance", parent=self.view)
|
||||||
|
self.add_custom_widget(slider)
|
||||||
|
slider.value_changed.connect(lambda name, value: set_performance(max(1, min(250, value))))
|
||||||
|
|
||||||
|
def _add_text_label(self, name, flow, resource_amounts, resource_label):
|
||||||
|
resource_amounts[resource_label] = flow.amount_per_minute()
|
||||||
|
self.add_text_input(name=name, label=name, text=f"{resource_amounts[resource_label]} / min")
|
||||||
|
widget = self.get_widget(name)
|
||||||
|
widget.get_custom_widget().setReadOnly(True)
|
||||||
|
widget.widget().setMaximumWidth(220)
|
||||||
|
|
||||||
|
|
||||||
|
def on_port_connected(input_port: Port, output_port: Port):
|
||||||
|
global debug
|
||||||
|
if debug:
|
||||||
|
print(f"Port {output_port} connected to {input_port}")
|
||||||
|
if isinstance(output_port.node(), GlobalInput) and output_port.name() == "Create Machine":
|
||||||
|
output_port.clear_connections(push_undo=False, emit_signal=False)
|
||||||
|
with Session(engine) as session, SatisfactoryPlus() as data_provider:
|
||||||
|
do_provider_search = False
|
||||||
|
resource_label = input_port.name()
|
||||||
|
matching_resources = session.scalars(Resource.by_label(resource_label)).all()
|
||||||
|
if len(matching_resources) == 0:
|
||||||
|
print("Could not find existing resources matching the search string.. starting wiki search")
|
||||||
|
else:
|
||||||
|
options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))}
|
||||||
|
options[0] = ""
|
||||||
|
selected_res = prompt(
|
||||||
|
options=options,
|
||||||
|
text="Chose a resource to continue or 0 to continue with a wiki search",
|
||||||
|
default=1,
|
||||||
|
)
|
||||||
|
if selected_res is None:
|
||||||
|
return
|
||||||
|
if selected_res != 0:
|
||||||
|
resource = matching_resources[selected_res - 1]
|
||||||
|
do_provider_search = False
|
||||||
|
|
||||||
|
if do_provider_search:
|
||||||
|
ret = data_provider.search_for_resource(session=session, search=resource_label)
|
||||||
|
if ret is None:
|
||||||
|
print("Resource not found")
|
||||||
|
return
|
||||||
|
|
||||||
|
resource, exists_in_db = ret
|
||||||
|
if not exists_in_db:
|
||||||
|
print("Resource not yet fetched, run fetch first")
|
||||||
|
return
|
||||||
|
|
||||||
|
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
|
||||||
|
recipes = session.scalars(stmt).all()
|
||||||
|
if len(recipes) == 0:
|
||||||
|
print("No recipes found for resource")
|
||||||
|
return
|
||||||
|
elif len(recipes) > 1:
|
||||||
|
options = {(idx + 1): recipes[idx].describe() for idx in range(len(recipes))}
|
||||||
|
user_choice = prompt(options=options, text="Select recipe", default=1)
|
||||||
|
if user_choice is None:
|
||||||
|
return
|
||||||
|
recipe = recipes[user_choice - 1]
|
||||||
|
else:
|
||||||
|
recipe = recipes[0]
|
||||||
|
|
||||||
|
recipe_machine = graph.create_node("factorygame.Machine", push_undo=True)
|
||||||
|
recipe_machine.assign_recipe(recipe)
|
||||||
|
recipe_machine.update()
|
||||||
|
recipe_machine.set_x_pos(input_port.node().x_pos() - recipe_machine.view.width - 200)
|
||||||
|
recipe_machine.get_output(input_port.name()).connect_to(input_port)
|
||||||
|
if recipe_machine.x_pos() - (global_input.x_pos() - global_input.view.width) < 200:
|
||||||
|
global_input.set_x_pos(recipe_machine.x_pos() - global_input.view.width - 200)
|
||||||
|
|
||||||
|
|
||||||
|
@click.command
|
||||||
|
@click.option("--debug", is_flag=True)
|
||||||
|
@click.argument("search")
|
||||||
|
def main(debug: bool, search: str):
|
||||||
|
global engine
|
||||||
|
globals()["debug"] = debug
|
||||||
|
engine = create_engine("sqlite:///file.db", echo=debug)
|
||||||
|
with Session(engine) as session, SatisfactoryPlus(debug=debug) as data_provider:
|
||||||
|
do_provider_search = False
|
||||||
|
matching_resources = session.scalars(Resource.by_label(search)).all()
|
||||||
|
if len(matching_resources) == 0:
|
||||||
|
print("Could not find existing resources matching the search string.. starting wiki search")
|
||||||
|
else:
|
||||||
|
options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))}
|
||||||
|
options[0] = ""
|
||||||
|
selected_res = prompt(
|
||||||
|
options=options, text="Chose a resource to continue or 0 to continue with a wiki search", default=1
|
||||||
|
)
|
||||||
|
if selected_res is None:
|
||||||
|
return
|
||||||
|
if selected_res != 0:
|
||||||
|
resource = matching_resources[selected_res - 1]
|
||||||
|
do_provider_search = False
|
||||||
|
|
||||||
|
if do_provider_search:
|
||||||
|
ret = data_provider.search_for_resource(session=session, search=search)
|
||||||
|
if ret is None:
|
||||||
|
print("Resource not found")
|
||||||
|
return
|
||||||
|
|
||||||
|
resource, exists_in_db = ret
|
||||||
|
if not exists_in_db:
|
||||||
|
print("Resource not yet fetched, run fetch first")
|
||||||
|
return
|
||||||
|
|
||||||
|
stmt = select(Recipe).join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
|
||||||
|
recipes = session.scalars(stmt).all()
|
||||||
|
if len(recipes) == 0:
|
||||||
|
print("No recipes found for resource")
|
||||||
|
return
|
||||||
|
elif len(recipes) > 1:
|
||||||
|
options = {(idx + 1): recipes[idx].describe() for idx in range(len(recipes))}
|
||||||
|
user_choice = prompt(options=options, text="Select recipe", default=1)
|
||||||
|
if user_choice is None:
|
||||||
|
return
|
||||||
|
recipe = recipes[user_choice - 1]
|
||||||
|
else:
|
||||||
|
recipe = recipes[0]
|
||||||
|
|
||||||
|
app = QtWidgets.QApplication([])
|
||||||
|
global graph
|
||||||
|
graph = NodeGraph()
|
||||||
|
graph.widget.resize(1280, 720)
|
||||||
|
graph.register_node(Machine)
|
||||||
|
graph.register_node(GlobalInput)
|
||||||
|
graph_widget = graph.widget
|
||||||
|
graph_widget.show()
|
||||||
|
global global_input
|
||||||
|
global_input = graph.create_node("factorygame.GlobalInput", push_undo=False)
|
||||||
|
recipe_machine = graph.create_node("factorygame.Machine", push_undo=False)
|
||||||
|
recipe_machine.assign_recipe(recipe)
|
||||||
|
graph.auto_layout_nodes([global_input, recipe_machine])
|
||||||
|
global_input.set_y_pos(recipe_machine.y_pos())
|
||||||
|
global_input.set_x_pos(global_input.x_pos() - global_input.model.width - 200)
|
||||||
|
graph.center_on([global_input, recipe_machine])
|
||||||
|
graph.port_connected.connect(on_port_connected)
|
||||||
|
app.exec_()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
13
factorygame/helper.py
Normal file
13
factorygame/helper.py
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
import click
|
||||||
|
|
||||||
|
|
||||||
|
def prompt(options: dict[int, str], **kwargs) -> int | None:
|
||||||
|
for idx, label in options.items():
|
||||||
|
if label:
|
||||||
|
click.echo(f"{idx}: {label}")
|
||||||
|
ret = click.prompt(**kwargs)
|
||||||
|
ret = int(ret)
|
||||||
|
if ret not in options:
|
||||||
|
click.echo("Invalid choice.")
|
||||||
|
return None
|
||||||
|
return ret
|
Loading…
Reference in a new issue