The Producer Thread
What does a producer thread write to the buffer? Create 20 different files (named in0.txt, in1.txt, ... in19.txt). [Testing Hint: you want
to be able to tell from the text which file it is from, so the contents should be aaaaaa or bbbbbbb etc..] Vary the length of the file
contents, so that you can test that your program works with content smaller than the buffer size and content longer than the buffer
size. Implement a FIFO queue and add the file names to it, one per queue entry. Create 20 Producer threads. Up to 128
characters of each file is "produced" as follows: a producer thread gets the file name from the FIFO queue, reads the contents from
that file, converts all characters to uppercase, and writes the contents into any available buffer slot.
The Consumer Thread
Create 5 consumer threads. Each consumer thread should read the current contents of a full buffer slot, and print them to stdout. A
consumer thread continues until 2 conditions hold: All buffers are empty, and all 20 files have been produced.
Synchronization
Each different text buffer must be produced and consumed exactly once. In other words, the output from your program should show
the file contents up to a max of 128 characters.

-------------------------------------------------

OK so I'm basically done with this thing except I've run into a last minute problem. It seems like only one buffer is being used the entire time instead of using all 5 slots. Also it feels like only one consumer is being used, which is a problem. I posted this on another board, but only one guy was helping me out and I think he might be done for the night, so figured I'd try here because I've been on this for about 20+ hours the last few days and I'm stumped here.

Why is this happening? I'm sure my semaphores are set up properly. Anyway, my code is posted below, with the semaphore sections bolded, and the code is followed by the output. The output on the left is from the producer; the output on the right is from the consumer.

#include<stdio.h>
#include<stdlib.h>
#include<ctype.h>
#include<unistd.h>
#include<pthread.h>
#include<string.h>
#include<semaphore.h>

#define FILESIZE 20
#define BUFFER_SIZE 5
#define PRODUCERS 20
#define CONSUMERS 5

/* One node of the singly linked queue of input file names. */
struct fifo_struct
{
        char fileName[1024];            /* file name carried by this node */
        struct fifo_struct *next;       /* node that follows this one in the queue */
};

/* Queue handle: remembers both ends of a chain of fifo_struct nodes. */
struct linked_list
{
        struct fifo_struct *head;       /* first node of the chain */
        struct fifo_struct *tail;       /* last node of the chain */
};

/* Slots shared by producers and consumers: BUFFER_SIZE rows of 128 bytes each. */
char buffer[BUFFER_SIZE][128];
/* Number of rows currently filled; producer() writes row `counter`, consumer() reads row `counter - 1`. */
int counter;

/* Lock taken by producer() while it works on the file-name queue. */
pthread_mutex_t mutex;
/* Counting semaphores; main() initialises full to 0 and empty to BUFFER_SIZE. */
sem_t full, empty;


void print_queue(const struct linked_list* ps)
{
        struct fifo_struct* p = NULL;

        if(ps)
        {
                for(p = ps->head; p; p = p->next)
                {
                        if(p)
                                printf("int = %s\n", p->fileName);
                        else
                                printf("can't print NULL STRUCT\n");
                }
        }
}
/* Thread entry points; both are defined after main(). */
void *producer(void *q);
void *consumer(void *q);
/* Queue of input file names; allocated and filled by main(), drained by producer(). */
struct linked_list *s;

int main()
{
        pthread_t producerVar[PRODUCERS];
        pthread_t consumerVar[CONSUMERS];
        char str[200];
        int i = 0;
        counter = 0;
        sem_init(&full, 0, 0);
        sem_init(&empty, 0, BUFFER_SIZE);
        pthread_mutex_init(&mutex,NULL);
        struct fifo_struct* fifo;

        // Initialize the 5 buffer slots
        for(i = 0; i < BUFFER_SIZE; i++)
        {
                buffer[i][0] = '\n';
        }

        // Create linked list
        s = malloc( 1 * sizeof(*s));
        if(s == NULL)
        {
                fprintf(stderr, "LINE: %d, malloc() failed\n", __LINE__);
        }
        s->head = s->tail = NULL;

        for(i = 0; i < (FILESIZE); i++)
        {
                // Generates file names to store into queue
                sprintf(str, "in%d.txt", i);

                // Create queue to store file names
                fifo = malloc(1 * sizeof(*fifo));

                // Error in creating fifo
                if(fifo == NULL)
                {
                        fprintf(stderr, "IN %s, %s: malloc() failed\n", __FILE__, "list_add");
                }

                // Store filename into queue
                strcpy(fifo->fileName,str);
                fifo->next = NULL;

                if(s == NULL)
                {
                        printf("Error: Queue has not been initialized\n");
                }
                else if(s->head == NULL && s->tail == NULL)
                {
                        // First element in queue
                        s->head = s->tail = fifo;
                }
                else if(s->head == NULL || s->tail == NULL)
                {
                        printf("Error: Problem with code\n");
                        free(fifo);
                }
                else
                {
                        // Increments queue
                        s->tail->next = fifo;
                        s->tail = fifo;
                }
        }
        //print_queue(s);

        // Create producer threads
        for(i = 0; i < PRODUCERS; i++)
        {
                pthread_create(&producerVar[i], NULL, producer, &i);
                //pthread_join(producerVar[i], NULL);
        }
        // Create consumer threads
        for(i = 0; i < CONSUMERS; i++)
        {
                pthread_create(&consumerVar[i], NULL, consumer, s);
        }
        for(i = 0; i < PRODUCERS; i++)
        {
                pthread_join(producerVar[i], NULL);
        }

        return 0;
}

void *producer(void *idx)
{
        int myidx = * (int *) idx;
        int i = 0;
        char fileContent[1024];
        char line[1024];
        FILE * myfile;
        struct linked_list *q;
        //print_queue(q);
        struct fifo_struct* tmp1 = NULL;
        struct fifo_struct* tmp2 = NULL;
        pthread_mutex_lock(&mutex);
        printf("IN PRODUCER\n");
        q = s;
        if(q == NULL)
        {
                printf("List is empty\n");
                return(NULL);
        }
        else if(q->head == NULL && q->tail == NULL)
        {
                printf("List is empty\n");
                return(NULL);
        }
        else if(q->head == NULL || q->tail == NULL)
        {
                printf("Error: Problem with code\n");
                return(NULL);
        }

        printf("Producer: %d\n", myidx);
        //print_queue(q);
        myfile = fopen(q->head->fileName,"r");
        tmp1 = q->head;
        tmp2 = tmp1->next;
        free(tmp1);
        q->head = tmp2;

        if(q->head == NULL)
                      q->tail = q->head;
        //print_queue(q);
        printf("After printq\n");
        fflush(stdout);
        printf("\n");

        if((fgets(line, 1024, myfile)) != NULL)
        {
                strcpy(fileContent, line);
                printf("%s",fileContent);
        }
        strcpy(fileContent, line);
        printf("Myfile: %s",fileContent);
        fclose(myfile);

        while(fileContent[i] != '\n')
        {
                fileContent[i] = toupper(fileContent[i]);
                i++;
        }

        pthread_mutex_unlock(&mutex);
       [B] sem_wait(&empty);
        if(counter < BUFFER_SIZE) {
                strncpy(buffer[counter],fileContent,128);
                printf("buffer[%d] = %s\n", counter, buffer[counter]);
                counter++;
        }

        sem_post(&full);[/B]
        return(NULL);
}

void *consumer(void *q)
{
        int myidx = * (int *) q;

        printf("\t\t\t\tCONSUMER: %d\n", myidx);

        while(1)
        {
       [B]         sem_wait(&full);
                if(counter > 0) {
                printf("\t\t\tbuffer[%d] = %s\n", counter - 1, buffer[(counter - 1)]);
                counter--;
                }
                sem_post(&empty);[/B]
        }
        return(NULL);
}
IN PRODUCER
Producer: 1
After printq

aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
Myfile: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
IN PRODUCER
buffer[0] = AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

Producer: 15
                        CONSUMER: 141192
                        buffer[0] = AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

                        CONSUMER: 141192
                        CONSUMER: 141192
                        CONSUMER: 141192
                        CONSUMER: 141192
After printq

bbbbbbbb
Myfile: bbbbbbbb
buffer[0] = BBBBBBBB

IN PRODUCER
                        buffer[0] = BBBBBBBB

Producer: 0
After printq

ccccccc
Myfile: ccccccc
buffer[0] = CCCCCCC

IN PRODUCER
                        buffer[0] = CCCCCCC

Producer: 19
After printq

ddddddd
Myfile: ddddddd
buffer[0] = DDDDDDD

IN PRODUCER
                        buffer[0] = DDDDDDD

Producer: 18
After printq

eeeeeee
Myfile: eeeeeee
buffer[0] = EEEEEEE

IN PRODUCER
                        buffer[0] = EEEEEEE

Producer: 16
After printq

ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
Myfile: ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
buffer[0] = FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF

IN PRODUCER
                        buffer[0] = FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF

Producer: 14
After printq

ggggggg
Myfile: ggggggg
buffer[0] = GGGGGGG

IN PRODUCER
                        buffer[0] = GGGGGGG

Producer: 13
After printq

hhhhhhh
Myfile: hhhhhhh
buffer[0] = HHHHHHH

IN PRODUCER
                        buffer[0] = HHHHHHH

Producer: 12
After printq

iiiiiii
Myfile: iiiiiii
buffer[0] = IIIIIII

IN PRODUCER
                        buffer[0] = IIIIIII

Producer: 11
After printq

jjjjjjj
Myfile: jjjjjjj
buffer[0] = JJJJJJJ

IN PRODUCER
                        buffer[0] = JJJJJJJ

Producer: 10
After printq

kkkkkk
Myfile: kkkkkk
buffer[0] = KKKKKK

IN PRODUCER
                        buffer[0] = KKKKKK

Producer: 9
After printq

llllllll
Myfile: llllllll
buffer[0] = LLLLLLLL

IN PRODUCER
                        buffer[0] = LLLLLLLL

Producer: 8
After printq

mmmmmmm
Myfile: mmmmmmm
buffer[0] = MMMMMMM

IN PRODUCER
                        buffer[0] = MMMMMMM

Producer: 7
After printq

nnnnnnn
Myfile: nnnnnnn
buffer[0] = NNNNNNN

IN PRODUCER
                        buffer[0] = NNNNNNN

Producer: 6
After printq

ooooooo
Myfile: ooooooo
buffer[0] = OOOOOOO

IN PRODUCER
                        buffer[0] = OOOOOOO

Producer: 5
After printq

ppppppp
Myfile: ppppppp
buffer[0] = PPPPPPP

IN PRODUCER
                        buffer[0] = PPPPPPP

Producer: 4
After printq

qqqqqq
Myfile: qqqqqq
buffer[0] = QQQQQQ

IN PRODUCER
                        buffer[0] = QQQQQQ

Producer: 3
After printq

rrrrrrr
Myfile: rrrrrrr
buffer[0] = RRRRRRR

IN PRODUCER
                        buffer[0] = RRRRRRR

Producer: 2
After printq

sssssssss
Myfile: sssssssss
buffer[0] = SSSSSSSSS

IN PRODUCER
                        buffer[0] = SSSSSSSSS

Producer: 17
After printq

tttttt
Myfile: tttttt
buffer[0] = TTTTTT

                        buffer[0] = TTTTTT

Recommended Answers

All 7 Replies

Also ignore some of the output to the left, some of it is just for debugging. The main thing is that only buffer[0] is ever being used. buffer1, 2, 3, and 4 never even get touched!

also notice at the top right of the output, the consumer threads start right away. I'm not sure exactly how that should output, but I don't believe that's correct...

When you create the consumer thread, check the function call. The last parameter is `s`; it should be `&i`, because consumer() dereferences its argument as an int.

There are several problems with this code.
1. The producer locks the queue for much too long. The mutex can be safely released right after the queue entry is unlinked. As written, the producers effectively run sequentially, and since file I/O is a very time-consuming operation, the consumers have more than enough time to do their job while the next producer opens and reads its file. That's why only one slot ever gets used.
2. Semaphores are not enough. You need a mutex to protect the counter.

There are several problems with this code.
1. The producer locks the queue for much too long. The mutex can be safely released right after the queue entry is unlinked. As written, the producers effectively run sequentially, and since file I/O is a very time-consuming operation, the consumers have more than enough time to do their job while the next producer opens and reads its file. That's why only one slot ever gets used.
2. Semaphores are not enough. You need a mutex to protect the counter.

You are the best! So I added mutexes in both my consumer AND producer, basically in between the semaphore calls and before I do my action:
PRODUCER:

sem_wait(&empty);
        pthread_mutex_lock(&mutex);
        if(counter < BUFFER_SIZE) {
                strncpy(buffer[counter],fileContent,128);
                printf("buffer[%d] = %s\n", counter, buffer[counter]);
                counter++;
        }
        pthread_mutex_unlock(&mutex); 
        sem_post(&full);

CONSUMER:

sem_wait(&full);
                pthread_mutex_lock(&mutex);
                if(counter > 0) {
                printf("\t\t\tbuffer[%d] = %s\n", counter - 1, buffer[(counter - 1)]);
                counter--;
                }
                pthread_mutex_unlock(&mutex);
                sem_post(&empty);

And I changed the other mutex to unlock right after this statement:
if(q->head == NULL)
q->tail = q->head;

Now I actually get output from different buffers! Gracias amigo. However, buffer 3 and 4 never get used. Perhaps I'm still doing something wrong? Here's my output now..

IN PRODUCER
Producer: 1
IN PRODUCER
Producer: 7
Myfile: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
IN PRODUCER
Producer: 12
Myfile: bbbbbbbb
IN PRODUCER
Producer: 18
Myfile: ccccccc
                        CONSUMER: 143320
                        CONSUMER: 143320
                        CONSUMER: 143320
                        CONSUMER: 143320
                        CONSUMER: 143320
buffer[0] = BBBBBBBB

Myfile: ddddddd
IN PRODUCER
Producer: 0
buffer[1] = DDDDDDD

Myfile: eeeeeee
buffer[2] = CCCCCCC

                        buffer[2] = CCCCCCC

                        buffer[1] = DDDDDDD

                        buffer[0] = BBBBBBBB

IN PRODUCER
Producer: 19
buffer[0] = EEEEEEE

Myfile: ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff
IN PRODUCER
Producer: 17
buffer[1] = FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFCCCCCCC

Myfile: ggggggg
                        buffer[1] = FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFCCCCCCC

                        buffer[0] = EEEEEEE

buffer[0] = AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

IN PRODUCER
Producer: 15
buffer[1] = GGGGGGG

Myfile: hhhhhhh
                        buffer[1] = GGGGGGG

                        buffer[0] = AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA

IN PRODUCER
Producer: 14
buffer[0] = HHHHHHH

Myfile: iiiiiii
IN PRODUCER
Producer: 13
buffer[1] = IIIIIII

Myfile: jjjjjjj
IN PRODUCER
Producer: 11
buffer[2] = JJJJJJJ

Myfile: kkkkkk
                        buffer[2] = JJJJJJJ

                        buffer[1] = IIIIIII

IN PRODUCER
Producer: 10
buffer[1] = KKKKKK

Myfile: llllllll
IN PRODUCER
Producer: 9
buffer[2] = LLLLLLLL

Myfile: mmmmmmm
                        buffer[2] = LLLLLLLL

                        buffer[1] = KKKKKK

IN PRODUCER
Producer: 8
buffer[1] = MMMMMMM

Myfile: nnnnnnn
IN PRODUCER
Producer: 6
buffer[2] = NNNNNNN

Myfile: ooooooo
                        buffer[2] = NNNNNNN

                        buffer[1] = MMMMMMM

IN PRODUCER
Producer: 5
buffer[1] = OOOOOOO

Myfile: ppppppp
IN PRODUCER
Producer: 4
buffer[2] = PPPPPPP

Myfile: qqqqqq
                        buffer[2] = PPPPPPP

                        buffer[1] = OOOOOOO

IN PRODUCER
Producer: 3
IN PRODUCER
Myfile: rrrrrrr
Producer: 2
buffer[1] = RRRRRRR

Myfile: sssssssss
IN PRODUCER
Producer: 16
buffer[2] = SSSSSSSSS

Myfile: tttttt
                        buffer[2] = SSSSSSSSS

                        buffer[1] = RRRRRRR

                        buffer[0] = HHHHHHH

buffer[0] = QQQQQQ

                        buffer[0] = QQQQQQ

buffer[0] = TTTTTT

                        buffer[0] = TTTTTT

Can someone help me with this problem?

buffer[1] = FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFCCCCCCC

My buffer should only print 128 characters, but I'm getting more than that: a different buffer is being written into this one. So instead of getting all F's, I'm getting F's followed by another letter... Anyone?

thanks

Actually I found a way to get around it! Good deal, thanks for your help nezachem.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>

#include <iostream>
#include <list>
#include <vector>

using namespace std;

// Run parameters; every one of them is read from standard input in main().
int producerThreadCount,    // number of producer threads to start
	consumerThreadCount,    // number of consumer threads to start
	totalProductCount,      // total number of products produced by all producers together
	sizeOfQueue,            // capacity of productQueue; main() replaces 0 with totalProductCount
	schedulingAlgorithm,    // 0 selects First-Come-First-Serve, any other value Round-Robin
	quantumValue,           // Round-Robin quantum; only read from input when schedulingAlgorithm == 1
	seed;                   // value passed to srand() in CreateThreads()

// Descriptor handed to each worker thread through pthread_create().
struct thread_data{
   int  thread_number;        // number of the thread within its group (CreateThreads() assigns 1, 2, ...)
   const char *thread_type;   // label of the group; const because it is set from string literals
};
// Descriptor arrays for the producer and consumer threads; allocated in CreateThreads().
struct thread_data *producer, *consumer;


// A unit of work passed from the producers to the consumers.
struct product {
	int id;                      // identifier of the product
	int life;                    // remaining amount of work, in calls of fn()
	struct timeval timestamp;    // time recorded when the product was created
};


// Queue shared by all threads: AddProductToQueue() appends to it, the
// RemoveProductFromQueue_* functions take products from its front.
list<struct product> productQueue;

// Running totals, incremented in InsertProduct() and ConsumeProduct() respectively.
int	insertProductCount = 0, 
	consumedProductCount = 0; 

// Taken in CreateThreads() just before the threads are created and just after they are joined.
struct timeval tvStart_TotalProcessingTime, tvEnd_TotalProcessingTime;
// Start is set by the RemoveProductFromQueue_* functions to the product's timestamp;
// end is taken in ConsumeProduct() once a product has been fully consumed.
struct timeval tvStart_TurnAroundTime, tvEnd_TurnAroundTime;
// Start is the product's timestamp; end is taken when the product is picked from the queue.
struct timeval tvStart_WaitTime, tvEnd_WaitTime;
// One entry per consumed product, appended in ConsumeProduct().
list<double> lstTurnAroundTime;
list<double> lstWaitTime;
// Accumulated time_difference() between consecutive insertions / consecutive consumptions.
double totalProductsInsertionTime, totalProductsConsumptionTime;
// Timestamps of the previous insertion / previous consumption, used for the totals above.
struct timeval previousProductTimestamp, previousProductConsumedTimestamp;


// Lock held by InsertProduct() and ConsumeProduct() while they touch the shared state above.
pthread_mutex_t mutex	=	PTHREAD_MUTEX_INITIALIZER;
// Signalled by a consumer after it has consumed a product; producers wait on it while the queue is full.
pthread_cond_t notFull	=	PTHREAD_COND_INITIALIZER;
// Signalled by a producer after it has inserted a product; consumers wait on it while the queue is empty.
pthread_cond_t notEmpty	=	PTHREAD_COND_INITIALIZER;

// Starts and joins all worker threads.
void CreateThreads(); 
// Thread entry point of a producer; param points to its thread_data.
void *InsertProduct(void *param);
// Thread entry point of a consumer; param points to its thread_data.
void *ConsumeProduct(void *param); 
// Creates a product, appends it to productQueue and returns a copy of it.
struct product AddProductToQueue(); 
// Both return the id of the product that was completed; the Round-Robin
// variant returns 0 when the product was put back into the queue instead.
int RemoveProductFromQueue_FCFS(); 
int RemoveProductFromQueue_RoundRobin(); 
// Recursive Fibonacci-style function, called repeatedly by the consumers.
int fn(int number); 
// NOTE(review): no definition is visible in this file; presumably returns the
// elapsed time between tvStart and tvEnd -- confirm the unit against the definition.
double time_difference(struct timeval tvStart,struct timeval tvEnd); 

int main()
{
	
	cout << "Please enter the following parameters\n";
	

	cout << "Number of Producer Threads: ";
	cin >> producerThreadCount;

	cout << "Number of Consumer Threads: ";
	cin >> consumerThreadCount;

	cout << "Total number of products to be generated by all producer threads: ";
	cin >> totalProductCount;

	cout << "Size of the queue to store products for both producer and consumer threads: ";
	cin >> sizeOfQueue;

	
	if (sizeOfQueue == 0)
		sizeOfQueue = totalProductCount;

	cout << "Enter 0 or 1 for type of scheduling algorithm";
	cout << "\n0 for First-Come-First-Serve, and 1 for Round-Robin: ";
	cin >> schedulingAlgorithm;
	if (schedulingAlgorithm == 1)
	{
		cout << "Value of quantum used for round-robin scheduling: ";
		cin >> quantumValue;
	}

	cout << "Seed for random number generator: ";
	cin >> seed;

	

	CreateThreads(); 
	
	cout << "Performance metrics\n";
	


	}
}

void CreateThreads()
{
	pthread_t	producerThreads[producerThreadCount], 
				consumerThreads[consumerThreadCount]; 

	int count = 0; //counter for number of threads

	producer = new struct thread_data[producerThreadCount];
	consumer = new struct thread_data[consumerThreadCount];

	
	srand (seed);

	
	gettimeofday(&tvStart_TotalProcessingTime,NULL);

	
	for (count = 0; count < producerThreadCount; count++)
	{
		producer[count].thread_number = count+1;
		producer[count].thread_type = "Producer";
		pthread_create(&producerThreads[count],NULL,&InsertProduct,(void *)&producer[count]);
	}

	
	for (count = 0; count < consumerThreadCount; count++)
	{
		consumer[count].thread_number = count+1;
		consumer[count].thread_type = "Consumer";
		pthread_create(&consumerThreads[count],NULL,&ConsumeProduct,(void *)&consumer[count]);
	}

	
	for (count = 0; count < producerThreadCount; count++)
	{
		pthread_join(producerThreads[count],NULL);
	}

	for (count = 0; count < consumerThreadCount; count++)
	{
		pthread_join(consumerThreads[count],NULL);
	}

	
	gettimeofday(&tvEnd_TotalProcessingTime,NULL);

	
	pthread_cond_destroy(&notFull);
	pthread_cond_destroy(&notEmpty);
	pthread_mutex_destroy(&mutex);

}

void *InsertProduct(void *param)
{
	while(true)
	{
		pthread_mutex_lock( &mutex ); // aquire lock

		
		while(productQueue.size() == sizeOfQueue && insertProductCount != totalProductCount)
			pthread_cond_wait( &notFull, &mutex );

		if (insertProductCount == totalProductCount)
		{
			pthread_cond_broadcast(&notFull);
			pthread_mutex_unlock(&mutex);
			pthread_exit(0);
		}

		struct product newProduct = AddProductToQueue(); 

		
		struct thread_data *current_thread;
		current_thread = (struct thread_data *)param;
		cout << "Producer " << current_thread->thread_number  << " has produced product " << newProduct.id <<
		" with life " << newProduct.life << "." << endl;

		insertProductCount = insertProductCount + 1; 

		pthread_cond_signal (&notEmpty); // send signal to consumer that queue is not empty

		pthread_mutex_unlock(&mutex); 

		usleep(100000); // sleep for 100ms
	}

}

struct product AddProductToQueue()
{
	struct product createProduct; // new product to be inserted in the queue

	createProduct.id = insertProductCount + 1;

	createProduct.life = rand()%1024; 

	p
	struct timeval productTimestamp;
	gettimeofday(&productTimestamp, NULL);
	createProduct.timestamp = productTimestamp;

	productQueue.push_back(createProduct); 
	if (insertProductCount != 0)
		totalProductsInsertionTime = totalProductsInsertionTime + time_difference(previousProductTimestamp, productTimestamp);
	previousProductTimestamp = 	productTimestamp;

	return createProduct; 

void *ConsumeProduct(void *param)
{
	while(true)
	{
		pthread_mutex_lock( &mutex ); // acquire lock

		
		while(productQueue.size() == 0 && consumedProductCount != totalProductCount)
			pthread_cond_wait( &notEmpty, &mutex );

	
		if( consumedProductCount == totalProductCount)
		{
			pthread_cond_broadcast(&notEmpty);
			pthread_mutex_unlock(&mutex);
			pthread_exit(0);
		}

		int productID;

		if (schedulingAlgorithm == 0)
			productID = RemoveProductFromQueue_FCFS();
		else
			productID = RemoveProductFromQueue_RoundRobin();

		if (productID != 0)
		{
			
			struct thread_data *current_thread;
			current_thread = (struct thread_data *)param;
			cout << "Consumer " << current_thread->thread_number << " has consumed product " << productID << "." << endl;

			
			gettimeofday(&tvEnd_TurnAroundTime, NULL);

			
			double turnAroundTime = time_difference(tvStart_TurnAroundTime, tvEnd_TurnAroundTime);
			lstTurnAroundTime.push_back(turnAroundTime);

			
			double WaitTime = time_difference(tvStart_WaitTime, tvEnd_WaitTime);
			lstWaitTime.push_back(WaitTime);

					if (consumedProductCount != 0)
				totalProductsConsumptionTime = totalProductsConsumptionTime + time_difference(previousProductConsumedTimestamp,tvEnd_TurnAroundTime);
			previousProductConsumedTimestamp = 	tvEnd_TurnAroundTime;

			consumedProductCount = consumedProductCount + 1;

			pthread_cond_signal (&notFull); 

		}

		pthread_mutex_unlock(&mutex); // release lock

		usleep(100000); //sleep for 100ms
	}
}

int RemoveProductFromQueue_FCFS()
{
	// retrieve the first product from the queue
	struct product consumeProduct = productQueue.front();

	gettimeofday(&tvEnd_WaitTime, NULL);

	int productLife = consumeProduct.life; // product life
	int functionCallCount = 0;

	//get the start time for calculating turn around time and wait time for the product
	tvStart_TurnAroundTime = consumeProduct.timestamp;
	tvStart_WaitTime = tvStart_TurnAroundTime;

	// call the function N times: N is the product life
	while(functionCallCount < productLife)
	{
		int fibonnaciNumber = fn(10);
		functionCallCount++;
	}

	productQueue.pop_front(); // remove product from the queue

	return consumeProduct.id; // return the removed product id
}

// Round-Robin step: give the product at the front of productQueue at most
// quantumValue units of work.
//
// Records the moment the product is picked in tvEnd_WaitTime and copies the
// product's timestamp into the turn-around and wait start times. The caller
// must make sure the queue is not empty.
//
// If the product needs more than one quantum, its life is reduced by the
// quantum, it is moved to the back of the queue and 0 is returned. Otherwise
// it is processed to completion, removed, and its id is returned.
//
// A quantum of 0 or less disables pre-emption: re-queueing with such a
// quantum would never reduce the product's life, so the product is run to
// completion instead.
int RemoveProductFromQueue_RoundRobin()
{
	// retrieve the first product from the queue
	struct product consumeProduct = productQueue.front();

	// get the end time for calculating wait time for the product in consumer queue
	gettimeofday(&tvEnd_WaitTime, NULL);

	tvStart_TurnAroundTime = consumeProduct.timestamp;
	tvStart_WaitTime = tvStart_TurnAroundTime;

	const bool preempted = quantumValue > 0 && quantumValue < consumeProduct.life;
	const int callsThisTurn = preempted ? quantumValue : consumeProduct.life;

	// The product leaves the front of the queue in both cases.
	productQueue.pop_front();

	if (preempted)
	{
		// Same id and timestamp, shorter remaining life, back of the queue.
		consumeProduct.life -= quantumValue;
		productQueue.push_back(consumeProduct);
	}

	// Simulated work: one fn(10) call per unit of life handled in this turn.
	for (int call = 0; call < callsThisTurn; ++call)
	{
		(void)fn(10);
	}

	return preempted ? 0 : consumeProduct.id;
}

// Recursive Fibonacci function with fn(1) == 0 and fn(2) == 1.
//
// number: 1-based position in the sequence. Values below 1 have no position
//         and return 0; without that guard the recursion would never end.
// Returns the Fibonacci number at that position (fn(10) == 34).
//
// The recursion is intentionally left naive: the consumers call fn(10)
// repeatedly as their unit of work.
int fn(int number)
{
	if(number <= 1)
		return 0;
	else if(number == 2)
		return 1;
	else
		return fn(number-1)+fn(number-2);
}
Be a part of the DaniWeb community

We're a friendly, industry-focused community of developers, IT pros, digital marketers, and technology enthusiasts meeting, networking, learning, and sharing knowledge.