| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- from collections import defaultdict, deque
- from pathlib import Path
- from threading import Lock
- from typing import List, Optional
- from typing_extensions import Self
- import yaml
- from app.const import CONFIG_DIR
class PipelineManager:
    """Singleton holding the pipeline configuration loaded from a YAML file.

    The first instantiation reads the settings file, registers every top-level
    key as a pipeline node and orders the nodes so that each one comes after
    the nodes listed under its ``dependencies`` key. Later instantiations
    return the same object; their ``pipeline_setup_path`` argument is ignored.
    """

    _instance = None
    # Serializes first-time construction of the shared instance.
    _lock = Lock()

    def __new__(cls, pipeline_setup_path: Optional[Path] = None) -> "Self":
        """Return the shared instance, building it on first use.

        Args:
            pipeline_setup_path: Location of the YAML settings file. Falls
                back to ``CONFIG_DIR / "pipeline_settings.yaml"`` when omitted.
                Only consulted when no instance exists yet.

        Returns:
            The single ``PipelineManager`` instance.

        Raises:
            OSError: If the settings file cannot be opened.
            TypeError: If the settings file is not a mapping of nodes.
            ValueError: If the dependencies are inconsistent (see
                ``_topological_sort``).
        """
        with cls._lock:
            if cls._instance is None:
                setup_path = Path(pipeline_setup_path or (CONFIG_DIR / "pipeline_settings.yaml"))
                instance = super().__new__(cls)
                instance.build(setup_path)
                # Publish the instance only after a successful build, so a
                # failed load does not leave a half-initialized singleton that
                # every later call would silently receive.
                cls._instance = instance
            return cls._instance

    def build(self, pipeline_setup_path: Path):
        """Load the YAML settings and register its top-level keys as nodes.

        Each top-level key becomes a node whose configuration is the value
        stored under that key. The configuration is also exposed as an
        instance attribute of the same name, unless that name is already
        defined on the class.

        Args:
            pipeline_setup_path: Path of the YAML file to read.

        Raises:
            OSError: If the file cannot be opened.
            TypeError: If the top level of the YAML document is not a mapping.
            ValueError: If the dependencies are inconsistent (see
                ``_topological_sort``).
        """
        with open(pipeline_setup_path, "r", encoding='utf8') as f:
            loaded = yaml.safe_load(f)

        # An empty YAML document loads as None; treat it as "no nodes".
        setup = {} if loaded is None else loaded
        if not isinstance(setup, dict):
            raise TypeError(
                f"Pipeline configuration must be a mapping of nodes, got {type(setup).__name__}"
            )

        for key, config in setup.items():
            # Mirror each node as an attribute for attribute-style access, but
            # never shadow the class's own methods or properties: a node called
            # "nodes" or "build" would otherwise break the manager itself.
            if not hasattr(type(self), key):
                setattr(self, key, config)

        # Internal state is assigned after the mirroring loop so that a node
        # named like one of these attributes cannot overwrite it.
        self.pipeline_setup = setup
        self._nodes = list(setup)
        self._node_map = dict(setup)

        self._topological_sort()

    def _topological_sort(self):
        """Reorder ``self._nodes`` so every node follows its dependencies.

        Uses Kahn's algorithm: nodes without unresolved dependencies are
        emitted first, in their original relative order. A node whose
        configuration is not a mapping, or has no ``dependencies`` entry, is
        treated as having no dependencies. A single string under
        ``dependencies`` is treated as one dependency name.

        Raises:
            ValueError: If a node depends on a name that is not a node, or if
                the dependencies contain a cycle.
        """
        graph = defaultdict(list)
        in_degree = {node: 0 for node in self._nodes}

        for key, setup in self._node_map.items():
            dependencies = setup.get("dependencies") if isinstance(setup, dict) else None
            if not dependencies:
                continue
            if isinstance(dependencies, str):
                # Iterating a bare string would yield single characters.
                dependencies = [dependencies]
            for dep in dependencies:
                if dep not in in_degree:
                    # Without this check the node's in-degree never reaches
                    # zero and the problem is misreported as a cycle.
                    raise ValueError(f"Node {key} depends on unknown node {dep}")
                graph[dep].append(key)
                in_degree[key] += 1

        queue = deque(node for node in self._nodes if in_degree[node] == 0)
        sorted_order = []
        while queue:
            current = queue.popleft()
            sorted_order.append(current)
            for neighbor in graph[current]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        # Nodes that were never released still wait on each other.
        if len(sorted_order) != len(self._nodes):
            raise ValueError("Cycle detected in pipeline dependencies")
        self._nodes = sorted_order

    def get_node_config(self, node_name: str) -> dict:
        """Return the configuration registered for ``node_name``.

        Only names loaded from the settings file are accepted; methods and
        internal attributes of the manager are not treated as nodes.

        Raises:
            ValueError: If ``node_name`` is not a node of the pipeline.
        """
        try:
            return self._node_map[node_name]
        except KeyError:
            raise ValueError(f"Node {node_name} not found in pipeline configuration") from None

    @property
    def nodes(self) -> List[str]:
        """Node names in dependency order (dependencies first)."""
        return self._nodes

    @property
    def node_map(self) -> dict:
        """Mapping of node name to its configuration."""
        return self._node_map
|