python3.x asyncio study

How to convert sync codes to be async one

  • Method1:

    • create a future object
    • define a callback function which will continue check future result is ready or not, future.set_result() will be invoked when future result is ready, else schedule the callback to be called again in the even loop
    • in main function, start the callback function the first time, so it will continue executing until the future result is set
    • use await keyword to evaluate the future object until it is ready

      event_loop = asyncio.get_event_loop()
      
      def retreive_from_db(future, id):
          # it should have a async result or other flag which indicate we get the result from db or not
          operation_flag = get_from_db(id)
          # if the flag is ready, then we assume we already get real result, set the future to the real result
          if operation_flag.ready():
              future.set_result(real_result)
          # else we scheduler to check operation done flag later
          else:
             event_loop.call_soon(retreive_from_db, future, id)
      
      def main():
          # create a future object and pass to callback function
          db_result = asyncio.Future()
          # scheduler the callback in event loop the first time
          event_loop.call_soon(retreive_from_db, db_result, id)
          # wait untile the db_result get the real result
          final_result = await db_result

    Below is a rewrite of Tornado implementation of celery result retrieving in Sanic:

        from celery import Celery
        celeryapp = Celery()
        celeryapp.config_from_object('celeryconfig')
    
        app = Sanic(__name__)
    
        def retrieve_task_result(event_loop, task_result, async_result):
            if async_result.ready():
                task_result.set_result(async_result.get())
            else:
                event_loop.call_soon(retrieve_task_result, event_loop, task_result, async_result)
    
        async def send_task(request_body):
            return celeryapp.send_task('tasks.test', (request_body,))
    
        @app.route('/', methods=['POST'])
        async def main(request):
            request_body = request.body.decode('utf-8')
            async_result = await send_task(request_body)
    
            event_loop = asyncio.get_event_loop()
            task_result = asyncio.Future()
            event_loop.call_soon(retrieve_task_result, event_loop, task_result, async_result)
            ret = await task_result
            return text(ret)
    
        if __name__ == "__main__":
            app.run(host="0.0.0.0", port=8080, debug=False)
  • Method2

    • encapsulate the sync function to be Future object, in asyncio, it can use asyncio.ensure_future() or asyncio.get_event_loop().create_task(). Or you are expert, you can use asyncio.Task(somefunc(id)) directly

    e,g: For below synchonize codes

        def handle(id):
            subject = get_subject_from_db(id)
            buyinfo = get_buyinfo(id)
            change = process(subject, buyinfo)
            notify_change(change)
            flush_cache(id)

    We can converted to below one:

        import asyncio
        import time
    
        async def get_subject_from_db(id):
            time.sleep(3)
            return "subject"
    
        async def handle(id):
            # turn blocked I/O function to be future object
            subject = asyncio.ensure_future(get_subject_from_db(id))
            buyinfo = asyncio.ensure_future(get_buyinfo(id))
            # explicitly announce future task can be schedulered in event loop
            results = await asyncio.gather(subject, buyinfo)
            # explicitly announce process() can be schedulered in event loop
            change = await process(results)
            await notify_change(change) 
            loop.call_soon(flush_cache, id)
  • Conclusion

    • encapsulate the sync function to be Future object, in asyncio, it can use asyncio.ensure_future() or asyncio.get_event_loop().create_task(). Or you are expert, you can use asyncio.Task(somefunc(id)) directly
    • use callback method, e,g:
      • asyncio.get_event_loop().call_soon(func, *args)
      • asyncio.get_event_loop().call_later(delay, func, *args)
      • asyncio.get_event_loop().call_at(when, func, *args)

References:

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇