diff --git a/CHANGELOG.md b/CHANGELOG.md
index 435d91f..f766696 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,8 @@
+# 9.1.0 (2019-03-26)
+### Features
+* **Subscribe Retry**: Added a retry strategy for when a subscriber fails to subscribe because the Broker doesn't exist yet.
+* **BrokerServiceUri**: We recently lost the ability to specify the BrokerUri when publishing/subscribing. Added the ability to pass the BrokerUri to the BrokerServiceLocator class.
+
# 9.0.0 (2019-03-21)
### Features
* **Broker Stats**: Added `GetBrokerStatsAsync()` and `UnsubscribeByQueueNameAsync()` to `BrokerClient` to help with monitoring and managing the Broker Service.
diff --git a/CONTRIBUTIING.md b/CONTRIBUTING.md
similarity index 100%
rename from CONTRIBUTIING.md
rename to CONTRIBUTING.md
diff --git a/ServiceFabric.PubSub.sln b/ServiceFabric.PubSub.sln
index 20304be..8d243cf 100644
--- a/ServiceFabric.PubSub.sln
+++ b/ServiceFabric.PubSub.sln
@@ -7,7 +7,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
ProjectSection(SolutionItems) = preProject
README.md = README.md
CHANGELOG.md = CHANGELOG.md
- CONTRIBUTIING.md = CONTRIBUTIING.md
+ CONTRIBUTING.md = CONTRIBUTING.md
LICENSE = LICENSE
EndProjectSection
EndProject
diff --git a/src/SoCreate.ServiceFabric.PubSub/BrokerClient.cs b/src/SoCreate.ServiceFabric.PubSub/BrokerClient.cs
index fecaa33..917ecd2 100644
--- a/src/SoCreate.ServiceFabric.PubSub/BrokerClient.cs
+++ b/src/SoCreate.ServiceFabric.PubSub/BrokerClient.cs
@@ -33,6 +33,10 @@ public class BrokerClient : IBrokerClient
public int QueueStatCapacity { get; set; } = 100;
+ ///
+ /// Create a BrokerClient
+ ///
+ ///
public BrokerClient(IBrokerServiceLocator brokerServiceLocator = null)
{
_brokerServiceLocator = brokerServiceLocator ?? new BrokerServiceLocator();
@@ -122,7 +126,7 @@ public async Task UnsubscribeByQueueNameAsync(string queueName)
public static class BrokerClientExtensions
{
- // subscribe/unsubscribe using Generic type (useful when subscibing manually)
+ // subscribe/unsubscribe using Generic type (useful when subscribing manually)
///
/// Registers this StatelessService as a subscriber for messages of type with the .
diff --git a/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs b/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs
index 22e94c6..6912b6d 100644
--- a/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs
+++ b/src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs
@@ -78,7 +78,7 @@ protected BrokerServiceBase(StatefulServiceContext serviceContext, bool enableAu
{
if (enableAutoDiscovery)
{
- new BrokerServiceLocator().RegisterAsync(Context.ServiceName)
+ new BrokerServiceLocator(Context.ServiceName).RegisterAsync()
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();
@@ -100,7 +100,7 @@ protected BrokerServiceBase(StatefulServiceContext serviceContext,
{
if (enableAutoDiscovery)
{
- new BrokerServiceLocator().RegisterAsync(Context.ServiceName)
+ new BrokerServiceLocator(Context.ServiceName).RegisterAsync()
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();
diff --git a/src/SoCreate.ServiceFabric.PubSub/Helpers/BrokerServiceLocator.cs b/src/SoCreate.ServiceFabric.PubSub/Helpers/BrokerServiceLocator.cs
index 920288a..2b97535 100644
--- a/src/SoCreate.ServiceFabric.PubSub/Helpers/BrokerServiceLocator.cs
+++ b/src/SoCreate.ServiceFabric.PubSub/Helpers/BrokerServiceLocator.cs
@@ -6,73 +6,78 @@
using Microsoft.ServiceFabric.Services.Client;
using Microsoft.ServiceFabric.Services.Remoting.Client;
using Microsoft.ServiceFabric.Services.Remoting.V2.FabricTransport.Client;
-using SoCreate.ServiceFabric.PubSub.State;
namespace SoCreate.ServiceFabric.PubSub.Helpers
{
public class BrokerServiceLocator : IBrokerServiceLocator
{
private readonly IHashingHelper _hashingHelper;
- private static List _cachedPartitionKeys = new List();
- private static Uri _cachedBrokerUri;
+ private readonly List _cachedPartitionKeys = new List();
private readonly FabricClient _fabricClient;
private const string BrokerName = nameof(BrokerService);
private readonly IServiceProxyFactory _serviceProxyFactory;
+ private Uri _brokerServiceUri;
///
/// Creates a new default instance.
///
- public BrokerServiceLocator(IHashingHelper hashingHelper = null)
+ public BrokerServiceLocator(Uri brokerServiceUri = null, IHashingHelper hashingHelper = null)
{
_hashingHelper = hashingHelper ?? new HashingHelper();
_fabricClient = new FabricClient();
_serviceProxyFactory = new ServiceProxyFactory(c => new FabricTransportServiceRemotingClientFactory());
+ _brokerServiceUri = brokerServiceUri;
}
///
- public async Task RegisterAsync(Uri brokerServiceName)
+ public async Task RegisterAsync()
{
var activationContext = FabricRuntime.GetActivationContext();
- await _fabricClient.PropertyManager.PutPropertyAsync(new Uri(activationContext.ApplicationName), BrokerName, brokerServiceName.ToString());
+ await _fabricClient.PropertyManager.PutPropertyAsync(new Uri(activationContext.ApplicationName), BrokerName, (await LocateAsync()).ToString());
}
///
- public async Task GetBrokerServiceForMessageAsync(object message, Uri brokerServiceName = null)
+ public async Task GetBrokerServiceForMessageAsync(object message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
- var resolvedPartition = await GetPartitionForMessageAsync(message, brokerServiceName);
+ var resolvedPartition = await GetPartitionForMessageAsync(message);
return _serviceProxyFactory.CreateServiceProxy(
- brokerServiceName ?? await LocateAsync(), resolvedPartition, listenerName: BrokerServiceBase.ListenerName);
+ await LocateAsync(), resolvedPartition, listenerName: BrokerServiceBase.ListenerName);
}
///
- public async Task GetBrokerServiceForMessageAsync(string messageTypeName, Uri brokerServiceName = null)
+ public async Task GetBrokerServiceForMessageAsync(string messageTypeName)
{
- var resolvedPartition = await GetPartitionForMessageAsync(messageTypeName, brokerServiceName);
+ var resolvedPartition = await GetPartitionForMessageAsync(messageTypeName);
return _serviceProxyFactory.CreateServiceProxy(
- brokerServiceName ?? await LocateAsync(), resolvedPartition, listenerName: BrokerServiceBase.ListenerName);
+ await LocateAsync(), resolvedPartition, listenerName: BrokerServiceBase.ListenerName);
}
///
- public async Task> GetBrokerServicesForAllPartitionsAsync(Uri brokerServiceName = null)
+ public async Task> GetBrokerServicesForAllPartitionsAsync()
{
var serviceProxies = new List();
- foreach (var partition in await GetBrokerPartitionKeys(brokerServiceName))
+ var brokerServiceUri = await LocateAsync();
+ foreach (var partition in await GetBrokerPartitionKeys())
{
serviceProxies.Add(_serviceProxyFactory.CreateServiceProxy(
- brokerServiceName ?? await LocateAsync(), partition, listenerName: BrokerServiceBase.ListenerName));
+ brokerServiceUri, partition, listenerName: BrokerServiceBase.ListenerName));
}
return serviceProxies;
}
- ///
- public async Task LocateAsync()
+ ///
+ /// Locates the registered broker service.
+ ///
+ ///
+ ///
+ private async Task LocateAsync()
{
- if (_cachedBrokerUri != null)
+ if (_brokerServiceUri != null)
{
- return _cachedBrokerUri;
+ return _brokerServiceUri;
}
try
@@ -101,23 +106,24 @@ public async Task LocateAsync()
var found = await LocateAsync(app.ApplicationName);
if (found != null)
{
- _cachedBrokerUri = found;
- return _cachedBrokerUri;
+ _brokerServiceUri = found;
+ return _brokerServiceUri;
}
}
}
}
else
{
- _cachedBrokerUri = new Uri(property.GetValue());
- return _cachedBrokerUri;
+ _brokerServiceUri = new Uri(property.GetValue());
+ return _brokerServiceUri;
}
}
catch
{
- ;
+ // ignored
}
- throw new InvalidOperationException("No brokerService was discovered in the cluster.");
+
+ throw new BrokerNotFoundException("No brokerService was discovered in the cluster.");
}
private async Task LocateAsync(Uri applicationName)
@@ -140,8 +146,9 @@ private async Task GetBrokerPropertyOrNull(Uri applicationName)
}
catch
{
- ;
+ // ignored
}
+
return null;
}
@@ -149,11 +156,10 @@ private async Task GetBrokerPropertyOrNull(Uri applicationName)
/// Resolves the to send the message to, based on message type name.
///
/// Full type name of message object.
- ///
///
- private async Task GetPartitionForMessageAsync(string messageTypeName, Uri brokerServiceName)
+ private async Task GetPartitionForMessageAsync(string messageTypeName)
{
- var partitionKeys = await GetBrokerPartitionKeys(brokerServiceName);
+ var partitionKeys = await GetBrokerPartitionKeys();
int hashCode;
unchecked
@@ -168,20 +174,19 @@ private async Task GetPartitionForMessageAsync(string messa
/// Resolves the to send the message to, based on message's type.
///
/// The message to publish
- ///
///
- private Task GetPartitionForMessageAsync(object message, Uri brokerServiceName)
+ private Task GetPartitionForMessageAsync(object message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
string messageTypeName = message.GetType().FullName;
- return GetPartitionForMessageAsync(messageTypeName, brokerServiceName);
+ return GetPartitionForMessageAsync(messageTypeName);
}
- private async Task> GetBrokerPartitionKeys(Uri brokerServiceName = null)
+ private async Task> GetBrokerPartitionKeys()
{
if (_cachedPartitionKeys.Count == 0)
{
- foreach (var partition in await _fabricClient.QueryManager.GetPartitionListAsync(brokerServiceName ?? await LocateAsync()))
+ foreach (var partition in await _fabricClient.QueryManager.GetPartitionListAsync(await LocateAsync()))
{
if (partition.PartitionInformation.Kind != ServicePartitionKind.Int64Range)
{
@@ -196,4 +201,12 @@ private async Task> GetBrokerPartitionKeys(Uri brokerS
return _cachedPartitionKeys;
}
}
+
+ public class BrokerNotFoundException : Exception
+ {
+ public BrokerNotFoundException(string message)
+ : base(message)
+ {
+ }
+ }
}
diff --git a/src/SoCreate.ServiceFabric.PubSub/Helpers/IBrokerServiceLocator.cs b/src/SoCreate.ServiceFabric.PubSub/Helpers/IBrokerServiceLocator.cs
index b142333..af0251f 100644
--- a/src/SoCreate.ServiceFabric.PubSub/Helpers/IBrokerServiceLocator.cs
+++ b/src/SoCreate.ServiceFabric.PubSub/Helpers/IBrokerServiceLocator.cs
@@ -6,40 +6,30 @@ namespace SoCreate.ServiceFabric.PubSub.Helpers
{
public interface IBrokerServiceLocator
{
- ///
- /// Locates the registered broker service.
- ///
- ///
- Task LocateAsync();
-
///
/// Registers the default by name.
///
- ///
///
- Task RegisterAsync(Uri brokerServiceName);
+ Task RegisterAsync();
///
/// Gets the instance for the provided
///
///
- /// Uri of BrokerService instance
///
- Task GetBrokerServiceForMessageAsync(object message, Uri brokerServiceName = null);
+ Task GetBrokerServiceForMessageAsync(object message);
///
/// Gets the instance for the provided
///
/// Full type name of message object.
- /// Uri of BrokerService instance
///
- Task GetBrokerServiceForMessageAsync(string messageTypeName, Uri brokerServiceName = null);
+ Task GetBrokerServiceForMessageAsync(string messageTypeName);
///
/// Gets a collection of ServiceProxy instances, one for each partition.
///
- ///
///
- Task> GetBrokerServicesForAllPartitionsAsync(Uri brokerServiceName = null);
+ Task> GetBrokerServicesForAllPartitionsAsync();
}
}
\ No newline at end of file
diff --git a/src/SoCreate.ServiceFabric.PubSub/Helpers/TimeoutRetryHelper.cs b/src/SoCreate.ServiceFabric.PubSub/Helpers/TimeoutRetryHelper.cs
index 992da7a..c9fd60b 100644
--- a/src/SoCreate.ServiceFabric.PubSub/Helpers/TimeoutRetryHelper.cs
+++ b/src/SoCreate.ServiceFabric.PubSub/Helpers/TimeoutRetryHelper.cs
@@ -22,14 +22,14 @@ internal static class TimeoutRetryHelper
/// Operation to execute with retry.
/// State passed to callback. (optional)
/// Cancellation support. (optional)
- /// #Attempts to execute (optional)
+ /// #Attempts to execute (optional)
/// First delay between attempts. Later on this will be exponentially grow. (optional)
///
- public static async Task ExecuteInTransaction(IReliableStateManager stateManager,
- Func> operation,
- object state = null,
- CancellationToken cancellationToken = default(CancellationToken),
- int maxAttempts = DefaultMaxAttempts,
+ public static async Task ExecuteInTransaction(IReliableStateManager stateManager,
+ Func> operation,
+ object state = null,
+ CancellationToken cancellationToken = default(CancellationToken),
+ int maxAttempts = DefaultMaxAttempts,
TimeSpan? initialDelay = null)
{
if (stateManager == null) throw new ArgumentNullException(nameof(stateManager));
@@ -68,10 +68,10 @@ public static async Task ExecuteInTransaction(IReliableStateMa
/// Operation to execute with retry.
/// State passed to callback. (optional)
/// Cancellation support. (optional)
- /// #Attempts to execute (optional)
+ /// #Attempts to execute (optional)
/// First delay between attempts. Later on this will be exponentially grow. (optional)
///
- public static async Task ExecuteInTransaction(IReliableStateManager stateManager, Func operation,
object state = null,
CancellationToken cancellationToken = default(CancellationToken),
@@ -112,11 +112,11 @@ public static async Task ExecuteInTransaction(IReliableStateManager stateManager
/// Operation to execute with retry.
/// State passed to callback. (optional)
/// Cancellation support. (optional)
- /// #Attempts to execute (optional)
+ /// #Attempts to execute (optional)
/// First delay between attempts. Later on this will be exponentially grow. (optional)
///
- public static async Task Execute(Func> operation,
- object state = null,
+ public static async Task Execute(Func> operation,
+ object state = null,
CancellationToken cancellationToken = default(CancellationToken),
int maxAttempts = DefaultMaxAttempts,
TimeSpan? initialDelay = null)
@@ -136,7 +136,7 @@ public static async Task Execute(FuncSoCreate.ServiceFabric.PubSub adds pub/sub behaviour to your Reliable Actors and Services in Service Fabric. Documentation: http://service-fabric-pub-sub.socreate.it
© SoCreate. All rights reserved.
SoCreate.ServiceFabric.PubSub
- 9.0.0
+ 9.1.0
SoCreate
Loek Duys, SoCreate
netstandard2.0
diff --git a/src/SoCreate.ServiceFabric.PubSub/Subscriber/SubscriberStatefulServiceBase.cs b/src/SoCreate.ServiceFabric.PubSub/Subscriber/SubscriberStatefulServiceBase.cs
index 3e2da4b..f402068 100644
--- a/src/SoCreate.ServiceFabric.PubSub/Subscriber/SubscriberStatefulServiceBase.cs
+++ b/src/SoCreate.ServiceFabric.PubSub/Subscriber/SubscriberStatefulServiceBase.cs
@@ -52,9 +52,25 @@ protected SubscriberStatefulServiceBase(StatefulServiceContext serviceContext,
}
///
- protected override Task OnOpenAsync(ReplicaOpenMode openMode, CancellationToken cancellationToken)
+ protected override async Task OnOpenAsync(ReplicaOpenMode openMode, CancellationToken cancellationToken)
{
- return Subscribe();
+ for (var attempt = 0; ; attempt++)
+ {
+ try
+ {
+ await Subscribe();
+ break;
+ }
+ catch (BrokerNotFoundException)
+ {
+ if (attempt > 10)
+ {
+ throw;
+ }
+
+ await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
+ }
+ }
}
///
@@ -84,6 +100,7 @@ protected virtual async Task Subscribe()
catch (Exception ex)
{
LogMessage($"Failed to register Service:'{Context.ServiceName}' as Subscriber of {handler.Key}. Error:'{ex.Message}'.");
+ throw;
}
}
}
diff --git a/src/SoCreate.ServiceFabric.PubSub/Subscriber/SubscriberStatelessServiceBase.cs b/src/SoCreate.ServiceFabric.PubSub/Subscriber/SubscriberStatelessServiceBase.cs
index e915e28..afbea2e 100644
--- a/src/SoCreate.ServiceFabric.PubSub/Subscriber/SubscriberStatelessServiceBase.cs
+++ b/src/SoCreate.ServiceFabric.PubSub/Subscriber/SubscriberStatelessServiceBase.cs
@@ -37,9 +37,25 @@ protected SubscriberStatelessServiceBase(StatelessServiceContext serviceContext,
}
///
- protected override Task OnOpenAsync(CancellationToken cancellationToken)
+ protected override async Task OnOpenAsync(CancellationToken cancellationToken)
{
- return Subscribe();
+ for (var attempt = 0; ; attempt++)
+ {
+ try
+ {
+ await Subscribe();
+ break;
+ }
+ catch (BrokerNotFoundException)
+ {
+ if (attempt > 10)
+ {
+ throw;
+ }
+
+ await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
+ }
+ }
}
///
@@ -69,6 +85,7 @@ protected virtual async Task Subscribe()
catch (Exception ex)
{
LogMessage($"Failed to register Service:'{Context.ServiceName}' as Subscriber of {handler.Key}. Error:'{ex.Message}'.");
+ throw;
}
}
}
diff --git a/tests/SoCreate.ServiceFabric.PubSub.Tests/GivenBrokerClientTests.cs b/tests/SoCreate.ServiceFabric.PubSub.Tests/GivenBrokerClientTests.cs
index 0bb4fc1..baa7c91 100644
--- a/tests/SoCreate.ServiceFabric.PubSub.Tests/GivenBrokerClientTests.cs
+++ b/tests/SoCreate.ServiceFabric.PubSub.Tests/GivenBrokerClientTests.cs
@@ -83,12 +83,12 @@ public MockBrokerServiceLocatorMultiPartition(List brokers = nul
new MockBrokerServicePartitionTwo()
};
}
- public override Task> GetBrokerServicesForAllPartitionsAsync(Uri brokerServiceName = null)
+ public override Task> GetBrokerServicesForAllPartitionsAsync()
{
return Task.FromResult>(_brokers);
}
- public override Task GetBrokerServiceForMessageAsync(string messageTypeName, Uri brokerServiceName = null)
+ public override Task GetBrokerServiceForMessageAsync(string messageTypeName)
{
return Task.FromResult(_brokers.First());
}
diff --git a/tests/SoCreate.ServiceFabric.PubSub.Tests/MockBrokerServiceLocator.cs b/tests/SoCreate.ServiceFabric.PubSub.Tests/MockBrokerServiceLocator.cs
index f3251ef..82ff168 100644
--- a/tests/SoCreate.ServiceFabric.PubSub.Tests/MockBrokerServiceLocator.cs
+++ b/tests/SoCreate.ServiceFabric.PubSub.Tests/MockBrokerServiceLocator.cs
@@ -1,5 +1,4 @@
-using System;
-using System.Collections.Generic;
+using System.Collections.Generic;
using System.Threading.Tasks;
using SoCreate.ServiceFabric.PubSub.Helpers;
using SoCreate.ServiceFabric.PubSub.State;
@@ -8,27 +7,22 @@ namespace SoCreate.ServiceFabric.PubSub.Tests
{
public class MockBrokerServiceLocator : IBrokerServiceLocator
{
- public Task LocateAsync()
- {
- return Task.FromResult(new Uri("mockUri"));
- }
-
- public Task RegisterAsync(Uri brokerServiceName)
+ public Task RegisterAsync()
{
return Task.CompletedTask;
}
- public Task GetBrokerServiceForMessageAsync(object message, Uri brokerServiceName = null)
+ public Task GetBrokerServiceForMessageAsync(object message)
{
return Task.FromResult(new MockBrokerService());
}
- public virtual Task GetBrokerServiceForMessageAsync(string messageTypeName, Uri brokerServiceName = null)
+ public virtual Task GetBrokerServiceForMessageAsync(string messageTypeName)
{
return Task.FromResult(new MockBrokerService());
}
- public virtual Task> GetBrokerServicesForAllPartitionsAsync(Uri brokerServiceName = null)
+ public virtual Task> GetBrokerServicesForAllPartitionsAsync()
{
return Task.FromResult>(new List
{