I am working on some image processing ideas and I want to implement them using pthreads. Here is an example I came up with just to make sure that the threads are actually doing what they are supposed to be doing. It just increments the values in an array and prints the run time. There is also a serial version of the incrementing function to show the benefit of the pthreads. The program is getting hung up, though, and I am not sure why or how to fix it. The code only runs correctly with the two usleep(1000) calls inserted in nIncreaseArray(), both marked with "FOO"; without them it hangs. Let me know what you think and if you have any suggestions, thanks. The code is also attached as a .cpp if you want to compile and run it.

Sam

//pthread_example1.cpp.cpp


#include <iostream>
#include <stdio.h> //used for printf
#include <math.h>
#include <errno.h>
#include <sys/time.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>

using namespace std;

#define ARRAY_HEIGHT 50 //each cell represents a distance of 5cm
#define ARRAY_WIDTH 50 //each cell represents a distance of 5cm

int g_anArray[ARRAY_HEIGHT][ARRAY_WIDTH];

pthread_mutex_t mutex[4], count_cond_mutex, count_mutex;
pthread_cond_t condition[4], count_condition;
pthread_t threads[4];
pthread_attr_t attr;
int nCount = 0;
bool bExit[4];


int nInit(){
  int i, nError;

  /* Initialize mutex and condition variable objects */
  for(i = 0; i < 4; i++){
    pthread_mutex_init(&mutex[i], NULL);
    pthread_cond_init (&condition[i], NULL);
  }
  pthread_mutex_init(&count_cond_mutex, NULL);
  pthread_mutex_init(&count_mutex, NULL);
  pthread_cond_init(&count_condition, NULL);

  /* For portability, explicitly create threads in a joinable state */
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

  memset(bExit, false, 4);

  return(0);
}

int nUninit(){
  /* remove thread dependencies */
  pthread_attr_destroy(&attr);
  pthread_mutex_destroy(&count_cond_mutex);
  pthread_mutex_destroy(&count_mutex);
  pthread_cond_destroy(&count_condition);
  for(int i = 0; i < 4; i++){
    pthread_mutex_destroy(&mutex[i]);
    pthread_cond_destroy(&condition[i]);
  }

  pthread_exit(NULL);
  //return(0);
}


void *vThread(void *pvQuadrant){
  int r_t, r_b, c_l, c_r, x, y, q;

  q = (int)(long)pvQuadrant; //go through long so the cast also compiles where pointers are 64-bit
  /* Set index limits for quadrant */
  if(q == 0){
    r_t = 0; r_b = (ARRAY_HEIGHT / 2) - 1; c_l = ARRAY_WIDTH / 2; c_r = ARRAY_WIDTH - 1;
  }
  if(q == 1){
    r_t = 0; r_b = (ARRAY_HEIGHT / 2) - 1; c_l = 0; c_r = (ARRAY_WIDTH / 2) - 1;
  }
  if(q == 2){
    r_t = ARRAY_HEIGHT / 2; r_b = ARRAY_HEIGHT - 1; c_l = 0; c_r = (ARRAY_WIDTH / 2) - 1;
  }
  if(q == 3){
    r_t = ARRAY_HEIGHT / 2; r_b = ARRAY_HEIGHT - 1; c_l = ARRAY_WIDTH / 2; c_r = ARRAY_WIDTH - 1;
  }

  //let calling process know that all the threads have been created
  pthread_mutex_lock(&count_cond_mutex);
    nCount++;
    if (nCount == 4)
      pthread_cond_signal(&count_condition);
  pthread_mutex_unlock(&count_cond_mutex);

  while(true){
    pthread_mutex_lock(&mutex[q]);
    pthread_cond_wait(&condition[q], &mutex[q]);
    pthread_mutex_unlock(&mutex[q]);

    if(bExit[q]){
      printf("Thread %d exited\n", q);
      pthread_exit(NULL);
    }

    //PUT CODE FOR MANIPULATING QUADRANTS OF 2-D ARRAY
    for(y = r_t; y <= r_b; y++){
      for(x = c_l; x <= c_r; x++){
        g_anArray[y][x]++;
      }
    }
    //usleep(500000); //this is just so you can see they all have to finish first

    /* Check if thread is the last one finished, if so, signal nIncreaseArray() that vDilate is finished */
    pthread_mutex_lock(&count_cond_mutex);
      nCount++;
      if (nCount == 4)
        pthread_cond_signal(&count_condition);
    pthread_mutex_unlock(&count_cond_mutex);

  }
}


int nIncreaseArray(int amount){
  int nError, i, j;
  timeval clk1, clk2; 

  /* Create all the threads for vDilate */
  for(i = 0; i < 4; i++){
    if( (nError = pthread_create(&threads[i], &attr, vThread, (void *)(long)i)) != 0 ){ //keep the real error code, not the result of the comparison
      cout << "Pthread_Example1: nIncreaseArray() - pthread_create() error - " << strerror( nError ) << endl;
      return(-1);
    }
  }
  /* give threads a moment to be created */
  pthread_mutex_lock(&count_cond_mutex);
    if(nCount < 4){ //don't run pthread_cond_wait if nCount already reached 4 because pthread_cond_wait will never receive the signal
      pthread_cond_wait(&count_condition, &count_cond_mutex);
      nCount = 0;
    }
  pthread_mutex_unlock(&count_cond_mutex);
  usleep(1000); //FOO - do work here or last running thread in vThread gets hung up on pthread_mutex_unlock(&count_cond_mutex);

  for(j = 0; j < amount; j++){

    gettimeofday(&clk1, NULL);

    printf("Threads Starting\n");

    /* Signal threads to start */
    for (i = 0; i < 4; i++){
      pthread_mutex_lock(&mutex[i]);
        pthread_cond_signal(&condition[i]);
      pthread_mutex_unlock(&mutex[i]);
    }
    /* Wait for all threads to complete (the last thread will make nCount 4 and call pthread_cond_signal()) */
    pthread_mutex_lock(&count_cond_mutex);
      if(nCount < 4){ //don't run pthread_cond_wait if nCount already reached 4 because pthread_cond_wait will never receive the signal
        pthread_cond_wait(&count_condition, &count_cond_mutex);
        nCount = 0;
      }
    pthread_mutex_unlock(&count_cond_mutex);
    usleep(1000); //FOO - do work here or last running thread in vThread gets hung up on pthread_mutex_unlock(&count_cond_mutex);

    printf("Threads Finished\n");
    gettimeofday(&clk2, NULL);
    printf("Threads Execution Time: %.6f seconds\n\n", (double)(clk2.tv_sec - clk1.tv_sec) + 
           ( ( (double)(clk2.tv_usec - clk1.tv_usec) ) / 1000000 ) );

  }

  /* Terminate all the threads, wait for them to finish */
  memset(bExit, true, 4);
  for(i = 0; i < 4; i++){
    pthread_mutex_lock(&mutex[i]);
    pthread_cond_signal(&condition[i]);
    pthread_mutex_unlock(&mutex[i]);
  }
  for(i = 0; i < 4; i++){
    if( (nError = pthread_join(threads[i], NULL)) != 0 ){ //keep the real error code, not the result of the comparison
      cout << "Pthread_Example1: nIncreaseArray() -  pthread_join() error - " << strerror(nError) << endl;
      return(-1);
    }
  }
  return(0);
}


int nIncreaseArray_s(int amount){
  int x, y, j;
  timeval clk1, clk2;

  printf("Serial Started\n");
  for(j = 0; j < amount; j++){

    gettimeofday(&clk1, NULL);

    /* Increment every cell serially (no threads involved) */
    printf("iteration: %d\n", j);
    for(y = 0; y < ARRAY_HEIGHT; y++){
      for(x = 0; x < ARRAY_WIDTH; x++){
        g_anArray[y][x]++;
      }
    }

    //usleep(50000); //this is just so you can see they all have to finish first

    gettimeofday(&clk2, NULL);
    printf("Serial Execution Time: %.6f seconds\n\n", (double)(clk2.tv_sec - clk1.tv_sec) + 
           ( ( (double)(clk2.tv_usec - clk1.tv_usec) ) / 1000000 ) );

  }
  printf("Serial Finished\n");
  return(0);
}


void vPrintArray(){
  for(int r = 0; r < ARRAY_HEIGHT; r++){
    for(int c = 0; c < ARRAY_WIDTH; c++){
      cout << g_anArray[r][c] << " ";
    }
    cout << endl;
  }
}


int main(int argc, char *argv[]){
  if(nInit() == -1){
    printf("Pthread_Example1: main() - nInit() error\n");
    return(-1);
  }

  //memset(g_anArray, 0, sizeof(g_anArray));
  //nIncreaseArray_s(10);
  memset(g_anArray, 0, sizeof(g_anArray));
  nIncreaseArray(20);
  vPrintArray();

  if(nUninit() == -1){
    printf("Pthread_Example1: main() - nUninit() error\n");
    return(-1);
  }

  pthread_exit(NULL);
}


***PLEASE USE CODE TAGS IN THE FUTURE***

Your problem is classic. You should know that the pthread_cond_wait function is subject to spurious wake-ups. This means that sometimes (fairly often actually), pthread_cond_wait will return even when the condition was not signaled. You need to use a predicate: each condition should be accompanied by a status variable (like a bool or int) that is set to a particular value any time the condition is signaled. Then you wrap the pthread_cond_wait call inside a while loop that checks the predicate every time pthread_cond_wait returns, so that you only continue past the loop if the predicate is met, meaning that the condition was actually signaled and it was not a spurious wake-up.
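
To make the predicate idea concrete, here is a minimal sketch. The names (g_ready, waiter, run_predicate_demo) are made up for illustration and are not from the posted code. The point is only the while loop around pthread_cond_wait, plus the fact that the predicate is changed while holding the same mutex:

```cpp
#include <pthread.h>
#include <cstddef>

// Hypothetical names for illustration; none of these appear in the posted code.
static pthread_mutex_t g_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  g_cond = PTHREAD_COND_INITIALIZER;
static bool g_ready  = false;  // the predicate, always accessed under g_lock
static int  g_result = 0;

static void *waiter(void *) {
  pthread_mutex_lock(&g_lock);
  // A spurious wake-up just re-checks g_ready and goes back to waiting.
  // If g_ready was already set before we got here, we never wait at all,
  // so a signal that was sent "too early" is not lost either.
  while (!g_ready)
    pthread_cond_wait(&g_cond, &g_lock);
  g_result = 42;               // stand-in for the real work
  pthread_mutex_unlock(&g_lock);
  return NULL;
}

// Starts one waiter, sets the predicate, signals, joins; returns g_result
// (or -1 if the thread could not be created).
int run_predicate_demo() {
  pthread_mutex_lock(&g_lock);
  g_ready  = false;
  g_result = 0;
  pthread_mutex_unlock(&g_lock);

  pthread_t t;
  if (pthread_create(&t, NULL, waiter, NULL) != 0)
    return -1;

  pthread_mutex_lock(&g_lock);
  g_ready = true;              // change the predicate under the mutex...
  pthread_cond_signal(&g_cond); // ...then signal
  pthread_mutex_unlock(&g_lock);

  pthread_join(t, NULL);
  return g_result;
}
```

Calling run_predicate_demo() from main should behave the same whether the waiter reaches its wait before or after the signal, which is exactly what a bare pthread_cond_wait without a predicate cannot promise.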

As for the hangs you are getting, I'm not entirely sure they are completely attributable to spurious wake-ups (but certainly to some extent). You might have a deadlock too. Essentially, spurious wake-ups might desynchronize the threads. What happens is that one of your worker threads is waiting for the "start" signal while the main thread is waiting for the "all-finished" signal, which will never come. Here is how this can come about:
- A working thread has a spurious wake-up while the others are waiting (start not signaled yet)
- The working thread does its work and increments nCount (which becomes 5)
- The main thread signals start and sets nCount to zero
- When all working threads are back at the start, nCount is only at 3 and no "all-finished" condition was ever signaled.
--> You have a dead-lock.
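
One way to rule out this kind of desynchronization is to give both signals a predicate: a round number for "start" and a finished-count for "all-finished". The sketch below is only that, a sketch under my own assumptions. The names (g_round, g_done, worker, run_rounds) are hypothetical, and the single g_cells array stands in for the four quadrants. Note that a condition variable does not remember a signal sent while nobody is waiting, so the same predicates also cover the case where the main thread signals "start" before a worker has reached its wait:

```cpp
#include <pthread.h>
#include <stdint.h>
#include <cstddef>

// Hypothetical names for illustration; not taken from the posted code.
static const int kWorkers = 4;
static pthread_mutex_t g_lock       = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  g_start_cond = PTHREAD_COND_INITIALIZER;
static pthread_cond_t  g_done_cond  = PTHREAD_COND_INITIALIZER;
static int  g_round = 0;      // predicate for "start": bumped once per pass
static int  g_done  = 0;      // predicate for "all finished"
static bool g_quit  = false;
static int  g_cells[kWorkers]; // stand-in for the four quadrants

static void *worker(void *arg) {
  int id = static_cast<int>(reinterpret_cast<intptr_t>(arg));
  int seen = 0;               // last round this worker has processed
  for (;;) {
    pthread_mutex_lock(&g_lock);
    while (g_round == seen && !g_quit)   // ignores spurious wake-ups
      pthread_cond_wait(&g_start_cond, &g_lock);
    if (g_quit) { pthread_mutex_unlock(&g_lock); return NULL; }
    seen = g_round;
    pthread_mutex_unlock(&g_lock);

    g_cells[id]++;            // the per-quadrant work, done outside the lock

    pthread_mutex_lock(&g_lock);
    if (++g_done == kWorkers)
      pthread_cond_signal(&g_done_cond);
    pthread_mutex_unlock(&g_lock);
  }
}

// Runs `rounds` passes and returns the sum of all cells, or -1 on failure.
int run_rounds(int rounds) {
  pthread_t t[kWorkers];
  g_round = 0; g_done = 0; g_quit = false;
  for (int i = 0; i < kWorkers; i++) g_cells[i] = 0;

  int created = 0;
  for (; created < kWorkers; created++)
    if (pthread_create(&t[created], NULL, worker,
                       reinterpret_cast<void *>(static_cast<intptr_t>(created))) != 0)
      break;
  if (created != kWorkers) rounds = 0;   // skip the work, just shut down

  for (int r = 0; r < rounds; r++) {
    pthread_mutex_lock(&g_lock);
    g_done = 0;               // reset BEFORE starting the round, under the lock
    g_round++;
    pthread_cond_broadcast(&g_start_cond);
    while (g_done < kWorkers) // ignores spurious wake-ups
      pthread_cond_wait(&g_done_cond, &g_lock);
    pthread_mutex_unlock(&g_lock);
  }

  pthread_mutex_lock(&g_lock);
  g_quit = true;
  pthread_cond_broadcast(&g_start_cond);
  pthread_mutex_unlock(&g_lock);

  int total = 0;
  for (int i = 0; i < created; i++) pthread_join(t[i], NULL);
  for (int i = 0; i < kWorkers; i++) total += g_cells[i];
  return (created == kWorkers) ? total : -1;
}
```

With 4 workers, run_rounds(20) should come back as 80. The design choice worth noticing is that the counter is reset before the round is started and while the lock is held, so a worker can never increment a count that is about to be thrown away.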

The reason a sleep call here and there can help is that it reduces the chance of desynchronization by sort of "taking it slow", but often that is not desirable... you want stuff to run fast! I'm not even totally sure that a predicate will eliminate all the problems, either; thread synchronization is a tricky business. Try to design your software to avoid requiring this synchronous signal cycle:

thread1: wait_for_cond1 -> execute_action -> signal_cond2
thread2: signal_cond1 -> execute_action -> wait_for_cond2

It is clear that in the above, if your threads become desynchronized, you have a deadlock. You can try to make sure that no desynchronization is possible, but that is harder than designing your implementation to avoid a cycle of synchronous threads. Try making at least one of the two signals asynchronous.
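
One possible reading of this advice, and it is only my interpretation, is to drop the start/finish handshake entirely: create the threads for each pass and let pthread_join be the "all finished" wait. There is then no condition variable to miss. The sketch below uses hypothetical names (Band, increment_band, run_passes) and splits the array into row bands rather than quadrants, purely to keep it short:

```cpp
#include <pthread.h>
#include <cstring>
#include <cstddef>

// Hypothetical names for illustration; not taken from the posted code.
static const int kRows = 50, kCols = 50, kThreads = 4;
static int g_grid[kRows][kCols];

struct Band { int row_begin, row_end; };  // half-open range of rows

static void *increment_band(void *arg) {
  const Band *b = static_cast<const Band *>(arg);
  for (int y = b->row_begin; y < b->row_end; y++)
    for (int x = 0; x < kCols; x++)
      g_grid[y][x]++;
  return NULL;
}

// Runs `passes` passes over the grid; returns the sum of all cells,
// or -1 if a thread could not be created.
int run_passes(int passes) {
  std::memset(g_grid, 0, sizeof(g_grid));

  Band bands[kThreads];
  for (int i = 0; i < kThreads; i++) {
    bands[i].row_begin = i * kRows / kThreads;
    bands[i].row_end   = (i + 1) * kRows / kThreads;
  }

  for (int p = 0; p < passes; p++) {
    pthread_t t[kThreads];
    int started = 0;
    for (; started < kThreads; started++)
      if (pthread_create(&t[started], NULL, increment_band, &bands[started]) != 0)
        break;
    // pthread_join is the only synchronization point: it cannot be
    // "missed" the way a condition signal can.
    for (int i = 0; i < started; i++)
      pthread_join(t[i], NULL);
    if (started != kThreads)
      return -1;
  }

  int total = 0;
  for (int y = 0; y < kRows; y++)
    for (int x = 0; x < kCols; x++)
      total += g_grid[y][x];
  return total;
}
```

The trade-off is that thread creation is paid on every pass, which may or may not matter for real image-sized work; for a 50x50 array it will likely dominate the run time.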

BTW: Make it easier on yourself and use Boost.Thread instead of pthreads. It is cross-platform and very nice to use (OOP, not C-style like pthreads).


Yeah, sorry about not using the code tags; it was my first post and I didn't know. I am still wondering why this is not working. I spent hours trying to figure out why the last thread gets stuck while trying to unlock the condition mutex. As for not using predicates for starting and stopping vThread(), you got me there; I should be using a predicate, as I am with nCount. As for the spurious wake-ups, I had tried both if(nCount < 4) and while(nCount < 4), and both times I was still getting hung up on that last thread trying to leave the condition mutex after it signals. So it isn't a spurious wake-up problem. I took a look at the Boost stuff and it looks very easy and intuitive, but something in me has to figure this one out. Many thanks for the reply.

