如何在不重复选择查询结果的情况下使用MYSQL选择查询与线程?

huangapple go评论92阅读模式
英文:

How to use MYSQL select query with threading without duplicating the select query results?

问题

问题:如何同时处理多行?每当我使用一个线程来启动函数时,它为每个线程选择相同的值(即游标为每个线程返回相同的值)。我需要每个线程的不同值来进行处理,以便减少一些时间。

我的程序是:

import requests
import os
import json
import pymysql
import threading

conn = pymysql.connect(host='localhost', user=USER, passwd=PASSWORD, db='sampledb', charset='utf8mb4', autocommit=True)

url = "http://www.someapi.com/somelink/"

cur = conn.cursor()

def main():
    cur.execute("select asset_id from getprocessid where status = %s LIMIT 1", ("uploaded",))
    idofassets = cur.fetchone()[0]
    req = requests.Session()
    resp = req.get(url + str(idofassets))
    resp_json = json.loads(resp.text)
    actual = resp_json['getResponse']
    cur.execute("update getprocessid set class = %s, status = %s where asset_id = %s", (str(actual), "completed", str(idofassets),))

while True:

    # For threading purpose i added

    thread1 = threading.Thread(target=main)
    thread2 = threading.Thread(target=main)
    thread3 = threading.Thread(target=main)

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join()
    thread2.join()
    thread3.join()

请注意,我没有翻译代码部分,只翻译了您提供的问题和程序注释。

英文:

Short context: I am using mysql table to select a value, by using an API+value i fetch a result and the result is saved into the same table.

Problem : How to process multiple rows simultaneously? whenever i use a thread to start the function , it selects the same value for each thread (i.e cursor returns same value for each thread). i need different value for each thread to process. so that i will reduce some time.

My program is

import requests
import os
import json
import pymysql
import threading

conn = pymysql.connect(host='localhost', user=USER, passwd=PASSWORD, db='sampledb',charset='utf8mb4',autocommit=True)

url = "http://www.someapi.com/somelink/"

cur = conn.cursor()

def main():
    cur.execute("select asset_id from getprocessid where status =%s LIMIT 1",("uploaded",))
    idofassets = cur.fetchone()[0]
    req = requests.Session()
    resp = req.get(url+str(idofassets))
    resp_json = json.loads(resp.text)
    actual = resp_json['getResponse']
    cur.execute("update getprocessid set class = %s ,status =%s where asset_id = %s",(str(actual),"completed",str(idofasset),))

while True:

    # For threading purpose i added

    thread1 = threading.Thread(target=main)
    thread2 = threading.Thread(target=main)
    thread3 = threading.Thread(target=main)

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join()
    thread2.join()
    thread3.join()

答案1

得分: 0

你的问题似乎分为两个主要任务:

1 - 从 getprocessid MySQL 表中获取结果

2 - 处理结果并更新同一表格(但不同字段)

因此,优化代码的一种方法是让一个线程(可以是主线程)执行步骤 1,然后将步骤 2 中的问题分配给三个线程中的其中一个:

import requests
import os
import json
import pymysql
import threading

# 如果需要更多(或更少)线程,可以动态创建这些
batches = [[], [], []]

conn = pymysql.connect(host='localhost', user=USER,
                       passwd=PASSWORD,
                       db='sampledb', charset='utf8mb4', autocommit=True)

url = "http://www.someapi.com/somelink/"

cur = conn.cursor()

def fetch_and_split():
    cur.execute("select asset_id from getprocessid "
                "where status =%s LIMIT 1", ("uploaded",))
    results = cur.fetchall()
    count = 0
    # 这将使用资产 ID 填充要处理的列表
    while count < len(results):
        cur_batch = batches[count % len(batches)]
        cur_batch.append(results[count][0])
        count += 1

def process_and_update(batch):
    # 每个线程接收自己的列表
    for idofassets in batch:
        req = requests.Session()
        resp = req.get(url + str(idofassets))
        resp_json = json.loads(resp.text)
        actual = resp_json['getResponse']
        cur.execute("update getprocessid set class = %s "
                    ",status =%s where asset_id = %s",
                    (str(actual), "completed", str(idofasset),))


while True:

    # 为了线程处理,我添加了以下部分
    # 主线程分割结果
    fetch_and_split()
    # 其他线程处理结果并更新值
    thread1 = threading.Thread(target=process_and_update, args=(batches[0],))
    thread2 = threading.Thread(target=process_and_update, args=(batches[1],))
    thread3 = threading.Thread(target=process_and_update, args=(batches[2],))

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join()
    thread2.join()
    thread3.join()

注意:在上述代码中,我修复了一些拼写错误,并将“size”更正为“len”以获取列表的长度。此外,我还更正了在更新数据库时使用的变量名称错误(应为“idofassets”而不是“idofasset”)。

英文:

Your problem seems to be divided into two main different tasks:

1 - Fetching the results from the getprocessid MySQL table

2 - Processing the result and updating the same table (but different fields)

So one way to optimize your code is to have a thread (it could be the main thread) do step 1 and then divide the problem in step 2 among your 3 threads:

import requests
import os
import json
import pymysql
import threading
#you can create these dynamically if you 
#want more (or less) threads
batches = [[], [], []]

conn = pymysql.connect(host=&#39;localhost&#39;, user=USER, 
  passwd=PASSWORD, 
db=&#39;sampledb&#39;,charset=&#39;utf8mb4&#39;,autocommit=True)

url = &quot;http://www.someapi.com/somelink/&quot;

cur = conn.cursor()

def fetch_and_split():
    cur.execute(&quot;select asset_id from getprocessid 
      where status =%s LIMIT 1&quot;,(&quot;uploaded&quot;,))
    results = cur.fetchall()
    count = 0
    #this populates the lists to be processed with the ids
    while count &lt; size(results):
        cur_batch = batches[size(batches) % count ]
        cur_batch.append(results[count][0])
        count++
    
def process_and_update(batch):
    #each thread receives its own list
    for idofassets in batch:
        req = requests.Session()
        resp = req.get(url+str(idofassets))
        resp_json = json.loads(resp.text)
        actual = resp_json[&#39;getResponse&#39;]
        cur.execute(&quot;update getprocessid set class = %s 
          ,status =%s where asset_id = %s&quot;, 
          (str(actual),&quot;completed&quot;,str(idofasset),))


while True:

    # For threading purpose i added
    # The main thread splits the results
    fetch_and_split()    
    # The other threads process the 
    # results and update the values
    thread1 = threading.Thread(target=process_and_update, args=(batches[0],))
    thread2 = threading.Thread(target=process_and_update, args=(batches[1],))
    thread3 = threading.Thread(target=process_and_update, args=(batches[2],))

    thread1.start()
    thread2.start()
    thread3.start()

    thread1.join()
    thread2.join()
    thread3.join()

答案2

得分: 0

每个线程必须有自己的编号,存储在 my_number 变量中,该编号在所有线程中是唯一的。

在结构体中添加一个 thread INT DEFAULT NULL 字段。

线程尝试通过以下方式来预留一个未被预留的记录:

cur.execute("UPDATE getprocessid SET thread = %s WHERE thread IS NULL AND status=%s LIMIT 1",(my_number,"uploaded",))

然后线程处理此已预留的记录:

cur.execute("select asset_id from getprocessid where thread=%s",(my_number,))
row = cur.fetchone()
if row is not None:
    处理记录

如果预订成功,那么已预订的记录将被处理。如果另一个线程已经覆盖了预订值,那么不会返回任何记录,并且会被 IF 检测到 - 处理代码将被跳过,线程会尝试预订另一条记录。

英文:

One of the simplest ways (the syntax is approximate).

Each thread must have its own number in my_number variable, which is unique over all threads.

Add thread INT DEFAULT NULL field into the structure.

The thread tries to reserve one non-reserved record by

cur.execute(&quot;UPDATE getprocessid SET thread = %s WHERE thread IS NULL AND status=%s LIMIT 1&quot;,(my_number,&quot;uploaded&quot;,))

Then thread processes this reserved record:

cur.execute(&quot;select asset_id from getprocessid where thread=%s&quot;,(my_number,))
row = cur.fetchone()
if row is not None:
    process the record

If reservation was successful then the reserved record is processed. If another thread had overwrited the reservation value then none record returned, and it will be detected by IF - the processing code is skipped, and the thread tries to reserve another record.

huangapple
  • 本文由 发表于 2020年1月6日 21:05:52
  • 转载请务必保留本文链接:https://go.coder-hub.com/59612671.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定