python

병렬처리 특성 비교, 그리고 파이썬 기반의 멀티쓰레딩

kimjy 2021. 12. 6. 21:03

 항상 파이썬을 데이터 후처리 및 가시화 용도로만 사용해서 인지, 파이썬을 사용한 병렬처리 방법은 사용한 경험이 없다. 하지만 이번에 파이썬을 처음부터 다시 공부하기로 마음먹었으므로 파이썬에서 사용할 수 있는 병렬처리방법에 대해 정리하고자 한다.

 

 들어가기에 앞서 쓰레드 기반의 병렬처리 방법과 프로세스 기반의 병렬처리 방법에 대해 논하고자 한다. 일반적으로 쓰레드는 프로세스를 구성하고 있는 가장 작은 단위이다. 또한 하나의 프로세스 내부의 쓰레드들 간에는 메모리가 공유되며, 따라서 모든 쓰레드는 프로세스의 메모리 공간을 참조(읽기/쓰기)가 가능하다. 이러한 특성을 바탕으로 쓰레드 기반의 병렬처리는 SMP(shared memory parallelism)이라고 부른다. 따라서 개발자는 쓰레드 기반의 병렬처리를 수행할 때 따로 메모리 통신을 설정할 필요가 없으며, 이러한 특성은 병렬처리 문제를 매우 간단하게 만들어 준다(racing condition 등을 조심한다면). 하지만 SMP는 하나의 프로세스에서 다중쓰레드를 포크(fork)시켜서 사용하는 특성으로 인하여 컴퓨팅 노드 안의 자원밖에 사용하지 못하여, 이는 SMP의 가장 큰 단점 중 하나이다.

 프로세스 기반의 병렬처리방법은 다중 프로세스를 사용하는 방법으로, 각 프로세스는 개개인의 메모리 공간을 갖는다. 이로 인하여 프로세스 기반의 병렬처리방법은 DMP(Dsitributed Memory Parallelism)라는 이름을 갖는다. 따라서, DMP 방법은 여러 컴퓨팅 노드를 사용하여 병렬계산을 수행할 수 있고, 이러한 특성으로 인하여 DMP 방법은 확장성(scalarbility)이 크다고 할 수 있다. DMP 방법은 HPC(High Performance Computing) 환경에서 많이 사용되고 있으며, 슈퍼컴퓨터에서는 MPI(Message Passing Interface) 라이브러리를 주로 사용한다. MPI는 프로세스 간의 메세지 교환 방식의 통신체계를 갖고, 이를 통해 데이터 교환을 수행한다. 여기서 코드 개발자는 교환을 수행할 메모리를 지정하여야 하는데, 이 때에 segmentation fault 등의 원인을 찾기 힘든 에러가 자주 발생한다. 따라서 DMP 방법은 개발 소요 시간이 크게 발생되는 등 개발에 있어서 많은 자원을 소비한다는 단점도 갖고 있다. 하지만 아직 파이썬의 multiprocessing 패키지를 경험하지 않았으므로, multiprocessing 패키지가 얼마나 파이써닉한 가에 대한 기대가 있다. 또, 추후 MPI와 multiprocessing 패키지에 대해 비교하는 것도 좋은 포스팅일 것 같다. 마지막으로, SMP와 DMP 방법의 특징을 간단히 요약하면 아래와 같다.

 

쓰레드 기반의 병렬처리: SMP(Shared Memory Parallelism), 하나의 컴퓨팅 노드만 사용이 가능, 메모리 통신이 필요 없음, 개발이 비교적 간단

프로세스 기반의 병렬처리: DMP(Distributed Memory Parallelism), 다수의 컴퓨팅 노드를 사용 가능, 메모리 통신 필요, 개발이 비교적 힘듬

 

threading 패키지를 사용한 멀티쓰레딩

 

threading 패키지는 멀티쓰레딩을 가능하게 해주는 패키지로, 아마 openMP와 마찬가지로 포크-조인 모델(Fork-Join model)을 따르는 것 같다. 포크-조인 모델이란, 기존에 메인 쓰레드가 프로그램을 실행하다가, 병렬화 구문을 만나면 쓰레드를 포크(생성)하여 병렬 계산을 수행한다. 또 병렬화 구문이 끝나면 생성한 쓰레드를 종료(조인)시킨다. 포크의 어원은 우리가 일상에서 사용하는 포크라는 식기에서 유래하였다고 한다.  포크는 하나의 막대기에서 여러갈래로 나누어지는 것 처럼, 메인 쓰레드 하나가 실행되다가 쓰레드를 여러개 생성하여 계산을 수행하는 것이 비슷한 까닭이라고 한다.

우리가 일상에서 사용하는 포크

threading 패키지는 import threading을 선언함으로써 사용이 가능하다. 글로써 설명을 하는 것보다 예제를 보는 것이 이해가 빠를 것이기 때문에, 아래와 같이 예제를 작성하였다.

 

import threading
from time import sleep

def reduction(name, target_number):
    
    #critic section
    lock.acquire()
    target_number[0]+=1
    print(name, target_number[0])
    lock.release()
    
thread_list=[]
number=[0]
lock=threading.Lock()

for i in range(16):
    name="thread_{}".format(i)
    thread=threading.Thread(target=reduction, args=(name, number,))
    thread_list.append(thread)
    thread.start()
    
for i in range(16):
    thread.join()
print("elapsed time(s):", etime-stime, "  number:", number[0])

>>> thread_0 1
>>> thread_1 2
>>> thread_2 3
>>> thread_3 4
>>> thread_4 5
>>> thread_5 6
>>> thread_6 7
>>> thread_7 8
>>> thread_8 9
>>> thread_9 10
>>> thread_10 11
>>> thread_11 12
>>> thread_12 13
>>> thread_13 14
>>> thread_14 15
>>> thread_15 16
>>> elapsed time(s): 0.013916015625   number: 16

코드 자체는 그리 길지 않은 편이다. 간략한 설명을 하자면 다음과 같다.

  • reduction함수에서 reduction알고리즘을 구현하였다. 
  • for 문을 사용하여 총 16번의 루프를 반복하도록 하였고, 이를 통해 총 16개의 쓰레드를 포크하였다.
  • name 변수를 통해서 쓰레드의 이름을 지정하였다.
  • threading.Thread()메서드를 사용하여 쓰레드를 포크할 준비를 하였고, target에는 쓰레드가 실행할 함수, args에는 target 함수의 인자들을 넘겨줃도록 하였다.
  • thread.start()를 통해 쓰레드를 포크하고, 미리 지정한 함수를 실행하도록 하였다.
  • thread.join()을 통해 포크된 쓰레드를 조인하였다.

 

이렇게 짧은 코드만을 통하여 멀티쓰레딩을 구현할 수가 있었다. 그러면 위의 코드에서 쓰레드를 16개를 사용한 것과, 쓰레드를 1개만 사용한 것의 속도 차이는 얼마나 날까에 대한 의문이 발생할 수 있다. 위의 코드에서 16개의 루프가 돌던 for 문을 1개로 변경하여 코드를 실행해보았다. 따라서 16개의 쓰레드가 생성되는 대신에 1개의 쓰레드만 생성될 것이다.

def reduction(name, target_number):

    #critic section
    lock.acquire()
    target_number[0]+=1
    print(name, target_number[0])
    lock.release()
    
thread_list=[]
number=[0]
lock=threading.Lock()

stime=time.time()
for i in range(1):
    name="thread_{}".format(i)
    thread=threading.Thread(target=reduction, args=(name, number,))
    thread_list.append(thread)
    thread.start()
    
etime=time.time()
for i in range(1):
    thread.join()

print("elapsed time(s):", etime-stime, "  number:", number[0])

>>> thread_0 1
>>> elapsed time(s): 0.0007328987121582031   number: 1

앞서 16개의 쓰레드가 0.013초의 시간이 소비된 반면, 1개의 쓰레드만 사용한 경우에는 0.0007초의 시간이 소비되었다. 보다 정확한 분석을 위해서는 프로파일러 등을 통하여 분석을 수행하여야겠지만, 그래도 이유를 추측해보자면 아래와 같다.

  • 16개 쓰레드 생성에서 발생하는 오버헤드
  • 쓰레드가 critic section으로 진입함으로써 발생하는 병목현상

 

오버헤드와 병목현상은 병렬컴퓨팅에서 모든 것이라고 해도 과언이 아닐만큼 중요한 이슈이다. CPU 쓰레드를 생성할 때에도 일정시간이 소비되기 때문에 여기서 발생하는 오버헤드를 유의하여야 한다. 두 번째 이슈는 critic section 진입에 따른 병목현상이다. 이는 racing condition과 관련이 있는데, racing condition이란 여러 다른 쓰레드가 하나의 메모리를 읽기/쓰기할 때 발생하는 현상이다. reduction 함수에서는 여러 쓰레드가 target_number[0]라는 메모리에 동시 쓰기를 수행할 수 있는데, 이러한 경우 최종적으로 결과 값은 16일 수도 있고 1~15의 값이 될 수도 있다. 이는 여러 쓰레드가 동시에 쓰기를 수행하기 때문이다.

 따라서 racing problem을 방지하기 위해서는 critic section이라는 것을 도입해야 한다. critic section에서는 여러개의 쓰레드가 동시에 작업하는 것이 아니라, 하나의 쓰레드만 critic section에서 작업이 가능하도록 작동한다. 이는 threading 패키지에서 제공하는 lock.acquire(), (혹은 threading.Lock().acquire()), lock.release()로 구현이 가능하다. 

  • lock.acquire(): critic section 진입 구간 설정
  • lock.release(): critic section 해제

우리는 동시에 계산을 수행하기 위해서 쓰레드를 포크하고 계산을 수행하도록 설정하였는데, critic section에서는 쓰레드 들이 순차적으로 수행되기 때문에 병렬계산이 수행될 수 없다. 이는 병렬 성능을 저해하는 큰 요소 중 하나이므로, 꼭 필요한 경우에만 critic section을 사용하여야 한다. 또, critic section의 시간복잡도는 O(n)이 될 것이라고 생각하지만, 병렬 처리할 수 있도록 reduction 알고리즘을 변경하면 critic section에서 계산 성능을 높일 수 있다.

 

멀티쓰레딩을 사용한 Jacobi method의 병렬화

 

끝내기에 앞서, 마지막으로 하나의 예제를 들어보도록 하겠다. 예제는 전산유체역학에서 사용되는 방법인, jacobi method를 사용하면 좋을 것 같다. 본 포스팅의 주제는 전산유체역학이 아닌 병렬처리에 대해 논하므로, jacobi method를 깊게 설명할 필요는 없을 것 같다. 다만 전산유체역학 분야와 병렬처리 분야는 매우 깊은 연관이 있으므로, 병렬처리에 관심이 많다면 전산유체역학에 대해 공부하거나 관련 강의를 수강하는 것을 추천한다.  

#set domain size
dx=1./40.
dy=1./40.
nx=int(2./dx)
ny=int(2./dy)

#initialization
t_old=np.zeros((nx, ny))
t_new=np.zeros((nx, ny))

for j in range(0, ny-1):
    if(j*dy >= 0.5 and j*dy <= 1.5):
        t_old[0,j]=1
        t_new[0,j]=1
        
eps=1
num_iter=0

stime=time.time()
#iteration for jacobi method
while(eps > 1e-5):
    for i in range(1, nx-1):
        for j in range(1, ny-1):
            t_new[i,j]=0.25 \
                       * (t_old[i-1,j]+t_old[i+1,j] \
                       +t_old[i,j-1]+t_old[i,j+1])
            eps+=abs((t_new[i,j]-t_old[i,j])/dx*dy)
    eps=eps/((nx-1)*(ny-1))
    t_old[:,:]=t_new[:,:]
    num_iter==1
    
etime=time.time()
print("elapsed time(s):", etime-stime)

>>> elapsed time(s): 25.468451738357544

코드의 설명을 간략히 하자면 아래와 같다.

  • set domain size 부분은 dx, dy를 통해서 도메인 사이즈를 결정하는 코드이다. dx, dy는 1/80으로 설정하였으며 도메인 사이즈는 80x80으로 설정된다.
  • initialization 부분은 계산을 위한 초기화를 하는 과정으로, t_old, t_new를 초기화하고 경계조건은 1로 설정된다.
  •  iteration for jacobi method는 jacobi method를  통해 실제 계산을 수행하는 부분으로 while 문은 eps가 1e-5 이하로 낮아질 때 까지 계산을 수행한다.
  • 주 계산 코드는 이중으로 중첩된 for문인데, t_new를 업데이트하기 위해 t_old의 위아래와 양옆의 메모리를 참조한다(t_new[i,j]를 계산하기 위하여 t_old[i-1,j], t_old[i+1,j], t_old[i,j-1], t_old[i,j+1]를 참조).
  • 이렇게 사방의 격자를 사용하는 연산 형태를 stencil 연산이라고 부르기도 한다.

stencil 연산의 모식도(출처: 영문 위키피디아)

  • 계산시간은 총 25초가 소비되는 것을 알 수 있다.
  • 참고로 이러한 계산 패턴은 전산유체역학 뿐만 아니라 이미지 프로세싱, 혹은 딥러닝 분야에서 사용되는 convolution연산에서도 쓰인다.
  • 계산 결과는 아래와 같으며, 초기 값에서는 잘 보이지가 않지만 도메인 아래쪽의 경계 부근에는 1로 설정이 되어있다. 나머지 공간은 0으로 설정된다. jacobi method를 사용하여 iteration을 수행하면 우측 그림과 같은 결과에 수렴하게 되는데, 간단히 벽면에 라디에이터와 같은 물체를 두었을 때 열이 최종적으로 확산되는 과정이라고 생각하면 이해가 빠를 수 있다.

Jacobi method의 결과. (좌:초기값, 우:결과값)

 

이를 병렬화하기 위한 방법은 여러 방법이 있는데, 일반적으로 사용되는 방법은 아래와 같다.

  •  80x80 도메인을 여러 도메인으로 나누어 처리한다. 예를 들어 4개의 쓰레드를 생성하는 경우라면 40x40 크기를 갖는 4개의 도메인으로 분할하면 된다.
  • 이 때 분할된 도메인의 경계부근을 계산하기 위해서는 다른 도메인의 격자 값을 참조해야 하는데, 이 때문에 halo exchange라는 것이 발생하게 된다. 
  • 다중 프로세스를 사용한 병렬계산의 경우에는 직접 halo exchange를 구현해야 하지만, 멀티쓰레딩 기반 병렬처리에서는 기본적으로 메모리를 공유하기 때문에 따로 처리할 필요가 없다.

stencil 연산에서 발생하는 halo exchange. (출처: 일리노이대학교 https://wgropp.cs.illinois.edu/courses/cs598-s15/lectures/lecture25.pdf)

 

사실 도메인을 위 그림과 같이 정사각형으로 분할하는 것이 프로세스간의 통신을 줄일 수 있다. 하지만, 예제를 단순하게 하기 위해서 i 루프 방향으로만 도메인을 분할해서 직사각형 형태의 도메인을 갖도록 설정하겠다. 

 

라고, 멀티쓰레딩을 수행하는 코드를 작성하였지만... 병렬계산결과는 문제가 없으나.... 계산 성능은 전혀 개선되지가 않는다. 찾아보니 GIL에 의하여 프로세스내의 자원이 관리되고, 이로 인하여 하나의 프로세스는 하나의 프로세서만 사용이 가능하다! 따라서 여러개의 쓰레드가 하나의 프로세서에서 context switch를 하면서 사용하는 것이다.. 물론 CPU가 I/O 작업 등에 의하여 idle 되어 있을 때는 다른 쓰레드가 프로세서 자원을 사용한다.. 한마디로 하루종일 성능개선을 위해서 삽질을 했다.. 물론 python 프로파일링을 위해서 yappi 등 여러 패키지를 사용하였으므로, 나쁘지 않은 경험이었긴 한다.. 따라서 위의 compute-intensive한 예제는 파이썬의 멀티코어 자원을 전혀 전혀 효율적으로 사용하지 못한다. 다만 multiprocessing은 멀티코어 자원을 사용할 수 있다고 한다.... 이런... 따라서 이번 포스팅을 통해서 배운 내용은 아래와 같다.

  • threading 패키지를 사용한 멀티쓰레딩은 멀티코어 자원을 활용하지 못한다. 그래서 패키지 이름도 multithreading이 아니라 그냥 threading인가...?
  • 따라서 compute-intensive한 코드를 병렬화할 때는 threading 기능은 전혀, 전혀 효율이 없다. CPU의 idle이 많이 발생하는 I/O-intensive한 작업등을 처리할 때는 효율이 있을 수도 있다..

마지막으로.. 혹시 나중을 위해서 멀티쓰레딩을 구현했던 파이썬 코드를 포스팅하고 끝내고자 한다...

import threading
import copy
import time

import line_profiler
import profile
import yappi

import numpy as np
import matplotlib.pyplot as plt

from time import sleep


def jacobi_method(its, ite, jts, jte, nx, ny, t_new, t_old, eps, num_iter, thread_num, nthread):

    #print(thread_num,"/",nthread,"thread is running!","its:",its, "ite:",ite, "jts:",jts, "jte:",jte)
    #print("parties:",barrier.parties)
    while(eps[0] > 1e-5):
        
        for i in range(its, ite):
            for j in range(jts, jte):
                t_new[i,j]=0.25 \
                           * (t_old[i-1,j]+t_old[i+1,j] \
                           +t_old[i,j-1]+t_old[i,j+1])
        barrier.wait()
        if(thread_num==0):
            eps[0]+=abs(np.sum(t_new[1:nx-1,1:ny-1]-t_old[1:nx-1,1:ny-1])/(dx*dy))
            eps[0]=eps[0]/((nx-1)*(ny-1))        
            t_old[:,:]=t_new[:,:]
            num_iter[0]+=1
            if(num_iter[0]%5==0):
                print("num_iter:",num_iter[0], "eps:",eps[0])           
        barrier.wait()

#set domain size
dx=1./400.
dy=1./400.
nx=int(2./dx)
ny=int(2./dy)

#initialization
t_old=np.zeros((nx, ny))
t_new=np.zeros((nx, ny))
eps=[10]
num_iter=[0]


for j in range(0, ny-1):
    if(j*dy >= 0.5 and j*dy <= 1.5):
        t_old[0,j]=1
        t_new[0,j]=1
        
num_thread=24
barrier=threading.Barrier(num_thread)
thread_list=[]
stime=time.time()


for i in range(num_thread):
    #domain decomposition
    dnx=nx/num_thread
    its=int(1+dnx*(i))
    ite=int(1+dnx*(i+1))
    if(its<1):
        ite=1
    if(ite>=nx):
        ite=nx-1
        
    thread=threading.Thread(target=jacobi_method, args=(its, ite, 1, ny-1, nx, ny, t_new, t_old, eps, num_iter, i, num_thread))
    thread_list.append(thread)
    yappi.start()
    thread.start()
    yappi.stop()


for i in range(num_thread):
    thread.join()

etime=time.time()   
print("iteration number:", num_iter[0], "eps:", eps[0])
print("elapsed time(s):", etime-stime)