0

Hi! I'm trying to implement a job queue. Somehow I can't initialize it: the queue is empty.

Inline Code Example Here //INCLUDES

#include <stdio.h>

#include <stdlib.h>

#include <semaphore.h>

#include <pthread.h>

//GLOBALS
// Mutex taken by thread_func() around every access to job_queue.
pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
// Printed by process_job() as the "JOB NUMBER".
int i;

// Number of worker threads started by main() and number of jobs that
// init_job_queue() pushes onto the queue.
#define N_threads 10

// Base value from which add_node_to_head() derives each job's data field.
int val=0;

//STRUCTURES
// One job: a node of a singly linked list.
typedef struct job
{
  int data;          // payload printed by process_job()
  struct job *next;  // next job in the list, NULL at the end
} job_t;

// A linked list of jobs.
job_t *job_queue=NULL;

// Semaphore initialised by main() and posted by add_node_to_head() for every
// job added; thread_func() waits on it before touching the queue.
sem_t job_queue_count;

//FUNCTION DECLARATIONS
int init_job_queue(job_t **head,job_t **tail); // build the initial list of jobs
int add_node_to_head(job_t **head);            // push one new job onto the list
void remove_node_from_tail(job_t **head);      // free the last node of the list
int process_job(job_t *the_job);               // print the details of one job
void *thread_func(void *arg);                  // worker thread entry point
int main()
{
pthread_t thr_id[N_threads];//array of thread id's
int i=0;
int status=-1;
job_t (head)=NULL;
job_t (
tail)=NULL;
sem_init(&job_queue_count,0,1);//initializing semaphore, 1 token is available
status= init_job_queue(&head,&tail);
if(status==-1)
{
printf("Error in memory allocation\n");
return -1;
}

for(i=0;i<N_threads;i++)
{
pthread_create(&thr_id[i],NULL,thread_func,NULL);
}
for(i=0;i<N_threads;i++)
{
pthread_join(thr_id[i],NULL);
}
printf("MAIN:all threads have been terminated\n");
return 0;
}

/*
 * Worker thread body: repeatedly takes the job at the head of job_queue,
 * hands it to process_job() and frees it, and returns once the queue is
 * found empty.
 *
 * arg is unused. Always returns NULL.
 */
void* thread_func(void* arg)
{
  job_t *current_job = NULL;
  job_t *queue_snapshot = NULL;

  (void)arg;

  // Copy the shared head pointer under the mutex so that logging it does not
  // race with other workers modifying job_queue.
  pthread_mutex_lock(&job_queue_mutex);
  queue_snapshot = job_queue;
  pthread_mutex_unlock(&job_queue_mutex);

  // %p requires a void *; the thread id is cast to an integer type that
  // matches the %llx conversion.
  printf("THREAD FUNC: thread_id = 0x%llx job_queue=%p \n",
         (unsigned long long)pthread_self(), (void *)queue_snapshot);

  for (;;) {
    sem_wait(&job_queue_count);
    pthread_mutex_lock(&job_queue_mutex);
    if (job_queue == NULL) {
      current_job = NULL;
      printf("There is nothing to do. Sorry\n");
    } else {
      // Take the job and unlink it from the queue.
      current_job = job_queue;
      job_queue = job_queue->next;
    }
    pthread_mutex_unlock(&job_queue_mutex);
    sem_post(&job_queue_count);

    // The queue was empty: this worker is done.
    if (current_job == NULL) {
      break;
    }

    process_job(current_job);
    free(current_job);
  }

  return NULL;
}

//functions definition
/*
 * Builds the initial job list in *head: one node with data 0 followed by
 * N_threads nodes pushed in front of it with add_node_to_head().
 *
 * head  receives the head of the new list; must not be NULL.
 * tail  currently unused.
 *
 * Returns 0 on success, -1 if head is NULL or an allocation fails (nodes
 * already linked into *head are left in place).
 */
int init_job_queue(job_t **head,job_t **tail)
{
  int flag = -1;
  int i;

  (void)tail;

  if (head == NULL) {
    return -1;
  }

  // Write through the pointer so the caller's list head is updated.
  *head = malloc(sizeof **head);
  if (*head == NULL) {
    printf("Error in memory allocation.\n");
    return -1;
  }
  (*head)->next = NULL;
  (*head)->data = 0;

  for (i = 0; i < N_threads; i++) {
    // head is already a job_t **; taking its address again would hand
    // add_node_to_head() the wrong level of indirection.
    flag = add_node_to_head(head);
    if (flag == -1) {
      printf("Error in memory allocation.\n");
      return -1;
    }
  }
  return 0;
}

/*
 * Allocates a new job, links it in front of *head and posts job_queue_count
 * once for it.
 *
 * head  address of the list head pointer; must not be NULL.
 *
 * Returns 0 on success, -1 if head is NULL or the allocation fails.
 */
int add_node_to_head(job_t** head)
{
  job_t *node = NULL;

  if (head == NULL) {
    return -1;
  }

  node = malloc(sizeof *node);
  if (node == NULL) {
    printf("Error in memory allocation.\n");
    return -1;
  }

  // Link to the current first node (*head), not to the address of the head
  // pointer itself.
  node->next = *head;
  // Advance val so that successive jobs carry distinct data values.
  val = val + 1;
  node->data = val;
  *head = node;
  sem_post(&job_queue_count);
  return 0;
}
/*
 * Frees the last node of the list whose head pointer is *head.
 *
 * Does nothing when head is NULL or the list is empty. When the list has a
 * single node, that node is freed and *head is set to NULL.
 */
void remove_node_from_tail(job_t** head)
{
  job_t *prev = NULL;
  job_t *last = NULL;

  if (head == NULL || *head == NULL) {
    return;
  }

  // Single node: the head itself is the tail.
  if ((*head)->next == NULL) {
    free(*head);
    *head = NULL;
    return;
  }

  // Walk until `last` is the final node and `prev` is the one before it.
  prev = *head;
  last = prev->next;
  while (last->next != NULL) {
    prev = last;
    last = last->next;
  }
  prev->next = NULL;
  free(last);
}
/*
 * Prints one line describing the_job: the global counter i, the calling
 * thread's id, the job's address and its data value.
 *
 * Returns 0 on success, -1 when the_job is NULL.
 */
int process_job(job_t *the_job)
{
  if (the_job == NULL) {
    return -1;
  }

  // %p requires a void *; the thread id is cast to an integer type that
  // matches the %llx conversion.
  printf("JOB NUMBER :%d process_job() thread_id=0x%llx the_job=%p data = %d\n",
         i, (unsigned long long)pthread_self(), (void *)the_job, the_job->data);

  return 0;
}

Help, please...

2
Contributors
1
Reply
4
Views
4 Years
Discussion Span
Last Post by histrungalot
0

I'm not really sure what you want this program to do. I had to change sem_init to sem_open because I'm on a Mac (this should not change the functionality). I didn't check anything else, such as memory leaks, so see if this helps. Jobs are added to the queue, and each thread processes one job and then exits. If you leave the for(;;) loop in the threads, the first thread will most likely process all of the jobs on the queue and the other threads will do nothing. That is why I took it out; if that is what you want, put it back in. I also removed the sem_post in add_node_to_head, because I didn't think you wanted or needed it. Your problem was pointers to pointers, and knowing when to dereference them inside a function so that you affect the value in the calling function.

#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>

//GLOBALS
// Mutex that thread_func() holds while it takes a job off job_queue.
pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
// Counter printed (and incremented) by process_job() as the "JOB NUMBER".
int i;

// Number of worker threads created by main() and number of jobs that
// init_job_queue() pushes with add_node_to_head().
#define N_threads 10
// Value copied into each new job's data field by add_node_to_head(), which
// increments it afterwards.
int val=0;

//STRUCTURES
// One job: a node of a singly linked list.
typedef struct job
{
  int data;          // payload printed by process_job()
  struct job *next;  // next job in the list, NULL at the end
} job_t;

// A linked list of jobs.
job_t *job_queue=NULL;

// Named semaphore opened by main() with an initial value of 1. thread_func()
// waits on it before dequeuing and posts it afterwards. It does not track the
// number of queued jobs: add_node_to_head() never posts it.
sem_t *job_queue_count=NULL;

//FUNCTION DECLARATIONS
int init_job_queue(job_t **head,job_t **tail);//build the initial list of jobs in *head
int add_node_to_head(job_t **head);//push one new job onto the front of the list
void remove_node_from_tail(job_t **head);//free the last node of the list (no caller in the code shown)
int process_job(job_t *the_job);//print the details of one job
void *thread_func(void* arg);//worker thread entry point passed to pthread_create()

/*
 * Program entry point: opens the named semaphore, builds the job queue,
 * starts N_threads worker threads running thread_func(), waits for them and
 * releases the semaphore and any jobs left on the queue.
 *
 * Returns 0 on success and -1 when the semaphore, the queue, or a worker
 * thread could not be set up.
 */
int main()
{
  pthread_t thr_id[N_threads]; // ids of the worker threads
  int started = 0;             // number of threads actually created
  int i = 0;
  int status = -1;
  job_t *tail = NULL;

  // Without O_EXCL, sem_open() reuses an existing semaphore of the same name
  // and ignores the requested initial value. Remove any leftover from an
  // earlier run first, so the semaphore really starts at 1.
  sem_unlink("/mySem");
  job_queue_count = sem_open("/mySem", O_CREAT, S_IRUSR | S_IWUSR, 1);
  if (job_queue_count == SEM_FAILED) {
    printf("%s\n", strerror(errno));
    return -1;
  }

  // Build the list directly in the global job_queue that the workers read.
  status = init_job_queue(&job_queue, &tail);
  if (status == -1) {
    printf("Error in memory allocation\n");
  } else {
    for (i = 0; i < N_threads; i++) {
      if (pthread_create(&thr_id[i], NULL, thread_func, NULL) != 0) {
        printf("Error creating thread %d\n", i);
        status = -1;
        break;
      }
      started++;
    }
    // Join only the threads that were really created.
    for (i = 0; i < started; i++) {
      pthread_join(thr_id[i], NULL);
    }
  }

  // No worker is running any more, so the list can be walked without the
  // mutex. Free whatever jobs were not consumed.
  while (job_queue != NULL) {
    job_t *next = job_queue->next;
    free(job_queue);
    job_queue = next;
  }

  sem_close(job_queue_count);
  sem_unlink("/mySem");
  if (status == -1) {
    return -1;
  }
  printf("MAIN:all threads have been terminated\n");
  return 0;
}

/*
 * Worker thread body: takes at most one job from the head of job_queue,
 * hands it to process_job() and frees it.
 *
 * arg is unused. Always returns NULL.
 */
void* thread_func(void* arg)
{
  job_t *current_job = NULL;
  job_t *queue_snapshot = NULL;

  (void)arg;

  // Copy the shared head pointer under the mutex so that logging it does not
  // race with other workers modifying job_queue.
  pthread_mutex_lock(&job_queue_mutex);
  queue_snapshot = job_queue;
  pthread_mutex_unlock(&job_queue_mutex);

  printf("THREAD FUNC: thread_id = 0x%llx job_queue=%p \n",
         (unsigned long long)pthread_self(), (void *)queue_snapshot);

  // Retry when the wait is merely interrupted by a signal. On any other
  // failure give up without posting, since no token was taken.
  while (sem_wait(job_queue_count) == -1) {
    if (errno != EINTR) {
      printf("Error:  %s\n", strerror(errno));
      return NULL;
    }
  }

  pthread_mutex_lock(&job_queue_mutex);
  if (job_queue == NULL) {
    printf("There is nothing to do. Sorry\n");
  } else {
    // Take the job and unlink it from the queue.
    current_job = job_queue;
    job_queue = job_queue->next;
  }
  pthread_mutex_unlock(&job_queue_mutex);
  sem_post(job_queue_count);

  // Empty queue: there is no job to process, and process_job() must not be
  // handed a NULL pointer.
  if (current_job == NULL) {
    return NULL;
  }

  process_job(current_job);
  free(current_job);

  return NULL;
}

//functions definition
/*
 * Builds the initial job list in *head: one node with data 0 followed by
 * N_threads nodes pushed in front of it with add_node_to_head().
 *
 * head  receives the head of the new list; must not be NULL.
 * tail  currently unused.
 *
 * Returns 0 on success, -1 if head is NULL or an allocation fails (nodes
 * already linked into *head are left in place).
 */
int init_job_queue(job_t **head,job_t **tail)
{
  int flag = -1;
  int i;

  (void)tail;

  if (head == NULL) {
    return -1;
  }

  *head = malloc(sizeof **head);
  // Check before dereferencing: malloc() may return NULL.
  if (*head == NULL) {
    printf("Error in memory allocation.\n");
    return -1;
  }
  (*head)->next = NULL;
  (*head)->data = 0;

  for (i = 0; i < N_threads; i++) {
    flag = add_node_to_head(head);
    if (flag == -1) {
      printf("Error in memory allocation.\n");
      return -1;
    }
  }
  return 0;
}

/*
 * Pushes a freshly allocated job onto the front of the list.
 *
 * The new node's data is the current value of the global val, which is
 * incremented afterwards. The semaphore is not posted here.
 *
 * Returns 0 on success, -1 when the allocation fails.
 */
int add_node_to_head(job_t** head)
{
  job_t *node = malloc(sizeof *node);

  if (!node) {
    printf("Error in memory allocation.\n");
    return -1;
  }

  node->data = val++;
  node->next = *head;
  *head = node;
  return 0;
}
/*
 * Frees the last node of the list whose head pointer is *head.
 *
 * Does nothing when head is NULL or the list is empty. When the list has a
 * single node, that node is freed and *head is set to NULL.
 */
void remove_node_from_tail(job_t** head)
{
  job_t *prev = NULL;
  job_t *last = NULL;

  if (head == NULL || *head == NULL) {
    return;
  }

  // Single node: the head itself is the tail.
  if ((*head)->next == NULL) {
    free(*head);
    *head = NULL;
    return;
  }

  // Walk until `last` is the final node and `prev` is the one before it.
  prev = *head;
  last = prev->next;
  while (last->next != NULL) {
    prev = last;
    last = last->next;
  }
  prev->next = NULL;
  free(last);
}

/*
 * Prints one line describing the_job: a running job number taken from the
 * global counter i, the calling thread's id, the job's address and its data.
 *
 * Takes job_queue_mutex briefly to bump the counter, so it must not be
 * called with that mutex already held.
 *
 * Returns 0 on success, -1 when the_job is NULL.
 */
int process_job(job_t *the_job)
{
  int job_number;

  if (the_job == NULL) {
    return -1;
  }

  // Several worker threads call this concurrently; an unprotected i++ on the
  // shared counter would be a data race.
  pthread_mutex_lock(&job_queue_mutex);
  job_number = i++;
  pthread_mutex_unlock(&job_queue_mutex);

  printf("JOB NUMBER :%d process_job() thread_id=0x%llx "
         "the_job=%p data = %d\n", job_number,
         (unsigned long long)pthread_self(), (void *)the_job, the_job->data);

  return 0;
}

Output

$ ./a.out
THREAD FUNC: thread_id = 0x1000b6000 job_queue=0x100100120 
JOB NUMBER :0 process_job() thread_id=0x1000b6000 the_job=0x100100120 data = 9
THREAD FUNC: thread_id = 0x100281000 job_queue=0x100100110 
THREAD FUNC: thread_id = 0x100304000 job_queue=0x100100110 
THREAD FUNC: thread_id = 0x100510000 job_queue=0x100100110 
THREAD FUNC: thread_id = 0x100387000 job_queue=0x100100110 
THREAD FUNC: thread_id = 0x100593000 job_queue=0x100100110 
THREAD FUNC: thread_id = 0x100616000 job_queue=0x100100110 
THREAD FUNC: thread_id = 0x10040a000 job_queue=0x100100110 
JOB NUMBER :1 process_job() thread_id=0x100281000 the_job=0x100100110 data = 8
THREAD FUNC: thread_id = 0x100699000 job_queue=0x100100110 
THREAD FUNC: thread_id = 0x10048d000 job_queue=0x100100110 
JOB NUMBER :2 process_job() thread_id=0x100304000 the_job=0x100100100 data = 7
JOB NUMBER :3 process_job() thread_id=0x100510000 the_job=0x1001000f0 data = 6
JOB NUMBER :4 process_job() thread_id=0x100387000 the_job=0x1001000e0 data = 5
JOB NUMBER :5 process_job() thread_id=0x100593000 the_job=0x1001000d0 data = 4
JOB NUMBER :6 process_job() thread_id=0x100616000 the_job=0x1001000c0 data = 3
JOB NUMBER :7 process_job() thread_id=0x10040a000 the_job=0x1001000b0 data = 2
JOB NUMBER :8 process_job() thread_id=0x100699000 the_job=0x1001000a0 data = 1
JOB NUMBER :9 process_job() thread_id=0x10048d000 the_job=0x100100090 data = 0
$
This topic has been dead for over six months. Start a new discussion instead.
Have something to contribute to this discussion? Please be thoughtful, detailed and courteous, and be sure to adhere to our posting rules.