# cuga-agent/src/system_tests/e2e/crm_followup_test.py
# Sami Marreed
# feat: docker-v1 with optimized frontend
# 0646b18
import asyncio
import unittest
import uuid
from system_tests.e2e.base_crm_test import BaseCRMTestServerStream
from system_tests.e2e.digital_sales_test_helpers import DigitalSalesTestHelpers
class TestCRMFollowup(BaseCRMTestServerStream):
    """
    End-to-end tests for CRM queries with follow-up questions.

    Covers a multi-turn conversation that starts from contacts.txt and keeps
    asking follow-up questions on the same thread, plus a single-shot task
    that filters contacts, computes revenue percentiles and drafts an email.

    NOTE(review): this class was originally described as running "with lite
    mode enabled", but the env override below is commented out -- confirm
    whether lite mode is configured elsewhere (e.g. in the base class).
    """

    # test_env_vars = {
    # "DYNACONF_ADVANCED_FEATURES__LITE_MODE": "true",
    # "DYNACONF_ADVANCED_FEATURES__LITE_MODE_TOOL_THRESHOLD": "15",
    # }

    # Pause (in seconds) at the end of each test so traces can be saved.
    # Single source of truth for both the sleep and the message announcing it.
    _TRACE_SAVE_DELAY_SECONDS = 10

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.helpers = DigitalSalesTestHelpers()
        self.thread_id = None

    async def asyncSetUp(self):
        """Set up test environment and generate thread ID."""
        await super().asyncSetUp()
        # Generate a unique thread ID for this test
        self.thread_id = str(uuid.uuid4())
        print(f"\n=== Test thread ID: {self.thread_id} ===")

    @staticmethod
    def _find_answer_event(events):
        """Return the first event whose "event" field is "Answer", or None."""
        return next((e for e in events if e.get("event") == "Answer"), None)

    async def _wait_for_traces(self):
        """Sleep for the configured delay so traces can be saved."""
        delay = self._TRACE_SAVE_DELAY_SECONDS
        print(f"\n--- Sleeping for {delay} seconds to allow traces to save ---")
        await asyncio.sleep(delay)
        print("--- Sleep complete ---")

    async def test_crm_contacts_followup_fast(self):
        """Test CRM contacts query with follow-up questions in fast mode."""
        print(f"Running test with thread ID: {self.thread_id}")

        # First query
        query = "from contacts.txt show me which users belong to the crm system"
        all_events = await self.run_task(query, thread_id=self.thread_id)
        self._assert_answer_event(all_events)

        # First followup - using same thread ID
        followup_query = "show me details of sarah"
        all_followup_events = await self.run_task(followup_query, thread_id=self.thread_id)
        # NOTE(review): "[email protected]" looks like an obfuscated placeholder
        # rather than a real address -- confirm and restore the intended
        # e-mail keyword.
        self._assert_answer_event(
            all_followup_events, expected_keywords=["Sarah", "[email protected]"]
        )

        # Second followup - using same thread ID
        second_followup_query = "how many employee's work at her account's company?"
        all_second_followup_events = await self.run_task(second_followup_query, thread_id=self.thread_id)
        answer_event = self._find_answer_event(all_second_followup_events)
        self.assertIsNotNone(answer_event, "The 'Answer' event was not found in the stream.")
        answer_str = str(answer_event.get("data", "")).lower()
        # Accept the count with or without a thousands separator.
        has_employee_count = "4,260" in answer_str or "4260" in answer_str
        self.assertTrue(
            has_employee_count, f"Answer does not contain employee count '4,260' or '4260'. Got: {answer_str}"
        )

        # Third followup - read contacts.txt and show initials (using same thread ID)
        third_followup_query = "read contacts.txt and show me their initials"
        all_third_followup_events = await self.run_task(third_followup_query, thread_id=self.thread_id)
        # Assert that we got an answer
        self._assert_answer_event(all_third_followup_events)

        # The initials themselves are NOT asserted: the answer format may vary,
        # so the answer is only logged for manual inspection.
        # Expected initials: SB (Sarah Bell), SJ (Sharon Jimenez), RR (Ruth Ross),
        # DR (Dorothy Richardson), JR (James Richardson), MT (Michael Torres), EL (Emma Larsson)
        initials_answer_event = self._find_answer_event(all_third_followup_events)
        self.assertIsNotNone(initials_answer_event, "The 'Answer' event was not found in the stream.")
        initials_answer_str = str(initials_answer_event.get("data", ""))
        print(f"Initials answer received: {initials_answer_str}")

        # Sleep to allow traces to be saved
        await self._wait_for_traces()

    async def test_crm_contacts_revenue_percentile_email(self):
        """Test filtering contacts from CRM, calculating revenue percentile, and drafting email in parallel."""
        n_tasks = 1
        query = (
            "From the list of emails in the file contacts.txt, please filter those who exist in the CRM application. "
            "For the filtered contacts, retrieve their name and their associated account name, and calculate their account's revenue percentile across all accounts. "
            "Finally, draft a an email based on email_template.md template summarizing the result"
        )
        expected_keywords = [
            "NextGen",
            "Sigma",
            "Gamma Delta",
            "Upsilon",
            "85",
            "79",
            "77",
            "59",
            "Account Performance Update",
        ]

        async def run_one(turn):
            """Run the task on a fresh thread; return None on success or a failure description."""
            thread_id = str(uuid.uuid4())
            try:
                all_events = await self.run_task(query, thread_id=thread_id)
                # Assert that we got an answer
                self._assert_answer_event(all_events, expected_keywords=expected_keywords)
            except Exception as e:
                # Deliberately broad: one failing task must not abort the
                # others; the failure is reported in the final assertion.
                print(f"Task {turn} failed: {e}")
                return f"task {turn}: {e!r}"
            return None

        results = await asyncio.gather(*(run_one(i) for i in range(n_tasks)))
        failures = [outcome for outcome in results if outcome is not None]
        success_count = n_tasks - len(failures)
        print(f"\nSuccess rate: {success_count}/{n_tasks}")

        # Sleep to allow traces to be saved (before asserting, so the pause
        # also happens when the test is about to fail)
        await self._wait_for_traces()

        # Assert that all tasks succeeded, surfacing the captured errors
        failure_details = "".join(f"\n  - {detail}" for detail in failures)
        self.assertEqual(
            success_count,
            n_tasks,
            f"{n_tasks - success_count} out of {n_tasks} tasks failed. Check the output above for error details."
            f"{failure_details}",
        )
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()