非同期処理の基本概念
プログラミングにおける処理の実行方法には、同期処理と非同期処理という2つの重要な概念が存在する。本章では非同期処理の基本的な考え方から、その必要性、メリット・デメリットまでを詳細に解説する。プログラムの実行効率やユーザー体験を向上させるために不可欠な知識である。
同期処理と非同期処理の違い
同期処理とは、プログラム内の処理が順番に一つずつ実行され、前の処理が完了するまで次の処理が開始されない実行方式である。一方、非同期処理とは、ある処理の完了を待たずに次の処理を開始できる実行方式を指す。
同期処理の例を以下に記す。
// 同期処理の例
public void synchronousExample() {
System.out.println("ステップ1: データの準備");
// データベースからのデータ読み込み(時間がかかる処理)
List<String> data = loadDataFromDatabase();
System.out.println("ステップ2: データの処理");
// 読み込んだデータの処理
List<String> processedData = processData(data);
System.out.println("ステップ3: 結果の表示");
// 処理結果の表示
displayResults(processedData);
}
この同期処理の例では、各ステップが完了するまで次のステップに進まない。データベースからの読み込みに10秒かかる場合、プログラム全体は少なくとも10秒以上の実行時間を要する。ユーザーはその間、アプリケーションの応答を待つ必要がある。
次に非同期処理の例を見てみよう。
// 非同期処理の例
public void asynchronousExample() {
System.out.println("ステップ1: データの準備開始");
// データベースからのデータ読み込みを非同期で実行
CompletableFuture.supplyAsync(() -> loadDataFromDatabase())
.thenAccept(data -> {
// データが準備できたら処理を行う
System.out.println("ステップ2: データの処理");
List<String> processedData = processData(data);
System.out.println("ステップ3: 結果の表示");
displayResults(processedData);
});
System.out.println("ステップ4: 他の処理を実行");
// データ読み込みを待たずに他の処理を実行
doOtherTasks();
}
この非同期処理では、データベースからの読み込みを待たずに「他の処理」に進むことができる。データの準備ができ次第、それに続く処理が自動的に実行される仕組みである。実際の出力は「ステップ1」「ステップ4」が先に表示され、その後データの準備が完了してから「ステップ2」「ステップ3」が表示される点に注目されたい。
非同期処理におけるプログラムの流れを理解するには、「イベント駆動型」の考え方が有効である。処理の完了をイベントとし、そのイベントが発生したときに次の処理が自動的に実行される仕組みと捉えるとよい。JavaのCompletableFutureは、このイベント駆動型プログラミングを実現するために強力である。
非同期処理が必要となる場面
非同期処理が特に効果を発揮する代表的な場面を以下に記す。
- 時間のかかる入出力処理
ファイルの読み書き、データベース操作、ネットワーク通信など、入出力処理は一般的にCPU処理に比べて非常に時間がかかる。これらの処理を非同期で行うことで、入出力待ちの間に他の処理を実行できる。
// ネットワーク通信を非同期で行う例
public void fetchDataFromServer() {
System.out.println("サーバーからデータ取得を開始");
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// ネットワーク通信(時間のかかる処理)
try {
URL url = new URL("https://api.example.com/data");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
// 通信処理(レスポンスを待つ)
BufferedReader reader = new BufferedReader(
new InputStreamReader(conn.getInputStream()));
StringBuilder response = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
response.append(line);
}
reader.close();
// 少し待機してネットワーク遅延をシミュレート
Thread.sleep(2000);
return response.toString();
} catch (Exception e) {
return "エラー: " + e.getMessage();
}
});
System.out.println("他の処理を実行中...");
// データ取得完了後の処理
future.thenAccept(data -> {
System.out.println("取得したデータを処理: " + data.substring(0, Math.min(20, data.length())) + "...");
});
}
このコードでは、サーバーからのデータ取得を非同期で行いながら、他の処理を並行して実行している。HttpURLConnectionは実際のアプリケーションでは非推奨であり、HttpClientなどの現代的なAPIを使用することが推奨される。ただし、概念を理解するための例としては適切である。
- ユーザーインターフェースの応答性確保
特にGUIアプリケーションでは、時間のかかる処理をメインスレッド(UIスレッド)で実行すると、ユーザーインターフェースが応答しなくなる問題が発生する。
// SwingのGUIアプリケーションでの非同期処理例
import javax.swing.*;
import java.awt.*;
import java.util.concurrent.CompletableFuture;
public class AsyncUiExample extends JFrame {
private JButton startButton;
private JProgressBar progressBar;
private JLabel statusLabel;
public AsyncUiExample() {
setTitle("非同期処理デモ");
setSize(400, 200);
setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
setLayout(new FlowLayout());
startButton = new JButton("処理開始");
progressBar = new JProgressBar(0, 100);
statusLabel = new JLabel("準備完了");
add(startButton);
add(progressBar);
add(statusLabel);
startButton.addActionListener(e -> startHeavyTask());
}
private void startHeavyTask() {
// UIの状態を更新
startButton.setEnabled(false);
statusLabel.setText("処理中...");
progressBar.setValue(0);
// 重い処理を非同期で実行
CompletableFuture.runAsync(() -> {
try {
// 重い処理をシミュレート
for (int i = 0; i <= 100; i += 10) {
Thread.sleep(500); // 0.5秒待機
final int progress = i;
// UIの更新はEDT(Event Dispatch Thread)で行う
SwingUtilities.invokeLater(() -> {
progressBar.setValue(progress);
});
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).thenRun(() -> {
// 処理完了後のUI更新(EDTで実行)
SwingUtilities.invokeLater(() -> {
startButton.setEnabled(true);
statusLabel.setText("処理完了");
});
});
}
}
このコードは、時間のかかる処理を非同期で実行しながら、UIの応答性を確保している例である。重要なポイントは、UIの更新はEDT(Event Dispatch Thread)で行う必要がある点だ。SwingUtilities.invokeLaterメソッドを使用してこれを実現している。
- 複数の独立した処理の並行実行
互いに依存関係のない複数の処理がある場合、これらを同時に実行することで全体の処理時間を短縮できる。
// 複数の独立した処理を非同期で並行実行する例
public void processMultipleTasks() {
long startTime = System.currentTimeMillis();
// タスク1の非同期実行
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("タスク1開始");
Thread.sleep(3000); // 3秒の処理
return "タスク1の結果";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "タスク1は中断されました";
}
});
// タスク2の非同期実行
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("タスク2開始");
Thread.sleep(2000); // 2秒の処理
return "タスク2の結果";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "タスク2は中断されました";
}
});
// 両方のタスクが完了するのを待機
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2);
// すべてのタスクが完了したら結果を処理
allTasks.thenRun(() -> {
try {
String result1 = task1.get();
String result2 = task2.get();
long endTime = System.currentTimeMillis();
System.out.println(result1);
System.out.println(result2);
System.out.println("全処理時間: " + (endTime - startTime) + "ms");
} catch (Exception e) {
System.out.println("結果の取得中にエラーが発生: " + e.getMessage());
}
});
}
この例では、2つの独立したタスクを同時に開始し、両方が完了するのを待ってから結果を処理している。処理時間は最も時間のかかるタスク(ここではタスク1の3秒)に依存するが、同期的に実行した場合の合計5秒よりも短縮される。
非同期処理は、これらの場面以外にも、長時間実行されるバッチ処理、イベント駆動型のシステム、リアルタイム通信を必要とするアプリケーションなど、多くの場面で活用されている。
非同期処理のメリットとデメリット
非同期処理は多くの利点をもたらすが、同時に新たな複雑さも導入する。ここでは、そのメリットとデメリットを詳細に検討する。
メリット
- 応答性の向上
非同期処理の最大の利点は、ユーザーインターフェースの応答性を維持できることである。長時間実行される処理をバックグラウンドで実行することで、ユーザーは処理の完了を待つことなくアプリケーションとの対話を続けられる。
// UIの応答性を改善する非同期処理の例
public void improveResponsiveness() {
JButton searchButton = new JButton("検索");
JTextField resultField = new JTextField(20);
searchButton.addActionListener(e -> {
// ボタンを無効化して処理中であることを示す
searchButton.setEnabled(false);
searchButton.setText("検索中...");
// 検索処理を非同期で実行
CompletableFuture.supplyAsync(() -> {
// 時間のかかる検索処理
try {
Thread.sleep(3000);
return "検索結果: 123件のデータが見つかりました";
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return "検索が中断されました";
}
}).thenAccept(result -> {
// UI更新はEDTで行う
SwingUtilities.invokeLater(() -> {
resultField.setText(result);
searchButton.setText("検索");
searchButton.setEnabled(true);
});
});
});
}
このコードでは、検索ボタンをクリックすると、検索処理が非同期で実行される。この間もUIは応答性を保ち、ユーザーは他の操作を行うことができる。処理が完了すると結果が表示され、ボタンが元の状態に戻る。
- スループットの向上
非同期処理により、CPUの待機時間を最小化し、複数の処理を並行して実行することでシステム全体のスループットを向上させることができる。
// 並行処理によるスループット向上の例
public void processLargeDataSet(List<String> dataSet) {
int processorCount = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(processorCount);
try {
long startTime = System.currentTimeMillis();
// データセットを分割して並行処理
int batchSize = dataSet.size() / processorCount;
List<CompletableFuture<Integer>> futures = new ArrayList<>();
for (int i = 0; i < processorCount; i++) {
int start = i * batchSize;
int end = (i == processorCount - 1) ? dataSet.size() : (i + 1) * batchSize;
List<String> batch = dataSet.subList(start, end);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return processBatch(batch);
}, executor);
futures.add(future);
}
// すべての処理が完了するのを待つ
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// 結果を集計
allFutures.thenRun(() -> {
try {
int totalProcessed = 0;
for (CompletableFuture<Integer> future : futures) {
totalProcessed += future.get();
}
long endTime = System.currentTimeMillis();
System.out.println("処理完了: " + totalProcessed + " 件");
System.out.println("処理時間: " + (endTime - startTime) + " ms");
} catch (Exception e) {
System.out.println("結果集計中にエラー: " + e.getMessage());
} finally {
executor.shutdown();
}
});
} catch (Exception e) {
executor.shutdown();
System.out.println("処理の開始中にエラー: " + e.getMessage());
}
}
private int processBatch(List<String> batch) {
// バッチ処理のロジック
int processed = 0;
for (String item : batch) {
// データ処理ロジック
processed++;
}
return processed;
}
このコードでは、大きなデータセットをCPUコア数に応じて分割し、各バッチを並行して処理している。この方法により、単一スレッドで処理する場合と比較して大幅な処理時間の短縮が期待できる。ExecutorServiceを使用して適切なスレッドプールを作成し、リソースの効率的な管理を行っている点に注目されたい。
- リソースの効率的な利用
非同期処理は、I/O待ちなどの遊休時間を有効活用できる。あるスレッドがI/O操作で待機している間、他のスレッドが別の処理を実行できる。
デメリット
- コードの複雑さ増加
非同期処理は、同期処理に比べてプログラムの流れが追いにくくなる。特に多数の非同期処理が相互に依存する場合、いわゆる「コールバック地獄」に陥る可能性がある。
// コールバック地獄の例(悪い例)
public void callbackHell() {
fetchUserData(userId -> {
if (userId != null) {
fetchUserProfile(userId, profile -> {
if (profile != null) {
fetchUserFriends(profile.getFriendIds(), friends -> {
if (friends != null) {
fetchFriendProfiles(friends, friendProfiles -> {
if (friendProfiles != null) {
// さらにネストが深くなる...
processData(friendProfiles, result -> {
displayResult(result);
});
} else {
handleError("友達のプロフィール取得エラー");
}
});
} else {
handleError("友達リスト取得エラー");
}
});
} else {
handleError("プロフィール取得エラー");
}
});
} else {
handleError("ユーザーID取得エラー");
}
});
}
// 改善例(CompletableFutureを使用)
public void improvedAsyncCode() {
CompletableFuture.supplyAsync(() -> fetchUserData())
.thenCompose(userId -> fetchUserProfile(userId))
.thenCompose(profile -> fetchUserFriends(profile.getFriendIds()))
.thenCompose(friends -> fetchFriendProfiles(friends))
.thenApply(friendProfiles -> processData(friendProfiles))
.thenAccept(result -> displayResult(result))
.exceptionally(error -> {
handleError("処理中にエラーが発生: " + error.getMessage());
return null;
});
}
このコード例では、「コールバック地獄」と呼ばれる深いネストが発生している悪い例と、CompletableFutureを使ってコードを平坦化した改善例を対比している。モダンなJavaでは、CompletableFutureを使用することで、非同期処理を連鎖させる際の可読性を大幅に向上させることができる。
- デバッグの難しさ
非同期処理のデバッグは同期処理に比べて複雑である。特に、実行順序が非決定的になるため、問題の再現が困難になる場合がある。
- エラー処理の複雑さ
非同期処理でのエラー処理は、通常のtry-catchブロックでは捕捉できないことがある。適切なエラーハンドリングを設計する必要がある。
// 非同期処理におけるエラー処理の例
public void asyncErrorHandling() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("処理中にエラーが発生しました");
}
return "正常に処理が完了しました";
});
// エラー処理を追加
future
.thenAccept(result -> System.out.println("結果: " + result))
.exceptionally(error -> {
System.err.println("エラーが捕捉されました: " + error.getMessage());
// エラーのログ記録
logError(error);
// 必要に応じてフォールバック処理
handleFallback();
return null;
});
// または、handle()メソッドを使用してより柔軟に対応
future.handle((result, error) -> {
if (error != null) {
System.err.println("エラー発生: " + error.getMessage());
return "エラー発生時のデフォルト値";
} else {
System.out.println("成功: " + result);
return result;
}
});
}
private void logError(Throwable error) {
// エラーログの記録処理
System.err.println("エラーログ: " + error.getMessage());
error.printStackTrace();
}
private void handleFallback() {
// フォールバック処理
System.out.println("フォールバック処理を実行");
}
このコードは、非同期処理中に発生する可能性のあるエラーを適切に処理する2つの方法を示している。exceptionallyメソッドはエラーのみを処理し、handleメソッドは成功結果とエラーの両方を処理できる。機能を活用することで、非同期処理においても堅牢なエラー処理が可能になる。
- リソース管理の難しさ
多数の非同期タスクを並行して実行すると、システムリソースの過剰消費や競合状態が発生するリスクがある。適切なスレッド管理とリソース制限が必要である。
// スレッドとリソースの管理例
public void resourceManagement() {
// 制限されたスレッドプールを作成
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maxPoolSize = corePoolSize * 2;
long keepAliveTime = 60L;
// スレッドプールの設定
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100), // キューのサイズを制限
new ThreadPoolExecutor.CallerRunsPolicy() // 拒否された場合の処理方針
);
try {
// タスクの実行
for (int i = 0; i < 1000; i++) {
final int taskId = i;
executor.submit(() -> {
try {
System.out.println("タスク " + taskId + " 実行中 (スレッド: " +
Thread.currentThread().getName() + ")");
// 処理ロジック
Thread.sleep(100);
return "タスク " + taskId + " 完了";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "タスク " + taskId + " 中断";
}
});
}
} finally {
// 適切なシャットダウン処理
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
このコードでは、ThreadPoolExecutorを使用して非同期タスクのリソース使用量を制御している。コアプール数、最大プール数、キューのサイズ、拒否ポリシーなどを適切に設定することで、システムの安定性を確保している。また、適切なシャットダウン処理を行うことで、リソースリークを防止している。
非同期処理は現代のプログラミングにおいて不可欠な技術であるが、その導入には適切な設計と注意深い実装が求められる。メリットとデメリットを十分に理解し、適切な場面で適切な方法で非同期処理を活用することが重要である。
Javaで実装する非同期処理の基礎
前章で非同期処理の概念を理解したところで、本章ではJavaにおける非同期処理の基礎的な実装方法について解説する。Javaでは古くからスレッドを用いた並行処理の仕組みが提供されており、これを基盤として様々な非同期処理の手法が構築されている。スレッドの基本的な使い方から始め、RunnableインターフェースやCallableインターフェースの活用法、そしてFutureを使った非同期処理の結果管理までを段階的に学ぶことで、実践的な非同期プログラミングの基礎を身につけることができる。
スレッドの基本とその利用方法
スレッドとは、プログラムの実行単位であり、一つのプロセス内で複数のスレッドが並行して動作することができる。Javaではjava.lang.Threadクラスを使用してスレッドを作成し、実行することができる。
最も基本的なスレッドの作成方法は、Threadクラスを継承して独自のスレッドクラスを定義する方法である。
public class MyThread extends Thread {
@Override
public void run() {
// スレッドで実行したい処理をここに記述
System.out.println("スレッドが実行中: " + Thread.currentThread().getName());
// 1から5までカウントアップする処理
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + ": " + i);
try {
// 500ミリ秒(0.5秒)スリープ
Thread.sleep(500);
} catch (InterruptedException e) {
// スレッドが中断された場合の処理
System.out.println("スレッドが中断されました");
// 中断フラグを再設定
Thread.currentThread().interrupt();
return;
}
}
System.out.println("スレッド終了: " + Thread.currentThread().getName());
}
}
// 使用例
public class ThreadExample {
public static void main(String[] args) {
System.out.println("メインスレッド開始");
// スレッドのインスタンスを作成
MyThread thread1 = new MyThread();
thread1.setName("MyThread-1");
// スレッドを開始
thread1.start();
// メインスレッドでも並行して処理を実行
for (int i = 1; i <= 3; i++) {
System.out.println("メインスレッド: " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("メインスレッドが中断されました");
Thread.currentThread().interrupt();
return;
}
}
System.out.println("メインスレッド終了");
}
}
このコードでは、Threadクラスを継承してrunメソッドをオーバーライドし、独自のスレッドクラスを定義している。スレッドを実行するにはstart()メソッドを呼び出すことが重要であり、直接run()メソッドを呼び出すと新しいスレッドが作成されずに同期的に処理が実行されてしまう点に注意が必要である。
また、Thread.sleep()メソッドはスレッドの実行を指定した時間だけ一時停止させる。これによりスレッドの動作を遅くすることができるが、本番環境では主にテスト目的や特定のタイミング制御のために使用される。
スレッドの優先度を変更するにはsetPriority()メソッドを使用する。優先度は1(最低)から10(最高)までの値で設定できるが、実際の動作はJava実行環境やオペレーティングシステムによって異なることに留意しなければならない。
public class PriorityExample {
public static void main(String[] args) {
Thread lowPriorityThread = new Thread(() -> {
for (int i = 1; i <= 5; i++) {
System.out.println("低優先度スレッド: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
Thread highPriorityThread = new Thread(() -> {
for (int i = 1; i <= 5; i++) {
System.out.println("高優先度スレッド: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
// 優先度を設定
lowPriorityThread.setPriority(Thread.MIN_PRIORITY); // 1
highPriorityThread.setPriority(Thread.MAX_PRIORITY); // 10
// スレッドを開始
lowPriorityThread.start();
highPriorityThread.start();
}
}
このコードでは、低優先度と高優先度の2つのスレッドを作成し実行している。Thread.MIN_PRIORITY(値は1)とThread.MAX_PRIORITY(値は10)の定数を使用して優先度を設定しているが、優先度の差が実行結果に明確に表れるとは限らない。スレッドスケジューリングはOSやJVMの実装に依存するため、優先度の設定は確実な実行順序を保証するものではないことを理解しておく必要がある。
スレッドの状態を監視し、必要に応じて待機や中断を行う方法も重要である。以下のコードでは、スレッドの結合(join)と中断(interrupt)の基本的な使い方を表している。
public class ThreadControlExample {
public static void main(String[] args) {
Thread workerThread = new Thread(() -> {
System.out.println("ワーカースレッド開始");
for (int i = 1; i <= 10; i++) {
System.out.println("作業中: " + i);
try {
Thread.sleep(1000);
// スレッドの中断フラグをチェック
if (Thread.currentThread().isInterrupted()) {
System.out.println("中断を検出");
break;
}
} catch (InterruptedException e) {
// sleep中に割り込まれた場合
System.out.println("sleep中に中断を検出");
// 中断フラグを再設定(InterruptedException発生時にフラグはクリアされる)
Thread.currentThread().interrupt();
break;
}
}
System.out.println("ワーカースレッド終了");
});
// スレッドを開始
workerThread.start();
try {
// メインスレッドを3秒間スリープ
Thread.sleep(3000);
// ワーカースレッドに中断シグナルを送信
System.out.println("ワーカースレッドへ中断シグナルを送信");
workerThread.interrupt();
// ワーカースレッドの終了を待機
System.out.println("ワーカースレッドの終了を待機中...");
workerThread.join();
System.out.println("ワーカースレッドの終了を確認");
} catch (InterruptedException e) {
System.out.println("メインスレッドが中断されました");
Thread.currentThread().interrupt();
}
System.out.println("メインスレッド終了");
}
}
このコードでは、interrupt()メソッドを使用してスレッドに中断シグナルを送信している。中断されたスレッドはisInterrupted()メソッドで中断フラグを確認できる。また、join()メソッドを使用することで、呼び出し元のスレッドは対象スレッドの終了を待機する。これによりスレッド間の実行順序を制御することができる。
スレッドの中断処理において、InterruptedExceptionが発生すると中断フラグは自動的にクリアされるため、Thread.currentThread().interrupt()を使って再設定するのが一般的な慣習である点も覚えておきたい。
Javaでスレッド安全性を確保するための基本的な方法の一つに、synchronized修飾子がある。この修飾子はメソッドまたはブロックに適用でき、同時に一つのスレッドだけがその部分を実行できるようにする。
public class SynchronizedExample {
private int counter = 0;
// synchronizedメソッド
public synchronized void increment() {
counter++;
}
// synchronizedブロック
public void incrementWithBlock() {
synchronized (this) {
counter++;
}
// この部分は同期されない
}
public int getCounter() {
return counter;
}
public static void main(String[] args) throws InterruptedException {
SynchronizedExample example = new SynchronizedExample();
// 複数のスレッドでカウンターをインクリメント
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
example.increment();
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 100000; i++) {
example.incrementWithBlock();
}
});
thread1.start();
thread2.start();
// 両方のスレッドの終了を待つ
thread1.join();
thread2.join();
// 最終的なカウンター値を表示
System.out.println("最終カウンター値: " + example.getCounter());
// 期待値: 200000
}
}
このコードでは、synchronizedメソッドとsynchronizedブロックの2つの方法を示している。両方とも同じオブジェクト(この場合はthis)に対するロックを取得する。synchronizedブロックの利点は、同期が必要な部分のみを指定できることである。これにより、ロックの保持時間を最小限に抑え、パフォーマンスの向上が期待できる。
Javaのスレッドは強力な機能だが、直接扱うには複雑さを伴うこともある。そこで、より高レベルの抽象化としてRunnableインターフェースが導入されている。次節ではこのRunnableインターフェースの活用法について解説する。
Runnableインターフェースの活用
Runnableインターフェースは、Javaにおける並行処理の最も基本的な構成要素の一つである。Threadクラスを継承する方法と比較して、Runnableインターフェースを実装する方法には多くの利点がある。Javaは単一継承の言語であるため、クラスが既に別のクラスを継承している場合でもRunnableインターフェースを実装することができる。
Runnableインターフェースは非常にシンプルで、実行可能なタスクを表現するためのrun()メソッド一つのみを持つ関数型インターフェースである。
@FunctionalInterface
public interface Runnable {
void run();
}
Runnableインターフェースを実装したクラスを作成する基本的な方法は以下のとおりである。
public class MyRunnable implements Runnable {
private final String name;
public MyRunnable(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(name + " タスク開始");
// 実行したい処理
for (int i = 1; i <= 5; i++) {
System.out.println(name + ": " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println(name + " が中断されました");
Thread.currentThread().interrupt();
return;
}
}
System.out.println(name + " タスク終了");
}
}
// 使用例
public class RunnableExample {
public static void main(String[] args) {
System.out.println("Runnableインターフェース使用例");
// Runnableインスタンスを作成
Runnable task1 = new MyRunnable("タスク1");
Runnable task2 = new MyRunnable("タスク2");
// スレッドにRunnableを渡して実行
Thread thread1 = new Thread(task1);
Thread thread2 = new Thread(task2);
// スレッド開始
thread1.start();
thread2.start();
System.out.println("メインスレッドは続行");
}
}
このコードでは、Runnableインターフェースを実装したMyRunnableクラスを定義し、それをThreadコンストラクタに渡してスレッドを作成している。各タスクは名前を持ち、それぞれ独立したスレッドで実行される。
Java 8以降では、ラムダ式を使用してRunnableインターフェースの実装をより簡潔に記述することができる。Runnableは関数型インターフェースであるため、ラムダ式での表現が可能である。
public class LambdaRunnableExample {
public static void main(String[] args) {
// ラムダ式でRunnableを定義
Runnable task1 = () -> {
System.out.println("ラムダタスク1 開始");
for (int i = 1; i <= 5; i++) {
System.out.println("ラムダタスク1: " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println("ラムダタスク1 中断");
Thread.currentThread().interrupt();
return;
}
}
System.out.println("ラムダタスク1 終了");
};
// さらに簡略化した記法
Runnable task2 = () -> System.out.println("シンプルなラムダタスク実行");
// スレッドで実行
new Thread(task1).start();
new Thread(task2).start();
// 匿名クラスとラムダ式の比較
// 従来の匿名クラス記法
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("匿名クラスで実装");
}
});
// ラムダ式を使用した記法
Thread thread2 = new Thread(() -> {
System.out.println("ラムダ式で実装");
});
thread1.start();
thread2.start();
}
}
このコードでは、ラムダ式を使用してRunnableを直接定義する方法を示している。また、従来の匿名クラスによる実装と比較して、ラムダ式を使用した場合のコードの簡潔さが分かる。単一のステートメントしか含まない場合は、括弧を省略することも可能である。
Runnableの実用的な使用例として、UIスレッドをブロックせずにバックグラウンドで時間のかかる処理を実行する例を見てみよう。
import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
public class BackgroundProcessExample extends JFrame {
private JButton startButton;
private JTextArea resultArea;
public BackgroundProcessExample() {
setTitle("バックグラウンド処理の例");
setSize(400, 300);
setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
// UIコンポーネントの初期化
startButton = new JButton("処理開始");
resultArea = new JTextArea();
resultArea.setEditable(false);
setLayout(new BorderLayout());
add(startButton, BorderLayout.NORTH);
add(new JScrollPane(resultArea), BorderLayout.CENTER);
// ボタンのイベントハンドラを設定
startButton.addActionListener(new ActionListener() {
@Override
public void actionPerformed(ActionEvent e) {
// バックグラウンド処理を開始
startBackgroundProcess();
}
});
}
private void startBackgroundProcess() {
// ボタンを無効化して多重実行を防止
startButton.setEnabled(false);
resultArea.append("バックグラウンド処理を開始します...\n");
// バックグラウンドスレッドでの処理
Runnable backgroundTask = () -> {
try {
// 時間のかかる処理をシミュレート
for (int i = 1; i <= 10; i++) {
final int progress = i;
// UIの更新はEDT(Event Dispatch Thread)で行う
SwingUtilities.invokeLater(() -> {
resultArea.append("処理中... " + progress * 10 + "%完了\n");
});
// 実際の重い処理の代わり
Thread.sleep(1000);
}
// 処理完了時のUI更新
SwingUtilities.invokeLater(() -> {
resultArea.append("バックグラウンド処理が完了しました!\n");
startButton.setEnabled(true);
});
} catch (InterruptedException e) {
SwingUtilities.invokeLater(() -> {
resultArea.append("処理が中断されました\n");
startButton.setEnabled(true);
});
Thread.currentThread().interrupt();
}
};
// 新しいスレッドでタスクを実行
new Thread(backgroundTask).start();
}
public static void main(String[] args) {
SwingUtilities.invokeLater(() -> {
new BackgroundProcessExample().setVisible(true);
});
}
}
このコードは、Swingアプリケーションにおいてバックグラウンドで時間のかかる処理を実行する例である。ユーザーインターフェースをブロックしないために、処理は別スレッドで実行される。重要なポイントは、UIコンポーネントの更新は常にEDT(Event Dispatch Thread)で行うためにSwingUtilities.invokeLater()を使用している点である。
さらに実用的な例として、複数のタスクを順次実行する例を見てみよう。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class SequentialTasksExample {
public static void main(String[] args) throws InterruptedException {
// 実行するタスクのリスト
List<Runnable> tasks = new ArrayList<>();
// データベース接続を初期化するタスク
tasks.add(() -> {
System.out.println("タスク1: データベース接続の初期化");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("タスク1: 完了");
});
// 設定を読み込むタスク
tasks.add(() -> {
System.out.println("タスク2: 設定の読み込み");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("タスク2: 完了");
});
// キャッシュを準備するタスク
tasks.add(() -> {
System.out.println("タスク3: キャッシュの準備");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("タスク3: 完了");
});
// タスクを順番に実行
executeSequentially(tasks);
// タスクを並列に実行(比較用)
executeInParallel(tasks);
}
// タスクを順次実行するメソッド
private static void executeSequentially(List<Runnable> tasks) {
System.out.println("=== 順次実行開始 ===");
long startTime = System.currentTimeMillis();
for (Runnable task : tasks) {
task.run(); // 注意: 新しいスレッドでは実行されない
}
long endTime = System.currentTimeMillis();
System.out.println("=== 順次実行終了 ===");
System.out.println("所要時間: " + (endTime - startTime) + "ms");
System.out.println();
}
// タスクを並列実行するメソッド(比較用)
private static void executeInParallel(List<Runnable> tasks) throws InterruptedException {
System.out.println("=== 並列実行開始 ===");
long startTime = System.currentTimeMillis();
// すべてのタスクが完了するのを待つために使用
CountDownLatch latch = new CountDownLatch(tasks.size());
for (Runnable task : tasks) {
Thread thread = new Thread(() -> {
try {
task.run();
} finally {
latch.countDown();
}
});
thread.start();
}
// すべてのタスクが完了するまで待機
latch.await();
long endTime = System.currentTimeMillis();
System.out.println("=== 並列実行終了 ===");
System.out.println("所要時間: " + (endTime - startTime) + "ms");
}
}
このコードでは、複数のタスク(Runnable)を順次実行する方法と並列実行する方法を対比している。注目すべき点は、task.run()と呼び出す場合は新しいスレッドでは実行されず、呼び出し元のスレッドで同期的に実行される点である。新しいスレッドで実行するには、Threadのインスタンスを作成し、start()メソッドを呼び出す必要がある。また、CountDownLatchを使用して複数のスレッドの完了を待機する方法も記述している。
Runnableインターフェースは非常に有用だが、値を返さないという制限がある。タスクから値を返す必要がある場合は、次節で解説するCallableインターフェースが適している。
Callableインターフェースと戻り値の取得
非同期処理において、処理結果を返すタスクを定義したい場合、Runnableインターフェースではその要求に応えることができない。Callableインターフェースは、この制限を解消するためにJava 5で導入された。Callableは戻り値を持つタスクを表現するためのインターフェースであり、例外をスローすることもできる。
Callableインターフェースの定義は以下のとおりである。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Callableインターフェースを実装したクラスを作成する基本的な方法を以下に示す。
import java.util.concurrent.Callable;
public class MyCallable implements Callable<Integer> {
private final int number;
public MyCallable(int number) {
this.number = number;
}
@Override
public Integer call() throws Exception {
System.out.println("計算タスク開始: " + number + "の二乗を計算");
// 計算時間をシミュレート
Thread.sleep(1000);
// 例外のテスト用
if (number < 0) {
throw new IllegalArgumentException("負の数は処理できません: " + number);
}
// 結果を計算して返す
int result = number * number;
System.out.println("計算タスク完了: " + number + "の二乗は" + result);
return result;
}
}
このコードでは、与えられた数値の二乗を計算して返すCallableタスクを定義している。タスクは計算に1秒かかることをシミュレートしており、負の数が入力された場合には例外をスローする。
Callableを使用するには、ExecutorServiceなどのサービスと組み合わせる必要がある。以下の例では、Callableタスクを実行し、結果を取得する基本的な方法を表している。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallableExample {
public static void main(String[] args) {
// ExecutorServiceを作成
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
// Callableタスクを作成
Callable<Integer> task = new MyCallable(5);
System.out.println("タスクを送信");
// タスクを実行し、Future(将来の結果)を取得
Future<Integer> future = executor.submit(task);
System.out.println("タスクは送信されましたが、まだ完了していません");
System.out.println("タスクは完了しましたか? " + future.isDone());
try {
// 結果が利用可能になるまでブロック
Integer result = future.get();
System.out.println("タスクの結果: " + result);
} catch (InterruptedException e) {
System.out.println("タスク待機中に中断されました");
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.out.println("タスク実行中に例外が発生: " + e.getCause().getMessage());
}
System.out.println("タスクは完了しましたか? " + future.isDone());
} finally {
// ExecutorServiceをシャットダウン
executor.shutdown();
}
}
}
このコードでは、ExecutorServiceを使用してCallableタスクを実行し、Futureオブジェクトを通じて結果を取得している。Future.get()メソッドは、タスクが完了して結果が利用可能になるまでブロックする。また、isDone()メソッドを使用してタスクの完了状態を確認している。
より実践的な例として、複数のCallableタスクを並行して実行し、すべての結果を収集する方法を見てみよう。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class MultipleCallableExample {
public static void main(String[] args) {
// 固定サイズのスレッドプールを作成
ExecutorService executor = Executors.newFixedThreadPool(3);
try {
// Callableタスクのリストを作成
List<Callable<Integer>> taskList = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int taskId = i;
taskList.add(() -> {
System.out.println("タスク" + taskId + " 開始 (スレッド: " +
Thread.currentThread().getName() + ")");
// 異なる処理時間をシミュレート
Thread.sleep(1000 + (int)(Math.random() * 2000));
int result = taskId * 10;
System.out.println("タスク" + taskId + " 完了: 結果 = " + result);
return result;
});
}
System.out.println("複数のタスクを送信");
// すべてのタスクを実行し、Futureのリストを取得
List<Future<Integer>> futures = new ArrayList<>();
for (Callable<Integer> task : taskList) {
futures.add(executor.submit(task));
}
System.out.println("すべてのタスクを送信済み、結果を収集します");
// すべての結果を収集
List<Integer> results = new ArrayList<>();
for (Future<Integer> future : futures) {
try {
// 各タスクの結果を取得
Integer result = future.get();
results.add(result);
} catch (InterruptedException e) {
System.out.println("タスク待機中に中断されました");
Thread.currentThread().interrupt();
break;
} catch (ExecutionException e) {
System.out.println("タスク実行中に例外が発生: " + e.getCause().getMessage());
}
}
System.out.println("すべてのタスクが完了しました");
System.out.println("結果: " + results);
System.out.println("合計: " + results.stream().mapToInt(Integer::intValue).sum());
} finally {
// ExecutorServiceをシャットダウン
executor.shutdown();
}
}
}
このコードでは、5つのCallableタスクを作成し、3つのスレッドを持つプールで実行している。各タスクは異なる処理時間がかかり、タスクIDの10倍の値を結果として返す。すべてのタスクのFutureオブジェクトから結果を収集し、最終的に合計を計算している。
タイムアウトを設定して、指定した時間内にタスクが完了しない場合は処理を中断する方法も重要である。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class CallableWithTimeoutExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
// 長時間実行されるタスクを作成
Callable<String> longRunningTask = () -> {
System.out.println("長時間実行タスク開始");
// タスクの実行に5秒かかると仮定
for (int i = 1; i <= 5; i++) {
System.out.println("タスク実行中... " + i + "秒経過");
Thread.sleep(1000);
}
System.out.println("長時間実行タスク完了");
return "タスク完了:処理結果";
};
System.out.println("タスクを送信、3秒のタイムアウトを設定");
// タスクを実行
Future<String> future = executor.submit(longRunningTask);
try {
// タイムアウトを3秒に設定して結果を取得
String result = future.get(3, TimeUnit.SECONDS);
System.out.println("タスク結果: " + result);
} catch (InterruptedException e) {
System.out.println("タスク待機中に中断されました");
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.out.println("タスク実行中に例外が発生: " + e.getCause().getMessage());
} catch (TimeoutException e) {
System.out.println("タイムアウトが発生しました!");
// タスクをキャンセル(実行中の場合は中断を試みる)
boolean canceled = future.cancel(true);
System.out.println("タスクのキャンセル: " +
(canceled ? "成功" : "失敗"));
}
System.out.println("タスク状態: " +
(future.isCancelled() ? "キャンセル済み" :
future.isDone() ? "完了" : "実行中"));
} finally {
executor.shutdownNow(); // 実行中のタスクを中断してシャットダウン
}
}
}
このコードでは、タスクの実行に5秒かかることを想定しているが、future.get(3, TimeUnit.SECONDS)を使用して3秒のタイムアウトを設定している。タイムアウトが発生すると、TimeoutExceptionがスローされ、タスクはcancel(true)メソッドを使用してキャンセルされる。
また、Callableを使用した例外処理の方法を見てみよう。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class CallableExceptionExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
// 例外をスローするタスク
Callable<Integer> taskWithException = () -> {
System.out.println("例外をスローするタスク開始");
if (Math.random() < 0.8) { // 80%の確率で例外を発生
System.out.println("例外を発生させます");
throw new IllegalStateException("意図的に発生させた例外");
}
return 42; // 例外が発生しない場合の戻り値
};
// タスクを実行
Future<Integer> future = executor.submit(taskWithException);
try {
Integer result = future.get();
System.out.println("タスク結果: " + result);
} catch (InterruptedException e) {
System.out.println("タスク待機中に中断されました");
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// タスク内で発生した例外はExecutionExceptionにラップされる
System.out.println("タスク内で例外が発生: " + e.getCause().getClass().getName());
System.out.println("例外メッセージ: " + e.getCause().getMessage());
// 特定の例外タイプに応じた処理
Throwable cause = e.getCause();
if (cause instanceof IllegalStateException) {
System.out.println("IllegalStateExceptionを処理: 代替処理を実行");
// 代替処理...
} else if (cause instanceof IllegalArgumentException) {
System.out.println("IllegalArgumentExceptionを処理");
// 別の処理...
} else {
System.out.println("未知の例外タイプ");
}
}
} finally {
executor.shutdown();
}
}
}
このコードでは、タスク内で発生する例外がExecutionExceptionにラップされ、future.get()を呼び出したときにスローされることを示している。例外の実際の原因はgetCause()メソッドを使用して取得でき、例外の種類に応じた処理を行うことができる。
Callableインターフェースを使用することで、非同期処理から値を返すことができるようになるが、これらの結果を効率的に管理するためには、次節で解説するFutureをより詳細に理解する必要がある。
Futureを使った結果の管理
Futureインターフェースは、非同期計算の結果を表現するための重要な概念である。前節で紹介したCallableインターフェースと密接に関連しており、非同期タスクの完了状態の確認や結果の取得、タスクのキャンセルなどの操作を提供する。
Futureインターフェースの主なメソッドは以下のとおりである。
boolean cancel(boolean mayInterruptIfRunning)– タスクの実行をキャンセルしようと試みるboolean isCancelled()– タスクがキャンセルされたかどうかを確認するboolean isDone()– タスクが完了したかどうかを確認するV get()– タスクの結果を取得する(完了するまでブロック)V get(long timeout, TimeUnit unit)– 指定されたタイムアウト内でタスクの結果を取得する
Futureを使用した基本的な例を以下に記す。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class FutureBasicExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
// 結果を返すCallableタスク
Callable<String> task = () -> {
System.out.println("タスク実行開始");
// 処理時間をシミュレート
Thread.sleep(3000);
System.out.println("タスク実行完了");
return "処理結果";
};
System.out.println("タスクを送信");
// タスクを実行し、Future(将来の結果)を取得
Future<String> future = executor.submit(task);
System.out.println("タスクは非同期で実行中");
// タスクの状態を確認
while (!future.isDone()) {
System.out.println("待機中...");
Thread.sleep(500);
}
try {
// タスクの結果を取得
String result = future.get();
System.out.println("タスク結果: " + result);
} catch (ExecutionException e) {
System.out.println("タスク実行中に例外が発生: " + e.getCause().getMessage());
}
} catch (InterruptedException e) {
System.out.println("メインスレッドが中断されました");
Thread.currentThread().interrupt();
} finally {
executor.shutdown();
}
}
}
このコードでは、ExecutorServiceを使用してCallableタスクを送信し、結果をFutureオブジェクトとして取得している。isDone()メソッドを使用してタスクの完了状態を定期的に確認し、完了したらget()メソッドで結果を取得している。
Futureの重要な機能の一つはタスクのキャンセル機能である。長時間実行されるタスクが不要になった場合や、タイムアウトが発生した場合にこの機能が役立つ。
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class FutureCancellationExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
// キャンセル可能な長時間実行タスク
Callable<Integer> longTask = () -> {
System.out.println("長時間タスク開始");
try {
for (int i = 1; i <= 10; i++) {
System.out.println("処理中... " + i);
// 割り込みをチェック
if (Thread.currentThread().isInterrupted()) {
System.out.println("中断を検出!");
return -1; // 早期リターン
}
Thread.sleep(1000);
}
return 100; // 正常終了
} catch (InterruptedException e) {
System.out.println("スリープ中に中断されました");
// 中断フラグを再設定
Thread.currentThread().interrupt();
return -1; // 中断時の結果
}
};
System.out.println("タスクを送信");
Future<Integer> future = executor.submit(longTask);
// メインスレッドで他の作業
Thread.sleep(3500);
// ユーザーがキャンセルを要求したと仮定
if (!future.isDone()) {
System.out.println("タスクはまだ実行中です、キャンセルします");
// mayInterruptIfRunningをtrueに設定して中断を許可
boolean canceled = future.cancel(true);
System.out.println("キャンセル結果: " +
(canceled ? "成功" : "失敗"));
}
try {
// キャンセルされたタスクから結果を取得しようとする
Integer result = future.get();
System.out.println("タスク結果: " + result);
} catch (ExecutionException e) {
System.out.println("タスク実行中に例外が発生: " + e.getCause().getMessage());
} catch (java.util.concurrent.CancellationException e) {
System.out.println("タスクはキャンセルされたため、結果は利用できません");
}
// タスクの最終状態を確認
System.out.println("タスクはキャンセルされましたか? " + future.isCancelled());
System.out.println("タスクは完了しましたか? " + future.isDone());
} catch (InterruptedException e) {
System.out.println("メインスレッドが中断されました");
Thread.currentThread().interrupt();
} finally {
executor.shutdownNow();
System.out.println("ExecutorServiceをシャットダウンしました");
}
}
}
このコードでは、長時間実行されるタスクを作成し、3.5秒後にキャンセルしている。cancel(true)を呼び出すと、タスクが既に実行中の場合はスレッドに対して割り込みを送信する。タスクは定期的に割り込みフラグをチェックするか、InterruptedExceptionをキャッチして適切に対応することで、キャンセル要求に応じることができる。
キャンセルされたタスクから結果を取得しようとすると、CancellationExceptionがスローされる点に注意が必要である。また、isCancelled()メソッドとisDone()メソッドを使用してタスクの状態を確認することができる。
複数のFutureを管理する実用的な例として、非同期でファイルをダウンロードし、進捗状況を監視する例を見てみよう。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class FileDownloadExample {
// ファイルのダウンロード進捗を表すクラス
static class DownloadProgress {
private final String fileName;
private int progress;
private boolean completed;
private boolean error;
private String errorMessage;
public DownloadProgress(String fileName) {
this.fileName = fileName;
this.progress = 0;
this.completed = false;
this.error = false;
}
public void updateProgress(int newProgress) {
this.progress = newProgress;
if (newProgress >= 100) {
this.completed = true;
}
}
public void setError(String message) {
this.error = true;
this.errorMessage = message;
}
public String getFileName() {
return fileName;
}
public int getProgress() {
return progress;
}
public boolean isCompleted() {
return completed;
}
public boolean hasError() {
return error;
}
public String getErrorMessage() {
return errorMessage;
}
@Override
public String toString() {
if (error) {
return fileName + ": エラー - " + errorMessage;
} else if (completed) {
return fileName + ": 完了 (100%)";
} else {
return fileName + ": " + progress + "% 完了";
}
}
}
// ファイルダウンロードをシミュレートするCallable
static class FileDownloader implements Callable<DownloadProgress> {
private final String fileName;
private final int fileSize;
private final DownloadProgress progress;
public FileDownloader(String fileName, int fileSize) {
this.fileName = fileName;
this.fileSize = fileSize;
this.progress = new DownloadProgress(fileName);
}
@Override
public DownloadProgress call() throws Exception {
System.out.println("ダウンロード開始: " + fileName);
try {
// ダウンロード処理をシミュレート
int downloadedBytes = 0;
while (downloadedBytes < fileSize) {
// 中断チェック
if (Thread.currentThread().isInterrupted()) {
progress.setError("ダウンロードが中断されました");
return progress;
}
// ランダムなバイト数をダウンロード
int bytesRead = Math.min(
(int)(Math.random() * 1000),
fileSize - downloadedBytes
);
downloadedBytes += bytesRead;
// 進捗を更新(0-100%)
int progressPercent = (int)((double)downloadedBytes / fileSize * 100);
progress.updateProgress(progressPercent);
// 実際のダウンロード処理の代わり
Thread.sleep(300);
}
System.out.println("ダウンロード完了: " + fileName);
return progress;
} catch (InterruptedException e) {
progress.setError("ダウンロードが中断されました");
Thread.currentThread().interrupt();
return progress;
} catch (Exception e) {
progress.setError("ダウンロード中にエラーが発生: " + e.getMessage());
throw e;
}
}
public DownloadProgress getProgress() {
return progress;
}
}
public static void main(String[] args) {
// 4スレッドのプールを作成(同時に4つのファイルをダウンロード可能)
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
// ダウンロードするファイルのリスト
List<FileDownloader> downloaders = new ArrayList<>();
downloaders.add(new FileDownloader("document.pdf", 5000));
downloaders.add(new FileDownloader("image.jpg", 2000));
downloaders.add(new FileDownloader("video.mp4", 8000));
downloaders.add(new FileDownloader("archive.zip", 15000));
downloaders.add(new FileDownloader("software.exe", 10000));
// タスクを送信してFutureのリストを取得
List<Future<DownloadProgress>> futures = new ArrayList<>();
for (FileDownloader downloader : downloaders) {
futures.add(executor.submit(downloader));
}
// ダウンロードが完了するまで進捗を監視
boolean allCompleted = false;
while (!allCompleted) {
allCompleted = true;
System.out.println("\n===== ダウンロード進捗 =====");
for (int i = 0; i < downloaders.size(); i++) {
DownloadProgress progress = downloaders.get(i).getProgress();
System.out.println(progress);
if (!progress.isCompleted() && !progress.hasError()) {
allCompleted = false;
}
}
// 1秒ごとに更新
Thread.sleep(1000);
// 例として、3秒後に2番目のダウンロードをキャンセル
if (System.currentTimeMillis() - startTime > 3000 && !futures.get(1).isDone()) {
System.out.println("\n2番目のダウンロードをキャンセルします...");
futures.get(1).cancel(true);
}
}
System.out.println("\n===== 最終結果 =====");
for (int i = 0; i < futures.size(); i++) {
Future<DownloadProgress> future = futures.get(i);
try {
if (!future.isCancelled()) {
DownloadProgress result = future.get();
System.out.println(result);
} else {
System.out.println(downloaders.get(i).getFileName() + ": キャンセルされました");
}
} catch (ExecutionException e) {
System.out.println(downloaders.get(i).getFileName() +
": 例外が発生 - " + e.getCause().getMessage());
}
}
} catch (InterruptedException e) {
System.out.println("メインプロセスが中断されました");
Thread.currentThread().interrupt();
} finally {
// ExecutorServiceを正しくシャットダウン
System.out.println("シャットダウンを開始...");
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("シャットダウン完了");
}
}
// プログラム開始時間を記録
private static final long startTime = System.currentTimeMillis();
}
このコードは、複数のファイルの非同期ダウンロードをシミュレートし、Futureを使用して進捗状況を監視する例である。各ファイルダウンロードタスクはFileDownloaderクラスとして実装され、現在の進捗状況はDownloadProgressオブジェクトを通じて追跡される。
プログラムは定期的に各ダウンロードの進捗状況を表示し、例として3秒後に2番目のダウンロードをキャンセルする。すべてのダウンロードが完了(またはエラーやキャンセルが発生)すると、最終結果が表示される。
このプログラムでは、以下のFutureの重要な概念がされている。
- タスクの同時実行 – 複数のファイルが並行してダウンロードされる
- 進捗状況の監視 – 各タスクの現在の状態を継続的に取得する
- タスクのキャンセル – 実行中のタスクを
cancel()メソッドで中断する - 最終結果の取得 –
get()メソッドを使用して完了したタスクの結果を取得する - 例外処理 – タスク実行中に発生する可能性のある例外を適切に処理する
Futureインターフェースは非同期タスクの結果を管理するための基本的なツールだが、より複雑な非同期処理パターンでは限界がある。そのため、Java 8以降ではCompletableFutureが導入され、より柔軟な非同期プログラミングが可能になった。これについては後の章で詳しく解説する。
Javaの主要な非同期処理API
前章で学んだスレッド、Runnable、Callable、Futureなどの基礎概念を踏まえ、本章ではJavaが提供する主要な非同期処理APIについて解説する。これらのAPIは基礎知識をベースに構築されたより高度で実用的なフレームワークであり、実際のアプリケーション開発において非同期処理を効率的に実装するための強力なツールとなる。ExecutorServiceフレームワークによるタスク管理から始め、ThreadPoolExecutorによるスレッド管理の詳細、そして近年のJavaで非常に重要となったCompletableFutureの使い方まで、体系的に学んでいくことで、現代的なJavaアプリケーションにおける非同期プログラミングの実践力を身につけることができる。
ExecutorServiceフレームワークの使い方
ExecutorServiceは、Java 5で導入されたjava.util.concurrentパッケージの中核をなすインターフェースであり、非同期タスクの実行を抽象化し管理するためのフレームワークである。スレッドの生成や管理、タスクのスケジューリングなどの複雑な作業を開発者から隠蔽し、より高レベルなAPIを提供することで、効率的かつ安全な非同期処理の実装を可能にする。
ExecutorServiceの基本的な使い方から見ていこう。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceBasics {
public static void main(String[] args) {
// 単一スレッドのExecutorServiceを作成
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
try {
// タスクをExecutorServiceに送信
singleThreadExecutor.execute(() -> {
// 実行するタスクの内容
System.out.println("タスク1実行中 (スレッド: " +
Thread.currentThread().getName() + ")");
try {
// 処理時間をシミュレート
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("タスク1完了");
});
// Runnableを使用して別のタスクを送信
singleThreadExecutor.execute(() -> {
System.out.println("タスク2実行中 (スレッド: " +
Thread.currentThread().getName() + ")");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("タスク2完了");
});
System.out.println("メインスレッドは続行...");
// タスクの完了を待つため、少し待機
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("メインスレッドが中断されました");
} finally {
// ExecutorServiceの適切なシャットダウン
shutdownAndAwaitTermination(singleThreadExecutor);
}
}
// ExecutorServiceを適切にシャットダウンするためのヘルパーメソッド
private static void shutdownAndAwaitTermination(ExecutorService executor) {
// まず新規タスクの受付を停止
executor.shutdown();
try {
// 既存タスクの終了を待機
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
// タイムアウトした場合、実行中のタスクを中断
executor.shutdownNow();
// 中断の完了を待機
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("ExecutorServiceが終了しません");
}
}
} catch (InterruptedException e) {
// 現在のスレッドが中断された場合
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
このコードでは、Executors.newSingleThreadExecutor()メソッドを使用して単一スレッドのExecutorServiceを作成し、2つのRunnableタスクを送信している。ExecutorServiceを使用する際の重要なポイントは、使用が終わったら適切にシャットダウンすることである。上記のコードでは、shutdownAndAwaitTerminationというヘルパーメソッドを定義しており、これはJavaの公式ドキュメントで推奨されている方法に基づいている。
単一スレッドのExecutorServiceでは、送信されたタスクは同じスレッドで順番に実行される。そのため、タスク1が完了してからタスク2が実行される。これによりスレッドの安全性が確保されるが、並行性は得られない。
ExecutorServiceには、異なるニーズに対応するための複数の種類が用意されている。主なものを記す。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceTypes {
public static void main(String[] args) {
// 1. 単一スレッドExecutor
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 2. 固定サイズのスレッドプール
int nThreads = Runtime.getRuntime().availableProcessors(); // CPUコア数に基づくスレッド数
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(nThreads);
// 3. 必要に応じて新しいスレッドを作成するキャッシュスレッドプール
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 4. スケジュールされたタスクをサポートするExecutor
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
try {
// 1. 単一スレッドでタスクを実行
System.out.println("=== 単一スレッドExecutor ===");
singleThreadExecutor.execute(() ->
System.out.println("単一スレッドでタスク実行: " + Thread.currentThread().getName()));
// 2. 固定サイズのスレッドプールでタスクを実行
System.out.println("\n=== 固定サイズスレッドプール (" + nThreads + "スレッド) ===");
for (int i = 0; i < 5; i++) {
final int taskId = i;
fixedThreadPool.execute(() ->
System.out.println("固定プールでタスク" + taskId + "実行: " +
Thread.currentThread().getName()));
}
// 3. キャッシュスレッドプールでタスクを実行
System.out.println("\n=== キャッシュスレッドプール ===");
for (int i = 0; i < 5; i++) {
final int taskId = i;
cachedThreadPool.execute(() -> {
System.out.println("キャッシュプールでタスク" + taskId + "実行: " +
Thread.currentThread().getName());
try {
// 短いスリープで新しいスレッド作成を促す
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 4. スケジュールされたタスクを実行
System.out.println("\n=== スケジュールされたExecutor ===");
// 2秒後に1回実行
scheduledExecutor.schedule(() ->
System.out.println("2秒後に実行: " + Thread.currentThread().getName()),
2, TimeUnit.SECONDS);
// 初期遅延1秒、その後2秒ごとに実行
scheduledExecutor.scheduleAtFixedRate(() ->
System.out.println("2秒ごとに実行: " + Thread.currentThread().getName()),
1, 2, TimeUnit.SECONDS);
// 初期遅延1秒、タスク完了から3秒後に実行
scheduledExecutor.scheduleWithFixedDelay(() -> {
System.out.println("固定遅延で実行: " + Thread.currentThread().getName());
try {
Thread.sleep(1000); // タスク自体が1秒かかる
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 1, 3, TimeUnit.SECONDS);
// メインスレッドでの操作を続行
System.out.println("\nメインスレッドは続行中...");
// スケジュールされたタスクの実行を観察するために待機
Thread.sleep(10000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// すべてのExecutorServiceを適切にシャットダウン
singleThreadExecutor.shutdown();
fixedThreadPool.shutdown();
cachedThreadPool.shutdown();
scheduledExecutor.shutdown();
}
}
}
このコードでは、JavaのExecutorsファクトリクラスが提供する主な4種類のExecutorServiceを示している。それぞれの特徴は以下のとおりである。
- SingleThreadExecutor – 単一のスレッドでタスクを順次実行する。タスク間の順序が重要な場合や、複数のスレッドによる同時実行を避けたい場合に使用する。
- FixedThreadPool – 固定数のスレッドを持つプール。CPUコア数に基づいてスレッド数を決定することが一般的である。高負荷の計算タスクに適している。
- CachedThreadPool – 必要に応じて新しいスレッドを作成し、既存の空きスレッドを再利用する。短時間で完了する多数のタスクに適しているが、無制限にスレッドが増える可能性があるため注意が必要である。
- ScheduledThreadPool – 定期的または遅延付きでタスクを実行するためのプール。
schedule、scheduleAtFixedRate、scheduleWithFixedDelayの3つの主要なスケジューリングメソッドを提供する。
特にScheduledExecutorServiceは、古いTimerクラスの代替として使用される。Timerと比較して、マルチスレッドサポート、例外処理の改善、より柔軟なスケジューリングオプションなど多くの利点がある。
ExecutorServiceを使用すると、非同期タスクの実行結果を取得することも可能である。以下は、前章で学んだCallableとFutureをExecutorServiceと組み合わせて使用する例である。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class ExecutorWithCallableExample {
public static void main(String[] args) {
// CPU数に基づいたスレッドプールを作成
int processors = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(processors);
try {
// 複数のCallableタスクを作成
List<Callable<Integer>> tasks = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int taskNum = i;
tasks.add(() -> {
// タスクの実行時間はタスク番号に比例
int processingTime = taskNum * 500;
System.out.println("タスク" + taskNum + " 開始 (予定処理時間: " +
processingTime + "ms)");
try {
Thread.sleep(processingTime);
} catch (InterruptedException e) {
System.out.println("タスク" + taskNum + " 中断");
Thread.currentThread().interrupt();
return -1;
}
// タスク番号の2乗を結果として返す
int result = taskNum * taskNum;
System.out.println("タスク" + taskNum + " 完了: 結果 = " + result);
return result;
});
}
// 1. 単一のタスクを実行して結果を取得
Future<Integer> future = executor.submit(tasks.get(0));
try {
// 最大2秒待機して結果を取得
Integer result = future.get(2, TimeUnit.SECONDS);
System.out.println("最初のタスク結果: " + result);
} catch (TimeoutException e) {
System.out.println("タイムアウトが発生したため、タスクをキャンセル");
future.cancel(true);
} catch (ExecutionException e) {
System.out.println("タスク実行中に例外が発生: " + e.getCause().getMessage());
}
// 2. 複数のタスクを同時に実行して最初に完了したタスクの結果を取得
System.out.println("\n最初に完了するタスクの結果を取得:");
try {
Integer result = executor.invokeAny(tasks.subList(1, 4));
System.out.println("最初に完了したタスクの結果: " + result);
} catch (ExecutionException e) {
System.out.println("すべてのタスクが例外で失敗しました");
}
// 3. すべてのタスクを実行して結果をリストとして取得
System.out.println("\nすべてのタスクを実行して結果を収集:");
List<Future<Integer>> futures = executor.invokeAll(tasks);
List<Integer> results = new ArrayList<>();
for (Future<Integer> f : futures) {
try {
// タスクが完了していない場合はブロック
if (!f.isCancelled()) {
results.add(f.get());
} else {
results.add(null);
}
} catch (ExecutionException e) {
System.out.println("タスク実行中に例外: " + e.getCause().getMessage());
results.add(null);
}
}
System.out.println("すべてのタスク結果: " + results);
} catch (InterruptedException e) {
System.out.println("メインスレッドが中断されました");
Thread.currentThread().interrupt();
} finally {
// ExecutorServiceを適切にシャットダウン
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}
このコードでは、ExecutorServiceを使用してCallableタスクを実行し、結果を取得している。
submit(Callable)– 単一のCallableタスクを送信し、結果を表すFutureを返す。invokeAny(Collection<Callable>)– 複数のタスクを送信し、最初に正常に完了したタスクの結果を返す。invokeAll(Collection<Callable>)– すべてのタスクを送信し、各タスクのFutureを含むリストを返す。
invokeAnyメソッドは特に冗長性を持たせた処理に有用である。例えば、同じデータを異なるサーバーから取得する場合、最初に応答したサーバーの結果を使用できる。一方、invokeAllメソッドは、複数の独立したタスクを並行して実行し、すべての結果を収集する場合に適している。
ExecutorServiceを使用する際の重要な注意点として、必ず適切にシャットダウンする必要がある。シャットダウンしないと、アプリケーションが終了しない(スレッドが生き続ける)原因となる。通常、以下の2つのステップで行う。
shutdown()– 新しいタスクの受け入れを停止し、既存のタスクの完了を待つ。awaitTermination(長さ, 時間単位)– 指定された時間内にすべてのタスクが完了するのを待つ。
タスクが完了しない場合は、shutdownNow()を呼び出して実行中のタスクを中断する試みを行うことができる。
実際のアプリケーションでは、ExecutorServiceを効果的に使用するためにいくつかのベストプラクティスがある。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class ExecutorServiceBestPractices {
public static void main(String[] args) {
// カスタムThreadFactoryを使用して意味のあるスレッド名を設定
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "AppWorker-" + threadNumber.getAndIncrement());
// デーモンスレッドとして設定(アプリケーション終了を妨げない)
t.setDaemon(false);
// 通常優先度を設定
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
};
// カスタムThreadFactoryを使用してExecutorServiceを作成
ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory);
try {
// 複数のタスクを送信
for (int i = 0; i < 5; i++) {
final int taskId = i;
try {
executor.execute(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("タスク" + taskId + " 実行中 (スレッド: " + threadName + ")");
try {
// 例外が発生する可能性のあるコード
if (taskId == 3) {
throw new RuntimeException("タスク" + taskId + "で意図的なエラー");
}
// 処理ロジック
Thread.sleep(1000);
System.out.println("タスク" + taskId + " 完了");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("タスク" + taskId + " 中断");
} catch (Exception e) {
// タスク内の例外を適切に処理
System.err.println("タスク" + taskId + " エラー: " + e.getMessage());
// 例外のログ記録やモニタリングを行う(実際の実装では)
}
});
} catch (RejectedExecutionException e) {
// ExecutorServiceがシャットダウン済みか過負荷の場合
System.err.println("タスク" + taskId + " 拒否: " + e.getMessage());
// タスクをキューに戻すか、別の方法で処理
}
}
// メインスレッドで他の処理を実行
System.out.println("すべてのタスクを送信済み");
// 十分な時間待機してタスクの実行を観察
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// ExecutorServiceの適切なシャットダウン
shutdownExecutor(executor);
}
}
// ExecutorServiceを適切にシャットダウンするためのヘルパーメソッド
private static void shutdownExecutor(ExecutorService executor) {
try {
System.out.println("ExecutorServiceのシャットダウンを開始...");
// 新しいタスクを受け付けない
executor.shutdown();
// 既存のタスクの完了を最大5秒間待つ
if (!executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
System.err.println("タスクが正常に終了しないため、強制シャットダウンを実行");
// 未完了のタスクを中断
executor.shutdownNow();
// 中断がタスクを停止するのを最大5秒間待つ
if (!executor.awaitTermination(5, java.util.concurrent.TimeUnit.SECONDS)) {
System.err.println("ExecutorServiceの完全なシャットダウンに失敗");
}
}
System.out.println("ExecutorServiceのシャットダウン完了");
} catch (InterruptedException e) {
// 現在のスレッドが中断された場合
System.err.println("シャットダウンプロセスが中断されました");
// タスクを中断
executor.shutdownNow();
// 中断フラグを再設定
Thread.currentThread().interrupt();
}
}
}
このコードは、ExecutorServiceを使用する上でのいくつかの重要なベストプラクティスをしている。
- カスタムThreadFactoryの使用 – わかりやすいスレッド名を設定したり、スレッドの属性(デーモン/非デーモン、優先度など)をカスタマイズしたりするために使用する。これにより、デバッグやモニタリングが容易になる。
- 例外処理の適切な実装 – ExecutorServiceに送信されたタスク内で発生した例外は、そのスレッド内でキャッチされない限り、スレッドが終了する原因となる。そのため、各タスク内で適切な例外処理を行うことが重要である。
- RejectedExecutionExceptionの処理 – ExecutorServiceがシャットダウン済みか、キューが一杯で新しいタスクを受け付けられない場合に発生する可能性がある。適切に処理することで、タスクの損失を防ぐことができる。
- 適切なシャットダウン手順 – 二段階のシャットダウン処理(まず
shutdown()を呼び出し、必要に応じてshutdownNow()を呼び出す)を実装することで、タスクが適切に終了する機会を与えつつ、リソースリークを防ぐことができる。
これらのベストプラクティスを適用することで、より堅牢で保守しやすい非同期処理を実装することができる。特に、長時間実行されるアプリケーションや、多数のタスクを処理するシステムでは、これらの考慮事項が重要となる。
ExecutorServiceは、非同期処理の基盤として最も広く使用されているJavaのAPIである。しかし、より細かい制御が必要な場合は、次節で解説するThreadPoolExecutorを直接使用することで、スレッドプールの動作をより詳細に制御することができる。
ThreadPoolExecutorによるスレッド管理
ThreadPoolExecutorは、ExecutorServiceインターフェースの実装クラスであり、より高度なスレッドプール管理機能を提供する。Executorsファクトリクラスが提供するExecutorServiceの多くは、内部的にThreadPoolExecutorを使用している。ThreadPoolExecutorを直接使用することで、スレッドプールの挙動をより細かく制御することができる。
ThreadPoolExecutorの基本的な構成要素を理解するところから始めよう。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolExecutorBasics {
public static void main(String[] args) {
// 1. コアスレッド数(常に維持されるスレッド数)
int corePoolSize = 2;
// 2. 最大スレッド数(ピーク時に許容されるスレッド数)
int maximumPoolSize = 4;
// 3. 余剰スレッドの生存時間
long keepAliveTime = 60;
// 4. 時間単位
TimeUnit unit = TimeUnit.SECONDS;
// 5. ワークキュー(実行待ちのタスクを保持)
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
// 6. スレッドファクトリ(新しいスレッドを生成)
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "CustomWorker-" + threadNumber.getAndIncrement());
t.setDaemon(false);
return t;
}
};
// 7. 拒否ポリシー(キューが一杯の場合の動作)
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
// ThreadPoolExecutorのインスタンスを作成
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue,
threadFactory,
rejectedExecutionHandler
);
try {
// スレッドプールの状態を表示
printPoolStatus("初期状態", executor);
// タスクを送信して動作を確認
for (int i = 1; i <= 10; i++) {
final int taskId = i;
executor.execute(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("タスク" + taskId + " 開始 (スレッド: " + threadName + ")");
try {
// タスク実行をシミュレート
Thread.sleep(2000);
System.out.println("タスク" + taskId + " 完了 (スレッド: " + threadName + ")");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("タスク" + taskId + " 中断");
}
});
// 定期的にプールの状態を表示
if (i % 3 == 0 || i == 10) {
printPoolStatus("タスク" + i + "送信後", executor);
// プールの動作を観察するための短い待機
Thread.sleep(500);
}
}
// すべてのタスクが処理されるのを待機
Thread.sleep(5000);
printPoolStatus("すべてのタスク処理後", executor);
// プレスタートコアスレッドを実行
System.out.println("\nプレスタートコアスレッドを実行");
executor.prestartAllCoreThreads();
printPoolStatus("プレスタート後", executor);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// スレッドプールをシャットダウン
executor.shutdown();
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// スレッドプールの現在の状態を表示するヘルパーメソッド
private static void printPoolStatus(String prefix, ThreadPoolExecutor executor) {
System.out.println("\n=== " + prefix + " ===");
System.out.println("プールサイズ: " + executor.getPoolSize());
System.out.println("アクティブスレッド: " + executor.getActiveCount());
System.out.println("キューに待機中のタスク: " + executor.getQueue().size());
System.out.println("これまでに完了したタスク: " + executor.getCompletedTaskCount());
System.out.println("これまでに送信されたタスク: " + executor.getTaskCount());
System.out.println("==============================");
}
}
このコードでは、ThreadPoolExecutorを構成する7つの主要なパラメータを記述している。
- corePoolSize – プールに常に維持されるスレッドの数。アイドル状態でもこの数のスレッドは維持される。
- maximumPoolSize – プールに存在できる最大スレッド数。ワークキューが一杯になったときに追加のスレッドが作成される(コアスレッド数を超えて)。
- keepAliveTime – コアスレッド数を超えたスレッドが、アイドル状態になってからプールから削除されるまでの時間。
- unit – keepAliveTimeの時間単位。
- workQueue – 実行待ちのタスクを保持するキュー。異なる種類のキューは異なる振る舞いをする。
- threadFactory – 新しいスレッドを作成するためのファクトリ。スレッドの名前付けやプロパティ設定に使用する。
- rejectedExecutionHandler – キューが一杯で新しいタスクを受け入れられない場合の動作を定義する。
また、printPoolStatusメソッドでは、スレッドプールの現在の状態を監視するために使用できるいくつかの重要なメソッドを示している。これらのメソッドは、デバッグやモニタリングに有用である。
ThreadPoolExecutorの動作を理解するために、タスクが送信されたときの振る舞いを見てみよう:
- コアスレッド数未満のスレッドが動作している場合、新しいスレッドが作成され、タスクが実行される。
- コアスレッド数に達している場合、タスクはワークキューに追加される。
- ワークキューが一杯になると、コアスレッド数を超えて最大スレッド数までの追加スレッドが作成される。
- 最大スレッド数に達し、ワークキューも一杯の場合、拒否ポリシーが適用される。
スレッドプールの設定は、アプリケーションの特性に合わせて調整することが重要である。以下は、一般的なシナリオに対する設定例である。
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorConfigurations {
public static void main(String[] args) {
// システムのCPUコア数を取得
int availableProcessors = Runtime.getRuntime().availableProcessors();
System.out.println("システムのCPUコア数: " + availableProcessors);
// 1. CPU負荷の高い処理向けの設定
// CPUコア数に等しいスレッド数を使用
ThreadPoolExecutor cpuBoundPool = new ThreadPoolExecutor(
availableProcessors, // コアスレッド数 = CPUコア数
availableProcessors, // 最大スレッド数 = CPUコア数
0L, TimeUnit.MILLISECONDS, // 余剰スレッドはすぐに終了(ただし最大=コアなので実質効果なし)
new LinkedBlockingQueue<>(1000), // 大きめのキュー
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 2. IO負荷の高い処理向けの設定
// スレッド数はCPUコア数よりも多く設定
ThreadPoolExecutor ioBoundPool = new ThreadPoolExecutor(
availableProcessors * 2, // コアスレッド数 = CPUコア数の2倍
availableProcessors * 4, // 最大スレッド数 = CPUコア数の4倍
60L, TimeUnit.SECONDS, // 余剰スレッドは1分後に終了
new LinkedBlockingQueue<>(100), // 適度なサイズのキュー
new ThreadPoolExecutor.AbortPolicy()
);
// 3. 混合負荷向けの設定
ThreadPoolExecutor mixedWorkloadPool = new ThreadPoolExecutor(
availableProcessors, // コアスレッド数 = CPUコア数
availableProcessors * 2, // 最大スレッド数 = CPUコア数の2倍
30L, TimeUnit.SECONDS, // 余剰スレッドは30秒後に終了
new LinkedBlockingQueue<>(500), // 中程度のキュー
new ThreadPoolExecutor.DiscardOldestPolicy()
);
// 4. 即時実行またはキャッシュプール相当の設定
ThreadPoolExecutor immediatePool = new ThreadPoolExecutor(
0, // コアスレッド数 = 0(アイドル状態ではスレッドを維持しない)
Integer.MAX_VALUE, // 最大スレッド数 = 無制限(実質的には制限あり)
60L, TimeUnit.SECONDS, // 余剰スレッドは60秒後に終了
new SynchronousQueue<>(), // キューなし(直接ハンドオフ)
new ThreadPoolExecutor.DiscardPolicy()
);
// シャットダウン(実際の使用後に呼び出す)
cpuBoundPool.shutdown();
ioBoundPool.shutdown();
mixedWorkloadPool.shutdown();
immediatePool.shutdown();
// 各設定の説明と使用ケース
System.out.println("\n=== スレッドプール設定の解説 ===");
System.out.println("\n1. CPU負荷の高い処理向け:");
System.out.println(" - CPUコア数と同じスレッド数を使用");
System.out.println(" - 大きなキューで多数のタスクをバッファリング");
System.out.println(" - 適用例: 数値計算、暗号化、イメージ処理など");
System.out.println("\n2. IO負荷の高い処理向け:");
System.out.println(" - CPUコア数よりも多いスレッド数を使用(I/O待ち時間を考慮)");
System.out.println(" - 小〜中サイズのキュー");
System.out.println(" - 適用例: データベース操作、ファイルI/O、ネットワーク通信など");
System.out.println("\n3. 混合負荷向け:");
System.out.println(" - バランスの取れたスレッド数と中程度のキュー");
System.out.println(" - 適用例: Webサーバー、多様なタスクを処理するアプリケーションなど");
System.out.println("\n4. 即時実行/キャッシュプール:");
System.out.println(" - タスクが送信されるとすぐに新しいスレッドを作成");
System.out.println(" - 短時間で完了する多数のタスクに適している");
System.out.println(" - 適用例: 小さな独立したタスク、低レイテンシが重要なケースなど");
System.out.println(" - 注意: スレッド数が無制限に増える可能性があるため、慎重に使用する必要がある");
}
}
このコードでは、4つの異なるワークロードタイプに対するThreadPoolExecutorの設定例を示している。各設定は、特定のタイプの作業負荷に最適化されている:
- CPU負荷の高い処理 – CPUコア数に等しいスレッド数を使用する。これは、スレッドが主にCPU計算を行い、ほとんど待機状態にならない場合に適している。CPU数以上のスレッドを作成しても、コンテキストスイッチのオーバーヘッドが増すだけで、パフォーマンスが向上しない。
- IO負荷の高い処理 – CPUコア数よりも多いスレッド数を使用する。I/O操作中にスレッドが待機状態になるため、その間に他のスレッドが実行できるようにする。典型的には、CPUコア数の2倍から4倍のスレッド数が使用される。
- 混合負荷 – CPUとI/Oの両方の操作を行うタスクに対してバランスの取れた設定。コア数と最大数の間に適度な差を設け、負荷の変動に対応できるようにする。
- 即時実行/キャッシュプール – Executors.newCachedThreadPool()に相当する設定。タスクが送信されるとすぐに実行される。短時間で完了する多数のタスクに適しているが、スレッド数が無制限に増える可能性があるため注意が必要。
これらの設定は、出発点として参考にすべきだが、実際のアプリケーションでは、負荷テストやモニタリングを通じて最適な値を見つける必要がある。
ThreadPoolExecutorの高度な機能として、動的な再設定とカスタムフックメソッドがある。これらを活用する例を見てみよう。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class AdvancedThreadPoolExecutor {
public static void main(String[] args) {
// カスタムThreadPoolExecutorを作成
CustomThreadPoolExecutor executor = new CustomThreadPoolExecutor(
2, // コアスレッド数
5, // 最大スレッド数
60, TimeUnit.SECONDS, // 余剰スレッドの生存時間
new LinkedBlockingQueue<>(10) // 作業キュー
);
try {
// タスクを送信
for (int i = 1; i <= 15; i++) {
final int taskId = i;
executor.execute(() -> {
System.out.println("タスク" + taskId + " 開始 (スレッド: " +
Thread.currentThread().getName() + ")");
try {
// タスク実行時間をシミュレート
Thread.sleep(2000);
// ランダムに例外を発生させる(デモ目的)
if (taskId % 7 == 0) {
throw new RuntimeException("意図的なエラー in タスク" + taskId);
}
System.out.println("タスク" + taskId + " 完了");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("タスク" + taskId + " 中断");
}
});
// 5つのタスクを送信した後で設定を変更
if (i == 5) {
System.out.println("\n*** プール設定を変更 ***");
System.out.println("現在のコアサイズ: " + executor.getCorePoolSize());
System.out.println("現在の最大サイズ: " + executor.getMaximumPoolSize());
// スレッドプールのサイズを動的に変更
executor.setCorePoolSize(3); // コアスレッド数を3に増加
executor.setMaximumPoolSize(6); // 最大スレッド数を6に増加
System.out.println("新しいコアサイズ: " + executor.getCorePoolSize());
System.out.println("新しい最大サイズ: " + executor.getMaximumPoolSize());
}
}
// すべてのタスクの完了を待機するための十分な時間
Thread.sleep(10000);
// 統計情報を表示
System.out.println("\n=== 最終統計 ===");
System.out.println("正常に完了したタスク: " + executor.getSuccessfulTasks());
System.out.println("失敗したタスク: " + executor.getFailedTasks());
System.out.println("総実行時間: " + executor.getTotalExecutionTime() + "ms");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// スレッドプールをシャットダウン
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// カスタムThreadPoolExecutor
static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
// 統計情報を追跡するためのカウンター
private final AtomicInteger successfulTasks = new AtomicInteger(0);
private final AtomicInteger failedTasks = new AtomicInteger(0);
private long totalExecutionTime = 0;
// スレッドローカル変数を使用してタスクの開始時間を記録
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
public CustomThreadPoolExecutor(int corePoolSize, int maxPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
// スレッドプールの使用状況を定期的にログに記録
Runnable monitorTask = () -> {
while (!isShutdown()) {
System.out.println("\n=== プール状態監視 ===");
System.out.println("アクティブスレッド: " + getActiveCount() +
" / " + getPoolSize());
System.out.println("キューサイズ: " + getQueue().size());
System.out.println("完了タスク: " + getCompletedTaskCount());
System.out.println("===================");
try {
Thread.sleep(3000); // 3秒ごとに状態を表示
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
};
// 監視スレッドを起動
new Thread(monitorTask, "PoolMonitor").start();
}
// タスク実行前のフック
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
System.out.println("タスク実行前 (スレッド: " + t.getName() + ")");
startTime.set(System.currentTimeMillis());
}
// タスク実行後のフック
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.currentTimeMillis();
long taskTime = endTime - startTime.get();
synchronized (this) {
totalExecutionTime += taskTime;
}
if (t == null) {
// タスクが正常に完了
successfulTasks.incrementAndGet();
System.out.println("タスク正常完了 (実行時間: " + taskTime + "ms)");
} else {
// タスクが例外で失敗
failedTasks.incrementAndGet();
System.out.println("タスク失敗: " + t.getMessage() +
" (実行時間: " + taskTime + "ms)");
t.printStackTrace();
}
} finally {
super.afterExecute(r, t);
}
}
// プール終了時のフック
@Override
protected void terminated() {
System.out.println("スレッドプール終了: 合計タスク = " +
(successfulTasks.get() + failedTasks.get()));
super.terminated();
}
// 統計メソッド
public int getSuccessfulTasks() {
return successfulTasks.get();
}
public int getFailedTasks() {
return failedTasks.get();
}
public long getTotalExecutionTime() {
return totalExecutionTime;
}
}
}
このコードでは、ThreadPoolExecutorを拡張して以下の高度な機能を実装している:
- カスタムフックメソッド
beforeExecute– タスク実行前に呼び出される。ここではタスクの開始時間を記録している。afterExecute– タスク実行後に呼び出される。タスクの実行時間を計算し、成功または失敗を記録している。terminated– スレッドプールが終了したときに呼び出される。最終的な統計を表示している。
- 動的な再設定
- 実行中にコアスレッド数と最大スレッド数を変更している。これにより、アプリケーションの負荷に応じてスレッドプールのサイズを調整することができる。
- 監視メカニズム
- 別のスレッドを使用して、スレッドプールの状態を定期的に監視し、ログに記録している。これは、長時間実行されるアプリケーションで特に有用である。
- 統計収集
- 成功したタスク数、失敗したタスク数、総実行時間などの統計情報を収集している。これらの指標は、パフォーマンスチューニングやトラブルシューティングに役立つ。
これらの高度な機能を使用することで、アプリケーションの要件に合わせてThreadPoolExecutorの動作をカスタマイズし、パフォーマンスを最適化することができる。
ThreadPoolExecutorの一般的な問題と解決策についても理解しておくことが重要である。以下はいくつかの典型的な問題とその対策である。
- メモリリーク – タスクが正しく終了しないか、スレッドプールが適切にシャットダウンされない場合、メモリリークが発生する可能性がある。適切な例外処理と確実なシャットダウン手順を実装することで防止できる。
- スレッドスタベーション – すべてのスレッドが長時間実行されるタスクでブロックされ、新しいタスクが処理されない状態。タスクを適切な粒度に分割し、タイムアウトを設定することで緩和できる。
- デッドロック – スレッド間の相互ブロックによって、すべての処理が停止する状況。適切なロック順序の維持、タイムアウト付きロックの使用、大きなタスクの分割などで防止できる。
- スレッドリーク – タスクが完了しないまま、新しいスレッドが継続的に作成される状況。適切なタイムアウト設定と監視によって検出し、修正できる。
ThreadPoolExecutorは強力なツールだが、適切に設定し管理することが重要である。アプリケーションの特性や要件に合わせて調整し、継続的にモニタリングすることで、最適なパフォーマンスと信頼性を確保することができる。
次節では、Java 8で導入されたCompletableFutureという、より現代的で使いやすい非同期プログラミングのためのAPIについて解説する。CompletableFutureは、従来のFutureの限界を克服し、非同期処理をより宣言的かつ組み合わせ可能な方法で記述することを可能にする。
CompletableFutureの基本
CompletableFutureは、Java 8で導入されたjava.util.concurrentパッケージの一部であり、従来のFutureインターフェースの機能を大幅に拡張した非同期処理のためのクラスである。Futureがシンプルだが機能が制限されていたのに対し、CompletableFutureはより豊富なAPIを提供し、非同期処理の組み合わせや連鎖、例外処理などを容易にする。
CompletableFutureの基本的な使い方から見ていこう。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class CompletableFutureBasics {
public static void main(String[] args) {
// 1. 空のCompletableFuture(後で結果を設定)
CompletableFuture<String> future1 = new CompletableFuture<>();
// 別スレッドで結果を設定(通常はバックグラウンド処理の結果として)
new Thread(() -> {
try {
// 処理時間をシミュレート
System.out.println("future1の結果を計算中...");
Thread.sleep(2000);
// 結果を設定(正常完了)
future1.complete("計算結果");
// または例外で完了させる場合
// future1.completeExceptionally(new RuntimeException("エラーが発生しました"));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
future1.completeExceptionally(e);
}
}).start();
// 2. すでに完了しているCompletableFuture
CompletableFuture<String> future2 = CompletableFuture.completedFuture("即時の結果");
// 3. 非同期処理を実行するCompletableFuture
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("future3の処理実行中(スレッド: " +
Thread.currentThread().getName() + ")");
Thread.sleep(1500);
return "非同期計算の結果";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
// 4. 戻り値のない非同期処理
CompletableFuture<Void> future4 = CompletableFuture.runAsync(() -> {
try {
System.out.println("future4の処理実行中(スレッド: " +
Thread.currentThread().getName() + ")");
Thread.sleep(1000);
System.out.println("future4の処理完了");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
try {
// future1の結果を取得(完了するまでブロック)
System.out.println("future1の結果を待機中...");
String result1 = future1.get();
System.out.println("future1の結果: " + result1);
// future2の結果を取得(すでに完了しているので即時返る)
String result2 = future2.get();
System.out.println("future2の結果: " + result2);
// future3の結果を取得(タイムアウト付き)
String result3 = future3.get(3, TimeUnit.SECONDS);
System.out.println("future3の結果: " + result3);
// future4の完了を待機
future4.get();
System.out.println("future4が完了しました");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("待機中に中断されました");
} catch (ExecutionException e) {
System.out.println("処理中に例外が発生: " + e.getCause().getMessage());
} catch (java.util.concurrent.TimeoutException e) {
System.out.println("タイムアウトが発生しました");
}
// 5. ブロックせずに結果を処理する方法
System.out.println("\n=== ブロックしない結果処理 ===");
CompletableFuture<String> future5 = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("future5の処理実行中...");
Thread.sleep(2000);
return "非同期処理の結果";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
// 結果が利用可能になったときに実行されるコールバックを登録
future5.thenAccept(result -> {
System.out.println("future5の結果が到着: " + result);
});
System.out.println("future5の結果を待たずに次の処理へ進む");
// すべての非同期処理が完了するのを待つため、少し待機
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
このコードでは、CompletableFutureの基本的な使用方法を5つの例で示している。
手動完了のCompletableFuture – 新しいCompletableFutureインスタンスを作成し、後でcompleteまたはcompleteExceptionallyメソッドを呼び出して完了させる。これは、非同期処理の結果を外部から設定する必要がある場合に便利である。
- 即時完了のCompletableFuture –
completedFuture静的メソッドを使用して、指定した値ですでに完了しているCompletableFutureを作成する。これは、同期結果を非同期APIとして提供する必要がある場合などに便利である。 - サプライヤーを使用した非同期処理 –
supplyAsyncメソッドを使用して、戻り値を持つ処理を非同期に実行する。処理はForkJoinPoolのcommonPoolまたは指定されたExecutorで実行される。 - ランナブルを使用した非同期処理 –
runAsyncメソッドを使用して、戻り値のない処理を非同期に実行する。処理完了後のCompletableFutureの結果はnullである。 - 非ブロッキング処理 –
thenAcceptメソッドを使用して、結果が利用可能になったときに実行されるコールバックを登録する。これにより、結果を待つためにスレッドをブロックする必要がなくなる。
従来のFutureとの主な違いは、CompletableFutureがより豊富なAPIを提供し、特に結果が利用可能になったときに自動的に次の処理を開始する機能があることである。これにより、コールバック地獄を避けつつ、複雑な非同期処理フローを構築することができる。
非同期処理の結果を変換したり、他の処理と連携したりする方法を見てみよう。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class CompletableFutureTransformations {
public static void main(String[] args) {
System.out.println("=== CompletableFutureの変換と連鎖 ===");
// 1. 結果の変換 (map に相当)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("最初の処理を実行中...");
return 42; // 整数を返す
}).thenApply(num -> {
System.out.println("結果を文字列に変換中: " + num);
return "結果は " + num; // 整数を文字列に変換
});
// 2. 結果を使用した非同期処理 (flatMap に相当)
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("ユーザーIDを取得中...");
return 123; // ユーザーIDを返す
}).thenCompose(userId -> {
// 最初の結果を使用して新しい非同期処理を開始
return CompletableFuture.supplyAsync(() -> {
System.out.println("ユーザー情報をデータベースから取得中: ID = " + userId);
try {
Thread.sleep(1000); // データベースアクセスをシミュレート
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return userId * 10; // 処理結果を返す
});
});
// 3. 2つの非同期処理の結果を組み合わせる
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("ユーザー名を取得中...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "田中";
}).thenCombine(CompletableFuture.supplyAsync(() -> {
System.out.println("メールアドレスを取得中...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "tanaka@example.com";
}), (name, email) -> {
// 両方の結果が利用可能になったときに実行される
return name + " <" + email + ">";
});
// 4. 副作用のためだけに結果を使用
CompletableFuture<Void> future4 = CompletableFuture.supplyAsync(() -> {
System.out.println("メッセージを生成中...");
return "重要なお知らせ";
}).thenAccept(message -> {
// 結果を使用するが、新しい結果は返さない
System.out.println("メッセージを処理: " + message);
});
// 5. 結果を使用せずに処理を実行
CompletableFuture<Void> future5 = future4.thenRun(() -> {
// 前の処理が完了したことだけを使用
System.out.println("すべての処理が完了しました");
});
try {
// 各Futureの結果を取得して表示
System.out.println("future1の結果: " + future1.get());
System.out.println("future2の結果: " + future2.get());
System.out.println("future3の結果: " + future3.get());
// future4とfuture5は結果がないため、完了するのを待つだけ
future4.get();
future5.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("待機中に中断されました");
} catch (ExecutionException e) {
System.out.println("処理中に例外が発生: " + e.getCause().getMessage());
}
// 変換メソッドの説明
System.out.println("\n=== CompletableFutureの主要変換メソッド ===");
System.out.println("thenApply : 結果を変換(同期的)");
System.out.println("thenApplyAsync: 結果を変換(非同期的)");
System.out.println("thenCompose : 結果を使用して新しいCompletableFutureを返す");
System.out.println("thenCombine : 2つのCompletableFutureの結果を組み合わせる");
System.out.println("thenAccept : 結果を使用するが、新しい結果は返さない");
System.out.println("thenAcceptAsync: 結果を使用(非同期的)");
System.out.println("thenRun : 前の処理の完了後に実行されるが、結果は使用しない");
}
}
このコードでは、CompletableFutureの値を変換したり、複数の非同期処理を連携させたりするための主要なメソッドを示している:
- thenApply – 前の処理の結果を使用して新しい値に変換する。関数型インターフェースのFunctionを使用しており、基本的にはStreamのmapに似ている。変換処理自体は同期的に実行される。
- thenCompose – 前の処理の結果を使用して新しいCompletableFutureを返す。これはStreamのflatMapに相当し、非同期処理の連鎖に適している。
- thenCombine – 2つの独立したCompletableFutureの結果が両方利用可能になったときに、それらを組み合わせて新しい結果を生成する。両方の処理が並行して実行され、両方が完了したときに組み合わせ処理が実行される。
- thenAccept – 前の処理の結果を使用するが、新しい結果は生成しない。Consumerインターフェースを使用しており、副作用のためだけに結果を使用する場合に適している。
- thenRun – 前の処理が完了したことだけを条件に、新しい処理を実行する。前の処理の結果は使用せず、新しい結果も生成しない。Runnableインターフェースを使用している。
また、これらのメソッドには、thenApplyAsync、thenComposeAsyncなどのように、Asyncサフィックスを持つバージョンもある。これらのメソッドは、変換処理自体を別のスレッドで非同期的に実行する。デフォルトではForkJoinPoolのcommonPoolが使用されるが、オーバーロードされたメソッドにExecutorを指定することもできる。
CompletableFutureを使用した実用的な例として、複数のサービスから並行してデータを取得し、結果を組み合わせる例を見てみよう。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
// APIから取得するデータを表すクラス
class UserData {
private final int userId;
private final String name;
private final String email;
private final String address;
private final String paymentInfo;
public UserData(int userId, String name, String email, String address, String paymentInfo) {
this.userId = userId;
this.name = name;
this.email = email;
this.address = address;
this.paymentInfo = paymentInfo;
}
@Override
public String toString() {
return "UserData{" +
"userId=" + userId +
", name='" + name + '\'' +
", email='" + email + '\'' +
", address='" + address + '\'' +
", paymentInfo='" + paymentInfo + '\'' +
'}';
}
}
// 各種サービスクラス(実際のAPIを模擬)
class UserService {
public CompletableFuture<String> getUserName(int userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("ユーザー名を取得中... (Thread: " +
Thread.currentThread().getName() + ")");
simulateNetworkLatency(500, 1000);
// 例外の可能性をシミュレート
if (userId < 0) {
throw new IllegalArgumentException("無効なユーザーID: " + userId);
}
return "田中太郎"; // 模擬データ
});
}
public CompletableFuture<String> getUserEmail(int userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("メールアドレスを取得中... (Thread: " +
Thread.currentThread().getName() + ")");
simulateNetworkLatency(600, 1200);
return "tanaka@example.com"; // 模擬データ
});
}
}
class AddressService {
public CompletableFuture<String> getUserAddress(int userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("住所を取得中... (Thread: " +
Thread.currentThread().getName() + ")");
simulateNetworkLatency(800, 1500);
return "東京都渋谷区..."; // 模擬データ
});
}
}
class PaymentService {
public CompletableFuture<String> getPaymentInfo(int userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("支払い情報を取得中... (Thread: " +
Thread.currentThread().getName() + ")");
simulateNetworkLatency(1000, 2000);
return "クレジットカード: ****-****-****-1234"; // 模擬データ
});
}
}
public class CompletableFutureExample {
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
// 各サービスのインスタンスを作成
UserService userService = new UserService();
AddressService addressService = new AddressService();
PaymentService paymentService = new PaymentService();
int userId = 123;
// 各サービスから並行してデータを取得
CompletableFuture<String> nameFuture = userService.getUserName(userId);
CompletableFuture<String> emailFuture = userService.getUserEmail(userId);
CompletableFuture<String> addressFuture = addressService.getUserAddress(userId);
CompletableFuture<String> paymentFuture = paymentService.getPaymentInfo(userId);
// すべてのFutureを組み合わせて最終結果を生成
CompletableFuture<UserData> userDataFuture = nameFuture
.thenCombine(emailFuture, (name, email) -> {
System.out.println("名前とメールアドレスを組み合わせました");
return new Object[]{name, email};
})
.thenCombine(addressFuture, (nameEmail, address) -> {
System.out.println("住所情報を追加しました");
return new Object[]{nameEmail[0], nameEmail[1], address};
})
.thenCombine(paymentFuture, (data, paymentInfo) -> {
System.out.println("支払い情報を追加しました");
return new UserData(
userId,
(String) data[0],
(String) data[1],
(String) data[2],
paymentInfo
);
});
try {
// 結果を取得(すべての非同期処理が完了するまでブロック)
UserData userData = userDataFuture.get();
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
System.out.println("\n取得したユーザーデータ:");
System.out.println(userData);
System.out.println("\n合計処理時間: " + totalTime + "ms");
// 比較:同期処理での実行時間をシミュレート
System.out.println("\n同期処理だった場合の合計時間(推定): " +
estimateSynchronousTime() + "ms");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("処理が中断されました");
} catch (ExecutionException e) {
System.out.println("処理中にエラーが発生しました: " + e.getCause().getMessage());
}
}
// ネットワークレイテンシをシミュレート
private static void simulateNetworkLatency(int minLatency, int maxLatency) {
try {
// minLatencyとmaxLatencyの間のランダムな時間待機
long latency = minLatency + (long) (Math.random() * (maxLatency - minLatency));
Thread.sleep(latency);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 同期処理の実行時間を推定
private static long estimateSynchronousTime() {
// 各APIの平均レイテンシを合計
return 750 + 900 + 1150 + 1500; // 各サービスの最小/最大レイテンシの平均の合計
}
}
このコードは、複数のマイクロサービスから並行してデータを取得し、それらを組み合わせて単一のユーザーデータオブジェクトを作成する実用的な例である。主なポイントは以下のとおり:
- 複数の独立したサービス – UserService、AddressService、PaymentServiceの3つのサービスがあり、それぞれが異なるデータを非同期的に提供する。
- 並行データ取得 – すべてのサービスへのリクエストが同時に開始され、並行して実行される。これにより、シーケンシャル(順次)実行と比較して大幅な時間短縮が可能となる。
- 結果の組み合わせ –
thenCombineメソッドを使用して、各サービスからの結果が利用可能になったときに、それらを段階的に組み合わせている。 - エラー処理 – getUserName内で特定の条件下で例外をスローする可能性があり、これは最終的にExecutionExceptionとしてキャッチされる。
この例では、すべてのサービスリクエストが並行して実行されるため、全体の処理時間は最も遅いサービスのレイテンシにほぼ等しくなる。対照的に、同期処理では各サービスのレイテンシが累積されるため、非同期処理による並行実行は大幅な高速化をもたらす。
CompletableFutureの強力な機能により、複雑な非同期処理フローも読みやすく保守しやすいコードで表現できる。特に複数のサービスやAPIからデータを取得し、それらを組み合わせる必要がある現代のマイクロサービスアーキテクチャでは、この機能は非常に価値がある。
次節では、CompletableFutureのより高度な機能である、複数のFutureの合成と例外処理について詳しく解説する。
CompletableFutureの合成と例外処理
前節でCompletableFutureの基本的な使い方を学んだ。本節では、より高度な機能である複数のCompletableFutureの合成と例外処理について詳しく見ていく。これらの機能を理解することで、複雑な非同期処理フローを効率的かつ堅牢に実装することができる。
まず、複数のCompletableFutureを合成する様々な方法を見てみよう。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
public class CompletableFutureComposition {
public static void main(String[] args) {
// 1. allOf - すべてのCompletableFutureが完了するのを待つ
System.out.println("=== CompletableFuture.allOf ===");
List<CompletableFuture<String>> futures1 = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int id = i;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int delay = ThreadLocalRandom.current().nextInt(1000, 3000);
try {
System.out.println("タスク" + id + "実行中 (待機時間: " + delay + "ms)");
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "タスク" + id + "の結果";
});
futures1.add(future);
}
// すべてのFutureが完了するのを待機
CompletableFuture<Void> allFuture = CompletableFuture.allOf(
futures1.toArray(new CompletableFuture[0])
);
// allOfの結果自体はVoidなので、結果が必要な場合は手動で収集
CompletableFuture<List<String>> allResultsFuture = allFuture.thenApply(v -> {
// すべてのFutureがすでに完了しているので、ブロックせずに結果を取得できる
return futures1.stream()
.map(CompletableFuture::join) // joinはgetと似ているが、例外をラップしない
.collect(Collectors.toList());
});
try {
List<String> results = allResultsFuture.get();
System.out.println("すべての結果:");
for (String result : results) {
System.out.println(" - " + result);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 2. anyOf - いずれかのCompletableFutureが完了するのを待つ
System.out.println("\n=== CompletableFuture.anyOf ===");
List<CompletableFuture<String>> futures2 = new ArrayList<>();
for (int i = 1; i <= 5; i++) {
final int id = i;
final int delay = 500 + id * 1000; // 各タスクの遅延を増加
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("タスク" + id + "実行中 (待機時間: " + delay + "ms)");
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "タスク" + id + "の結果 (待機時間: " + delay + "ms)";
});
futures2.add(future);
}
// いずれかのFutureが完了するのを待機
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(
futures2.toArray(new CompletableFuture[0])
);
try {
// 最初に完了したFutureの結果を取得
Object firstResult = anyFuture.get();
System.out.println("最初に完了したタスクの結果: " + firstResult);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 3. thenCompose - 非同期処理を連鎖させる(前の結果を使用して次のFutureを作成)
System.out.println("\n=== thenCompose (flatMap) ===");
CompletableFuture<String> composedFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("最初のタスク: 顧客IDを取得中");
return "C12345";
}).thenCompose(customerId -> {
// 前の結果を使用して新しいCompletableFutureを作成
return CompletableFuture.supplyAsync(() -> {
System.out.println("2番目のタスク: 顧客IDを使って注文履歴を取得 - " + customerId);
return "3件の注文";
});
}).thenCompose(orderHistory -> {
// さらに連鎖させる
return CompletableFuture.supplyAsync(() -> {
System.out.println("3番目のタスク: 注文履歴の詳細を取得 - " + orderHistory);
return "注文詳細: 商品A, 商品B, 商品C";
});
});
try {
String result = composedFuture.get();
System.out.println("最終結果: " + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 4. thenCombine - 2つの独立したFutureの結果を組み合わせる
System.out.println("\n=== thenCombine ===");
CompletableFuture<Double> priceTask = CompletableFuture.supplyAsync(() -> {
System.out.println("価格情報を取得中...");
return 29.99;
});
CompletableFuture<Double> taxRateTask = CompletableFuture.supplyAsync(() -> {
System.out.println("消費税率を取得中...");
return 0.10; // 10%
});
CompletableFuture<String> totalPriceTask = priceTask.thenCombine(taxRateTask,
(price, taxRate) -> {
double taxAmount = price * taxRate;
double totalPrice = price + taxAmount;
return String.format("価格: %.2f 円, 消費税: %.2f 円, 合計: %.2f 円",
price, taxAmount, totalPrice);
});
try {
String priceInfo = totalPriceTask.get();
System.out.println(priceInfo);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 待機して他のタスクも完了するのを見届ける
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
このコードでは、CompletableFutureを合成するための4つの主要な方法を実装している。
- allOf – 複数のCompletableFutureがすべて完了するのを待つ。結果はVoid型なので、実際の結果値が必要な場合は、それぞれのFutureから手動で収集する必要がある。すべてのタスクの完了を待機し、全ての結果に対して一括処理を行う場合に適している。
- anyOf – 複数のCompletableFutureのうち、どれか1つが完了するのを待つ。最初に完了したFutureの結果が返される。これは、複数のサービスから同じデータを取得し、最も早く応答したサービスの結果を使用する場合などに便利である。
- thenCompose – 前の処理の結果を使用して、新しいCompletableFutureを作成する。これは関数型プログラミングのflatMap操作に相当し、非同期処理の連鎖に適している。
- thenCombine – 2つの独立したCompletableFutureの結果を組み合わせて、新しい結果を生成する。両方のFutureは並行して実行され、両方が完了したときに組み合わせ関数が呼び出される。
これらの合成メソッドを使用することで、複雑な依存関係を持つ非同期処理フローを簡潔かつ読みやすいコードで表現することができる。
次に、CompletableFutureにおける例外処理の手法を見ていく。非同期処理では例外の伝播が複雑になりがちだが、CompletableFutureは例外を処理するための様々なメソッドを実装している。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
public class CompletableFutureExceptionHandling {
public static void main(String[] args) {
// 1. exceptionally - 例外が発生した場合にフォールバック値を提供
System.out.println("=== exceptionally ===");
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("意図的なエラー");
}
return "処理成功";
}).exceptionally(ex -> {
System.out.println("例外が発生: " + ex.getMessage());
return "フォールバック値"; // エラー時の代替値
});
try {
String result1 = future1.get();
System.out.println("結果: " + result1);
} catch (InterruptedException | ExecutionException e) {
// exceptionallyを使用しているため、ExecutionExceptionは発生しない
e.printStackTrace();
}
// 2. handle - 成功と失敗の両方のケースを処理
System.out.println("\n=== handle ===");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("意図的なエラー");
}
return "元の結果";
}).handle((result, ex) -> {
if (ex != null) {
System.out.println("エラーを処理: " + ex.getMessage());
return "エラー時の代替値";
} else {
System.out.println("成功時の処理");
return result + " (正常に処理されました)";
}
});
try {
String result2 = future2.get();
System.out.println("handle結果: " + result2);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 3. whenComplete - 結果または例外に対して副作用のみを実行(結果は変更しない)
System.out.println("\n=== whenComplete ===");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("意図的なエラー");
}
return "処理成功";
}).whenComplete((result, ex) -> {
// 結果または例外に対して処理を行うが、値は変更しない
if (ex != null) {
System.out.println("whenComplete: 例外を検出 - " + ex.getMessage());
// ログ記録などの副作用を実行
} else {
System.out.println("whenComplete: 成功を検出 - " + result);
}
});
try {
String result3 = future3.get();
System.out.println("最終結果: " + result3);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.out.println("ExecutionExceptionが発生: " + e.getCause().getMessage());
}
// 4. 例外チェインの構築 - 複数の例外ハンドラを連鎖させる
System.out.println("\n=== 例外チェイン ===");
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.7) {
if (Math.random() < 0.5) {
throw new IllegalArgumentException("不正な引数例外");
} else {
throw new IllegalStateException("不正な状態例外");
}
}
return "処理成功";
}).exceptionally(ex -> {
// 特定の例外タイプを処理
if (ex.getCause() instanceof IllegalArgumentException) {
System.out.println("IllegalArgumentExceptionを処理");
return "引数エラー発生";
}
// 他の例外は再スロー
throw new CompletionException(ex);
}).exceptionally(ex -> {
// 2番目のハンドラで残りの例外を処理
if (ex.getCause() instanceof IllegalStateException) {
System.out.println("IllegalStateExceptionを処理");
return "状態エラー発生";
}
// それでも処理できない例外はデフォルト値を返す
return "未知のエラー発生";
});
try {
String result4 = future4.get();
System.out.println("例外チェイン結果: " + result4);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 5. completeExceptionally - 手動で例外を設定
System.out.println("\n=== completeExceptionally ===");
CompletableFuture<String> future5 = new CompletableFuture<>();
// 別スレッドで処理を実行(シミュレーション)
new Thread(() -> {
try {
System.out.println("長時間処理を実行中...");
Thread.sleep(2000);
// 条件によって成功または失敗
double random = Math.random();
if (random < 0.5) {
// 正常に完了
future5.complete("処理が完了しました");
} else {
// 例外で完了
future5.completeExceptionally(new RuntimeException("手動で設定したエラー"));
}
} catch (InterruptedException e) {
future5.completeExceptionally(e);
Thread.currentThread().interrupt();
}
}).start();
try {
String result5 = future5.get();
System.out.println("結果: " + result5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
System.out.println("ExecutionException: " + e.getCause().getMessage());
}
// 6. 例外の伝播 - 連鎖的な処理での例外伝播の動作
System.out.println("\n=== 例外の伝播 ===");
CompletableFuture<String> future6 = CompletableFuture.supplyAsync(() -> {
// 最初のステップで例外をスロー
throw new RuntimeException("ステップ1でエラー発生");
}).thenApply(result -> {
// 前のステップが失敗すると、このステップは実行されない
System.out.println("ステップ2実行: " + result);
return result + " -> ステップ2完了";
}).thenApply(result -> {
// このステップも実行されない
System.out.println("ステップ3実行: " + result);
return result + " -> ステップ3完了";
}).exceptionally(ex -> {
// 最終的にここで例外がキャッチされる
System.out.println("伝播された例外をキャッチ: " + ex.getCause().getMessage());
return "エラーからの復旧結果";
});
try {
String result6 = future6.get();
System.out.println("最終結果: " + result6);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 処理の終了を待機(デモのため)
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
このコードでは、CompletableFutureにおける様々な例外処理の手法を示している。
- exceptionally – 例外が発生した場合のみ呼び出され、フォールバック(代替)値を提供する。例外を結果に変換するためのシンプルな方法であり、try-catchブロックの「catch」部分に相当する。
- handle – 成功と失敗の両方のケースを処理する。これは、結果が正常に得られた場合と例外が発生した場合の両方に対応できる汎用的な方法。結果と例外の両方を引数として受け取り、新しい結果を返す。
- whenComplete – 成功または失敗に対して副作用(ログ記録など)を実行するが、結果自体は変更しない。例外が発生した場合、その例外は呼び出し元に伝播される。
- 例外チェイン – 複数のexceptionallyハンドラを連鎖させることで、異なる種類の例外を個別に処理することができる。最初のハンドラが処理しない例外は、次のハンドラに渡される。
- completeExceptionally – 手動で例外を設定する方法。非同期処理の中で例外が発生した場合、この方法でCompletableFutureを例外で完了させることができる。
- 例外の伝播 – 連鎖的な処理(thenApply, thenComposeなど)での例外の振る舞いを示している。いずれかのステップで例外が発生すると、それ以降のステップはスキップされ、最終的にexceptionallyか、あるいはget()呼び出し時のExecutionExceptionでキャッチされる。
実際のアプリケーションでは、これらの例外処理メカニズムを組み合わせて使用することで、堅牢な非同期処理を実装することができる。特に、マイクロサービスアーキテクチャのような分散システムでは、サービスの一部が失敗しても全体のシステムが機能し続けるための回復力(レジリエンス)を構築するために、これらの例外処理パターンが重要となる。
これらの例外処理パターンを実際のユースケースに適用した例を見てみよう。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
// データサービスのシミュレーション
class DataService {
// データベースからデータを取得(シミュレーション)
public CompletableFuture<String> fetchDataFromDatabase(String id) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("データベースからID " + id + " のデータを取得中...");
try {
Thread.sleep(1000);
// 一部のIDでは失敗する(デモ用)
if (id.equals("error")) {
throw new RuntimeException("データベース接続エラー");
}
return "データベースデータ for " + id;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("処理が中断されました", e);
}
});
}
// キャッシュからデータを取得(シミュレーション)
public CompletableFuture<String> fetchDataFromCache(String id) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("キャッシュからID " + id + " のデータを取得中...");
try {
Thread.sleep(300); // キャッシュは通常高速
// 一部のIDではキャッシュミス(デモ用)
if (id.equals("nocache")) {
throw new RuntimeException("キャッシュミス");
}
return "キャッシュデータ for " + id;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("処理が中断されました", e);
}
});
}
// バックアップサーバーからデータを取得(シミュレーション)
public CompletableFuture<String> fetchDataFromBackupServer(String id) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("バックアップサーバーからID " + id + " のデータを取得中...");
try {
Thread.sleep(2000); // バックアップは遅い
// 極めて稀なエラー(デモ用)
if (id.equals("totalfailure")) {
throw new RuntimeException("バックアップサーバーエラー");
}
return "バックアップデータ for " + id;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("処理が中断されました", e);
}
});
}
}
public class ResilientDataFetchExample {
public static void main(String[] args) {
DataService dataService = new DataService();
// テストするIDのリスト
String[] testIds = {"normal", "error", "nocache", "totalfailure"};
for (String id : testIds) {
System.out.println("\n=== ID: " + id + " のデータ取得テスト ===");
// レジリエントなデータ取得処理
CompletableFuture<String> resilientFetch = fetchDataWithResilience(dataService, id);
try {
// 結果を取得(タイムアウト付き)
String result = resilientFetch.get(5, TimeUnit.SECONDS);
System.out.println("取得成功: " + result);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("処理が中断されました");
} catch (ExecutionException e) {
System.out.println("すべての取得方法が失敗: " + e.getCause().getMessage());
} catch (TimeoutException e) {
System.out.println("データ取得がタイムアウトしました");
}
}
}
// レジリエンスパターンを実装したデータ取得メソッド
private static CompletableFuture<String> fetchDataWithResilience(DataService service, String id) {
// ステップ1: まずキャッシュから高速に取得を試みる
return service.fetchDataFromCache(id)
.exceptionally(cacheEx -> {
// キャッシュ取得に失敗した場合のログ記録
System.out.println("キャッシュ取得失敗: " + cacheEx.getMessage());
// ステップ2: キャッシュ失敗時はデータベースから取得を試みる
try {
return service.fetchDataFromDatabase(id)
.exceptionally(dbEx -> {
// データベース取得失敗のログ記録
System.out.println("データベース取得失敗: " + dbEx.getMessage());
// ステップ3: データベースも失敗した場合はバックアップから取得
try {
return service.fetchDataFromBackupServer(id).join();
} catch (Exception backupEx) {
// バックアップ取得失敗(すべてのソースが失敗)
System.out.println("バックアップ取得失敗: " + backupEx.getMessage());
throw new RuntimeException("すべてのデータソースが失敗しました", backupEx);
}
}).join();
} catch (Exception ex) {
throw new RuntimeException("データ取得処理でエラー", ex);
}
})
// 最後に: 取得したデータに対して必要な後処理
.thenApply(data -> {
System.out.println("データ後処理を実行中...");
return "処理済み: " + data;
});
}
}
このコードは、実際のアプリケーションで役立つレジリエントなデータ取得パターンを表している。主なポイントは以下のとおりだ。
- フォールバック戦略 – 複数のデータソース(キャッシュ、データベース、バックアップサーバー)を順番に試し、前のソースが失敗した場合に次のソースにフォールバックする。これにて、単一障害点を排除し、システムの可用性を高めている。
- exceptionallyの連鎖的使用 – 各データソースの失敗を個別に処理し、次のデータソースを試すためにexceptionallyを使用している。また、エラーの詳細をログに記録している。
- 段階的なエラー処理 – 最速のソース(キャッシュ)から始めて、より遅いが信頼性の高いソース(データベース、バックアップ)へと段階的に移行している。これにより、平均的なパフォーマンスを最適化しつつ、堅牢性も確保している。
- タイムアウト – 全体の処理に対してタイムアウトを設定している。これにより、すべてのデータソースが応答しない最悪のケースでも、アプリケーションが無期限にブロックされることを防いでいる。
このようなレジリエンスパターンは、特に分散システムや高可用性が要求されるアプリケーションで重要である。CompletableFutureのAPIを使用することで、このような複雑なエラー処理と復旧ロジックを読みやすく保守しやすいコードで表現することができる。
CompletableFutureの合成と例外処理の機能を理解することで、堅牢で効率的な非同期処理を実装することが可能になる。この手法は、現代のJavaアプリケーション開発における重要なツールとなっている。
非同期処理における注意点と対策
前章までで非同期処理の実装方法とパターンについて解説してきたが、非同期処理にはその性質上、様々な課題や落とし穴が存在する。本章では、非同期プログラミングにおける一般的な問題と、それらを避けるための対策について解説する。競合状態やデッドロックといった古典的な並行処理の問題から、スレッドセーフなコードの書き方、そしてメモリリークの防止まで、非同期処理を安全かつ効率的に行うための重要な知識を身につけることができる。注意点と対策を理解することで、より堅牢な非同期アプリケーションの開発が可能になる。
競合状態(Race Condition)の理解と回避
競合状態(Race Condition)とは、複数のスレッドが同じリソースに同時にアクセスし、その実行順序によって結果が変わってしまう状態を指す。このような状況は、予測不可能な動作やバグを引き起こす原因となる。
最も典型的な競合状態の例として、複数のスレッドが共有変数を更新する場合を見てみよう。
public class RaceConditionExample {
// 複数スレッドで共有される変数
private static int counter = 0;
public static void main(String[] args) throws InterruptedException {
// 1000回のインクリメント操作を行う2つのスレッドを作成
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter++; // 非アトミック操作
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter++; // 非アトミック操作
}
});
// スレッドを開始
thread1.start();
thread2.start();
// 両方のスレッドの終了を待機
thread1.join();
thread2.join();
// 結果を表示(理論的には2000になるはず)
System.out.println("最終的なカウンター値: " + counter);
}
}
このコードでは、2つのスレッドが共有変数counterを1000回ずつインクリメントしている。理論的には、最終的なcounterの値は2000になるはずだが、実際に実行すると2000より小さい値になることがある。これは、counter++が実際には以下の3つの操作から構成されるためである。
- 現在の値を読み取る
- 値に1を加える
- 新しい値を書き込む
これらの操作は原子的(アトミック)ではないため、2つのスレッドが同時に同じ値を読み取り、それぞれが増分した値を書き込む可能性がある。この場合、2回のインクリメント操作が実際には1回の効果しか持たないことになる。
競合状態を回避するための一般的な方法は、同期化(synchronization)を使用することである。Javaでは、synchronizedキーワードを使用して、特定のコードブロックやメソッド全体へのアクセスを一度に1つのスレッドに制限することができる。
public class SynchronizedExample {
private static int counter = 0;
// 同期に使用するロックオブジェクト
private static final Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
// synchronizedブロックを使用して競合を防止
synchronized (lock) {
counter++;
}
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
synchronized (lock) {
counter++;
}
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("最終的なカウンター値: " + counter); // 常に2000になる
}
}
このコードでは、synchronizedブロックを使用して、counterのインクリメント操作を同期化している。これにより、一度に1つのスレッドだけがcounterを更新できるようになり、競合状態が防止される。同期化にはパフォーマンスオーバーヘッドが伴うが、データの整合性を確保するために必要である。
より細かい粒度で同期を行うには、java.util.concurrent.atomicパッケージのアトミック変数クラスを使用することができる。これらのクラスは、特定の操作をロックなしで原子的に実行することができる。
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicExample {
// AtomicIntegerを使用してアトミック操作を実現
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet(); // アトミックなインクリメント操作
}
});
Thread thread2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet();
}
});
thread1.start();
thread2.start();
thread1.join();
thread2.join();
System.out.println("最終的なカウンター値: " + counter.get()); // 常に2000になる
}
}
AtomicIntegerクラスのincrementAndGetメソッドは、インクリメント操作を原子的に行う。これにより、synchronizedブロックよりも低いオーバーヘッドで競合状態を回避することができる。アトミッククラスは、カウンター、フラグ、参照など、単一の値に対する操作に最適である。
より複雑な競合状態の一例として、複数のフィールドの一貫性を維持する必要がある場合を考えてみよう。
public class BankAccount {
private int balance; // 残高
private int transactionCount; // 取引回数
public BankAccount(int initialBalance) {
this.balance = initialBalance;
this.transactionCount = 0;
}
// 安全でない入金メソッド - 競合状態の可能性あり
public void depositUnsafe(int amount) {
balance += amount;
transactionCount++;
}
// 同期化された入金メソッド - 安全
public synchronized void depositSafe(int amount) {
balance += amount;
transactionCount++;
}
// 残高と取引回数の一貫性を確認するメソッド
public synchronized void validateState() {
if (transactionCount == 0 && balance != 0) {
System.err.println("不整合な状態: 取引なしで残高変動");
}
System.out.println("残高: " + balance + ", 取引回数: " + transactionCount);
}
public static void main(String[] args) throws InterruptedException {
// 安全でないメソッドを使用した場合
System.out.println("====== 安全でない入金メソッド ======");
BankAccount unsafeAccount = new BankAccount(0);
Thread[] unsafeThreads = new Thread[5];
for (int i = 0; i < 5; i++) {
unsafeThreads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
unsafeAccount.depositUnsafe(10);
}
});
unsafeThreads[i].start();
}
for (Thread t : unsafeThreads) {
t.join();
}
unsafeAccount.validateState();
// 安全なメソッドを使用した場合
System.out.println("\n====== 安全な入金メソッド ======");
BankAccount safeAccount = new BankAccount(0);
Thread[] safeThreads = new Thread[5];
for (int i = 0; i < 5; i++) {
safeThreads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
safeAccount.depositSafe(10);
}
});
safeThreads[i].start();
}
for (Thread t : safeThreads) {
t.join();
}
safeAccount.validateState();
}
}
このコードでは、銀行口座の残高と取引回数を管理するクラスを実装している。depositUnsafeメソッドは同期化されていないため、複数のスレッドが同時に実行すると、残高の更新と取引回数の更新の間に競合状態が発生する可能性がある。一方、depositSafeメソッドはsynchronized修飾子を使用して同期化されているため、複数のスレッドからの呼び出しでも一貫性が保たれる。
競合状態を検出することは難しい場合がある。特に、マルチコアシステムでの並行実行に依存するバグは、特定の実行環境でのみ再現される場合がある。そのため、競合状態の可能性がある箇所を設計段階で特定し、適切な同期化を行うことが重要である。
Java言語には、競合状態を回避するための様々なツールがある。適切なツールの選択は、具体的な要件と制約に依存する。
- synchronized – 最も基本的な同期化メカニズム。一度に1つのスレッドだけがsynchronizedブロックやメソッドにアクセスできる。
- java.util.concurrent.locks – より柔軟なロックメカニズム。ReentrantLock、ReadWriteLock、StampedLockなどがある。
- アトミッククラス – 単一の値に対する操作を原子的に行うためのクラス。AtomicInteger、AtomicLong、AtomicReferenceなどがある。
- スレッドセーフなコレクション – ConcurrentHashMap、CopyOnWriteArrayListなど、内部的に同期化されたコレクションクラス。
- イミュータブルオブジェクト – 状態が変化しないオブジェクトは本質的にスレッドセーフである。
競合状態の回避戦略を選択する際には、パフォーマンスと安全性のトレードオフを考慮する必要がある。過度な同期化はスレッド間の並行性を低下させ、最悪の場合デッドロックを引き起こす可能性がある。一方、不十分な同期化はデータ破損やバグを引き起こす。次節では、デッドロックの原因と防止策について詳しく解説する。
デッドロックの原因と防止策
デッドロックとは、複数のスレッドがそれぞれ相手が保持しているリソース(ロック)を待っている状態で、どのスレッドも進行できなくなってしまう状況を指す。デッドロックは、適切な配慮なしに複数のロックを使用すると容易に発生する可能性があり、システムのハングや応答不能の原因となる。
典型的なデッドロックの例を以下に示す。
public class DeadlockExample {
// 2つのリソース(ロックとして使用)
private static final Object resource1 = new Object();
private static final Object resource2 = new Object();
public static void main(String[] args) {
// スレッド1: resource1を先に獲得し、次にresource2を獲得しようとする
Thread thread1 = new Thread(() -> {
System.out.println("スレッド1: resource1の獲得を試みる");
synchronized (resource1) {
System.out.println("スレッド1: resource1を獲得");
// 少し待機してデッドロックを発生させやすくする
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("スレッド1: resource2の獲得を試みる");
synchronized (resource2) {
System.out.println("スレッド1: resource2を獲得");
// 両方のリソースを使用した処理
System.out.println("スレッド1: 両方のリソースを使用した処理を実行");
}
}
});
// スレッド2: resource2を先に獲得し、次にresource1を獲得しようとする
Thread thread2 = new Thread(() -> {
System.out.println("スレッド2: resource2の獲得を試みる");
synchronized (resource2) {
System.out.println("スレッド2: resource2を獲得");
// 少し待機してデッドロックを発生させやすくする
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("スレッド2: resource1の獲得を試みる");
synchronized (resource1) {
System.out.println("スレッド2: resource1を獲得");
// 両方のリソースを使用した処理
System.out.println("スレッド2: 両方のリソースを使用した処理を実行");
}
}
});
// スレッドを開始
thread1.start();
thread2.start();
}
}
このコードでは、2つのスレッドがそれぞれ異なる順序で2つのリソースをロックしようとしている。スレッド1はresource1を先に獲得し、次にresource2を獲得しようとする。一方、スレッド2はresource2を先に獲得し、次にresource1を獲得しようとする。両方のスレッドが最初のリソースを獲得した後、互いに相手が保持しているリソースを待つことになり、デッドロックが発生する。
デッドロックの発生には、一般的に以下の4つの条件(Coffmanの条件)が必要である。
- 相互排除(Mutual Exclusion) – 少なくとも1つのリソースが排他的に使用される
- 保持と待機(Hold and Wait) – すでに一部のリソースを保持しながら、他のリソースを待機する
- 非剥奪(No Preemption) – リソースは所有者が自発的に解放するまで剥奪できない
- 循環待機(Circular Wait) – プロセス間で閉じた待機の連鎖が存在する
デッドロックを防止するには、これらの条件のうち少なくとも1つを破る必要がある。最も一般的な方法は、循環待機を防ぐことである。これは、すべてのスレッドが常に同じ順序でリソースを獲得するようにすることで実現できる。
上記の例を修正して、デッドロックを防止する方法を見てみよう。
public class DeadlockPreventionExample {
// 2つのリソース(ロックとして使用)
private static final Object resource1 = new Object();
private static final Object resource2 = new Object();
public static void main(String[] args) {
// スレッド1: 一貫した順序でリソースを獲得(resource1 -> resource2)
Thread thread1 = new Thread(() -> {
System.out.println("スレッド1: resource1の獲得を試みる");
synchronized (resource1) {
System.out.println("スレッド1: resource1を獲得");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("スレッド1: resource2の獲得を試みる");
synchronized (resource2) {
System.out.println("スレッド1: resource2を獲得");
System.out.println("スレッド1: 両方のリソースを使用した処理を実行");
}
}
});
// スレッド2: 同じ順序でリソースを獲得(resource1 -> resource2)
Thread thread2 = new Thread(() -> {
System.out.println("スレッド2: resource1の獲得を試みる");
synchronized (resource1) {
System.out.println("スレッド2: resource1を獲得");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("スレッド2: resource2の獲得を試みる");
synchronized (resource2) {
System.out.println("スレッド2: resource2を獲得");
System.out.println("スレッド2: 両方のリソースを使用した処理を実行");
}
}
});
// スレッドを開始
thread1.start();
thread2.start();
}
}
この修正されたコードでは、両方のスレッドが同じ順序(resource1を先に、次にresource2)でリソースを獲得するように変更している。これにより、循環待機の条件が破られ、デッドロックは発生しなくなる。この例では、一方のスレッドがresource1を獲得している間、もう一方のスレッドはresource1の解放を待つことになる。
より複雑なシステムでは、リソースに対して一貫した獲得順序を維持することが難しい場合がある。そのような場合、タイムアウト付きロックを使用して、デッドロックを検出し回復することができる。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TimeoutLockExample {
// タイムアウト付きロックを使用
private static final Lock lock1 = new ReentrantLock();
private static final Lock lock2 = new ReentrantLock();
public static void main(String[] args) {
Thread thread1 = new Thread(() -> {
boolean locked1 = false;
boolean locked2 = false;
try {
System.out.println("スレッド1: lock1の獲得を試みる");
// タイムアウト付きでロックを試みる
locked1 = lock1.tryLock(1, TimeUnit.SECONDS);
if (locked1) {
System.out.println("スレッド1: lock1を獲得");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("スレッド1: lock2の獲得を試みる");
locked2 = lock2.tryLock(1, TimeUnit.SECONDS);
if (locked2) {
System.out.println("スレッド1: lock2を獲得");
// 両方のロックを使用した処理
System.out.println("スレッド1: 両方のロックを使用した処理を実行");
} else {
System.out.println("スレッド1: lock2の獲得に失敗(タイムアウト)");
// 失敗時の代替処理またはリトライロジック
}
} else {
System.out.println("スレッド1: lock1の獲得に失敗(タイムアウト)");
// 失敗時の代替処理またはリトライロジック
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("スレッド1: 中断されました");
} finally {
// 獲得したロックを必ず解放
if (locked2) {
lock2.unlock();
System.out.println("スレッド1: lock2を解放");
}
if (locked1) {
lock1.unlock();
System.out.println("スレッド1: lock1を解放");
}
}
});
Thread thread2 = new Thread(() -> {
boolean locked1 = false;
boolean locked2 = false;
try {
System.out.println("スレッド2: lock2の獲得を試みる");
locked2 = lock2.tryLock(1, TimeUnit.SECONDS);
if (locked2) {
System.out.println("スレッド2: lock2を獲得");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("スレッド2: lock1の獲得を試みる");
locked1 = lock1.tryLock(1, TimeUnit.SECONDS);
if (locked1) {
System.out.println("スレッド2: lock1を獲得");
System.out.println("スレッド2: 両方のロックを使用した処理を実行");
} else {
System.out.println("スレッド2: lock1の獲得に失敗(タイムアウト)");
// 失敗時の代替処理またはリトライロジック
}
} else {
System.out.println("スレッド2: lock2の獲得に失敗(タイムアウト)");
// 失敗時の代替処理またはリトライロジック
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("スレッド2: 中断されました");
} finally {
// 獲得したロックを必ず解放
if (locked1) {
lock1.unlock();
System.out.println("スレッド2: lock1を解放");
}
if (locked2) {
lock2.unlock();
System.out.println("スレッド2: lock2を解放");
}
}
});
thread1.start();
thread2.start();
}
}
このコードでは、ReentrantLockクラスのtryLockメソッドを使用して、指定されたタイムアウト内にロックを獲得できない場合は処理を続行するようにしている。これにより、デッドロックが発生する可能性があるコードでも、無限に待機せずに代替処理を行うことができる。また、finallyブロック内でロックを解放することで、例外が発生した場合でもリソースがリークしないようにしている。
他のデッドロック防止策として、以下のような方法もある。
- ロックの階層化 – ロックに論理的な順序付けを行い、常にその順序でロックを獲得する
- ロックのグループ化 – 複数のロックを一つの複合ロックとして扱う
- ロックのタイムアウト – 一定時間内にロックを獲得できない場合は処理を中止する
- デッドロック検出 – システムがデッドロック状態を検出し、一部のスレッドを強制終了させる
デッドロックの検出や診断には、Javaの標準ツールであるjconsoleやjvisualvmを使用できる。また、スレッドダンプを取得して、スレッドの状態や保持しているロックを確認することもできる。
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Map;
public class ThreadDumpExample {
private static final Object lock1 = new Object();
private static final Object lock2 = new Object();
public static void main(String[] args) {
// デッドロックを引き起こす2つのスレッドを作成
Thread thread1 = new Thread(() -> {
synchronized (lock1) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock2) {
System.out.println("スレッド1: 処理完了");
}
}
}, "DeadlockThread-1");
Thread thread2 = new Thread(() -> {
synchronized (lock2) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
synchronized (lock1) {
System.out.println("スレッド2: 処理完了");
}
}
}, "DeadlockThread-2");
// スレッドを開始
thread1.start();
thread2.start();
// 少し待ってからスレッドダンプを取得
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 現在のスレッドの状態を出力
printThreadInfo();
// 現在のJVMのすべてのスレッド情報を出力
System.out.println("\n現在のスレッドダンプ:");
for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
Thread t = entry.getKey();
StackTraceElement[] stack = entry.getValue();
if (t.getName().startsWith("DeadlockThread")) {
System.out.println("\nスレッド: " + t.getName() +
" / 状態: " + t.getState());
for (StackTraceElement element : stack) {
System.out.println("\t" + element);
}
}
}
}
private static void printThreadInfo() {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
long[] threadIds = threadMXBean.findDeadlockedThreads(); // デッドロックしているスレッドのIDを取得
if (threadIds != null) {
System.out.println("デッドロックを検出しました!");
ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadIds, true, true);
for (ThreadInfo threadInfo : threadInfos) {
System.out.println(threadInfo.getThreadName() + " は " +
threadInfo.getLockName() + " を待機中");
System.out.println("ロック所有者: " + threadInfo.getLockOwnerName());
}
} else {
System.out.println("デッドロックは検出されませんでした");
}
}
}
このコードでは、JavaのThreadMXBeanを使用してデッドロックを検出し、スレッドの状態やスタックトレースを出力している。findDeadlockedThreadsメソッドは、デッドロック状態にあるスレッドのIDを返す。また、getAllStackTracesメソッドを使用して、すべてのスレッドのスタックトレースを取得している。
デッドロックの防止は、複雑な並行処理システムを設計する上で重要な考慮事項である。リソースの獲得順序の一貫性維持、タイムアウト付きロックの使用、適切なリソース管理など、複数の戦略を組み合わせることで、デッドロックのリスクを最小化することができる。次節では、スレッドセーフなコードの書き方について詳しく解説する。
スレッドセーフなコードの書き方
スレッドセーフなコードとは、複数のスレッドが同時にアクセスしても、予期せぬ動作やデータの破損が発生しないコードのことである。非同期処理を多用するアプリケーションでは、スレッドセーフ性の確保は極めて重要である。本節では、Javaでスレッドセーフなコードを書くための様々な手法について解説する。
まず、スレッドセーフ性を確保するための基本的なアプローチについて見ていこう。
1. 同期化(Synchronization)
同期化は、複数のスレッドが同時に特定のコード領域やデータにアクセスすることを防ぐ最も基本的な方法である。
public class SynchronizedCounter {
private int count = 0;
// synchronizedメソッド - このメソッド全体が同期される
public synchronized void increment() {
count++;
}
// synchronizedブロック - 特定の処理のみを同期
public void incrementWithBlock() {
// 他の非同期処理
doSomethingElse();
// カウンター更新部分のみを同期
synchronized (this) {
count++;
}
// 他の非同期処理
doAnotherThing();
}
// 読み取り操作も同期化が必要
public synchronized int getCount() {
return count;
}
// 同期化の必要がない処理
private void doSomethingElse() {
// 共有状態を変更しない処理
}
private void doAnotherThing() {
// 共有状態を変更しない処理
}
}
このコードでは、2つの方法で同期化を行っている。1つ目はsynchronizedメソッド修飾子を使用する方法で、メソッド全体が同期化される。2つ目はsynchronizedブロックを使用する方法で、特定のコード領域のみが同期化される。性能面では、必要な部分のみを同期化するsynchronizedブロックの方が効率的である。
2. ロックの使用
Java 5以降では、java.util.concurrent.locksパッケージが提供するより柔軟なロックメカニズムを使用することができる。
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Lock;
public class LockBasedCounter {
private final Lock lock = new ReentrantLock();
private int count = 0;
public void increment() {
// ロックを獲得
lock.lock();
try {
// クリティカルセクション(排他的にアクセスされる領域)
count++;
} finally {
// 必ずロックを解放
lock.unlock();
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
ReentrantLockクラスは、synchronizedよりも柔軟な機能を提供する。例えば、ロックの獲得試行のタイムアウト設定、公平性の制御(fairness)、条件変数(Condition)の使用などが可能である。ただし、lockとunlockのバランスを取り、finallyブロックでの確実な解放が重要である。
3. ReadWriteLockの使用
読み取り操作が多く、書き込み操作が少ない場合、ReadWriteLockを使用することで同時読み取りを許可しつつ、書き込み時の排他制御を実現できる。
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteCounter {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private int count = 0;
// 書き込み操作(インクリメント)
public void increment() {
// 書き込みロックを獲得
rwLock.writeLock().lock();
try {
count++;
} finally {
rwLock.writeLock().unlock();
}
}
// 読み取り操作
public int getCount() {
// 読み取りロックを獲得
rwLock.readLock().lock();
try {
return count;
} finally {
rwLock.readLock().unlock();
}
}
}
ReadWriteLockでは、複数のスレッドが同時に読み取りロックを獲得できるが、書き込みロックが獲得されている間は他のスレッドはどちらのロックも獲得できない。これにより、読み取り主体のアプリケーションでのパフォーマンスが向上する。
4. アトミッククラスの使用
単一の変数に対する操作を原子的に行いたい場合、java.util.concurrent.atomicパッケージのクラスを使用すると、ロックよりも高速に動作することが多い。
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicCounter {
// アトミックな整数値
private final AtomicInteger count = new AtomicInteger(0);
// アトミックな参照
private final AtomicReference<String> lastUpdater = new AtomicReference<>("none");
public void increment(String updaterName) {
// インクリメント操作を原子的に実行
count.incrementAndGet();
// 参照の更新も原子的に実行
lastUpdater.set(updaterName);
}
// 複合的な操作も原子的に実行可能
public void incrementByValueIfGreaterThan(int value, int threshold) {
// 現在値がthresholdより大きい場合のみ、valueを加算
count.updateAndGet(current -> current > threshold ? current + value : current);
}
public int getCount() {
return count.get();
}
public String getLastUpdater() {
return lastUpdater.get();
}
}
AtomicIntegerやAtomicReferenceなどのアトミッククラスは、Compare-And-Swap(CAS)操作を利用して、ロックを使わずに原子性を保証する。これらは単一の変数に対する操作に最適だが、複数の変数間の一貫性が必要な場合は、同期化やロックの使用が必要となる。
5. イミュータブルオブジェクト
イミュータブル(不変)オブジェクトは、一度作成された後に状態が変化しないオブジェクトである。これらは本質的にスレッドセーフであり、複数のスレッドで安全に共有できる。
// イミュータブルクラスの例
public final class ImmutablePoint {
// すべてのフィールドがfinal
private final int x;
private final int y;
// コンストラクタでのみ値を設定
public ImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}
// 値を変更するメソッドはなく、新しいインスタンスを返す
public ImmutablePoint translate(int dx, int dy) {
return new ImmutablePoint(x + dx, y + dy);
}
// ゲッターメソッドのみ提供
public int getX() {
return x;
}
public int getY() {
return y;
}
@Override
public String toString() {
return "(" + x + ", " + y + ")";
}
}
イミュータブルオブジェクトを作成するためのポイントは以下のとおりである。
- クラスを
finalにして継承を防ぐ - すべてのフィールドを
private finalにする - セッターメソッドを提供しない
- 変更が必要な場合は新しいインスタンスを返す
- 可変オブジェクトを含む場合は、防御的コピーを使用する
イミュータブルオブジェクトは、特に関数型プログラミングスタイルを採用する場合に有用である。String、Integer、LocalDateなど、多くのJavaの標準クラスはイミュータブルである。
6. スレッドローカル変数
スレッドローカル変数は、各スレッドが独自のコピーを持ち、他のスレッドからアクセスできない変数である。これにより、共有状態を避けつつ、スレッド固有のコンテキストを維持することができる。
import java.text.SimpleDateFormat;
import java.util.Date;
public class ThreadLocalExample {
// ThreadLocalを使用して各スレッドに独自のSimpleDateFormatインスタンスを提供
private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
public static String formatDate(Date date) {
// 現在のスレッド用のフォーマッタを取得
return dateFormatThreadLocal.get().format(date);
}
public static void main(String[] args) {
// 複数スレッドからの日付フォーマット(スレッドセーフ)
Runnable task = () -> {
String threadName = Thread.currentThread().getName();
Date now = new Date();
String formatted = formatDate(now);
System.out.println(threadName + ": " + formatted);
};
// 複数のスレッドを作成
for (int i = 0; i < 5; i++) {
new Thread(task, "Thread-" + i).start();
}
}
}
SimpleDateFormatはスレッドセーフではないため、複数のスレッドで共有すると問題が発生する可能性がある。ThreadLocalを使用することで、各スレッドが独自のインスタンスを持つようになり、安全に使用できる。
Java 8以降では、DateTimeFormatterなど、多くのスレッドセーフな代替クラスが提供されているため、そちらを使用することも検討すべきである。
7. スレッドセーフなコレクション
Java標準ライブラリには、スレッドセーフなコレクションクラスが多数用意されている。
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.ArrayList;
import java.util.List;
public class ThreadSafeCollections {
public static void main(String[] args) {
// 1. 同期化ラッパー - 既存のコレクションをスレッドセーフにする
List<String> synchronizedList = Collections.synchronizedList(new ArrayList<>());
Map<String, Integer> synchronizedMap = Collections.synchronizedMap(new HashMap<>());
// 同期化リストの使用例(同期ブロック内で反復処理が必要)
synchronizedList.add("項目1");
synchronizedList.add("項目2");
// 重要: 同期化コレクションを反復処理する場合は同期ブロックが必要
synchronized (synchronizedList) {
for (String item : synchronizedList) {
System.out.println(item);
}
}
// 2. java.util.concurrentパッケージのコレクション
// ConcurrentHashMap - 高度に並行なマップ
Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();
concurrentMap.put("key1", 1);
concurrentMap.put("key2", 2);
// ConcurrentHashMapは反復処理中の同期が不要
for (Map.Entry<String, Integer> entry : concurrentMap.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}
// CopyOnWriteArrayList - 書き込みコストが高いが読み取りに最適化
List<String> cowList = new CopyOnWriteArrayList<>();
cowList.add("項目A");
cowList.add("項目B");
// 反復処理中に変更が加えられても安全
for (String item : cowList) {
System.out.println(item);
// 以下は安全だが非効率(新しいコピーが作成される)
cowList.add("新しい項目"); // 反復に影響しない
break; // 無限ループを避けるため
}
}
}
主なスレッドセーフなコレクションは以下のとおりである。
- 同期化ラッパー –
Collections.synchronizedList,synchronizedMapなど - ConcurrentHashMap – 高度に並行なハッシュマップ
- CopyOnWriteArrayList – 読み取り操作に最適化されたリスト
- ConcurrentSkipListMap/Set – ソート済みのマップ/セット
- BlockingQueue – スレッド間の作業キュー
ConcurrentHashMapは部分的なロックを使用しており、全体をロックするsynchronizedMapよりも高いスループットを提供する。一方、CopyOnWriteArrayListは変更操作ごとに内部配列のコピーを作成するため、書き込みが少なく読み取りが多い場合に適している。
8. 複合操作の原子性
複数の操作を原子的に実行する必要がある場合は、それらをまとめて同期化する必要がある。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class AtomicCompoundOperations {
private final Lock lock = new ReentrantLock();
private int count = 0;
private int total = 0;
// 複数の操作を原子的に実行
public void updateStatistics(int value) {
lock.lock();
try {
count++;
total += value;
} finally {
lock.unlock();
}
}
// 複数のフィールドに依存する操作も原子的に実行
public double getAverage() {
lock.lock();
try {
if (count == 0) {
return 0.0;
}
return (double) total / count;
} finally {
lock.unlock();
}
}
}
このコードでは、countとtotalの2つのフィールドを更新する操作を原子的に行うために、ロックを使用している。また、両方のフィールドに依存するgetAverageメソッドも同じロックを使用して同期化されている。
9. ダブルチェックロッキングパターン
遅延初期化(Lazy Initialization)を行う際に使用されるパターンであり、初期化のパフォーマンスコストを抑えつつスレッドセーフ性を確保する。
public class SingletonWithDoubleCheckedLocking {
// volatile修飾子が重要
private static volatile SingletonWithDoubleCheckedLocking instance;
// プライベートコンストラクタ
private SingletonWithDoubleCheckedLocking() {
// 初期化処理
}
// ダブルチェックロッキングを使用したシングルトンインスタンスの取得
public static SingletonWithDoubleCheckedLocking getInstance() {
// 最初のチェック(ロックなし)
if (instance == null) {
// ロックを獲得
synchronized (SingletonWithDoubleCheckedLocking.class) {
// 2番目のチェック(ロック内)
if (instance == null) {
instance = new SingletonWithDoubleCheckedLocking();
}
}
}
return instance;
}
}
このパターンでは、volatileキーワードが重要である。これにより、instance変数への参照が正しく発行され、部分的に初期化されたオブジェクトへの参照が返されることを防ぐ。Java 5以降では、メモリモデルの改訂によりこのパターンが正しく機能するようになった。
10. スレッドセーフなイニシャライザとファクトリ
クラスの初期化やオブジェクトの作成をスレッドセーフに行うパターン。
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public class ThreadSafeInitializer {
// スレッドセーフなシングルトン(静的初期化ブロックを使用)
private static final ThreadSafeInitializer INSTANCE = new ThreadSafeInitializer();
public static ThreadSafeInitializer getInstance() {
return INSTANCE;
}
// 静的内部クラスを使用した遅延初期化(ホルダーパターン)
private static class LazyHolder {
private static final ThreadSafeInitializer INSTANCE = new ThreadSafeInitializer();
}
public static ThreadSafeInitializer getLazyInstance() {
return LazyHolder.INSTANCE;
}
// AtomicReferenceを使用したファクトリ
private static final AtomicReference<ExpensiveObject> cachedObject = new AtomicReference<>();
public static ExpensiveObject getExpensiveObject() {
ExpensiveObject object = cachedObject.get();
if (object == null) {
object = new ExpensiveObject();
if (!cachedObject.compareAndSet(null, object)) {
// 他のスレッドが既にオブジェクトを設定していた場合
object = cachedObject.get();
}
}
return object;
}
// 汎用的なオンデマンド初期化ヘルパー
public static <T> T initializeOnce(AtomicReference<T> ref, Supplier<T> factory) {
T value = ref.get();
if (value == null) {
T newValue = factory.get();
if (ref.compareAndSet(null, newValue)) {
return newValue;
}
return ref.get();
}
return value;
}
// テスト用の重いオブジェクト
private static class ExpensiveObject {
public ExpensiveObject() {
// 重い初期化処理をシミュレート
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
このコードでは、スレッドセーフなシングルトンパターンの実装方法をいくつか示している。特に、静的内部クラスを使用した遅延初期化(ホルダーパターン)は、ダブルチェックロッキングよりも簡潔かつ安全な方法として推奨される。
また、AtomicReferenceとcompareAndSetを使用して、スレッドセーフなキャッシュメカニズムを実装する方法も示している。
スレッドセーフなコードを書くには、共有される可変状態の慎重な管理が必要である。可能な限り共有状態を減らし、必要な場合は適切な同期メカニズムを使用することが重要である。また、既存のスレッドセーフなクラスやユーティリティを活用することで、バグの可能性を減らし、メンテナンス性を向上させることができる。次節では、非同期処理におけるメモリリークとリソース管理について解説する。
メモリリークとリソース管理
非同期処理においては、適切なリソース管理が特に重要である。長時間実行されるアプリケーションでは、小さなメモリリークでも時間とともに蓄積され、最終的にはOutOfMemoryErrorを引き起こす可能性がある。本節では、非同期処理におけるメモリリークの一般的な原因と、それを防ぐためのリソース管理手法について解説する。
1. スレッドプールによるメモリリーク
スレッドプールを使用する際、タスクが適切に終了しない場合、リソースがリークする可能性がある。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadPoolLeakExample {
public static void main(String[] args) {
// 固定サイズのスレッドプールを作成
ExecutorService executor = Executors.newFixedThreadPool(5);
// 無限ループを含むタスク(悪い例)
Runnable badTask = () -> {
try {
System.out.println("バッドタスク開始");
while (!Thread.currentThread().isInterrupted()) {
// 無限ループ - スレッドが解放されない
// ...処理...
// 中断チェックのない長時間のスリープもNG
Thread.sleep(1000);
}
} catch (InterruptedException e) {
// 中断フラグを再設定
Thread.currentThread().interrupt();
System.out.println("バッドタスクが中断されました");
}
};
// 適切なタスク(良い例)
Runnable goodTask = () -> {
try {
System.out.println("グッドタスク開始");
// 有限の処理
for (int i = 0; i < 5; i++) {
// 中断をチェック
if (Thread.currentThread().isInterrupted()) {
System.out.println("グッドタスクが中断されました");
return;
}
// 実際の処理
System.out.println("処理中: " + i);
// 中断可能なスリープ
Thread.sleep(100);
}
System.out.println("グッドタスク完了");
} catch (InterruptedException e) {
// 中断を適切に処理
Thread.currentThread().interrupt();
System.out.println("グッドタスクが中断されました");
}
};
// タスクをスレッドプールに送信
executor.submit(goodTask);
executor.submit(badTask);
// アプリケーション終了時の適切なシャットダウン
try {
Thread.sleep(2000); // 実際のアプリケーションでの作業をシミュレート
System.out.println("スレッドプールのシャットダウンを開始");
// 新しいタスクの受け入れを停止
executor.shutdown();
// 既存のタスクの完了を待機(タイムアウト付き)
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
System.out.println("タスクが完了しないため、強制シャットダウンを実行");
// 実行中のタスクを中断
executor.shutdownNow();
// 中断後の完了を待機
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
System.err.println("スレッドプールのシャットダウンに失敗");
}
}
System.out.println("スレッドプールのシャットダウン完了");
} catch (InterruptedException e) {
// 現在のスレッドが中断された場合
Thread.currentThread().interrupt();
// 強制シャットダウン
executor.shutdownNow();
}
}
}
このコードでは、メモリリークの原因となる「バッドタスク」と、適切に実装された「グッドタスク」を対比している。バッドタスクは無限ループを実行し、中断シグナルをチェックしないため、スレッドプールのスレッドが永続的に占有される。対照的に、グッドタスクは有限の処理を実行し、中断をチェックすることで、適切に終了できる。
また、スレッドプールを正しくシャットダウンするためのパターンも示している。shutdownとshutdownNowの両方のメソッドを、タイムアウト付きのawaitTerminationと組み合わせて使用することで、リソースリークを防ぐことができる。
2. タイマータスクのリーク
TimerクラスやScheduledExecutorServiceを使用した定期的なタスクも、メモリリークの原因になることがある。
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class TimerLeakExample {
public static void main(String[] args) throws InterruptedException {
// 問題のあるTimerの使用(古いAPI)
System.out.println("=== 古いTimerを使用した例 ===");
Timer timer = new Timer();
// カウントを保持するオブジェクト
AtomicInteger counter = new AtomicInteger();
// 例外をスローするタスク
TimerTask problematicTask = new TimerTask() {
@Override
public void run() {
int count = counter.incrementAndGet();
System.out.println("Timer タスク実行 #" + count);
if (count == 3) {
// 例外をスロー
throw new RuntimeException("意図的なタスク失敗");
// 注意: Timerでは、1つのタスクの例外がTimerを完全に停止させる!
}
}
};
// 1秒ごとに実行
timer.schedule(problematicTask, 0, 1000);
// 少し待機
Thread.sleep(5000);
// Timerのキャンセル(終了時に重要)
System.out.println("Timerをキャンセル");
timer.cancel();
// ---
// 改善版: ScheduledExecutorServiceの使用
System.out.println("\n=== ScheduledExecutorServiceを使用した例 ===");
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// カウンタをリセット
counter.set(0);
// スケジュールされたタスク
Runnable betterTask = () -> {
try {
int count = counter.incrementAndGet();
System.out.println("Scheduler タスク実行 #" + count);
if (count == 3) {
// 例外をスロー
throw new RuntimeException("意図的なタスク失敗");
// 注意: ScheduledExecutorServiceでは、1つのタスクの例外は他に影響しない
}
} catch (Exception e) {
System.err.println("タスク例外を捕捉: " + e.getMessage());
// エラーログの記録などの処理
}
};
// 1秒ごとに実行
scheduler.scheduleAtFixedRate(betterTask, 0, 1000, TimeUnit.MILLISECONDS);
// 少し待機
Thread.sleep(5000);
// スケジューラのシャットダウン
System.out.println("スケジューラをシャットダウン");
scheduler.shutdown();
if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
}
}
このコードでは、古いTimerクラスと最新のScheduledExecutorServiceを比較している。Timerでは、1つのタスクで例外が発生すると、タイマー全体が停止してしまう。これに対し、ScheduledExecutorServiceでは、例外が発生したタスクのみが影響を受け、他のタスクは引き続き実行される。
また、どちらのケースでも、アプリケーション終了時に適切にリソースを解放することが重要である。Timerではcancel()を、ScheduledExecutorServiceではshutdown()とshutdownNow()を使用する。
3. クロージャとコールバックによるメモリリーク
ラムダ式やコールバックを使用した非同期処理では、意図しない参照保持によるメモリリークが発生することがある。
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class CallbackLeakExample {
// メインクラス(長寿命)
public static class DataProcessor {
// 処理結果のコールバック保持
private final List<Runnable> callbacks = new ArrayList<>();
// コールバックを登録
public void registerCallback(Runnable callback) {
callbacks.add(callback);
}
// コールバックを実行(実際のアプリでは何らかのイベント発生時)
public void executeCallbacks() {
for (Runnable callback : callbacks) {
callback.run();
}
}
// コールバックのクリーンアップ(重要)
public void clearCallbacks() {
callbacks.clear();
}
}
// 短寿命のリクエストハンドラ
public static class RequestHandler {
// 大きなデータ(メモリリークの対象)
private final byte[] data = new byte[100 * 1024 * 1024]; // 100MB
private final String requestId;
public RequestHandler(String requestId) {
this.requestId = requestId;
System.out.println("RequestHandler作成: " + requestId);
}
// 問題のあるコールバック登録(暗黙的な`this`参照によるリーク)
public void processWithLeak(DataProcessor processor) {
// ラムダは外部クラス(RequestHandler)への参照を保持
processor.registerCallback(() -> {
// このコールバックはRequestHandlerインスタンスへの参照を保持するため、
// RequestHandlerがガベージコレクションされない
System.out.println("リークするコールバック: " + requestId + ", データサイズ: " + data.length);
});
// リクエスト処理完了...しかしコールバックがまだDataProcessorに登録されている
}
// 改善版: 明示的なクリーンアップ
public void processWithCleanup(DataProcessor processor) {
// 必要なデータのみキャプチャ
final String id = this.requestId;
final int dataSize = this.data.length;
processor.registerCallback(() -> {
// RequestHandlerインスタンス全体ではなく、
// 必要な情報のみをキャプチャ
System.out.println("安全なコールバック: " + id + ", データサイズ: " + dataSize);
});
}
// ファイナライザでメモリリークの検出を支援
@Override
protected void finalize() throws Throwable {
System.out.println("RequestHandler解放: " + requestId);
super.finalize();
}
}
public static void main(String[] args) throws Exception {
DataProcessor processor = new DataProcessor();
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
// リークするコールバックを使用
System.out.println("\n=== リークするコールバックの例 ===");
{
for (int i = 0; i < 3; i++) {
RequestHandler handler = new RequestHandler("leak-" + i);
handler.processWithLeak(processor);
}
}
// ガベージコレクションを促進
System.out.println("\nガベージコレクションを要求...");
System.gc();
Thread.sleep(1000);
// コールバックを実行
System.out.println("\nコールバックを実行:");
processor.executeCallbacks();
// クリーンアップするコールバックを使用
System.out.println("\n=== クリーンアップするコールバックの例 ===");
processor.clearCallbacks(); // 前のコールバックをクリア
{
for (int i = 0; i < 3; i++) {
RequestHandler handler = new RequestHandler("clean-" + i);
handler.processWithCleanup(processor);
}
}
// ガベージコレクションを促進
System.out.println("\nガベージコレクションを要求...");
System.gc();
Thread.sleep(1000);
// コールバックを実行
System.out.println("\nコールバックを実行:");
processor.executeCallbacks();
// クリーンアップ
processor.clearCallbacks();
// 非同期処理でのメモリリーク例
System.out.println("\n=== 非同期処理でのメモリリーク例 ===");
// WeakReference等を使用して実際のアプリケーションでは対処する必要あり
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
RequestHandler handler = new RequestHandler("async");
// 長時間実行される処理
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("非同期処理が完了: " + handler.requestId);
}, executor);
// 将来の結果にコールバックを追加(リークの可能性)
future.thenRun(() -> {
System.out.println("将来のコールバックが実行されました");
});
// 将来の状態を待機
future.join();
} finally {
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}
}
このコードでは、ラムダ式とクロージャがメモリリークを引き起こす可能性のある2つのケースを示している。
processWithLeakメソッドでは、ラムダ式が外部クラス(RequestHandler)への暗黙的な参照を保持している。これにより、RequestHandlerインスタンスがガベージコレクションされず、そのサイズの大きいdata配列もメモリ上に保持され続ける。processWithCleanupメソッドでは、必要な情報のみをローカル変数にコピーし、ラムダ式がそれらの変数のみをキャプチャするようにしている。これにより、RequestHandlerインスタンス全体への参照が保持されず、ガベージコレクションが可能になる。
長時間実行されるアプリケーションでは、これらの種類のメモリリークが時間とともに蓄積され、大きな問題になる可能性がある。特に、コールバックやリスナーを登録する場合は、それらが不要になったときに明示的に登録解除するか、弱参照(WeakReference)を使用することが重要である。
4. リソースの適切なクローズ
ファイル、ネットワーク接続、データベース接続などのリソースは、使用後に明示的にクローズする必要がある。Java 7以降のtry-with-resources構文を使用すると、これを簡潔かつ確実に行うことができる。
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ResourceManagementExample {
// ファイル読み取りの例
public static String readFileTraditional(String path) {
BufferedReader reader = null;
StringBuilder content = new StringBuilder();
try {
reader = new BufferedReader(new FileReader(path));
String line;
while ((line = reader.readLine()) != null) {
content.append(line).append("\n");
}
} catch (IOException e) {
System.err.println("ファイル読み取りエラー: " + e.getMessage());
} finally {
if (reader != null) {
try {
reader.close(); // 明示的にクローズ
} catch (IOException e) {
System.err.println("リソースクローズエラー: " + e.getMessage());
}
}
}
return content.toString();
}
// try-with-resourcesを使用した改善版
public static String readFileModern(String path) {
StringBuilder content = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
// AutoCloseableインターフェースを実装したリソースは自動的にクローズされる
String line;
while ((line = reader.readLine()) != null) {
content.append(line).append("\n");
}
} catch (IOException e) {
System.err.println("ファイル読み取りエラー: " + e.getMessage());
}
// finallyブロックは不要 - readerは自動的にクローズされる
return content.toString();
}
// 非同期処理でのリソース管理
public static CompletableFuture<Integer> countRecordsAsync(ExecutorService executor,
String dbUrl) {
return CompletableFuture.supplyAsync(() -> {
// 各タスク内でリソースを適切に管理
try (Connection conn = DriverManager.getConnection(dbUrl);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM users")) {
if (rs.next()) {
return rs.getInt(1);
}
return 0;
} catch (SQLException e) {
System.err.println("データベースエラー: " + e.getMessage());
throw new RuntimeException(e);
}
// try-with-resourcesにより、例外発生時も含めてリソースは確実にクローズされる
}, executor);
}
public static void main(String[] args) {
// 実際のデータベースURLを指定
String dbUrl = "jdbc:h2:mem:test";
// ExecutorServiceの管理
ExecutorService executor = Executors.newFixedThreadPool(4);
try {
// 複数の非同期タスクを実行
CompletableFuture<Integer> future1 = countRecordsAsync(executor, dbUrl);
CompletableFuture<Integer> future2 = countRecordsAsync(executor, dbUrl);
// 処理結果を組み合わせるチェーン
CompletableFuture<Integer> combined = future1.thenCombine(future2, Integer::sum);
// 結果を待機
try {
Integer result = combined.join();
System.out.println("合計結果: " + result);
} catch (Exception e) {
System.err.println("エラーが発生: " + e.getMessage());
}
} finally {
// ExecutorServiceの適切なシャットダウン
executor.shutdown();
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
// カスタムリソースクラスの例
public static class CustomResource implements AutoCloseable {
private final String name;
private boolean closed = false;
public CustomResource(String name) {
this.name = name;
System.out.println(name + " リソースを開きました");
}
public void doWork() {
if (closed) {
throw new IllegalStateException("クローズされたリソースを使用しようとしています");
}
System.out.println(name + " で作業中...");
}
@Override
public void close() {
if (!closed) {
System.out.println(name + " リソースをクローズします");
closed = true;
// 実際のクリーンアップ処理
}
}
}
// カスタムリソースの使用例
public static void useCustomResources() {
// 複数のリソースをtry-with-resourcesで管理
try (CustomResource res1 = new CustomResource("Res1");
CustomResource res2 = new CustomResource("Res2")) {
res1.doWork();
res2.doWork();
// 例外が発生しても、res1とres2は適切にクローズされる
} // ここで自動的にres2.close()、次いでres1.close()が呼ばれる
}
}
このコードでは、リソース管理の伝統的な方法と最新の方法を比較している。try-with-resources構文を使用すると、リソースの取得とクローズをより簡潔かつ安全に行うことができる。適切なクローズが保証されるため、リソースリークが減少する。
また、非同期処理でもリソース管理が重要であることを示している。各タスク内で適切にリソースを管理することで、タスクが完了したときにリソースが確実に解放される。
カスタムリソースクラスの例も示している。AutoCloseableインターフェースを実装することで、独自のリソースクラスもtry-with-resources構文で使用できるようになる。
5. リソース追跡とモニタリング
大規模なアプリケーションでは、リソースの使用状況を追跡し、リークを早期に検出することが重要である。
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ResourceTrackingExample {
// メモリ使用状況を表示
public static void printMemoryUsage() {
MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapUsage = memoryBean.getHeapMemoryUsage();
MemoryUsage nonHeapUsage = memoryBean.getNonHeapMemoryUsage();
System.out.println("===== メモリ使用状況 =====");
System.out.println("ヒープ: 使用=" + (heapUsage.getUsed() / 1024 / 1024) +
"MB, 最大=" + (heapUsage.getMax() / 1024 / 1024) + "MB");
System.out.println("非ヒープ: 使用=" + (nonHeapUsage.getUsed() / 1024 / 1024) +
"MB, 最大=" + (nonHeapUsage.getMax() / 1024 / 1024) + "MB");
}
// リソーストラッカー
public static class ResourceTracker<T> {
private final Map<PhantomReference<T>, String> referenceMap = new HashMap<>();
private final ReferenceQueue<T> queue = new ReferenceQueue<>();
// リソースの追跡を開始
public void track(T resource, String description) {
PhantomReference<T> reference = new PhantomReference<>(resource, queue);
referenceMap.put(reference, description);
}
// ガベージコレクションされたリソースを処理
public void processCollectedResources() {
Reference<?> ref;
while ((ref = queue.poll()) != null) {
String description = referenceMap.remove(ref);
System.out.println("リソースがGCされました: " + description);
ref.clear();
}
System.out.println("現在追跡中のリソース数: " + referenceMap.size());
}
}
// リークするメモリを持つダミークラス
public static class LargeResource {
private final byte[] data;
private final String name;
public LargeResource(String name, int sizeInMB) {
this.name = name;
this.data = new byte[sizeInMB * 1024 * 1024]; // MB単位でサイズ指定
System.out.println("LargeResource作成: " + name + " (" + sizeInMB + "MB)");
}
@Override
public String toString() {
return "LargeResource{name='" + name + "', size=" + (data.length / 1024 / 1024) + "MB}";
}
}
public static void main(String[] args) {
// リソーストラッカーの作成
ResourceTracker<LargeResource> tracker = new ResourceTracker<>();
// 定期的なメモリ監視
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
printMemoryUsage();
tracker.processCollectedResources();
}, 0, 1, TimeUnit.SECONDS);
try {
// リークを引き起こす可能性のあるリソースのリスト
List<LargeResource> leakyList = new ArrayList<>();
// いくつかのリソースを作成して追跡
for (int i = 0; i < 5; i++) {
LargeResource resource = new LargeResource("Resource-" + i, 10); // 各10MB
tracker.track(resource, resource.toString());
// 一部のリソースをリストに保持(リーク)
if (i % 2 == 0) {
leakyList.add(resource);
System.out.println("リソースをリストに追加: " + resource.name);
}
}
// メモリ不足を引き起こさないようにするためのガベージコレクション呼び出し
System.out.println("\nガベージコレクションを要求...");
System.gc();
// 少し待機してGCの処理時間を確保
Thread.sleep(3000);
// リークしているリソースをクリア
System.out.println("\nリークしているリソースをクリア...");
leakyList.clear();
// 再度GCを促す
System.out.println("ガベージコレクションを再度要求...");
System.gc();
// 少し待機
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// スケジューラをシャットダウン
scheduler.shutdown();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
}
} catch (InterruptedException e) {
scheduler.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}
このコードでは、メモリ使用状況のモニタリングとリソース追跡の例を示している。ManagementFactoryを使用してメモリ使用状況を取得し、PhantomReferenceとReferenceQueueを使用してオブジェクトのガベージコレクションを検出している。
ResourceTrackerクラスは、追跡対象のリソースにPhantomReferenceを関連付け、ガベージコレクションされたときに通知を受け取る仕組みを提供している。これにより、リソースのライフサイクルを監視し、リークを検出することができる。
実際のアプリケーションでは、より高度なモニタリングツールやメモリプロファイラを使用することも検討すべきである。例えば、JVisualVM、Java Mission Control、YourKit、MAT(Memory Analyzer Tool)などのツールがある。
非同期処理におけるメモリリークとリソース管理は、長時間実行されるアプリケーションの安定性と性能において重要な要素である。スレッドプールの適切な管理、コールバックのクリーンアップ、リソースの確実なクローズ、そして定期的なモニタリングを組み合わせることで、リソースリークを防止し、堅牢なアプリケーションを構築することができる。
最新のJava非同期処理機能
前章までに解説した非同期処理の基本概念、Javaの実装方法、主要なAPI、実践的なパターン、そして注意点と対策は、Javaでの非同期プログラミングの基礎を構成するものである。ここでは、Java言語の最新版で導入された非同期処理の新機能について解説する。特に、Java 21で導入されたバーチャルスレッドは、Javaの並行処理モデルに革命をもたらすものであり、スケーラブルなアプリケーションの開発方法を根本から変える可能性を秘めている。また、構造化並行性の概念やリアクティブプログラミングとの関係性についても理解を深め、現代的なJavaアプリケーション開発における非同期処理の全体像を把握することを目指す。
Java 21のバーチャルスレッド入門
Java 21で正式に導入されたバーチャルスレッド(Virtual Thread)は、Project Loomの主要コンポーネントとして長年開発されてきた機能である。バーチャルスレッドは、軽量スレッドとも呼ばれ、従来のプラットフォームスレッド(OSスレッド)と比較して非常に少ないリソースで動作する。これにより、数百万のバーチャルスレッドを同時に実行することが可能となり、高度に並行性のあるアプリケーションの開発が容易になる。
まず、従来のプラットフォームスレッドとバーチャルスレッドの基本的な違いから見ていこう。
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class VirtualThreadHttpClient {
// 並行リクエスト数
private static final int REQUEST_COUNT = 1000;
public static void main(String[] args) throws Exception {
// 注意: これは標準のjava.net.http.HttpClientではなく、テスト用の簡易実装です
SimulatedHttpClient client = new SimulatedHttpClient();
// 従来のプラットフォームスレッドでのHTTPリクエスト
System.out.println("プラットフォームスレッドでHTTPリクエスト実行中...");
Instant platformStart = Instant.now();
try (ExecutorService executor = Executors.newFixedThreadPool(100)) {
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < REQUEST_COUNT; i++) {
final int requestId = i;
futures.add(executor.submit(() -> client.sendRequest("request" + requestId)));
}
// すべての結果を収集
int successCount = 0;
for (Future<String> future : futures) {
try {
String response = future.get();
if (response != null) {
successCount++;
}
} catch (Exception e) {
System.err.println("リクエスト失敗: " + e.getMessage());
}
}
Duration platformDuration = Duration.between(platformStart, Instant.now());
System.out.println("成功したリクエスト: " + successCount + "/" + REQUEST_COUNT);
System.out.println("プラットフォームスレッド実行時間: " + platformDuration.toMillis() + "ms");
}
// バーチャルスレッドでのHTTPリクエスト
System.out.println("\nバーチャルスレッドでHTTPリクエスト実行中...");
Instant virtualStart = Instant.now();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < REQUEST_COUNT; i++) {
final int requestId = i;
futures.add(executor.submit(() -> client.sendRequest("request" + requestId)));
}
// すべての結果を収集
int successCount = 0;
for (Future<String> future : futures) {
try {
String response = future.get();
if (response != null) {
successCount++;
}
} catch (Exception e) {
System.err.println("リクエスト失敗: " + e.getMessage());
}
}
Duration virtualDuration = Duration.between(virtualStart, Instant.now());
System.out.println("成功したリクエスト: " + successCount + "/" + REQUEST_COUNT);
System.out.println("バーチャルスレッド実行時間: " + virtualDuration.toMillis() + "ms");
}
}
// HTTPリクエストをシミュレートするためのクラス(java.net.http.HttpClientとは異なる)
static class SimulatedHttpClient {
public String sendRequest(String payload) throws Exception {
// I/O待ち時間をシミュレート(実際のネットワークリクエスト)
Thread.sleep((int) (Math.random() * 100 + 50));
// ランダムに失敗するリクエスト(実際のネットワークエラー)
if (Math.random() < 0.05) { // 5%の確率で失敗
throw new Exception("ネットワークエラー");
}
// レスポンスを返す
return "Response for " + payload;
}
}
}
このコードでは、10万個の単純なタスクを実行する際の、プラットフォームスレッドとバーチャルスレッドのパフォーマンスを比較している。Executors.newFixedThreadPool(200)は従来の固定サイズスレッドプール(200個のOSスレッド)を使用し、Executors.newVirtualThreadPerTaskExecutor()は各タスクにバーチャルスレッドを割り当てるエグゼキュータを使用している。結果として、バーチャルスレッドを使用した方が、特に多数のスレッドを扱う場合に大幅にパフォーマンスが向上することがわかる。
バーチャルスレッドの主な特徴は以下の通りである。
- 軽量性 – バーチャルスレッドはプラットフォームスレッドよりも遥かに少ないメモリを使用する。プラットフォームスレッドが数MBのスタックメモリを必要とするのに対し、バーチャルスレッドは数KBで済む。
- スケーラビリティ – 軽量であるため、数十万から数百万のバーチャルスレッドを同時に実行することができ、I/O待ちの多いアプリケーションに特に適している。
- プログラミングモデルの簡素化 – 開発者はスレッドプールの管理や非同期コールバックの複雑さを気にすることなく、同期的に記述されたコードが非同期的に実行される恩恵を受けられる。
バーチャルスレッドを実際のアプリケーションで使用する例を見てみよう。
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class VirtualThreadHttpClient {
// 並行リクエスト数
private static final int REQUEST_COUNT = 1000;
public static void main(String[] args) throws Exception {
// 仮想的なHTTPクライアント
HttpClient client = new HttpClient();
// 従来のプラットフォームスレッドでのHTTPリクエスト
System.out.println("プラットフォームスレッドでHTTPリクエスト実行中...");
Instant platformStart = Instant.now();
try (ExecutorService executor = Executors.newFixedThreadPool(100)) {
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < REQUEST_COUNT; i++) {
final int requestId = i;
futures.add(executor.submit(() -> client.sendRequest("request" + requestId)));
}
// すべての結果を収集
int successCount = 0;
for (Future<String> future : futures) {
try {
String response = future.get();
if (response != null) {
successCount++;
}
} catch (Exception e) {
System.err.println("リクエスト失敗: " + e.getMessage());
}
}
Duration platformDuration = Duration.between(platformStart, Instant.now());
System.out.println("成功したリクエスト: " + successCount + "/" + REQUEST_COUNT);
System.out.println("プラットフォームスレッド実行時間: " + platformDuration.toMillis() + "ms");
}
// バーチャルスレッドでのHTTPリクエスト
System.out.println("\nバーチャルスレッドでHTTPリクエスト実行中...");
Instant virtualStart = Instant.now();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < REQUEST_COUNT; i++) {
final int requestId = i;
futures.add(executor.submit(() -> client.sendRequest("request" + requestId)));
}
// すべての結果を収集
int successCount = 0;
for (Future<String> future : futures) {
try {
String response = future.get();
if (response != null) {
successCount++;
}
} catch (Exception e) {
System.err.println("リクエスト失敗: " + e.getMessage());
}
}
Duration virtualDuration = Duration.between(virtualStart, Instant.now());
System.out.println("成功したリクエスト: " + successCount + "/" + REQUEST_COUNT);
System.out.println("バーチャルスレッド実行時間: " + virtualDuration.toMillis() + "ms");
}
}
// 模擬的なHTTPクライアントクラス
static class HttpClient {
public String sendRequest(String payload) throws Exception {
// I/O待ち時間をシミュレート(実際のネットワークリクエスト)
Thread.sleep((int) (Math.random() * 100 + 50));
// ランダムに失敗するリクエスト(実際のネットワークエラー)
if (Math.random() < 0.05) { // 5%の確率で失敗
throw new Exception("ネットワークエラー");
}
// レスポンスを返す
return "Response for " + payload;
}
}
}
このコードでは、1000個の並行HTTPリクエストを、従来のプラットフォームスレッドとバーチャルスレッドの2つの方法で実行し、パフォーマンスを比較している。HttpClientクラスはHTTPリクエストをシミュレートし、ランダムな遅延とエラーを発生させる。実行結果では、バーチャルスレッドを使用した方が、特に多数の並行I/O操作が必要な場合に大幅にパフォーマンスが向上することがわかる。
バーチャルスレッドとプラットフォームスレッドの内部実装の違いを理解することも重要である。バーチャルスレッドは「スレッドマウンティング」と呼ばれる技術を使用している。バーチャルスレッドがブロッキング操作(例:Thread.sleep()やInputStream.read())を実行すると、そのバーチャルスレッドはキャリアスレッド(通常はプラットフォームスレッド)からアンマウント(切り離し)され、他のバーチャルスレッドがそのキャリアスレッドを使用できるようになる。ブロッキング操作が完了すると、バーチャルスレッドは再度キャリアスレッドにマウント(接続)され、実行が継続される。
バーチャルスレッドの生成と管理の方法をさらに詳しく見てみよう。
import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class VirtualThreadCreation {
public static void main(String[] args) throws Exception {
System.out.println("=== バーチャルスレッドの生成方法 ===");
// 1. Thread.ofVirtual().startメソッドを使用
Thread vthread1 = Thread.ofVirtual().start(() -> {
System.out.println("方法1: Thread.ofVirtual().startから実行");
System.out.println("スレッド情報: " + Thread.currentThread());
});
// 完了を待機
vthread1.join();
// 2. Thread.Builderを使用
Thread vthread2 = Thread.ofVirtual()
.name("custom-vthread")
.start(() -> {
System.out.println("方法2: Thread.Builderから実行");
System.out.println("スレッド情報: " + Thread.currentThread());
});
// 完了を待機
vthread2.join();
// 3. ThreadFactoryを使用
ThreadFactory factory = Thread.ofVirtual().name("worker-", 0).factory();
Thread vthread3 = factory.newThread(() -> {
System.out.println("方法3: ThreadFactoryから実行");
System.out.println("スレッド情報: " + Thread.currentThread());
});
vthread3.start();
// 完了を待機
vthread3.join();
// 4. Executors.newVirtualThreadPerTaskExecutorを使用
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
executor.submit(() -> {
System.out.println("方法4: ExecutorServiceから実行");
System.out.println("スレッド情報: " + Thread.currentThread());
return null;
}).get(); // 完了を待機
}
// バーチャルスレッドのスケーラビリティテスト
System.out.println("\n=== バーチャルスレッドのスケーラビリティ ===");
int threadCount = 100_000;
CountDownLatch latch = new CountDownLatch(threadCount);
AtomicInteger activeCount = new AtomicInteger(0);
AtomicInteger maxActive = new AtomicInteger(0);
System.out.println("作成するバーチャルスレッド数: " + threadCount);
Instant start = Instant.now();
for (int i = 0; i < threadCount; i++) {
Thread.ofVirtual().start(() -> {
try {
// 現在アクティブなスレッド数を追跡
int current = activeCount.incrementAndGet();
maxActive.updateAndGet(max -> Math.max(max, current));
// I/Oブロッキングをシミュレート
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
activeCount.decrementAndGet();
latch.countDown();
}
});
}
// すべてのスレッドの完了を待機
latch.await();
Instant end = Instant.now();
System.out.println("バーチャルスレッド完了時間: " +
(end.toEpochMilli() - start.toEpochMilli()) + "ms");
System.out.println("同時実行の最大スレッド数: " + maxActive.get());
System.out.println("残りのアクティブスレッド: " + activeCount.get());
// バーチャルスレッドの特性を確認
Thread platformThread = Thread.currentThread();
Thread virtualThread = Thread.ofVirtual().start(() -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
virtualThread.join();
System.out.println("\n=== スレッド特性の比較 ===");
System.out.println("プラットフォームスレッド: " + platformThread);
System.out.println("プラットフォームスレッド.isVirtual(): " + platformThread.isVirtual());
System.out.println("バーチャルスレッド.isVirtual(): " + virtualThread.isVirtual());
}
}
このコードでは、バーチャルスレッドを生成するための4つの主要な方法を示している。
Thread.startVirtualThread(Runnable)– 最も簡単な方法で、Runnableタスクを実行するバーチャルスレッドを作成して開始する。Thread.ofVirtual().name().start(Runnable)– Thread.Builderを使用して、カスタム名などのオプションを設定したバーチャルスレッドを作成できる。Thread.ofVirtual().factory()– バーチャルスレッドを作成するThreadFactoryを取得し、それを使用してスレッドを作成する。Executors.newVirtualThreadPerTaskExecutor()– 各タスクに対して新しいバーチャルスレッドを作成するExecutorServiceを取得する。
また、10万個のバーチャルスレッドを同時に作成して実行するスケーラビリティテストも行い、バーチャルスレッドが従来のプラットフォームスレッドと比較して非常に軽量であることを示している。
バーチャルスレッドを使用する際の注意点として、以下の点が挙げられる。
- ピン留めの回避 – バーチャルスレッドが実行中にブロックされると、キャリアスレッドから切り離されることでシステムリソースが節約される。しかし、バーチャルスレッドが
synchronizedブロック内でブロックされた場合、キャリアスレッドへの「ピン留め」が発生し、この最適化が無効になる可能性がある。そのため、長時間のブロッキング操作では、ReentrantLockなどの明示的なロックの使用が推奨される。 - スレッドローカル変数の使用 – バーチャルスレッドは軽量であるため、大量に作成される可能性がある。各バーチャルスレッドがスレッドローカル変数を使用すると、メモリ使用量が増加する可能性があるため、注意が必要である。
- I/O操作を伴う場合に特に有効 – バーチャルスレッドは、I/O操作のように長時間ブロックする処理に特に効果的である。CPU負荷の高い処理では、プラットフォームスレッドとの性能差は小さくなる。
Java 21のバーチャルスレッドは、Javaの非同期プログラミングモデルを大きく改善するものであり、特にマイクロサービスやWeb APIのようなI/O集約型アプリケーションの開発を簡素化する。従来のコールバックベースの非同期プログラミングから、より直感的な命令型プログラミングスタイルに戻りつつ、高いスケーラビリティを実現することが可能になった。
構造化並行性(Structured Concurrency)
構造化並行性(Structured Concurrency)は、Java 21でプレビュー機能として導入された新しい並行処理パラダイムである。この概念は、親タスクと子タスクの間の関係を明示的に構造化することで、並行処理のライフサイクル管理と例外処理を改善することを目的としている。
従来の非構造化並行性では、親タスクは子タスクを開始した後、その完了を明示的に管理(待機、エラー処理など)する必要があった。構造化並行性では、親タスクのスコープ内で子タスクのライフサイクルが自動的に管理され、スコープを出るときにすべての子タスクが完了していることが保証される。
以下のコードで、構造化並行性の基本的な使い方を見てみよう。
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.function.Supplier;
public class StructuredConcurrencyBasics {
public static void main(String[] args) {
System.out.println("=== 非構造化並行性 vs 構造化並行性 ===");
try {
// 従来の非構造化並行性の例
System.out.println("\n--- 非構造化並行性 ---");
String result1 = traditionalConcurrency("ユーザーID123");
System.out.println("結果: " + result1);
// 構造化並行性の例
System.out.println("\n--- 構造化並行性 ---");
String result2 = structuredConcurrency("ユーザーID123");
System.out.println("結果: " + result2);
// エラー処理の例
System.out.println("\n--- エラー処理の例 ---");
try {
String result3 = structuredConcurrencyWithError("BadID");
System.out.println("結果: " + result3);
} catch (Exception e) {
System.out.println("エラーが捕捉されました: " + e.getMessage());
}
} catch (Exception e) {
e.printStackTrace();
}
}
// 従来の非構造化並行性を使用した例
static String traditionalConcurrency(String userId)
throws InterruptedException, ExecutionException {
System.out.println("ユーザー情報と注文履歴を並行して取得します");
Instant start = Instant.now();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
// ユーザー情報を取得するタスク
Future<String> userInfoFuture = executor.submit(() -> {
System.out.println("ユーザー情報を取得中...");
Thread.sleep(1000); // APIリクエストをシミュレート
return "ユーザー: 田中太郎";
});
// 注文履歴を取得するタスク
Future<String> orderHistoryFuture = executor.submit(() -> {
System.out.println("注文履歴を取得中...");
Thread.sleep(1500); // APIリクエストをシミュレート
return "注文履歴: 3件の注文";
});
// 両方の結果を待機して結合
String userInfo = userInfoFuture.get(); // ブロッキング呼び出し
String orderHistory = orderHistoryFuture.get(); // ブロッキング呼び出し
Instant end = Instant.now();
System.out.println("処理時間: " + (end.toEpochMilli() - start.toEpochMilli()) + "ms");
return userInfo + ", " + orderHistory;
}
}
// 構造化並行性を使用した例
static String structuredConcurrency(String userId) throws Exception {
System.out.println("ユーザー情報と注文履歴を構造化並行性で取得します");
Instant start = Instant.now();
// 構造化並行性のスコープを作成(try-with-resourcesで使用)
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// ユーザー情報を取得するサブタスク
Subtask<String> userInfoTask = scope.fork(() -> {
System.out.println("ユーザー情報を取得中...");
Thread.sleep(1000); // APIリクエストをシミュレート
return "ユーザー: 田中太郎";
});
// 注文履歴を取得するサブタスク
Subtask<String> orderHistoryTask = scope.fork(() -> {
System.out.println("注文履歴を取得中...");
Thread.sleep(1500); // APIリクエストをシミュレート
return "注文履歴: 3件の注文";
});
// すべてのサブタスクの完了を待機
scope.join();
// すべてのタスクが成功したことを確認、エラーがあればその例外をスロー
scope.throwIfFailed();
// 結果を取得
String userInfo = userInfoTask.get();
String orderHistory = orderHistoryTask.get();
Instant end = Instant.now();
System.out.println("処理時間: " + (end.toEpochMilli() - start.toEpochMilli()) + "ms");
return userInfo + ", " + orderHistory;
}
}
// エラー処理を含む構造化並行性の例
static String structuredConcurrencyWithError(String userId) throws Exception {
System.out.println("エラーを処理する構造化並行性の例");
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// ユーザー情報を取得するサブタスク(エラーの可能性あり)
Subtask<String> userInfoTask = scope.fork(() -> {
System.out.println("ユーザー情報を取得中...");
Thread.sleep(500);
// エラー発生をシミュレート
if (userId.equals("BadID")) {
throw new IllegalArgumentException("不正なユーザーID: " + userId);
}
return "ユーザー: 田中太郎";
});
// 注文履歴を取得するサブタスク
Subtask<String> orderHistoryTask = scope.fork(() -> {
System.out.println("注文履歴を取得中...");
Thread.sleep(1000);
return "注文履歴: 3件の注文";
});
// すべてのサブタスクの完了を待機
scope.join();
// エラーがあった場合、例外変換関数を使用してカスタム例外にラップしてスロー
scope.throwIfFailed(e -> new RuntimeException("構造化並行性の処理中にエラー: " + e.getMessage(), e));
return userInfoTask.get() + ", " + orderHistoryTask.get();
}
}
}
このコードでは、従来の非構造化並行性と新しい構造化並行性のアプローチを比較している。
traditionalConcurrencyメソッドでは、ExecutorServiceとFutureを使用して、ユーザー情報と注文履歴を並行して取得している。開発者は明示的にFuture.get()を呼び出して結果を待機し、エラー処理を手動で行う必要がある。
対照的に、structuredConcurrencyメソッドでは、StructuredTaskScopeを使用している。このクラスは、すべてのサブタスクの完了を自動的に待機し、エラーが発生した場合に適切に処理する。スコープを出る前にすべてのサブタスクが完了することが保証されるため、リソースリークのリスクが低減される。
structuredConcurrencyWithErrorメソッドでは、エラー処理の例を示している。サブタスクの1つでエラーが発生した場合、ShutdownOnFailureポリシーにより、他のすべてのサブタスクがキャンセルされる。エラーはthrowIfFailedメソッドによって伝播され、親タスクのtry-catchブロックでキャッチできる。
構造化並行性のより高度な使用例として、異なる完了ポリシーを見てみよう。
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.time.Duration;
public class StructuredConcurrencyPatterns {
public static void main(String[] args) {
try {
// シャットダウンオンサクセスパターン
System.out.println("\n=== シャットダウンオンサクセスパターン ===");
String firstResult = getFirstSuccessfulResult();
System.out.println("最初に成功した結果: " + firstResult);
// カスタム集約パターン
System.out.println("\n=== カスタム集約パターン ===");
String aggregatedResult = aggregateResults();
System.out.println("集約された結果: " + aggregatedResult);
// タイムアウトパターン
System.out.println("\n=== タイムアウトパターン ===");
try {
String timedResult = executeWithTimeout();
System.out.println("タイムアウト内の結果: " + timedResult);
} catch (TimeoutException e) {
System.out.println("タイムアウトが発生しました");
}
} catch (Exception e) {
e.printStackTrace();
}
}
// シャットダウンオンサクセスパターン - 最初に成功したタスクの結果を取得
static String getFirstSuccessfulResult() throws Exception {
// カスタムTaskScopeの実装
class ShutdownOnSuccessScope<T> extends StructuredTaskScope<T> {
private volatile Subtask<? extends T> successfulTask;
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
successfulTask = subtask;
// 最初の成功でほかのすべてのサブタスクをキャンセル
shutdown();
}
}
public T result() throws Exception {
if (successfulTask != null) {
return successfulTask.get();
}
throw new IllegalStateException("成功したタスクがありません");
}
}
try (var scope = new ShutdownOnSuccessScope<String>()) {
// 複数のデータソースから同じデータを取得する(レプリケーションなど)
scope.fork(() -> {
System.out.println("サーバー1からデータを取得中...");
Thread.sleep(2000); // 遅いサーバー
return "サーバー1のデータ";
});
scope.fork(() -> {
System.out.println("サーバー2からデータを取得中...");
Thread.sleep(1000); // 中程度のサーバー
return "サーバー2のデータ";
});
scope.fork(() -> {
System.out.println("サーバー3からデータを取得中...");
Thread.sleep(500); // 最速のサーバー
return "サーバー3のデータ";
});
// いずれかのタスクが成功するか、すべてが失敗するまで待機
scope.join();
// 最初に成功したタスクの結果を返す
return scope.result();
}
}
// カスタム集約パターン - すべての結果を集約
static String aggregateResults() throws Exception {
record ServiceResult(String serviceName, String data, boolean success) {}
// カスタムTaskScopeの実装
class AggregatingScope extends StructuredTaskScope<ServiceResult> {
private final StringBuilder aggregatedResults = new StringBuilder();
private int successCount = 0;
private int failureCount = 0;
@Override
protected void handleComplete(Subtask<? extends ServiceResult> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
ServiceResult result = subtask.get();
if (result.success()) {
aggregatedResults.append("[").append(result.serviceName())
.append(": ").append(result.data()).append("] ");
successCount++;
} else {
failureCount++;
}
} else {
failureCount++;
}
}
public String getAggregatedResults() {
return "成功: " + successCount + ", 失敗: " + failureCount +
", データ: " + aggregatedResults.toString();
}
}
try (var scope = new AggregatingScope()) {
// 複数のマイクロサービスを呼び出す
scope.fork(() -> {
System.out.println("ユーザーサービスを呼び出し中...");
Thread.sleep(800);
return new ServiceResult("ユーザーサービス", "ユーザーID: 123", true);
});
scope.fork(() -> {
System.out.println("注文サービスを呼び出し中...");
Thread.sleep(1200);
return new ServiceResult("注文サービス", "3件の注文", true);
});
scope.fork(() -> {
System.out.println("支払いサービスを呼び出し中...");
Thread.sleep(1000);
// 失敗をシミュレート
boolean success = Math.random() > 0.5;
return new ServiceResult("支払いサービス",
success ? "クレジットカード情報" : "エラー",
success);
});
// すべてのタスクの完了を待機
scope.join();
// 集約された結果を返す
return scope.getAggregatedResults();
}
}
// タイムアウトパターン - 指定された時間内に処理を完了
static String executeWithTimeout() throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 長時間実行される可能性のあるタスク
Subtask<String> task1 = scope.fork(() -> {
System.out.println("タスク1実行中...");
Thread.sleep(1500);
return "タスク1の結果";
});
Subtask<String> task2 = scope.fork(() -> {
System.out.println("タスク2実行中...");
Thread.sleep(2500);
return "タスク2の結果";
});
// 1秒のタイムアウトで待機
Duration timeout = Duration.ofSeconds(2);
try {
scope.joinUntil(java.time.Instant.now().plus(timeout));
// エラーチェック
scope.throwIfFailed(e -> new RuntimeException("タスク実行エラー", e));
// 結果を結合
return task1.get() + " + " + task2.get();
} catch (TimeoutException e) {
// タイムアウトが発生した場合、スコープをシャットダウン
// タイムアウト例外を原因として渡す
scope.shutdown(e);
throw e;
}
}
}
}
このコードでは、構造化並行性の3つの高度なパターンを示している。
- シャットダウンオンサクセスパターン – 最初に成功したタスクの結果を返す。これは、複数のレプリカサーバーから同じデータを取得する場合や、同じ計算を複数のアルゴリズムで試す場合に便利である。
- カスタム集約パターン – すべてのタスクの結果を集約する。これは、複数のマイクロサービスからデータを取得し、その結果を組み合わせる場合に役立つ。
- タイムアウトパターン – 指定された時間内にタスクが完了しない場合、タイムアウトを発生させる。これは、応答時間の要件が厳しいシステムで重要である。
これらのパターンは、カスタムStructuredTaskScopeクラスを拡張し、handleCompleteメソッドをオーバーライドすることで実装されている。このメソッドは、サブタスクが完了するたびに呼び出され、その状態に基づいて適切なアクションを実行できる。
構造化並行性は、Javaの非同期プログラミングモデルを大幅に改善するものであり、特に複雑な非同期処理フローの管理を簡素化する。これにより、コードの可読性が向上し、エラー処理が改善され、リソースリークのリスクが低減される。Java 21では、この機能はまだプレビュー段階であるが、将来のバージョンで完全にサポートされる予定である。
リアクティブプログラミングとの関係性
リアクティブプログラミングは、データフローと変更の伝播に焦点を当てたプログラミングパラダイムである。Javaでは、ReactiveStreamsスペシフィケーションとその実装(RxJava、Project Reactor、Akka Streamsなど)を通じて、リアクティブプログラミングがサポートされている。バーチャルスレッドと構造化並行性の導入により、Javaの非同期プログラミングモデルが変化する中、リアクティブプログラミングとの関係性を理解することが重要である。
まず、リアクティブプログラミングの基本概念と、Javaでの実装例を見てみよう。
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
public class ReactiveBasics {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Java標準のReactiveStreams API ===");
// パブリッシャーの作成(データの発行者)
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
// サブスクライバーの作成(データの受信者)
SimpleSubscriber subscriber1 = new SimpleSubscriber("サブスクライバー1");
SimpleSubscriber subscriber2 = new SimpleSubscriber("サブスクライバー2");
// サブスクライバーをパブリッシャーに登録
publisher.subscribe(subscriber1);
publisher.subscribe(subscriber2);
// データの発行
List<String> items = List.of("項目1", "項目2", "項目3", "項目4", "項目5");
System.out.println("データの発行を開始します");
for (String item : items) {
publisher.submit(item);
System.out.println("発行: " + item);
Thread.sleep(100); // 発行間隔をシミュレート
}
// 終了シグナルを送信
System.out.println("発行終了");
}
// サブスクライバーが処理を完了するまで待機
Thread.sleep(1000);
}
// シンプルなサブスクライバー実装
static class SimpleSubscriber implements Flow.Subscriber<String> {
private final String name;
private Flow.Subscription subscription;
public SimpleSubscriber(String name) {
this.name = name;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println(name + ": サブスクライブしました");
// 最初のリクエスト(バックプレッシャー)
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println(name + ": 受信 - " + item);
// 処理時間をシミュレート
try {
Thread.sleep((long) (Math.random() * 200));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 次のアイテムをリクエスト
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
System.err.println(name + ": エラー - " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println(name + ": 完了");
}
}
}
このコードでは、Java 9で導入された標準のReactiveStreams API(java.util.concurrent.Flow)を使用している。SubmissionPublisherはデータの発行者であり、SimpleSubscriberはデータの受信者である。この例では、リアクティブプログラミングの主要な概念であるパブリッシャー/サブスクライバーモデルとバックプレッシャー(サブスクライバーが処理できる速度でデータを受信する仕組み)を示している。
次に、より実用的なリアクティブプログラミングの例と、バーチャルスレッドとの統合を見てみよう。
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ReactiveVirtualThreadsIntegration {
public static void main(String[] args) throws Exception {
System.out.println("=== リアクティブプログラミングとバーチャルスレッドの統合 ===");
// データソース(例:センサーからのデータストリーム)
List<SensorData> sensorDataList = generateSensorData(20);
// バーチャルスレッドを使用するExecutorServiceの作成
var executor = Executors.newVirtualThreadPerTaskExecutor();
try (
// ExecutorServiceをtry-with-resourcesに含める
executor;
// パブリッシャー(バーチャルスレッドExecutorを使用)
SubmissionPublisher<SensorData> publisher = new SubmissionPublisher<>(
executor, // バーチャルスレッドExecutor
Flow.defaultBufferSize()
);
// 温度変換プロセッサ(摂氏から華氏へ)
TransformProcessor<SensorData, SensorData> temperatureProcessor =
new TransformProcessor<>(data -> {
// バーチャルスレッドで実行される変換ロジック
if (data.unit().equals("C")) {
double fahrenheit = (data.value() * 9 / 5) + 32;
return new SensorData(data.id(), fahrenheit, "F", data.timestamp());
}
return data;
});
// 異常値フィルタプロセッサ(一定のしきい値を超える値をフィルタリング)
TransformProcessor<SensorData, SensorData> thresholdProcessor =
new TransformProcessor<>(data -> {
// バーチャルスレッドで実行されるフィルタリングロジック
if (data.unit().equals("F") && data.value() > 100) {
System.out.println("⚠️ 異常値を検出: " + data);
}
return data;
});
) {
// プロセッサをパイプラインに接続
publisher.subscribe(temperatureProcessor);
temperatureProcessor.subscribe(thresholdProcessor);
// 最終サブスクライバー(集計と処理)
thresholdProcessor.subscribe(new Flow.Subscriber<>() {
private Flow.Subscription subscription;
private int count = 0;
private double sum = 0;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1); // 最初のアイテムをリクエスト
}
@Override
public void onNext(SensorData data) {
count++;
sum += data.value();
System.out.println("データ処理: " + data +
", 平均: " + (sum / count) + data.unit());
// 処理時間をシミュレート
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
subscription.request(1); // 次のアイテムをリクエスト
}
@Override
public void onError(Throwable throwable) {
System.err.println("エラー: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("処理完了. 合計: " + count + "アイテム, 平均: " +
(sum / count) + "F");
}
});
// センサーデータの発行(ストリーミングをシミュレート)
for (SensorData data : sensorDataList) {
publisher.submit(data);
Thread.sleep(50); // データの到着間隔をシミュレート
}
System.out.println("すべてのデータを発行しました");
}
// 処理完了を待機
Thread.sleep(1000);
// より実践的なシナリオ: 構造化並行性とリアクティブプログラミングの組み合わせ
System.out.println("\n=== 構造化並行性とリアクティブプログラミングの組み合わせ ===");
try (var scope = new java.util.concurrent.StructuredTaskScope.ShutdownOnFailure()) {
// リアクティブストリームをバーチャルスレッドで処理するタスク
var reactiveTask = scope.fork(() -> {
try (var localPublisher = new SubmissionPublisher<Integer>(
Executors.newVirtualThreadPerTaskExecutor(), 256)) {
// CompletableFutureを使用して処理完了を待機
var completionFuture = new java.util.concurrent.CompletableFuture<Double>();
// サブスクライバーの作成(平均を計算)
localPublisher.subscribe(new Flow.Subscriber<>() {
private Flow.Subscription subscription;
private int count = 0;
private double sum = 0;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE); // すべてリクエスト
}
@Override
public void onNext(Integer item) {
count++;
sum += item;
}
@Override
public void onError(Throwable throwable) {
completionFuture.completeExceptionally(throwable);
}
@Override
public void onComplete() {
double average = count > 0 ? sum / count : 0;
completionFuture.complete(average);
}
});
// データの発行
for (int i = 1; i <= 100; i++) {
localPublisher.submit(i);
}
// ストリームの終了
localPublisher.close();
// 処理完了を待機して結果を取得
return completionFuture.get();
}
});
// 他のタスクをフォーク可能...
// すべてのタスクの完了を待機
scope.join();
scope.throwIfFailed();
// 結果の取得
Double average = reactiveTask.get();
System.out.println("リアクティブストリーム処理の結果(平均): " + average);
}
}
// センサーデータモデル
record SensorData(String id, double value, String unit, long timestamp) {}
// テストデータ生成
static List<SensorData> generateSensorData(int count) {
return java.util.stream.IntStream.range(0, count)
.mapToObj(i -> {
String id = "sensor-" + (i % 3 + 1);
double value = 20 + Math.random() * 15; // 20-35℃
// 時々異常値を入れる
if (i % 7 == 0) {
value = 45; // 高温異常値
}
return new SensorData(id, value, "C", System.currentTimeMillis());
})
.collect(Collectors.toList());
}
// 変換プロセッサ(Flow.Processor実装)
static class TransformProcessor<T, R> implements Flow.Processor<T, R> {
private final Function<T, R> transformFunction;
private Flow.Subscription subscription;
private SubmissionPublisher<R> publisher = new SubmissionPublisher<>();
public TransformProcessor(Function<T, R> transformFunction) {
this.transformFunction = transformFunction;
}
@Override
public void subscribe(Flow.Subscriber<? super R> subscriber) {
publisher.subscribe(subscriber);
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
try {
// 変換関数を適用
R transformed = transformFunction.apply(item);
publisher.submit(transformed);
} catch (Exception e) {
onError(e);
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
publisher.closeExceptionally(throwable);
}
@Override
public void onComplete() {
publisher.close();
}
}
}
このコードでは、リアクティブプログラミングとバーチャルスレッドの統合を示している。センサーデータのストリームを処理するリアクティブパイプラインを構築し、バーチャルスレッドを使用して各処理ステップを実行している。具体的には、以下のことが行われている。
- 温度データの変換(摂氏から華氏へ)
- 異常値の検出
- 平均値の計算
また、後半では、構造化並行性とリアクティブプログラミングを組み合わせる例も示している。リアクティブストリームの処理を構造化並行性のスコープ内で実行することで、ライフサイクル管理と例外処理の改善を実現している。
バーチャルスレッドとリアクティブプログラミングの関係性について、以下の点が重要である。
- 補完的なアプローチ – バーチャルスレッドとリアクティブプログラミングは、異なるアプローチで並行処理を扱う。バーチャルスレッドは命令型プログラミングを使用し、リアクティブプログラミングは宣言型プログラミングを使用する。どちらも特定のユースケースに適している。
- バーチャルスレッドの利点 – バーチャルスレッドは、ブロッキング操作を非効率にすることなく記述できるため、既存のコードベースを非同期化する際に特に役立つ。リアクティブなコードと比較して、読みやすく保守しやすい場合が多い。
- リアクティブプログラミングの利点 – リアクティブプログラミングは、データの変換、フィルタリング、組み合わせなどの操作をチェーンで記述できるため、データストリーム処理に適している。また、バックプレッシャーによる自動フロー制御も提供する。
- ハイブリッドアプローチ – 多くの実践的なアプリケーションでは、バーチャルスレッドとリアクティブプログラミングを組み合わせることが最適である。例えば、HTTPリクエストの受信にはリアクティブなWebフレームワークを使用し、各リクエストの処理にはバーチャルスレッドを使用するなど。
リアクティブプログラミングとSpring WebFluxなどのフレームワークを使用するより実用的な例を見てみよう。(注:この例はコンセプトを示すためのものであり、実際のコードは依存関係の設定などが必要になる。)
import java.time.Duration;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;
// Spring WebFluxのモノとフラックスのシミュレーション
class Mono<T> {
private final T value;
private Mono(T value) {
this.value = value;
}
public static <T> Mono<T> just(T value) {
return new Mono<>(value);
}
public <R> Mono<R> map(Function<T, R> mapper) {
return new Mono<>(mapper.apply(value));
}
// flatMapメソッドを追加
public <R> Mono<R> flatMap(Function<T, Mono<R>> mapper) {
return mapper.apply(value);
}
public T block() {
return value;
}
}
class Flux<T> {
private final List<T> values;
private Flux(List<T> values) {
this.values = values;
}
public static <T> Flux<T> fromIterable(Iterable<T> iterable) {
return new Flux<>(List.copyOf(iterable));
}
public <R> Flux<R> map(Function<T, R> mapper) {
return new Flux<>(values.stream().map(mapper).toList());
}
public Flux<T> filter(java.util.function.Predicate<T> predicate) {
return new Flux<>(values.stream().filter(predicate).toList());
}
public Mono<List<T>> collectList() {
return Mono.just(values);
}
}
// 擬似WebClientとAPIサービス
class WebClient {
public static WebClient create() {
return new WebClient();
}
public RequestHeadersUriSpec<?> get() {
return new RequestHeadersUriSpec<>();
}
public static class RequestHeadersUriSpec<T> {
public RequestHeadersSpec<?> uri(String uri) {
return new RequestHeadersSpec<>();
}
}
public static class RequestHeadersSpec<T> {
public ResponseSpec retrieve() {
return new ResponseSpec();
}
}
public static class ResponseSpec {
public <T> Mono<T> bodyToMono(Class<T> clazz) {
if (clazz == User.class) {
return (Mono<T>) Mono.just(new User("1", "ユーザー1"));
} else if (clazz == Order.class) {
return (Mono<T>) Mono.just(new Order("1", "商品1", 100));
}
return Mono.just(null);
}
public <T> Flux<T> bodyToFlux(Class<T> clazz) {
if (clazz == Order.class) {
return (Flux<T>) Flux.fromIterable(List.of(
new Order("1", "商品1", 100),
new Order("2", "商品2", 200),
new Order("3", "商品3", 300)
));
}
return new Flux<>(List.of());
}
}
}
// データモデル
record User(String id, String name) {}
record Order(String id, String productName, double price) {}
record UserWithOrders(User user, List<Order> orders) {}
public class ReactiveVsVirtualThreadExample {
public static void main(String[] args) {
System.out.println("=== リアクティブプログラミング vs バーチャルスレッド ===");
// リアクティブなアプローチ(Spring WebFluxスタイル)
System.out.println("\n--- リアクティブなアプローチ ---");
WebClient webClient = WebClient.create();
Mono<UserWithOrders> userWithOrdersMono = webClient.get()
.uri("http://api.example.com/users/1")
.retrieve()
.bodyToMono(User.class)
.flatMap(user -> {
return webClient.get()
.uri("http://api.example.com/users/" + user.id() + "/orders")
.retrieve()
.bodyToFlux(Order.class)
.collectList()
.map(orders -> new UserWithOrders(user, orders));
});
UserWithOrders result = userWithOrdersMono.block();
System.out.println("リアクティブ結果: " + result);
// バーチャルスレッドを使用したアプローチ
System.out.println("\n--- バーチャルスレッドアプローチ ---");
// バーチャルスレッドを使用(Java 21の機能)
Runnable virtualThreadTask = () -> {
try {
// 同期的に記述できるが、バーチャルスレッドを使用して非同期的に実行
User user = webClient.get()
.uri("http://api.example.com/users/1")
.retrieve()
.bodyToMono(User.class)
.block(); // バーチャルスレッドではブロッキングも効率的
List<Order> orders = webClient.get()
.uri("http://api.example.com/users/" + user.id() + "/orders")
.retrieve()
.bodyToFlux(Order.class)
.collectList()
.block();
UserWithOrders userWithOrders = new UserWithOrders(user, orders);
System.out.println("バーチャルスレッド結果: " + userWithOrders);
} catch (Exception e) {
System.err.println("エラー: " + e.getMessage());
}
};
// バーチャルスレッドで実行
Thread.startVirtualThread(virtualThreadTask);
// メインスレッドで他の処理を続行可能
System.out.println("メインスレッドは他の処理を続行...");
// 処理完了を待機(デモのため)
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 実際のアプリケーションでは、以下のようなメソッドがあり得る
static class ReactiveService {
private final WebClient webClient = WebClient.create();
public Flux<Order> getRecentOrdersReactive(String userId) {
return webClient.get()
.uri("http://api.example.com/users/" + userId + "/orders")
.retrieve()
.bodyToFlux(Order.class)
.filter(order -> order.price() > 0) // 無効な注文をフィルタリング
.map(order -> {
// 何らかの変換処理
return order;
});
}
}
static class VirtualThreadService {
private final WebClient webClient = WebClient.create();
public List<Order> getRecentOrdersBlocking(String userId) {
// バーチャルスレッドで呼び出すことを前提とした同期メソッド
List<Order> orders = webClient.get()
.uri("http://api.example.com/users/" + userId + "/orders")
.retrieve()
.bodyToFlux(Order.class)
.collectList()
.block();
// 同期的なフィルタリングと変換
return orders.stream()
.filter(order -> order.price() > 0)
.map(order -> {
// 何らかの変換処理
return order;
})
.toList();
}
}
}
このコードでは、リアクティブプログラミング(Spring WebFluxスタイル)とバーチャルスレッドアプローチの比較を示している。両方のアプローチでユーザーデータと注文データを取得するシナリオを実装している。
リアクティブアプローチでは、宣言型のプログラミングスタイルを使用し、データフローを宣言的に記述している。一方、バーチャルスレッドアプローチでは、同期的なコードスタイルを使用しているが、バーチャルスレッドで実行することで非ブロッキングな動作を実現している。
バーチャルスレッドとリアクティブプログラミングのどちらを選択するかは、アプリケーションの要件やチームの専門知識に依存する。一般的に、以下のガイドラインが参考になる:
- バーチャルスレッド(命令型アプローチ)が有利な場合
- 既存の同期コードを非同期化する場合
- チームが命令型プログラミングに慣れている場合
- デバッグやテストの容易さが重要な場合
- I/O待ちの多いアプリケーションで、コードの単純さが優先される場合
- リアクティブプログラミング(宣言型アプローチ)が有利な場合
- データストリーム処理が主要な要件である場合
- 複雑なデータ変換やフィルタリングが必要な場合
- バックプレッシャーが重要な場合
- 非常に高いスケーラビリティが必要な場合
多くの場合、両方のアプローチを組み合わせることが最適である。例えば、リアクティブなWebフレームワークを使用してHTTPリクエストを処理し、個々のリクエスト処理ロジックにはバーチャルスレッドを使用するなど。
Java 21の導入により、非同期プログラミングの選択肢が増え、開発者はアプリケーションの特性に応じて最適なアプローチを選択できるようになった。バーチャルスレッド、構造化並行性、リアクティブプログラミングなど、技術を適切に組み合わせることで、高性能で保守性の高い非同期アプリケーションを開発することが可能である。