別のスレッドを中断して終了したい場合どのように設計したらよいでしょうか?
昔悩んだことをメモしてみました。
メインスレッドから重い処理をさせるワークスレッドを止めたい場合を考えてみます。
//work thread while(true) { // do work } //Main thread cancel();
まずwork threadのループ部分のコードには手を入れずにMain threadから終了できるかどうかですが、一応そのような手段はありますがこれらはあまりにも問題が多く実質利用できないと考えるべきです。
Win32API なら TerminateThread();
pthread なら pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS) を指定したスレッドに対して pthread_cancel();
これらはいつでも目的のスレッドを終了できますが、いつでも終了できるということはリソースの確保中だろうが解放中だろうが問答無用で終了してしまうということです。当然おかしなことになりますよね。つまりnewやmalloc、mutexなどの同期オブジェクト、スレッド間で共有するデータ構造など大抵のリソースが使用できないということになります。リソースが確保前、確保中、確保後、解放中、解放後かどうか判別することはできません。その他にも環境依存の制約があります。
TerminateThread();//(WIN32API) too bad pthread_cancel(); //(pthread) too bad with PTHREAD_CANCEL_ASYNCHRONOUS
ではC++例外を外部からwork thread内で何とかして(アセンブラなどで)投げられないでしょうか?
残念ながらこれも実質無理です。例外モドキを投げられたとしても上記と同じリソースの問題に直面することになります。
つまり、work threadに手を入れずに外部から「いつでも」終了できるようにすることは実質できないのです。
ではどうしたらよいのでしょうか?
正解はwork threadに手を入れることになります。
struct thread_aborted{}; //適当にこしらえた例外クラス std::atomic<bool> exit_flag(false); //work thread try { while(true) { // do work check_exit(); } } catch (thread_aborted& e) { } void check_exit() { if (exit_flag) throw thread_aborted{}; //スレッドの終了は例外を投げるのが簡単でよい。 } //Main thread void cancel() { exit_flag = true; } cancel();
このように定期的に check_exit() を呼び出して終了フラッグを調べるようにします。スレッドが自ら終了する場合は何の問題もありません(ただし低レベルなスレッド終了関数を使用するのは避けましょう。理由は後述)。自スレッドを終了させる場合は return してもよいですが関数がネストしていると面倒ですので例外を投げてからcatchしてreturnするのが簡単です。例外はcatchしておかないとstd::terminate()が呼び出されてプログラム全体が終了してしまうことだけには気を付けてください。check_exit() を呼び出す頻度はレスポンスと負荷のトレードオフで決定しなければなりません。
この check_exit() を interruption points や cancellation points と呼びます。
これらと同様もしくはそれ以上に優れた仕組みが boost.thread に用意されています。
check_exit() が boost::this_thread::interruption_point() に相当し、cancel() は boost::thread::interrupt() に相当します。boost.thread は interruption points で boost::thread_interrupted 例外が throw されます。
interruption points は予め特定の待機関数でも設定されていますので
boost::thread::join() boost::thread::timed_join() boost::thread::try_join_for() boost::thread::try_join_until() boost::condition_variable::wait() boost::condition_variable::timed_wait() boost::condition_variable::wait_for() boost::condition_variable::wait_until() boost::condition_variable_any::wait() boost::condition_variable_any::timed_wait() boost::condition_variable_any::wait_for() boost::condition_variable_any::wait_until() boost::thread::sleep() boost::this_thread::sleep_for() boost::this_thread::sleep_until() boost::this_thread::interruption_point() //boost threadのドキュメントより
これらの関数を呼び出しているところでも終了することができます。
void work() { while(true) { //do work boost::this_thread::sleep_for(seconds(60)); } } boost::thread thr(work); //... thr.interrupt();//sleep_forのところで待機中ならすぐに終了できる。 // 詳しく書くとworkスレッドが sleep_for を実行中に thread_interrupted例外が投げられ // その結果としてworkスレッドが終了する。 // thread_interrupted例外はboost.threadライブラリが良きに計らってくれるため // 特にユーザーがcatchする必要はない。
これらの待機関数に interruption points が設定されていることのメリットはただ単に自分で interrruption_point() を書く手間が省けるというだけではありません。自作 check_exit() を使用した場合を考えてみましょう。
void work() try { while(true) { //do work sleep_for_60seconds(); check_exit(); } } catch (...) { } cancel();
スレッドが do work 部分を実行中に外部から cancel() を呼び出してもスレッドは do work を終えた後 sleep_for_60seconds() に突入し60秒間も何も反応しません。その後ようやく check_exit() に突入して晴れてユーザーを1分も待たせたのちに終了することになるのです。つまり自作の check_exit() よりも待機関数と融合した boost.thread の interruption points の方がより高いレスポンスを期待できます。
しかしながら時には対応外の他のライブラリの待機関数を組み合わせたい場合が出てきます。こんな時には待機関数に短いタイムアウトを設定して
void sleep_for_60seconds() { for (int i=0; i<60; ++i) { other_librarys_sleep_for(seconds(1)); check_exit(); //interruption points } } //本題から逸れるが細かく分割するほど待機する時間に誤差が蓄積しやすいので //それを気にする場合はもう少し工夫が必要。
このように細切れに interruption points を挟みます。この例に限らず連携の取れない通知オブジェクトをシングルスレッドで待機するにはポーリングが必要になります。別のスレッドを使用すれば互換性のない待機関数同士を組み合わせるということができる場合もあります。別のスレッドで通知を待機しておいてから元のスレッドで待機可能な通知を投げるという感じです。
連携の取れない待機関数を使う boost::thread をポーリング無しで中断したい場合は boost::thread::interrupt() を使用せずに直接待機可能な通知があればそれを発行して終了処理する方が簡単です。例えばネットワークプログラムを boost.asio を使って書いているとしたら、待機関数の boost::asio::io_service の run() に対して post() など。因みに std::thread では interrupt() はサポートされていませんが、普通は待機関数に合わせた通知方法を選択する場合が多いと思いますのでそこまで不便はしないでしょう。標準に condition_variable もありますし、C++20からはstd::jthreadとstd::stop_tokenが導入される予定です。
boost::asio::io_service io; //here: register async handler std::thread thr([&]{ try { io.run(); } catch(std::runtime_error& e) {} }); //以下のラムダ式が作成したスレッド内のio.run()で実行される io.post([]{ throw std::runtime_error("aborted"); }); thr.join();
何れにしろ待機関数と通知方法を互換性のある一つの種類に統一することがポーリングを回避し、高レスポンスを実現する近道です。
スレッドの終了関数について
pthread_exit, pthread_cancel の問題点
pthread_exit を呼び出してもC++デストラクタが呼び出される保証はありません。しかし主流な実装では呼び出されるようです。
また pthread_cancel() は他スレッドから pthread_setcanceltype( PTHREAD_CANCEL_DEFERRED ) が設定されたスレッド(デフォルトで設定されている)に対して特定の関数の cancellation points でスレッドを終了できますが、これも pthread_exit() と同じでC++デストラクタが呼び出される保証はありません。また仮にC++デストラクタが呼び出されるとしてもどの関数に cancellation points が設定されているのかすべて把握し、それらの関数を呼び出しているすべての個所で中断されることを想定したコード(リソースを使用しない or RAIIを使用する or リソースを回収するクリーンナップハンドラを登録する。かつデータの整合性を保つ必要がある。)が書かれていることを保証しなければなりません。これは現実的にはかなり難しいでしょう。つまり例えば fopenにcancellation pointが設定されていてその関数内では適切にハンドリングされている処理系であったとしても、普通のCやC++のコードはfopen関数が途中で(戻り値も返さずに)中断される可能性があるとは思ってもいないわけです。そこでこれを回避するために pthread_setcancelstate() を駆使して必要なところ(目の届く範囲)でのみキャンセルが有効になるようにしますがそれでも細心の注意と労力が必要になります。
Special thanks @yohhoyさん
結局自スレッドを終了する場合においても低レベルなスレッド関数の使用は避けC++の流儀に則って関数を return するか throw するのがよいでしょう。
スレッドライブラリについて
そもそもスレッドはライブラリとして実装できない(コンパイラの最適化を抑止できない)という話が存在するためpthreadなどを直接使用せずに言語仕様の一部であるstd.async、std.thread、std.mutexなどを使用するのが理想的でしょう。移植性の面からも望ましいです。
おまけ boost.thread + boost.asio のポーリング版
io_service には poll() や poll_one()、run_one() といったメンバ関数が存在しますがこれら自体は中断させる機構を作る用途には向いていません。poll_one() を使用してIOの性能や応答性を高めようとすれば負荷が気になりますし、poll() はそれに加えて処理するハンドラの最大値もしくはタイムアウトを設定できません。run_one() もタイムアウトを設定できません。結局タイマーを使用して io_service から定期的に interruption point を含むハンドラを呼び出すのがポーリングをする場合にはよさそうです。
#include <chrono> #include <iostream> #include <boost/asio.hpp> #include <boost/asio/steady_timer.hpp> #include <boost/thread.hpp> int main(){ boost::asio::io_service io; //here: register async handler boost::thread thr([&]{ boost::asio::steady_timer t(io); std::function<void(boost::system::error_code const&)> handler; handler = [&](boost::system::error_code const&){ boost::this_thread::interruption_point(); t.expires_from_now(std::chrono::seconds(1)); t.async_wait(handler); }; handler({}); io.run(); }); thr.interrupt(); thr.join(); return 0; }