from collections import defaultdict, deque from pathlib import Path from threading import Lock from typing import List, Optional from typing_extensions import Self import yaml from app.const import CONFIG_DIR class PipelineManager: _instance = None _lock = Lock() def __new__(cls, pipeline_setup_path: Optional[Path] = None) -> Self: with cls._lock: if cls._instance is None: setup_path = Path(pipeline_setup_path or (CONFIG_DIR / "pipeline_settings.yaml")) cls._instance = super(PipelineManager, cls).__new__(cls) cls._instance.build(setup_path) return cls._instance def build(self, pipeline_setup_path: Path): with open(pipeline_setup_path, "r", encoding='utf8') as f: self.pipeline_setup = yaml.safe_load(f) self._nodes = [] self._node_map = {} for key in self.pipeline_setup: setattr(self, key, self.pipeline_setup[key]) self._nodes.append(key) self._node_map[key] = self.pipeline_setup[key] self._topological_sort() def _topological_sort(self): graph = defaultdict(list) in_degree = {node: 0 for node in self._nodes} for key, setup in self._node_map.items(): for dep in setup["dependencies"]: graph[dep].append(key) in_degree[key] += 1 queue = deque([node for node in self._nodes if in_degree[node] == 0]) sorted_order = [] while queue: current = queue.popleft() sorted_order.append(current) for neighbor in graph[current]: in_degree[neighbor] -= 1 if in_degree[neighbor] == 0: queue.append(neighbor) if len(sorted_order) != len(self._nodes): raise ValueError("Cycle detected in pipeline dependencies") self._nodes = sorted_order def get_node_config(self, node_name: str) -> dict: if hasattr(self, node_name): return getattr(self, node_name) raise ValueError(f"Node {node_name} not found in pipeline configuration") @property def nodes(self): return self._nodes @property def node_map(self): return self._node_map