Rx Request Response Throttle

My most recent project at work was to introduce a Request Response messaging API into one of our systems. Previously we received a daily batch file with upwards of 4 million entries, when we only needed around 500-1,000 of them.

One of the restrictions was that we had to have mechanisms in place to ensure we did not flood the Service API with thousands of messages all in one go. This was a widely used and critical service, so flooding it would cause delays for every other system using the same API.

We had to make sure we never had more than 4 requests queued at any one time (the API had 4 worker processes) so that everyone got their fair share of the service. This made a great exercise for Rx 😀

The system can be represented by the following diagram:

[Diagram: RxRequestResponse overview]

One of my favourite things that Rx brought to the table was the concept of virtual time using schedulers, which you can learn about in Lee Campbell's Intro to Rx free online book, under Part 4 - Testing Rx.
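The virtual-time idea can be sketched outside of Rx too. Here is a minimal, hypothetical Python model (my own names, not Rx's TestScheduler API) of a scheduler whose queued actions run only when the test advances virtual time:

```python
from collections import deque

class VirtualScheduler:
    """A toy virtual-time scheduler: actions run only when time is advanced."""
    def __init__(self, name):
        self.name = name
        self._queue = deque()

    def schedule(self, action):
        # Queue the action; nothing executes until advance_by() is called.
        self._queue.append(action)

    def advance_by(self, ticks):
        print(f"Advancing {self.name} by {ticks}")
        for _ in range(ticks):
            if self._queue:
                self._queue.popleft()()

# Usage: the test fully controls when queued work happens.
scheduler = VirtualScheduler("ThreadPool")
log = []
scheduler.schedule(lambda: log.append("Received Request: 1"))
assert log == []                        # not executed yet
scheduler.advance_by(1)
assert log == ["Received Request: 1"]   # executed on the virtual tick
```

This is what makes the `AdvanceBy(1)` calls in the tests below deterministic: no real threads, no sleeps, just explicit ticks.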

I started with an initial controller, which subscribes to the injected IRequester (an IObservable<Request>) and calls the service straight away for each request. Responses are handled similarly: the controller subscribes to the IObservable<Response> exposed by the IApiService, using the injected IResponseHandler (an IObserver<Response>).

public class RequestResponseController
{
    public RequestResponseController(IRequester requester, IResponseHandler responseHandler, 
        IApiService service, ISchedulers schedulers)
    {
        requester
            .ObserveOn(schedulers.ThreadPool)
            .Subscribe(request => service.SendRequest(request));
            
        service.Responses
            .ObserveOn(schedulers.ThreadPool)
            .Subscribe(responseHandler);
    }
}

With a simple test to prove this:

[Test]
public void Should_get_response_when_request_received()
{
    _requests.OnNext(new Request(1));
    
    _schedulers.ThreadPool.AdvanceBy(1); // Send Request
    _requestsReceived.Should().HaveCount(1);
    
    _serviceScheduler.AdvanceBy(1); // Process Request
    
    _schedulers.ThreadPool.AdvanceBy(1); // Process Response
    _responsesProcessed.Should().HaveCount(1);
}

Which passed with the following output:

Advancing ThreadPool by 1
Received Request: 1
  
Advancing ServiceScheduler by 1
Sending Response: 1
   
Advancing ThreadPool by 1
Processed Response: 1

I then went on to publish 5 requests from the requester, of which only 4 should be sent to the service before a response is received.

[Test]
public void Should_send_max_4_requests_at_one_time()
{
    // Schedule 5 requests
    _requests.OnNext(new Request(1));
    _requests.OnNext(new Request(2));
    _requests.OnNext(new Request(3));
    _requests.OnNext(new Request(4));
    _requests.OnNext(new Request(5));
    
    _schedulers.ThreadPool.AdvanceBy(5); // Attempt to send all 5 requests
    _requestsReceived.Should().HaveCount(4); // Check if only 4 requests received by service
}

This test failed as expected, and gave the following results:

Advancing ThreadPool by 5
Received Request: 1
Received Request: 2
Received Request: 3
Received Request: 4
Received Request: 5
Expected collection to contain 4 item(s), but found 5.

In order to achieve the throttling, I added a new Subject<int> field which acts as a throttle on the requester, so that only 4 requests are active at any time.

private readonly Subject<int> _throttle = new Subject<int>();

This could be achieved by using the Zip() extension method, which brings 2 sequences together and publishes pairs at the rate of the slower sequence. I also immediately published 4 values into the _throttle sequence so that the first 4 requests would fire straight away.

public RequestResponseController(...)
{
    // Service Subscription stayed the same
 
    requester
        .ObserveOn(schedulers.ThreadPool)
        .Zip(_throttle,(request, i) => request) // <---------Added Zip
        .Subscribe(request => service.SendRequest(request));
 
    for (int i = 0; i < 4; i++) // Init _throttle with 4 requests
        _throttle.OnNext(i);
}

This made the test pass, and can be represented by the following marble diagram:

Throttle  |1234----------------------------
Requester |----1-2-3-4-5-------------------
Result    |----1-2-3-4---------------------
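The Zip gating above can be modelled without Rx. This rough Python sketch (invented names, not the C# controller) pairs each request with one token, so output stops once the four seeded tokens are consumed:

```python
from collections import deque

def zip_throttle(requests, tokens):
    """Pair each request with one token; emit only while tokens remain."""
    sent = []
    tokens = deque(tokens)
    for request in requests:
        if not tokens:
            break            # no token available: request stays queued
        tokens.popleft()     # consume one token per sent request
        sent.append(request)
    return sent

# Seed 4 tokens, then push 5 requests: only the first 4 get through.
sent = zip_throttle([1, 2, 3, 4, 5], [0, 0, 0, 0])
assert sent == [1, 2, 3, 4]
```

Like Zip, the output advances at the pace of the slower of the two sequences, which is exactly the throttling behaviour the test demands.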

The missing piece of the puzzle was to ensure that queued requests would be sent out as soon as a response comes back. This could be represented by the following test:

[Test]
public void Should_send_5th_queued_request_after_receiving_1st_response()
{
    // Schedule 6 requests
    _requests.OnNext(new Request(1));
    _requests.OnNext(new Request(2));
    _requests.OnNext(new Request(3));
    _requests.OnNext(new Request(4));
    _requests.OnNext(new Request(5));
    _requests.OnNext(new Request(6));
 
    _schedulers.ThreadPool.AdvanceBy(5); // Attempt to send the queued requests
 
    _serviceScheduler.AdvanceBy(1);
    _requestsReceived.Should().HaveCount(5); // Check that 5th request received by service
}

Which failed and gave the following output:

Advancing ThreadPool by 5
Received Request: 1
Received Request: 2
Received Request: 3
Received Request: 4
 
Advancing ServiceScheduler by 1
Sending Response: 1
Expected collection to contain 5 item(s), but found 4.

The implementation for this was easy: just subscribe to the IApiService.Responses sequence with an action that pushes a value onto the _throttle sequence.

public RequestResponseController(...)
{
    service.Responses
        .Subscribe(r => _throttle.OnNext(0));
        
    // Rest of the Constructor
}

And could be represented by the following marble diagram:

Throttle  |1234--------------0--0--0--0--0--0--0-
Requester |----1-2-3-4-5-6-7----------------89---
Result    |----1-2-3-4-------5--6--7--------89---
Response  |------------------1--2--3--4--5--6--7-
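The whole loop (seed 4 tokens, consume one per request, refill one per response) can be simulated in a few lines of Python. This is a behavioural sketch with invented names, not the actual controller:

```python
from collections import deque

class ThrottledSender:
    """At most `limit` requests in flight; each response frees one slot."""
    def __init__(self, limit=4):
        self.tokens = limit        # seeded tokens (the initial OnNext loop)
        self.pending = deque()     # requests waiting for a token
        self.sent = []

    def on_request(self, request):
        self.pending.append(request)
        self._drain()

    def on_response(self, response):
        self.tokens += 1           # a response pushes a value onto the throttle
        self._drain()

    def _drain(self):
        # Send queued requests while tokens remain (the Zip pairing).
        while self.tokens and self.pending:
            self.tokens -= 1
            self.sent.append(self.pending.popleft())

sender = ThrottledSender(limit=4)
for i in range(1, 7):               # six requests, as in the last test
    sender.on_request(i)
assert sender.sent == [1, 2, 3, 4]  # only 4 in flight
sender.on_response(1)               # first response releases the 5th request
assert sender.sent == [1, 2, 3, 4, 5]
```

The simulation reproduces both failing-then-passing tests above: requests 5 and 6 stay queued until responses drip tokens back in.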

Stay tuned, as I will soon be blogging about the time-out and retry implementation for this controller: if we don't receive a response within 30 seconds we retry, and we handle multiple responses for the same request (e.g. due to the retry) so that we do not process duplicate responses.

As always the code can be found on GitHub @ michal-ciechan/codePerf/20150308-RxRequestResponse/