How to stream Agent's response in Langchain?
Question
I am using Langchain with a Gradio interface in Python. I have built a conversational agent and am trying to stream its responses to the Gradio chatbot interface. I have looked at the Langchain docs and could not find an example that implements streaming with Agents.
Here are some parts of my code:
Loading the LLM
def load_llm():
    return AzureChatOpenAI(
        temperature=hparams["temperature"],
        top_p=hparams["top_p"],
        max_tokens=hparams["max_tokens"],
        presence_penalty=hparams["presence_penalty"],
        frequency_penalty=hparams["freq_penaulty"],
        streaming=True,
        callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]),
        verbose=True,
        model_name=hparams["model"],
        deployment_name=models_dict[hparams["model"]],
    )
Loading the agent
def load_chain(memory, sys_msg, llm):
    """Logic for loading the chain you want to use should go here."""
    agent_chain = initialize_agent(
        tools,
        llm,
        agent="conversational-react-description",
        verbose=True,
        memory=memory,
        agent_kwargs={"added_prompt": sys_msg},
        streaming=True,
    )
    return agent_chain
Creating the chatbot to be used in Gradio.
class ChatWrapper:
    def __init__(self, sys_msg):
        self.lock = Lock()
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
        self.chain = load_chain(self.memory, sys_msg, load_llm())
        self.sysmsg = sys_msg

    def __call__(
        self, api_key: str, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain]
    ):
        """Execute the chat functionality."""
        self.lock.acquire()
        try:
            history = history or []
            # Run the chain and append the input.
            output = self.chain.run(input=inp)
            history.append((inp, output))
        except Exception as e:
            raise e
        finally:
            self.lock.release()
        return history, history
I can currently stream to the terminal output, but what I am looking for is streaming in my Gradio interface.
Can you please help me with that?
Answer 1
Score: 5
One possible solution is to use a queue as a mediator.
- Create a queue
from queue import SimpleQueue
q = SimpleQueue()
- Create a custom callback that writes the produced tokens into the queue
from queue import Empty
from typing import Any, Dict, List, Union

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult

job_done = object()  # signals that processing is done


class StreamingGradioCallbackHandler(BaseCallbackHandler):
    def __init__(self, q: SimpleQueue):
        self.q = q

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running. Clean the queue."""
        while not self.q.empty():
            try:
                self.q.get(block=False)
            except Empty:
                continue

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        self.q.put(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        self.q.put(job_done)

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Run when LLM errors."""
        self.q.put(job_done)
- Give the callback to your LLM
callback_manager=CallbackManager(
    [StreamingGradioCallbackHandler(q), StreamingStdOutCallbackHandler()]
),
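For instance, here is a minimal sketch of passing this manager into the question's AzureChatOpenAI; the sampling hyperparameters from the question are omitted for brevity:
from langchain.callbacks.base import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import AzureChatOpenAI

# Sketch only: mirrors the question's load_llm(), minus the sampling parameters.
llm = AzureChatOpenAI(
    streaming=True,
    callback_manager=CallbackManager(
        [StreamingGradioCallbackHandler(q), StreamingStdOutCallbackHandler()]
    ),
    verbose=True,
    model_name=hparams["model"],
    deployment_name=models_dict[hparams["model"]],
)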
- In the Gradio code, create a parallel thread that runs your agent, and read from the queue.
I don't understand your ChatWrapper, and I am not that familiar with Gradio, so I will rely on an example from the documentation.
from threading import Thread

def bot(history):
    user_question = history[-1][0]
    thread = Thread(target=chain.run, kwargs={"input": user_question})
    thread.start()
    history[-1][1] = ""
    while True:
        next_token = q.get(block=True)  # Blocks until an input is available
        if next_token is job_done:
            break
        history[-1][1] += next_token
        yield history
    thread.join()
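To make this runnable end to end, here is a minimal sketch of wiring the bot generator into a Gradio Blocks app; the user helper and the component layout are assumptions modeled on the Gradio chatbot streaming example, not part of the original answer:
import gradio as gr

def user(user_message, history):
    # Append the user's message with an empty bot slot that bot() fills in.
    return "", history + [[user_message, None]]

with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    msg = gr.Textbox()
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, chatbot, chatbot
    )

demo.queue().launch()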
Answer 2
Score: 0
To stream an agent's response in Langchain, you can use a StreamingStdOutCallbackHandler callback.
Here is an example of how to use it:
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage
from langchain.callbacks.base import CallbackManager

chat = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
    temperature=0,
)
resp = chat([HumanMessage(content="Write me a song about sparkling water.")])
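The snippet above streams from a plain chat model; a sketch of reusing the same streaming LLM inside an agent (assuming the tools list and conversational setup from the question) might look like this:
from langchain.agents import initialize_agent
from langchain.memory import ConversationBufferMemory

# Sketch only: the agent's LLM calls stream through the callbacks attached to `chat` above.
memory = ConversationBufferMemory(memory_key="chat_history")
agent = initialize_agent(
    tools,  # assumed to be defined as in the question
    chat,
    agent="conversational-react-description",
    memory=memory,
    verbose=True,
)
agent.run(input="Write me a song about sparkling water.")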
Answer 3
Score: 0
Following @Nokados' excellent workaround, I found that the logic for streaming the response might not work if you are working with an LLMChain instead of an Agent. Below is some logic I've used that helps with multiple LLM calls:
thread.start()
history[-1][1] = ""
while thread.is_alive():
    next_token = q.get(block=True)  # Blocks until an input is available
    if next_token != job_done:
        history[-1][1] += next_token
    else:
        history[-1][1] += "\n"
    yield history
thread.join()
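Put together as a complete generator (a sketch that reuses the q, job_done, and chain names from Answer 1; the thread setup is assumed to match that answer's bot function):
from threading import Thread

def bot(history):
    user_question = history[-1][0]
    thread = Thread(target=chain.run, kwargs={"input": user_question})
    thread.start()
    history[-1][1] = ""
    while thread.is_alive():
        next_token = q.get(block=True)  # Blocks until an input is available
        if next_token != job_done:
            history[-1][1] += next_token
        else:
            history[-1][1] += "\n"  # Separate the output of consecutive LLM calls
        yield history
    thread.join()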
Answer 4
Score: 0
After reading the source code of langchain.chat_models.base, I tried the stream method and it just works.
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import AzureChatOpenAI

def create_model(temperature=0, **kwargs):
    extra_inputs = dict(kwargs.items())
    streaming_active = extra_inputs.get('streaming', False)
    model = AzureChatOpenAI(
        openai_api_base=BASE_URL,
        openai_api_key=API_KEY,
        openai_api_type="azure",
        openai_api_version=API_VERSION,
        deployment_name=DEPLOYMENT_NAME,
        temperature=temperature,
        streaming=streaming_active,
        callbacks=[StreamingStdOutCallbackHandler()] if streaming_active else [],
    )
    return model
from langchain.chat_models import ChatOpenAI
from langchain.schema import AIMessage, HumanMessage
import gradio as gr

llm = create_model(streaming=True)

def stream_predict(message, history):
    history_langchain_format = []
    for human, ai in history:
        history_langchain_format.append(HumanMessage(content=human))
        history_langchain_format.append(AIMessage(content=ai))
    history_langchain_format.append(HumanMessage(content=message))
    gpt_response = llm.stream(history_langchain_format)
    partial_message = ""
    for chunk in gpt_response:
        partial_message = partial_message + chunk.dict()['content']
        yield partial_message

gr.ChatInterface(stream_predict).queue().launch()
Answer 5
Score: -1
If you can write on stdout, why don't you also read from it?
import subprocess

def listen(cmd):  # e.g. cmd = "python your_langchain.py"
    """From http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html"""
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, text=True)
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print(line, end='')
        if line == '' and p.poll() is not None:
            break
    return ''.join(stdout)
From https://www.saltycrane.com/blog/2009/10/how-capture-stdout-in-real-time-python/