Computer Engineering/Fluent Python 정리

Fluent Python Chapter 18. asyncio를 이용한 동시성

jordan.bae 2022. 4. 2. 16:57

Introduction

17장에서는 concurrent.Future를 이용한 동시성 프로그래밍을 다뤘습니다. 18장에서는 asyncio를 이용한 동시성을 구현합니다. concurrent는 thread와 process를 추가로 사용하여 동시성을 추구했었는데 asyncio는 주로 하나의 스레드의 이벤트 루프에서 동시성을 구현합니다. 재미있는 부분 중 하나는 zen of python 중 하나인 '작업을 수행하는 방식이 한 가지만 있어야 한다.' 라는 부분을 asyncio패키지 또한 concurrent와 비슷한 Future 인터페이스를 구현해서 따랐다는 점이다.  이 장에서는 주로 yield from, coroutine, Future 객체, asyncio event loop를 활용해서 동시성을 구현하는 방법을 설명한다. 또한, callback이 가진 심각한 문제를 코루틴이 해결하는 방법에 대해서도 살펴본다. 마지막으로 비동기 웹서버를 구현한다. 

 

책 정리를 시작한 이유

책 정리를 시작한 이유

Chapter1의 Introduction 부분에서 이야기 한 것처럼 지난 5년간 다양한 언어나 프레임워크 및 프로그램을 공부하고 이용하여 소프트웨어를 개발했는데 이것저것 하다 보니 자주 사용하는 언어임에도 불구하고 파이썬을 잘 활용하고 있느냐에 대한 답변을 자신 있게 하기 어렵다고 느껴서 Fluent Python이라는 책을 공부하며 정리하고 있습니다. 올해에는 새로운 기술들도 좋지만 기존에 활용하던 언어나 프레임워크 그리고 소프트웨어를 더 잘 사용할 수 있도록 깊게 공부할 수 있는 한 해를 만들고 싶은 소망을 가지고 있습니다. 21장까지 정리를 성공하고 맛있는 걸 먹으면서 스스로 축하할 날이 어서 왔으면 좋겠네요! 혹시, 제가 정리한 글이 괜찮으시면 구독 하시면 다음편이 나오면 바로 알림을 받으실 수 있습니다. 어느 덧 18장 정리하는 날이 왔네요. 예상한 것 보다는 많이 늦었지만 조금 만 더 힘내봐야겠습니다.

 

지난 chapter 정리한 포스팅

Fluent Python Chapter 1. 파이썬 데이터 모델 (Feat. 일관성 있는 언어)

Fluent Python Chapter 2-1. Sequence (Sequence의 분류, listcomp, generator, tuple, slicing

Fluent Python Chapter 2-2. Sequence (bisect, deque, memoryview)

Fluent Python Chapter 3. Dictionary (feat. hashtable)

Fluent Python Chapter 4. 텍스트와 바이트 (feat. 깨지지마라..)

Fluent Python Chapter 5. 일급 함수

Fluent Python Chapter 6. 일급 함수 디자인 패턴

Fluent Python Chapter 7. 함수 데커레이터와 클로저 (feat. 메타프로그래밍)

Fluent Python Chapter 8. 객체 잠조, 가변성, 재활용

Fluent Python Chapter 9. 파이썬스러운 객체

Fluent Python Chapter 10. 시퀀스 해킹, 해시, 슬라이스

Fluent Python Chapter 11. 인터페이스: 프로토콜에서 ABC까지

Fluent Python Chapter 12. 내장 자료형 상속과 다중 상속

Fluent Python Chapter 13. 연산자 오버로딩(feat. 제대로 하기)

Fluent Python Chapter 14. 반복형, 반복자, 제너레이터

Fluent Python Chapter 15. Context manager와 else 블록

Fluent Python Chapter 16. 코루틴 (Coroutine)

Fluent Python Chapter 17. Future를 이용한 동시성

 

 

동시성과 병렬성

Chapter의 intro 부분에서 동시성과 병렬성에 대한 간단한 설명과 함께 동시성의 훌륭항?에 대해서 설명한다. 이 부분에 대해서 설명하는 이유는 asyncio의 경우는 동시성을 구현한 패키지이기 때문이다. 병렬성과 동시성에 대한 설명으로 '4개의 vCPU를 가진 컴퓨터는 100개의 프로세스를 한 번에 4개 밖에 실행시키지 못하지만 100개를 동시에 수행하는 것 처럼 보인다' 는 예시를 설명하는데 개인적으로는 찰떡같은 설명이라고 생각한다. asyncio의 경우 동시에 여러 작업이 실행되지는 않지만 여러 작업이 실행되는 것 처럼 보이는 동시성을 아주 잘 구현한 패키지이다. 

 

아래는 동시성과 병렬성을 아주 잘 설명한 발표의 링크이다.

Concurrency is not Parallelism by Rob Pike

 

 

Thread vs Coroutine 

Thread와 Coroutine을 비교하기 위해서 spinner예제 코드를 살펴본다. 비교를 위한 핵심적인 부분만 가져오면 아래와 같다.

# python3.7

# thread
def supervisor():  # <9>
    done = threading.Event()
    spinner = threading.Thread(target=spin,
                               args=('thinking!', done))
    print('spinner object:', spinner)  # <10>
    spinner.start()  # <11>
    result = slow_function()  # <12>
    done.set()  # <13>
    spinner.join()  # <14>
    return result
    
# coroutine
async def supervisor():  # <6>
    spinner = asyncio.create_task(spin('thinking!'))  # <7>
    print('spinner object:', spinner)  # <8>
    result = await slow_function()  # <9>
    spinner.cancel()  # <10>
    return result

주요 차이점을 정리하면 아래와 같다.

- asyncio.Task는 threading.Thread와 거의 대등하다. 

- Task는 coroutine을 구동하고 Thread는 callable을 호출한다.

- Task 객체는 직접 생성하지 않고, coroutine을 asyncio.async()나 asyncio.create_task()에 전달해서 가져온다.

- asyncio.async()나 async.create_task()를 호출해서 Task 객체를 가져오면 이미 객체를 생성하는 함수에 의해 실행이 스케쥴링된다.Thread는 start()메서드를 호출해서 실행하라고 명령해야 한다.

- 스레드는 외부에서 API를 이용해서 중단시킬 수 없다. 스레드를 아무때나 중단시키면 시스템 상태의 무결성이 훼손되기 때문이다. Task에는 코루틴 안에 CancelledError를 발생시키는 Task.cancel() 메서드가 있기 때문에 예외를 핸들링 할 수 있다.

 

코루틴이 안전하게 취소할 수 있는 이유는 lock을 사용하는 대신 yield, yield from으로 제어권을 넘겨서 하나의 스레드만 실행되기 때문이다. 

 

 

Task, Future, Coroutine

asyncio.Task는 asyncio.Future의 서브클래스로 coroutine을 래핑하기 위해 설계됐다. 

yield from을 통해서 asyncio.Future 객체의 결과를 가져올 수 있으므로 coroutine과 Future와의 관계와는 밀접하다. 

 

asyncio에서는 await 표현식에서 사용될 수 있는 객체를 awaitable 객체라고 하는데 

이 유형의 주요 유형들이 Coroutine, Task, Future다. await keyword는 yield from과 거의 비슷해 보이는데(위에 버젼에서 대체된 것 같다...(개인적 추측..)) 나중에 다시 한 번 확인을 해봐야 할 것 같다.

 

 

Asyncio와 aiohttp로 웹 컨텐츠 내려받기

파이썬 3.4에서 asynio는 TCP와 UDP만 직접 지원한다. HTTP등의 프로토콜을 지원하려면 서드파티 패키지인 aiohttp를 주로 사용한다. 이 코드는 17장에서 동기, Future, asynico로 각각 구현해서 실행 속도를 비교했었던 코드이다. 

import os
import time
import sys
import asyncio  # <1>

import aiohttp  # <2>


POP20_CC = ('CN IN US ID BR PK NG BD RU JP '
            'MX PH VN ET EG DE IR TR CD FR').split()

BASE_URL = 'http://flupy.org/data/flags'

DEST_DIR = 'downloads/'


def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, 'wb') as fp:
        fp.write(img)


async def get_flag(session, cc):  # <3>
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    async with session.get(url) as resp:        # <4>
        return await resp.read()  # <5>


def show(text):
    print(text, end=' ')
    sys.stdout.flush()


async def download_one(session, cc):  # <6>
    image = await get_flag(session, cc)  # <7>
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


async def download_many(cc_list):
    async with aiohttp.ClientSession() as session:  # <8>
        res = await asyncio.gather(                 # <9>
            *[asyncio.create_task(download_one(session, cc))
                for cc in sorted(cc_list)])

    return len(res)


def main():  # <10>
    t0 = time.time()
    count = asyncio.run(download_many(POP20_CC))
    elapsed = time.time() - t0
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))


if __name__ == '__main__':
    main()

전체 코드를 보면 위와 같다.

 

여기서 몇가지 API를 살펴보면 좋을 것 같다. 위 코드는 3.7기준의 코드로 3.4에서 살펴보던 API와는 많이 바꼈다. 비교적으로 최근에 추가된 패키지이고, 비동기 지원을 위해서 커뮤니티에서 노력을 하고 있는 만큼 변화가 많은 것 같다.

9 번 라인에서 asyncio.gather()는 전달 받은 코루틴들이 모두 완료되면 완료 된다. 여러 개의 task를 동시에 수행하고 기다릴 때 사용되는 API 이다. 공식 문서의 예제에서도 살펴볼 수 있다.

 

# https://docs.python.org/ko/3/library/asyncio-task.html#running-tasks-concurrently
import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

asyncio.run() 함수는 새 이벤트 루프를 생성하고 전달 닫은 코루틴을 스케쥴링하고 실행 합니다.  coroutine, Task, Future 실행의 진입점이다. 

 

위에 코드들을 통해 알 수 있는 것은 asynio를 사용할 때 우리가 구현하는 부분은 coroutine chain (coroutine함수 그리고 그 함수 안의 coroutine 함수)를 만드는 일이다. 가장 바깥 쪽 대표 제네레이터 (전에는 직접 구동하고 main loop에서 send같은 API를 통해 통신했었음.) 는 asyncio 자체에 의해 구동된다. 체인을 통해 궁극적으로 가장 안쪽에 있는 하위 제너레이터에 위임한다. 즉, asyncio 이벤트 푸르가 코루틴 체인을 구동하고, 코루틴 체인은 결국 저수준 비동기 입출력을 수행하는 라이브러리 함수에서 끝난다.

 

 

동시성 함수가 비교할 수 없이 빠른 이유

Node.js 창시자인 Ryan은 입출력에 대해서 잘 못 다루고 있다는 이야기를 하면서 블로킹 함수를 디스크나 네트워크 입출력의 수행으로 정의했다. 아래 표를 보면 디스크, 네트워크 작업시에 blocking 하는게 어느정도 CPU를 낭비하는 것인지 알 수 있다.

장치 CPU 사이클 수 비례 '체감' 규모
L1 캐시 3 3초
L2 캐시 14 14초
250 250초
디스크 41,000,000 1.3년
네트워크 240,000,000 7.6년

네트워크 IO뿐만 아니라 File IO 또한 동시성 처리시에 큰 성능 향상을 가져올 수 있다.

블로킹 함수가 전체 애플리케이션의 실행을 멈추지 않게 하는 두 가지 방법은 아래와 같다.

- 블로킹 연산을 각기 별도의 thread에서 실행. (파이썬에서 Threading, concurrent.futures.ThreadPoolExecutor 등)

- 모든 블로킹 연산을 논블로킹 비동기 연산으로 바꾼다.

 

스레드는 잘 작동하지만 Python 기준 OS마다 차이는 있는지만 수메가바이트의 메모리를 사용한다. 예전에는 메모리 부담을 줄이기 위해 콜백으로 비동기 호출을 구현했다.  하지만, 콜백은 콜백 지옥과 각 지역 함수가 다른 context를 유지하기 때문에 에러 핸들링등 불편한점이 있다. coroutine은 callback보다 뛰어난 디자인으로 이런 문제를 해결했다. 물론 약간의 메모리 부하는 있다. 

결국 flags_asyncio.py가 flags.py 빠른 이유는 flags.py는 수십억 CPU 사이클을 허비하기 때문이다

 

 

디스크 입출력도 Executor를 이용해서 event loop 블로킹 피하기.

위에서 살펴본것처럼 디스크 접근도 많은 CPU 사이클을 낭비하는 요소이다. 

위에서 save_flag()는 event loop를 blocking한다.

asyncio의 event loop는 threadpool executor를 내부에 가지고 있으며 run_in_executor() 메서드에 실행할 콜러블을 전달할 수 있다. 위 방법외에도 concurrent.Future의 threadpoolexecutor나 processpoolexecutor를 사용할 수 있다.

# 현재
async def download_one(session, cc):  # <6>
    image = await get_flag(session, cc)  # <7>
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc
    
    
# 수정

async def download_one(session, cc):  # <6>
    image = await get_flag(session, cc)  # <7>
    show(cc)
    loop = asyncio.get_running_loop()
    # option 1
    result = await loop.run_in_executor(
        None, save_flag, image, cc.lower() + '.gif')
    print('default thread pool', result)
    
    # 2. Run in a custom thread pool:
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(
            None, save_flag, image, cc.lower() + '.gif')
        print('custom thread pool', result)

    # 3. Run in a custom process pool:
    with concurrent.futures.ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(
            None, save_flag, image, cc.lower() + '.gif')
        print('custom process pool', result)
    return cc

 

 

Callback vs Future와 Coroutine

코루틴이 어떠헥 고전적인 콜백 스타일을 향상시키는지 살펴보자.

# callback 지옥
def stage1(response1):
	request2 = step1(response1)
   	api_call2(request2, stage2)
   
def stage2(response2):
	request3 = step2(response2)
    api_call3(request3, stage3)

def stage3(response3):
	step3(response3)
    
api_call1(request1, stage1)


# 코루틴
@asyncio.coroutine
def three_stages(request1):
	response1 = yield from api_call1(request1)
    request2 = step1(response1)
    response2 = yield from api_call2(request2)
    ......

# 명시적으로 실행을 스케쥴링
asyncio.run(three_stages(requst1))

장점은 크게 두 가지이다.

1. 코드 가독성이 높아진다.

2. 앞 단계에서 결과를 사용하기가 쉽다. (예외 처리 등도 마찬가지)

 

단점은 코루틴을 사용하는 것이 익숙하지 않은 사람들에게는 장애물이 될 수 있다.

 

 

asyncio로 비동기 웹서버

import sys
import asyncio

from charfinder import UnicodeNameIndex  # <1>

CRLF = b'\r\n'
PROMPT = b'?> '

index = UnicodeNameIndex()  # <2>

async def handle_queries(reader, writer):  # <3>
    while True:  # <4>
        writer.write(PROMPT)  # can't await!  # <5>
        await writer.drain()  # must await!  # <6>
        data = await reader.readline()  # <7>
        try:
            query = data.decode().strip()
        except UnicodeDecodeError:  # <8>
            query = '\x00'
        client = writer.get_extra_info('peername')  # <9>
        print('Received from {}: {!r}'.format(client, query))  # <10>
        if query:
            if ord(query[:1]) < 32:  # <11>
                break
            lines = list(index.find_description_strs(query)) # <12>
            if lines:
                writer.writelines(line.encode() + CRLF for line in lines) # <13>
            writer.write(index.status(query, len(lines)).encode() + CRLF) # <14>

            await writer.drain()  # <15>
            print('Sent {} results'.format(len(lines)))  # <16>

    print('Close the client socket')  # <17>
    writer.close()  # <18>
# END TCP_CHARFINDER_TOP

# BEGIN TCP_CHARFINDER_MAIN
async def main(address='127.0.0.1', port=2323):  # <1>
    port = int(port)
    server = await asyncio.start_server(handle_queries, address, port) # <2>

    host = server.sockets[0].getsockname()  # <3>
    print('Serving on {}. Hit CTRL-C to stop.'.format(host))  # <4>

    async with server:
        await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(main(*sys.argv[1:]))  # <5>

여기서 크게 말하고자 하는 부분은 없다. 이벤트 루프를 통해 웹서버를 만드는 usecase를 보여주고 싶었던 것 같다. blocking이 됐을 때 다른 요청이 오면 새로운 코루틴을 생성해서 응답을 할 수 있기 때문에 동시에 여러 요청을 처리할 수 있는 웹서버이다.  이외에 또, 하나 소개하는 부분은 일부 Framework나 non-blokcing코드가 주인 핸들러의 경우를 소개하면서 client에서 비동기로 server에 요청을 해서 기다리는 시간을 줄이는 방법에 대해서도 간략하게 이야기한다.

 

 

마무리

이번 장에서는 asyncio로 event loop를 생성하고 coroutine들을 동시에 수행하는 방법에 대해서 다뤘다. 또, 동시성과 병렬성 그리고 왜 동시성이 중요한지에 대한 이론적인 내용들도 다룬 장이었다. 책이 python3.4기준으로 작성되었다보니 바뀐 API들이 많이 있는데 코드는 모두 3.7 기준의 코드고 일부는 공식문서를 보면서 정리했다. 파이썬에서도 비동기 프레임워크와 라이브러리들이 많이 나오고 있고 발전하고 있어서 이 부분을 catch up 하는 부분도 중요할 것 같다는 생각이 든다.

 

 

Reference

- Fluent Python Chapter 18

- https://github.com/fluentpython/example-code/tree/master/18-asyncio-py3.7

= https://docs.python.org/ko/3/library/asyncio-eventloop.html#

 

 

 

 

반응형