C++ Concurrency



Preamble

Multithreading support, introduced in C++11 (#include <thread>), provides a self-contained, rational Application Programming Interface (API) for concurrent (even parallel) programming that does not rely on the heavyweight processes of an Operating System (OS).

Threads of control are, in essence, lightweight processes. Because the API belongs to the C++ standard library, the same code crosses operating systems' boundaries and thus overcomes incompatibilities between native thread APIs (between UNIX and Windows especially).

Multicore programming

Rule(s)

Example (getting the number of processors -based on Windows PowerShell-)

echo $env:NUMBER_OF_PROCESSORS
8

Example (getting data about the architecture -based on Windows PowerShell-)

Get-WmiObject -Class Win32_Processor | Select-Object -Property Name, Number*
Name                                      NumberOfCores NumberOfEnabledCore NumberOfLogicalProcessors
----                                      ------------- ------------------- -------------------------
Intel(R) Core(TM) i7-4800MQ CPU @ 2.70GHz             4                   4                         8

C++ multithreading API

Effective concurrency

Rule(s)

Example Illustration_of_thread_local.Cpp.zip 

std::cout << "Concurrent supported threads: " << std::thread::hardware_concurrency() << '\n';
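
The value returned by 'std::thread::hardware_concurrency()' is only a hint, and it may be 0 when it cannot be computed. A minimal sketch of a guarded use follows, assuming a hypothetical helper named 'worker_count' (it does not belong to the original example):

```cpp
#include <algorithm>
#include <thread>

// Hypothetical helper: number of worker threads worth starting.
// 'hardware_concurrency()' is a hint only and returns 0 when it is not computable,
// so the result is clamped to at least one worker.
unsigned worker_count() {
    return std::max(1U, std::thread::hardware_concurrency());
}
```

Calling 'worker_count()' before creating threads avoids sizing a pool with zero workers on platforms where the hint is unavailable.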

Thread state

Rule(s)

Example Illustration_of_thread_local.Cpp.zip 

std::atomic<bool> stop{false}; // Atomic ('#include <atomic>'): a plain 'bool' shared between threads is a data race
// Capture 'stop' by reference for guaranteed stopping:
std::thread my_thread([&stop]() -> void { // Thread creation from lambda expression...
    while (!stop) { ... }
    std::cout << "Just ends..." << '\n';
});
assert(my_thread.joinable());
my_thread.detach();
assert(!my_thread.joinable());
std::this_thread::sleep_for(std::chrono::seconds(1));
stop = true;
try { // 'join' on a detached (no longer joinable) thread throws 'std::system_error'...
    my_thread.join();
} catch (std::system_error& se) { // Inheritance graph: 'std::exception' < 'std::runtime_error' < 'std::system_error' (C++11)
    std::cerr << "'my_thread.join()' failed because 'my_thread.joinable()' is " << std::boolalpha << my_thread.joinable() << ", message: " << se.code().message()  << '\n';
}
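
The life cycle illustrated above can be summarized as follows: a default-constructed thread is not joinable, a started thread is joinable, and 'join' (like 'detach') ends joinability. A minimal sketch, assuming a hypothetical helper named 'joinable_history' that records 'joinable()' at each stage:

```cpp
#include <thread>
#include <vector>

// Hypothetical helper recording 'joinable()' at each stage of a thread's life.
std::vector<bool> joinable_history() {
    std::vector<bool> history;
    std::thread t;                    // Default-constructed: no thread of execution attached
    history.push_back(t.joinable());  // false
    t = std::thread([] {});           // Move-assign a started thread
    history.push_back(t.joinable());  // true
    t.join();                         // Wait for completion
    history.push_back(t.joinable());  // false: 'join' (like 'detach') ends joinability
    return history;
}
```

Unlike the example above, the thread is joined rather than detached, so no exception is raised.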

thread_local keyword

Rule(s)

Example Illustration_of_thread_local.Cpp.zip 

// '.h' file:
class Illustration_of_thread_local {
    /** The game is, as much as possible, avoiding the following inconsistent compositions:
     * "FrançoisSarkozy" and "NicolasHollande" */
    // Threads using '_Francois' have their own copy (read-only mode):
    static thread_local const std::string _Francois;
    static thread_local const std::string _Hollande;
    static thread_local const std::string _Nicolas;
    static thread_local const std::string _Sarkozy;
    static std::string _Patronym;

    static std::atomic<bool> _Checkable; // Atomic ('#include <atomic>') since written and read by different threads...
    static std::atomic<bool> _Stop;

    static std::mutex _Mutex; // Create critical section for string manipulation ('#include <mutex>')...

    static void _Check();
    static void _Set(std::string const&, std::string const&);
    static void _Set_Francois_Hollande();
    static void _Set_Nicolas_Sarkozy();
public:
    static int Bads;
    static int Goods;
    static void Go();
};

// '.cpp' file:
thread_local const std::string Illustration_of_thread_local::_Francois = "François";
thread_local const std::string Illustration_of_thread_local::_Hollande = "Hollande";
thread_local const std::string Illustration_of_thread_local::_Nicolas = "Nicolas";
thread_local const std::string Illustration_of_thread_local::_Sarkozy = "Sarkozy";

std::string Illustration_of_thread_local::_Patronym = "?";
std::atomic<bool> Illustration_of_thread_local::_Checkable{false};
std::atomic<bool> Illustration_of_thread_local::_Stop{false};

int Illustration_of_thread_local::Bads = 0;
int Illustration_of_thread_local::Goods = 0;

std::mutex Illustration_of_thread_local::_Mutex;

void Illustration_of_thread_local::_Check() {
    while (!_Stop) {
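        if (_Checkable) {
            // Read '_Patronym' inside the critical section (no data race with '_Set'):
            const std::lock_guard<std::mutex> critical_section(_Mutex);
            // 'thread_local' for '_Francois', '_Hollande', '_Nicolas' and '_Sarkozy':
            if (_Patronym.compare(_Francois + _Sarkozy) == 0 || _Patronym.compare(_Nicolas + _Hollande) == 0)
                Bads++;
            else
                Goods++;
            _Checkable = false;
        }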
    }
}

void Illustration_of_thread_local::_Set(std::string const& given_name, std::string const& surname) {
    /* Processing is *deliberately* slowed down through two statements instead of '_Patronym = given_name + surname;' */
    // const std::lock_guard<std::mutex> critical_section(_Mutex); // Lightweight...
    std::unique_lock<std::mutex> critical_section(_Mutex); // Heavyweight...
    //    critical_section.try_lock(); // Execution error since constructor does it just before...
    _Patronym = given_name; // Random segmentation fault if no critical section set up...
    critical_section.unlock();
    critical_section.lock();
    _Patronym += surname; // Random segmentation fault if no critical section set up...
    critical_section.unlock();
} // 'critical_section' is already unlocked here; otherwise, its destructor would release the mutex...

void Illustration_of_thread_local::_Set_Francois_Hollande() {
    while (!_Stop) {
        _Set(_Francois, _Hollande); // 'thread_local' for '_Francois', '_Hollande'...
        _Checkable = true;
    }
}

void Illustration_of_thread_local::_Set_Nicolas_Sarkozy() {
    while (!_Stop) {
        _Set(_Nicolas, _Sarkozy); // 'thread_local' for '_Nicolas' and '_Sarkozy'...
        _Checkable = true;
    }
}

void Illustration_of_thread_local::Go() {
    std::cout << "Concurrent supported threads: " << std::thread::hardware_concurrency() << '\n';

    std::thread Check(&Illustration_of_thread_local::_Check);
    std::thread Set_Francois_Hollande(&Illustration_of_thread_local::_Set_Francois_Hollande);
    std::thread Set_Nicolas_Sarkozy(&Illustration_of_thread_local::_Set_Nicolas_Sarkozy);

    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    _Stop = true;

    Check.join();
    Set_Francois_Hollande.join();
    Set_Nicolas_Sarkozy.join();
}
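
The effect of 'thread_local' is easier to observe with a mutable variable: each thread gets its own copy, so an update made in one thread is invisible to the others. A minimal sketch, assuming a hypothetical variable 'tl_counter' and a hypothetical helper 'count_in_new_thread' (neither belongs to the original example):

```cpp
#include <thread>

thread_local int tl_counter = 0; // One independent copy per thread

// Hypothetical helper: a new thread increments *its own* copy 'times' times;
// the final value seen by that thread is returned.
int count_in_new_thread(int times) {
    int result = -1;
    std::thread t([&result, times] {
        for (int i = 0; i < times; ++i)
            ++tl_counter;     // Touches the new thread's copy only
        result = tl_counter;
    });
    t.join();                 // 'join' synchronizes: 'result' is safely readable afterwards
    return result;
}
```

Whatever value the calling thread stored in its own 'tl_counter' beforehand is left unchanged by the call.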

Condition variable

Rule(s)

Example Short_dead_lock.Cpp.zip 

// '.h' file:
#include <condition_variable>
#include <mutex>

class Short_dead_lock {
private:
    bool __stop = false;
    std::condition_variable _my_condition_variable;
    std::mutex _my_mutex;
public:
    void f();
private:
    void _g();
    inline bool _stop() const {return __stop;}; // 'inline' is for pedagogical reasons *ONLY* (compactness of code)
};

// '.cpp' file:
#include <chrono>
#include <functional> // 'std::bind'
#include <iostream>
#include <thread>
void Short_dead_lock::f() {
    std::cout << "Start of 'f()' in thread: " << std::this_thread::get_id() << std::endl;
    // _g(); // '_g' *MUST* be started in another thread to avoid self-deadlock: 
    std::thread g(&Short_dead_lock::_g, this); // This starts 'g'...
    std::this_thread::sleep_for(std::chrono::milliseconds(5000)); // Wait 5s...
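    // { std::lock_guard<std::mutex> lock(_my_mutex); __stop = true; } // Required for ver. 2 (set under the mutex so that '_g' cannot miss the change)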
    // Release '_g':
    _my_condition_variable.notify_all(); // Required for ver. 1 and ver. 2...
    g.join(); // Wait for 'g' thread to complete...
    std::cout << "End of 'f()'" << std::endl;
}

void Short_dead_lock::_g() {
    std::cout << "Start of '_g()' in thread: " << std::this_thread::get_id() << std::endl;
    std::unique_lock<std::mutex> lock(_my_mutex);
    // Ver. 1 (no predicate: a spurious wake-up may end the wait before 'notify_all'):
    _my_condition_variable.wait(lock);
    // Ver. 2:
    // _my_condition_variable.wait(lock, std::bind(&Short_dead_lock::_stop, this)); // 'std::bind' <=> 'bind' in JavaScript
    std::cout << "End of '_g()'" << std::endl;
}

int main(int argc, char** argv) {
    Short_dead_lock sdl;
    sdl.f();

    return 0;
}
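
Ver. 2 above relies on the predicate form of 'wait'. A minimal self-contained sketch of that form follows, assuming a hypothetical helper named 'wait_until_ready' with local synchronization objects instead of class members:

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: a waiter thread blocks until the flag is set, then reports back.
bool wait_until_ready() {
    std::mutex m;
    std::condition_variable cv;
    bool ready = false;    // Shared state, always accessed under 'm'
    bool observed = false;

    std::thread waiter([&] {
        std::unique_lock<std::mutex> lock(m);
        // The predicate guards against spurious wake-ups and against a
        // notification sent before the wait has started:
        cv.wait(lock, [&] { return ready; });
        observed = ready;
    });

    {
        const std::lock_guard<std::mutex> lock(m);
        ready = true;      // Change the state under the mutex...
    }
    cv.notify_one();       // ...then notify
    waiter.join();
    return observed;
}
```

Since the flag is tested under the mutex before blocking, the waiter returns even when the notification is sent before it starts waiting, so no sleep is needed to order the two threads.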

Atomicity

Rule(s)

Example Atomic.Cpp.zip 

std::array<int, 5> my_array{ 1, 4, 3, 2, 5 };
std::atomic<std::array<int, 5>* > my_atomic{ &my_array };

std::thread ascending(
    [&my_atomic]() {
        // Resource is unavailable:
        std::array<int, 5>* my_array = nullptr;
        // Race condition, try to be the first to get the resource:
        do { my_array = my_atomic.exchange(nullptr); } while (my_array == nullptr);
        std::sort(my_array->begin(), my_array->end(),
            [](const int a, const int b) {
                // std::cout << "\tascending a: " << a << " b: " << b << std::endl;
                return a < b; // Ascending order
            });
        // Release resource:
        my_atomic.exchange(my_array);
});

std::thread descending(
    [&my_atomic]() {
        // Resource is unavailable:
        std::array<int, 5>* my_array = nullptr;
        // Race condition, try to be the first to get the resource:
        do { my_array = my_atomic.exchange(nullptr); } while (my_array == nullptr);
        std::sort(my_array->begin(), my_array->end(),
            [](const int a, const int b) {
                // std::cout << "\tdescending a: " << a << " b: " << b << std::endl;
                return a > b; // Descending order
            });
        // Release resource:
        my_atomic.exchange(my_array);
});

descending.join();
ascending.join();

for (const int i : my_array) // Either ascending or descending based on the "loser thread"
    std::cout << "\t" << i;

Towards asynchronous programming

Rule(s)

Example Promise.Cpp.zip 

// '.h' file:
class Image {
    static std::list<std::launch> _Start; // Benchmarking only...
    static std::list<std::launch> _End; // Benchmarking only...
    static std::mutex _Mutex; // Benchmarking only...

    const std::string _file_name;
public:
    Image(std::string&&);
    std::string process(const std::launch&) noexcept(false);
};
// '.cpp' file:
std::string Image::process(const std::launch& launch) noexcept(false) {
    
    std::string start = launch == std::launch::deferred ? "Deferred" :
        launch == (std::launch::async | std::launch::deferred) ? "Default" :
        launch == std::launch::async ? "Async" : "?";
    start += " (start)";
    {
        const std::lock_guard<std::mutex> critical_section(_Mutex);
        _Start.push_back(launch);
        for (std::launch launch : _Start) {
            start += " < ";
            start += launch == std::launch::deferred ? "Deferred" :
                launch == (std::launch::async | std::launch::deferred) ? "Default" :
                launch == std::launch::async ? "Async" : "?";
        }
    } // 'mutex' is released at 'critical_section' destruction time

    std::size_t size = 0;
    try {
        std::ifstream ifstream(_file_name, std::ios::binary | std::ios::in);
        assert(ifstream.is_open());
        std::filebuf* buffer = ifstream.rdbuf();
        assert(buffer);
        // Get file size by setting internal position to relative position and returns new position:
        size = buffer->pubseekoff(0, ifstream.end, ifstream.in);
        // Set internal position to absolute position (very beginning):
        std::size_t position = buffer->pubseekpos(0, ifstream.in);
        assert(position == 0);
        // Allocate 'memory' to contain file data
        char* memory = new char[size];
        // Fill in 'memory' with file data:
        buffer->sgetn(memory, size);
        // 'ifstream' is automatically closed at destruction time:
        // ifstream.close();
        // Write file data to stdout:
        std::cout.write(memory, size);
        delete[] memory;
    }
    catch (std::ifstream::failure & failure) { … }

    std::string end = launch == std::launch::deferred ? "Deferred" :
        launch == (std::launch::async | std::launch::deferred) ? "Default" :
        launch == std::launch::async ? "Async" : "?";
    end += " (end)";
    {
        const std::lock_guard<std::mutex> critical_section(_Mutex);
        _End.push_back(launch);
        for (std::launch launch : _End) {
            end += " < ";
            end += launch == std::launch::deferred ? "Deferred" :
                launch == (std::launch::async | std::launch::deferred) ? "Default" :
                launch == std::launch::async ? "Async" : "?";
        }
    } // 'mutex' is released at 'critical_section' destruction time

    return start + " *** " + end;
}
…
Image image("Franck.jpg");
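// 'std::launch::async' runs the function in a separate thread; 'std::launch::deferred' runs it lazily,
// in the thread that calls 'get()'; the default policy lets the implementation choose between the two: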
std::future<std::string> deferred_f = std::async(std::launch::deferred, std::bind(&Image::process, image, std::launch::deferred));
std::future<std::string> default_f = std::async(std::bind(&Image::process, image, std::launch::async | std::launch::deferred));
std::future<std::string> async_f = std::async(std::launch::async, std::bind(&Image::process, image, std::launch::async));

std::string async_s = async_f.get(); // Blocking...
std::string deferred_s = deferred_f.get(); // Blocking...
std::string default_s = default_f.get(); // Blocking...

std::cout << deferred_s << std::endl;
std::cout << default_s << std::endl;
std::cout << async_s << std::endl;
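
The difference between launch policies can be observed by comparing thread identifiers. A minimal sketch, assuming a hypothetical helper named 'runs_on_calling_thread' (it does not belong to the original example):

```cpp
#include <future>
#include <thread>

// Hypothetical helper: does the task run on the caller's thread under the given policy?
bool runs_on_calling_thread(std::launch policy) {
    const std::thread::id caller = std::this_thread::get_id();
    std::future<std::thread::id> f =
        std::async(policy, [] { return std::this_thread::get_id(); });
    return f.get() == caller; // 'get()' blocks; for 'deferred', it runs the task right here
}
```

The result is expected to be 'true' for 'std::launch::deferred' and 'false' for 'std::launch::async'; with the default policy, either outcome is possible.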

Promise

Rule(s)

  • The idea of a “promise” pervades many programming languages. It simplifies, or even removes, the direct handling of threads (“join” operations in particular). It is a straightforward route to asynchronous programming, for instance for stream programming or big data processing.

Example Promise.Cpp.zip 

// '.h' file:
#include <future> // 'std::promise'...
#include <string>

class Promise {
    std::promise<std::string> _p;
public:
    Promise();
    void request_Resource(std::string&&);
    std::string get_Resource();
};

// '.cpp' file:
Promise::Promise() {
    std::time_t t;
    // See also 'std::default_random_engine' at https://en.cppreference.com/w/cpp/numeric/random
    std::srand(static_cast<unsigned>(std::time(&t))); // Setup random suite...
}

std::string Promise::get_Resource() {
    std::future<std::string> f = _p.get_future(); // Non-blocking (callable only once per promise)
    std::string resource_name = f.get(); // Blocking!
    std::cout << "'get_Resource' ends for " << resource_name << std::endl;
    return resource_name;
}

void Promise::request_Resource(std::string&& resource_name) {
    // This is a job that takes time to compute some value...
    int wait_for = std::abs(std::rand() % 5000 + 1); // Between 1 and 5000...
    std::cout << "Wait for " << wait_for << " ms about: " << resource_name << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(wait_for));
    _p.set_value(resource_name); // Job ends... 'f.get()' in 'get_Resource' is unblocked...
}
…
Promise franck, barbier;
franck.request_Resource("Franck");
barbier.request_Resource("Barbier");
std::cout << franck.get_Resource() + barbier.get_Resource() << std::endl;
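
In the example above, producer and consumer run one after the other in the same thread. A minimal sketch of the same idea across two threads follows, assuming a hypothetical helper named 'produce_and_wait' (it does not belong to the original example):

```cpp
#include <future>
#include <string>
#include <thread>
#include <utility>

// Hypothetical helper: a producer thread fulfils the promise, the caller waits on the future.
std::string produce_and_wait(std::string resource_name) {
    std::promise<std::string> p;
    std::future<std::string> f = p.get_future(); // Non-blocking; callable once per promise
    std::thread producer([&p, name = std::move(resource_name)]() mutable {
        p.set_value(std::move(name));            // Makes the shared state ready
    });
    std::string result = f.get();                // Blocks until 'set_value' has been called
    producer.join();                             // 'p' must outlive the producer thread
    return result;
}
```

The future is the only synchronization point: the caller needs no mutex or condition variable to receive the value.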

Parallelism-driven policies in STL

Rule(s)

Example Horner.Cpp.zip 

// '.h' file:
class Polynomial { // Polynomial evaluation according to different methods...
private:
	std::unordered_map<unsigned, double> _polynomial;
public:
	Polynomial(unsigned = 5U); // https://en.cppreference.com/w/cpp/language/integer_literal
	Polynomial(const std::initializer_list<std::unordered_map<unsigned, double>::value_type>&);
	double horner_method(double, bool parallel = false) const;
	double simple_method(double) const;
	std::string to_string() const;
};
// '.cpp' file:
double Polynomial::horner_method(double x, bool parallel) const { // Horner method...
	std::vector<long> degrees; // 'std::unordered_map' is unsortable!
	for (const auto& member : _polynomial) // Copy polynomial's degrees before sorting...
		degrees.push_back((long)member.first);

	parallel ?
		// Parallel sorting (C++17, '#include <execution>') in descending order:
		std::sort(std::execution::par, degrees.begin(), degrees.end(), std::greater<long>()) :
		std::sort(std::execution::seq, degrees.begin(), degrees.end(), std::greater<long>());
	// std::sort(std::execution::par_unseq, degrees.begin(), degrees.end(), std::greater<long>()); // Parallel and vectorized

	double coefficient;
	double result = 0;
	for (long n = degrees.empty() ? 0L : degrees.front(); n > 0L; n--) { // An empty polynomial has no degree to dereference
		try {
			coefficient = _polynomial.at((unsigned)n); // '[]' conflicts with 'const' member function
			// std::cout << n << '\t';
		}
		catch (std::out_of_range& oor) {
			coefficient = 0.;
			// std::cerr << n << ": " << oor.what() << '\t';
		}
		result = (result + coefficient) * x;
	}
	try {
		result += _polynomial.at(0U);
	}
	catch (std::out_of_range& oor) {
		// std::cerr << "'x^^0' does not exist: " << oor.what() << '\t';
	}
	return result;
}
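
The Horner scheme itself rewrites a(n)x^n + ... + a(1)x + a(0) as (...(a(n)x + a(n-1))x + ...)x + a(0). A minimal sequential sketch, assuming dense storage in a 'std::vector' indexed by degree (the class above uses 'std::unordered_map' instead) and a hypothetical function named 'horner':

```cpp
#include <vector>

// Hypothetical helper: 'coefficients[i]' is the coefficient of x^i (dense storage is an assumption).
double horner(const std::vector<double>& coefficients, double x) {
    double result = 0.;
    // Walk from the highest degree down to degree 0:
    for (auto it = coefficients.rbegin(); it != coefficients.rend(); ++it)
        result = result * x + *it; // One multiplication and one addition per degree
    return result;
}
```

For instance, 'horner({1., 2., 3.}, 2.)' evaluates 3x^2 + 2x + 1 at x = 2. With dense storage, missing degrees are plain zeros, so neither sorting nor exception handling is needed.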