From 33f298d949454b1073ce8b7bdfddcd3bfac445ca Mon Sep 17 00:00:00 2001 From: "Dana.Desrosiers" Date: Wed, 27 Mar 2019 10:01:10 -0700 Subject: [PATCH 1/3] added ThrottleFactor configuration to slow down processing when delivery failures occur updated `BrokerClient.UnsubscribeByQueueName()` to not save stats added query parameters to filter results of the GET Broker Stats API --- .../Controllers/BrokerController.cs | 15 +++++++++-- .../BrokerClient.cs | 26 ++++++++++++------- .../BrokerService.cs | 2 +- .../BrokerServiceBase.cs | 24 +++++++++-------- .../Events/DefaultBrokerEventsManager.cs | 4 ++- .../Events/IBrokerEvents.cs | 2 +- .../State/ReferenceWrapper.cs | 12 +++++++++ 7 files changed, 60 insertions(+), 25 deletions(-) diff --git a/examples/SoCreate.ServiceFabric.PubSubDemo.Api/Controllers/BrokerController.cs b/examples/SoCreate.ServiceFabric.PubSubDemo.Api/Controllers/BrokerController.cs index c5f88e2..092785d 100644 --- a/examples/SoCreate.ServiceFabric.PubSubDemo.Api/Controllers/BrokerController.cs +++ b/examples/SoCreate.ServiceFabric.PubSubDemo.Api/Controllers/BrokerController.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Linq; using System.Reflection; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; @@ -23,11 +24,14 @@ public BrokerController(IBrokerClient brokerClient) // GET api/broker/stats [HttpGet("stats")] - public async Task>>> Get() + public async Task>>> Get([FromQuery] BrokerStatsQueryParams queryParams) { try { - return await _brokerClient.GetBrokerStatsAsync(); + return (await _brokerClient.GetBrokerStatsAsync()) + .Where(item => queryParams.MessageType == null || item.Key.Contains(queryParams.MessageType)) + .ToDictionary(item => item.Key, item => item.Value.Where(i => i.Time > queryParams.FromTime && + (queryParams.ServiceName == null || i.ServiceName.Contains(queryParams.ServiceName)))); } catch (Exception ex) { @@ -111,4 +115,11 @@ public async Task Delete(string queueName) } } } + + public class BrokerStatsQueryParams + { + public DateTime FromTime { get; set; } + public string ServiceName { get; set; } + public string MessageType { get; set; } + } } diff --git a/src/SoCreate.ServiceFabric.PubSub/BrokerClient.cs b/src/SoCreate.ServiceFabric.PubSub/BrokerClient.cs index 917ecd2..4204ee5 100644 --- a/src/SoCreate.ServiceFabric.PubSub/BrokerClient.cs +++ b/src/SoCreate.ServiceFabric.PubSub/BrokerClient.cs @@ -24,7 +24,7 @@ public class BrokerClient : IBrokerClient /// /// A list of QueueStats for each queue on the Broker Service. Hold up to items at a time. /// - private Dictionary> _queueStats = new Dictionary>(); + private readonly Dictionary> _queueStats = new Dictionary>(); /// /// A dictionary of Reference Wrappers, keyed by queue name, representing all queues on the Broker Service. @@ -85,11 +85,7 @@ public Task ProcessMessageAsync(MessageWrapper messageWrapper) /// public async Task>> GetBrokerStatsAsync() { - var tasks = from brokerService in await _brokerServiceLocator.GetBrokerServicesForAllPartitionsAsync() - select brokerService.GetBrokerStatsAsync(); - - var allStats = await Task.WhenAll(tasks.ToList()); - foreach (var stat in allStats.SelectMany(stat => stat.Stats)) + foreach (var stat in await GetAllBrokerStatsAsync()) { if (!_queueStats.ContainsKey(stat.QueueName)) { @@ -102,8 +98,6 @@ public async Task>> GetBrokerStatsAsync() } } - _subscriberReferences = allStats.SelectMany(stat => stat.Queues).ToDictionary(i => i.Key, i => i.Value); - return _queueStats; } @@ -112,7 +106,7 @@ public async Task UnsubscribeByQueueNameAsync(string queueName) { if (!_subscriberReferences.ContainsKey(queueName)) { - await GetBrokerStatsAsync(); + await GetAllBrokerStatsAsync(); } if (_subscriberReferences.TryGetValue(queueName, out var referenceWrapper)) @@ -120,8 +114,22 @@ public async Task UnsubscribeByQueueNameAsync(string queueName) var messageType = queueName.Split('_')[0]; var brokerService = await _brokerServiceLocator.GetBrokerServiceForMessageAsync(messageType); await brokerService.UnsubscribeAsync(referenceWrapper, messageType); + _subscriberReferences.Remove(queueName); + _queueStats.Remove(queueName); } } + + private async Task> GetAllBrokerStatsAsync() + { + var tasks = from brokerService in await _brokerServiceLocator.GetBrokerServicesForAllPartitionsAsync() + select brokerService.GetBrokerStatsAsync(); + + var allStats = await Task.WhenAll(tasks.ToList()); + + _subscriberReferences = allStats.SelectMany(stat => stat.Queues).ToDictionary(i => i.Key, i => i.Value); + + return allStats.SelectMany(stat => stat.Stats); + } } public static class BrokerClientExtensions diff --git a/src/SoCreate.ServiceFabric.PubSub/BrokerService.cs b/src/SoCreate.ServiceFabric.PubSub/BrokerService.cs index af15ae7..8444d1f 100644 --- a/src/SoCreate.ServiceFabric.PubSub/BrokerService.cs +++ b/src/SoCreate.ServiceFabric.PubSub/BrokerService.cs @@ -76,7 +76,7 @@ await TimeoutRetryHelper.ExecuteInTransaction(StateManager, async (tx, token, st } catch (Exception ex) { - await BrokerEventsManager.OnMessageDeliveryFailedAsync(queueName, subscriber, message.Value, ex); + await BrokerEventsManager.OnMessageDeliveryFailedAsync(queueName, subscriber, message.Value, ex, ThrottleFactor); throw; } } diff --git a/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs b/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs index 6912b6d..990cdf8 100644 --- a/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs +++ b/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs @@ -55,7 +55,12 @@ public abstract class BrokerServiceBase : StatefulService, IBrokerService /// /// Gets or sets the interval to wait between batches of publishing messages. (Default: 5s) /// - protected TimeSpan Period { get; set; } = TimeSpan.FromSeconds(5); + protected TimeSpan Period { get; set; } = TimeSpan.FromSeconds(1); + + /// + /// Gets or sets the interval to wait between batches of publishing messages. (Default: 5s) + /// + protected int ThrottleFactor { get; set; } = 10; /// /// Get or Sets the maximum period to process messages before allowing enqueuing @@ -252,18 +257,15 @@ protected override async Task RunAsync(CancellationToken cancellationToken) //process messages for given time, then allow other transactions to enqueue messages var cts = new CancellationTokenSource(MaxProcessingPeriod); var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, cancellationToken); + var timeoutCancellationToken = linkedTokenSource.Token; try { - var elements = _queues.ToArray(); - var tasks = new List(elements.Length); - - foreach (var element in elements) - { - var subscriber = element.Value; - string queueName = element.Key; - tasks.Add(ProcessQueues(linkedTokenSource.Token, subscriber, queueName)); - } - await Task.WhenAll(tasks); + await Task.WhenAll( + from subscription in _queues + let queueName = subscription.Key + let subscriber = subscription.Value + where subscriber.ShouldProcessMessages() + select ProcessQueues(timeoutCancellationToken, subscriber, queueName)); } catch (TaskCanceledException) {//swallow and move on.. diff --git a/src/SoCreate.ServiceFabric.PubSub/Events/DefaultBrokerEventsManager.cs b/src/SoCreate.ServiceFabric.PubSub/Events/DefaultBrokerEventsManager.cs index 5048897..7435639 100644 --- a/src/SoCreate.ServiceFabric.PubSub/Events/DefaultBrokerEventsManager.cs +++ b/src/SoCreate.ServiceFabric.PubSub/Events/DefaultBrokerEventsManager.cs @@ -56,12 +56,14 @@ public async Task OnMessageDeliveredAsync(string queueName, ReferenceWrapper sub _stats[queueName].TotalDelivered++; } - public async Task OnMessageDeliveryFailedAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper, Exception exception) + public async Task OnMessageDeliveryFailedAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper, Exception exception, int throttleFactor) { if (MessageDeliveryFailed != null) { await MessageDeliveryFailed.Invoke(queueName, subscriber, messageWrapper, exception); } + + subscriber.SkipCount = throttleFactor; _stats[queueName].TotalDeliveryFailures++; } diff --git a/src/SoCreate.ServiceFabric.PubSub/Events/IBrokerEvents.cs b/src/SoCreate.ServiceFabric.PubSub/Events/IBrokerEvents.cs index e4c2a32..9420240 100644 --- a/src/SoCreate.ServiceFabric.PubSub/Events/IBrokerEvents.cs +++ b/src/SoCreate.ServiceFabric.PubSub/Events/IBrokerEvents.cs @@ -20,7 +20,7 @@ public interface IBrokerEventsManager : IBrokerEvents Task OnUnsubscribedAsync(string queueName, ReferenceWrapper subscriber, string messageTypeName); Task OnMessageReceivedAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper); Task OnMessageDeliveredAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper); - Task OnMessageDeliveryFailedAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper, Exception exception); + Task OnMessageDeliveryFailedAsync(string queueName, ReferenceWrapper subscriber, MessageWrapper messageWrapper, Exception exception, int throttleFactor = 0); Task> GetStatsAsync(); } } \ No newline at end of file diff --git a/src/SoCreate.ServiceFabric.PubSub/State/ReferenceWrapper.cs b/src/SoCreate.ServiceFabric.PubSub/State/ReferenceWrapper.cs index 42a6771..2afa9dc 100644 --- a/src/SoCreate.ServiceFabric.PubSub/State/ReferenceWrapper.cs +++ b/src/SoCreate.ServiceFabric.PubSub/State/ReferenceWrapper.cs @@ -44,6 +44,8 @@ protected IHashingHelper HashingHelper [DataMember] public string RoutingKey { get; private set; } + public int SkipCount { get; set; } + /// public abstract bool Equals(ReferenceWrapper other); @@ -98,5 +100,15 @@ public bool ShouldDeliverMessage(MessageWrapper message) return string.Equals(_routingKeyValue[1], value, StringComparison.InvariantCultureIgnoreCase); } + + public bool ShouldProcessMessages() + { + if (SkipCount > 0) + { + SkipCount--; + return false; + } + return true; + } } } \ No newline at end of file From 841ec3d81c8837ecd2682425e70702315f290d8b Mon Sep 17 00:00:00 2001 From: "Dana.Desrosiers" Date: Thu, 28 Mar 2019 12:01:34 -0700 Subject: [PATCH 2/3] update changelog and version --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f766696..051449c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,9 @@ -# 9.1.0 (2019-03-26) +# 9.1.0 (2019-04-01) ### Features * **Subscribe Retry**: Added a retry strategy for when a subscriber fails to subscribe because the Broker doesn't exist yet. * **BrokerServiceUri**: We recently lost the ability to specify the BrokerUri when publishing/subscribing. Added the ability to pass the BrokerUri to the BrokerServiceLocator class. +* **Throttle on Failure**: Added a config to slow down the processing loop when errors occur. +* **Filter Broker Stats**: Added ability to filter on time, Service name, or message type using query parameters in the GET broker/stats API in the demo app. # 9.0.0 (2019-03-21) ### Features From c9f0c28a9e1b17f7a8ab9edae819e2d6ada2f805 Mon Sep 17 00:00:00 2001 From: "Dana.Desrosiers" Date: Thu, 28 Mar 2019 15:01:43 -0700 Subject: [PATCH 3/3] fix documentation --- src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs b/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs index 990cdf8..595eba9 100644 --- a/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs +++ b/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs @@ -53,12 +53,12 @@ public abstract class BrokerServiceBase : StatefulService, IBrokerService protected TimeSpan DueTime { get; set; } = TimeSpan.FromSeconds(5); /// - /// Gets or sets the interval to wait between batches of publishing messages. (Default: 5s) + /// Gets or sets the interval to wait between batches of publishing messages. (Default: 1s) /// protected TimeSpan Period { get; set; } = TimeSpan.FromSeconds(1); /// - /// Gets or sets the interval to wait between batches of publishing messages. (Default: 5s) + /// Gets or sets the amount to throttle queue processing when deliveries are failing. Slow down queue processing by a factor of X. (Default 10) (Default: 10) /// protected int ThrottleFactor { get; set; } = 10;