File size: 5,815 Bytes
0646b18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import asyncio
import unittest
import uuid

from system_tests.e2e.base_crm_test import BaseCRMTestServerStream
from system_tests.e2e.digital_sales_test_helpers import DigitalSalesTestHelpers


class TestCRMFollowup(BaseCRMTestServerStream):
    """
    End-to-end tests for multi-turn CRM queries driven from contacts.txt.

    Tests the flow of querying contacts.txt, then following up with detail
    queries on the same conversation thread.

    NOTE(review): the lite-mode overrides (``test_env_vars`` below) are
    currently commented out, so these tests run with whatever settings the
    base class / environment provides rather than with lite mode forced on.
    """

    # Lite-mode overrides, currently disabled. Presumably consumed by the base
    # class when uncommented — confirm against BaseCRMTestServerStream.
    # test_env_vars = {
    #     "DYNACONF_ADVANCED_FEATURES__LITE_MODE": "true",
    #     "DYNACONF_ADVANCED_FEATURES__LITE_MODE_TOOL_THRESHOLD": "15",
    # }

    # Seconds to sleep at the end of a test so traces can be saved.
    _TRACE_FLUSH_SECONDS = 10

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.helpers = DigitalSalesTestHelpers()
        # Assigned fresh for every test in asyncSetUp.
        self.thread_id = None

    async def asyncSetUp(self):
        """Set up the test environment and generate a fresh thread ID."""
        await super().asyncSetUp()
        # A unique thread ID per test; all turns within one test reuse it so
        # follow-up queries share the same conversation thread.
        self.thread_id = str(uuid.uuid4())
        print(f"\n=== Test thread ID: {self.thread_id} ===")

    @staticmethod
    def _first_answer_event(events):
        """Return the first event whose ``"event"`` field is ``"Answer"``, or ``None``."""
        return next((e for e in events if e.get("event") == "Answer"), None)

    async def _wait_for_traces(self, seconds=_TRACE_FLUSH_SECONDS):
        """Sleep for *seconds* to allow traces to be saved, logging start and end."""
        print(f"\n--- Sleeping for {seconds} seconds to allow traces to save ---")
        await asyncio.sleep(seconds)
        print("--- Sleep complete ---")

    async def test_crm_contacts_followup_fast(self):
        """Test CRM contacts query with follow-up questions in fast mode.

        Runs four turns on the same thread:
        1. List which users from contacts.txt exist in the CRM.
        2. Ask for Sarah's details.
        3. Ask for the employee count of her account's company.
        4. Ask for the initials of everyone in contacts.txt.
        """
        print(f"Running test with thread ID: {self.thread_id}")

        # Turn 1: initial query.
        query = "from contacts.txt show me which users belong to the crm system"
        all_events = await self.run_task(query, thread_id=self.thread_id)
        self._assert_answer_event(all_events)

        # Turn 2: follow-up on the same thread; depends on context from turn 1.
        followup_query = "show me details of sarah"
        all_followup_events = await self.run_task(followup_query, thread_id=self.thread_id)
        # NOTE(review): "[email protected]" looks like an email address mangled by an
        # email-obfuscation filter; confirm the intended address is present here.
        self._assert_answer_event(
            all_followup_events, expected_keywords=["Sarah", "[email protected]"]
        )

        # Turn 3: follow-up that resolves "her" from turn 2.
        second_followup_query = "how many employee's work at her account's company?"
        all_second_followup_events = await self.run_task(second_followup_query, thread_id=self.thread_id)
        answer_event = self._first_answer_event(all_second_followup_events)
        self.assertIsNotNone(answer_event, "The 'Answer' event was not found in the stream.")
        answer_str = str(answer_event.get("data", "")).lower()
        # Accept the count with or without a thousands separator.
        has_employee_count = "4,260" in answer_str or "4260" in answer_str
        self.assertTrue(
            has_employee_count, f"Answer does not contain employee count '4,260' or '4260'. Got: {answer_str}"
        )

        # Turn 4: re-read contacts.txt and ask for initials (same thread ID).
        third_followup_query = "read contacts.txt and show me their initials"
        all_third_followup_events = await self.run_task(third_followup_query, thread_id=self.thread_id)
        self._assert_answer_event(all_third_followup_events)

        # Expected initials: SB (Sarah Bell), SJ (Sharon Jimenez), RR (Ruth Ross),
        # DR (Dorothy Richardson), JR (James Richardson), MT (Michael Torres), EL (Emma Larsson).
        # These are intentionally NOT asserted because the answer format varies;
        # the answer is only logged for manual inspection.
        initials_answer_event = self._first_answer_event(all_third_followup_events)
        self.assertIsNotNone(initials_answer_event, "The 'Answer' event was not found in the stream.")
        initials_answer_str = str(initials_answer_event.get("data", ""))
        print(f"Initials answer received: {initials_answer_str}")

        await self._wait_for_traces()

    async def test_crm_contacts_revenue_percentile_email(self):
        """Test CRM contact filtering, revenue percentile calculation and email drafting.

        Runs ``n_tasks`` independent copies of the query concurrently, each on
        its own thread ID, and requires every copy to pass the answer checks.
        """
        n_tasks = 1

        async def run_one(turn):
            """Run the query once on a fresh thread; return True if the answer checks pass."""
            thread_id = str(uuid.uuid4())
            query = (
                "From the list of emails in the file contacts.txt, please filter those who exist in the CRM application. "
                "For the filtered contacts, retrieve their name and their associated account name, and calculate their account's revenue percentile across all accounts. "
                "Finally, draft a an email based on email_template.md template summarizing the result"
            )
            try:
                all_events = await self.run_task(query, thread_id=thread_id)
                self._assert_answer_event(
                    all_events,
                    expected_keywords=[
                        "NextGen",
                        "Sigma",
                        "Gamma Delta",
                        "Upsilon",
                        "85",
                        "79",
                        "77",
                        "59",
                        "Account Performance Update",
                    ],
                )
            except Exception as e:
                # Deliberately swallowed so every concurrent run is counted; the
                # aggregate assertion below reports the overall failure.
                print(f"Task {turn} failed: {type(e).__name__}: {e}")
                return False
            return True

        results = await asyncio.gather(*(run_one(i) for i in range(n_tasks)))
        success_count = sum(results)
        print(f"\nSuccess rate: {success_count}/{n_tasks}")

        # Sleep before asserting so traces are saved even when some runs failed.
        await self._wait_for_traces()

        # Assert that all tasks succeeded.
        self.assertEqual(
            success_count,
            n_tasks,
            f"{n_tasks - success_count} out of {n_tasks} tasks failed. Check the output above for error details.",
        )


if "__main__" == __name__:
    # Executed directly (not imported): hand control to unittest's CLI runner.
    unittest.main()