需要帮助实现一个从SQL数据库中选择作业的调度程序。

huangapple go评论84阅读模式
英文:

Need help implementing a scheduler picking jobs from a SQL database

问题

我有一个SQL表,其中填充了第一天的待处理订单。在第二天的特定时间,我必须从数据库中获取“PENDING”订单,并通过调用外部API处理每个订单。这是我的代码流程:

SELECT * FROM orders where status = 'PENDING' LIMIT 200

现在,我将为每个订单调用外部API,该API将为每个订单返回成功或失败,并且我将在数据库中更新订单状态。

UPDATE orders SET status = 'COMPLETED' WHERE id = ANY(<success list>)

UPDATE orders SET status = 'FAILED' WHERE id = ANY(<failure list>)

上述流程将继续运行多次,直到选择查询返回0行。我在查询中设置了LIMIT,以避免内存问题和外部API的吞吐能力问题。

现在,上述流程存在几个问题:

  1. 假设我的代码执行了SELECT查询并开始处理订单。如果我的服务在中途崩溃了怎么办?会有一些订单已经通过API进行处理,并且已经收到了通过或失败的响应。但是我错过了在数据库中更新它们的状态,因此当我的服务重新启动时,它将再次选择这些订单并再次处理它们,这是我不想要的。
  2. 我的服务可以从多个实例运行,因此相同的状态为“PENDING”的订单可能会被多个实例选择,导致同一订单被重复处理。如何避免这种情况?

如果有帮助的话,我的技术栈是Go和PostgreSQL。我确信上述问题是一些常见问题,肯定有一些标准的解决方法。我可以更改任何部分,无论是Go代码还是数据库更改,包括锁定或事务。我只想知道解决方案的方向。任何帮助将不胜感激。

英文:

I have a SQL table that is filled with pending orders on day one. On the next day, at a specific time, I have to fetch the 'PENDING' orders from DB and process each of them by calling an external API. This is what my code flow looks like:

SELECT * FROM orders where status = &#39;PENDING&#39; LIMIT 200

Now I will call the external API for each of those orders which will either return success or failure for each order and then I'll update the order status in DB.

UPDATE orders SET status = &#39;COMPLETED&#39; WHERE id = ANY(&lt;success list&gt;)

UPDATE orders SET status = &#39;FAILED&#39; WHERE id = ANY(&lt;failure list&gt;)

The above flow will continue to run multiple times until the select query returns 0 rows. I've put a LIMIT in the query in order to avoid memory issues and the external API's throughput capability.

Now there are a couple of issues with the above flow:

  1. Let's say my code executed the SELECT query and started processing the orders. What if my service gets crashed in between? There will be some orders which would have gone through the API and would've received passed or failed responses. But I missed updating their status in DB, therefore when my service will start again, it will again pick those orders and process them again which I don't want.
  2. My service can be running from multiple instances, therefore same orders whose status = 'PENDING' can be picked by various instances leading to double processing of the same order. How to avoid this?

If it helps, my tech stack is Go and PostgreSQL. I am sure the above are some common issues and there must be some standard ways to approach them. I am open to changing any part whether it's the Go code or DB change which may include locks or transactions. I just want which direction to look for the solution. Any help would be appreciated.

答案1

得分: 2

-- 替换为以下代码
WITH cte AS (
    -- 这只是因为PostgreSQL不支持在UPDATE查询中使用LIMIT子句
    SELECT * FROM orders WHERE status = 'PENDING' LIMIT 200
)

-- 使用UPDATE-RETURNING先更改状态,然后检索行
UPDATE orders AS o SET status='PROCESSING'

-- 你可以使用这个列,以及状态列,以后确定是否有一些订单因为崩溃而被留在了中间状态
, updated_at=now()

-- 使用CTE结果将更新限制为上面WITH查询中选择的200行
FROM cte WHERE o.id=cte.id

-- 确保此工作进程忽略其他工作进程在此期间更新的行
--
-- 这是因为工作进程在释放锁之后会重新检查WHERE条件,
-- (请参阅下面的链接答案)
AND o.status='PENDING'

-- 返回匹配和更新的行集/子集
RETURNING o.*;

链接:https://stackoverflow.com/a/11769059/965900

英文:

Instead of

<s>

SELECT * FROM orders WHERE status = &#39;PENDING&#39; LIMIT 200

</s>

do

WITH cte AS (
    -- this is needed only because PostgreSQL doesn&#39;t
    -- support the LIMIT clause in UPDATE queries
    SELECT * FROM orders WHERE status = &#39;PENDING&#39; LIMIT 200
)

-- use UPDATE-RETURNING to first change
-- the status and then retrieve the rows
UPDATE orders AS o SET status=&#39;PROCESSING&#39;

-- you can use this column, together with the status column,
-- to later determine if some orders were left in limbo because
-- of a crash *after* they were sent off to the processor
, updated_at=now()

-- use the CTE result to limit the update to
-- the 200 rows selected in the WITH query above
FROM cte WHERE o.id=cte.id

-- make sure that this worker ignores rows that
-- were, in the meantime, updated by another worker
--
-- this works because the workers will re-check
-- the WHERE condition after the lock is released,
-- (see the linked answer below)
AND o.status=&#39;PENDING&#39;

-- return the matched &amp; updated set/subset of rows
RETURNING o.*;

https://stackoverflow.com/a/11769059/965900

答案2

得分: 1

由于您正在调用一个可能出现故障或超时等问题的外部API,您可能希望采用更逐行的方法(也许不太时尚,但在某些情况下仍然有用)。假设您想要使用Postgres本身而不是一些外部编程,以下代码可能适合您:

创建一个PL/pgSQL块来处理待处理的订单,使用游标:

DO LANGUAGE plpgsql $$
DECLARE
    c CURSOR FOR SELECT id FROM orders WHERE status = 'PENDING' LIMIT 200;
    order_id INTEGER;
BEGIN
    OPEN c;
    LOOP
        FETCH c INTO order_id;
        EXIT WHEN NOT FOUND;
        PERFORM process_order(order_id);
    END LOOP;
    CLOSE c;
END;
$$;

具体的处理逻辑在被调用的函数内部。这个示例在每一行之间都有一个显式的提交(即每一行都提交一次),这样可以允许一些行成功而其他行失败,并且可以在以后重新处理它们。根据您的需求以及API调用的速度和稳定性,您还可以包含一个“进行中”状态的逻辑。

CREATE OR REPLACE FUNCTION process_order(order_id INTEGER)
RETURNS VOID AS $$
DECLARE
    api_result BOOLEAN;
BEGIN
    -- 开启一个新的事务
    BEGIN
        -- 这里只是一个占位符,不知道您如何调用该API
        api_result := call_external_api(order_id);

        IF api_result THEN
            UPDATE orders SET status = 'COMPLETED' WHERE id = order_id;
        ELSE
            UPDATE orders SET status = 'FAILED' WHERE id = order_id;
        END IF;
        
        -- 提交事务
        COMMIT;
    EXCEPTION
        WHEN OTHERS THEN
            -- 出现错误时回滚事务
            ROLLBACK;
    END;
END;
$$ LANGUAGE plpgsql;

或者,如果不需要这种程度的谨慎,您可以将提交操作移到游标周围。请注意,逐行提交会增加处理时间(具体增加多少我无法告诉)。它还可能对回滚日志大小等事项产生影响,因此可能会影响您的选择。

英文:

As you are calling an external API, which might be broken or timeout etc. you might want a more row-by-row approach (perhaps not fashionable but useful in some situations still). Assuming you want to use Postgres itself and not some external programming this might work for you:

Create a PL/pgSQL block to process the pending orders using a cursor:

DO LANGUAGE plpgsql $$
DECLARE
    c CURSOR FOR SELECT id FROM orders WHERE status = &#39;PENDING&#39; LIMIT 200;
    order_id INTEGER;
BEGIN
    OPEN c;
    LOOP
        FETCH c INTO order_id;
        EXIT WHEN NOT FOUND;
        PERFORM process_order(order_id);
    END LOOP;
    CLOSE c;
END;
$$;

The heavy lifting is inside the called function. This example places an explicit commit here (i.e. per row) this may should allow the possibility that some rows will work whilst others don't and they can be picked-up/re-processed later. You could also include logic for an "in progress" status depending on your needs and how slow/flaky that API call actually is.

CREATE OR REPLACE FUNCTION process_order(order_id INTEGER)
RETURNS VOID AS $$
DECLARE
    api_result BOOLEAN;
BEGIN
    -- Start a new transaction
    BEGIN
        -- just a placeholder, how you call that api isn&#39;t known
        api_result := call_external_api(order_id);

        IF api_result THEN
            UPDATE orders SET status = &#39;COMPLETED&#39; WHERE id = order_id;
        ELSE
            UPDATE orders SET status = &#39;FAILED&#39; WHERE id = order_id;
        END IF;
        
        -- Commit the transaction
        COMMIT;
    EXCEPTION
        WHEN OTHERS THEN
            -- Rollback the transaction in case of an error
            ROLLBACK;
    END;
END;
$$ LANGUAGE plpgsql;    

Alternatively, you could move the commit around the cursor instead if this level of paranoia isn't needed. Note row levels commits will add time to the process (how much I cannot tell). It might also have an effect on things like rollback log size so that may affect your choice.

答案3

得分: 0

首先,使用Pgx库来访问PostgreSQL。然后,你需要使用事务和行锁定。为了提高性能,你可以使用goroutine来进行并发的选择和更新操作。

下面是一个示例代码,其中不包括goroutine:

import (
	"context"
	"fmt"

	"github.com/georgysavva/scany/pgxscan"
	"github.com/jackc/pgx/v4/pgxpool"
)

func main() {
	pool, err := pgxpool.Connect(context.Background(), "postgres://kaushik:abcd@localhost:5432/users")
	if err != nil {
		panic(err)
	}

	defer func() {
		pool.Close()
	}()

	ctx := context.Background()

	tx, err := pool.Begin(ctx)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err != nil {
			err = tx.Rollback(ctx)
			fmt.Printf("回滚事务时出错:%v\n", err)
			return
		}
		err = tx.Commit(ctx)
		if err != nil {
			fmt.Printf("提交事务时出错:%v\n", err)
			return
		}
	}()

	_, err = tx.Exec(ctx, "LOCK TABLE employee IN ROW EXCLUSIVE MODE")
	if err != nil {
		panic(err)
	}

	var e Employee
	err = pgxscan.Get(ctx, tx, &e, "SELECT * FROM employee WHERE id = $1 FOR UPDATE", 1)
	if err != nil {
		panic(err)
	}

	if e.Status == "active" {
		fmt.Println("员工处于活动状态,正在停用...")
		e.Status = "inactive"

		_, err = tx.Exec(ctx, "UPDATE employee SET status = $1 WHERE id = $2", e.Status, e.ID)
		if err != nil {
			panic(err)
		}
	}

}

type Employee struct {
	ID        int       `db:"id"`
	Name      string    `db:"name"`
	CreatedAt time.Time `db:"created_at"`
	UpdatedAt time.Time `db:"updated_at"`
	Status    string    `db:"status"`
}
英文:

First of all use the Pgx library for accessing postgresql. Then you need to use transactions and row locking. For performance you can use goroutine to do concurrent selection and updation.

Below given is a sample code for the same. Goroutines are not included in the code

    import (
&quot;context&quot;
&quot;fmt&quot;
&quot;github.com/georgysavva/scany/pgxscan&quot;
&quot;github.com/jackc/pgx/v4/pgxpool&quot;
)
func main() {
pool, err := pgxpool.Connect(context.Background(), &quot;postgres://kaushik:abcd@localhost:5432/users&quot;)
if err != nil {
panic(err)
}
defer func() {
pool.Close()
}()
ctx := context.Background()
tx, err := pool.Begin(ctx)
if err != nil {
panic(err)
}
defer func() {
if err != nil {
err = tx.Rollback(ctx)
fmt.Printf(&quot;Error while rolling back transaction: %v\n&quot;, err)
return
}
err = tx.Commit(ctx)
if err != nil {
fmt.Printf(&quot;Error while committing transaction: %v\n&quot;, err)
return
}
}()
_, err = tx.Exec(ctx, &quot;LOCK TABLE employee IN ROW EXCLUSIVE MODE&quot;)
if err != nil {
panic(err)
}
var e Employee
err = pgxscan.Get(ctx, tx, &amp;e, &quot;SELECT * FROM employee WHERE id = $1 FOR UPDATE&quot;, 1)
if err != nil {
panic(err)
}
if e.Status == &quot;active&quot; {
fmt.Println(&quot;Employee is active, deactivating...&quot;)
e.Status = &quot;inactive&quot;
_, err = tx.Exec(ctx, &quot;UPDATE employee SET status = $1 WHERE id = $2&quot;, e.Status, e.ID)
if err != nil {
panic(err)
}
}
}
type Employee struct {
ID        int       `db:&quot;id&quot;`
Name      string    `db:&quot;name&quot;`
CreatedAt time.Time `db:&quot;created_at&quot;`
UpdatedAt time.Time `db:&quot;updated_at&quot;`
Status    string    `db:&quot;status&quot;`
}

huangapple
  • 本文由 发表于 2023年5月6日 13:52:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/76187353.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定