-
Notifications
You must be signed in to change notification settings - Fork 733
/
Copy path multi_agent.py
177 lines (141 loc) · 6.01 KB
/
multi_agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import logging
from dataclasses import dataclass
from typing import Optional
from dotenv import load_dotenv
from livekit import api
from livekit.agents import (
Agent,
AgentSession,
ChatContext,
JobContext,
JobProcess,
RoomInputOptions,
RoomOutputOptions,
RunContext,
WorkerOptions,
cli,
metrics,
)
from livekit.agents.job import get_current_job_context
from livekit.agents.llm import function_tool
from livekit.agents.voice import MetricsCollectedEvent
from livekit.plugins import deepgram, openai, silero
# uncomment to enable Krisp BVC noise cancellation, currently supported on Linux and MacOS
# from livekit.plugins import noise_cancellation
## The storyteller agent is a multi-agent that can handoff the session to another agent.
## This example demonstrates more complex workflows with multiple agents.
## Each agent could have its own instructions, as well as different STT, LLM, TTS,
## or realtime models.
logger = logging.getLogger("multi-agent")

load_dotenv()

# Base persona shared by every agent in this example.
# NOTE: the adjacent string literals are concatenated into a single prompt
# string, so the trailing space after "voice." is required — without it the
# LLM would see "…via voice.You are curious…".
common_instructions = (
    "Your name is Echo. You are a story teller that interacts with the user via voice. "
    "You are curious and friendly, with a sense of humor."
)
@dataclass
class StoryData:
    # Shared data that's used by the storyteller agent.
    # This structure is passed as a parameter to function calls
    # (as `RunContext[StoryData].userdata`).
    name: Optional[str] = None  # the user's name, set by IntroAgent.information_gathered
    location: Optional[str] = None  # where the user is from, set alongside `name`
class IntroAgent(Agent):
    """First agent in the flow: gathers the user's name and location, then
    hands the session off to a StoryAgent via `information_gathered`."""

    def __init__(self) -> None:
        super().__init__(
            # NOTE: adjacent string literals are concatenated into one prompt
            # string; the trailing spaces keep sentences from running together
            # (previously the LLM saw "…engaging.You should ask…").
            instructions=f"{common_instructions} Your goal is to gather a few pieces of "
            "information from the user to make the story personalized and engaging. "
            "You should ask the user for their name and where they are from. "
            "Start the conversation with a short introduction.",
        )

    async def on_enter(self):
        # when the agent is added to the session, it'll generate a reply
        # according to its instructions
        self.session.generate_reply()

    @function_tool
    async def information_gathered(
        self,
        context: RunContext[StoryData],
        name: str,
        location: str,
    ):
        """Called when the user has provided the information needed to make the story
        personalized and engaging.

        Args:
            name: The name of the user
            location: The location of the user
        """
        # persist what we learned into the session-shared userdata
        context.userdata.name = name
        context.userdata.location = location

        story_agent = StoryAgent(name, location)
        # by default, StoryAgent will start with a new context; to carry through
        # the current chat history, pass in the chat_ctx:
        # story_agent = StoryAgent(name, location, chat_ctx=context.chat_ctx)

        logger.info(
            "switching to the story agent with the provided user data: %s", context.userdata
        )
        # returning (agent, message) hands the session to story_agent and
        # speaks the message during the transition
        return story_agent, "Let's start the story!"
class StoryAgent(Agent):
    """Agent that tells the personalized, interactive story.

    Overrides the session LLM with a realtime (audio) model and disables TTS,
    since the realtime model produces audio directly.
    """

    def __init__(self, name: str, location: str, *, chat_ctx: Optional[ChatContext] = None) -> None:
        super().__init__(
            # NOTE: adjacent literals are concatenated; trailing spaces keep the
            # sentences separated. `common_instructions` already ends with a
            # period, so no extra "." is inserted after it (previously this
            # produced "…sense of humor.. You should…").
            instructions=f"{common_instructions} You should use the user's information in "
            "order to make the story personalized. "
            "create the entire story, weaving in elements of their information, and make it "
            "interactive, occasionally interacting with the user. "
            "do not end on a statement, where the user is not expected to respond. "
            "when interrupted, ask if the user would like to continue or end. "
            f"The user's name is {name}, from {location}.",
            # each agent could override any of the model services, including mixing
            # realtime and non-realtime models
            llm=openai.realtime.RealtimeModel(voice="echo"),
            tts=None,
            chat_ctx=chat_ctx,
        )

    async def on_enter(self):
        # when the agent is added to the session, we'll initiate the conversation by
        # using the LLM to generate a reply
        self.session.generate_reply()

    @function_tool
    async def story_finished(self, context: RunContext[StoryData]):
        """When you are finished telling the story (and the user confirms they don't
        want any more), call this function to end the conversation."""
        # interrupt any existing generation
        self.session.interrupt()

        # generate a goodbye message and hang up
        # awaiting it will ensure the message is played out before returning
        await self.session.generate_reply(
            instructions=f"say goodbye to {context.userdata.name}", allow_interruptions=False
        )

        # deleting the room disconnects everyone and ends the job
        job_ctx = get_current_job_context()
        await job_ctx.api.room.delete_room(api.DeleteRoomRequest(room=job_ctx.room.name))
def prewarm(proc: JobProcess):
    """Load the Silero VAD model once per worker process.

    The loaded model is cached in ``proc.userdata`` so each session started by
    ``entrypoint`` can reuse it instead of loading its own copy.
    """
    vad_model = silero.VAD.load()
    proc.userdata["vad"] = vad_model
async def entrypoint(ctx: JobContext):
    """Per-job entrypoint: builds the session, wires up metrics logging, and
    starts the conversation with an IntroAgent."""
    await ctx.connect()

    # each session carries its own StoryData instance as shared userdata
    session = AgentSession[StoryData](
        vad=ctx.proc.userdata["vad"],
        # any combination of STT, LLM, TTS, or realtime API can be used
        llm=openai.LLM(model="gpt-4o-mini"),
        stt=deepgram.STT(model="nova-3"),
        tts=openai.TTS(voice="echo"),
        userdata=StoryData(),
    )

    # log metrics as they are emitted, and total usage after session is over
    collector = metrics.UsageCollector()

    @session.on("metrics_collected")
    def _handle_metrics(ev: MetricsCollectedEvent):
        metrics.log_metrics(ev.metrics)
        collector.collect(ev.metrics)

    async def _report_usage():
        summary = collector.get_summary()
        logger.info(f"Usage: {summary}")

    ctx.add_shutdown_callback(_report_usage)

    await session.start(
        agent=IntroAgent(),
        room=ctx.room,
        room_input_options=RoomInputOptions(
            # uncomment to enable Krisp BVC noise cancellation
            # noise_cancellation=noise_cancellation.BVC(),
        ),
        room_output_options=RoomOutputOptions(transcription_enabled=True),
    )
if __name__ == "__main__":
    # start the worker: `prewarm` loads the VAD once per process,
    # `entrypoint` handles each assigned job
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))