跳转至

Msg types

labridge.agent.chat_msg.msg_types

labridge.agent.chat_msg.msg_types.AgentResponse

Bases: BaseModel

The response of chat agent.

response (str): The response string. references (Optional[List[str]]): The paths of reference files.

Source code in labridge\agent\chat_msg\msg_types.py
164
165
166
167
168
169
170
171
172
class AgentResponse(BaseModel):
	r"""
	The response of the chat agent.

	Attributes:
		response (str): The response string.
		references (Optional[List[str]]): The paths of reference files.
	"""
	response: str
	# NOTE(review): no default value is declared here; whether this field may be
	# omitted at construction depends on the pydantic version in use -- confirm.
	references: Optional[List[str]]

labridge.agent.chat_msg.msg_types.BaseClientMessage

Bases: BaseModel

This is the base class for client's messages.

user_id (str): The user id of a Lab member. reply_in_speech (bool): If True, the agent will reply in speech. enable_instruct (bool): If True, enable the user to intervene into the agent's reasoning phase. enable_comment (bool): If True: enable the user to intervene into the agent's acting phase.

Source code in labridge\agent\chat_msg\msg_types.py
32
33
34
35
36
37
38
39
40
41
42
43
44
class BaseClientMessage(BaseModel):
	r"""
	Base class for all messages sent by a client.

	None of the four fields declares a default in this class, subclasses may add one.

	Attributes:
		user_id (str): The user id of a Lab member.
		reply_in_speech (bool): If True, the agent will reply in speech.
		enable_instruct (bool): If True, enable the user to intervene in the agent's reasoning phase.
		enable_comment (bool): If True, enable the user to intervene in the agent's acting phase.
	"""
	user_id: str
	reply_in_speech: bool
	enable_instruct: bool
	enable_comment: bool

labridge.agent.chat_msg.msg_types.ChatMsgBuffer

Bases: object

This class includes buffers that manage the messages from users and the agent's corresponding replies.

Before a chat, the user's messages are put into the user_msg_buffer. When the agent gets a user's messages, these messages are packed and used as the input to call Chat().

Additionally, During the execution of Chat(), the agent is able to get new messages from the buffer, such as when collecting information from the user in some tools.

The response of the agent will be put into the agent_reply_buffer, similarly, the user may receive an 'inner' response from the buffer.

Depending on the user's choice reply_in_speech, the agent's response will be sent back to the user directly or transformed to speech before that.

Source code in labridge\agent\chat_msg\msg_types.py
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
class ChatMsgBuffer(object):
	r"""
	Per-user buffers for the messages sent by users and the agent's corresponding replies.

	Before a chat, the user's messages are put into the `user_msg_buffer`.
	When the agent gets a user's messages, these messages are packed and used as input to call `Chat()`.

	Additionally, during the execution of `Chat()`, the agent is able to get new messages from the buffer, such as
	when collecting information from the user in some tools.

	The response of the agent is put into the `agent_reply_buffer`; similarly, the user may receive an 'inner'
	response from the buffer.

	Depending on the user's choice `reply_in_speech`, the agent's response is stored as text or
	transformed to speech before being stored.
	"""
	def __init__(self):
		# Walk four directory levels up from this file; the result is the base
		# directory under which the per-user temporary paths are built.
		root = Path(__file__)
		for _ in range(4):
			root = root.parent
		self._root = root
		self.account_manager = AccountManager()
		self.user_msg_buffer: Dict[str, List[BaseClientMessage]] = {}
		self.agent_reply_buffer: Dict[str, Optional[Union[ServerReply, ServerSpeechReply]]] = {}
		self.config_buffer: Dict[str, ChatConfig] = {}
		self.user_msg_formatter = UserMsgFormatter()
		self.reset_buffer()
		self._fs = fsspec.filesystem("file")

	def reset_buffer(self):
		r"""
		Reset the user_msg_buffer, agent_reply_buffer and config_buffer.

		Every user reported by the account manager gets an empty message list,
		no pending reply and a fresh `ChatConfig`.

		Returns:
			None
		"""
		users = self.account_manager.get_users()
		self.user_msg_buffer = {user: [] for user in users}
		self.agent_reply_buffer = {user: None for user in users}
		self.config_buffer = {user: ChatConfig() for user in users}

	def update_buffer_for_new_users(self):
		r"""
		When new users are registered, add empty entries for them to the buffers.

		Entries of users that are already present are left untouched.

		Returns:
			None
		"""
		users = self.account_manager.get_users()
		new_user_msg_buffer = {user: [] for user in users if user not in self.user_msg_buffer}
		new_agent_reply_buffer = {user: None for user in users if user not in self.agent_reply_buffer}
		new_config_buffer = {user: ChatConfig() for user in users if user not in self.config_buffer}
		self.user_msg_buffer.update(new_user_msg_buffer)
		self.agent_reply_buffer.update(new_agent_reply_buffer)
		self.config_buffer.update(new_config_buffer)

	def clear_user_msg(self, user_id: str):
		r"""
		Clear a user's messages in the buffer.

		The entry is rebound to a new empty list, so a list obtained earlier
		from the buffer keeps its content.

		Args:
			user_id (str): The user id of a Lab member.
		"""
		self.user_msg_buffer[user_id] = []

	def put_user_msg(self, user_msg: BaseClientMessage):
		r"""
		Put a new user message into the buffer and record its chat options.

		Args:
			user_msg (BaseClientMessage): A new message from a user.

		Raises:
			ValueError: If the message is not a FileWithTextMessage, ChatTextMessage or ChatSpeechMessage.
		"""
		if not isinstance(user_msg, (FileWithTextMessage, ChatTextMessage, ChatSpeechMessage)):
			raise ValueError(f"The Msg type {type(user_msg)} is not supported.")

		user_id = user_msg.user_id
		self.account_manager.check_valid_user(user_id=user_id)
		self.user_msg_buffer[user_id].append(user_msg)
		# The most recent message decides the options used for the next reply.
		self.config_buffer[user_id].update(
			enable_instruct=user_msg.enable_instruct,
			enable_comment=user_msg.enable_comment,
			reply_in_speech=user_msg.reply_in_speech,
		)

	async def get_user_msg(self, user_id: str, timeout: int = 240) -> Optional[PackedUserMessage]:
		r"""
		Wait until a user's messages are put into the buffer, and get them.

		The buffer is polled once per second. The user's buffer entry is cleared
		when the wait ends, whether messages arrived or the timeout expired.

		Args:
			user_id (str): The user id of a Lab member.
			timeout (int): Maximum time to wait, in seconds.

		Returns:
			Optional[PackedUserMessage]: The obtained packed user messages.
				If no user message is put in before the timeout, a placeholder message with an
				empty `user_msg` and a system message saying that the user does not reply is returned.
		"""
		# A monotonic clock keeps the timeout unaffected by wall-clock adjustments.
		deadline = time.monotonic() + timeout

		while True:
			msgs = self.user_msg_buffer[user_id]
			if len(msgs) > 0 or time.monotonic() > deadline:
				self.clear_user_msg(user_id=user_id)
				break
			await asyncio.sleep(1)
		if len(msgs) > 0:
			packed_msgs = self.user_msg_formatter.formatted_msgs(msgs=msgs)
			return packed_msgs

		no_reply_msg = PackedUserMessage(
			user_id=user_id,
			user_msg="",
			system_msg=f"The user {user_id} does not reply, end this conversation.",
		)
		return no_reply_msg

	def test_get_user_text(
		self,
		user_id: str,
		enable_instruct: bool = False,
		enable_comment: bool = False,
	) -> PackedUserMessage:
		r""" For debug: read one line from stdin and pack it as a text message. """
		user_msg = input("User: ")

		text_msg = ChatTextMessage(
			user_id=user_id,
			text=user_msg,
			reply_in_speech=False,
			enable_instruct=enable_instruct,
			enable_comment=enable_comment,
		)
		packed_msgs = self.user_msg_formatter.formatted_msgs(msgs=[text_msg])
		return packed_msgs

	def default_user_speech_path(self, user_id: str, speech_suffix: str) -> str:
		r"""
		Default save path of a user's speech. The parent directory is created if missing.

		Raises:
			ValueError: If `speech_suffix` is not in `SUPPORT_USER_SPEECH_SUFFIX`.
		"""
		if speech_suffix not in SUPPORT_USER_SPEECH_SUFFIX:
			raise ValueError(
				f"The audio file type {speech_suffix} is not supported,"
				f"use one of {SUPPORT_USER_SPEECH_SUFFIX} instead."
			)
		user_speech_path = self._root / f"{USER_TMP_DIR}/{user_id}/{USER_SPEECH_NAME}{speech_suffix}"
		dir_pth = str(user_speech_path.parent)
		if not self._fs.exists(dir_pth):
			self._fs.mkdirs(dir_pth)
		return str(user_speech_path)

	def default_agent_speech_path(self, user_id: str) -> str:
		r""" Default save path of agent's speech. The parent directory is created if missing. """
		agent_speech_path = self._root / f"{USER_TMP_DIR}/{user_id}/{AGENT_SPEECH_NAME}"
		dir_pth = str(agent_speech_path.parent)
		if not self._fs.exists(dir_pth):
			self._fs.mkdirs(dir_pth)
		return str(agent_speech_path)

	def default_tmp_file_path(self, user_id: str, file_name: str) -> str:
		r"""
		Default save path of the user's uploaded file.

		The path sits in a per-user directory named after the current date.
		Unlike the speech path helpers, this method does not create the directory.
		"""
		date, _ = get_time()
		tmp_dir = self._root / f"{USER_TMP_DIR}/{user_id}/{date}"
		tmp_path = str(tmp_dir / file_name)
		return tmp_path

	def put_agent_reply(
		self,
		user_id: str,
		reply_str: str,
		references: Optional[List[str]] = None,
		inner_chat: bool = False,
		extra_info: Optional[str] = None,
	):
		r"""
		Put an agent's reply into the buffer.

		A previously stored reply for the same user is overwritten.

		Args:
			user_id (str): The user id of a Lab member.
			reply_str (str): The agent's reply string.
			references (Optional[List[str]]): The dumped JSON strings of reference info dicts. Defaults to None.
				The `file_path` is corresponding to the key `REF_INFO_FILE_PATH_KEY`.
				The `file_size` is corresponding to the key `REF_INFO_FILE_SIZE_KEY`.
				Entries without a path, or whose file does not exist, are dropped.
			inner_chat (bool): Whether the reply happens inside a chat.
			extra_info (Optional[str]): extra information generally with long texts.
		"""
		self.account_manager.check_valid_user(user_id=user_id)

		if references is not None:
			ref_infos = []
			for ref_info_str in references:
				ref_info_dict = json.loads(ref_info_str)
				ref_path = ref_info_dict.get(REF_INFO_FILE_PATH_KEY, None)
				if ref_path is None or not self._fs.exists(ref_path):
					continue

				ref_size = os.path.getsize(ref_path)
				ref_info_dict[REF_INFO_FILE_SIZE_KEY] = ref_size
				ref_infos.append(ref_info_dict)
			# An empty result is reported as None rather than as an empty list.
			if ref_infos:
				references = ref_infos
			else:
				references = None

		if not self.config_buffer[user_id].reply_in_speech:
			reply = ServerReply(
				reply_text=reply_str,
				references=references,
				valid=True,
				inner_chat=inner_chat,
				extra_info=extra_info,
			)
			self.agent_reply_buffer[user_id] = reply
			return

		# Speech reply: synthesize the text and report the audio file with its size.
		speech_name = self.default_agent_speech_path(user_id=user_id)
		speech_path = TTSWorker.transform(text=reply_str, speech_name=speech_name)

		speech_size = os.path.getsize(speech_path)
		reply = ServerSpeechReply(
			reply_speech={
				speech_path: speech_size,
			},
			inner_chat=inner_chat,
			references=references,
			valid=True,
			extra_info=extra_info,
		)
		self.agent_reply_buffer[user_id] = reply

	def get_agent_reply(self, user_id: str) -> Union[ServerReply, ServerSpeechReply]:
		r"""
		Get the agent reply to a user from the buffer.

		A stored reply is handed out once: the buffer entry is reset to None when it is returned.

		Args:
			user_id (str): The user id of a Lab member.

		Returns:
			Union[ServerReply, ServerSpeechReply]: If an agent's reply exists, return a valid reply,
				otherwise, return an invalid "Please wait." reply.
		"""
		agent_reply = self.agent_reply_buffer[user_id]
		if agent_reply is None:
			return ServerReply(
				reply_text="Please wait.",
				valid=False,
			)
		else:
			self.agent_reply_buffer[user_id] = None
			return agent_reply

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.clear_user_msg(user_id)

Clear a user's messages in the buffer.

PARAMETER DESCRIPTION
user_id

The user id of a Lab member.

TYPE: str

Source code in labridge\agent\chat_msg\msg_types.py
370
371
372
373
374
375
376
377
def clear_user_msg(self, user_id: str):
	r"""
	Drop every buffered message of one user.

	The buffer entry is rebound to a brand-new empty list; a list fetched
	from the buffer beforehand is not modified.

	Args:
		user_id (str): The user id of a Lab member.
	"""
	emptied = list()
	self.user_msg_buffer[user_id] = emptied

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.default_agent_speech_path(user_id)

Default save path of agent's speech.

Source code in labridge\agent\chat_msg\msg_types.py
462
463
464
465
466
467
468
def default_agent_speech_path(self, user_id: str) -> str:
	r"""
	Build the default location of the agent's speech file for one user.

	The containing directory is created when the filesystem reports it missing.

	Args:
		user_id (str): The user id of a Lab member.

	Returns:
		str: The speech file path as a string.
	"""
	speech_file = self._root / f"{USER_TMP_DIR}/{user_id}/{AGENT_SPEECH_NAME}"
	parent_dir = str(speech_file.parent)
	already_present = self._fs.exists(parent_dir)
	if not already_present:
		self._fs.mkdirs(parent_dir)
	return str(speech_file)

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.default_tmp_file_path(user_id, file_name)

Default save path of the user's uploaded file.

Source code in labridge\agent\chat_msg\msg_types.py
470
471
472
473
474
475
def default_tmp_file_path(self, user_id: str, file_name: str) -> str:
	r"""
	Build the default location for a file uploaded by a user.

	The file is placed in a per-user directory named after the current date.
	No directory is created here.

	Args:
		user_id (str): The user id of a Lab member.
		file_name (str): The name of the uploaded file.

	Returns:
		str: The file path as a string.
	"""
	today, _unused_time = get_time()
	dated_dir = self._root / f"{USER_TMP_DIR}/{user_id}/{today}"
	return str(dated_dir / file_name)

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.default_user_speech_path(user_id, speech_suffix)

Default save path of a user's speech.

Source code in labridge\agent\chat_msg\msg_types.py
449
450
451
452
453
454
455
456
457
458
459
460
def default_user_speech_path(self, user_id: str, speech_suffix: str) -> str:
	r"""
	Build the default location of a user's speech file.

	The containing directory is created when the filesystem reports it missing.

	Args:
		user_id (str): The user id of a Lab member.
		speech_suffix (str): The suffix of the audio file; must be in `SUPPORT_USER_SPEECH_SUFFIX`.

	Returns:
		str: The speech file path as a string.

	Raises:
		ValueError: If `speech_suffix` is not supported.
	"""
	suffix_supported = speech_suffix in SUPPORT_USER_SPEECH_SUFFIX
	if not suffix_supported:
		raise ValueError(
			f"The audio file type {speech_suffix} is not supported,"
			f"use one of {SUPPORT_USER_SPEECH_SUFFIX} instead."
		)

	speech_file = self._root / f"{USER_TMP_DIR}/{user_id}/{USER_SPEECH_NAME}{speech_suffix}"
	parent_dir = str(speech_file.parent)
	already_present = self._fs.exists(parent_dir)
	if not already_present:
		self._fs.mkdirs(parent_dir)
	return str(speech_file)

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.get_agent_reply(user_id)

Get the agent reply to a user from the buffer.

PARAMETER DESCRIPTION
user_id

The user id of a Lab member.

TYPE: str

RETURNS DESCRIPTION
Union[ServerReply, ServerSpeechReply]

Union[ServerReply, ServerSpeechReply]: If an agent's reply exists, return a valid reply, otherwise, return an invalid reply.

Source code in labridge\agent\chat_msg\msg_types.py
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
def get_agent_reply(self, user_id: str) -> Union[ServerReply, ServerSpeechReply]:
	r"""
	Take the pending agent reply for a user out of the buffer.

	A stored reply is handed out once; its buffer entry is set back to None.

	Args:
		user_id (str): The user id of a Lab member.

	Returns:
		Union[ServerReply, ServerSpeechReply]: The stored reply if there is one,
			otherwise an invalid "Please wait." text reply.
	"""
	pending = self.agent_reply_buffer[user_id]
	if pending is not None:
		self.agent_reply_buffer[user_id] = None
		return pending

	return ServerReply(reply_text="Please wait.", valid=False)

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.get_user_msg(user_id, timeout=240) async

Wait until a user's messages are put into the buffer, and get them.

PARAMETER DESCRIPTION
user_id

The user id of a Lab member.

TYPE: str

timeout

The maximum time to wait for a message, in seconds.

TYPE: int DEFAULT: 240

RETURNS DESCRIPTION
Optional[PackedUserMessage]

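Optional[PackedUserMessage]: The obtained packed user messages. If no user message is put in before the timeout expires, a placeholder message with an empty user message and a system message stating that the user does not reply is returned.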

Source code in labridge\agent\chat_msg\msg_types.py
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
async def get_user_msg(self, user_id: str, timeout: int = 240) -> Optional[PackedUserMessage]:
	r"""
	Wait until a user's messages are put into the buffer, and get them.

	The buffer is polled once per second. The user's buffer entry is cleared
	when the wait ends, whether messages arrived or the timeout expired.

	Args:
		user_id (str): The user id of a Lab member.
		timeout (int): Maximum time to wait, in seconds.

	Returns:
		Optional[PackedUserMessage]: The obtained packed user messages.
			If no user message is put in before the timeout, a placeholder message with an
			empty `user_msg` and a system message saying that the user does not reply is returned.
	"""
	# A monotonic clock keeps the timeout unaffected by wall-clock adjustments.
	deadline = time.monotonic() + timeout

	while True:
		msgs = self.user_msg_buffer[user_id]
		if len(msgs) > 0 or time.monotonic() > deadline:
			self.clear_user_msg(user_id=user_id)
			break
		await asyncio.sleep(1)
	if len(msgs) > 0:
		packed_msgs = self.user_msg_formatter.formatted_msgs(msgs=msgs)
		return packed_msgs

	no_reply_msg = PackedUserMessage(
		user_id=user_id,
		user_msg="",
		system_msg=f"The user {user_id} does not reply, end this conversation.",
	)
	return no_reply_msg

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.put_agent_reply(user_id, reply_str, references=None, inner_chat=False, extra_info=None)

Put an agent's reply into the buffer.

PARAMETER DESCRIPTION
user_id

The user id of a Lab member.

TYPE: str

reply_str

The agent's reply string.

TYPE: str

references

The dumped string of reference info dict. Defaults to None. The file_path is corresponding to the key REF_INFO_FILE_PATH_KEY. The file_size is corresponding to the key REF_INFO_FILE_SIZE_KEY

TYPE: List[str] DEFAULT: None

inner_chat

Whether the reply happens inside a chat.

TYPE: bool DEFAULT: False

extra_info

extra information generally with long texts.

TYPE: str DEFAULT: None

Source code in labridge\agent\chat_msg\msg_types.py
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
def put_agent_reply(
	self,
	user_id: str,
	reply_str: str,
	references: List[str] = None,
	inner_chat: bool = False,
	extra_info: str = None,
):
	r"""
	Store the agent's reply for a user, as text or as speech.

	Which form is used follows the `reply_in_speech` flag kept in the user's
	entry of `config_buffer`. A previously stored reply is overwritten.

	Args:
		user_id (str): The user id of a Lab member.
		reply_str (str): The agent's reply string.
		references (List[str]): The dumped JSON strings of reference info dicts. Defaults to None.
			The `file_path` is corresponding to the key `REF_INFO_FILE_PATH_KEY`.
			The `file_size` is corresponding to the key `REF_INFO_FILE_SIZE_KEY`.
			Entries without a path, or whose file does not exist, are dropped.
		inner_chat (bool): Whether the reply happens inside a chat.
		extra_info (str): extra information generally with long texts.
	"""
	self.account_manager.check_valid_user(user_id=user_id)

	if references is not None:
		existing_refs = []
		for dumped_info in references:
			info = json.loads(dumped_info)
			path = info.get(REF_INFO_FILE_PATH_KEY, None)
			if path is not None and self._fs.exists(path):
				# Attach the size of the referenced file to its info dict.
				info[REF_INFO_FILE_SIZE_KEY] = os.path.getsize(path)
				existing_refs.append(info)
		# No usable reference is reported as None, not as an empty list.
		references = existing_refs if existing_refs else None

	if self.config_buffer[user_id].reply_in_speech:
		target_name = self.default_agent_speech_path(user_id=user_id)
		speech_path = TTSWorker.transform(text=reply_str, speech_name=target_name)
		speech_size = os.path.getsize(speech_path)
		reply = ServerSpeechReply(
			reply_speech={speech_path: speech_size},
			inner_chat=inner_chat,
			references=references,
			valid=True,
			extra_info=extra_info,
		)
	else:
		reply = ServerReply(
			reply_text=reply_str,
			references=references,
			valid=True,
			inner_chat=inner_chat,
			extra_info=extra_info,
		)

	self.agent_reply_buffer[user_id] = reply

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.put_user_msg(user_msg)

Put a new user message into the buffer.

PARAMETER DESCRIPTION
user_msg

A new message from a user.

TYPE: BaseClientMessage

Source code in labridge\agent\chat_msg\msg_types.py
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
def put_user_msg(self, user_msg: BaseClientMessage):
	r"""
	Append a user's message to that user's buffer and record its chat options.

	Args:
		user_msg (BaseClientMessage): A new message from a user.

	Raises:
		ValueError: If the message is not a FileWithTextMessage, ChatTextMessage or ChatSpeechMessage.
	"""
	accepted_types = (FileWithTextMessage, ChatTextMessage, ChatSpeechMessage)
	if not isinstance(user_msg, accepted_types):
		raise ValueError(f"The Msg type {type(user_msg)} is not supported.")

	sender = user_msg.user_id
	self.account_manager.check_valid_user(user_id=sender)
	self.user_msg_buffer[sender].append(user_msg)

	# The most recent message decides the options used for the next reply.
	sender_config = self.config_buffer[sender]
	sender_config.update(
		enable_instruct=user_msg.enable_instruct,
		enable_comment=user_msg.enable_comment,
		reply_in_speech=user_msg.reply_in_speech,
	)

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.reset_buffer()

Reset the user_msg_buffer and agent_reply_buffer.

RETURNS DESCRIPTION

None

Source code in labridge\agent\chat_msg\msg_types.py
343
344
345
346
347
348
349
350
351
352
353
def reset_buffer(self):
	r"""
	Rebuild all three buffers from the current user list.

	Every user reported by the account manager gets an empty message list,
	no pending reply and a fresh `ChatConfig`.

	Returns:
		None
	"""
	members = self.account_manager.get_users()
	self.user_msg_buffer = dict((member, []) for member in members)
	self.agent_reply_buffer = dict((member, None) for member in members)
	self.config_buffer = dict((member, ChatConfig()) for member in members)

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.test_get_user_text(user_id, enable_instruct=False, enable_comment=False)

For debug.

Source code in labridge\agent\chat_msg\msg_types.py
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
def test_get_user_text(
	self,
	user_id: str,
	enable_instruct: bool = False,
	enable_comment: bool = False,
) -> PackedUserMessage:
	r""" For debug: read one line from stdin and pack it as a text message. """
	typed_text = input("User: ")
	debug_msg = ChatTextMessage(
		user_id=user_id,
		text=typed_text,
		reply_in_speech=False,
		enable_instruct=enable_instruct,
		enable_comment=enable_comment,
	)
	return self.user_msg_formatter.formatted_msgs(msgs=[debug_msg])

labridge.agent.chat_msg.msg_types.ChatMsgBuffer.update_buffer_for_new_users()

When new users are registered, update the buffer.

RETURNS DESCRIPTION

None

Source code in labridge\agent\chat_msg\msg_types.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def update_buffer_for_new_users(self):
	r"""
	Add empty buffer entries for users that are not tracked yet.

	Entries of users already present in a buffer are left untouched.

	Returns:
		None
	"""
	registered = self.account_manager.get_users()

	# Work out the missing entries of all three buffers before touching any of them.
	missing_msgs = dict((uid, []) for uid in registered if uid not in self.user_msg_buffer)
	missing_replies = dict((uid, None) for uid in registered if uid not in self.agent_reply_buffer)
	missing_configs = dict((uid, ChatConfig()) for uid in registered if uid not in self.config_buffer)

	self.user_msg_buffer.update(missing_msgs)
	self.agent_reply_buffer.update(missing_replies)
	self.config_buffer.update(missing_configs)

labridge.agent.chat_msg.msg_types.ChatSpeechMessage

Bases: BaseClientMessage

This message includes:

  1. Basic: user_id.
  2. The save path of user's speech file data.

This message is used in the websocket_chat_speech.

Source code in labridge\agent\chat_msg\msg_types.py
100
101
102
103
104
105
106
107
108
109
110
class ChatSpeechMessage(BaseClientMessage):
	r"""
	A client message whose content is a recorded speech file.

	This message includes:

	1. Basic: user_id.
	2. The save path of user's speech file data.

	This message is used in the `websocket_chat_speech`.

	Attributes:
		speech_path (str): The save path of the user's speech file.
		reply_in_speech (bool): Overrides the base field with a default of True.
	"""
	speech_path: str
	reply_in_speech: bool = True

labridge.agent.chat_msg.msg_types.ChatTextMessage

Bases: BaseClientMessage

This message includes:

  1. Basic: user_id.
  2. The user's query.

This message is used in the websocket_chat_text.

Source code in labridge\agent\chat_msg\msg_types.py
87
88
89
90
91
92
93
94
95
96
97
class ChatTextMessage(BaseClientMessage):
	r"""
	A client message whose content is plain text.

	This message includes:

	1. Basic: user_id.
	2. The user's query.

	This message is used in the `websocket_chat_text`.

	Attributes:
		text (str): The user's query.
		reply_in_speech (bool): Overrides the base field with a default of False.
	"""
	text: str
	reply_in_speech: bool = False

labridge.agent.chat_msg.msg_types.FileWithTextMessage

Bases: BaseClientMessage

This message includes:

  1. Basic: user_id
  2. The info of the file to be uploaded.
  3. The attached user's query.
  4. Whether to reply in speech or not.

This message is used in the websocket_chat_with_file.

Source code in labridge\agent\chat_msg\msg_types.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class FileWithTextMessage(BaseClientMessage):
	r"""
	A client message that carries an uploaded file together with a text query.

	This message includes:

	1. Basic: user_id
	2. The info of the file to be uploaded.
	3. The attached user's query.
	4. Whether to reply in speech or not.

	This message is used in the `websocket_chat_with_file`.

	Attributes:
		attached_text (str): The user's query about the file.
		reply_in_speech (bool): Overrides the base field with a default of False.
		file_path (Optional[str]): The path of the file. Defaults to None.
	"""
	attached_text: str
	reply_in_speech: bool = False
	file_path: Optional[str] = None

	def dumps(self) -> str:
		r"""
		The formatted string that the client sends to the server for uploading request,
		including the file info and the attached text.

		Returns:
			str: A JSON object string with the keys `user_id`, `file_name` and `attached_text`.
				`file_name` is None when no file name can be determined.
		"""
		# The model declares `file_path`, not `file_name`, so reading `self.file_name`
		# directly raises AttributeError. Use a `file_name` attribute when one exists,
		# otherwise fall back to the base name of `file_path`.
		file_name = getattr(self, "file_name", None)
		if file_name is None and self.file_path:
			file_name = os.path.basename(self.file_path)
		msg_dict = {
			"user_id": self.user_id,
			"file_name": file_name,
			"attached_text": self.attached_text
		}
		return json.dumps(msg_dict)

	@classmethod
	def loads(cls, dumped_str):
		r"""
		Build a message from a string produced by `dumps`.

		Args:
			dumped_str (str): The dumped JSON string.

		Returns:
			FileWithTextMessage: The loaded message.
		"""
		msg_dict = json.loads(dumped_str)
		user_id = msg_dict.get("user_id")
		file_name = msg_dict.get("file_name")
		attached_text = msg_dict.get("attached_text")
		# NOTE(review): `file_name` is not a declared field of this model; how it is
		# handled depends on the model's extra-field configuration -- confirm.
		return cls(
			user_id=user_id,
			file_name=file_name,
			attached_text=attached_text,
			# The base class declares these flags without defaults and the dumped
			# string does not carry them, so fall back to False when they are absent.
			enable_instruct=msg_dict.get("enable_instruct", False),
			enable_comment=msg_dict.get("enable_comment", False),
		)

labridge.agent.chat_msg.msg_types.FileWithTextMessage.dumps()

The formatted string that the client sends to the server for uploading request, including the file info and the attached text.

Source code in labridge\agent\chat_msg\msg_types.py
62
63
64
65
66
67
68
69
70
71
72
def dumps(self) -> str:
	r"""
	The formatted string that the client sends to the server for uploading request,
	including the file info and the attached text.

	Returns:
		str: A JSON object string with the keys `user_id`, `file_name` and `attached_text`.
			`file_name` is None when no file name can be determined.
	"""
	# The model declares `file_path`, not `file_name`, so reading `self.file_name`
	# directly raises AttributeError. Use a `file_name` attribute when one exists,
	# otherwise fall back to the base name of `file_path`.
	file_name = getattr(self, "file_name", None)
	if file_name is None and self.file_path:
		file_name = os.path.basename(self.file_path)
	msg_dict = {
		"user_id": self.user_id,
		"file_name": file_name,
		"attached_text": self.attached_text
	}
	return json.dumps(msg_dict)

labridge.agent.chat_msg.msg_types.PackedUserMessage

Pack the user messages.

user_id (str): The user id of a Lab member. system_msg (str): The corresponding system message. user_msg (str): The packed user messages. reply_in_speech (bool): Whether the agent should reply in speech or not chat_group_id (Optional[str]): The ID of a chat group (If the messages are from a chat group). Defaults to None.

Source code in labridge\agent\chat_msg\msg_types.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
class PackedUserMessage:
	r"""
	A user's messages packed into one user string and one system string.

	Attributes:
		user_id (str): The user id of a Lab member.
		system_msg (str): The corresponding system message.
		user_msg (str): The packed user messages.
		chat_group_id (Optional[str]): The ID of a chat group (If the messages are from a chat group).
			Defaults to None.
	"""
	def __init__(
		self,
		user_id: str,
		system_msg: str,
		user_msg: str,
		chat_group_id: Optional[str] = None,
	):
		self.user_id = user_id
		self.system_msg = system_msg
		self.user_msg = user_msg
		self.chat_group_id = chat_group_id

	def dumps(self) -> str:
		r"""
		Dump the message to a JSON string.

		Returns:
			str: A JSON object string with the keys `user_id`, `system_msg`, `user_msg` and `chat_group_id`.
		"""
		msg_dict = {
			"user_id": self.user_id,
			"system_msg": self.system_msg,
			"user_msg": self.user_msg,
			"chat_group_id": self.chat_group_id,
		}
		return json.dumps(msg_dict)

	@classmethod
	def loads(cls, dumped_str: str):
		r"""
		Load from a dumped JSON string.

		Args:
			dumped_str (str): The dumped JSON string.

		Returns:
			PackedUserMessage

		Raises:
			KeyError: If `user_id`, `system_msg` or `user_msg` is missing.
		"""
		msg_dict = json.loads(dumped_str)
		return cls(
			user_id=msg_dict["user_id"],
			system_msg=msg_dict["system_msg"],
			user_msg=msg_dict["user_msg"],
			# `chat_group_id` is optional in the constructor, so a payload without it is accepted.
			chat_group_id=msg_dict.get("chat_group_id"),
		)

labridge.agent.chat_msg.msg_types.PackedUserMessage.loads(dumped_str) classmethod

Load from a dumped JSON string.

PARAMETER DESCRIPTION
dumped_str

The dumped JSON string.

TYPE: str

RETURNS DESCRIPTION

PackedUserMessage

Source code in labridge\agent\chat_msg\msg_types.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@classmethod
def loads(cls, dumped_str: str):
	r"""
	Load from a dumped JSON string.

	Args:
		dumped_str (str): The dumped JSON string.

	Returns:
		PackedUserMessage

	Raises:
		KeyError: If `user_id`, `system_msg` or `user_msg` is missing.
	"""
	msg_dict = json.loads(dumped_str)
	return cls(
		user_id=msg_dict["user_id"],
		system_msg=msg_dict["system_msg"],
		user_msg=msg_dict["user_msg"],
		# `chat_group_id` is an optional part of the message, so a payload without it is accepted.
		chat_group_id=msg_dict.get("chat_group_id"),
	)

labridge.agent.chat_msg.msg_types.ServerReply

Bases: BaseModel

The server's text reply.

reply_text (str): The reply text. valid (bool): Whether this reply contains valid information. references (Optional[List[dict]]): The infos of reference files. error (Optional[str]): The error information. If no error, it is None. inner_chat (Optional[bool]): Whether the reply is produced inside the Chat Call. - If this reply is the final response of the agent, it is False. - If this reply is an internal response such as collecting information from the user or getting authorization, it is True. When inner_chat is True, the client should post the user's answer to corresponding URL with flag Inner.

Source code in labridge\agent\chat_msg\msg_types.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
class ServerReply(BaseModel):
	r"""
	The server's text reply.

	Attributes:
		reply_text (str): The reply text.
		valid (bool): Whether this reply contains valid information.
		references (Optional[List[dict]]): The infos of reference files. Defaults to None.
		extra_info (Optional[str]): Extra information, generally with long texts. Defaults to None.
		error (Optional[str]): The error information. If no error, it is None.
		inner_chat (Optional[bool]): Whether the reply is produced inside the Chat Call. Defaults to False.
			- If this reply is the final response of the agent, it is False.
			- If this reply is an internal response such as collecting information from the user or getting authorization,
			it is True. When `inner_chat` is True, the client should post the user's answer to corresponding URL with flag `Inner`.
	"""
	reply_text: str
	valid: bool
	references: Optional[List[dict]] = None
	extra_info: Optional[str] = None
	error: Optional[str] = None
	inner_chat: Optional[bool] = False

labridge.agent.chat_msg.msg_types.ServerSpeechReply

Bases: BaseModel

The server's speech reply.

reply_speech (Dict[str, int]): The path of the agent's speech file. valid (bool): Whether the reply contains valid information. When receiving an invalid reply, the client should continue to get the server's reply until get a valid reply. references (Optional[List[dict]]): The paths of reference files. inner_chat (Optional[bool]): Whether the reply is produced inside the Chat Call. - If this reply is the final response of the agent, it is False. - If this reply is an internal response such as collecting information from the user or getting authorization, it is True. When inner_chat is True, the client should post the user's answer to corresponding URL with flag Inner.

Source code in labridge\agent\chat_msg\msg_types.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class ServerSpeechReply(BaseModel):
	r"""
	The server's speech reply.

	Attributes:
		reply_speech (Dict[str, int]): A mapping from string keys to integer values describing the agent's speech file.
			NOTE(review): presumably speech file path -> file size in bytes; confirm against the producer.
		valid (bool): Whether the reply contains valid information. When receiving an invalid reply,
			the client should continue to get the server's reply until get a valid reply.
		references (Optional[List[dict]]): The infos of reference files. Defaults to None.
		extra_info (Optional[str]): Extra information, generally with long texts. Defaults to None.
		error (Optional[str]): The error information. If no error, it is None.
		inner_chat (Optional[bool]): Whether the reply is produced inside the Chat Call. Defaults to False.
			- If this reply is the final response of the agent, it is False.
			- If this reply is an internal response such as collecting information from the user or getting authorization,
			it is True. When `inner_chat` is True, the client should post the user's answer to corresponding URL with flag `Inner`.
	"""
	reply_speech: Dict[str, int]
	valid: bool
	references: Optional[List[dict]] = None
	extra_info: Optional[str] = None
	error: Optional[str] = None
	inner_chat: Optional[bool] = False

labridge.agent.chat_msg.msg_types.UserMsgFormatter

Bases: object

This class transforms the user's messages into specific formats and generates the corresponding system messages.

Source code in labridge\agent\chat_msg\msg_types.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
class UserMsgFormatter(object):
	r"""
	This class transforms the user's messages into specific formats and generates the corresponding system messages.
	"""

	def _speech_to_text(self, msg: ChatSpeechMessage) -> str:
		r""" Speech message to text, transcribed by `ASRWorker` from the message's speech file. """
		text = ASRWorker.transform(speech_path=msg.speech_path)
		return text

	def _formatted_file_with_text(self, msg: FileWithTextMessage, file_idx: int) -> Tuple[str, str]:
		r"""
		FileWithTextMessage to formatted text.

		Args:
			msg (FileWithTextMessage): The message carrying a file path and the attached query.
			file_idx (int): The number used to refer to this file in both returned strings.

		Returns:
			Tuple[str, str]: The system string giving the file path, and the user string giving the query.
		"""
		system_str = f"Path of File {file_idx}: {msg.file_path}"
		user_str = f"The user query about the File {file_idx}:\n{msg.attached_text}"
		return system_str, user_str

	def formatted_msgs(self, msgs: List[BaseClientMessage]) -> PackedUserMessage:
		r"""
		Turn into formatted text message.

		The user id is taken from the first message. The system message starts with a header
		giving the user id and the current date and time, followed by one line per uploaded file.

		Args:
			msgs (List[BaseClientMessage]): The user's messages. Must not be empty.

		Returns:
			PackedUserMessage: The packed user messages, and system message.

		Raises:
			IndexError: If `msgs` is empty.
			ValueError: If a message is not a ChatSpeechMessage, FileWithTextMessage or ChatTextMessage.
		"""
		file_idx = 1
		user_id = msgs[0].user_id

		date_str, time_str = get_time()
		user_queries = []
		system_strings = [
			f"You are chatting with a user one-to-one\n"
			f"User id: {user_id}\n"
			f"Current date: {date_str}\n"
			f"Current time: {time_str}\n",
		]

		for msg in msgs:
			if isinstance(msg, ChatSpeechMessage):
				user_str = self._speech_to_text(msg=msg)
				# An empty transcription contributes nothing to the packed query.
				if user_str:
					user_queries.append(user_str)
			elif isinstance(msg, FileWithTextMessage):
				system_str, user_str = self._formatted_file_with_text(msg=msg, file_idx=file_idx)
				file_idx += 1
				user_queries.append(user_str)
				system_strings.append(system_str)
			elif isinstance(msg, ChatTextMessage):
				user_queries.append(msg.text)
			else:
				raise ValueError(f"Invalid Msg type: {type(msg)}")

		system_msg = "\n".join(system_strings)
		user_msg = "\n".join(user_queries)
		packed_msg = PackedUserMessage(
			system_msg=system_msg,
			user_id=user_id,
			user_msg=user_msg,
		)
		return packed_msg

labridge.agent.chat_msg.msg_types.UserMsgFormatter.formatted_msgs(msgs)

Turn into formatted text message.

PARAMETER DESCRIPTION
msgs

The user's messages.

TYPE: List[BaseClientMessage]

RETURNS DESCRIPTION
PackedUserMessage

The packed user messages, and system message.

TYPE: PackedUserMessage

Source code in labridge\agent\chat_msg\msg_types.py
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def formatted_msgs(self, msgs: List[BaseClientMessage]) -> PackedUserMessage:
	r"""
	Turn into formatted text message.

	The user id is taken from the first message. The system message starts with a header
	giving the user id and the current date and time, followed by one line per uploaded file.

	Args:
		msgs (List[BaseClientMessage]): The user's messages. Must not be empty.

	Returns:
		PackedUserMessage: The packed user messages, and system message.

	Raises:
		IndexError: If `msgs` is empty.
		ValueError: If a message is not a ChatSpeechMessage, FileWithTextMessage or ChatTextMessage.
	"""
	file_idx = 1
	user_id = msgs[0].user_id

	date_str, time_str = get_time()
	user_queries = []
	system_strings = [
		f"You are chatting with a user one-to-one\n"
		f"User id: {user_id}\n"
		f"Current date: {date_str}\n"
		f"Current time: {time_str}\n",
	]

	for msg in msgs:
		if isinstance(msg, ChatSpeechMessage):
			user_str = self._speech_to_text(msg=msg)
			# An empty transcription contributes nothing to the packed query.
			if user_str:
				user_queries.append(user_str)
		elif isinstance(msg, FileWithTextMessage):
			system_str, user_str = self._formatted_file_with_text(msg=msg, file_idx=file_idx)
			file_idx += 1
			user_queries.append(user_str)
			system_strings.append(system_str)
		elif isinstance(msg, ChatTextMessage):
			user_queries.append(msg.text)
		else:
			raise ValueError(f"Invalid Msg type: {type(msg)}")

	system_msg = "\n".join(system_strings)
	user_msg = "\n".join(user_queries)
	packed_msg = PackedUserMessage(
		system_msg=system_msg,
		user_id=user_id,
		user_msg=user_msg,
	)
	return packed_msg