I have the following socket connection codes. Where for every connection is one new thread. After receiving the data which is done via the producer then I put into a queue for next database processor which is the consumer. This works fine but the moment the queue gets build up it can hang or even take even hours to clear cause is like so many producers and just one single consumer. I am thinking is it possible for me to have one consumer handles one producer meaning each message goes into its own queue gets process and that it. What changes can I do to handle it or must I build a thread pooling ?

public class cServer {
   private LinkedBlockingQueue<String> databaseQueue = new LinkedBlockingQueue<String>();
   class ConnectionHandler implements Runnable {

    private Socket receivedSocketConn1;   
    ConnectionHandler(Socket receivedSocketConn1) {
      this.receivedSocketConn1=receivedSocketConn1;
    }     
      // gets data from an inbound connection and queues it for databse update
      public void run() { // etc

         // you already have most of this in your existing code.
         // it uses the shared databaseQueue variable to queue message Strings         
         BufferedWriter w = null;
         BufferedReader r = null;      
          String message="";
          try {

             PrintStream out = System.out; 
             BufferedWriter fout = null;
             w =  new BufferedWriter(new OutputStreamWriter(receivedSocketConn1.getOutputStream()));
             r = new BufferedReader(new InputStreamReader(receivedSocketConn1.getInputStream()));

             int m = 0, count=0;
             int nextChar=0;

             while ((nextChar=r.read()) != -1) 
             {

                  message += (char) nextChar;  

                  if (nextChar == '*')
                  {

                     databaseQueue.add(message);  
                     message="";                
                  }
                 }
              } 
              catch (IOException ex)  
              { 
                   System.out.println("MyError:IOException has been caught in in the main first try");
                   ex.printStackTrace(System.out);
              }      
              finally
              {
                try 
                {

                    if ( w != null ) 
                    {
                        w.close();
                    }
                    else 
                    {
                        System.out.println("MyError:w is null in finally close");
                    }
                }
                catch(IOException ex){
                   System.out.println("MyError:IOException has been caught in w in finally close");
                   ex.printStackTrace(System.out);
                }

              }
      }

   }

   class DatabaseProcessor implements Runnable {

      // updates databaase with data queued by ConnectionHandler
      Connection dbconn = null;
      Statement stmt = null;
      Statement stmt1 = null;
      Statement stmt2 = null;
      Date connCreated = null;
      public void run()
      { // this is just like the QueueProcessor example I gave you
         // open database connection
         //createConnection();
             while (true) 
             {

                try 
                {
                    int count=0;
                    String message = "";
                    message = databaseQueue.take();
                    if (message.equals(null)) {
                       System.out.println("QueueProcessor is shutting down");
                    break; // exit while loop, ends run() method
                    }
                    System.out.println("Message taken from queue is :"+message);

                } 
                catch (Exception e) 
                {
                e.printStackTrace();
                }
            }//while true
           //closeConnection();
     }//run
   }

   public static void main(String[] args) {
      new cServer();
   }
   cServer() { // default constructor
      new Thread(new DatabaseProcessor()).start();
      try 
      {
               final ServerSocket serverSocketConn = new ServerSocket(5000);                
               while (true) 
                    {
                        try 
                        {
                                Socket socketConn1 = serverSocketConn.accept();
                                new Thread(new ConnectionHandler(socketConn1)).start();                     
                        }
                        catch(Exception e)
                        {
                            System.out.println("MyError:Socket Accepting has been caught in main loop."+e.toString());
                            e.printStackTrace(System.out);
                        }
                    }
      } 
      catch (Exception e) 
      {
         System.out.println("MyError:Socket Conn has been caught in main loop."+e.toString());
         e.printStackTrace(System.out);
         //System.exit(0); 
      }
      databaseQueue.add(null);
   }
}

I am trying to change it to this change to this

if (nextChar == '*')
                  {
                     Thread t = new Thread(new DatabaseProcessor (message)); 
                     t.start();
                     //databaseQueue.add(message);  
                     message="";                
                  }

Next I change the consumer to this. I need this due to the fact that the queue is getting very congested and stop my whole operation as time where I need to stop the operation and restart with I will loose all my data in the queue. So my plan is the socket gets data create a new thread and later close that socket? Is this vialbe?

class DatabaseProcessor extends Thread {
       private String data;
        public DatabaseProcessor(String data) {
            this.data = data;
        }
      // updates databaase with data queued by ConnectionHandler
      Connection dbconn = null;
      Statement stmt = null;
      Statement stmt1 = null;
      Statement stmt2 = null;
      Date connCreated = null;
      public void run()
      { // this is just like the QueueProcessor example I gave you
         // open database connection
         //createConnection();
             while (true) 
             {

                try 
                {
                    int count=0;
                    String message = "";
                    //message = databaseQueue.take();
                    //if (message.equals(null)) {
                    //   System.out.println("QueueProcessor is shutting down");
                    //break; // exit while loop, ends run() method
                    //}
                    System.out.println("Message taken from queue is :"+data);

                } 
                catch (Exception e) 
                {
                e.printStackTrace();
                }
            }//while true
           //closeConnection();
     }//run
   }

Hi newbie. I remember this!

If the queue keeps growing then the real problem is that you are processing database updates more slowly than new transactions are arriving. Messing about with threads and queues around the input won't change that. I don't fully understand the changes you propose, but I can't see how they could possibly help. You have to work out why it's taking so long to process each transaction and find ways to speed that up. so:

  1. Look for redundancies and inefficiencies in your Java code. Are you opening a new database connection for each transaction? Using prepared statements etc?
  2. work with your database people to speed up the processing of each database update by optimising the databse structure and SQL code.
  3. AFTER YOU HAVE DONE THAT, if you still have aproblem, look at running multiple database updates in parallel - ie start two or more database update threads taking updates from the queue in parallel. If two are faster than one then look at a thead pool and experiment to find its opimium size.

JC

Be a part of the DaniWeb community

We're a friendly, industry-focused community of developers, IT pros, digital marketers, and technology enthusiasts meeting, networking, learning, and sharing knowledge.