Writing Your First Rx Statement


Introduction

In my first blog post on Reactive Extensions (Rx), Introduction to Reactive Extensions, I mentioned that there are several ways in which you can create an IObservable<T>. The most fundamental way to create an IObservable<T> is by using the Observable.Create<TSource>() method. At first glance this method is a bit confusing, but once you know how to use it, it becomes very powerful.

Your First Rx Statement

In this tutorial I will build a Windows console application that makes an asynchronous web call to the Twitter API using WebClient.DownloadStringAsync() and Rx. I begin by creating a new console application called SampleRxApp.

Next, I add the appropriate Rx reference. For this, I use NuGet to add a reference to Rx-Main.





Once I have added the necessary Rx library, I add the following using directives to the top of my Program.cs file.

using System;
using System.Net;
using System.Reactive.Linq;

I now have access to the Observable class and all it has to offer. First, I'm going to write the Rx statement manually and later I will demonstrate a shortcut. Written manually, my Rx statement looks like this:

static void Main()
{
    var webCallObservable = Observable.Create<string>(
        observer =>
        {
            const string urlString = "https://api.twitter.com/1/statuses/show.json?id=199666728274374658";

            DownloadStringCompletedEventHandler handler =
                (sender, args) =>
                {
                    if (args.Error != null)
                    {
                        observer.OnError(args.Error);
                        return;
                    }

                    observer.OnNext(args.Result);
                    observer.OnCompleted();
                };

            WebClient client = null;

            try
            {
                client = new WebClient();

                client.DownloadStringCompleted += handler;

                client.DownloadStringAsync(new Uri(urlString));
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }

            return () =>
                        {
                            if (client != null)
                                client.DownloadStringCompleted -= handler;
                        };
        });

    webCallObservable.Subscribe(Console.WriteLine);

    Console.WriteLine("DONE");
    Console.ReadKey();
}

Running this code prints a tweet in JSON format. There is quite a lot going on here, so let me break it down into chunks and explain each bit.

I begin by calling Observable.Create<T>(). This generic method takes a type argument that defines the element type of the stream I'm creating (string in my case) and a delegate as its parameter. The expected delegate takes in an IObserver<T> and returns an Action. The observer is used to pass notifications back to the subscribers, and the Action holds any cleanup logic that needs to run when the subscription is disposed.

I pass the delegate into the Create method in the form of an anonymous function like so:

observer =>
    {
        return () => { };
    }
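To see that shape in isolation, here is a minimal sketch with no web call involved. The class and method names (CreateSketch, Greetings, Collect) are made up for illustration, and the sketch assumes the same Rx package reference as the rest of this post. The delegate pushes two values, signals completion, and returns an empty cleanup Action:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class CreateSketch
{
    // Hypothetical helper: a stream that emits two strings and then completes.
    public static IObservable<string> Greetings()
    {
        return Observable.Create<string>(observer =>
        {
            observer.OnNext("hello");   // first value
            observer.OnNext("world");   // second value
            observer.OnCompleted();     // no more values will follow

            return () => { };           // nothing to clean up in this sketch
        });
    }

    // Subscribes and collects every value the stream pushes.
    public static List<string> Collect()
    {
        var seen = new List<string>();
        Greetings().Subscribe(s => seen.Add(s));
        return seen;
    }
}
```

Because nothing in this delegate is asynchronous, the values should already be in the list by the time Subscribe() returns. The web call version below differs only in when OnNext() gets called.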

Next, I create a DownloadStringCompletedEventHandler.

                        DownloadStringCompletedEventHandler handler =
                            (sender, args) =>
                                {
                                    if (args.Error != null)
                                    {
                                        observer.OnError(args.Error);
                                        return;
                                    }

                                    observer.OnNext(args.Result);
                                    observer.OnCompleted();
                                };

The handler first checks whether any errors occurred during the web call. If there were, it sends the exception to the subscribers using the observer.OnError() method and stops. If the call completes successfully, the resulting string value is sent to the subscribers using the observer.OnNext() method. Finally, the handler notifies the subscribers that the call is complete by calling observer.OnCompleted().

Once the handler is defined, it is time to create a new WebClient and hook the handler up to the DownloadStringCompleted event:

                        WebClient client = null;

                        try
                        {
                            client = new WebClient();

                            client.DownloadStringCompleted += handler;

                            client.DownloadStringAsync(new Uri(urlString));
                        }
                        catch (Exception ex)
                        {
                            observer.OnError(ex);
                        }

I perform these operations inside a try-catch block. This way, if anything goes wrong while I'm setting up the WebClient, I can send the exception back to the subscribers using observer.OnError().

Finally, as I mentioned above, the delegate expects an Action to be returned containing any finalization logic. In my case, I'm going to unhook the event handler I created above when the subscription is disposed.

                        return () =>
                                   {
                                       if (client != null)
                                           client.DownloadStringCompleted -= handler;
                                   };
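A small sketch makes it easier to see when that Action runs. The names here (CleanupSketch, CleanupRanOnDispose) are hypothetical, and the stream deliberately never emits or completes, so disposing the subscription is the only thing that can end it:

```csharp
using System;
using System.Reactive.Linq;

static class CleanupSketch
{
    // Returns true if the cleanup Action ran when the subscription was disposed.
    public static bool CleanupRanOnDispose()
    {
        var cleanedUp = false;

        // A stream that never calls OnNext, OnError or OnCompleted.
        var idle = Observable.Create<int>(observer =>
        {
            return () => { cleanedUp = true; };   // the finalization logic
        });

        // Subscribe() hands back an IDisposable that represents the subscription.
        IDisposable subscription = idle.Subscribe(_ => { });

        // Disposing the subscription invokes the Action returned above.
        subscription.Dispose();

        return cleanedUp;
    }
}
```

In the web call example the same thing happens with the event handler: holding on to the IDisposable returned by Subscribe() and disposing it is what triggers the unhooking.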

The Observable.Create<T>() method returns an IObservable<T> which I assign to a local variable.  I then call the Subscribe() method on that variable outputting the string result to the console window:

            webCallObservable.Subscribe(s => Console.WriteLine(s));

The result should be a chunk of JSON in the console window. You will notice that the "DONE" line is actually printed before the JSON. That is because the web call happens asynchronously: Subscribe() returns immediately, and the result arrives later when the DownloadStringCompleted event fires.
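The Subscribe() call above only supplies a handler for values. Subscribe() also has an overload that takes error and completion handlers, which is worth using once a stream can call OnError(). The sketch below is an illustration under assumptions: SubscribeSketch and the simulated exception are made up, and the failing stream simply stands in for a web call that goes wrong:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

static class SubscribeSketch
{
    // Subscribes with all three callbacks and records which ones fire.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-in for a failed web call: the stream reports an error right away.
        var failing = Observable.Create<string>(observer =>
        {
            observer.OnError(new InvalidOperationException("simulated failure"));
            return () => { };
        });

        failing.Subscribe(
            s => log.Add("next: " + s),             // called for each value
            ex => log.Add("error: " + ex.Message),  // called at most once, on failure
            () => log.Add("completed"));            // called at most once, on success

        return log;
    }
}
```

With only the value handler supplied, my understanding is that Rx rethrows the exception instead, so supplying an error handler keeps a failed call from surfacing as an unhandled exception.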

The Shortcut

I promised you a shortcut and here it is. The friendly developers at Microsoft have provided an easy-to-use method that makes this very call using a fraction of the code.

const string urlString = "https://api.twitter.com/1/statuses/show.json?id=199666728274374658";

var client = new WebClient();

var webCallObservable =
    Observable.FromEventPattern<DownloadStringCompletedEventHandler, DownloadStringCompletedEventArgs>(
        h => client.DownloadStringCompleted += h,
        h => client.DownloadStringCompleted -= h);

webCallObservable.Subscribe(s => Console.WriteLine(s.EventArgs.Result));

client.DownloadStringAsync(new Uri(urlString));

In this sample, I'm using Observable.FromEventPattern() to provide the logic for wiring up and tearing down the DownloadStringCompletedEventHandler. Then, all I need to do is subscribe to the output of that call and look in the EventArgs for the result string.
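The same wiring can be tried without a network connection. In the sketch below, Downloader and DownloadedEventArgs are hypothetical stand-ins for WebClient and its event args, not real framework types. The first lambda attaches the handler on subscription and the second detaches it on disposal:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical event args carrying the downloaded text.
class DownloadedEventArgs : EventArgs
{
    public string Body { get; private set; }
    public DownloadedEventArgs(string body) { Body = body; }
}

// Hypothetical event source standing in for WebClient.
class Downloader
{
    public event EventHandler<DownloadedEventArgs> Downloaded;

    // Raises the event as if a download had just finished.
    public void Finish(string body)
    {
        var handler = Downloaded;
        if (handler != null)
            handler(this, new DownloadedEventArgs(body));
    }
}

static class FromEventPatternSketch
{
    public static List<string> Run()
    {
        var source = new Downloader();
        var seen = new List<string>();

        var downloads =
            Observable.FromEventPattern<EventHandler<DownloadedEventArgs>, DownloadedEventArgs>(
                h => source.Downloaded += h,    // run when a subscription starts
                h => source.Downloaded -= h);   // run when the subscription is disposed

        var subscription = downloads.Subscribe(e => seen.Add(e.EventArgs.Body));

        source.Finish("first");     // observed by the subscriber
        subscription.Dispose();     // detaches the handler
        source.Finish("second");    // raised after disposal, so not observed

        return seen;
    }
}
```

Note that a stream built this way only forwards events. It has no notion of completion, which is one reason the Observable.Create() version calls OnCompleted() itself.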

Conclusion

Reactive Extensions is an extremely powerful technology that helps you build asynchronous and event-driven applications. There are some fantastic extension methods that cover many of the common patterns associated with async and event-driven programming. It is important to know what is happening under the covers, though, because you will still encounter issues like race conditions and deadlocks if you are not careful with your use of Rx. Knowing how to build an IObservable<T> using the Observable.Create<T>() method is important if you want to do anything that the extension methods don't offer you out of the box.

Download the Source Code here.

(2012 3/52)
