Spaces:
Running
Running
| import asyncio | |
| import unittest | |
| import uuid | |
| from system_tests.e2e.base_crm_test import BaseCRMTestServerStream | |
| from system_tests.e2e.digital_sales_test_helpers import DigitalSalesTestHelpers | |
class TestCRMFollowup(BaseCRMTestServerStream):
    """
    End-to-end tests for CRM queries that span multiple conversation turns.

    ``test_crm_contacts_followup_fast`` queries contacts.txt and then asks three
    follow-up questions on the same thread ID, so later turns rely on context
    from earlier answers. ``test_crm_contacts_revenue_percentile_email`` runs a
    multi-step task (filter contacts, compute revenue percentiles, draft an
    email), each run on a fresh thread ID.

    NOTE(review): this class was described as running "with lite mode enabled",
    but the lite-mode overrides below are commented out, so nothing in this
    class enables lite mode — confirm whether they should be restored.
    """

    # Lite-mode configuration overrides, currently disabled. Presumably read by
    # the base class when defined — verify before re-enabling.
    # test_env_vars = {
    #     "DYNACONF_ADVANCED_FEATURES__LITE_MODE": "true",
    #     "DYNACONF_ADVANCED_FEATURES__LITE_MODE_TOOL_THRESHOLD": "15",
    # }

    # Seconds to wait at the end of each test so traces can be saved.
    TRACE_FLUSH_DELAY_SECONDS = 10

    # Initials of the contacts in contacts.txt: Sarah Bell, Sharon Jimenez,
    # Ruth Ross, Dorothy Richardson, James Richardson, Michael Torres,
    # Emma Larsson. Used for reporting only (see the fourth turn below).
    EXPECTED_INITIALS = ("SB", "SJ", "RR", "DR", "JR", "MT", "EL")

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Not used by the tests in this class; kept as in the original.
        self.helpers = DigitalSalesTestHelpers()
        # Assigned per test in asyncSetUp.
        self.thread_id = None

    async def asyncSetUp(self):
        """Run the base setup, then generate a fresh thread ID for this test."""
        await super().asyncSetUp()
        # A new UUID per test, so each test starts its own thread.
        self.thread_id = str(uuid.uuid4())
        print(f"\n=== Test thread ID: {self.thread_id} ===")

    @staticmethod
    def _find_answer_event(events):
        """Return the first event whose ``"event"`` field is ``"Answer"``, else None."""
        return next((e for e in events if e.get("event") == "Answer"), None)

    def _answer_text(self, events):
        """Return the Answer event's ``data`` as a string; fail the test if there is none."""
        answer_event = self._find_answer_event(events)
        self.assertIsNotNone(answer_event, "The 'Answer' event was not found in the stream.")
        return str(answer_event.get("data", ""))

    async def _wait_for_traces(self):
        """Sleep ``TRACE_FLUSH_DELAY_SECONDS`` so traces have time to be saved."""
        delay = self.TRACE_FLUSH_DELAY_SECONDS
        # The message is derived from the actual delay so the two cannot drift apart.
        print(f"\n--- Sleeping for {delay} seconds to allow traces to save ---")
        await asyncio.sleep(delay)
        print("--- Sleep complete ---")

    async def test_crm_contacts_followup_fast(self):
        """Run a contacts query plus three follow-ups on one thread, then wait for traces.

        Every turn reuses ``self.thread_id`` so follow-ups ("sarah", "her",
        "their") can refer back to earlier answers. The trace wait runs in a
        ``finally`` so traces are saved even when an assertion fails.
        """
        print(f"Running test with thread ID: {self.thread_id}")
        try:
            # Turn 1: which contacts from contacts.txt exist in the CRM.
            query = "from contacts.txt show me which users belong to the crm system"
            all_events = await self.run_task(query, thread_id=self.thread_id)
            self._assert_answer_event(all_events)

            # Turn 2: follow-up on the same thread about one of those contacts.
            followup_query = "show me details of sarah"
            all_followup_events = await self.run_task(followup_query, thread_id=self.thread_id)
            # NOTE(review): "[email protected]" looks like an e-mail obfuscation
            # artifact that replaced Sarah's real address; restore the real
            # address, otherwise this keyword will almost certainly not match.
            self._assert_answer_event(
                all_followup_events, expected_keywords=["Sarah", "[email protected]"]
            )

            # Turn 3: "her" must resolve to Sarah from turn 2.
            second_followup_query = "how many employee's work at her account's company?"
            all_second_followup_events = await self.run_task(
                second_followup_query, thread_id=self.thread_id
            )
            answer_str = self._answer_text(all_second_followup_events).lower()
            # Accept the count with or without a thousands separator.
            has_employee_count = "4,260" in answer_str or "4260" in answer_str
            self.assertTrue(
                has_employee_count,
                f"Answer does not contain employee count '4,260' or '4260'. Got: {answer_str}",
            )

            # Turn 4: re-read contacts.txt on the same thread and ask for initials.
            third_followup_query = "read contacts.txt and show me their initials"
            all_third_followup_events = await self.run_task(
                third_followup_query, thread_id=self.thread_id
            )
            self._assert_answer_event(all_third_followup_events)
            initials_answer_str = self._answer_text(all_third_followup_events)
            # Initials are not asserted: the answer's format varies ("SB",
            # "S.B.", tables, ...). They are printed for manual inspection.
            print(f"Initials answer received: {initials_answer_str}")
            found_initials = [i for i in self.EXPECTED_INITIALS if i in initials_answer_str]
            print(f"Expected initials found verbatim: {found_initials}")
        finally:
            await self._wait_for_traces()

    async def test_crm_contacts_revenue_percentile_email(self):
        """Filter CRM contacts, compute revenue percentiles, and draft an email.

        The task is launched ``n_tasks`` times concurrently, each on a fresh
        thread ID (``n_tasks`` is 1; raise it for a parallel check). Each task's
        failure is recorded instead of raised so the others can finish; all
        failures are reported together after the trace wait.
        """
        n_tasks = 1
        query = (
            "From the list of emails in the file contacts.txt, please filter those who exist in the CRM application. "
            "For the filtered contacts, retrieve their name and their associated account name, and calculate their account's revenue percentile across all accounts. "
            "Finally, draft an email based on email_template.md template summarizing the result"
        )
        expected_keywords = [
            "NextGen",
            "Sigma",
            "Gamma Delta",
            "Upsilon",
            "85",
            "79",
            "77",
            "59",
            "Account Performance Update",
        ]

        async def run_one(turn):
            """Run one task on a new thread; return None on success, else an error summary."""
            thread_id = str(uuid.uuid4())
            try:
                all_events = await self.run_task(query, thread_id=thread_id)
                self._assert_answer_event(all_events, expected_keywords=expected_keywords)
            except Exception as e:
                # Broad on purpose: any failure (including assertion failures)
                # is recorded for this task rather than aborting the gather.
                print(f"Task {turn} failed: {e}")
                return f"Task {turn}: {type(e).__name__}: {e}"
            return None

        results = await asyncio.gather(*(run_one(i) for i in range(n_tasks)))
        failures = [r for r in results if r is not None]
        success_count = n_tasks - len(failures)
        print(f"\nSuccess rate: {success_count}/{n_tasks}")

        # Wait for traces BEFORE asserting so they are saved even if a task failed.
        await self._wait_for_traces()

        # Assert that all tasks succeeded, listing each failure in the message.
        message = (
            f"{n_tasks - success_count} out of {n_tasks} tasks failed. "
            "Check the output above for error details."
        )
        if failures:
            message += "\n" + "\n".join(failures)
        self.assertEqual(success_count, n_tasks, message)
if __name__ == "__main__":
    # Allow running this test module directly (e.g. `python <this file>`);
    # "__main__" is unittest.main's default module, spelled out for clarity.
    unittest.main(module="__main__")