Schlafen und Warten

Die neue Zeitbibliothek ist ein elementarer Bestandteil der Threading-Schnittstelle. Sowohl Threads, Locks, Bedingungsvariablen als auch Futures besitzen ein Verständnis von Zeit. Alle vier Komponenten ist gemein, dass sie für eine Zeitspanne oder bis zu einem Zeitpunkt schlafen, warten oder auch nur blockieren können.

Ordnung

Die Methoden rund um Zeitmanagement in Multithreading Programmen folgen einer einfachen Konvention. Methoden, die mit _for enden, werden mit einer Zeitdauer parametrisiert. Methoden, die mit _until enden, werden mit einem Zeitpunkt parametrisiert. Sowohl Zeitdauern als auch Zeitpunkte habe ich in eigenen Artikel beschrieben. Einen einfachen Überblick zu den verschiedenen Methoden liefert die folgende Tabelle. 

untilAndFor

In der Tabelle ist in2min ein Zeitpunkt, der 2 Minuten in der Zukunft liegt. Hingegen bezeichnet 2s eine Zeitdauer über 2 Sekunden. Während der Anwender in2min mit dem Ausdruck auto in2min= std::chrono::steady_clock::now() + std::chrono::minutes(2) aufwändig definieren muss, steht der Ausdruck 2s mit C++14 automatisch als fundamentales Literal zur Verfügung. C++14 enthält weitere fundamentale Literale zu typischen Zeitdauern.

Nach der Theorie folgt die Praxis.

Verschiedene Warte-Strategien

Bevor ich das Programm vorstelle, will ich die verschiedenen Warte-Strategien kurz beschrieben.

Zentrale Idee des Programms ist ein Promis, der das Ergebnis seiner Berechnung vier geteilten Futuren zur Verfügung stellt. Nur dadurch, dass die Future geteilt sind (std::shared_future), ist dies möglich. Jeder der Future wartet auf das Ergebnis des gleichen Promise in einer verschiedenen Strategie. Alle Promise und Future werden in separaten Threads ausgeführt. Der Einfachheit halber spreche ich nun auch nicht mehr vom Future, der wartet, sondern von dem Thread, indem der Future verwendet wird. Die Details zur Promise und Future habe ich im gleichnamigen Artikel bereits vorgestellt.

  • consumeThread1: Wartet bis zu 4 Sekunden auf das Ergebnis des Promise.
  • consumeThread2: Wartet bis zu 20 Sekunden auf das Ergebnis des Promise.
  • consumeThread3: Frägt den Promise an, ob das Ergebnis vorliegt und legt sich dann wieder 700 Millisekunden schlafen.
  • consumeThread4: Frägt den Promise, ob das Ergebnis vorliegt. Legt sich beim ersten Mal 1 Millisekunden schlafen und verdoppelt anschließend jeweils seine Schlafperiode.

Nun aber zum Programm.

 

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
// sleepAndWait.cpp

#include <utility>
#include <iostream>
#include <future>
#include <thread>
#include <utility>

std::mutex coutMutex;

long double getDifference(const std::chrono::steady_clock::time_point& tp1,const std::chrono::steady_clock::time_point& tp2){
    auto diff= tp2- tp1;
    auto res= std::chrono::duration <double, std::milli> (diff).count();
    return res;
}

voidproducer(std::promise<int>&& prom){
    std::cout << "PRODUCING THE VALUE 2011\n\n"; 
    std::this_thread::sleep_for(std::chrono::seconds(5));
    prom.set_value(2011);
}

void consumer(std::shared_future<int> fut,std::chrono::steady_clock::duration dur){
    auto start = std::chrono::steady_clock::now();
    std::future_status status= fut.wait_until(std::chrono::steady_clock::now() + dur);
    if ( status == std::future_status::ready ){
        std::lock_guard<std::mutex> lockCout(coutMutex);
        std::cout << std::this_thread::get_id() << " ready => Result: " << fut.get() << std::endl;
    }
    else{
        std::lock_guard<std::mutex> lockCout(coutMutex);
        std::cout << std::this_thread::get_id() << " stopped waiting." << std::endl;
    }
    auto end= std::chrono::steady_clock::now();
    std::lock_guard<std::mutex> lockCout(coutMutex);
    std::cout << std::this_thread::get_id() << " waiting time: " << getDifference(start,end) << " ms" << std::endl;
}

void consumePeriodically(std::shared_future<int> fut){
    auto start = std::chrono::steady_clock::now();
    std::future_status status;
    do {
        std::this_thread::sleep_for(std::chrono::milliseconds(700));
        status = fut.wait_for(std::chrono::seconds(0));
        if (status == std::future_status::timeout) {
            std::lock_guard<std::mutex> lockCout(coutMutex);
            std::cout << "     " << std::this_thread::get_id() << " still waiting." << std::endl;
        }
        else if (status == std::future_status::ready) {
            std::lock_guard<std::mutex> lockCout(coutMutex);
            std::cout << "     " << std::this_thread::get_id() << " waiting done => Result: " << fut.get() << std::endl;
        }
    } while (status != std::future_status::ready); 
    auto end= std::chrono::steady_clock::now();
    std::lock_guard<std::mutex> lockCout(coutMutex);
    std::cout << "     " << std::this_thread::get_id() << " waiting time: " << getDifference(start,end) << " ms" << std::endl;
}

void consumeWithBackoff(std::shared_future<int> fut){
    auto start = std::chrono::steady_clock::now();
    std::future_status status;
    auto dur= std::chrono::milliseconds(1);
    do {
        std::this_thread::sleep_for(dur);
        status = fut.wait_for(std::chrono::seconds(0));
        dur *= 2;
        if (status == std::future_status::timeout) {
            std::lock_guard<std::mutex> lockCout(coutMutex);
            std::cout << "         " << std::this_thread::get_id() << " still waiting." << std::endl;
        }
        else if (status == std::future_status::ready) {
            std::lock_guard<std::mutex> lockCout(coutMutex);
            std::cout << "         " << std::this_thread::get_id() << " waiting done => Result: " << fut.get() << std::endl;
        }
    } while (status != std::future_status::ready);
    auto end= std::chrono::steady_clock::now();
    std::lock_guard<std::mutex> lockCout(coutMutex);
    std::cout << "         " << std::this_thread::get_id() << " waiting time: " << getDifference(start,end) << " ms" << std::endl;
}

int main(){
    
    std::cout << std::endl;

    std::promise<int> prom;
    std::shared_future<int> future= prom.get_future();
    std::thread producerThread(producer,std::move(prom));
    
    std::thread consumerThread1(consumer,future,std::chrono::seconds(4));
    std::thread consumerThread2(consumer,future,std::chrono::seconds(20));
    std::thread consumerThread3(consumePeriodically,future);
    std::thread consumerThread4(consumeWithBackoff,future);
    
    consumerThread1.join();
    consumerThread2.join();
    consumerThread3.join();
    consumerThread4.join();
    producerThread.join();
    
    std::cout << std::endl;

}

 

In der main-Funktion erzeuge ich den Promise (Zeile 85), erzeuge mir mit Hilfe des Promise einen Future (Zeile 86) und verschiebe den Promise in einen separaten Thread (Zeile 87). Der Promise kann nur in den Thread verschoben werden, da er keine Copy-Semantik unterstützt. Das gilt nicht für die geteilte Futures in Zeile 89 - 92. Sie können in ihren Thread kopiert werden. 

Bevor ich genauer auf die Arbeitspakete der Threads eingehe, ein paar Worte zu der Hilfsfunktion getDifference (Zeile 11 - 15). Sie benötigt als Eingabe zwei Zeitpunkte und gibt mir als Rückgabewert die vergangenene Zeit in Millisekunden zurück. Von dieser Funktion werde ich häufig Gebrauch machen.

Nun zu den Threads.

  • producerThread: Führt die Funktion producer (Zeile 17 - 21) aus und stellt in ihr nach einem 5-sekündigen Schlaf das Ergebnis 2011 zur Verfügung. Genau auf dieses Ergebnis warten alle Future.
  • consumerThread1:Führt die Funktion consumer (Zeile 23 - 37) aus. In ihr wartet der Thread maximal vom aktuellen Zeitpunkt ausgehend 4 Sekunden (Zeile 25), bis er mit seiner Arbeit fortfährt. In dieser Zeit steht natürlich das Ergebnis des Promise noch nicht zur Verfügung.
  • consumerThread2:Führt die Funktion consumer (Zeile 23 - 37) aus. In ihr wartet er maximal vom aktuellen Zeitpunkt ausgehend 20 Sekunden (Zeile 25), bis er mit seiner Arbeit fortfährt.
  • consumerThread3:Führt die Funktion consumePeriodically (Zeile 39 - 57) aus. Er schläft immer 700 Millisekunden (Zeile 43) und frägt dann nach, ob das Ergebnis des Promise zur Verfügung steht (Zeile 44). Durch die Zeitangabe von 0 Sekunden (std::chrono::seconds(0)) findet in diesem Fall kein Warten statt. Steht das Ergebnis der Berechnung zur Verfügung, gibt er das Ergebnis in Zeile 73 aus.
  • consumerThread4:Führt die Funktion consumeWithBackoff (Zeile 59 - 79) aus. Er schläft in der ersten Iteration 1 Millisekunden und verdoppelt bei jeder weiteren Iteration seine Schlafdauer. Sonst ist seine Strategie wie die des consumerThread3.

Nun zur Synchronisation des Programms. Sowohl der gemeinsame Zeitgeber, mit dem ich die aktuellen Zeitpunkte bestimme ist eine geteilte Variable als auch std::cout. Trotzdem ist keine Synchronisations notwendig. Zum einen sind die Methodenaufrufe std::chrono::steady_clock::now() thread-sicher (zum Beispiel in Zeile 24 und 34), zum anderen sichert die C++-Laufzeit zu, dass die Zeichen atomar auf std::cout geschrieben werden. Ich habe aber nur um der Optik willen std::cout in einen std::lock_guard verpackt (zum Beispiel in Zeile 27, 31 und 35).

Trotz dem geordneten Schreiben auf std::cout, fordert die Ausgabe einen scharfen Blick.

sleepAndWait

Zuerst kommt der Promise zum Zuge. Danach sind die Future an der Reihen. Als erstes frägt consumerThread4 nach, ob das Ergebnis vorliegt. Die Ausgabe des Threads ist mit 8 Zeichen eingerückt. Darüber hinaus gibt er seine ID aus. Unmittelbar danach folgt consumerThread3. Dessen Ausgabe ist mit vier Zeichen eingerückt. consumerThread1 und consumerThread2 sind nicht eingerückt.

  • consumeThread1: Wartet vergeblich 4000.18 ms Sekunden ohne das Ergebnis zu erhalten.
  • consumeThread2: Erhält das Ergebnis nach 5000.3 ms, obwohl er maximal 20 Sekunden warten würde.
  • consumeThread3: Erhält das Ergebnis nach 5601.76 ms. Das ist cirka 5600= 8*700 Millisekunden.
  • consumeThread4: Erhält das Ergebnis nach 8193.81 ms. Oder anders formuliert. Er wartet ca. 3 Sekunden zu lange.

Wie geht's weiter?

Mit diesem Artikel beende ich meine Miniserie über die neue Zeitbibliothek in C++. Daher kann ich in diesem Fall nur auf die Übersichtsseite zu allen Artikeln verweisen.

 

 

 

 

 

 

 

 

title page smalltitle page small Go to Leanpub/cpplibrary "What every professional C++ programmer should know about the C++ standard library".   Hole dir dein E-Book. Unterstütze meinen Blog.

 

Tags: time

Mentoring

Stay Informed about my Mentoring

 

Rezensionen

Tutorial

Besucher

Heute 272

Gestern 4711

Woche 272

Monat 63556

Insgesamt 3713021

Aktuell sind 697 Gäste und keine Mitglieder online

Kubik-Rubik Joomla! Extensions

Abonniere den Newsletter (+ pdf Päckchen)

Beiträge-Archiv

Sourcecode

Neuste Kommentare