| 12345678910111213141516171819202122232425262728 |
- import asyncio
- import unittest
- from app.pipeline.workflow import Workflow
class WorkflowIntegrationTest(unittest.TestCase):
    """Integration test that drives ``Workflow.execute_workflow`` end to end."""

    def test_workflow_executes_end_to_end(self):
        # Kept as a comment (not a docstring) so unittest's verbose test
        # description stays the same.
        # Runs the workflow once for dashboard DB001 and checks that at least
        # one result comes back and that the first one refers to the
        # requested dashboard and to one of the requested cards.
        workflow = Workflow()
        payload = {
            "dashboard_id": "DB001",
            "card_ids": ["C0001", "C0002"],
            "bbk": "BBK002",
            "user_request": "",
        }

        # The value returned by execute_workflow is driven to completion on a
        # fresh event loop, since this test method itself is synchronous.
        outcomes = asyncio.run(workflow.execute_workflow(payload))

        self.assertGreaterEqual(len(outcomes), 1)

        head = outcomes[0]
        self.assertEqual(head.dashboard_id, "DB001")
        # Separate literal on purpose: the expectation must not change if the
        # workflow mutates the list it was handed in the payload.
        self.assertIn(head.card_id, ["C0001", "C0002"])
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()
|