I have an open connection with a StreamReader pulling data into my system and storing it in a database. Everything works great, except that occasionally the stream from the remote server stops, and this makes my program crash.

How can I catch this situation when it occurs and maybe attempt to reconnect? The StreamReader code is:

using (stream)
{
    using (StreamReader sr = new StreamReader(stream))
    {
        while (!sr.EndOfStream && running)
        {
            string s = sr.ReadLine();
            var del = this.OnLineReceived;
            if (del != null)
            {
                del(this, new LineReceivedArgs(s));
            }
        }
    }
}

The code fails on the 'while' loop, and I'm not sure how to catch it...

That looks like code from the Twitter API sample I posted some time back ;)

Unless you have changed it, you do not want to catch the exception in this method. Here is the original code:

new Action<Stream>(BeginRead).BeginInvoke(
        response.GetResponseStream(),
        new AsyncCallback(EndRead),
        null);

.......

    private void BeginRead(Stream stream)
    {
      using (stream)
      {
        using (StreamReader sr = new StreamReader(stream))
        {
          while (!sr.EndOfStream && running)
          {
            string s = sr.ReadLine();
            var del = this.OnLineReceived;
            if (del != null)
            {
              del(this, new TwitterLineReceivedArgs(s));
            }
          }
        }
      }
    }
    private void EndRead(IAsyncResult ar)
    {
      AsyncResult result = (AsyncResult)ar;
      Action<Stream> action = (Action<Stream>)result.AsyncDelegate;
      try
      {
        action.EndInvoke(ar);
      }
      catch (Exception Ex) //The stream read error is handled here from BeginRead()
      {
        var del = this.OnError;
        if (del != null)
        {
          del(this, new TwitterErrorArgs(Ex));
        }
      }
      finally
      {
        running = false;
      }
    }

Any exception thrown inside BeginRead() is rethrown by action.EndInvoke(ar) and handled in the catch (Exception Ex) block of EndRead(). That block also fires the OnError event, which tells you that you need to reconnect to the Twitter server or shut down the application.

It already does what you are asking as far as I know. Please clarify if this isn't working for you.

* Note: This requires that the .BeginInvoke() call uses EndRead as its callback.
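
To see that behavior in isolation, here is a minimal sketch. It assumes .NET Framework, because Delegate.BeginInvoke is not supported on .NET Core or .NET 5 and later. Worker and Completed are hypothetical stand-ins for BeginRead and EndRead, not code from the API itself; the only point is that the exception thrown on the thread-pool thread comes back out of EndInvoke().

```csharp
using System;
using System.Runtime.Remoting.Messaging;
using System.Threading;

// Sketch for .NET Framework only: Delegate.BeginInvoke throws
// PlatformNotSupportedException on .NET Core / .NET 5+.
public static class EndInvokeDemo
{
    private static readonly ManualResetEvent done = new ManualResetEvent(false);
    private static Exception captured;

    // Hypothetical stand-in for BeginRead(): runs on a thread-pool thread and fails.
    private static void Worker(string input)
    {
        throw new InvalidOperationException("simulated dropped connection: " + input);
    }

    // Hypothetical stand-in for EndRead(): the callback passed to BeginInvoke.
    private static void Completed(IAsyncResult ar)
    {
        AsyncResult result = (AsyncResult)ar;
        Action<string> action = (Action<string>)result.AsyncDelegate;
        try
        {
            action.EndInvoke(ar); // rethrows whatever Worker threw
        }
        catch (Exception ex)
        {
            captured = ex;
        }
        finally
        {
            done.Set(); // let Run() continue
        }
    }

    // Starts the worker asynchronously and returns the exception seen in the callback.
    public static Exception Run()
    {
        captured = null;
        done.Reset();
        new Action<string>(Worker).BeginInvoke("stream", Completed, null);
        done.WaitOne();
        return captured;
    }

    public static void Main()
    {
        Exception ex = Run();
        Console.WriteLine(ex == null ? "no error" : "caught: " + ex.Message);
    }
}
```

If the callback were left out of the BeginInvoke() call, nothing would ever call EndInvoke(), and the failure would go unobserved.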

Hi Sknake, I was kind of hoping you'd pick this up : )
Thanks for this API by the way, it's been massively useful. I haven't changed anything significant in this code. The callback code above BeginRead() is still in place.
It always fails at

while (!sr.EndOfStream && running)

when I leave it running overnight though...

Are you running it inside the debugger? If so, the exception will cause the debugger to break. If not, it will be handled as I described, so there is no need to worry. Just subscribe to the OnError event. When it fires, call .Stop() on the API, put the thread to sleep for a few seconds, then call .Start() again.

If you want to leave this running in the debugger, you need to turn off breaking on exceptions, because a break stops the API from functioning. The exception is handled in a rather "roundabout" way, but it is the proper way when using the thread pool and callbacks. You just need to be aware of the behavior.

[edit]
When the debugger breaks at the problem line, hit the "Run" button in the IDE anyway. The code will keep running as if the exception never happened. It won't make you stop the process the way an unhandled exception would.
[/edit]
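
The stop, sleep, start sequence described above could be sketched as a small retry helper. StartWithRetry and its parameters are hypothetical names, not part of the API posted in this thread. The sketch assumes that the "sleep" is nothing more than a Thread.Sleep() between attempts, and that the start call itself can throw while the server is still unreachable (Start() calls request.GetResponse(), which throws on a failed connection).

```csharp
using System;
using System.Threading;

public static class Reconnect
{
    // Hypothetical helper: calls 'start' until it succeeds or the attempts run out,
    // sleeping 'delayMs' milliseconds between failed attempts.
    // Returns the number of attempts used, or -1 if every attempt failed.
    public static int StartWithRetry(Action start, int delayMs, int maxAttempts)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                start();
                return attempt;
            }
            catch (Exception)
            {
                // Connection still down: wait before trying again.
                if (attempt < maxAttempts)
                {
                    Thread.Sleep(delayMs);
                }
            }
        }
        return -1;
    }

    public static void Main()
    {
        int calls = 0;
        // Fake Start(): fails twice, then succeeds.
        Action fakeStart = () =>
        {
            calls++;
            if (calls < 3)
                throw new InvalidOperationException("server unavailable");
        };

        int used = StartWithRetry(fakeStart, 100, 5);
        Console.WriteLine("attempts used: " + used); // prints "attempts used: 3"
    }
}
```

With the reader from this thread, the call would look something like reader.Stop() followed by Reconnect.StartWithRetry(reader.Start, 5000, 10). One caveat: in the EndRead() shown earlier, the finally block sets running to false after the OnError handler returns, so a restart issued from inside the handler may be switched off again straight away. The restart probably belongs after that block has run.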

So if I build a release version it should all work without this error?
When you say 'put the thread to sleep', is that just a case of setting a short timer before reconnection, or is there something else involved?

Here I added an AutoRestart and an AutoRestartDelay property, and debugger attributes to stop breaking on the failure. You should be able to run this inside the IDE for days without a problem. Notice that I added some test code inside BeginRead() to simulate a connection error. This way you don't have to wait all day to see an error ;)

using System;
using System.IO;
using System.Net;
using System.Runtime.Remoting.Messaging;

namespace daniweb
{
  public class TwitterReader : IDisposable
  {
    //
    private string username;
    private string password;
    private volatile bool running; //written by Start()/Stop()/EndRead() and read by the BeginRead() loop on another thread
    //
    private const string url = @"http://stream.twitter.com/1/statuses/sample.json";
    //
    public event OnTwitterLineReceived OnLineReceived;
    public event OnTwitterError OnError;
    public bool AutoRestartOnFailure { get; set; }
    /// <summary>
    /// Restart delay in milliseconds
    /// </summary>
    public int AutoRestartDelay { get; set; }
    //
    private TwitterReader()
    {
      running = false;
    }
    public TwitterReader(string username, string password)
      : this()
    {
      this.username = username;
      this.password = password;
    }
    public void Start()
    {
      if (running)
        throw new InvalidOperationException("Twitter streamer is already running");

      running = true;
      HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(url);
      request.Method = "GET";
      request.Credentials = new NetworkCredential(this.username, this.password);
      HttpWebResponse response = (HttpWebResponse)request.GetResponse();
      new Action<Stream>(BeginRead).BeginInvoke(
        response.GetResponseStream(),
        new AsyncCallback(EndRead),
        null);
    }

    [
    global::System.Diagnostics.DebuggerNonUserCodeAttribute(),
    global::System.Diagnostics.DebuggerStepThrough()
    ]
    private void BeginRead(Stream stream)
    {
      //Uncomment the lines in this method to simulate a failure.
      //int eventCnt = 0;

      using (stream)
      {
        using (StreamReader sr = new StreamReader(stream))
        {
          while (!sr.EndOfStream && running)
          {
            string s = sr.ReadLine();
            var del = this.OnLineReceived;
            if (del != null)
            {
              //eventCnt++;
              del(this, new TwitterLineReceivedArgs(s));
              //if (eventCnt > 5)
              //  throw new Exception(); //Simulate a dropped connection
            }
          }
        }
      }
    }
    private void EndRead(IAsyncResult ar)
    {
      AsyncResult result = (AsyncResult)ar;
      Action<Stream> action = (Action<Stream>)result.AsyncDelegate;
      bool hadError = false;
      try
      {
        action.EndInvoke(ar);
      }
      catch (Exception Ex) //The stream read error is handled here from BeginRead()
      {
        hadError = true;
        var del = this.OnError;
        if (del != null)
        {
          del(this, new TwitterErrorArgs(Ex));
        }
      }
      finally
      {
        running = false;
      }

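      if (hadError && this.AutoRestartOnFailure)
      {
        System.Threading.Thread.Sleep(this.AutoRestartDelay);
        try
        {
          this.Start();
        }
        catch (Exception Ex) //Start() throws if the server is still unreachable; report it instead of leaving it unhandled on the thread pool
        {
          running = false; //Start() set the flag before the request failed
          var del = this.OnError;
          if (del != null)
          {
            del(this, new TwitterErrorArgs(Ex));
          }
        }
      }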
    }
    public void Stop()
    {
      running = false;
    }
    #region IDisposable Members
    public void Dispose()
    {
      Dispose(true);
      GC.SuppressFinalize(this);
    }
    protected virtual void Dispose(bool disposing)
    {
      if (disposing)
      {
        if (running)
        {
          this.Stop();
        }
      }
    }
    #endregion
  }
  public delegate void OnTwitterLineReceived(object sender, TwitterLineReceivedArgs e);
  public class TwitterLineReceivedArgs : EventArgs
  {
    private string line;
    public string Line { get { return line; } }
    private TwitterLineReceivedArgs()
    {
    }
    public TwitterLineReceivedArgs(string Line)
      : this()
    {
      this.line = Line;
    }
  }
  public delegate void OnTwitterError(object sender, TwitterErrorArgs e);
  public class TwitterErrorArgs : EventArgs
  {
    private Exception exception;
    public Exception Exception { get { return exception; } }
    private TwitterErrorArgs()
    {
    }
    public TwitterErrorArgs(Exception Exception)
      : this()
    {
      this.exception = Exception;
    }
  }
}

Calling it:

    private TwitterReader reader;
    private int linesRead;
    private void button2_Click(object sender, EventArgs e)
    {
      linesRead = 0;
      reader = new TwitterReader(username, password);
      reader.OnLineReceived += new OnTwitterLineReceived(reader_OnLineReceived);
      reader.OnError += new OnTwitterError(reader_OnError);
      reader.AutoRestartDelay = 1000; //1 second
      reader.AutoRestartOnFailure = true;
      reader.Start();
    }
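
The two handlers wired up above are not shown in the thread. A minimal sketch of what they might look like follows; the handler bodies are assumptions, and the two event-argument classes are simplified stand-in copies of the ones in the listing, included only so the sketch compiles on its own. Both events are raised from a thread-pool thread rather than the UI thread, which is why the counter is incremented atomically.

```csharp
using System;
using System.Threading;

// Simplified stand-ins for the event-argument types in the listing,
// included only so this sketch compiles by itself.
public class TwitterLineReceivedArgs : EventArgs
{
    public string Line { get; private set; }
    public TwitterLineReceivedArgs(string line) { Line = line; }
}

public class TwitterErrorArgs : EventArgs
{
    public Exception Exception { get; private set; }
    public TwitterErrorArgs(Exception exception) { Exception = exception; }
}

public class HandlerSketch
{
    private int linesRead;
    public int LinesRead { get { return linesRead; } }
    public string LastError { get; private set; }

    // Called once per line, on a thread-pool thread.
    public void reader_OnLineReceived(object sender, TwitterLineReceivedArgs e)
    {
        Interlocked.Increment(ref linesRead);
        // Hypothetical: store e.Line in the database here.
    }

    // Called when the read loop fails, for example on a dropped connection.
    public void reader_OnError(object sender, TwitterErrorArgs e)
    {
        LastError = e.Exception.Message;
        Console.Error.WriteLine("Stream error: " + LastError);
    }

    public static void Main()
    {
        HandlerSketch h = new HandlerSketch();
        h.reader_OnLineReceived(null, new TwitterLineReceivedArgs("{\"text\":\"hello\"}"));
        h.reader_OnError(null, new TwitterErrorArgs(new Exception("connection closed")));
        Console.WriteLine("lines read: " + h.LinesRead); // prints "lines read: 1"
    }
}
```

In a WinForms application, a handler that updates a control would also need to marshal the call back to the UI thread, for example with Control.Invoke().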

Amazing stuff. Thanks a lot!
