Design Pattern

생산자(Producer) 소비자(Consumer) 패턴

태인킴 2020. 8. 23. 17:52
반응형


오늘은 멀티 스레드 디자인 패턴인 생산자(Producer) 소비자(Consumer) 패턴의 대해서 포스팅 하겠습니다.

 

 

 

 

1. 생산자(Producer) 소비자(Consumer) 패턴 이란?

생산자(Producer) 소비자(Consumer) 패턴은 '작업' 목록을 가운데에 두고 '작업을 생산해 내는 주체' '작업을 처리하는 주체'분리시키는 설게 방법 입니다. '작업을 생성하는 부분(Producer)''처리하는 부분(Consumer)'이 각각 감당 할수 있는 부하를 조절 할 수 있다는 장점이 있습니다. Producer는 작업을 새로 만들어 쌓아 두고, Consumer에 쌓여 있는 작업을 가져다 처리하는 구조 입니다.

 

Producer는 어떤 Consumer가 몇 개동작하고 있는지에 대해 전혀 신경 쓰지 않을 수 있습니다. 단지 새로운 작업 내용을 만들어 큐에 쌓아두기만 하면 됩니다. 반대로 Consumer 역시 Producer에 대해서 뭔가를 알고 있어야 할 필요가 없습니다. 이때 BlockingQueue를 사용하면 코드를 작성하기 편합니다.

 

 

2. 생산자(Producer) 소비자(Consumer) 작업량 조절

이때, Producer가 Consumer가 감당할 수 있는 것보다 많은 양작업을 만들어 내면 에는 계속해서 작업이 누적되어 결국에는 메모리 오류가 발생하게 됩니다. 하지만, 큐의 크기에 제한을 두면 큐에 빈 공간이 생길 때 까지 작업을 추가 하지 않고, 대기 하도록 구현하여, 메모리 오류를 막을 수 있습니다. 또한, 큐에 작업이 없을 경우에는 Consumer가 큐의 작업이 추가 될때까지 대기하여, 불필요한 연산을 하지 않도록 구현 합니다. 이 역할을 하는것이 BlockingQueue 입니다.

 

BlockingQueue는 결국 Producer가 작업을 빨리 처리하거나, Consumer가 작업을 빨리 처리하는 경우의 오류를 피할수 있도록 돕고, 작업의 대한 스레드 간의 동기화 및 작업량을 조절Thread-Safe 하게 프로그램을 작성 할수 있습니다.

 

put()는 큐의 크기에 제한이 되어 있을 경우, 큐에 빈 공간이 생길 때 까지 대기 하고, 빈 공간이 생기면 작업을 큐의 넣어 줍니다. take()는 큐의 작업이 없을 경우, 작업이 생길때 까지 대기 하고, 작업이 생기면, 큐에서 작업을 가져 갑니다.

 

BlockingQueue의 종류에는 자바 클래스 라이브러리에서 LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue가 존재 합니다.  

 

또한, Producer디스크 I/O, 네트워크 작업을 하고, ConsumerCPU 작업을 주로 하는 스레드 라면, 성능이 크게 높일 수 있습니다.

 

class FileCrawler implements Runnable{
    private BlockingQueue<File> fileQueue;
    private File root;
    private FileFilter filter;

    FileCrawler(BlockingQueue<File> queue, FileFilter filter, File file){
        this.fileQueue = queue;
        this.filter = filter;
        this.root = file;
    }

    @Override
    public void run() {
        try {
            crawl(root);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void crawl(File file) throws InterruptedException {
        File[] entries = root.listFiles(filter);
        if (entries != null) {
            for (File entry : entries)
                if (entry.isDirectory())
                    crawl(entry);
                else if(!aleadyIndexed(entry))
                    fileQueue.put(entry);
        }
    }

    private boolean aleadyIndexed(File entry) {
        return false;
    }
}

예를 들어, 데스크탑 검색 프로그램을 만들려고 합니다. FileCrawler 클래스는 파일들을 찾아내는 역할을 하고, FileIndexer파일의 인덱싱을 하는 클래스 입니다. 위와 같은, FileCrawlerProducer가 되겠습니다. BlockingQueue를 멤버 변수로 가지고 있고, crawl 메서드에서 put()을 사용해서, 작업을 BlockingQueue에 할당 합니다. 사실 crawl()에서 재귀 함수를 사용해서, 대부분의 컴퓨터에서는 이 메소드는 StackOverflowError가 떨어질 것 입니다. 하지만, 생산자-소비자 패턴을 보여주기 위함이니, 이를 가만하고 읽어주시기 바랍니다.

 

 

class FileIndexer implements Runnable{
    private BlockingQueue<File> fileQueue;

    FileIndexer(BlockingQueue<File> queue){
        this.fileQueue = queue;
    }

    @Override
    public void run() {
        try {
            while(true)
                indexFile(fileQueue.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void indexFile(File file){
        System.out.printf("인데스 함(파일명 : %s)", file.getName());
        System.out.println();
    }
}

FileIndexer라는 Consumer가 있습니다. BlockingQueue를 멤버 변수로 가지고 있고, take 메서드를 이용해서 BlockingQueue에서 작업을 가져와 소비 합니다.

 

 

public class FileSearcher{
    public static void main(String[] args) {
        BlockingQueue<File> queue = new LinkedBlockingQueue<>(100);
        FileFilter fileFilter = pathname -> true;

        File root = new File("C:\\Program Files");
        new Thread(new FileCrawler(queue, fileFilter, root)).start();

        int CONSUMER_COUNT = 10;
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            new Thread(new FileIndexer(queue)).start();
        }
    }
}

FileSearcher를 통해서 파일을 찾고, 인덱싱을 할 수 있습니다. BlockingQueue를 사용해서 멀티스레드 생산자-소비자 패턴을 쉽게 구현 할수 있었습니다. 하지만, BlockingQueue사용하지 않으면 어떻게 구현해야 할까요? 아래 소스를 보시겠습니다.

 

 

class FileCrawler implements Runnable{
    private MyQueue fileQueue;
    private File root;
    private FileFilter filter;

    FileCrawler(MyQueue queue, FileFilter filter, File file){
        this.fileQueue = queue;
        this.filter = filter;
        this.root = file;
    }

    @Override
    public void run() {
        try {
            crawl(root);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    private synchronized void crawl(File file) throws InterruptedException {
        File[] entries = file.listFiles(filter);
        if (entries != null) {
            for (File entry : entries)
                if (entry.isDirectory()){
                    crawl(entry);
                }else if(!aleadyIndexed(entry)){
                    fileQueue.put(entry);
                }
        }
    }

    private boolean aleadyIndexed(File entry) {
        return false;
    }
}
class FileIndexer implements Runnable{
    private MyQueue fileQueue;

    FileIndexer(MyQueue queue){
        this.fileQueue = queue;
    }

    @Override
    public void run() {
        while(true) {
            try {
                indexFile(fileQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }

    private void indexFile(File file){
        System.out.printf("인데스 함(파일명 : %s)", file.getName());
        System.out.println();
    }
}
class MyQueue {
    private final File[] buffer;
    private int tail;
    private int head;
    private int count;

    public MyQueue(int count) {
        this.buffer = new File[count];
        this.head = 0;
        this.tail = 0;
        this.count = 0;
    }

    public synchronized void put(File file) throws InterruptedException {
        while (count >= buffer.length) {   // 버퍼가 가득 차면 대기
            wait();
        }

        buffer[tail] = file;   // 후입
        tail = (tail + 1) % buffer.length;  // Circular 큐라서 tail 의 위치가 바뀜
        count++;
        notifyAll();  // 버퍼의 후입 되었으므로, take 해도 된다고 이벤트 알림
    }

    public synchronized File take() throws InterruptedException {
        while (count <= 0) {   // 버퍼에 아무것도 없으면 대기
            wait();
        }

        File file = buffer[head];  // 선출
        head = (head + 1) % buffer.length;  // Circular 큐라서 header 의 위치가 바뀜
        count--;
        notifyAll();  // 버퍼에서 선출 되었으므로, put 해도 된다고 이벤트 알림
        return file;
    }

    public synchronized int size(){
        return count;
    }
}

FileCrawlerFileIndexer크게 바뀌지 않습니다. 다만 Queue를 대신하는 MyQueue라는 클래스가 생겼습니다. 이 MyQueue는 Queue의 역할 뿐만 아니라, 데이터를 추출하거나(take), 데이터를 넣을때(put), synchronized, wait(), notifyAll() 등의 스레드 동기화 코드들을 넣어주어, Thread-Safe 하게 구현해야 합니다. 하지만, 이와 같은 동기화 코드들은 고전적이고, 위험성들이 따릅니다. 따라서 이를 추상화 한 것이 BlockingQueue 입니다.

반응형