Generic Reactor Pattern using C#

The Reactor pattern allows an application to handle concurrent events while retaining the simplicity of single-threading. See my previous post The Reactor Pattern using C# for more details.

In this example we will create a generic version of the Reactor Pattern, it gives us further abstraction of the business logic from the pattern to improve code reuse.

We start to defining the interface for the Handle, where T is the type of object being monitored and V is the type of data returned from the event. In the example below T is TcpListener and V is byte array.


namespace GenericReactorPattern
{
    public interface IHandle< T, V >
    {
        T Value();
        V Event();
    }
}

Next we define our interface contract for the EventHandle, notice that GetHandle returns IHandle


namespace GenericReactorPattern
{
    public interface IEventHandler< T, V >
    {
        void HandleEvent(V response);
        IHandle< T, V > GetHandle();
    }
}

Next we define the interface contract for the Synchronous Event Demultiplexer


using System.Collections.Generic;

namespace GenericReactorPattern
{
    public interface ISynchronousEventDemultiplexer< T, V >
    {
        IHandle< T, V > Select(ICollection< IHandle< T, V > > keys);
    }
}

And now we define the interface contract for the Reactor or InitiationDispatcher


namespace GenericReactorPattern
{
    public interface IReactor< T, V >
    {
        void RegisterHandle(IEventHandler< T, V > eventHandler);
        void RemoveHandle(IEventHandler< T, V > eventHandler);
        void HandleEvents();
    }
}

Now we will implement our Generic Reactor, this approch follows the Open Closed Principle.


using System.Collections.Generic;

namespace GenericReactorPattern
{
    public class Reactor< T, V > : IReactor< T, V >
    {
        private readonly ISynchronousEventDemultiplexer< T, V > _synchronousEventDemultiplexer;
        private readonly IDictionary< IHandle< T, V >, IEventHandler< T, V > > _handles;

        protected Reactor(ISynchronousEventDemultiplexer< T, V > synchronousEventDemultiplexer)
        {
            _synchronousEventDemultiplexer = synchronousEventDemultiplexer;
            _handles = new Dictionary< IHandle< T, V >, IEventHandler< T, V > >();
        }

        public void RegisterHandle(IEventHandler< T, V > eventHandler)
        {
            _handles.Add(eventHandler.GetHandle(),eventHandler);
        }

        public void RemoveHandle(IEventHandler< T, V > eventHandler)
        {
            _handles.Remove(eventHandler.GetHandle());
        }

        public void HandleEvents()
        {
            IHandle< T, V > handle = _synchronousEventDemultiplexer.Select(_handles.Keys);
            V response = handle.Event();
            _handles[handle].HandleEvent(response);
        }
    }
}

Now lets create an example using our Generic Reactor Pattern. In this example T is a TcpListener and V is byte array.

The Handle’s Event method has the logic for getting the data from the event.


using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using GenericReactorPattern;

namespace TcpListenerReactorPattern
{
    public class TcpListenerHandle : IHandle< TcpListener, byte[] >
    {
        private readonly TcpListener _listener;

        public TcpListenerHandle(TcpListener listener)
        {
            _listener = listener;
        }

        public TcpListener Value()
        {
            return _listener;
        }

        public byte[] Event()
        {
            int dataReceived = 0;
            byte[] buffer = new byte[1];
            IList< byte > data = new List< byte >();

            Socket socket = _listener.AcceptSocket();

            do
            {
                dataReceived = socket.Receive(buffer);

                if (dataReceived > 0)
                {
                    data.Add(buffer[0]);
                }

            } while (dataReceived > 0);

            socket.Close();

            return data.ToArray();
        }
    }
}

Next the Event Handler has the code for processing the event data


using System.Net.Sockets;
using System.Text;
using GenericReactorPattern;

namespace TcpListenerReactorPattern
{
    public class MessageEventHandler : IEventHandler< TcpListener, byte[] >
    {
        private readonly IHandle< TcpListener, byte[] > _handle;

        public MessageEventHandler(IHandle< TcpListener, byte[] > handle)
        {
            _handle = handle;
        }

        public void HandleEvent(byte[] response)
        {
            string message = Encoding.UTF8.GetString(response);
        }

        public IHandle< TcpListener, byte[] > GetHandle()
        {
            return _handle;
        }
    }
}

The Synchronous Event Demultiplexer has the logic for getting handles which are ready to connect


using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using GenericReactorPattern;

namespace TcpListenerReactorPattern
{
    public class SynchronousEventDemultiplexer : ISynchronousEventDemultiplexer< TcpListener, byte[] >
    {
        public IHandle< TcpListener, byte[] > Select(ICollection< IHandle< TcpListener, byte[] > > listeners)
        {
            TcpListener tcpListener = (TcpListener)(from listener in listeners
                                                     where listener.Value().Pending()
                                                     select listener).SingleOrDefault();

            var handle = new TcpListenerHandle(tcpListener);

            return handle;
        }
    }
}

Then our local implementation of the Reactor, this merely just called the base constructor and passes the values, as all the logic is already in our base Reactor.


using System.Net.Sockets;
using GenericReactorPattern;

namespace TcpListenerReactorPattern
{
    public class InitiationDispatcher : Reactor< TcpListener, byte[] >
    {
        public InitiationDispatcher(SynchronousEventDemultiplexer synchronousEventDemultiplexer)
            : base(synchronousEventDemultiplexer)
        {
        }
    }
}

Here is a simple example of it being hooked up


using System.Net;
using System.Net.Sockets;

namespace TcpListenerReactorPattern
{
    class Program
    {
        static void Main(string[] args)
        {
            TcpListenerHandle handle1 = new TcpListenerHandle(GetTcpListener());
            TcpListenerHandle handle2 = new TcpListenerHandle(GetTcpListener());
            TcpListenerHandle handle3 = new TcpListenerHandle(GetTcpListener());

            MessageEventHandler client1 = new MessageEventHandler(handle1);
            MessageEventHandler client2 = new MessageEventHandler(handle2);
            MessageEventHandler client3 = new MessageEventHandler(handle3);

            SynchronousEventDemultiplexer synchronousEventDemultiplexer = new SynchronousEventDemultiplexer();

            InitiationDispatcher reactor = new InitiationDispatcher(synchronousEventDemultiplexer);

            reactor.RegisterHandle(client1);
            reactor.RegisterHandle(client2);
            reactor.RegisterHandle(client3);

            reactor.HandleEvents();
        }

        private static TcpListener GetTcpListener()
        {
            return new TcpListener(IPAddress.Parse("123.123.123.123"), 123);
        }
    }
}

You can download the Generic Reactor Pattern here



Comments

  1. IUnknown June 24th

    Comment Arrow

    Thanks… I think this was the most straight forward, easy example I’ve seen.

    Do you have a Proactor?


Add Yours

  • Author Avatar

    YOU


Comment Arrow




About Author

Robert

Programming today is a race between software engineers striving to build bigger and better idiot-proof programs, and the Universe trying to produce bigger and better idiots. So far, the Universe is winning hands down.