Push updates into a SignalR stream when new records are inserted/updated/deleted in Azure SQL database

Question

I'm working on a realtime data grid with SignalR. I have a SignalR Hub.

  • The client sends a GetStocks message to the server.
  • The server responds with the initial list of items.
  • The client subscribes to the GetStockTickerStream stream. This stream is used only for the updates.

Now I want to replace that randomly generated stock ticker data with actual data, and when an item is inserted/updated/deleted in the database, push that update to the SignalR GetStockTickerStream.

To recap, is there a way to put triggers on an Azure SQL database so that an Azure Function is triggered when an item is inserted/updated/deleted in the database?

A good explanation of how to do that and a small snippet would be awesome.

public sealed class StockTickerHub : Hub
{
    private readonly IStockTickerService _stockTickerService;

    public StockTickerHub(IStockTickerService stockTickerService)
    {
        _stockTickerService = stockTickerService;
    }

    public ChannelReader<Stock> GetStockTickerStream(CancellationToken cancellationToken)
    {
        var channel = Channel.CreateUnbounded<Stock>();
        
        _ = _stockTickerService.SubscribeAsync(channel.Writer, cancellationToken);
        
        return channel.Reader;
    }

    public async Task GetStocks()
    {
        var stocks = await _stockTickerService.GetStocksAsync();
        await Clients.Caller.SendAsync("ReceiveStocks", stocks);
    }
}

public class Stock
{
    public required long Id { get; init; }

    public required string Symbol { get; init; }
    
    public required double Price { get; init; }
}

public interface IStockTickerService
{
    Task<IEnumerable<Stock>> GetStocksAsync();
    
    Task SubscribeAsync(ChannelWriter<Stock> writer, CancellationToken cancellationToken);
}

public sealed class StockTickerService : IStockTickerService
{
    private static readonly string[] Stocks =
    {
        "TESLA", "S&P 500", "DAX", "NASDAQ", "DOW"
    };

    public Task<IEnumerable<Stock>> GetStocksAsync()
    {
        return Task.FromResult(Enumerable.Range(1, 10).Select(index => new Stock
        {
            Id = index,
            Symbol = Stocks[Random.Shared.Next(Stocks.Length)],
            Price = Math.Round(Random.Shared.NextDouble() * 100, 2)
        }));
    }
    
    public async Task SubscribeAsync(ChannelWriter<Stock> writer, CancellationToken cancellationToken)
    {
        try
        {
            while (true)
            {
                var stock = new Stock
                {
                    Id = 1,
                    Symbol = "TESLA",
                    Price = Math.Round(Random.Shared.NextDouble() * 100, 2)
                };
                await writer.WriteAsync(stock, cancellationToken);

                await Task.Delay(1000, cancellationToken);
            }
        }
        finally
        {
            writer.Complete();
        }
    }
}

public static class ServiceCollectionExtensions
{
    public static IServiceCollection AddDataGrid(this IServiceCollection services)
    {
        ArgumentNullException.ThrowIfNull(services);

        services.AddSingleton<IStockTickerService, StockTickerService>();
        
        return services;
    }

    public static IEndpointRouteBuilder UseDataGrid(this IEndpointRouteBuilder app)
    {
        ArgumentNullException.ThrowIfNull(app);
        
        app.MapHub<StockTickerHub>("/stockticker");
        
        return app;
    }
}

import { useEffect } from "react";
import { useDispatch, useSelector } from "react-redux";
import { Table } from "antd";
import { HubConnectionState } from "redux-signalr";
import hubConnection from "../store/middlewares/signalr/signalrSlice";
import { Stock, addStock } from "../store/reducers/stockSlice";
import { RootState } from "../store";
import "./DataGrid.css";

const DataGrid = () => {
  const dispatch = useDispatch();
  const stocks = useSelector((state: RootState) => state.stock.stocks);

  useEffect(() => {
    if (hubConnection.state !== HubConnectionState.Connected) {
      hubConnection
        .start()
        .then(() => {
          console.log("Started connection via SignalR");

          hubConnection.send("GetStocks");

          hubConnection.stream("GetStockTickerStream").subscribe({
            next: async (item: Stock) => {
              console.log(item);
              dispatch(addStock(item)); // Dispatch addStock action to update Redux store
            },
            complete: () => {
              console.log("Completed");
            },
            error: (err) => {
              console.error(err);
            },
          });
        })
        .catch((err) => console.error(`Faulted: ${err.toString()}`));
    }

    // return () => {
    //   hubConnection
    //     .stop()
    //     .then(() => {})
    //     .catch((err) => console.error(err.toString()));
    // };
  }, [dispatch]);

  return (
    <Table dataSource={stocks} rowKey={(record) => record.id}>
      <Table.Column title="ID" dataIndex="id" key="id" />
      <Table.Column title="Symbol" dataIndex="symbol" key="symbol" />
      <Table.Column title="Price" dataIndex="price" key="price" />
    </Table>
  );
};

export default DataGrid;

import { createSlice, PayloadAction } from "@reduxjs/toolkit";

export type Stock = Readonly<{
  id: number;
  symbol: string;
  price: number;
}>;

export type StockState = Readonly<{
  stocks: Stock[];
}>;

const initialState: StockState = {
  stocks: [],
};

const stockSlice = createSlice({
  name: "stock",
  initialState: initialState,
  reducers: {
    getStocks: (state, action: PayloadAction<Stock[]>) => {
      state.stocks = [...state.stocks, ...action.payload];
    },
    addStock: (state, action: PayloadAction<Stock>) => {
      const stockIndex = state.stocks.findIndex(
        (stock) => stock.id === action.payload.id
      );
    
      if (stockIndex !== -1) {
        // If the stock already exists, update its price
        state.stocks[stockIndex].price = action.payload.price;
      } else {
        // If the stock doesn't exist, add it to the array
        state.stocks.push(action.payload);
      }
    },
  },
});

export const { getStocks, addStock } = stockSlice.actions;

export default stockSlice.reducer;
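
The `addStock` reducer above covers inserts and price updates, but it has no way to remove a row. If the stream carried the operation alongside the item, the client could handle all three cases. The sketch below assumes a hypothetical `StockChange` message shape (`operation` plus `item`) that is not part of the original hub, and `applyStockChange` is likewise an illustrative name. It is a pure function that applies one change to the stock list.

```typescript
type Stock = Readonly<{ id: number; symbol: string; price: number }>;

// Hypothetical message shape: the original stream sends bare Stock items.
type StockChange = Readonly<{
  operation: "insert" | "update" | "delete";
  item: Stock;
}>;

// Returns a new array with the change applied; the input is not mutated.
function applyStockChange(
  stocks: readonly Stock[],
  change: StockChange
): Stock[] {
  const { operation, item } = change;

  if (operation === "delete") {
    return stocks.filter((stock) => stock.id !== item.id);
  }

  // Treat insert and update as an upsert so a replayed or out-of-order
  // message cannot create duplicate rows.
  const exists = stocks.some((stock) => stock.id === item.id);
  return exists
    ? stocks.map((stock) => (stock.id === item.id ? item : stock))
    : [...stocks, item];
}
```

Inside the slice, a reducer could then assign `state.stocks = applyStockChange(state.stocks, action.payload)`, assuming the server is changed to send `StockChange` messages instead of bare `Stock` items.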

Answer 1

Score: 1

After testing, I found that Azure SQL Database does not support the SqlDependency OnChange event. If you want to learn more about SqlDependency, you can check this thread.

However, I found the Azure SQL trigger. The Azure SQL trigger binding uses a polling loop to check for changes and triggers the user function when changes are detected.

See: Azure SQL bindings for Azure Functions overview (preview)
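
To make the polling idea concrete, here is a minimal TypeScript sketch of change detection by polling. It is not the Azure SQL trigger binding's implementation or API: `InMemoryChangeSource`, `pollOnce`, `StockChange`, and the version counter are hypothetical names invented for illustration, and the in-memory store stands in for a change-tracked table. The sketch only shows the general pattern: remember the last version processed, ask the store for anything newer, hand the batch to a handler, and advance the version.

```typescript
type Operation = "insert" | "update" | "delete";

interface Stock {
  id: number;
  symbol: string;
  price: number;
}

interface StockChange {
  version: number; // monotonically increasing change number
  operation: Operation;
  item: Stock;
}

// Hypothetical stand-in for a table whose changes are tracked.
class InMemoryChangeSource {
  private changes: StockChange[] = [];
  private nextVersion = 1;

  // Records a change and stamps it with the next version number.
  record(operation: Operation, item: Stock): void {
    this.changes.push({ version: this.nextVersion++, operation, item });
  }

  // Returns every change with a version greater than `sinceVersion`.
  changesSince(sinceVersion: number): StockChange[] {
    return this.changes.filter((change) => change.version > sinceVersion);
  }
}

// One polling iteration: fetch new changes, pass them to the handler,
// and return the version to resume from on the next iteration.
function pollOnce(
  source: InMemoryChangeSource,
  lastVersion: number,
  handler: (batch: StockChange[]) => void
): number {
  const batch = source.changesSince(lastVersion);
  if (batch.length === 0) {
    return lastVersion; // nothing new, keep the same checkpoint
  }
  handler(batch);
  return batch[batch.length - 1].version;
}
```

In a real loop, `pollOnce` would run on a timer, and the handler would be the place to forward each change to whatever feeds `GetStockTickerStream`. The trade-off of polling is latency: updates reach clients no faster than the polling interval.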

  • Posted by huangapple on March 7, 2023, 01:33:01
  • Source: https://go.coder-hub.com/75654022.html