Implementing a waiting mechanism in an application similar to Omegle using the LSH algorithm and MinHash

Question

I'm developing an application similar to Omegle, where users are matched with strangers based on their common interests. To achieve this, I'm combining the LSH (Locality Sensitive Hashing) algorithm with the Minhash technique. However, I'm facing difficulties in implementing a waiting mechanism for users who don't immediately find a matching pair when they call the API.

Currently, I'm using the sleep function to introduce a waiting period before returning the status "Failed". However, it seems that the sleep function is blocking other API calls and causing delays for other users. I'm curious to know how websites like Omegle handle this scenario and what would be the correct procedure to implement an efficient waiting mechanism.

Here's the code snippet:

    from fastapi import FastAPI, Body
    from typing import Annotated
    from pydantic import BaseModel
    from sonyflake import SonyFlake
    import redis
    import time
    from datasketch import MinHash, MinHashLSH

    app = FastAPI()
    sf = SonyFlake()
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    lsh = MinHashLSH(num_perm=128, threshold=0.5, storage_config={
        'type': 'redis',
        'redis': {'host': '127.0.0.1', 'port': 6379}
    }, prepickle=True)

    class Partner(BaseModel):
        client_id: int
        partner_id: str
        status: str = 'Failed'

    @app.post("/start", response_model=Partner)
    async def start(interests: Annotated[list[str] | None, Body()] = None) -> Partner:
        client_id = sf.next_id()
        partner_id = ''
        minhash = MinHash()
        if not interests:
            return Partner(client_id=client_id, partner_id=partner_id)
        client_hash = f"user:{client_id}:interests:hash"
        minhash.update_batch([*(map(lambda item: item.encode('utf-8'), interests))])
        lsh.insert(client_hash, minhash)
        matches = lsh.query(minhash)
        matches.remove(client_hash)
        if not matches:
            # No match yet: sleep for 5 seconds, then try once more
            time.sleep(5)
            matches = lsh.query(minhash)
            matches.remove(client_hash)
            if not matches:
                # Still no match: give up and return the default 'Failed' status
                lsh.remove(client_hash)
                return Partner(client_id=client_id, partner_id=partner_id)
        lsh.remove(client_hash)
        lsh.remove(matches[0])
        return Partner(client_id=client_id, partner_id=matches[0], status="Success")

I would appreciate any insights or suggestions on how to properly implement the waiting mechanism, ensuring that it doesn't negatively impact the performance and responsiveness of the application. Is there a recommended approach or best practice to achieve this functionality while maintaining the responsiveness of the API for other users?

  • Please share any insights or best practices on implementing an efficient waiting mechanism in this scenario.
  • Any suggestions on optimizing the code or improving its responsiveness would be greatly appreciated.
  • Please provide resources or links to read more about it.

Thank you.

Answer 1

Score: 1

From what I can tell, you're implementing a synchronous waiting mechanism, which blocks the entire process. You should look into some form of polling or an asynchronous approach. This can be done through a WebSocket or through HTTP long-polling. I think WebSockets are better for a few reasons, mostly bidirectional communication and keep-alive connections. I have tried to implement that for you below:

    import asyncio
    from typing import Annotated

    import redis
    from datasketch import MinHash, MinHashLSH
    from fastapi import FastAPI, WebSocket, Body
    from pydantic import BaseModel
    from sonyflake import SonyFlake

    app = FastAPI()
    sf = SonyFlake()
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    lsh = MinHashLSH(num_perm=128, threshold=0.5, storage_config={
        'type': 'redis',
        'redis': {'host': '127.0.0.1', 'port': 6379}
    }, prepickle=True)

    # MinHashLSH does not appear to expose a get() method, so each waiting
    # client's MinHash is kept in process memory (assumption: a single worker).
    pending: dict[str, MinHash] = {}

    class Partner(BaseModel):
        client_id: int
        partner_id: str
        status: str = 'Failed'

    @app.post("/start", response_model=Partner)
    async def start(interests: Annotated[list[str] | None, Body()] = None) -> Partner:
        client_id = sf.next_id()
        partner_id = ''
        if not interests:
            return Partner(client_id=client_id, partner_id=partner_id)
        client_hash = f"user:{client_id}:interests:hash"
        minhash = MinHash()
        minhash.update_batch([item.encode('utf-8') for item in interests])
        lsh.insert(client_hash, minhash)
        pending[client_hash] = minhash
        # Only register here; matching happens over the WebSocket below
        return Partner(client_id=client_id, partner_id=partner_id)

    @app.websocket("/ws/{client_id}")
    async def websocket_endpoint(websocket: WebSocket, client_id: int):
        await websocket.accept()
        client_hash = f"user:{client_id}:interests:hash"
        while True:
            minhash = pending.get(client_hash)
            if minhash is None or client_hash not in lsh:
                pending.pop(client_hash, None)
                await websocket.send_json({"status": "Error", "message": "Client ID not found"})
                return
            # Filter out our own key instead of list.remove(), which raises if it is absent
            matches = [m for m in lsh.query(minhash) if m != client_hash]
            if not matches:
                await websocket.send_json({"status": "Waiting"})
            else:
                lsh.remove(client_hash)
                lsh.remove(matches[0])
                pending.pop(client_hash, None)
                pending.pop(matches[0], None)
                await websocket.send_json({"status": "Success", "client_id": client_id, "partner_id": matches[0]})
                return
            # Non-blocking wait: other requests keep being served meanwhile
            await asyncio.sleep(5)
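
To see why `time.sleep(5)` inside an `async def` endpoint delays other users while `await asyncio.sleep(5)` does not, here is a small standalone sketch that uses only the standard library. The handler names and the shortened delays are illustrative assumptions, not part of the application code.

```python
import asyncio
import time


async def blocking_handler(delay: float) -> None:
    # time.sleep() holds the event loop, so no other coroutine can run meanwhile.
    time.sleep(delay)


async def non_blocking_handler(delay: float) -> None:
    # asyncio.sleep() hands control back to the event loop while waiting.
    await asyncio.sleep(delay)


async def time_requests(handler, count: int, delay: float) -> float:
    """Run `count` simulated requests concurrently and return the elapsed seconds."""
    started = time.perf_counter()
    await asyncio.gather(*(handler(delay) for _ in range(count)))
    return time.perf_counter() - started


if __name__ == "__main__":
    blocked = asyncio.run(time_requests(blocking_handler, count=3, delay=0.2))
    shared = asyncio.run(time_requests(non_blocking_handler, count=3, delay=0.2))
    print(f"time.sleep:    {blocked:.2f}s (requests run one after another)")
    print(f"asyncio.sleep: {shared:.2f}s (requests wait concurrently)")
```

With the blocking handler the total time grows with the number of simulated requests, while the non-blocking handler should stay close to a single delay.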

huangapple
  • Posted on May 24, 2023, 18:21:16
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76322466.html