pipeline_manager.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. from collections import defaultdict, deque
  2. from pathlib import Path
  3. from threading import Lock
  4. from typing import List, Optional
  5. from typing_extensions import Self
  6. import yaml
  7. from app.const import CONFIG_DIR
  8. class PipelineManager:
  9. _instance = None
  10. _lock = Lock()
  11. def __new__(cls, pipeline_setup_path: Optional[Path] = None) -> Self:
  12. with cls._lock:
  13. if cls._instance is None:
  14. setup_path = Path(pipeline_setup_path or (CONFIG_DIR / "pipeline_settings.yaml"))
  15. cls._instance = super(PipelineManager, cls).__new__(cls)
  16. cls._instance.build(setup_path)
  17. return cls._instance
  18. def build(self, pipeline_setup_path: Path):
  19. with open(pipeline_setup_path, "r", encoding='utf8') as f:
  20. self.pipeline_setup = yaml.safe_load(f)
  21. self._nodes = []
  22. self._node_map = {}
  23. for key in self.pipeline_setup:
  24. setattr(self, key, self.pipeline_setup[key])
  25. self._nodes.append(key)
  26. self._node_map[key] = self.pipeline_setup[key]
  27. self._topological_sort()
  28. def _topological_sort(self):
  29. graph = defaultdict(list)
  30. in_degree = {node: 0 for node in self._nodes}
  31. for key, setup in self._node_map.items():
  32. for dep in setup["dependencies"]:
  33. graph[dep].append(key)
  34. in_degree[key] += 1
  35. queue = deque([node for node in self._nodes if in_degree[node] == 0])
  36. sorted_order = []
  37. while queue:
  38. current = queue.popleft()
  39. sorted_order.append(current)
  40. for neighbor in graph[current]:
  41. in_degree[neighbor] -= 1
  42. if in_degree[neighbor] == 0:
  43. queue.append(neighbor)
  44. if len(sorted_order) != len(self._nodes):
  45. raise ValueError("Cycle detected in pipeline dependencies")
  46. self._nodes = sorted_order
  47. def get_node_config(self, node_name: str) -> dict:
  48. if hasattr(self, node_name):
  49. return getattr(self, node_name)
  50. raise ValueError(f"Node {node_name} not found in pipeline configuration")
  51. @property
  52. def nodes(self):
  53. return self._nodes
  54. @property
  55. def node_map(self):
  56. return self._node_map