The Reactor Pattern using C#

The Reactor Pattern is a design pattern for synchronous demultiplexing and dispatching of concurrently arriving events. It receives incoming messages/requests/connections from multiple concurrent clients and processes these messages sequentially using event handlers.

The purpose of the Reactor design pattern is to avoid the common problem of creating a thread for each incoming message/request/connection. It receives events from a set of handles and distributes them sequentially to the corresponding event handlers. Thus, the application using the Reactor need only use one thread to handle concurrently arriving events.

In short, the Reactor pattern lets an application handle concurrent events while retaining the simplicity of single-threaded code.
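
To make the idea concrete before any sockets are involved, here is a minimal in-memory sketch. It is not part of the example application: MiniReactor, Register, Post and DispatchPending are hypothetical names, string keys stand in for handles, and a queue stands in for the events a demultiplexer would report.

```csharp
using System;
using System.Collections.Generic;

namespace ReactorPatternSketch
{
    // Hypothetical in-memory reactor, for illustration only.
    public class MiniReactor
    {
        // Maps each handle (here just a string key) to its event handler.
        private readonly Dictionary<string, Action<string>> _handlers =
            new Dictionary<string, Action<string>>();

        // Events that have "arrived" but have not been dispatched yet.
        private readonly Queue<KeyValuePair<string, string>> _pending =
            new Queue<KeyValuePair<string, string>>();

        public void Register(string handle, Action<string> handler)
        {
            _handlers[handle] = handler;
        }

        // Simulates an event arriving on a handle.
        public void Post(string handle, string payload)
        {
            _pending.Enqueue(new KeyValuePair<string, string>(handle, payload));
        }

        // Dispatches every queued event, one at a time, on the calling thread.
        // Returns the number of events that reached a handler.
        public int DispatchPending()
        {
            int dispatched = 0;
            while (_pending.Count > 0)
            {
                KeyValuePair<string, string> evt = _pending.Dequeue();
                Action<string> handler;
                if (_handlers.TryGetValue(evt.Key, out handler))
                {
                    handler(evt.Value);
                    dispatched++;
                }
            }
            return dispatched;
        }
    }
}
```

Registering two handlers, posting events for them in any order, and calling DispatchPending() runs the handlers one after another on the calling thread. Events posted for an unregistered handle are simply dropped in this sketch.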

In this example we will implement a Reactor that handles TcpListeners.

First we have the interface that defines the Event Handler contract.


using System.Net.Sockets;

namespace ReactorPattern
{
    public interface IEventHandler
    {
        void HandleEvent(byte[] data);
        TcpListener GetHandler();
    }
}

Then we have the interface for the Synchronous Event Demultiplexer, which returns the Handles that are ready to be processed and then handed to the Event Handler.


using System.Collections.Generic;
using System.Net.Sockets;

namespace ReactorPattern
{
    public interface ISynchronousEventDemultiplexer
    {
        IList<TcpListener> Select(ICollection<TcpListener> listeners);
    }
}

Then we have the interface that defines the Reactor contract. Notice that it has methods for adding handlers, removing handlers, and handling the events.


namespace ReactorPattern
{
    public interface IReactor
    {
        void RegisterHandle(IEventHandler eventHandler);
        void RemoveHandle(IEventHandler eventHandler);
        void HandleEvents();
    }
}

Now we have the concrete implementation of IEventHandler, which creates an instance of our TcpListener handle. It also has methods for returning the Handle and for handling the event raised when a message arrives.


using System.Net;
using System.Net.Sockets;
using System.Text;

namespace ReactorPattern
{
    public class MessageEventHandler : IEventHandler
    {
        private readonly TcpListener _listener;

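        public MessageEventHandler(IPAddress ipAddress, int port)
        {
            _listener = new TcpListener(ipAddress, port);
            // Start listening right away: Pending() and AcceptSocket() throw
            // InvalidOperationException on a listener that has not been started.
            _listener.Start();
        }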

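        public void HandleEvent(byte[] data)
        {
            // Decode the raw bytes delivered by the Reactor as UTF-8 text
            // and print them so the example produces visible output.
            string message = Encoding.UTF8.GetString(data);
            System.Console.WriteLine(message);
        }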

        public TcpListener GetHandler()
        {
            return _listener;
        }
    }
}


Here we have the concrete implementation of the Synchronous Event Demultiplexer, which returns the TcpListeners that are ready to be processed.


using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;

namespace ReactorPattern
{
    public class SynchronousEventDemultiplexer : ISynchronousEventDemultiplexer
    {
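        public IList<TcpListener> Select(ICollection<TcpListener> listeners)
        {
            // Pending() is a non-blocking check for a waiting connection request,
            // so this call returns immediately even when nothing is ready.
            var tcpListeners =
                new List<TcpListener>(from listener in listeners
                                      where listener.Pending()
                                      select listener);
            return tcpListeners;
        }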
    }
}
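
The demultiplexer above polls each listener with Pending() and returns immediately, so a loop that calls it repeatedly keeps a CPU core busy even when nothing arrives. As a possible alternative, the sketch below blocks in Socket.Select on the listeners' underlying sockets until a connection is pending or a timeout expires. BlockingEventDemultiplexer and its timeout parameter are assumptions made for illustration, not part of the original example. The class has the same Select shape as ISynchronousEventDemultiplexer, so it could be adapted to implement that interface.

```csharp
using System.Collections.Generic;
using System.Net.Sockets;

namespace ReactorPatternSketch
{
    // Hypothetical blocking demultiplexer, for illustration only.
    // Every listener passed to Select is assumed to have been started.
    public class BlockingEventDemultiplexer
    {
        private readonly int _timeoutMicroseconds;

        public BlockingEventDemultiplexer(int timeoutMicroseconds)
        {
            _timeoutMicroseconds = timeoutMicroseconds;
        }

        public IList<TcpListener> Select(ICollection<TcpListener> listeners)
        {
            var ready = new List<TcpListener>();

            // Socket.Select throws if every list it is given is null or empty.
            if (listeners.Count == 0)
            {
                return ready;
            }

            // Socket.Select works on sockets, so remember which listener owns each one.
            var bySocket = new Dictionary<Socket, TcpListener>();
            var checkRead = new List<Socket>();
            foreach (TcpListener listener in listeners)
            {
                bySocket[listener.Server] = listener;
                checkRead.Add(listener.Server);
            }

            // Waits until at least one listening socket has a pending connection
            // or the timeout elapses. On return, checkRead contains only the
            // sockets that are ready.
            Socket.Select(checkRead, null, null, _timeoutMicroseconds);

            foreach (Socket socket in checkRead)
            {
                ready.Add(bySocket[socket]);
            }
            return ready;
        }
    }
}
```

The timeout is a trade-off: a longer value means fewer wake-ups while idle, while a shorter value lets the calling loop notice other conditions (such as a shutdown request) sooner.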

Here we have the Reactor, or Initiation Dispatcher. It takes a Synchronous Event Demultiplexer, which it uses to get the handles that are ready to be processed. It also creates an IDictionary to store all of its handles and their Event Handlers.


using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;

namespace ReactorPattern
{
    public class Reactor : IReactor
    {
        private readonly ISynchronousEventDemultiplexer _synchronousEventDemultiplexer;
        private readonly IDictionary<TcpListener, IEventHandler> _handlers;

        public Reactor(ISynchronousEventDemultiplexer synchronousEventDemultiplexer)
        {
            _synchronousEventDemultiplexer = synchronousEventDemultiplexer;
            _handlers = new Dictionary<TcpListener, IEventHandler>();
        }

        public void RegisterHandle(IEventHandler eventHandler)
        {
            _handlers.Add(eventHandler.GetHandler(), eventHandler);
        }

        public void RemoveHandle(IEventHandler eventHandler)
        {
            _handlers.Remove(eventHandler.GetHandler());
        }

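        public void HandleEvents()
        {
            // With the polling demultiplexer shown earlier, this loop spins
            // continuously while no connection is pending.
            while (true)
            {
                // Ask the demultiplexer which listeners have a connection waiting.
                IList<TcpListener> listeners = _synchronousEventDemultiplexer.Select(_handlers.Keys);

                foreach (TcpListener listener in listeners)
                {
                    int dataReceived = 0;
                    byte[] buffer = new byte[1];
                    IList<byte> data = new List<byte>();

                    // A connection is pending, so AcceptSocket() returns without blocking.
                    Socket socket = listener.AcceptSocket();

                    // Read one byte at a time until the client closes the connection
                    // (Receive returns 0).
                    do
                    {
                        dataReceived = socket.Receive(buffer);

                        if (dataReceived > 0)
                        {
                            data.Add(buffer[0]);
                        }

                    } while (dataReceived > 0);

                    socket.Close();

                    // Hand the complete message to the handler registered for this listener.
                    _handlers[listener].HandleEvent(data.ToArray());
                }
            }
        }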
    }
}
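
The receive loop above asks the socket for a single byte per call. One possible refinement, assuming the sender still signals the end of a message by closing the connection, is to read in larger chunks. SocketReader and ReadToEnd are hypothetical names introduced for this sketch; they are not part of the original example.

```csharp
using System.IO;
using System.Net.Sockets;

namespace ReactorPatternSketch
{
    // Hypothetical helper, for illustration only.
    public static class SocketReader
    {
        // Reads from a connected socket until the remote side closes the
        // connection, then returns everything that was received.
        public static byte[] ReadToEnd(Socket socket, int bufferSize)
        {
            var buffer = new byte[bufferSize];
            using (var collected = new MemoryStream())
            {
                int received;
                // Receive returns the number of bytes read, or 0 once the
                // remote side has shut down the connection.
                while ((received = socket.Receive(buffer)) > 0)
                {
                    collected.Write(buffer, 0, received);
                }
                return collected.ToArray();
            }
        }
    }
}
```

Inside HandleEvents, the do/while loop could then be replaced by a single call such as SocketReader.ReadToEnd(socket, 4096), where 4096 is an arbitrary buffer size chosen for the example.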

Here is a simple example of the Reactor being constructed and run.


using System.Net;

namespace ReactorPattern
{
    class Program
    {
        static void Main(string[] args)
        {
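            // A TcpListener can only bind to an address of the local machine, and
            // "345.345.345.345" is not a valid IPv4 address, so this example uses the
            // loopback address with three arbitrary example ports.
            IEventHandler client1 = new MessageEventHandler(IPAddress.Loopback, 5001);
            IEventHandler client2 = new MessageEventHandler(IPAddress.Loopback, 5002);
            IEventHandler client3 = new MessageEventHandler(IPAddress.Loopback, 5003);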

            ISynchronousEventDemultiplexer synchronousEventDemultiplexer = new SynchronousEventDemultiplexer();

            Reactor dispatcher = new Reactor(synchronousEventDemultiplexer);

            dispatcher.RegisterHandle(client1);
            dispatcher.RegisterHandle(client2);
            dispatcher.RegisterHandle(client3);

            dispatcher.HandleEvents();

        }
    }
}
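
To see a handler fire, something has to connect to one of the listeners. The sketch below is a minimal client, assuming the Reactor is already running in another process or thread; TestClient and Send are hypothetical names. It closes the connection after writing, because the Reactor's receive loop only finishes when Receive returns 0, which happens once the sender has closed its side.

```csharp
using System.Net;
using System.Net.Sockets;
using System.Text;

namespace ReactorPatternSketch
{
    // Hypothetical test client, for illustration only.
    public static class TestClient
    {
        // Connects to the given endpoint, sends the message as UTF-8,
        // and closes the connection. Returns the number of bytes sent.
        public static int Send(IPAddress address, int port, string message)
        {
            byte[] payload = Encoding.UTF8.GetBytes(message);
            using (var client = new TcpClient())
            {
                client.Connect(address, port);
                NetworkStream stream = client.GetStream();
                stream.Write(payload, 0, payload.Length);
            } // Disposing the client closes the connection.
            return payload.Length;
        }
    }
}
```

For example, calling TestClient.Send(address, port, "hello") with the address and port of a registered listener should result in that listener's HandleEvent being called with the UTF-8 bytes of "hello".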

Comments

  1. Vlad Bezden September 8th

    Great description and example. Thanks for sharing.


  2. house April 11th

    thanks, it’s really helpful for me


  3. dev February 19th

    clear and helpful, thanks!


  4. Vaibhav Singh April 20th

    Thank you for sharing. Seeing the C# implementation of the concept has been really helpful.


  5. David Daabul April 27th

    Thank you very much for sharing this nice code sample. I’m now studying design patterns for concurrent, distributed applications, and this example is VERY nice and useful!
    If you have some more examples of concurrent design patterns, like wrapper facade, monitor object, acceptor-connector, half-sync/half-async, proactor etc. (preferably in C#), I’ll be more than grateful to have them… :-)


  6. John May 6th

    Couldn’t run until I added this to MessageEventHandler.cs:

    public MessageEventHandler(IPAddress ipAddress, int port)
    {
        _listener = new TcpListener(ipAddress, port);
        System.Console.WriteLine("Starting listener on {0} : {1}", ipAddress, port);
        _listener.Start();
    }


About Author

Robert

Programming today is a race between software engineers striving to build bigger and better idiot-proof programs, and the Universe trying to produce bigger and better idiots. So far, the Universe is winning hands down.