MENU

Java言語における並列処理の基礎と実装技法

現代のコンピュータシステムにおいて、複数の処理を同時に実行する「並列処理」の重要性は増すばかりである。本章では、Java言語における並列処理の基本的な概念について解説する。

並列処理とシーケンシャル処理の違い

並列処理とは、複数の処理を同時並行的に実行する手法である。一方、シーケンシャル処理(逐次処理)とは、一つの処理を完了させてから次の処理に移る手法である。

シーケンシャル処理を用いたプログラムの例を以下に記す。

public class SequentialExample {
    public static void main(String[] args) {
        // タスク1の実行
        System.out.println("タスク1を実行中...");
        for (int i = 0; i < 5; i++) {
            System.out.println("タスク1: " + i);
            try {
                Thread.sleep(100); // 処理の遅延をシミュレート
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        // タスク2の実行
        System.out.println("タスク2を実行中...");
        for (int i = 0; i < 5; i++) {
            System.out.println("タスク2: " + i);
            try {
                Thread.sleep(100); // 処理の遅延をシミュレート
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

このプログラムでは、タスク1が完全に終了した後にタスク2が開始される。このため、全体の実行時間はタスク1とタスク2の合計時間となる。Thread.sleep()メソッドは実際の処理の遅延をシミュレートするために使用しているが、実務では入出力操作やネットワーク通信などの時間がかかる処理を表現している場合が多い。

対して、並列処理を用いたプログラムの例を以下に記す。

public class ParallelExample {
    public static void main(String[] args) {
        // タスク1を実行するスレッド
        Thread thread1 = new Thread(() -> {
            System.out.println("タスク1を実行中...");
            for (int i = 0; i < 5; i++) {
                System.out.println("タスク1: " + i);
                try {
                    Thread.sleep(100); // 処理の遅延をシミュレート
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        // タスク2を実行するスレッド
        Thread thread2 = new Thread(() -> {
            System.out.println("タスク2を実行中...");
            for (int i = 0; i < 5; i++) {
                System.out.println("タスク2: " + i);
                try {
                    Thread.sleep(100); // 処理の遅延をシミュレート
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        // 両方のスレッドを開始
        thread1.start();
        thread2.start();
        
        // メインスレッドは両方のスレッドの終了を待機
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("すべての処理が完了しました");
    }
}

このプログラムでは、タスク1とタスク2が別々のスレッドで実行されるため、同時並行的に処理が進行する。出力結果を見ると、タスク1とタスク2の出力が交互に表示されることがわかる。Lambda式(() -> {})はJava 8から導入された機能で、匿名関数を簡潔に記述できる。また、join()メソッドは対象スレッドの終了を待機するメソッドである。

並列処理の最大のメリットは、適切に実装された場合に処理時間の短縮が期待できる点である。上記の例では、理想的な環境下では処理時間が約半分になると予想される。

マルチスレッドの仕組み

Javaにおけるマルチスレッドは、JVM(Java Virtual Machine)によって管理されるスレッドモデルに基づいている。スレッドとは、プロセス内で実行される処理の最小単位である。

JVMはOSのネイティブスレッドを利用してJavaのスレッドを実現している。以下にスレッドのライフサイクルを記す。

public class ThreadLifecycleDemo {
    private static final Object lock = new Object();
    
    public static void main(String[] args) throws InterruptedException {
        // NEW状態: スレッドが作成された状態
        Thread thread = new Thread(() -> {
            try {
                // RUNNABLE状態: 実行中または実行可能な状態
                System.out.println("スレッドが実行中です");
                
                // TIMED_WAITING状態: 指定時間の経過を待っている状態
                Thread.sleep(1000);
                
                // 再びRUNNABLE状態
                System.out.println("スレッドが再開しました");
                
                // BLOCKED状態: モニタロックの獲得を待機
                synchronized(lock) {
                    System.out.println("スレッドがロックを獲得しました");
                }
                
                // WAITING状態: 無期限に待機
                synchronized(lock) {
                    lock.wait();
                }
                
            } catch (InterruptedException e) {
                // スレッドが中断された場合
                System.out.println("スレッドが中断されました");
                return;
            }
            // タスク完了後はTERMINATED状態になる
        });
        
        System.out.println("スレッド状態: " + thread.getState()); // NEW
        
        thread.start(); // スレッドをRUNNABLE状態に変更
        System.out.println("スレッド状態: " + thread.getState()); // RUNNABLE
        
        Thread.sleep(500);
        System.out.println("スレッド状態: " + thread.getState()); // TIMED_WAITING
        
        // BLOCKEDとWAITING状態を示すためのヘルパースレッド
        Thread blockerThread = new Thread(() -> {
            synchronized(lock) {
                try {
                    Thread.sleep(2000); // ロックを2秒間保持
                    lock.notify(); // WAITINGスレッドを起こす
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        
        // メインスレッドが先のスレッドのTIMED_WAITING終了を待機
        Thread.sleep(1000);
        
        // BLOCKED状態を作るためにロックを保持したままにする
        blockerThread.start();
        Thread.sleep(100); // 先のスレッドがsynchronizedブロックに到達する時間
        
        System.out.println("スレッド状態: " + thread.getState()); // BLOCKED
        
        // blockerThreadがロックを解放するのを待つ
        Thread.sleep(2500);
        
        System.out.println("スレッド状態: " + thread.getState()); // WAITING
        
        // threadを中断して終了させる
        thread.interrupt();
        thread.join(); // スレッドの終了を待機
        
        System.out.println("スレッド状態: " + thread.getState()); // TERMINATED
    }
}

このプログラムではスレッドの状態遷移を示している。スレッドの状態は、NEW(新規作成)、RUNNABLE(実行可能)、BLOCKED(ブロック)、WAITING(待機)、TIMED_WAITING(タイムアウト付き待機)、TERMINATED(終了)の6種類がある。getState()メソッドを使用することで現在のスレッド状態を取得できる。BLOCKEDはモニタロックの獲得を待っている状態、WAITINGはwait()メソッドなどで無期限に待機している状態である。

Javaのスレッド実行はプリエンプティブ(先取り)方式を採用しており、OSのスケジューラによってスレッドの実行時間が割り当てられる。このため、スレッドの実行順序や切り替えのタイミングは、プログラマが完全に制御することはできない点に注意が必要である。

並列処理のメリットとデメリット

並列処理には多くのメリットが存在するが、同時に注意すべきデメリットも存在する。

メリット

  1. 処理速度の向上 – CPU使用率を最大化し、複数のタスクを同時に実行することで全体的な処理時間を短縮できる。特にI/O待ちの多いアプリケーションで効果的である。
public class PerformanceComparison {
    public static void main(String[] args) {
        long startTime, endTime;
        
        // シーケンシャル処理の時間計測
        startTime = System.currentTimeMillis();
        runSequential();
        endTime = System.currentTimeMillis();
        System.out.println("シーケンシャル処理時間: " + (endTime - startTime) + "ms");
        
        // 並列処理の時間計測
        startTime = System.currentTimeMillis();
        runParallel();
        endTime = System.currentTimeMillis();
        System.out.println("並列処理時間: " + (endTime - startTime) + "ms");
    }
    
    private static void runSequential() {
        // 重い計算処理をシミュレート
        for (int i = 0; i < 10; i++) {
            heavyComputation();
        }
    }
    
    private static void runParallel() {
        Thread[] threads = new Thread[10];
        
        // 10個のスレッドを作成
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(PerformanceComparison::heavyComputation);
            threads[i].start();
        }
        
        // すべてのスレッドの終了を待機
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    private static void heavyComputation() {
        // CPUバウンドな処理をシミュレート
        double result = 0;
        for (int i = 0; i < 10000000; i++) {
            result += Math.sin(i) * Math.cos(i);
        }
    }
}

このプログラムでは、シーケンシャル処理と並列処理の実行時間を比較している。マルチコアCPUの環境では、並列処理による大幅な速度向上が観察できる。メソッド参照(PerformanceComparison::heavyComputation)はJava 8から導入された機能で、メソッドを関数オブジェクトとして扱うことができる。

  1. 応答性の向上 – ユーザーインターフェイスを担当するスレッドと処理を行うスレッドを分離することで、アプリケーションの応答性を保つことができる。
  2. リソース効率の最大化 – 現代のマルチコアプロセッサを効率的に活用できる。

デメリット

  1. 複雑さの増加 – 並列プログラミングは本質的に複雑である。デバッグや理解が難しく、予測不能な動作を引き起こす可能性がある。
  2. 同期問題 – 複数のスレッドが共有リソースにアクセスする場合、競合状態などの問題が発生する可能性がある。
public class SynchronizationIssueDemo {
    private static int counter = 0;
    
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter++; // 競合状態が発生する可能性のある操作
            }
        });
        
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 10000; i++) {
                counter++; // 競合状態が発生する可能性のある操作
            }
        });
        
        t1.start();
        t2.start();
        
        t1.join();
        t2.join();
        
        System.out.println("期待される結果: 20000");
        System.out.println("実際の結果: " + counter);
    }
}

このプログラムでは、counter++という操作が不可分でないため、競合状態が発生する。実行結果は、20000よりも小さい値となる場合が多い。これは両方のスレッドが同時に同じ値を読み取り、インクリメントして書き戻すため、一部のインクリメント操作が失われることによる。

  1. オーバーヘッド – スレッドの作成や管理にはコストがかかる。また、コンテキストスイッチ(スレッドの切り替え)もパフォーマンスに影響を与える。
  2. デッドロック – 複数のスレッドが互いに待ち合う状態に陥り、プログラムが進行不能になる場合がある。

並列処理を効果的に活用するためには、これらのメリットとデメリットをよく理解し、適切な設計と実装を行うことが重要である。次章では、Javaでの並列処理の具体的な実装方法について解説する。

目次

Javaでの並列処理の実装方法

前章で解説した並列処理の基本概念を踏まえ、本章ではJavaにおける並列処理の具体的な実装方法について解説する。Javaでは、スレッドを操作するための多様なAPIが提供されており、開発者は目的に応じて適切な方法を選択することが可能である。

Threadクラスとその使い方

Javaにおける並列処理の基本は、java.lang.Threadクラスである。このクラスを利用することで、新しいスレッドを作成し、処理を並列に実行することができる。

以下に、Threadクラスを直接継承して実装する例を記す。

public class MyThread extends Thread {
    private final String name;
    
    public MyThread(String name) {
        this.name = name;
    }
    
    @Override
    public void run() {
        // スレッドで実行される処理を定義
        for (int i = 0; i < 5; i++) {
            System.out.println(name + "の処理: " + i);
            try {
                // Thread.sleepは現在のスレッドを指定ミリ秒間休止させる
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // スレッドが中断された場合の処理
                System.out.println(name + "が中断されました");
                return; // スレッドを終了
            }
        }
        System.out.println(name + "の処理が完了しました");
    }
    
    public static void main(String[] args) {
        // 複数のスレッドを生成
        MyThread thread1 = new MyThread("スレッド1");
        MyThread thread2 = new MyThread("スレッド2");
        
        // スレッドの開始
        thread1.start(); // start()メソッドを呼び出すことでスレッドが開始される
        thread2.start();
        
        // メインスレッドでの処理
        System.out.println("メインスレッドの処理を継続します");
        
        try {
            // join()メソッドを使用して、スレッドの終了を待機
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("すべてのスレッドが終了しました");
    }
}

このコードでは、Threadクラスを継承してrun()メソッドをオーバーライドすることで、スレッドで実行する処理を定義している。Threadクラスを継承する方法は直感的であるが、Javaではクラスの多重継承が許可されていないため、すでに他のクラスを継承している場合はこの方法が使用できない制約がある。

Threadクラスには多くの便利なメソッドが用意されている。以下に、よく使用される主要なメソッドを記す。

public class ThreadMethodsDemo {
    public static void main(String[] args) throws InterruptedException {
        // スレッドの生成
        Thread thread = new Thread(() -> {
            try {
                System.out.println("スレッドが開始されました");
                
                // スレッドの名前を取得
                System.out.println("スレッド名: " + Thread.currentThread().getName());
                
                // スレッドの優先度を取得
                System.out.println("スレッドの優先度: " + Thread.currentThread().getPriority());
                
                // スレッドが実行中かどうかをチェック
                System.out.println("スレッドは実行中か: " + Thread.currentThread().isAlive());
                
                Thread.sleep(2000);
                
                System.out.println("スレッドの処理が完了しました");
            } catch (InterruptedException e) {
                System.out.println("スレッドが中断されました");
            }
        });
        
        // スレッド名の設定
        thread.setName("カスタムスレッド");
        
        // スレッドの優先度を設定(1〜10、数値が大きいほど優先度が高い)
        thread.setPriority(Thread.MAX_PRIORITY); // MAX_PRIORITYは10
        
        // スレッドをデーモンスレッドとして設定
        // デーモンスレッドはメインスレッドが終了すると自動的に終了される
        thread.setDaemon(true);
        
        System.out.println("スレッドの状態(開始前): " + thread.getState()); // NEW
        
        // スレッドの開始
        thread.start();
        
        System.out.println("スレッドの状態(開始後): " + thread.getState()); // RUNNABLE
        
        // スレッドの中断(スレッドのInterruptedフラグをセット)
        // thread.interrupt();
        
        // 現在実行中のスレッドを指定ミリ秒間休止させる
        Thread.sleep(1000);
        
        System.out.println("スレッドの状態(sleep中): " + thread.getState()); // TIMED_WAITING
        
        // スレッドの終了を待機
        thread.join(3000); // 最大3秒間待機
        
        System.out.println("スレッドの状態(終了後): " + thread.getState()); // TERMINATED
    }
}

このコードでは、Threadクラスの主要なメソッドを紹介している。setPriority()メソッドでスレッドの優先度を設定できるが、優先度はOSのスケジューラに対する「ヒント」に過ぎず、必ずしも厳密に守られるとは限らない。また、setDaemon()メソッドでデーモンスレッドに設定すると、そのスレッドはアプリケーションのメインスレッドが終了した際に強制終了される。バックグラウンド処理など、明示的な終了処理が不要なケースで有用である。

Runnableインターフェース

Threadクラスを継承する方法の代わりに、Runnableインターフェースを実装する方法がある。これにより、Javaの単一継承の制約を回避することができる。

以下に、Runnableインターフェースを実装する基本的な例を記す。

public class RunnableExample implements Runnable {
    private final String name;
    
    public RunnableExample(String name) {
        this.name = name;
    }
    
    @Override
    public void run() {
        // Runnableインターフェースのrun()メソッドを実装
        for (int i = 0; i < 5; i++) {
            System.out.println(name + "の処理: " + i);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                System.out.println(name + "が中断されました");
                return;
            }
        }
        System.out.println(name + "の処理が完了しました");
    }
    
    public static void main(String[] args) {
        // Runnableオブジェクトの生成
        Runnable runnable1 = new RunnableExample("タスク1");
        Runnable runnable2 = new RunnableExample("タスク2");
        
        // Runnableオブジェクトをスレッドに渡して実行
        Thread thread1 = new Thread(runnable1);
        Thread thread2 = new Thread(runnable2);
        
        // スレッドの開始
        thread1.start();
        thread2.start();
        
        // 匿名クラスを使用した方法
        Thread thread3 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("匿名クラスによるタスクの処理");
            }
        });
        thread3.start();
        
        // Java 8以降のラムダ式を使用した方法(最も簡潔)
        Thread thread4 = new Thread(() -> {
            System.out.println("ラムダ式によるタスクの処理");
        });
        thread4.start();
        
        // すべてのスレッドの終了を待機
        try {
            thread1.join();
            thread2.join();
            thread3.join();
            thread4.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("すべてのスレッドが終了しました");
    }
}

このコード例では、Runnableインターフェースを実装するクラスを作成し、そのインスタンスをThreadコンストラクタに渡している。また、匿名クラスやラムダ式を用いた簡潔な記述方法も紹介している。Java 8以降では、ラムダ式を使用することで非常に簡潔にスレッド処理を記述できるようになった。

Runnableインターフェースを使用する方法は、オブジェクト指向設計の原則に則っており、処理ロジックとスレッド管理を分離することができる。また、同じRunnableオブジェクトを複数のスレッドで再利用することも可能である。

以下に、スレッド間でデータを共有する例を記す。

public class SharedResourceExample {
    // 複数スレッド間で共有されるカウンター
    private static class Counter {
        private int count = 0;
        
        // 同期化されていないメソッド
        public void increment() {
            count++;
        }
        
        // 同期化されたメソッド
        public synchronized void synchronizedIncrement() {
            count++;
        }
        
        public int getCount() {
            return count;
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        // 共有リソースのインスタンスを作成
        Counter unsafeCounter = new Counter();
        Counter safeCounter = new Counter();
        
        // 複数のスレッドが同じカウンターにアクセスする
        Runnable incrementTask = () -> {
            for (int i = 0; i < 10000; i++) {
                unsafeCounter.increment(); // 同期化されていない
                safeCounter.synchronizedIncrement(); // 同期化されている
            }
        };
        
        // 10個のスレッドを作成
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(incrementTask);
            threads[i].start();
        }
        
        // すべてのスレッドの終了を待機
        for (Thread thread : threads) {
            thread.join();
        }
        
        // 結果の表示
        System.out.println("同期化なしカウンター: " + unsafeCounter.getCount());
        System.out.println("同期化ありカウンター: " + safeCounter.getCount());
    }
}

このコードでは、複数のスレッドが共有リソース(Counter)にアクセスする例を示している。同期化されていないincrement()メソッドを使用した場合、複数のスレッドが同時にアクセスすることで「レースコンディション」が発生し、期待通りの結果(10 × 10000 = 100000)にならない場合がある。一方、synchronizedキーワードを使用したsynchronizedIncrement()メソッドでは、一度に1つのスレッドしかメソッドを実行できないため、正確な結果が得られる。

なお、Java 5以降では、java.util.concurrent.atomicパッケージのAtomicIntegerなどのクラスを使用することで、より効率的な同期処理が可能となっている。

ExecutorServiceの基本

Threadクラスを直接使用する方法は理解しやすいが、大量のスレッドを管理する場合や複雑なタスク実行パターンが必要な場合には、より高度なフレームワークであるExecutorServiceを使用することが推奨される。

ExecutorServiceはJava 5から導入されたjava.util.concurrentパッケージの一部であり、スレッドプールの管理やタスクの実行を効率的に行うための仕組みを提供している。

以下に、ExecutorServiceの基本的な使用例を記す。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceExample {
    public static void main(String[] args) {
        // 固定サイズのスレッドプールを作成(3スレッド)
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        try {
            // Runnableタスクをsubmitメソッドで実行
            for (int i = 0; i < 5; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    String threadName = Thread.currentThread().getName();
                    System.out.println("タスク" + taskId + "が" + threadName + "で実行開始");
                    
                    try {
                        // 処理時間をシミュレート
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    
                    System.out.println("タスク" + taskId + "が" + threadName + "で実行完了");
                });
            }
            
            // Callableタスク(戻り値を返す処理)の実行
            Future<String> future = executor.submit(() -> {
                System.out.println("計算タスクを実行します");
                // 計算処理をシミュレート
                Thread.sleep(2000);
                return "計算結果: " + (100 * 100);
            });
            
            // Futureから結果を取得(ブロッキング呼び出し)
            try {
                String result = future.get();
                System.out.println("取得した結果: " + result);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } finally {
            // ExecutorServiceのシャットダウン
            // 新しいタスクの受付を停止
            executor.shutdown();
            
            try {
                // すべてのタスクの完了を最大5秒間待機
                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                    // タイムアウトした場合、残りのタスクを強制終了
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                // 現在のスレッドが中断された場合、残りのタスクを強制終了
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("メインスレッドの処理を終了します");
    }
}

このコードでは、ExecutorServiceを使用してスレッドプールを作成し、複数のタスクを効率的に実行している。Executors.newFixedThreadPool()は指定した数のスレッドを持つスレッドプールを作成する。ここでは3つのスレッドを持つプールを作成しているため、同時に最大3つのタスクが並列に実行される。それ以上のタスクは、実行中のタスクが完了するまで待機キューに入れられる。

ExecutorServiceでは、submitメソッドにRunnableやCallableを渡してタスクを実行できる。Callableインターフェースは、Runnableと似ているが、値を返却することができ、例外をスローできる点が異なる。submitメソッドは、Futureオブジェクトを返し、これを使用してタスクの実行状態の監視や結果の取得ができる。

ExecutorServiceは必ず明示的にシャットダウンする必要がある。shutdown()メソッドは新しいタスクの受け付けを停止するが、すでにキューに入っているタスクは実行される。shutdownNow()メソッドは、実行中のタスクも含めて即座に終了させる試みを行う。

ExecutorServiceの主な種類として、以下のものがある。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorTypesExample {
    public static void main(String[] args) {
        // 1. 固定サイズのスレッドプール
        ExecutorService fixedPool = Executors.newFixedThreadPool(4);
        System.out.println("固定サイズのスレッドプール: 最大4スレッド");
        
        // 2. キャッシュされたスレッドプール(必要に応じてスレッド数を増加させ、アイドル状態のスレッドは60秒後に自動的に破棄する)
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        System.out.println("キャッシュされたスレッドプール: 必要に応じてスレッド数を増加させ、アイドル状態のスレッドは一定時間後に自動的に破棄");
        
        // 3. 単一スレッドのExecutor(1つのスレッドでタスクを順次実行)
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        System.out.println("単一スレッドExecutor: 1つのスレッドでタスクを順次実行");
        
        // 4. スケジュール機能付きのExecutor
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
        System.out.println("スケジュール機能付きExecutor: 定期的なタスク実行が可能");
        
        // スケジュールされたタスクの例
        scheduledPool.schedule(() -> {
            System.out.println("3秒後に実行されるタスク");
        }, 3, TimeUnit.SECONDS);
        
        scheduledPool.scheduleAtFixedRate(() -> {
            System.out.println("1秒後に開始し、その後2秒ごとに実行されるタスク");
        }, 1, 2, TimeUnit.SECONDS);
        
        // 各ExecutorServiceのシャットダウン
        try {
            Thread.sleep(10000); // デモのため10秒間実行
            
            System.out.println("\nすべてのExecutorServiceをシャットダウンします");
            fixedPool.shutdown();
            cachedPool.shutdown();
            singleThreadExecutor.shutdown();
            scheduledPool.shutdown();
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

このコードでは、Executorsファクトリクラスを使用して作成できる主なExecutorServiceの種類を示している。固定サイズのスレッドプールは、指定した数のスレッドを常に保持するため、リソース使用量が安定する。キャッシュされたスレッドプールは、必要に応じてスレッド数を増減させるため、短時間のタスクが多数ある場合に効率的である。単一スレッドのExecutorは、タスクを順次実行するため、処理順序が重要な場合に有用である。スケジュール機能付きのExecutorは、タスクを指定した時間後に実行したり、定期的に実行したりできる。

ExecutorServiceを利用することで、スレッドの生成・管理・再利用が効率的に行われ、アプリケーションのパフォーマンスとリソース使用効率が向上する。また、直接Threadを操作する場合に比べて、より高レベルの抽象化が提供されるため、並列処理の実装がシンプルになる。

実践的な並列処理テクニック

前章で解説したJavaの基本的な並列処理実装に続き、本章ではより実践的かつ高度な並列処理テクニックについて解説する。Java 8以降で導入された機能を活用することで、より簡潔かつ効率的な並列処理が実現可能となった。これらのテクニックを習得することで、現代のマルチコアプロセッサの性能を最大限に引き出すアプリケーション開発が可能となる。

並列ストリーム処理

Java 8で導入されたStream APIは、コレクションに対する操作を宣言的に記述できる強力な機能である。このStream APIは並列処理にも対応しており、parallelStream()メソッドまたはparallel()メソッドを使用することで、逐次処理を並列処理に簡単に変換することができる。

以下に、並列ストリーム処理の基本的な例を記す。

import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

public class ParallelStreamExample {
    public static void main(String[] args) {
        // 大きな配列を用意
        int[] numbers = new int[10_000_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i;
        }
        
        // JITコンパイラのウォームアップ
        for (int i = 0; i < 5; i++) {
            Arrays.stream(numbers)
                  .filter(n -> n % 2 == 0)
                  .map(n -> n * 2)
                  .sum();
                  
            Arrays.stream(numbers)
                  .parallel()
                  .filter(n -> n % 2 == 0)
                  .map(n -> n * 2)
                  .sum();
        }
        
        // シーケンシャルストリームでの処理時間測定
        long startTime = System.currentTimeMillis();
        
        int sequentialSum = Arrays.stream(numbers)
                                 .filter(n -> n % 2 == 0) // 偶数のみをフィルタリング
                                 .map(n -> n * 2)         // 各要素を2倍
                                 .sum();                  // 合計を計算
        
        long sequentialTime = System.currentTimeMillis() - startTime;
        System.out.println("シーケンシャル処理時間: " + sequentialTime + "ms");
        System.out.println("シーケンシャル結果: " + sequentialSum);
        
        // 並列ストリームでの処理時間測定
        startTime = System.currentTimeMillis();
        
        int parallelSum = Arrays.stream(numbers)
                               .parallel()              // ストリームを並列化
                               .filter(n -> n % 2 == 0) // 偶数のみをフィルタリング
                               .map(n -> n * 2)         // 各要素を2倍
                               .sum();                  // 合計を計算
        
        long parallelTime = System.currentTimeMillis() - startTime;
        System.out.println("並列処理時間: " + parallelTime + "ms");
        System.out.println("並列結果: " + parallelSum);
        System.out.println("高速化率: " + (double)sequentialTime / parallelTime + "倍");
        
        // コレクションから直接parallelStreamを取得する例
        List<String> words = Arrays.asList("Java", "Parallel", "Stream", "Processing", "Example");
        
        words.parallelStream()
             .map(String::toUpperCase)
             .forEach(s -> System.out.print(s + " "));
        
        System.out.println("\n注意: 上記の出力順序は実行ごとに異なる可能性があります");
    }
}

このコードでは、1000万個の要素を持つ配列に対して、シーケンシャルストリームと並列ストリームの両方で同じ処理(偶数のフィルタリング、2倍にする変換、合計の計算)を行い、処理時間を比較している。マルチコアプロセッサを搭載した環境では、並列ストリームを使用することで処理時間が大幅に短縮される場合が多い。

並列ストリームの特徴として、並列処理による処理順序の非決定性がある。このため、処理順序に依存する操作(forEachなど)では、実行するたびに異なる結果が得られる可能性がある。また、parallelStream()メソッドを使用すると、内部的にはJavaのFork/Joinフレームワークが利用され、デフォルトのForkJoinPoolが処理を実行する。

並列ストリームは全ての状況で速度向上をもたらすわけではない点に注意が必要である。以下に、並列ストリームの適切な使用と注意点の例を記す。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

public class ParallelStreamBestPractices {
    public static void main(String[] args) {
        // 大量のデータを生成
        List<Integer> numbers = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) {
            numbers.add(i);
        }
        
        // 注意点1: 副作用のある操作は並列ストリームで問題を引き起こす可能性がある
        List<Integer> results1 = new ArrayList<>();
        
        // 問題のあるコード - 外部のArrayListに並列に追加
        numbers.parallelStream()
               .filter(n -> n % 10 == 0)
               .forEach(results1::add); // 危険: ArrayListは並列アクセスに安全ではない
        
        System.out.println("結果1のサイズ(期待値: 100,000): " + results1.size());
        
        // より良い方法1 - スレッドセーフなコレクションを使用
        List<Integer> results2 = new CopyOnWriteArrayList<>();
        
        numbers.parallelStream()
               .filter(n -> n % 10 == 0)
               .forEach(results2::add); // CopyOnWriteArrayListは並列アクセスに安全
        
        System.out.println("結果2のサイズ(期待値: 100,000): " + results2.size());
        
        // より良い方法2 - 並列ストリームの結果を直接収集
        List<Integer> results3 = numbers.parallelStream()
                                        .filter(n -> n % 10 == 0)
                                        .collect(Collectors.toList());
        
        System.out.println("結果3のサイズ(期待値: 100,000): " + results3.size());
        
        // 注意点2: 小さなデータサイズでは並列処理のオーバーヘッドが大きい
        List<Integer> smallList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            smallList.add(i);
        }
        
        long startTime = System.currentTimeMillis();
        int sum1 = smallList.stream().mapToInt(i -> i * 2).sum();
        long sequentialTime = System.currentTimeMillis() - startTime;
        
        startTime = System.currentTimeMillis();
        int sum2 = smallList.parallelStream().mapToInt(i -> i * 2).sum();
        long parallelTime = System.currentTimeMillis() - startTime;
        
        System.out.println("小さなリストでの処理:");
        System.out.println("シーケンシャル時間: " + sequentialTime + "ms");
        System.out.println("並列処理時間: " + parallelTime + "ms");
    }
}

このコードでは、並列ストリームを使用する際の重要な注意点を示している。まず、標準的なArrayListなどの非スレッドセーフなコレクションに並列ストリームから直接要素を追加すると、競合状態が発生し、予期しない結果が生じる可能性がある。スレッドセーフなCopyOnWriteArrayListを使用するか、collect()メソッドを使用して結果を収集することが推奨される。また、小さなデータセットでは、並列処理のオーバーヘッド(スレッドの作成・管理・調整コスト)が処理自体よりも大きくなり、かえってパフォーマンスが低下する場合がある。

並列ストリームが最も効果を発揮するのは、以下の条件を満たす場合である。

  1. データサイズが十分に大きい(目安として数万件以上)
  2. 各要素の処理が比較的重い計算を伴う
  3. 処理が独立しており副作用がない(ステートレスな処理)
  4. 処理対象のデータ構造が分割しやすい(ArrayListやArrayなど)

フォーク・ジョインフレームワーク

Java 7から導入されたフォーク・ジョインフレームワークは、「分割統治法」の原理に基づいた並列処理フレームワークである。大きなタスクを小さなサブタスクに再帰的に分割し(フォーク)、その結果を集約する(ジョイン)方法で効率的な並列処理を実現する。

以下に、フォーク・ジョインフレームワークを使用した例を記す。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExample {
    // RecursiveTaskを継承し、整数配列の合計を計算するタスク
    static class SumTask extends RecursiveTask<Long> {
        private final int[] array;
        private final int start;
        private final int end;
        private static final int THRESHOLD = 10000; // 分割の閾値
        
        public SumTask(int[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }
        
        @Override
        protected Long compute() {
            int length = end - start;
            
            // 処理するデータサイズが閾値以下なら直接計算
            if (length <= THRESHOLD) {
                return computeDirectly();
            }
            
            // データを分割して子タスクを作成
            int middle = start + length / 2;
            
            SumTask leftTask = new SumTask(array, start, middle);
            SumTask rightTask = new SumTask(array, middle, end);
            
            // 左側のタスクを別スレッドで実行(フォーク)
            leftTask.fork();
            
            // 右側のタスクは現在のスレッドで実行
            Long rightResult = rightTask.compute();
            
            // 左側のタスクの結果を待機して取得(ジョイン)
            Long leftResult = leftTask.join();
            
            // 結果を結合して返す
            return leftResult + rightResult;
        }
        
        private long computeDirectly() {
            long sum = 0;
            for (int i = start; i < end; i++) {
                // 計算を少し重くするためにコストのかかる操作を追加
                sum += array[i] * array[i];
            }
            return sum;
        }
    }
    
    public static void main(String[] args) {
        // テスト用の大きな配列を生成
        int[] numbers = new int[100_000_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i % 100; // 0〜99の数値を繰り返し
        }
        
        long startTime, endTime;
        
        // シーケンシャル処理での計算
        startTime = System.currentTimeMillis();
        long sum1 = 0;
        for (int i = 0; i < numbers.length; i++) {
            sum1 += numbers[i] * numbers[i];
        }
        endTime = System.currentTimeMillis();
        System.out.println("シーケンシャル処理時間: " + (endTime - startTime) + "ms");
        System.out.println("シーケンシャル結果: " + sum1);
        
        // ForkJoinタスクの作成
        SumTask task = new SumTask(numbers, 0, numbers.length);
        
        // ForkJoinPoolの作成
        ForkJoinPool pool = new ForkJoinPool();
        
        // タスクの実行と時間計測
        startTime = System.currentTimeMillis();
        long sum2 = pool.invoke(task);
        endTime = System.currentTimeMillis();
        long forkJoinTime = endTime - startTime;

        System.out.println("ForkJoin処理時間: " + forkJoinTime + "ms");
        System.out.println("ForkJoin結果: " + sum2);
        System.out.println("高速化率: " + (double)(endTime - startTime) / forkJoinTime + "倍");
    }
}

このコードでは、1億個の要素を持つ配列の各要素を二乗した合計値を計算するタスクを、フォーク・ジョインフレームワークを使用して実装している。SumTaskクラスはRecursiveTaskクラスを継承し、compute()メソッドをオーバーライドして具体的な処理を定義している。

処理の流れは以下のとおりである。

  1. タスクが受け取ったデータサイズが閾値より大きい場合、データを半分に分割する
  2. 分割した前半部分を別スレッドで実行するためにfork()メソッドを呼び出す
  3. 後半部分は現在のスレッドで再帰的にcompute()メソッドを呼び出す
  4. 前半部分の結果をjoin()メソッドで待機して取得する
  5. 前半と後半の結果を合算して返す

フォーク・ジョインフレームワークの特徴として、ワーク・スティーリング(work-stealing)アルゴリズムがある。これは、キューが空になったスレッドが他のスレッドのキューからタスクを「盗む」仕組みで、スレッドの負荷を自動的に分散させる効果がある。

フォーク・ジョインフレームワークを効果的に利用するためのいくつかのガイドラインを以下に記す。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

public class ForkJoinBestPractices {
   // RecursiveActionの例(結果を返さない)
   static class MultiplyArrayTask extends RecursiveAction {
       private final double[] array;
       private final int start;
       private final int end;
       private final double multiplier;
       private static final int THRESHOLD = 10000;
       
       public MultiplyArrayTask(double[] array, int start, int end, double multiplier) {
           this.array = array;
           this.start = start;
           this.end = end;
           this.multiplier = multiplier;
       }
       
       @Override
       protected void compute() {
           int length = end - start;
           
           if (length <= THRESHOLD) {
               // 直接計算
               for (int i = start; i < end; i++) {
                   array[i] *= multiplier;
               }
               return;
           }
           
           // ベストプラクティス1: バランスの良い分割
           int middle = start + length / 2;
           
           // ベストプラクティス2: 左右のタスクを作成
           MultiplyArrayTask leftTask = new MultiplyArrayTask(array, start, middle, multiplier);
           MultiplyArrayTask rightTask = new MultiplyArrayTask(array, middle, end, multiplier);
           
           // ベストプラクティス3: 一方のタスクだけをフォークし、他方は直接実行
           leftTask.fork();
           rightTask.compute(); // 現在のスレッドで直接実行
           leftTask.join();
       }
   }
   
   // グローバルな共通プールを使用する例
   public static void main(String[] args) {
       double[] array = new double[1_000_000];
       for (int i = 0; i < array.length; i++) {
           array[i] = i;
       }
       
       // ベストプラクティス4: 共通プール(CommonPool)の利用
       ForkJoinPool commonPool = ForkJoinPool.commonPool();
       System.out.println("利用可能なプロセッサ数: " + Runtime.getRuntime().availableProcessors());
       System.out.println("共通プールの並列処理レベル: " + commonPool.getParallelism());
       
       long startTime = System.currentTimeMillis();
       
       // タスクを作成して実行
       MultiplyArrayTask task = new MultiplyArrayTask(array, 0, array.length, 2.0);
       commonPool.invoke(task);
       
       long endTime = System.currentTimeMillis();
       System.out.println("処理時間: " + (endTime - startTime) + "ms");
       
       // 結果の検証
       System.out.println("最初の10要素:");
       for (int i = 0; i < 10; i++) {
           System.out.print(array[i] + " ");
       }
       
       // ベストプラクティス5: プールのシャットダウンは不要
       // 共通プールはJVMによって管理されるためシャットダウンしない
       
       System.out.println("\n\nカスタムプールの例:");
       
       // ベストプラクティス6: 必要に応じてカスタムプールを作成
       try (ForkJoinPool customPool = new ForkJoinPool(4)) { // 並列度を4に設定
           System.out.println("カスタムプールの並列処理レベル: " + customPool.getParallelism());
           
           double[] array2 = new double[1_000_000];
           for (int i = 0; i < array2.length; i++) {
               array2[i] = i;
           }
           
           startTime = System.currentTimeMillis();
           
           MultiplyArrayTask task2 = new MultiplyArrayTask(array2, 0, array2.length, 3.0);
           customPool.invoke(task2);
           
           endTime = System.currentTimeMillis();
           System.out.println("処理時間: " + (endTime - startTime) + "ms");
           
           // カスタムプールはtry-with-resourcesで自動的にシャットダウンされる
       }
   }
}

このコードでは、フォーク・ジョインフレームワークの効果的な使用方法をいくつか示している。主なベストプラクティスとして:

  1. バランスの良いデータ分割 – タスクを等分割することで、処理負荷を均等に分散させる
  2. 適切な閾値の設定 – 閾値が小さすぎるとスレッド生成のオーバーヘッドが大きくなり、大きすぎると並列化の恩恵が得られない
  3. 効率的なフォーク方法 – 子タスクの一方だけをフォークし、もう一方は現在のスレッドで実行することで、効率的にスレッドを利用する
  4. 共通プールの活用 – 特別な理由がない限り、ForkJoinPool.commonPool()を使用すると効率的
  5. try-with-resources構文の利用 – カスタムプールを使用する場合、自動的にクローズされるようにする

また、Java 8以降では、parallelStream()の内部実装としてもこのフォーク・ジョインフレームワークが使用されている。そのため、並列ストリーム処理がより身近なものとなっている。

効率的なスレッドプールの設計

複数のスレッドを効率的に管理するためのスレッドプールの設計は、並列処理アプリケーションのパフォーマンスに大きな影響を与える。適切なスレッドプールの設計により、システムリソースを最大限に活用しながら、オーバーヘッドを最小限に抑えることが可能となる。

以下に、効率的なスレッドプール設計の基本例を記す。

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolDesignExample {
    public static void main(String[] args) throws InterruptedException {
        // 基本的なスレッドプールファクトリ
        System.out.println("=== 基本的なスレッドプールの比較 ===");
        
        // 1. 固定サイズのスレッドプール
        int corePoolSize = Runtime.getRuntime().availableProcessors(); // CPUコア数に基づいたスレッド数
        ExecutorService fixedPool = Executors.newFixedThreadPool(corePoolSize);
        System.out.println("固定サイズのスレッドプール: " + corePoolSize + "スレッド");
        
        // 2. キャッシュされたスレッドプール
        ExecutorService cachedPool = Executors.newCachedThreadPool();
        System.out.println("キャッシュされたスレッドプール: 必要に応じて拡張");
        
        // 3. スケジュールされたスレッドプール
        ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(corePoolSize);
        System.out.println("スケジュールされたスレッドプール: " + corePoolSize + "スレッド(定期実行タスク用)");
        
        // 4. 単一スレッドのエグゼキュータ
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
        System.out.println("単一スレッドプール: 1スレッド(順次実行が必要な場合)");
        
        // 5. ワークスティーリングプール(Java 8以降)
        ExecutorService workStealingPool = Executors.newWorkStealingPool();
        System.out.println("ワークスティーリングプール: ForkJoinPoolベース、利用可能なプロセッサ数に基づく");
        
        // 各プールを適切にシャットダウン
        fixedPool.shutdown();
        cachedPool.shutdown();
        scheduledPool.shutdown();
        singleThreadPool.shutdown();
        workStealingPool.shutdown();
        
        System.out.println("\n=== カスタムスレッドプールの設計 ===");
        
        // カスタムスレッドファクトリ
        ThreadFactory customThreadFactory = new ThreadFactory() {
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String namePrefix = "カスタム作業スレッド-";
            
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
                
                // スレッドの優先度を設定(1:最低〜10:最高)
                t.setPriority(Thread.NORM_PRIORITY); // 標準優先度 = 5
                
                // デーモンスレッドとして設定(バックグラウンドスレッド)
                t.setDaemon(true);
                
                return t;
            }
        };
        
        // カスタム拒否ハンドラ
        RejectedExecutionHandler customRejectedHandler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("タスクが拒否されました: キューが満杯かシャットダウン中です");
                // 代替処理: 呼び出し元のスレッドで実行(CallerRunsPolicy相当)
                if (!executor.isShutdown()) {
                    r.run();
                }
            }
        };
        
        // カスタムスレッドプールの作成
        ThreadPoolExecutor customPool = new ThreadPoolExecutor(
            corePoolSize,                  // コアスレッド数(常に維持されるスレッド数)
            corePoolSize * 2,              // 最大スレッド数(ピーク時に使用可能な最大数)
            60, TimeUnit.SECONDS,          // 余剰スレッドの生存時間
            new ArrayBlockingQueue<>(100), // 作業キュー(待機タスクを保持)
            customThreadFactory,           // スレッド生成に使用するファクトリ
            customRejectedHandler          // キューが満杯の場合のポリシー
        );
        
        // 予熱: コアスレッドをあらかじめ起動
        customPool.prestartAllCoreThreads();
        System.out.println("プリスタートされたコアスレッド数: " + customPool.getPoolSize());
        
        // モニタリングと調整
        ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
        monitor.scheduleAtFixedRate(() -> {
            System.out.println("=== スレッドプールステータス ===");
            System.out.println("プールサイズ: " + customPool.getPoolSize());
            System.out.println("アクティブスレッド: " + customPool.getActiveCount());
            System.out.println("完了タスク数: " + customPool.getCompletedTaskCount());
            System.out.println("キューサイズ: " + customPool.getQueue().size());
        }, 1, 1, TimeUnit.SECONDS);
        
        // テストタスクを送信
        for (int i = 0; i < 200; i++) {
            final int taskId = i;
            customPool.submit(() -> {
                try {
                    System.out.println("タスク " + taskId + " 実行中... by " + 
                                      Thread.currentThread().getName());
                    Thread.sleep((long)(Math.random() * 1000)); // ランダムな作業時間
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "タスク " + taskId + " 完了";
            });
        }
        
        // 3秒待機してモニタリングを終了
        Thread.sleep(3000);
        monitor.shutdown();
        
        // シャットダウンを開始し、既存タスクを完了させる
        customPool.shutdown();
        
        try {
            // 最大10秒待機
            if (!customPool.awaitTermination(10, TimeUnit.SECONDS)) {
                // タイムアウトした場合、強制終了
                System.out.println("強制終了を実行...");
                customPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            customPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("カスタムプールは終了しました: " + customPool.isTerminated());
    }
}

このコードでは、様々な種類の標準スレッドプールと、細かくカスタマイズされたスレッドプールの例を示している。効果的なスレッドプール設計のポイントとして、以下の要素が挙げられる:

  1. 適切なスレッド数の決定 – 一般的にはCPUコア数に基づいた決定が効果的である。I/Oバウンドな処理にはコア数よりも多く、CPUバウンドな処理にはコア数に近いスレッド数が推奨される。
  2. キュー戦略の選択 – 作業キューの種類と容量は、アプリケーションの負荷パターンに大きく影響する。無限キュー(LinkedBlockingQueue)はメモリ使用量の増大を招く可能性がある一方、有限キュー(ArrayBlockingQueue)はタスクの拒否を引き起こす可能性がある。
  3. カスタムスレッドファクトリの利用 – スレッドの命名や優先度、デーモン設定などをカスタマイズできる。これにより、デバッグや監視が容易になる。
  4. 拒否ポリシーの定義 – キューが満杯になった場合の挙動を定義する。標準のポリシーとして、CallerRunsPolicy(呼び出し元スレッドでの実行)、AbortPolicy(例外のスロー)、DiscardPolicy(タスクの破棄)、DiscardOldestPolicy(最も古いタスクの破棄)がある。
  5. スレッドプールの監視と調整 – プールのサイズや活動状況を定期的に監視し、必要に応じて動的に調整することが重要である。

効率的なスレッドプール設計の具体的なシナリオ別の例を以下に記す。

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadPoolScenariosExample {
    public static void main(String[] args) {
        // シナリオ1: CPU集約型タスク用のスレッドプール
        int cpuCores = Runtime.getRuntime().availableProcessors();
        ExecutorService cpuBoundPool = new ThreadPoolExecutor(
            cpuCores,               // コアスレッド数 = CPUコア数
            cpuCores,               // 最大スレッド数 = CPUコア数(オーバーサブスクリプションを避ける)
            0, TimeUnit.SECONDS,    // アイドルスレッドはすぐに破棄
            new LinkedBlockingQueue<>(), // 作業キュー
            new ThreadFactory() {
                private final AtomicInteger counter = new AtomicInteger(1);
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "cpu-worker-" + counter.getAndIncrement());
                    t.setPriority(Thread.MAX_PRIORITY); // 計算タスクに高い優先度
                    return t;
                }
            }
        );
        
        System.out.println("CPU集約型プール: " + cpuCores + " スレッド");
        
        // シナリオ2: I/O集約型タスク用のスレッドプール
        int ioThreads = cpuCores * 8; // I/Oタスクでは通常、CPUコア数の倍数が効果的
        ExecutorService ioBoundPool = new ThreadPoolExecutor(
            cpuCores,               // コアスレッド数 = CPUコア数
            ioThreads,              // 最大スレッド数 = CPUコア数 × 8
            60, TimeUnit.SECONDS,   // アイドルスレッドは1分後に破棄
            new SynchronousQueue<>(), // 同期キュー(直接ハンドオフ)
            new ThreadFactory() {
                private final AtomicInteger counter = new AtomicInteger(1);
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "io-worker-" + counter.getAndIncrement());
                    t.setPriority(Thread.NORM_PRIORITY); // 標準優先度
                    return t;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy() // 負荷調整のため呼び出し元スレッドでの実行
        );
        
        System.out.println("I/O集約型プール: 最大 " + ioThreads + " スレッド");
        
        // シナリオ3: バッチ処理用のスレッドプール(制限付き並列度)
        int batchParallelism = cpuCores - 1; // 1つのコアをシステム用に予約
        ExecutorService batchPool = new ThreadPoolExecutor(
            batchParallelism,       // コアスレッド数
            batchParallelism,       // 最大スレッド数
            0, TimeUnit.SECONDS,    // アイドルスレッドはすぐに破棄
            new ArrayBlockingQueue<>(1000), // 有限キュー
            new ThreadFactory() {
                private final AtomicInteger counter = new AtomicInteger(1);
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "batch-worker-" + counter.getAndIncrement());
                    t.setPriority(Thread.MIN_PRIORITY); // バックグラウンド処理には低い優先度
                    return t;
                }
            }
        );
        
        System.out.println("バッチ処理プール: " + batchParallelism + " スレッド");
        
        // シナリオ4: スケジュールされたメンテナンスタスク用のプール
        ScheduledExecutorService maintenancePool = Executors.newScheduledThreadPool(
            2, // 通常、少数のスレッドで十分
            r -> {
                Thread t = new Thread(r, "maintenance-worker");
                t.setDaemon(true); // デーモンスレッドとして設定
                return t;
            }
        );
        
        System.out.println("メンテナンスプール: 2 スレッド");
        
        // シナリオ5: 応答性が重要なUIタスク用のプール
        ExecutorService uiPool = new ThreadPoolExecutor(
            1, 1, // 単一スレッド(順序保証)
            0, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100), // 有限キュー
            r -> {
                Thread t = new Thread(r, "ui-worker");
                t.setPriority(Thread.NORM_PRIORITY + 1); // UIの応答性向上のため少し高い優先度
                return t;
            },
            new ThreadPoolExecutor.DiscardPolicy() // 古いUIタスクは破棄(最新状態のみ重要)
        );
        
        System.out.println("UIプール: 単一スレッド(順序保証)");
        
        // すべてのプールを適切にシャットダウン
        cpuBoundPool.shutdown();
        ioBoundPool.shutdown();
        batchPool.shutdown();
        maintenancePool.shutdown();
        uiPool.shutdown();
    }
}

このコードでは、異なるタイプのタスクに最適化された5つのスレッドプール設計例を表している。

  1. CPU集約型タスク – CPUコア数と同じスレッド数で、コンテキストスイッチのオーバーヘッドを最小限に抑える
  2. I/O集約型タスク – CPUコア数の倍数のスレッド数で、I/O待ち時間中に他のタスクを処理
  3. バッチ処理 – 限られたリソースを使用し、システムへの影響を最小限に抑える
  4. メンテナンスタスク – 定期的な少数のタスクを実行するための最小限のリソース
  5. UIタスク – 順序保証と応答性を重視した設計

スレッドプールの設計において重要なのは、アプリケーションの特性に合わせて最適化することである。単一の設計が全てのケースに適合するわけではなく、タスクの性質(CPU/IO集約型)、予想される負荷パターン、リソース制約などを考慮して適切に設計する必要がある。

以上。

よかったらシェアしてね!
  • URLをコピーしました!
目次