Computer Engineering/Fluent Python 정리

Fluent Python Chapter 17. Future를 이용한 동시성

jordan.bae 2022. 3. 22. 09:54

Introduction

17장에서는 Future를 이용한 동시성 프로그래밍에 대해서 다룹니다. Future객체는 미래에 앞으로 일어날 일을 나타내는데 해당 객체는 concurrent.futures 뿐만 아니라 asyncio의 패키지의 기반이 되는 개념입니다. 전반적으로 해당 패키지를 통해서 어떻게 프로그램을 동시에 수행시키면 좋을지에 대한 코드를 설명합니다. 또, 파이썬의 GIL이 어떻게 동시성 프로그래밍에서는 동작하는지도 살펴봅니다. 마지막으로는 상위 인터페이스가 맞지 않는 경우에는 threading과 multiprocessing 모듈로 동시성 작업도처리할 수 있음을 간략히 설명합니다.

전반적인 파이썬에서의 동시성 프로그래밍에 대한 내용을 다룬 장이라고 생각됩니다.

 

책 정리를 시작한 이유

책 정리를 시작한 이유

Chapter1의 Introduction 부분에서 이야기 한 것처럼 지난 5년간 다양한 언어나 프레임워크 및 프로그램을 공부하고 이용하여 소프트웨어를 개발했는데 이것저것 하다 보니 자주 사용하는 언어임에도 불구하고 파이썬을 잘 활용하고 있느냐에 대한 답변을 자신 있게 하기 어렵다고 느껴서 Fluent Python이라는 책을 공부하며 정리하고 있습니다. 올해에는 새로운 기술들도 좋지만 기존에 활용하던 언어나 프레임워크 그리고 소프트웨어를 더 잘 사용할 수 있도록 깊게 공부할 수 있는 한 해를 만들고 싶은 소망을 가지고 있습니다. 21장까지 정리를 성공하고 맛있는 걸 먹으면서 스스로 축하할 날이 어서 왔으면 좋겠네요! 혹시, 제가 정리한 글이 괜찮으시면 구독 하시면 다음편이 나오면 바로 알림을 받으실 수 있습니다. 어느 덧 17장 정리하는 날이 왔네요. 예상한 것 보다는 많이 늦었지만 조금 만 더 힘내봐야겠습니다.

 

지난 chapter 정리한 포스팅

Fluent Python Chapter 1. 파이썬 데이터 모델 (Feat. 일관성 있는 언어)

Fluent Python Chapter 2-1. Sequence (Sequence의 분류, listcomp, generator, tuple, slicing

Fluent Python Chapter 2-2. Sequence (bisect, deque, memoryview)

Fluent Python Chapter 3. Dictionary (feat. hashtable)

Fluent Python Chapter 4. 텍스트와 바이트 (feat. 깨지지마라..)

Fluent Python Chapter 5. 일급 함수

Fluent Python Chapter 6. 일급 함수 디자인 패턴

Fluent Python Chapter 7. 함수 데커레이터와 클로저 (feat. 메타프로그래밍)

Fluent Python Chapter 8. 객체 잠조, 가변성, 재활용

Fluent Python Chapter 9. 파이썬스러운 객체

Fluent Python Chapter 10. 시퀀스 해킹, 해시, 슬라이스

Fluent Python Chapter 11. 인터페이스: 프로토콜에서 ABC까지

Fluent Python Chapter 12. 내장 자료형 상속과 다중 상속

Fluent Python Chapter 13. 연산자 오버로딩(feat. 제대로 하기)

Fluent Python Chapter 14. 반복형, 반복자, 제너레이터

Fluent Python Chapter 15. Context manager와 else 블록

Fluent Python Chapter 16. 코루틴 (Coroutine)

 

 

 

3가지 스타일의 웹 클라이언트(동기, concurrent.futures, asyncio)

설명에 앞서 코드를 통해 동기와 비동기의 성능 차이에 대해서 살펴봅니다. 

코드는 국기를 다운 받아서 local에 저장하는 코드입니다.

jordan.bae@jordanui-MacBook-Pro countries % python flags.py
BD BR CD CN DE EG ET FR ID IN IR JP MX NG PH PK RU TR US VN
20 flags downloaded in 19.61s
jordan.bae@jordanui-MacBook-Pro countries % python flags_threadpool.py
FR BD TR NG CD MX BR PH DE ID RU US IR IN EG VN JP ET CN PK
20 flags downloaded in 1.19s
jordan.bae@jordanui-MacBook-Pro countries % python flags_asyncio.py
CN TR ET EG ID MX US RU IN CD PK PH FR IR NG JP VN BR BD DE
20 flags downloaded in 1.07s

컴퓨터와 네트워크 컨디션에 따라 결과가 다르겠지만 위에 결과 처럼 synchrous와 asynchrous의 성능 차이는 엄청 큽니다.

각 코드는 아래 링크를 확인하시면 됩니다.

https://github.com/fluentpython/example-code/tree/master/17-futures-py3.7/countries

 

 

concurrent.futures 살펴보기

위에 예제에서 concurrent.futures 코드를 살펴봅니다. 여기서도 관련있는 부분의 함수만 살펴보겠습니다.

def get_flag(cc):
    url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
    resp = requests.get(url)
    return resp.content
    
    
def download_one(cc):  # <3>
    image = get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + '.gif')
    return cc


def download_many(cc_list):
    workers = min(MAX_WORKERS, len(cc_list))  # <4>
    with futures.ThreadPoolExecutor(workers) as executor:  # <5>
        res = executor.map(download_one, sorted(cc_list))  # <6>

    return len(list(res))  # <7>

get_flag 함수과 HTTP Client로 요청하는 부분을 가지고 있고 이 부분을 비동기 처리해주는게 핵심입니다. 해당 함수를 가지고 있는 download_one 함수가 동시에 실행될 수 있도록 처리해주고 있습니다. 이 부분에서 자세히 살펴봐야 할 부분 concurrent package의 코드이다.

 

concurrent.futures 패키지에서 개발자가 주로 사용하는 클래스는 ThreadPoolExecutor와 ProcessPoolExecutor 클래스이다.

- ThreadPoolExecutor: 여러 개의 스레드를 생성해서 동시성 처리. (IO Bound 작업)

- ProcessPoolExecutor: 여러 개의 프로세스를 사용하여 동시성 처리. (CPU Bound 작업)

 

위의 클래스들은 작업자 스레드나 프로세스를 관리하는 Pool과 실행할 작업을 담은 Queue를 가지고 있다. 

위에 executor instance의 map 함수는 내장 map함수와 비슷하게 작동하며 각 함수가 반환한 값을 가져올 수 있도록 반복할 수 있는 제너레이터를 반환합니다.

 

위에서 언급한 Future객체는 드러나지 않는데 Future는 직접 호출해서 사용하지 않기 때문이다. 파이썬 3.4 표준 라이브러리에서 Future라는 이름을 가진 클래스는 concurrent.futures.Future, asyncio.Future 이 두 개인데 Future 클래스의 객체는 완료되었을 수도 있고 아닐 수도 있는 지연된 계산을 표현하기 위해 사용된다. Future 객체는 다른 프레임워크나 라이브러리의 Deferred 클래스, Promise 객체와 비슷하다.

 

Future는 앞으로 일어날 일을 나타내는 객체로, Future의 실행을 스케쥴링하는 프레임워크(ex.ThreadPoolExecutor 등)만이 생성해야 한다. 그러므로 concurrent.futures.Future객체는 concurrent.futures.Executor의 서브클래스로 실행을 스케줄링한 후에만 생성된다. ex) Executor.submit() 메서드는 callable을 받아서 해당 callable과 인자의 실행을 스케줄링하고, Future객체를 반환하다. 그렇기 때문에 Client에서 직접 Future 객체의 상태를 변경하거나 해서는 안된다. Future클래스에는 몇몇 특징적인 메서드들이 있는데 아래와같다.

- done(): Future객체에 연결될 callable(작업) 실행이 완료되었는지 여부를 boolean형으로 반환하는 메서드

- add_done_callback(): callable 작업을 마치면 클라이언트 코드에게 통지(callback)해준다.

- result(): Future객체의 callable 작업이 완료된 경우 callable 결과를 반환하거나, callable에서 예외가 발생하면 해당 예외를 다시 발생시킨다.

 

result() 메서드의 경우 두 프레임워크(concurrent, asyncio)의 동작이 다르다. 

- concurrent: f.result()는 결과가 나올 때 까지 호출자의 스레드를 블로킹한다. timeout 인자를 전달해 정해진 시간이 지나면 TimeoutError 예외를 발생시킬 수 있다.

- asyncio: 시간 초과를 지원하지 않으며 yield from을 사용해서 Future객체의 상태를 가져오는 방법을 선호한다.

 

두 라이브러리에서 사용자에게 자주 사용되는 함수를 살펴보면 아래와 같다.

-  Executor.map(): 내부에서 Future객체를 사용하고, 반환하는 generator는 __next__()메서드가 호출될 때 마다 각 Future객체의 result()메서드(=callable의 반환 값 혹은 예외)를 가져올 수 있다.

- concurrent.futures.as_completed() 메서드는 Future 객체를 담은 반복형을 인수로 받아, 완료된 Future 객체를 생성하는 반복자를 반환하다. 완료된 Future 객체를 꺼내는 loop를 만들 때 사용할 수 있다. 

 

위의 코드를 concurrent.futures.as_completed()로 사용하는 코드로 변경하면 download_many 함수를 아래와 같이 변경할 수 있다.

def download_many(cc_list):
    cc_list = cc_list[:5]  # <1>
    with futures.ThreadPoolExecutor(max_workers=3) as executor:  # <2>
        to_do = []
        for cc in sorted(cc_list):  # <3>
            future = executor.submit(download_one, cc)  # <4>
            to_do.append(future)  # <5>
            msg = 'Scheduled for {}: {}'
            print(msg.format(cc, future))  # <6>
		
        # Future 객체를 직접 다룬다.
        results = []
        for future in futures.as_completed(to_do):  # <7>
        	# Future객체의 result 메서드로 callable의 반환값을 가져온다.
            res = future.result()  # <8>
            msg = '{} result: {!r}'
            print(msg.format(future, res)) # <9>
            results.append(res)

    return len(results)

 

 

블로킹 I/O 와 GIL

모든 사람들이 알고 있는 것처럼 Cpython은 GIL(전역 인터프리터 락)을 가지고 있어서 하나의 프로세스에서는 한 스레드만 파이썬 바이트코드를 실행할 수 있다. 파이썬 코드를 작성할 때는 GIL을 제어할 수 없지만, 내장 함수나 C로 작성된 확장은 시간이 오래 걸리는 작업을 실행할 때 GIL을 해제할 수 있다. 블로킹 입출력을 실행하는 모든 표준 라이브러리 함수는 OS에서 결과를 기다리는 동안 GIL을 해제한다. 즉, 입출력 위주의 작업을 실행하는 파이썬 프로그램은 파이썬으로 구현하더라도 비동기(스레드, 이벤트 루프)로 구현하면 이득을 볼 수 있다. v좀 더 구체적으로 이야기하면 파이썬 스레드가 네트워크로 부터를 응답을 기다리는 동안, 블로킹된 입출력 함수가 GIL을 해제함으로써 다른 스레드가 실행될 수 있다. 그렇기 때문에 I/O Bound 작업에서는 단일 CPU로 여러 스레드를 만들어서 작업할 수 있는 ThreadPoolExecutor가 효율적이라고 했던 것이다.

 

 

concurrent.futures로 계산 위주의 작업 실행하기

ProcessPoolExecutor는 GIL을 우회하므로(여러 CPU를 동시에 사용할 수 있다.) 계산 위주의 작업을 수행하는 경우에 사용한다.

ThreadPoolExecutor와 ProcessPoolExecutor의 __init__() 매서드는 max_worker 인자가 선택인자인지 아닌지만 차이가 난다. ProcessPoolExecutor는 선택적인 인자로 default로 os.cpu_count()가 반환하는 값을 사용한다.

 

책에서 계산 위주의 예제로 RC4, SHA 예제(암호화, 복호화)를 사용한다.

책에서 나온 결과는 작업자 수가 4개일 때 1개인 작업자보다 2배 정도 빠른 결과를 보여줬다.

 

 

Executor.map() 동작과정 살펴보기

위에서 Executor의 map()메서드는 Future객체의 result()를 Generator로 반환하는 iterator라고 이야기 했었다.

해당 generator는 Future객체가 작업을 마칠 때 까지 기다렸다가 결과를 반환해주는 것을 볼 수 있다.

from time import sleep, strftime
from concurrent import futures


def display(*args):  # <1>
    print(strftime('[%H:%M:%S]'), end=' ')
    print(*args)


def loiter(n):  # <2>
    msg = '{}loiter({}): doing nothing for {}s...'
    display(msg.format('\t'*n, n, n))
    sleep(n)
    msg = '{}loiter({}): done.'
    display(msg.format('\t'*n, n))
    return n * 10  # <3>


def main():
    display('Script starting.')
    executor = futures.ThreadPoolExecutor(max_workers=3)  # <4>
    results = executor.map(loiter, range(5))  # <5>
    display('results:', results)  # <6>.
    display('Waiting for individual results:')
    for i, result in enumerate(results):  # <7>
        display('result {}: {}'.format(i, result))


main()

-------
python demo_executor_map.py 결과

jordan.bae@jordanui-MacBook-Pro 17-futures-py3.7 % python demo_executor_map.py
[09:21:20] Script starting.
[09:21:20] loiter(0): doing nothing for 0s...
[09:21:20] loiter(0): done.
[09:21:20] 	loiter(1): doing nothing for 1s...
[09:21:20] 		loiter(2): doing nothing for 2s...
[09:21:20] 			loiter(3): doing nothing for 3s...
[09:21:20] results: <generator object Executor.map.<locals>.result_iterator at 0x10a74ef20>
[09:21:20] Waiting for individual results:
[09:21:20] result 0: 0
[09:21:21] 	loiter(1): done.
[09:21:21] 				loiter(4): doing nothing for 4s...
[09:21:21] result 1: 10
[09:21:22] 		loiter(2): done.
[09:21:22] result 2: 20
[09:21:23] 			loiter(3): done.
[09:21:23] result 3: 30
[09:21:25] 				loiter(4): done.
[09:21:25] result 4: 40

위에 이야기 나눈 것 처럼 loiter callable이 done 출력 후에 result가 출력되는 것을 확인할 수 있다. 또, max_workers=3 을 지정해줬기 때문에 0,1,2번이 먼저 동시에 수행되는 것을 확인할 수 있다. map()은 호출한 순서 그래도 결과를 반환하는 특징이 있다. 이러한 특징은 필요할 때도 있지만 맨 앞에 callable이 오래걸리는 경우에는 첫 번 째 결과를 가져오기 위해서만 오랜 시간을 기다려야 한다. 그 사이에 다른 작업들이 완료되었어도 처리할 수 없다. 완료되는 대로 결과를 가져와서 처리해야 하는 위에 예제에서 본 것 처럼 Executor.submit() 메서드와 futures.as_completed()함수를 함게 사용하면 된다.

 

Tip!
submit()은 다양한 callable과 인수를 제출할 수 있는 반면 executor.map()은 여러 인수에 동일한 callable을 실행하도록 설계되 었다. Executor.submit()/future.as_completed()조합이 좀 더 융퉁성이 높다. future.as_completed는 ThreadPoolExecutor에서 만든 future객체와 ProcessPoolExecutor에서 만든 future객체를 섞어서 사용할 수 도 있다.

 

 

Threading과 Multiprocesscing 모듈

futures.ThreadPoolExecutor로 처리하기 어려운 작업을 수행하는 경우 Thread, Lock, Semaphore등 threading 모듈의 기본 컴포넌트를 이용해서 처리할 수 있다. 또, 계산 위주의 작업에서 futures.ProcessPoolExecutor가 애플리케이션 구조에 잘 맞지 않으면 multiprocessing 패키지를 사용하면 된다. 

 

 

마무리

이번 장에서는 Python에서 동시에 처리하는 전반적인 프로그래밍 방법과 Future의 전반적인 개념 그리고 그 중에서 concurrent.futures 에 대해서 자세히 살펴봤다. 읽을 거리에 흥미로운 GIL과 관련된 글들도 정말 많아서 시간을 내서 틈틈이 읽고 정리해보고 싶다는 생각이든다. 다음 장이 asyncio를 이용한 동시성이기 때문에 이 장에서 asyncio예제 코드는 살펴보지 않았었는데 다음장에서 asyncio가 어떻게 동작하는지 왜 빠른지에 대해서 살펴볼 것 이다.

 

Reference

이 포스팅은 책 내용 중에 중요하다고 생각한 부분을 요약한 내용으로 구성되어 있습니다.

- Fluent Python Chapter17

-예제 코드:  https://github.com/fluentpython/example-code/tree/master/17-futures-py3.7

반응형