My most recent project at work was to introduce a Request Response messaging API into one our systems, as previously we were receiving a daily batch file with upwards of 4m entries when we only needed around 500-1,000 of them.
One of the restrictions was that we had to have mechanisms in place to ensure we did not flood the Service API with 1,000’s of messages all in one go, as this would cause delays for all other systems using the same API, as this was a widely used and critical service.
We had to make sure we do not have more than 4 requests queued at any one time (the API had 4 worker processes) so that everyone gets their fair share of the service. This made a great exercise for Rx 😀
The system could be represented via this diagram
One of my favourite things that Rx brought to the table, was this concept of virtual time using schedulers which you can learn about in Lee Campbells – Intro to Rx free on-line book under Part 4 – Testing Rx.
I started with an initial controller, which subscribes to an injected IObservable<Request> and calls the service straight away for each request. Responses are handled similarly, the controller subscribes to an IObservable<Response> on the IApiService using the injected IObserver<Response>
public class RequestResponseController { public RequestResponseController(IRequester requester, IResponseHandler responseHandler, IApiService service, ISchedulers schedulers) { requester .ObserveOn(schedulers.ThreadPool) .Subscribe(request => service.SendRequest(request)); service.Responses .ObserveOn(schedulers.ThreadPool) .Subscribe(responseHandler); } }
With a simple test to prove this
[Test] public void Should_get_respponse_when_request_received() { _requests.OnNext(new Request(1)); _schedulers.ThreadPool.AdvanceBy(1); // Send Request _requestsReceived.Should().HaveCount(1); _serviceScheduler.AdvanceBy(1); // Process Request _schedulers.ThreadPool.AdvanceBy(1); // Process Response _responsesProcessed.Should().HaveCount(1); }
Which passed with the following output:
Advancing ThreadPool by 1 Received Request: 1 Advancing ServiceScheduler by 1 Sending Response: 1 Advancing ThreadPool by 1 Processed Response: 1
I then went on to publish 5 requests from the requester, of which only 4 should be sent to the service before receiving a response.
[Test] public void Should_send_max_4_requests_at_one_time() { // Schedule 5 requests _requests.OnNext(new Request(1)); _requests.OnNext(new Request(2)); _requests.OnNext(new Request(3)); _requests.OnNext(new Request(4)); _requests.OnNext(new Request(5)); _schedulers.ThreadPool.AdvanceBy(5); // Attempt to send all 5 requests _requestsReceived.Should().HaveCount(4); // Check if only 4 requests received by service }
This test failed as expected, and gave the following results:
Advancing ThreadPool by 5 Received Request: 1 Received Request: 2 Received Request: 3 Received Request: 4 Received Request: 5 Expected collection to contain 4 item(s), but found 5.
In order to achieve the throttling, I added a new Subject
field which acts as a throttle to the requester, so that only 4 requests are active at any time.
private readonly Subject
This could be achieve by using the IObservable.Zip() extension, which brings 2 sequences together, and publishes at the rate of the slowest sequence. I also immediately published 4 objects into the _throttle sequence so that the first 4 requests would immediately fire.
public RequestResponseController(...) { // Service Subscription stayed the same requester .ObserveOn(schedulers.ThreadPool) .Zip(_throttle,(request, i) => request) // <---------Added Zip .Subscribe(request => service.SendRequest(request)); for (int i = 0; i < 4; i++) // Init _throttle with 4 requests _throttle.OnNext(i); }
This made the test pass, and could be represented by the following marble diagram:
Throttle |1234---------------------------- Requester |----1-2-3-4-5------------------- Result |----1-2-3-4---------------------
The missing piece of the puzzle was to ensure that queued requests would be sent out as soon as a response comes back. This could be represented by the following test:
[Test] public void Should_send__5th_queued_request_after_receiving_1st_response() { // Schedule 5 requests _requests.OnNext(new Request(1)); _requests.OnNext(new Request(2)); _requests.OnNext(new Request(3)); _requests.OnNext(new Request(4)); _requests.OnNext(new Request(5)); _requests.OnNext(new Request(6)); _schedulers.ThreadPool.AdvanceBy(5); // Attempt to send all 5 requests _serviceScheduler.AdvanceBy(1); _requestsReceived.Should().HaveCount(5); // Check that 5th request received by service }
Which failed and gave the following output:
Advancing ThreadPool by 5 Received Request: 1 Received Request: 2 Received Request: 3 Received Request: 4 Advancing ServiceScheduler by 1 Sending Response: 1 Expected collection to contain 5 item(s), but found 4.
The implementation for this was easy, just subscribe to the IApiService.Response sequence, with an Action that pushes a value onto the _throttle sequence.
public RequestResponseController(...) { service.Responses .Subscribe(r => _throttle.OnNext(0)); // Rest of the Constructor }
And could be represented by the following marble diagram:
Throttle |1234--------------0--0--0--0--0--0--0- Requester |----1-2-3-4-5-6-7----------------89--- Result |----1-2-3-4-------5--6--7--------89--- Response |------------------1--2--3--4--5--6--7-
Stay tuned as I will be blogging about the Time-out -> Retry implementation for this controller soon. Where if we don't receive a response within 30 seconds, we retry, and to handle multiple responses for the same request (e.g. due to the retry) so that we do not process duplicate responses.
As always the code can be found on GitHub @ michal-ciechan/codePerf/20150308-RxRequestResponse/