Multithreading in C++11 (#include <thread>) introduces a self-contained Application Programming Interface (API) for parallel, and more generally concurrent, programming that goes beyond heavyweight processes in an Operating System (OS). Threads of control are in essence lightweight processes. Because the API is part of the language standard, threads are portable across operating systems and thus overcome incompatibilities (between UNIX and Windows especially).
Rule(s)
- C++ multicore programming requires key information about the computer architecture.
Example (getting the number of processors -based on Windows PowerShell-)
echo $env:NUMBER_OF_PROCESSORS
8
Example (getting data about the architecture -based on Windows PowerShell-)
Get-WmiObject -Class Win32_Processor | Select-Object -Property Name, Number*

Name                                      NumberOfCores NumberOfEnabledCore NumberOfLogicalProcessors
----                                      ------------- ------------------- -------------------------
Intel(R) Core(TM) i7-4800MQ CPU @ 2.70GHz             4                   4                         8
See also
- Hyper-threading (Intel technology): physical cores are divided into logical cores.
Effective concurrency
Rule(s)
- The std::thread::hardware_concurrency function reports the “effective” hardware architecture, i.e., the number of threads that can be assigned to cores/processors and run concurrently. The value is only a hint: it is 0 when it cannot be computed. Typically, an Intel® Core™ i7-10510U processor has 4 cores and 8 (logical) threads while the 8th-gen Core™ i7-8700 processor comes with 6 cores and 12 (logical) threads.
Example Illustration_of_thread_local.Cpp.zip
std::cout << "Concurrent supported threads: " << std::thread::hardware_concurrency() << '\n';
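Since the reported value may be 0, a program that sizes a pool of threads from it usually needs a fallback. The following is a minimal sketch under that assumption; the helper name worker_count and the default fallback of 2 are hypothetical and not part of the original example.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: how many worker threads to start on this machine.
// 'fallback' is an arbitrary value used only when the hint is unavailable.
unsigned worker_count(unsigned fallback = 2U) {
    assert(fallback > 0U);
    const unsigned hint = std::thread::hardware_concurrency(); // 0 if not computable
    return hint != 0U ? hint : fallback;
}
```

A call such as worker_count() then always returns at least 1, whatever the platform reports.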
Thread state
Rule(s)
- Testing the state of a thread avoids inconsistent waits or, more generally, inconsistent operations, e.g., joining a thread that is no longer joinable (join then throws std::system_error).
Example Illustration_of_thread_local.Cpp.zip
std::atomic<bool> stop{false}; // Atomic ('#include <atomic>') since it is written and read by different threads...
// Capture 'stop' by reference for guaranteed stopping:
std::thread my_thread([&stop]() -> void { // Thread creation from lambda expression...
    while (!stop) { ... }
    std::cout << "Just ends..." << '\n';
});
assert(my_thread.joinable());
my_thread.detach();
assert(!my_thread.joinable());
std::this_thread::sleep_for(std::chrono::seconds(1));
stop = true;
try {
    my_thread.join(); // Throws: the thread has been detached...
}
catch (std::system_error& se) { // Inheritance graph: 'std::exception' < 'std::runtime_error' < 'std::system_error' (C++11)
    std::cerr << "'my_thread.join()' failed because 'my_thread.joinable()' is " << std::boolalpha << my_thread.joinable()
        << ", message: " << se.code().message() << '\n';
}
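The exception above can be avoided altogether by testing joinable before calling join. The sketch below illustrates this idea; join_if_joinable and demo_join_states are hypothetical names introduced for illustration only.

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: joins only when the thread is still joinable.
// Returns true if a join actually took place.
bool join_if_joinable(std::thread& t) {
    if (!t.joinable()) return false; // Already joined, detached or never started
    t.join();
    return true;
}

// Walks through the three situations: joinable, already joined, detached.
bool demo_join_states() {
    std::thread worker([] {});
    assert(worker.joinable());
    const bool first = join_if_joinable(worker);   // Joins
    const bool second = join_if_joinable(worker);  // No-op: no exception raised
    std::thread detached([] {});
    detached.detach();
    const bool third = join_if_joinable(detached); // No-op: detached threads are not joinable
    return first && !second && !third;
}
```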
thread_local keyword
Rule(s)
- C++11 offers the thread_local keyword to give each thread its own copy of static data, initialized no later than the thread's first use of it. For example, read-only data used in threads' computation may be duplicated so that threads do not access the same memory at reading time.
- C++11 offers a mutual exclusion mechanism based on the std::mutex class. As with synchronized in Java, locks may be built on “mutexes”, typically to delimit critical execution sections.
Example Illustration_of_thread_local.Cpp.zip
// '.h' file:
class Illustration_of_thread_local {
    /** The game is, as much as possible, avoiding the following inconsistent compositions:
     * "FrançoisSarkozy" and "NicolasHollande" */
    // Threads using '_Francois' have their own copy (read-only mode):
    static thread_local const std::string _Francois;
    static thread_local const std::string _Hollande;
    static thread_local const std::string _Nicolas;
    static thread_local const std::string _Sarkozy;
    static std::string _Patronym;
    static std::atomic<bool> _Checkable; // Flags shared between threads are atomic ('#include <atomic>')...
    static std::atomic<bool> _Stop;
    static std::mutex _Mutex; // Create critical section for string manipulation ('#include <mutex>')...
    static void _Check();
    static void _Set(std::string const&, std::string const&);
    static void _Set_Francois_Hollande();
    static void _Set_Nicolas_Sarkozy();
public:
    static int Bads;
    static int Goods;
    static void Go();
};

// '.cpp' file:
thread_local const std::string Illustration_of_thread_local::_Francois = "François";
thread_local const std::string Illustration_of_thread_local::_Hollande = "Hollande";
thread_local const std::string Illustration_of_thread_local::_Nicolas = "Nicolas";
thread_local const std::string Illustration_of_thread_local::_Sarkozy = "Sarkozy";
std::string Illustration_of_thread_local::_Patronym = "?";
std::atomic<bool> Illustration_of_thread_local::_Checkable{false};
std::atomic<bool> Illustration_of_thread_local::_Stop{false};
int Illustration_of_thread_local::Bads = 0;
int Illustration_of_thread_local::Goods = 0;
std::mutex Illustration_of_thread_local::_Mutex;

void Illustration_of_thread_local::_Check() {
    while (!_Stop) {
        if (_Checkable) {
            // '_Patronym' is read inside the critical section as well:
            const std::lock_guard<std::mutex> critical_section(_Mutex);
            // 'thread_local' for '_Francois', '_Hollande', '_Nicolas' and '_Sarkozy':
            if (_Patronym.compare(_Francois + _Sarkozy) == 0 || _Patronym.compare(_Nicolas + _Hollande) == 0)
                Bads++;
            else
                Goods++;
            _Checkable = false;
        }
    }
}

void Illustration_of_thread_local::_Set(std::string const& given_name, std::string const& surname) {
    /* Processing is *deliberately* slowed down through two statements instead of '_Patronym = given_name + surname;' */
    // const std::lock_guard<std::mutex> critical_section(_Mutex); // Lightweight...
    std::unique_lock<std::mutex> critical_section(_Mutex); // Heavyweight...
    // critical_section.try_lock(); // Execution error since constructor does it just before...
    _Patronym = given_name; // Random segmentation fault if no critical section set up...
    critical_section.unlock(); // Another thread may change '_Patronym' here: source of inconsistent compositions...
    critical_section.lock();
    _Patronym += surname; // Random segmentation fault if no critical section set up...
    critical_section.unlock();
} // Without the last 'unlock', mutex would be released through destruction of 'critical_section' object...

void Illustration_of_thread_local::_Set_Francois_Hollande() {
    while (!_Stop) {
        _Set(_Francois, _Hollande); // 'thread_local' for '_Francois', '_Hollande'...
        _Checkable = true;
    }
}

void Illustration_of_thread_local::_Set_Nicolas_Sarkozy() {
    while (!_Stop) {
        _Set(_Nicolas, _Sarkozy); // 'thread_local' for '_Nicolas' and '_Sarkozy'...
        _Checkable = true;
    }
}

void Illustration_of_thread_local::Go() {
    std::cout << "Concurrent supported threads: " << std::thread::hardware_concurrency() << '\n';
    std::thread Check(&Illustration_of_thread_local::_Check);
    std::thread Set_Francois_Hollande(&Illustration_of_thread_local::_Set_Francois_Hollande);
    std::thread Set_Nicolas_Sarkozy(&Illustration_of_thread_local::_Set_Nicolas_Sarkozy);
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    _Stop = true;
    Check.join();
    Set_Francois_Hollande.join();
    Set_Nicolas_Sarkozy.join();
}
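The two rules can also be observed in isolation. The following sketch, which is not taken from the original archive, first shows that a thread_local variable is distinct in each thread, then shows that holding a single std::lock_guard over both string statements rules out the inconsistent compositions. All names (tl_counter, count_in_new_thread, set_patronym, read_patronym, always_consistent) are hypothetical.

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <thread>

thread_local int tl_counter = 0; // One independent instance per thread

// Increments the new thread's own 'tl_counter' n times and reports its final value.
int count_in_new_thread(int n) {
    int seen = -1;
    std::thread t([n, &seen] {
        for (int i = 0; i < n; ++i) ++tl_counter;
        seen = tl_counter;
    });
    t.join(); // After 'join', 'seen' can be read safely
    return seen;
}

std::mutex patronym_mutex;
std::string patronym;

// Both statements belong to the same critical section: no interleaving in between.
void set_patronym(const std::string& given_name, const std::string& surname) {
    const std::lock_guard<std::mutex> guard(patronym_mutex);
    patronym = given_name;
    patronym += surname;
}

std::string read_patronym() {
    const std::lock_guard<std::mutex> guard(patronym_mutex);
    return patronym;
}

// Two writers compete while the caller samples the shared string.
bool always_consistent(int rounds) {
    assert(rounds > 0);
    set_patronym("Nicolas", "Sarkozy");
    std::thread a([rounds] { for (int i = 0; i < rounds; ++i) set_patronym("François", "Hollande"); });
    std::thread b([rounds] { for (int i = 0; i < rounds; ++i) set_patronym("Nicolas", "Sarkozy"); });
    bool ok = true;
    for (int i = 0; i < rounds; ++i) {
        const std::string p = read_patronym();
        ok = ok && (p == "FrançoisHollande" || p == "NicolasSarkozy");
    }
    a.join();
    b.join();
    return ok;
}
```

The design choice here differs on purpose from the example above, which unlocks between the two statements in order to provoke inconsistent compositions.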
Condition variable
Rule(s)
- As in Java with wait, notify (notify_one in C++11) and notifyAll (notify_all in C++11) from java.lang.Object, thread synchronization in C++11 may rely on condition variables.
Example Short_dead_lock.Cpp.zip
// '.h' file:
#include <condition_variable>
#include <mutex>
class Short_dead_lock {
private:
    bool __stop = false;
    std::condition_variable _my_condition_variable;
    std::mutex _my_mutex;
public:
    void f();
private:
    void _g();
    inline bool _stop() const {return __stop;}; // 'inline' is for pedagogical reasons *ONLY* (compactness of code)
};

// '.cpp' file:
void Short_dead_lock::f() {
    std::cout << "Start of 'f()' in thread: " << std::this_thread::get_id() << std::endl;
    // _g(); // '_g' *MUST* be started in another thread to avoid self-deadlock:
    std::thread g(&Short_dead_lock::_g, this); // This starts 'g'...
    std::this_thread::sleep_for(std::chrono::milliseconds(5000)); // Wait 5s...
    // __stop = true; // Required for ver. 2
    // Release '_g':
    _my_condition_variable.notify_all(); // Required for ver. 1 and ver. 2...
    g.join(); // Wait for 'g' thread to complete...
    std::cout << "End of 'f()'" << std::endl;
}

void Short_dead_lock::_g() {
    std::cout << "Start of '_g()' in thread: " << std::this_thread::get_id() << std::endl;
    std::unique_lock<std::mutex> lock(_my_mutex);
    // Ver. 1:
    _my_condition_variable.wait(lock);
    // Ver. 2:
    // _my_condition_variable.wait(lock, std::bind(&Short_dead_lock::_stop, this)); // 'std::bind' <=> 'bind' in JavaScript
    std::cout << "End of '_g()'" << std::endl;
}

int main(int argc, char** argv) {
    Short_dead_lock sdl;
    sdl.f();
    return 0;
}
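Ver. 2 above (waiting on a predicate) is generally the safer form, since a wait without predicate may return on a spurious wakeup and may miss a notification sent before the wait starts. The sketch below isolates that pattern, assuming the flag is always written under the mutex; the Gate class and gate_releases_waiter function are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot gate: waiters block until 'open' has been called.
class Gate {
    std::mutex _mutex;
    std::condition_variable _condition;
    bool _open = false; // Protected by '_mutex'
public:
    void open() {
        {
            const std::lock_guard<std::mutex> lock(_mutex);
            _open = true;
        } // Mutex released before notifying
        _condition.notify_all();
    }
    void wait_until_open() {
        std::unique_lock<std::mutex> lock(_mutex);
        // The predicate is tested first, then after every wakeup:
        _condition.wait(lock, [this] { return _open; });
    }
};

// Starts a waiter, opens the gate and checks that the waiter went through.
bool gate_releases_waiter() {
    Gate gate;
    bool passed = false;
    std::thread waiter([&gate, &passed] {
        gate.wait_until_open();
        passed = true;
    });
    assert(waiter.joinable());
    gate.open(); // Works whether the waiter is already waiting or not
    waiter.join();
    return passed;
}
```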
Atomicity
Rule(s)
- std::atomic allows the wrapping of a trivially copyable variable so that threads may read and/or write this variable by means of the load (get wrapped variable) and store (set wrapped variable) atomic primitives. load and store are associated with memory access policies (std::memory_order). As for the exchange atomic primitive, which replaces the wrapped value and returns the previous one in a single step, it mainly allows thread synchronization.
Example Atomic.Cpp.zip
std::array<int, 5> my_array{ 1, 4, 3, 2, 5 };
std::atomic<std::array<int, 5>* > my_atomic{ &my_array };

std::thread ascending(
    [&my_atomic]() {
        // Resource is unavailable:
        std::array<int, 5>* my_array = nullptr;
        // Race condition, try to be the first to get the resource:
        do {
            my_array = my_atomic.exchange(nullptr);
        } while (my_array == nullptr);
        std::sort(my_array->begin(), my_array->end(), [](const int a, const int b) {
            // std::cout << "\tascending a: " << a << " b: " << b << std::endl;
            return a < b; // Ascending order
        });
        // Release resource:
        my_atomic.exchange(my_array);
    });
std::thread descending(
    [&my_atomic]() {
        // Resource is unavailable:
        std::array<int, 5>* my_array = nullptr;
        // Race condition, try to be the first to get the resource:
        do {
            my_array = my_atomic.exchange(nullptr);
        } while (my_array == nullptr);
        std::sort(my_array->begin(), my_array->end(), [](const int a, const int b) {
            // std::cout << "\tdescending a: " << a << " b: " << b << std::endl;
            return a > b; // Descending order
        });
        // Release resource:
        my_atomic.exchange(my_array);
    });

descending.join();
ascending.join();

for (const int i : my_array) // Either ascending or descending based on the "loser thread"
    std::cout << "\t" << i;
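As a complement, here is a smaller sketch of the load, store and exchange primitives named in the rule, together with fetch_add, another std::atomic member that the original example does not use. The function names count_with_atomic and exchange_returns_previous are hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Several threads increment one shared counter without any mutex.
int count_with_atomic(int threads, int increments) {
    assert(threads > 0 && increments >= 0);
    std::atomic<int> counter{0};
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t)
        pool.emplace_back([&counter, increments] {
            for (int i = 0; i < increments; ++i)
                counter.fetch_add(1, std::memory_order_relaxed); // Atomic read-modify-write
        });
    for (std::thread& t : pool) t.join();
    return counter.load(); // Default policy: 'std::memory_order_seq_cst'
}

// 'exchange' sets the new value and hands back the previous one in one atomic step.
bool exchange_returns_previous() {
    std::atomic<int> value{7};
    value.store(8);
    const int previous = value.exchange(9);
    return previous == 8 && value.load() == 9;
}
```

With a plain int instead of std::atomic<int>, the increments of count_with_atomic would race and the final count would be unpredictable.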
Towards asynchronous programming
Rule(s)
- std::async is the base function to transparently deal with threads that run on an asynchronous basis. std::async is used in conjunction with launch policies as follows:
  - std::launch::async guarantees the asynchronous behavior, i.e., the passed function is executed in a separate thread.
  - std::launch::deferred: no asynchronous behavior, i.e., the passed function is called lazily, in the thread that first calls get (or wait) on the returned std::future object.
  - std::launch::async | std::launch::deferred is the default behavior; the implementation chooses between asynchronous and deferred execution (typically for load balancing reasons): no control is possible.
Example Promise.Cpp.zip
// '.h' file:
class Image {
    static std::list<std::launch> _Start; // Benchmarking only...
    static std::list<std::launch> _End; // Benchmarking only...
    static std::mutex _Mutex; // Benchmarking only...
    const std::string _file_name;
public:
    Image(std::string&&);
    std::string process(const std::launch&) noexcept(false);
};

// '.cpp' file:
std::string Image::process(const std::launch& launch) noexcept(false) {
    std::string start = launch == std::launch::deferred ? "Deferred" :
        launch == (std::launch::async | std::launch::deferred) ? "Default" :
        launch == std::launch::async ? "Async" : "?";
    start += " (start)";
    {
        const std::lock_guard<std::mutex> critical_section(_Mutex);
        _Start.push_back(launch);
        for (std::launch launch : _Start) {
            start += " < ";
            start += launch == std::launch::deferred ? "Deferred" :
                launch == (std::launch::async | std::launch::deferred) ? "Default" :
                launch == std::launch::async ? "Async" : "?";
        }
    } // 'mutex' is released at 'critical_section' destruction time

    std::size_t size = 0;
    try {
        std::ifstream ifstream(_file_name, std::ios::binary | std::ios::in);
        assert(ifstream.is_open());
        std::filebuf* buffer = ifstream.rdbuf();
        assert(buffer);
        // Get file size by setting internal position to relative position and returns new position:
        size = buffer->pubseekoff(0, ifstream.end, ifstream.in);
        // Set internal position to absolute position (very beginning):
        std::size_t position = buffer->pubseekpos(0, ifstream.in);
        assert(position == 0);
        // Allocate 'memory' to contain file data
        char* memory = new char[size];
        // Fill in 'memory' with file data:
        buffer->sgetn(memory, size);
        // 'ifstream' is automatically closed at destruction time:
        // ifstream.close();
        // Write file data to stdout:
        std::cout.write(memory, size);
        delete[] memory;
    }
    catch (std::ifstream::failure & failure) { … }

    std::string end = launch == std::launch::deferred ? "Deferred" :
        launch == (std::launch::async | std::launch::deferred) ? "Default" :
        launch == std::launch::async ? "Async" : "?";
    end += " (end)";
    {
        const std::lock_guard<std::mutex> critical_section(_Mutex);
        _End.push_back(launch);
        for (std::launch launch : _End) { // Order in which the calls ended so far...
            end += " < ";
            end += launch == std::launch::deferred ? "Deferred" :
                launch == (std::launch::async | std::launch::deferred) ? "Default" :
                launch == std::launch::async ? "Async" : "?";
        }
    } // 'mutex' is released at 'critical_section' destruction time
    return start + " *** " + end;
}
…
Image image("Franck.jpg");
// Only 'std::launch::async' guarantees execution in a separate thread ('std::bind' works on a copy of 'image'):
std::future<std::string> deferred_f = std::async(std::launch::deferred, std::bind(&Image::process, image, std::launch::deferred));
std::future<std::string> default_f = std::async(std::bind(&Image::process, image, std::launch::async | std::launch::deferred));
std::future<std::string> async_f = std::async(std::launch::async, std::bind(&Image::process, image, std::launch::async));

std::string async_s = async_f.get(); // Blocking...
std::string deferred_s = deferred_f.get(); // Blocking (the deferred call runs here)...
std::string default_s = default_f.get(); // Blocking...

std::cout << deferred_s << std::endl;
std::cout << default_s << std::endl;
std::cout << async_s << std::endl;
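The difference between the two explicit launch policies can be observed through thread identifiers. The sketch below is an illustration only, with hypothetical function names: a deferred task reports the identifier of the thread that calls get, while an asynchronous task reports another one.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// 'std::launch::deferred': the task runs lazily, in the thread that calls 'get'.
bool deferred_runs_in_caller() {
    const std::thread::id caller = std::this_thread::get_id();
    std::future<std::thread::id> f =
        std::async(std::launch::deferred, [] { return std::this_thread::get_id(); });
    // Nothing has run yet: a deferred future reports 'std::future_status::deferred'
    assert(f.wait_for(std::chrono::seconds(0)) == std::future_status::deferred);
    return f.get() == caller;
}

// 'std::launch::async': the task runs as if in a new thread of execution.
bool async_runs_in_other_thread() {
    const std::thread::id caller = std::this_thread::get_id();
    std::future<std::thread::id> f =
        std::async(std::launch::async, [] { return std::this_thread::get_id(); });
    return f.get() != caller;
}
```

No such check is possible for the default policy, since the choice is left to the implementation.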
Promise
Rule(s)
- The idea of “promise” pervades many programming languages. It basically simplifies, even removes, the direct utilization of threads (“join” operations in particular). It is a straightforward way of doing asynchronous programming for stream programming, big data processing, etc.
Example Promise.Cpp.zip
// '.h' file:
#include <future> // 'std::promise'...
#include <string>
class Promise {
    std::promise<std::string> _p;
public:
    Promise();
    void request_Resource(std::string&&);
    std::string get_Resource();
};

// '.cpp' file:
Promise::Promise() {
    std::time_t t; // See also 'std::default_random_engine' at https://en.cppreference.com/w/cpp/numeric/random
    std::srand(static_cast<unsigned>(std::time(&t))); // Setup random suite...
}
std::string Promise::get_Resource() {
    std::future<std::string> f = _p.get_future(); // Non-blocking...
    // Blocking until '_p.set_value' has been called:
    std::string resource_name = f.get();
    std::cout << "'get_Resource' ends for " << resource_name << std::endl;
    return resource_name;
}
void Promise::request_Resource(std::string&& resource_name) { // This is a job that takes time to compute some value...
    int wait_for = std::abs(std::rand() % 5000 + 1); // Between 1 and 5000...
    std::cout << "Wait for " << wait_for << " ms about: " << resource_name << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(wait_for));
    _p.set_value(resource_name); // Job ends... 'f.get();' in 'get_Resource' is unblocked...
}
…
Promise franck, barbier;
franck.request_Resource("Franck");
barbier.request_Resource("Barbier");
std::cout << franck.get_Resource() + barbier.get_Resource() << std::endl;
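In the example above, request_Resource and get_Resource are called from the same thread, one after the other. A promise is more typically fulfilled by one thread while another waits on the associated future; the following sketch, whose fetch_resource function is hypothetical, shows that arrangement.

```cpp
#include <cassert>
#include <future>
#include <string>
#include <thread>
#include <utility>

// Hypothetical helper: a producer thread fulfils the promise, the caller waits on the future.
std::string fetch_resource(std::string resource_name) {
    assert(!resource_name.empty());
    std::promise<std::string> promise;
    std::future<std::string> future = promise.get_future(); // Must be taken before the promise is moved
    std::thread producer(
        [](std::promise<std::string> p, std::string name) {
            p.set_value(std::move(name)); // Unblocks 'future.get()'
        },
        std::move(promise), std::move(resource_name));
    std::string result = future.get(); // Blocking until the producer calls 'set_value'
    producer.join();
    return result;
}
```

For instance, fetch_resource("Franck") + fetch_resource("Barbier") yields "FranckBarbier".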
Parallelism-driven policies in STL
Rule(s)
- From C++17, STL algorithms may be ruled by parallelism-driven policies (std::execution::seq, std::execution::par and std::execution::par_unseq).
Example Horner.Cpp.zip
// '.h' file:
class Polynomial { // Polynomial evaluation according to different methods...
private:
    std::unordered_map<unsigned, double> _polynomial;
public:
    Polynomial(unsigned = 5U); // https://en.cppreference.com/w/cpp/language/integer_literal
    Polynomial(const std::initializer_list<std::unordered_map<unsigned, double>::value_type>&);
    double horner_method(double, bool parallel = false) const;
    double simple_method(double) const;
    std::string to_string() const;
};

// '.cpp' file:
double Polynomial::horner_method(double x, bool parallel) const { // Horner method...
    std::vector<long> degrees; // 'std::unordered_map' is unsortable!
    for (const auto& member : _polynomial) // Copy polynomial's degrees before sorting...
        degrees.push_back((long)member.first);
    if (degrees.empty()) // No term at all: avoid dereferencing 'degrees.begin()'...
        return 0.;
    parallel ?
        // Parallel sorting (C++ 17, '#include <execution>') in descending order:
        std::sort(std::execution::par, degrees.begin(), degrees.end(), std::greater<long>()) :
        std::sort(std::execution::seq, degrees.begin(), degrees.end(), std::greater<long>());
    // std::sort(std::execution::par_unseq, degrees.begin(), degrees.end(), std::greater<long>()); // Parallel and vectorized
    double coefficient;
    double result = 0;
    for (long n = *degrees.begin(); n > 0L; n--) { // From highest degree down to 1...
        try {
            coefficient = _polynomial.at((unsigned)n); // '[]' conflicts with 'const' member function
            // std::cout << n << '\t';
        }
        catch (std::out_of_range& oor) {
            coefficient = 0.;
            // std::cerr << n << ": " << oor.what() << '\t';
        }
        result = (result + coefficient) * x;
    }
    try {
        result += _polynomial.at(0U);
    }
    catch (std::out_of_range& oor) {
        // std::cerr << "'x^^0' does not exist: " << oor.what() << '\t';
    }
    return result;
}
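The evaluation loop above is Horner's scheme applied to a sparse polynomial. To make the recurrence itself easier to follow, here is a sketch on a dense coefficient list, deliberately without execution policies; the horner and horner_self_check functions are hypothetical and not part of the original archive.

```cpp
#include <cassert>
#include <vector>

// Horner's scheme on dense coefficients ordered from the highest degree down to degree 0.
// An empty list is treated as the zero polynomial.
double horner(const std::vector<double>& coefficients, double x) {
    double result = 0.0;
    for (const double c : coefficients)
        result = result * x + c; // One multiplication and one addition per coefficient
    return result;
}

// Worked example: 2x^3 - 6x^2 + 2x - 1 at x = 3 is ((2 * 3 - 6) * 3 + 2) * 3 - 1 = 5.
void horner_self_check() {
    assert(horner({2.0, -6.0, 2.0, -1.0}, 3.0) == 5.0);
}
```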