Asynchron aufrufbare Wrapper

Inhaltsverzeichnis[Anzeigen]

std::packaged_task erlaubt es, einen einfachen Wrapper um eine aufrufbare Einheit zu erzeugen, so dass diese später ausgeführt werden kann.

std::packaged_task

Die Arbeit mit std::packaged_task verläuft typischerweise in vier Schritten:

  1. Verpacke die Aufgabe
  2. Erzeugen den Future
  3. Führe die Berechnung aus
  4. Hole das Ergebnis ab

Am einfachsten lassen sich diese Schritte am Beispiel nachvollziehen.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include <utility>
#include <future>
#include <iostream>
#include <thread>
#include <deque>

class SumUp{
  public:
    int operator()(int beg, int end){
      long long int sum{0};
      for (int i= beg; i < end; ++i ) sum += i;
      return sum;
    }
};

int main(){

  std::cout << std::endl;

  SumUp sumUp1;
  SumUp sumUp2;
  SumUp sumUp3;
  SumUp sumUp4;

  // define the tasks
  std::packaged_task<int(int,int)> sumTask1(sumUp1);
  std::packaged_task<int(int,int)> sumTask2(sumUp2);
  std::packaged_task<int(int,int)> sumTask3(sumUp3);
  std::packaged_task<int(int,int)> sumTask4(sumUp4);

  // get the futures
  std::future<int> sumResult1= sumTask1.get_future();
  std::future<int> sumResult2= sumTask2.get_future();
  std::future<int> sumResult3= sumTask3.get_future();
  std::future<int> sumResult4= sumTask4.get_future();

  // push the tasks on the container
  std::deque< std::packaged_task<int(int,int)> > allTasks;
  allTasks.push_back(std::move(sumTask1));
  allTasks.push_back(std::move(sumTask2));
  allTasks.push_back(std::move(sumTask3));
  allTasks.push_back(std::move(sumTask4));
  
  int begin{1};
  int increment{2500};
  int end= begin + increment;

  // execute each task in a separate thread
  while ( not allTasks.empty() ){
    std::packaged_task<int(int,int)> myTask= std::move(allTasks.front());
    allTasks.pop_front();
    std::thread sumThread(std::move(myTask),begin,end);
    begin= end;
    end += increment;
    sumThread.detach();
  }

  // get the results
  auto sum= sumResult1.get() + sumResult2.get() + sumResult3.get() + sumResult4.get();

  std::cout << "sum of 0 .. 10000 = " << sum << std::endl;

  std::cout << std::endl;

}

 

Die Aufgabe des Programmes ist relativ einfach. Berechne die Summe von 0 bis 10000 mit der Hilfe der vier Threads und sammle die Ergebnisse mit den assoziierten Futuren ein. Das geht natürlich mit der Gaußschen Summenformel deutlich einfacher.

Im ersten Schritt verpacke ich die vier Arbeitspakete in std::packaged_task-Objekte. Die Arbeitspakete sind Instanzen der Klasse SumUp (Zeile 7 - 14). Der Klammeroperator der Klasse (Zeile 8 - 13) addiert alle Zahlen von beg bis end auf und gibt die Summe als Ergebnis zurück. Zum Abschluß werden die packaged_task-Objekte instanziiert. Die Objekte können aufrufbare Einheiten, die zwei ganze Zahlen erwarten und eine ganze Zahl zurückgeben, annehmen.

Nun gilt es im zweiten Schritt, die Future Objekte mit Hilfe der packaged_task-Objekte zu erzeugen. Genau das findet in den Zeilen 32 bis 35 statt. Die packaged_task-Objekte stellen die Promise in dem Kommunikationskanal dar. In diesem Fall definiere ich die Typen der Future explizit: std::future<int> sumResult1= sumTask1.get_future(). Automatische Typableitung mit auto ist da natürlich einfacher.: auto sumResult1= sumTask1.get_future().

Im dritten Schritt folgt die eigentliche Arbeit. Die packaged_task werden auf ein std::deque geschoben (Zeile 38 - 42), die in der  while-Schleife (Zeile 49 - 56) abgearbeitet wird. Dazu verschiebe ich den Kopf der std::deque in eine std::packaged_task (Zeile 50) und verschiebe diese weiter in einen neuen Thread (Zeile 52) und lasse diesen im Hintergrund arbeiten (Zeile 55). std::packaged_task-Objekte können natürlich nicht kopiert (Copy-Semantik) werden. Dies ist der Grund für die Move-Semantik in Zeile 50 und 52. Dies gilt natürlich auch für Promise im allgemein, aber auch Futures und Threads. 

Im vierten und letzten Schritt verwende ich die get-Methoden der Futures, um die Ergebnisse der vier Futures zusammen zu addieren (Zeile 61).

Zugegeben, std::packaged_task ist nicht für den einfachen Anwendungsfall wie std:.async konzipiert. Dafür ist das Ergebnis um so schlichter.

packagedTask

Optimierungspotential

C++11 besitzt die Funktion std::thread::hardware_concurrency. Diese Funktion gibt ein Hinweis auf die Anzahl der Prozessoren eines Systems zurück. Kann die C++-Laufzeit diesen Wert nicht ermitteln, ist es standardkonform, den Wert 0 zurückzugeben. Mit dem aktuellen gcc, clang oder Microsoft Compiler erhalte ich immer die richtige Antwort 4. Genau diese Information über die Anzahl meiner Prozessoren nützte ich in der optimierten Version des Programms aus, den in ihm entspricht die Anzahl der Threads der Anzahl meiner Prozessoren. Damit ist mein System optimal ausgelastet.

 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#include <algorithm>
#include <future>
#include <iostream>
#include <thread>
#include <deque>
#include <vector>

class SumUp{
public:
  SumUp(int b, int e): beg(b),end(e){}
  int operator()(){
    long long int sum{0};
    for (int i= beg; i < end; ++i ) sum += i;
    return sum;
  }
private:
    int beg;
    int end;
};

static const unsigned int hwGuess= 4;
static const unsigned int numbers= 10001;

int main(){

  std::cout << std::endl;

  unsigned int hw= std::thread::hardware_concurrency();
  unsigned int hwConcurr= (hw != 0)? hw : hwGuess;

  // define the functors
  std::vector<SumUp> sumUp;
  for ( unsigned int i= 0; i < hwConcurr; ++i){
    int begin= (i*numbers)/hwConcurr;
    int end= (i+1)*numbers/hwConcurr;
    sumUp.push_back(SumUp(begin ,end));
  }

  // define the tasks
  std::deque<std::packaged_task<int()>> sumTask;
  for ( unsigned int i= 0; i < hwConcurr; ++i){
    std::packaged_task<int()> SumTask(sumUp[i]);
    sumTask.push_back(std::move(SumTask));
  }

  // get the futures
  std::vector< std::future<int>> sumResult;
  for ( unsigned int i= 0; i < hwConcurr; ++i){
    sumResult.push_back(sumTask[i].get_future());
  }

  // execute each task in a separate thread
  while ( not sumTask.empty() ){
    std::packaged_task<int()> myTask= std::move(sumTask.front());
    sumTask.pop_front();
    std::thread sumThread(std::move(myTask));
    sumThread.detach();
  }

  // get the results
  int sum= 0;
  for ( unsigned int i= 0; i < hwConcurr; ++i){
    sum += sumResult[i].get();
  }

  std::cout << "sum of 0 .. 100000 = " << sum << std::endl;

  std::cout << std::endl;

}

 

Wie geht's weiter?

Im nächsten Artikel gehe ich tiefer auf Future und Promise ein.

Hintergrundinformationen

Automatische Typableitung
    Die automatische Typableitung mit auto und decltype stelle ich in dem Artikel Neue Ausdruckskraft 02/2014 vor.
Copy- versus Move-Semantik
   In dem Artikel Rasch verschoben 02/2015 stelle ich Rvalue Referenzen, Copy- und Move-Semantik genauer vor.

 

 

 

 

 

 

title page smalltitle page small Go to Leanpub/cpplibrary "What every professional C++ programmer should know about the C++ standard library".   Hole dir dein E-Book. Unterstütze meinen Blog.

Tags: Tasks

Kommentar schreiben


Abonniere den Newsletter (+ pdf Päckchen)

Beiträge-Archiv

Sourcecode

Neuste Kommentare