SatisfactoryPlusCalculator/factorygame/data/common.py

63 lines
2.4 KiB
Python
Raw Normal View History

from concurrent.futures import Future
2024-01-30 17:07:28 +00:00
from datetime import timedelta, datetime
from typing import Callable, Optional
from sqlalchemy import select
from sqlalchemy.orm import Session as AlchemySession
2024-01-30 17:07:28 +00:00
from .models import Resource, Recipe, ResourceFlow
def resource_needs_update(resource: Resource | None, recipe_info_timeout: Optional[timedelta] = None) -> bool:
if recipe_info_timeout is None:
recipe_info_timeout = timedelta(days=30)
return (
resource is None
or resource.recipes_populated_at is None
or datetime.utcnow() - resource.recipes_populated_at > recipe_info_timeout
)
def chose_resource(session: AlchemySession, resource_label: str, prompt: Callable) -> Future[Resource | None]:
2024-01-30 17:07:28 +00:00
matching_resources = session.scalars(Resource.by_label(resource_label)).all()
ret = Future()
2024-01-30 17:07:28 +00:00
if len(matching_resources) == 0:
print("Could not find any resource matching the search string…")
ret.set_result(None)
2024-01-30 17:07:28 +00:00
else:
options = {(idx + 1): str(matching_resources[idx].label) for idx in range(len(matching_resources))}
options[0] = "Continue with wiki search…"
2024-01-30 17:07:28 +00:00
def resource_selected_cb(res_future: Future[int | None]):
selected_res = res_future.result()
if selected_res is not None and selected_res != 0:
ret.set_result(matching_resources[selected_res - 1])
else:
ret.set_result(None)
2024-01-30 17:07:28 +00:00
prompt(options=options, text="Chose a resource", default=1).add_done_callback(resource_selected_cb)
return ret
def chose_recipe(session: AlchemySession, resource: Resource, prompt: Callable) -> Future[Recipe | None]:
2024-02-02 16:44:14 +00:00
stmt = select(Recipe).distinct().join(Recipe.results).filter(ResourceFlow.resource_id == resource.id)
2024-01-30 17:07:28 +00:00
recipes = session.scalars(stmt).all()
ret = Future()
2024-01-30 17:07:28 +00:00
if len(recipes) == 0:
print("No recipes found for resource")
ret.set_result(None)
2024-01-30 17:07:28 +00:00
elif len(recipes) > 1:
options = {(idx + 1): recipes[idx].describe() for idx in range(len(recipes))}
def user_choice_cb(choice_future: Future[int | None]):
user_choice = choice_future.result()
if user_choice is None:
ret.set_result(None)
else:
ret.set_result(recipes[user_choice - 1])
prompt(options=options, text="Select recipe", default=1).add_done_callback(user_choice_cb)
2024-01-30 17:07:28 +00:00
else:
ret.set_result(recipes[0])
return ret