Synchronized로 검색한 결과 :: 시소커뮤니티[SSISO Community]
 
SSISO 카페 SSISO Source SSISO 구직 SSISO 쇼핑몰 SSISO 맛집
추천검색어 : JUnit   Log4j   ajax   spring   struts   struts-config.xml   Synchronized   책정보   Ajax 마스터하기   우측부분

회원가입 I 비밀번호 찾기


SSISO Community검색
SSISO Community메뉴
[카페목록보기]
[블로그등록하기]  
[블로그리스트]  
SSISO Community카페
블로그 카테고리
정치 경제
문화 칼럼
비디오게임 스포츠
핫이슈 TV
포토 온라인게임
PC게임 에뮬게임
라이프 사람들
유머 만화애니
방송 1
1 1
1 1
1 1
1 1
1

Synchronized로 검색한 결과
등록일:2018-09-10 20:41:36
작성자:
제목:자바 간단한 쓰레드풀 구현(Java Simple ThreadPool)


ThreadPool(이하 "쓰레드풀")의 개념은 학부생 시절에도 배웁니다. 하지만 개념으로만 파악하고 있기엔 뜬구름 잡는 느낌이 없지 않아 있어서 이번 포스팅은 실제로 ThreadPool을 구현해보려고 합니다. 정말 간단한 ThreadPool을 구현하는 것이니 참고만 해주셨으면 합니다. :3

1. ThreadPool

쓰레드풀의 설명은 Wikipedia - Thread pool pattern에서 참조하시기 바랍니다. 해당 링크의 패턴을 참조해서 쓰레드풀을 구현해 보려고 합니다.

2. Structure

이번에 만들고자 하는 쓰레드풀의 클래스들은 다음과 같습니다.

  • ThreadPool

    이 클래스는 추후에 기술될 ThreadPoolQueue, ThreadPoolRunnable를 가지는 쓰레드풀 메인 객체입니다. 사용자들은 이 클래스를 이용해서 쓰레드풀을 생성 할 수 있습니다.

  • ThreadPoolQueue

    이 클래스는 쓰레드풀 내의 쓰레드에서 공유할 Task 큐입니다. 처리할 Task를 Runnable 형태로 삽입하고 꺼낼 수 있습니다.

  • ThreadPoolRunnable

    쓰레드풀에서 실제로 Task를 처리할 Runnable 클래스입니다. ThreadPoolQueue에서 Task를 꺼낸 후 처리합니다.

3. Source Code

이제부터 코드를 하나씩 만들어 봅시다.

3.1. ThreadPoolQueue

우선적으로 Task를 삽입하고 꺼낼 수 있는 ThreadPoolQueue 클래스부터 만들어봅시다. 코드는 다음과 같습니다.

import java.util.LinkedList;

public class ThreadPoolQueue {

	/* item을 저장할 큐 선언 */
	private LinkedList<Object> queue = new LinkedList<Object>();

	/* 큐 최대 사이즈 선언 */
	private int MAX_QUEUE_SIZE = 5;

	/* 디버그를 위한 콘솔 출력 변수 */
	private boolean DEBUG = false;

	public ThreadPoolQueue(int MAX_QUEUE_SIZE) {
		this.MAX_QUEUE_SIZE = MAX_QUEUE_SIZE;
	}

	/* 아이템 삽입 */
	public Synchronized void enqueue(Object item) throws InterruptedException {
		// 현재 큐가 최대 사이즈면 멈춤
		while( queue.size() == MAX_QUEUE_SIZE ){
			console("enqueue waiting...");
			wait();
		}

		// 현재 큐가 비어있으면 일어나라!! 일해!!
		if( queue.size() == 0 ){
			console("enqueue notifyall...");
			notifyAll();
		}

		console("enqueue adding...");
		queue.add(item);
	}

	/* 아이템 반환 */
	public Synchronized Object dequeue() throws InterruptedException {
		// 반환할 아이템이 없으면 멈춤
		while( queue.size() == 0 ){
			console("dequeue waiting...");
			wait();
		}

		// 반환할 아이템이 가득찼다? 일어나서 일해!!
		if( queue.size() == MAX_QUEUE_SIZE ){
			console("dequeue notifyall...");
			notifyAll();
		}

		console("dequeue removing...");
		return queue.remove(0);
	}

	// 디버깅 설정
	public void toggleDebug(boolean flag){
		this.DEBUG = flag;
	}

	public void console(String msg){
		if( DEBUG )
			System.out.println(msg);
	}
}

이 클래스는 Task를 받아서 삽입하고 꺼내주는 용도의 큐로써 작성했습니다. 큐에 Task가 없으면 .wait() 메소드를 통해 잠시 멈춰 있고 반대로 큐에 Task가 가득 차 있으면 .notifyAll() 메소드를 호출합니다.

3.2. ThreadPoolRunnable

이번엔 쓰레드풀에서 실제 Task 처리를 위한 쓰레드를 생성하는 클래스입니다.

public class ThreadPoolRunnable implements Runnable {

	private int id; // Runnable ID
	private ThreadPoolQueue queue;
	private volatile boolean running = true;
	private boolean DEBUG = false;

	// Runnable 초기화
	public ThreadPoolRunnable(int THREAD_ID, ThreadPoolQueue queue) {
		this.id = THREAD_ID;
		this.queue = queue;
		console("ThreadPoolRunnable["+id+"] is created.");
	}

	// Runnable 실행
	@Override
	public void run() {
		while( running ){
			try{
				Thread.sleep(10);
				console("ThreadPoolRunnable["+id+"] is working.");
				Runnable r = (Runnable) queue.dequeue();
				r.run();
			}catch( InterruptedException e ){
				stop(); // 인터럽트 예외 발생시 해당 Runnable 정지
			}
		}
		console("ThreadPoolRunnable["+id+"] is dead.");
	}

	// Runnable 정지
	public void stop() {
		running = false;
		console("ThreadPoolRunnable["+id+"] is stopped.");
	}

	// 현재 Runnable 디버깅 설정
	public void toggleDebug(boolean flag){
		this.DEBUG = flag;
	}

	public void console(String msg){
		if( DEBUG )
			System.out.println(msg);
	}
}

단순하게 생각한다면 ThreadPoolQueue에서 Task를 꺼내고 그것을 처리하는 것이 전부입니다.

TIP volatile vs syncronized
syncronized 키워드는 블럭으로 이루어진 코드의 동작(operation)에 대한 원자성 보장이라면 volatile는 선언된 변수의 원자성을 보장합니다. 자바 버전 1.5 이상이어야 제대로 동작합니다.

3.3. ThreadPool

위의 두가지 클래스를 멤버 변수로 하는 쓰레드풀 객체입니다.

import java.util.ArrayList;
import java.util.List;

/* 쓰레드풀 클래스 */
public class ThreadPool {

	private ThreadPoolQueue queue;
	private List<ThreadPoolRunnable> runnableList = new ArrayList<ThreadPoolRunnable>();
	private volatile boolean running = true;

	/* 쓰레드풀 초기화 */
	public ThreadPool(int MAX_THREAD_NUM, int MAX_QUEUE_SIZE) {
		queue = new ThreadPoolQueue(MAX_QUEUE_SIZE);
		for(int i = 0 ; i < MAX_THREAD_NUM; i++){
			runnableList.add(new ThreadPoolRunnable(i, queue));
		}
		for( ThreadPoolRunnable r : runnableList ){
			new Thread(r).start();
		}
	}

	/* 쓰레드풀 실행 */
	public Synchronized void execute(Runnable item) throws Exception{
		if( !running ){
			throw new Exception("Thread Pool is not running.");
		}
		queue.enqueue(item);
	}

	/* 쓰레드풀 정지 */
	public Synchronized void stop() throws InterruptedException {
		running = false;
		for( ThreadPoolRunnable r : runnableList ){
			r.stop();
		}
	}

	/* Runnable console 디버깅 플래그 설정 */
	public void toggleDebugWithRunnable(boolean flag){
		for( ThreadPoolRunnable r : runnableList ){
			r.toggleDebug(flag);
		}
	}

	/* Queue console 디버깅 플래그 설정 */
	public void toggleDebugWithQueue(boolean flag){
		queue.toggleDebug(flag);
	}
}

4. Test

실제로 위에서 만든 쓰레드풀을 활용한 테스트 프로그램을 작성해봤습니다.

public class TestMain {

	public static void main(String[] args) {

		// 쓰레드가 5개이고 최대 처리할 아이템 개수는 10인 쓰레드풀 생성
		ThreadPool p = new ThreadPool(5, 10);

		p.toggleDebugWithQueue(true); // 아이템 큐 콘솔 디버깅 ON
		p.toggleDebugWithRunnable(true); // 쓰레드 콘솔 디버깅 ON

		// TEST CODE
		try {

			// 아이템 100개를 쓰레드풀에 삽입 및 실행
			for(int i = 0 ; i < 100; i++){
				Runnable r = new TestRunnable(i);
				p.execute(r);
			}

		} catch (Exception e) {
			e.printStackTrace();
		}

		// 아이템이 처리됐든 안됐든 일단 쓰레드풀 정지
		try {
			p.stop();
		} catch (InterruptedException e) {
			System.out.println("ThreadPool is stopped.");
		}

	}
}

// 테스트할 용도의 Runnable 클래스
class TestRunnable implements Runnable{
	int index;
	public TestRunnable(int i) {
		index = i;
	}
	@Override
	public void run() {
		System.out.println(index);
	}

}

프로그램을 실행시켜보면 작동하는것을 콘솔로 볼 수 있습니다. 정지 명령이 내려와도 하던 일은 마치고 죽는 아름다운 쓰레드들입니다. (--;)

...(생략)
enqueue adding...
enqueue adding...
ThreadPoolRunnable[0] is stopped.
ThreadPoolRunnable[1] is stopped.
ThreadPoolRunnable[2] is stopped.
ThreadPoolRunnable[3] is stopped.
ThreadPoolRunnable[4] is stopped.
ThreadPoolRunnable[1] is working.
dequeue notifyall...
dequeue removing...
90
ThreadPoolRunnable[1] is dead.
ThreadPoolRunnable[0] is working.
dequeue removing...
91
ThreadPoolRunnable[0] is dead.
ThreadPoolRunnable[3] is working.
dequeue removing...
92
ThreadPoolRunnable[2] is working.
dequeue removing...
ThreadPoolRunnable[4] is working.
dequeue removing...
94
ThreadPoolRunnable[4] is dead.
93
ThreadPoolRunnable[2] is dead.
ThreadPoolRunnable[3] is dead.

5. java.util.concurrent package

하지만 위처럼 직접 구현 방법 외에도 java.util.concurrent 패키지를 이용해 정말로 간단한 쓰레드풀을 구현 할 수 있습니다.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class TestMain {

	static int index = 0;

	public static void main(String[] args) {

		// 새로운 쓰레드 추가시 출력 문구를 위한 ThreadFactory
		ThreadFactory tf = new ThreadFactory() {
			@Override
			public Thread newThread(Runnable r) {
				Thread thread = new Thread(r);
				System.out.println("Runnable"+"["+ index++ +"]"+" is created.");
				return thread;
			}
		};

		// 쓰레드를 5개까지만 만드는 쓰레드풀 생성
		ExecutorService p = Executors.newFixedThreadPool(5, tf);

		// TEST CODE
		try {

			// 아이템 100개를 쓰레드풀에 삽입 및 실행
			for(int i = 0 ; i < 100; i++){
				Runnable r = new TestRunnable(i);
				p.execute(r);
			}

		} catch (Exception e) {
			e.printStackTrace();
		}

		p.shutdown();
	}
}

// 테스트할 용도의 Runnable 클래스
class TestRunnable implements Runnable{
	int index;
	public TestRunnable(int i) {
		index = i;
	}
	@Override
	public void run() {
		System.out.println(index);
	}
}

참 쉽죠? :)

부연 설명을 조금 넣자면 .newFixedThreadPool()로 반환 받는 객체는 ThreadPoolExecutor 클래스입니다. 이 클래스에서는 초기화 할 때에 LinkedBlockingQueue<Runnable>라는 내부 큐를 이용하게 되어 있습니다. 단지 외부에 노출을 안할 뿐이에요. :3