How to stream Agent's response in Langchain?

Question


I am using Langchain with Gradio interface in Python. I have made a conversational agent and am trying to stream its responses to the Gradio chatbot interface. I have had a look at the Langchain docs and could not find an example that implements streaming with Agents.
Here are some parts of my code:

# Loading the LLM
def load_llm():
    return AzureChatOpenAI(
        temperature=hparams["temperature"],
        top_p=hparams["top_p"],
        max_tokens=hparams["max_tokens"],
        presence_penalty=hparams["presence_penalty"],
        frequency_penalty=hparams["freq_penaulty"],
        streaming=True, 
        callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]), 
        verbose=True,
        model_name=hparams["model"],
        deployment_name = models_dict[hparams["model"]],
        )

# Loading the agent
def load_chain(memory, sys_msg, llm):
    """Logic for loading the chain you want to use should go here."""
    agent_chain = initialize_agent(tools, 
                                   llm, 
                                   agent="conversational-react-description", 
                                   verbose=True, 
                                   memory=memory, 
                                   agent_kwargs = {"added_prompt": sys_msg},
                                   streaming=True, 
                                   )
    return agent_chain

# Creating the chatbot to be used in Gradio.
class ChatWrapper:

    def __init__(self, sys_msg):
        self.lock = Lock()
        self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True,)
        self.chain = load_chain(self.memory, sys_msg, load_llm())
        self.sysmsg = sys_msg

    def __call__(
        self, api_key: str, inp: str, history: Optional[Tuple[str, str]], chain: Optional[ConversationChain]
    ):
        """Execute the chat functionality."""
        self.lock.acquire()
        try:
            history = history or []
            # Run chain and append input.
            output = self.chain.run(input=inp)
            
            history.append((inp, output))
        except Exception as e:
            raise e
        finally:
            self.lock.release()
        return history, history

I currently can stream into the terminal output but what I am looking for is streaming in my Gradio interface.

Can you please help me with that?

Answer 1

Score: 5


One possible solution is to use a queue as a mediator.

  1. Create a queue
from queue import SimpleQueue
q = SimpleQueue()
  2. Create a custom callback that will write produced tokens into the queue
from queue import Empty
from typing import Any, Dict, List, Union

from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult


job_done = object() # signals the processing is done

class StreamingGradioCallbackHandler(BaseCallbackHandler):
    def __init__(self, q: SimpleQueue):
        self.q = q

    def on_llm_start(
        self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any
    ) -> None:
        """Run when LLM starts running. Clean the queue."""
        while not self.q.empty():
            try:
                self.q.get(block=False)
            except Empty:
                continue

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Run on new LLM token. Only available when streaming is enabled."""
        self.q.put(token)

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Run when LLM ends running."""
        self.q.put(job_done)

    def on_llm_error(
        self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any
    ) -> None:
        """Run when LLM errors."""
        self.q.put(job_done)
  3. Give the callback to your LLM
# replaces the existing callback_manager line inside AzureChatOpenAI(...) in load_llm
callback_manager=CallbackManager([StreamingGradioCallbackHandler(q),
                                  StreamingStdOutCallbackHandler()]),
  4. In the Gradio code, create a parallel thread that runs your agent, and read from the queue.

I don't understand your ChatWrapper. Actually, I am not familiar with Gradio, so I will rely on an example from the documentation.

from threading import Thread

def bot(history):
    user_question = history[-1][0]
    thread = Thread(target=chain.run, kwargs={"input": user_question})
    thread.start()
    history[-1][1] = ""
    while True:
        next_token = q.get(block=True) # Blocks until an input is available
        if next_token is job_done:
            break
        history[-1][1] += next_token
        yield history
    thread.join()
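
For completeness, here is a minimal sketch of how the bot generator above could be wired into a Gradio Blocks app (adapted from the Gradio streaming-chatbot example; it assumes q, chain, and bot from the previous steps are defined at module level):

import gradio as gr

with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    msg = gr.Textbox()

    def user(user_message, history):
        # Append the user's turn with an empty bot slot for bot() to fill.
        return "", history + [[user_message, None]]

    # Run user() first to update the history, then stream bot()'s yields
    # into the Chatbot component.
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, chatbot, chatbot
    )

demo.queue()  # queuing is required for generator (streaming) callbacks
demo.launch()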

Answer 2

Score: 0


To stream an agent’s response in Langchain, you can use a StreamingStdOutCallbackHandler callback.

Here is an example of how to use it:

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.schema import HumanMessage

chat = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
    temperature=0
)

resp = chat([HumanMessage(content="Write me a song about sparkling water.")])
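
The example above streams a plain chat model rather than an agent. The same callback can be attached to the LLM that backs an agent; here is a minimal sketch, assuming a tools list is already defined as in the question:

from langchain.agents import initialize_agent
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import ChatOpenAI
from langchain.memory import ConversationBufferMemory

# The agent inherits streaming from its LLM, so tokens are printed to
# stdout as they are generated.
llm = ChatOpenAI(
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()],
    temperature=0,
)

memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
agent = initialize_agent(
    tools,
    llm,
    agent="conversational-react-description",
    memory=memory,
    verbose=True,
)
agent.run(input="Write me a song about sparkling water.")

Note that this still streams to the terminal only; to stream into Gradio, swap in the queue-based handler from answer 1.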

Answer 3

Score: 0


Following @Nokados' excellent workaround, I found that the logic for streaming the response might not necessarily work if you are working with an LLMChain instead of an Agent. Below is some logic I've used that helps when there are multiple LLM calls:

thread.start()
history[-1][1] = ""
while thread.is_alive():
    next_token = q.get(block=True) # Blocks until an input is available
    if next_token != job_done:
        history[-1][1] += next_token
    else:
        history[-1][1] += "\n"
    yield history
thread.join()
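
For context, here is a sketch of how this loop might sit inside the bot function from answer 1 (same q, chain, and job_done objects; the get timeout is an addition here so the loop can notice that the worker thread has finished without blocking forever):

from queue import Empty
from threading import Thread

def bot(history):
    user_question = history[-1][0]
    thread = Thread(target=chain.run, kwargs={"input": user_question})
    thread.start()
    history[-1][1] = ""
    # Keep draining the queue while the worker is alive (or tokens remain),
    # so every LLM call inside the chain gets streamed, not just the first.
    while thread.is_alive() or not q.empty():
        try:
            next_token = q.get(block=True, timeout=0.1)
        except Empty:
            continue
        if next_token is job_done:
            history[-1][1] += "\n"  # separate the output of consecutive LLM calls
        else:
            history[-1][1] += next_token
        yield history
    thread.join()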

Answer 4

Score: 0


After reading the source code of langchain.chat_models.base, I tried the stream method and it just works:

from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chat_models import AzureChatOpenAI

def create_model(temperature=0, **kwargs):
    extra_inputs = dict(kwargs.items())
    streaming_active = extra_inputs.get('streaming', False)
    model = AzureChatOpenAI(
        openai_api_base=BASE_URL,
        openai_api_key=API_KEY,
        openai_api_type="azure",
        openai_api_version=API_VERSION,
        deployment_name=DEPLOYMENT_NAME,
        temperature=temperature,
        streaming=streaming_active,
        callbacks=[StreamingStdOutCallbackHandler()] if streaming_active else [],
    )
    return model

from langchain.chat_models import ChatOpenAI
from langchain.schema import AIMessage, HumanMessage

import gradio as gr

llm = create_model(streaming=True)

def stream_predict(message, history):
    history_langchain_format = []
    for human, ai in history:
        history_langchain_format.append(HumanMessage(content=human))
        history_langchain_format.append(AIMessage(content=ai))
    history_langchain_format.append(HumanMessage(content=message))
    
    gpt_response = llm.stream(history_langchain_format)

    partial_message = ""
    for chunk in gpt_response:
        partial_message = partial_message + chunk.dict()['content']
        yield partial_message   
      
gr.ChatInterface(stream_predict).queue().launch()

Answer 5

Score: -1


If you can write on stdout, why don't you also read from it?

import subprocess

def listen(cmd):  # e.g. cmd = 'python your_langchain.py'
    """From http://blog.kagesenshi.org/2008/02/teeing-python-subprocesspopen-output.html"""
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, text=True)  # text=True so lines are str, not bytes
    stdout = []
    while True:
        line = p.stdout.readline()
        stdout.append(line)
        print(line, end='')
        if line == '' and p.poll() is not None:
            break
    return ''.join(stdout)

From https://www.saltycrane.com/blog/2009/10/how-capture-stdout-in-real-time-python/
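
If you go this route, the capture loop can be turned into a generator so a Gradio callback can render the output as it arrives. A sketch, under the assumption that the agent script prints its streamed tokens to stdout:

import subprocess

def listen_stream(cmd):
    # Yield the accumulated output each time the subprocess prints a new line.
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                         stderr=subprocess.STDOUT, text=True)
    output = ""
    for line in p.stdout:
        output += line
        yield output
    p.wait()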
