How to use a MySQL SELECT query with threading without duplicating the SELECT query results?
Question
Short context: I select a value from a MySQL table, use it with an API to fetch a result, and save that result into the same table.
Problem: How can I process multiple rows simultaneously? Whenever I start the function in several threads, every thread selects the same value (i.e. the cursor returns the same value for each thread). I need each thread to process a different value so that I can reduce the total time.
My program is:
import requests
import os
import json
import pymysql
import threading

conn = pymysql.connect(host='localhost', user=USER, passwd=PASSWORD, db='sampledb', charset='utf8mb4', autocommit=True)
url = "http://www.someapi.com/somelink/"
cur = conn.cursor()

def main():
    cur.execute("select asset_id from getprocessid where status = %s LIMIT 1", ("uploaded",))
    idofassets = cur.fetchone()[0]
    req = requests.Session()
    resp = req.get(url + str(idofassets))
    resp_json = json.loads(resp.text)
    actual = resp_json['getResponse']
    cur.execute("update getprocessid set class = %s, status = %s where asset_id = %s", (str(actual), "completed", str(idofassets)))

while True:
    # Added for threading
    thread1 = threading.Thread(target=main)
    thread2 = threading.Thread(target=main)
    thread3 = threading.Thread(target=main)
    thread1.start()
    thread2.start()
    thread3.start()
    thread1.join()
    thread2.join()
    thread3.join()
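Why every thread gets the same row: the SELECT only reads, and nothing marks the row as taken until the UPDATE at the end of main(), which runs only after a slow HTTP request. The sketch below replays that interleaving deterministically. It assumes Python's built-in sqlite3 with an in-memory copy of the getprocessid table as a stand-in for MySQL, purely so it runs without a server; the table contents are hypothetical.

```python
import sqlite3

# Hypothetical stand-in for the question's MySQL table (sqlite3 is used only
# so the example runs without a MySQL server).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE getprocessid (asset_id INTEGER PRIMARY KEY, class TEXT, status TEXT)"
)
conn.executemany(
    "INSERT INTO getprocessid (asset_id, status) VALUES (?, ?)",
    [(i, "uploaded") for i in range(1, 6)],
)

def select_next(c):
    # The question's SELECT: it reads a pending row but does not claim it.
    return c.execute(
        "SELECT asset_id FROM getprocessid WHERE status = ? LIMIT 1", ("uploaded",)
    ).fetchone()[0]

# Replay the common interleaving: all three workers run their SELECT
# before any of them reaches its UPDATE.
picked = [select_next(conn) for _ in range(3)]

# Each worker then runs its UPDATE for the id it picked.
for asset_id in picked:
    conn.execute(
        "UPDATE getprocessid SET class = ?, status = ? WHERE asset_id = ?",
        ("some-class", "completed", asset_id),
    )

completed = conn.execute(
    "SELECT COUNT(*) FROM getprocessid WHERE status = 'completed'"
).fetchone()[0]
print(picked, completed)
```

All three entries in picked are the same id, and only one row ends up completed: three API calls were spent on a single record. The fix is to make "choose a row" and "claim it" a single step, or to hand each thread its own ids.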
Answer 1
Score: 0
Your problem can be divided into two main tasks:
1 - Fetching the results from the getprocessid MySQL table
2 - Processing the results and updating the same table (but different fields)
So one way to optimize your code is to have one thread (it could be the main thread) do step 1 and then divide the work of step 2 among your 3 threads:
import requests
import os
import json
import pymysql
import threading

# you can create these dynamically if you
# want more (or less) threads
batches = [[], [], []]
url = "http://www.someapi.com/somelink/"

def connect():
    # a PyMySQL connection must not be shared between threads,
    # so the main thread and every worker open their own
    return pymysql.connect(host='localhost', user=USER,
                           passwd=PASSWORD,
                           db='sampledb', charset='utf8mb4', autocommit=True)

conn = connect()
cur = conn.cursor()

def fetch_and_split():
    # no LIMIT 1 here: fetch every pending row so there is work to split
    cur.execute("select asset_id from getprocessid "
                "where status = %s", ("uploaded",))
    results = cur.fetchall()
    # empty the batches left over from the previous round
    for batch in batches:
        batch.clear()
    # this populates the lists to be processed with the ids (round-robin)
    for count, row in enumerate(results):
        batches[count % len(batches)].append(row[0])
    return len(results)

def process_and_update(batch):
    # each thread receives its own list and opens its own connection
    thread_conn = connect()
    thread_cur = thread_conn.cursor()
    req = requests.Session()
    try:
        for idofassets in batch:
            resp = req.get(url + str(idofassets))
            resp_json = json.loads(resp.text)
            actual = resp_json['getResponse']
            thread_cur.execute("update getprocessid set class = %s, "
                               "status = %s where asset_id = %s",
                               (str(actual), "completed", str(idofassets)))
    finally:
        thread_conn.close()

while True:
    # The main thread splits the results
    if fetch_and_split() == 0:
        break  # no rows with status 'uploaded' are left
    # The other threads process the
    # results and update the values
    thread1 = threading.Thread(target=process_and_update, args=(batches[0],))
    thread2 = threading.Thread(target=process_and_update, args=(batches[1],))
    thread3 = threading.Thread(target=process_and_update, args=(batches[2],))
    thread1.start()
    thread2.start()
    thread3.start()
    thread1.join()
    thread2.join()
    thread3.join()
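A variation on the same "fetch once, fan out" idea, swapping in the standard library's concurrent.futures.ThreadPoolExecutor instead of hand-built batches (this is an alternative, not what the answer above uses). The pool hands each id to whichever worker is free, so a slow API call does not hold up a whole batch. fetch_result and fake_fetch are hypothetical stand-ins for the requests call, so the sketch runs offline.

```python
from concurrent.futures import ThreadPoolExecutor

def process_all(asset_ids, fetch_result, max_workers=3):
    """Call fetch_result(asset_id) for every id on a pool of worker threads.

    Returns {asset_id: result}. fetch_result is a hypothetical callable that
    stands in for the requests.get(...) / resp_json['getResponse'] step.
    """
    asset_ids = list(asset_ids)  # iterated twice below
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() distributes ids to free workers and yields results in the
        # same order as asset_ids, so zip() pairs them up correctly.
        results = list(pool.map(fetch_result, asset_ids))
    return dict(zip(asset_ids, results))

def fake_fetch(asset_id):
    # Placeholder for the real HTTP call.
    return f"class-for-{asset_id}"

print(process_all([10, 11, 12, 13], fake_fetch))
# {10: 'class-for-10', 11: 'class-for-11', 12: 'class-for-12', 13: 'class-for-13'}
```

In this layout only the HTTP calls run in worker threads; the main thread can then write all results back on its own cursor (for example with cur.executemany over the returned dict), which avoids sharing one PyMySQL connection between threads.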
Answer 2
Score: 0
Here is one of the simplest ways (the syntax is approximate).
Each thread must have its own number in a my_number variable, unique across all threads.
Add a thread INT DEFAULT NULL column to the table.
The thread tries to reserve one unreserved record with:
cur.execute("UPDATE getprocessid SET thread = %s WHERE thread IS NULL AND status=%s LIMIT 1",(my_number,"uploaded",))
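Then the thread processes the record it reserved (the status filter keeps records it already completed from being returned again):
cur.execute("select asset_id from getprocessid where thread=%s and status=%s",(my_number,"uploaded",))
row = cur.fetchone()
if row is not None:
    asset_id = row[0]
    # process the record here (call the API, then set status = 'completed')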
If the reservation was successful, the reserved record is processed. If another thread had overwritten the reservation value, no record is returned; the if check detects this, the processing code is skipped, and the thread tries to reserve another record.