Hello ALL,

In my program, I first receive data from another machine, then processing the data. Data receiveing is fast, but data processing is relatively time consuming. Hence, I put the code for data processing in a POSIX thread (i am using Red Hat enterpriese linux AS 4). In the main() function, I receive data first. Whevenever a data is received, a new thread for processing the data is created.

In this thread,
http://www.codeguru.com/forum/showthread.php?t=459433

a friend Peled suggested a class, named SynchronizedQueue as shown below. I think the class is great for solving classic producer/consumer problem. However, I do not exactly know how to use it since my poor C++ knowledge. Can anyone help me with this. Please demonstrate how to use the SynchronizedQueue class?

SynchronizedQueue.cpp

#include "SynchronizedQueue.h"
/*
*   SynchronizedQueue.h
*   Author: Ishay Peled
*/
using namespace std;

SynchronizedQueue::SynchronizedQueue(){
    m_iSize = 0;
    pthread_mutex_init(&m_ptrMutex,NULL);
    pthread_cond_init(&m_ptrCondition, NULL);
}

SynchronizedQueue::~SynchronizedQueue(){
    pthread_mutex_destroy(&m_ptrMutex);
    pthread_cond_destroy(&m_ptrCondition);
}

void SynchronizedQueue::push(Message* obj){
    pthread_mutex_lock(&m_ptrMutex);
    m_vctQueue.push_back(obj);
    m_iSize++;
    if (m_iSize == 1)
        pthread_cond_signal(&m_ptrCondition);
    pthread_mutex_unlock(&m_ptrMutex);
}

Message* SynchronizedQueue::pop(){
    Message* result;
    void* res;
    pthread_mutex_lock(&m_ptrMutex);
    while (m_iSize == 0)
        pthread_cond_wait(&m_ptrCondition, &m_ptrMutex);
    result = m_vctQueue[0];
    m_vctQueue.erase(m_vctQueue.begin());
    m_iSize--;
    pthread_mutex_unlock(&m_ptrMutex);
    return result;
}

int SynchronizedQueue::getSize(){
    return m_iSize;
}

SynchronizedQueue.h

#ifndef BUFFER_H
#define BUFFER_H

#include <pthread.h>
#include <string>
#include <vector>
#include "Semaphore.h"
#include "Message.h"  //Change this to the data type you're using

using namespace std;

class SynchronizedQueue{
public:
    SynchronizedQueue();
    ~SynchronizedQueue();
    void push(Message* obj);
    Message* pop();
    int getSize();
        
private:
    std::vector<Message*> m_vctQueue;
    pthread_mutex_t m_ptrMutex;
    pthread_cond_t m_ptrCondition;
    int m_iSize;
};
#endif

Recommended Answers

All 8 Replies

On the codeguru forum author of SynchronizedQueue wrote to his correspondent:

Do you have any experience in c++ programming? the reason I'm asking is because it's crucial to applying the scheme I've supplied before.

The same question to you?..

On the codeguru forum author of SynchronizedQueue wrote to his correspondent:

The same question to you?..

ArkM thank you for your reply. Yes, as stated in the initial post, I have a little knowledge of C++ and OO program experience. This is also the reason I post here to ask for help from other friends. ArkM, if you know how to use the SynchronizedQueue class and please help me.

Thanks in advance.

The queue has 3 functions that will interest you: push, pop, getSize.
push inserts data into the queue. pop returns the oldest data from the queue, and removes it from the queue. getSize tells you how many items are in the queue. The data is always of type 'Message*', you need to define your own Message class/struct.

If you want to pass data in both directions between the 2 threads, you will need 2 queues. Declare them as global variables so that all threads can access them.

Well, the simplest answer is: no need in SynchronizedQueue class to process your data flow.
The point is that you can start new threads without any synchronization at all.
If a new thread knowns what to do, it processes assigned request then die.

Quite another matter if processing threads can't do all processing: for example, your application must process these thread results (collect them, send back or what else).
In that case you need to improve the application architecture (with or without SynchronizedQueue class).

Please, post more info (add your code sceleton if it's possible)...

Well, the simplest answer is: no need in SynchronizedQueue class to process your data flow.
The point is that you can start new threads without any synchronization at all.
If a new thread knowns what to do, it processes assigned request then die.

Quite another matter if processing threads can't do all processing: for example, your application must process these thread results (collect them, send back or what else).
In that case you need to improve the application architecture (with or without SynchronizedQueue class).

Please, post more info (add your code sceleton if it's possible)...

Thank you ArkM,

My application can be simplified as the following problem.

In the main function, the program constantly receives data (a struct data type, containing two integer type elements), whenever a struct of data is received, the struct are pushed into a queue; On the other hand, once there are idle thread (totally, n threads are created when the program is started), one data unit are popped out, and are processed by one certain thread. ArkM, could you please provide a COMPLETE program to implement the above task using SynchronizedQueue class. Thanks, I really need this.

the data struct is defined as
struct DataIn{
int ItemA;
int ItemB;
}

QUestions:
(1) How to create n threads.
(2) Peled told me to use something like ProcessData procD(SynQ); I see this means that I should use a SynchronizedQueue type object as an argument to instantiate a ProcessData type object , but what should be done with SynQ within the ProcessData class.


I tried my best to make the following code based on my understanding of the SynchronizedQueue class and poor knowledge of C++/OO design. The code is erroreous and incomplete even looks stupid. my apologies for this

#include "SynchronizedQueue.h"
struct TWO_INT
{
	int iA;
	int iB;
};

vector<int> SumofTwoNumbers;
class ProcessData{
private:
	SynchronizedQueue* jobs
		bool destroy;
public:
	ProcessData(SynchronizedQueue* jobs)
	~ProcessData()
	void destroy()
	void* run(void* args)
}

ProcessData::run()
{
	while (!destroy){
		struct TWO_INT* data  = jobs->pop();
		//process data
		Sum0fTwoNumbers.push(data->iA + data->iB);
	}
}

SynchronizedQueue SynQ;
SynQ.destroy = 0;
main()
{
	int i;
                ProcessData ProcD;
	for(i=0;  i<1000; ++i)
	{
		struct TWO_INT* twoint = new TWO_INT;
		twoint->iA = i;
		twoint->iB = i + 1;
		SynQ.push(twoint);
                                
                                 ProcD.run();
	}
                //ProcD.run();// Or put the line here
	SynQ.destroy = 1;

                ProcD.destroy();//But what the code in the destroy function body should look like?
}

Original post:

Whevenever a data is received, a new thread for processing the data is created.

Previous post:

On the other hand, once there are idle thread (totally, n threads are created when the program is started), one data unit are popped out, and are processed by one certain thread.

There are two different processing models. In the second one (much more realistic) you have the message pump (main thread) and worker threads pool. It's the standard multiple-reader / single-write model.
I get some time-out to experiment with possible solutions (no POSIX threads in my ready-to-use toolkit at the present moment;)). The point is that you must to prevent message queue overflow. Imagine that messages source is not a loop from 1 to 1000 but gigabit network socket or 10Gb data file...

Well, now I have installed pthreads library on my Windows XP.
Look at this improvisation (of course, it's not a copy/paste solution):

#include <stdio.h>

#include <string>
#include <vector>

using namespace std;

#include "Message.h"
#include "SynchronizedQueue.h"

/// pthread thred function type
typedef void*(*Tfunc)(void*);
/// Thread pool sceleton.
class TPool
{
public:
    explicit TPool(int ntids):tid(ntids), tCounter(0) 
    {}
    virtual ~TPool() { join(); }
    void reserve(int ntids) { tid.resize(ntids); }
    void start(Tfunc f,void* parm);
    void join();
protected:
    vector<pthread_t> tid;
private:
    void* junk;
    int tCounter;
};

/// Process int pair function type
typedef void (*wFunc)(int a, int b);

/// Working thread pool container with msg queue.
/** @todo Add errors check up code */
class SQPool: TPool
{
public:
    explicit SQPool(int ntids = 2):TPool(ntids)
    {
        pQueue = &sq;
    }
    /// Message pump.
    void push(int ia, int ib);
    void run(wFunc f);
    void join();
private:
    static void* tFunc(void* parm);
    static SynchronizedQueue* pQueue;
    static wFunc pFunc;
    
    SynchronizedQueue sq;

    SQPool(const SQPool&);
    SQPool& operator=(const SQPool&);
};

/// Start thread pool.
void TPool::start(Tfunc f,void* parm)
{
    int n = tid.size();
    for (int i = 0; i < n; ++i)
    {
        pthread_create(&tid[i],0,f,parm);
        tCounter++;
    }
}
/// Stop thread pool.
void TPool::join()
{
    for (int i = 0; i < tCounter; ++i)
        pthread_join(tid[i],&junk);
    tCounter = 0;
}

namespace {
    /// Do nothing.
    void dummyFunc(int a, int b) {}
}

wFunc SQPool::pFunc = dummyFunc;

SynchronizedQueue* SQPool::pQueue = 0;

void SQPool::push(int ia, int ib)
{
    Message* pm = new Message(ia,ib);
    sq.push(pm);
}

void SQPool::run(wFunc f)
{
    pFunc = f;
    start(SQPool::tFunc,&sq);
}

void SQPool::join()
{
    int n = tid.size();
    Message done(0,0,Message::DONE);
    for (int i = 0; i < n; ++i)
        sq.push(&done);
    TPool::join();
}

void* SQPool::tFunc(void* parm)
{
    Message* pm;

    while (pm = pQueue->pop(), pm->what == Message::DOIT)
    {
        pFunc(pm->a,pm->b);
        delete pm;
    }
    pthread_t tid = pthread_self();
    printf("Terminate %p\n",tid.p);
    pthread_exit(0);
    return 0;
}

/// Process the next int pair function.
/** tid.p for Windows pthreads library */
void doIt(int a, int b)
{
    pthread_t tid;
    tid = pthread_self();
    printf("Thread %p: %d,%d\n",tid.p,a,b);
}
/// Worker threads pool size.
const int nThreads = 3;
/// Data flow max size
const int nPairs   = 20;
/// Test run.
int main()
{
    SQPool pool(nThreads);

    pool.run(doIt); // Start thread pool.

    for (int i = 0; i < nPairs; ++i)
        pool.push(i,i); // Pair to queue.

    pool.join();    // Stop thread pool.
    
    printf("Done\n");

    return 0;
}

For a while it's alpha version only (especially in a graceful thread pool termination part).
Try with your own the real worker function doIt(int,int).
It's interesting what happens on Linux pthreads.

Hope it helps...

Message.h:

#ifndef MESSAGE_H
#define MESSAGE_H

struct Message
{
    enum ToDo { DONE, DOIT };
    explicit 
    Message(int ia = 0, int ib = 0, ToDo todo = DOIT)
    : a(ia), b(ib), what(todo) 
    {}
    ToDo what;
    int a, b;    
};
#endif

ArkM thank you for your valuable time. I will study closely your code and test it on my Linux box.

Be a part of the DaniWeb community

We're a friendly, industry-focused community of developers, IT pros, digital marketers, and technology enthusiasts meeting, networking, learning, and sharing knowledge.