test_workflow.py 732 B

12345678910111213141516171819202122232425262728
  1. import asyncio
  2. import unittest
  3. from app.pipeline.workflow import Workflow
  4. class WorkflowIntegrationTest(unittest.TestCase):
  5. def test_workflow_executes_end_to_end(self):
  6. wf = Workflow()
  7. result = asyncio.run(
  8. wf.execute_workflow(
  9. {
  10. "dashboard_id": "DB001",
  11. "card_ids": ["C0001", "C0002"],
  12. "bbk": "BBK002",
  13. "user_request": "",
  14. }
  15. )
  16. )
  17. self.assertGreaterEqual(len(result), 1)
  18. first = result[0]
  19. self.assertEqual(first.dashboard_id, "DB001")
  20. self.assertIn(first.card_id, ["C0001", "C0002"])
  21. if __name__ == "__main__":
  22. unittest.main()