How can I retrieve the latest donation information in real time from the Streamlabs API using Python?

Question

I'm trying to retrieve the latest donation information from the Streamlabs API in real time using Python. I'm using an access token obtained through the authorization code flow, and I request the donations.read scope.

However, I keep getting "unauthorized" errors, and I'm not sure what I'm doing wrong. I have checked my API key and confirmed that the scope parameter is correct, but the error persists.

Can you help me figure out what I'm doing wrong? Is there a specific way to retrieve real-time donation information from the Streamlabs API?

Here are some details about my setup:

Programming language: Python
API request method: GET
API endpoint: https://streamlabs.com/api/v2.0/socket/token
Authorization method: Bearer Token
Scope parameter: donations.read

import requests

client_id = ""
client_secret = ""
redirect_uri = ""
scope = "donations.read"
state = "123456"

auth_url = f"https://streamlabs.com/api/v2.0/authorize?client_id={client_id}&redirect_uri={redirect_uri}&scope={scope}&response_type=code&state={state}"

# Get authorization code
print(f"Open this URL to get authorization code: {auth_url}")
auth_code = input("Enter authorization code: ")

# Exchange authorization code for access token
token_url = "https://streamlabs.com/api/v2.0/token"

headers = {
    "Content-Type": "application/x-www-form-urlencoded"
}

data = {
    "grant_type": "authorization_code",
    "client_id": client_id,
    "client_secret": client_secret,
    "redirect_uri": redirect_uri,
    "code": auth_code
}

response = requests.post(token_url, headers=headers, data=data)

if response.status_code == 200:
    access_token = response.json()["access_token"]
    print("Access token:", access_token)
else:
    print(f"Access token could not be retrieved. Error code: {response.status_code}, Error message: {response.json()['message']}")

# Get socket token
socket_url = "https://streamlabs.com/api/v2.0/socket/token"

headers = {
    "Authorization": f"Bearer {access_token}",
    "Accept": "application/json"
}

response = requests.get(socket_url, headers=headers)

if response.status_code == 200:
    socket_token = response.json()["socket_token"]
    print("Socket token:", socket_token)

    # Test socket token
    test_url = "https://sockets.streamlabs.com?token=" + socket_token
    test_response = requests.get(test_url)

    if test_response.status_code == 200:
        print("Socket token is valid.")
    else:
        print(f"Socket token is invalid. Error code: {test_response.status_code}, Error message: {test_response.text}")
else:
    print(f"Socket token could not be retrieved. Error code: {response.status_code}, Error message: {response.json()['message']}")
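One detail in the code above that may be worth tightening: the authorization URL is assembled with an f-string, so a redirect_uri or state containing characters such as "&", "?" or spaces would not be percent-encoded. The sketch below builds the same URL with urllib.parse.urlencode. The helper name build_auth_url and the sample values are hypothetical and only for illustration; the endpoint and parameter names are the ones used in the question.

```python
from urllib.parse import urlencode

# Same authorization endpoint as in the question.
AUTHORIZE_URL = "https://streamlabs.com/api/v2.0/authorize"


def build_auth_url(client_id, redirect_uri, scope, state):
    """Return the authorization URL with every query parameter percent-encoded."""
    params = {
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": scope,
        "response_type": "code",
        "state": state,
    }
    return f"{AUTHORIZE_URL}?{urlencode(params)}"


if __name__ == "__main__":
    # Placeholder values, hypothetical and for illustration only.
    print(build_auth_url("my-client-id",
                         "http://localhost:8000/callback?x=1",
                         "donations.read",
                         "123456"))
```

This does not by itself resolve the "unauthorized" error; it only rules out a malformed authorization request as one possible cause.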

Answer 1

Score: 1

You are using Streamlabs' WebSocket API to get real-time donation information, but your code sends a plain HTTP GET request to the WebSocket endpoint. A WebSocket connection is not the same as an HTTP request.

Install the websocket-client library (pip install websocket-client), then change your code to open a WebSocket connection instead:

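import json

import websocket

# socket_token is the value obtained from the /socket/token request in the question.

def on_message(ws, message):
    data = json.loads(message)
    if data['type'] == 'donation':
        print("New donation:", data)

def on_error(ws, error):
    print("Error:", error)

def on_close(ws, close_status_code=None, close_msg=None):
    # Recent websocket-client releases also pass the close status code and
    # message; the defaults keep this handler compatible with older releases.
    print("Connection closed")

def on_open(ws):
    print("Connected to Streamlabs WebSocket")

ws = websocket.WebSocketApp("wss://sockets.streamlabs.com?token=" + socket_token,
                            on_message=on_message,
                            on_error=on_error,
                            on_close=on_close)
ws.on_open = on_open
ws.run_forever()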
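The on_message handler above assumes every incoming message is a JSON object with a 'type' key. A slightly more defensive variant might separate the parsing from the socket callback so it can be checked without a live connection. This is a minimal sketch: the function name extract_donation is hypothetical, and the payload shape (a top-level "type" field equal to "donation") is taken from the answer's code rather than from the Streamlabs documentation.

```python
import json


def extract_donation(raw_message):
    """Return the decoded payload if it is a donation event, otherwise None."""
    try:
        data = json.loads(raw_message)
    except (TypeError, ValueError):
        # Not JSON (for example a keep-alive or protocol frame): ignore it.
        return None
    # Assumed shape: a JSON object whose "type" field is "donation".
    if isinstance(data, dict) and data.get("type") == "donation":
        return data
    return None
```

Inside on_message, this could be used as donation = extract_donation(message), printing only when the result is not None, so that unexpected frames do not raise inside the callback.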

huangapple
  • Posted on April 13, 2023 at 18:47:47
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76004527.html