Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #67 from SoCreate/feature/subscribe-retry
Browse files Browse the repository at this point in the history
Subscribe Retry
  • Loading branch information
lurock authored Apr 1, 2019
2 parents 7c92a92 + e484d13 commit 3543510
Show file tree
Hide file tree
Showing 13 changed files with 122 additions and 82 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 9.1.0 (2019-03-26)
### Features
* **Subscribe Retry**: Added a retry strategy for when a subscriber fails to subscribe because the Broker doesn't exist yet.
* **BrokerServiceUri**: We recently lost the ability to specify the BrokerUri when publishing/subscribing. Added the ability to pass the BrokerUri to the BrokerServiceLocator class.

# 9.0.0 (2019-03-21)
### Features
* **Broker Stats**: Added `GetBrokerStatsAsync()` and `UnsubscribeByQueueNameAsync()` to `BrokerClient` to help with monitoring and managing the Broker Service.
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion ServiceFabric.PubSub.sln
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
ProjectSection(SolutionItems) = preProject
README.md = README.md
CHANGELOG.md = CHANGELOG.md
CONTRIBUTIING.md = CONTRIBUTIING.md
CONTRIBUTING.md = CONTRIBUTING.md
LICENSE = LICENSE
EndProjectSection
EndProject
Expand Down
6 changes: 5 additions & 1 deletion src/SoCreate.ServiceFabric.PubSub/BrokerClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public class BrokerClient : IBrokerClient

public int QueueStatCapacity { get; set; } = 100;

/// <summary>
/// Create a BrokerClient
/// </summary>
/// <param name="brokerServiceLocator"></param>
public BrokerClient(IBrokerServiceLocator brokerServiceLocator = null)
{
_brokerServiceLocator = brokerServiceLocator ?? new BrokerServiceLocator();
Expand Down Expand Up @@ -122,7 +126,7 @@ public async Task UnsubscribeByQueueNameAsync(string queueName)

public static class BrokerClientExtensions
{
// subscribe/unsubscribe using Generic type (useful when subscibing manually)
// subscribe/unsubscribe using Generic type (useful when subscribing manually)

/// <summary>
/// Registers this StatelessService as a subscriber for messages of type <typeparam name="T"/> with the <see cref="BrokerService"/>.
Expand Down
4 changes: 2 additions & 2 deletions src/SoCreate.ServiceFabric.PubSub/BrokerServiceBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ protected BrokerServiceBase(StatefulServiceContext serviceContext, bool enableAu
{
if (enableAutoDiscovery)
{
new BrokerServiceLocator().RegisterAsync(Context.ServiceName)
new BrokerServiceLocator(Context.ServiceName).RegisterAsync()
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();
Expand All @@ -100,7 +100,7 @@ protected BrokerServiceBase(StatefulServiceContext serviceContext,
{
if (enableAutoDiscovery)
{
new BrokerServiceLocator().RegisterAsync(Context.ServiceName)
new BrokerServiceLocator(Context.ServiceName).RegisterAsync()
.ConfigureAwait(false)
.GetAwaiter()
.GetResult();
Expand Down
81 changes: 47 additions & 34 deletions src/SoCreate.ServiceFabric.PubSub/Helpers/BrokerServiceLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,73 +6,78 @@
using Microsoft.ServiceFabric.Services.Client;
using Microsoft.ServiceFabric.Services.Remoting.Client;
using Microsoft.ServiceFabric.Services.Remoting.V2.FabricTransport.Client;
using SoCreate.ServiceFabric.PubSub.State;

namespace SoCreate.ServiceFabric.PubSub.Helpers
{
public class BrokerServiceLocator : IBrokerServiceLocator
{
private readonly IHashingHelper _hashingHelper;
private static List<ServicePartitionKey> _cachedPartitionKeys = new List<ServicePartitionKey>();
private static Uri _cachedBrokerUri;
private readonly List<ServicePartitionKey> _cachedPartitionKeys = new List<ServicePartitionKey>();
private readonly FabricClient _fabricClient;
private const string BrokerName = nameof(BrokerService);
private readonly IServiceProxyFactory _serviceProxyFactory;
private Uri _brokerServiceUri;

/// <summary>
/// Creates a new default instance.
/// </summary>
public BrokerServiceLocator(IHashingHelper hashingHelper = null)
public BrokerServiceLocator(Uri brokerServiceUri = null, IHashingHelper hashingHelper = null)
{
_hashingHelper = hashingHelper ?? new HashingHelper();
_fabricClient = new FabricClient();
_serviceProxyFactory = new ServiceProxyFactory(c => new FabricTransportServiceRemotingClientFactory());
_brokerServiceUri = brokerServiceUri;
}


/// <inheritdoc />
public async Task RegisterAsync(Uri brokerServiceName)
public async Task RegisterAsync()
{
var activationContext = FabricRuntime.GetActivationContext();
await _fabricClient.PropertyManager.PutPropertyAsync(new Uri(activationContext.ApplicationName), BrokerName, brokerServiceName.ToString());
await _fabricClient.PropertyManager.PutPropertyAsync(new Uri(activationContext.ApplicationName), BrokerName, (await LocateAsync()).ToString());
}

/// <inheritdoc />
public async Task<IBrokerService> GetBrokerServiceForMessageAsync(object message, Uri brokerServiceName = null)
public async Task<IBrokerService> GetBrokerServiceForMessageAsync(object message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
var resolvedPartition = await GetPartitionForMessageAsync(message, brokerServiceName);
var resolvedPartition = await GetPartitionForMessageAsync(message);
return _serviceProxyFactory.CreateServiceProxy<IBrokerService>(
brokerServiceName ?? await LocateAsync(), resolvedPartition, listenerName: BrokerServiceBase.ListenerName);
await LocateAsync(), resolvedPartition, listenerName: BrokerServiceBase.ListenerName);
}

/// <inheritdoc />
public async Task<IBrokerService> GetBrokerServiceForMessageAsync(string messageTypeName, Uri brokerServiceName = null)
public async Task<IBrokerService> GetBrokerServiceForMessageAsync(string messageTypeName)
{
var resolvedPartition = await GetPartitionForMessageAsync(messageTypeName, brokerServiceName);
var resolvedPartition = await GetPartitionForMessageAsync(messageTypeName);
return _serviceProxyFactory.CreateServiceProxy<IBrokerService>(
brokerServiceName ?? await LocateAsync(), resolvedPartition, listenerName: BrokerServiceBase.ListenerName);
await LocateAsync(), resolvedPartition, listenerName: BrokerServiceBase.ListenerName);
}

/// <inheritdoc />
public async Task<IEnumerable<IBrokerService>> GetBrokerServicesForAllPartitionsAsync(Uri brokerServiceName = null)
public async Task<IEnumerable<IBrokerService>> GetBrokerServicesForAllPartitionsAsync()
{
var serviceProxies = new List<IBrokerService>();
foreach (var partition in await GetBrokerPartitionKeys(brokerServiceName))
var brokerServiceUri = await LocateAsync();
foreach (var partition in await GetBrokerPartitionKeys())
{
serviceProxies.Add(_serviceProxyFactory.CreateServiceProxy<IBrokerService>(
brokerServiceName ?? await LocateAsync(), partition, listenerName: BrokerServiceBase.ListenerName));
brokerServiceUri, partition, listenerName: BrokerServiceBase.ListenerName));
}

return serviceProxies;
}

/// <inheritdoc />
public async Task<Uri> LocateAsync()
/// <summary>
/// Locates the registered broker service.
/// </summary>
/// <returns></returns>
/// <exception cref="BrokerNotFoundException"></exception>
private async Task<Uri> LocateAsync()
{
if (_cachedBrokerUri != null)
if (_brokerServiceUri != null)
{
return _cachedBrokerUri;
return _brokerServiceUri;
}

try
Expand Down Expand Up @@ -101,23 +106,24 @@ public async Task<Uri> LocateAsync()
var found = await LocateAsync(app.ApplicationName);
if (found != null)
{
_cachedBrokerUri = found;
return _cachedBrokerUri;
_brokerServiceUri = found;
return _brokerServiceUri;
}
}
}
}
else
{
_cachedBrokerUri = new Uri(property.GetValue<string>());
return _cachedBrokerUri;
_brokerServiceUri = new Uri(property.GetValue<string>());
return _brokerServiceUri;
}
}
catch
{
;
// ignored
}
throw new InvalidOperationException("No brokerService was discovered in the cluster.");

throw new BrokerNotFoundException("No brokerService was discovered in the cluster.");
}

private async Task<Uri> LocateAsync(Uri applicationName)
Expand All @@ -140,20 +146,20 @@ private async Task<NamedProperty> GetBrokerPropertyOrNull(Uri applicationName)
}
catch
{
;
// ignored
}

return null;
}

/// <summary>
/// Resolves the <see cref="ServicePartitionKey"/> to send the message to, based on message type name.
/// </summary>
/// <param name="messageTypeName">Full type name of message object.</param>
/// <param name="brokerServiceName"></param>
/// <returns></returns>
private async Task<ServicePartitionKey> GetPartitionForMessageAsync(string messageTypeName, Uri brokerServiceName)
private async Task<ServicePartitionKey> GetPartitionForMessageAsync(string messageTypeName)
{
var partitionKeys = await GetBrokerPartitionKeys(brokerServiceName);
var partitionKeys = await GetBrokerPartitionKeys();

int hashCode;
unchecked
Expand All @@ -168,20 +174,19 @@ private async Task<ServicePartitionKey> GetPartitionForMessageAsync(string messa
/// Resolves the <see cref="ServicePartitionKey"/> to send the message to, based on message's type.
/// </summary>
/// <param name="message">The message to publish</param>
/// <param name="brokerServiceName"></param>
/// <returns></returns>
private Task<ServicePartitionKey> GetPartitionForMessageAsync(object message, Uri brokerServiceName)
private Task<ServicePartitionKey> GetPartitionForMessageAsync(object message)
{
if (message == null) throw new ArgumentNullException(nameof(message));
string messageTypeName = message.GetType().FullName;
return GetPartitionForMessageAsync(messageTypeName, brokerServiceName);
return GetPartitionForMessageAsync(messageTypeName);
}

private async Task<List<ServicePartitionKey>> GetBrokerPartitionKeys(Uri brokerServiceName = null)
private async Task<List<ServicePartitionKey>> GetBrokerPartitionKeys()
{
if (_cachedPartitionKeys.Count == 0)
{
foreach (var partition in await _fabricClient.QueryManager.GetPartitionListAsync(brokerServiceName ?? await LocateAsync()))
foreach (var partition in await _fabricClient.QueryManager.GetPartitionListAsync(await LocateAsync()))
{
if (partition.PartitionInformation.Kind != ServicePartitionKind.Int64Range)
{
Expand All @@ -196,4 +201,12 @@ private async Task<List<ServicePartitionKey>> GetBrokerPartitionKeys(Uri brokerS
return _cachedPartitionKeys;
}
}

public class BrokerNotFoundException : Exception
{
public BrokerNotFoundException(string message)
: base(message)
{
}
}
}
18 changes: 4 additions & 14 deletions src/SoCreate.ServiceFabric.PubSub/Helpers/IBrokerServiceLocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,30 @@ namespace SoCreate.ServiceFabric.PubSub.Helpers
{
public interface IBrokerServiceLocator
{
/// <summary>
/// Locates the registered broker service.
/// </summary>
/// <returns></returns>
Task<Uri> LocateAsync();

/// <summary>
/// Registers the default <see cref="BrokerServiceBase"/> by name.
/// </summary>
/// <param name="brokerServiceName"></param>
/// <returns></returns>
Task RegisterAsync(Uri brokerServiceName);
Task RegisterAsync();

/// <summary>
/// Gets the <see cref="IBrokerService"/> instance for the provided <paramref name="message"/>
/// </summary>
/// <param name="message"></param>
/// <param name="brokerServiceName">Uri of BrokerService instance</param>
/// <returns></returns>
Task<IBrokerService> GetBrokerServiceForMessageAsync(object message, Uri brokerServiceName = null);
Task<IBrokerService> GetBrokerServiceForMessageAsync(object message);

/// <summary>
/// Gets the <see cref="IBrokerService"/> instance for the provided <paramref name="messageTypeName"/>
/// </summary>
/// <param name="messageTypeName">Full type name of message object.</param>
/// <param name="brokerServiceName">Uri of BrokerService instance</param>
/// <returns></returns>
Task<IBrokerService> GetBrokerServiceForMessageAsync(string messageTypeName, Uri brokerServiceName = null);
Task<IBrokerService> GetBrokerServiceForMessageAsync(string messageTypeName);

/// <summary>
/// Gets a collection of <see cref="IBrokerService"/> ServiceProxy instances, one for each partition.
/// </summary>
/// <param name="brokerServiceName"></param>
/// <returns></returns>
Task<IEnumerable<IBrokerService>> GetBrokerServicesForAllPartitionsAsync(Uri brokerServiceName = null);
Task<IEnumerable<IBrokerService>> GetBrokerServicesForAllPartitionsAsync();
}
}
24 changes: 12 additions & 12 deletions src/SoCreate.ServiceFabric.PubSub/Helpers/TimeoutRetryHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ internal static class TimeoutRetryHelper
/// <param name="operation">Operation to execute with retry.</param>
/// <param name="state">State passed to callback. (optional)</param>
/// <param name="cancellationToken">Cancellation support. (optional)</param>
/// <param name="maxAttempts">#Attempts to execute <paramref name="operation"/> (optional)</param>
/// <param name="maxAttempts">#Attempts to execute <paramref name="operation"/> (optional)</param>
/// <param name="initialDelay">First delay between attempts. Later on this will be exponentially grow. (optional)</param>
/// <returns></returns>
public static async Task<TResult> ExecuteInTransaction<TResult>(IReliableStateManager stateManager,
Func<ITransaction, CancellationToken, object, Task<TResult>> operation,
object state = null,
CancellationToken cancellationToken = default(CancellationToken),
int maxAttempts = DefaultMaxAttempts,
public static async Task<TResult> ExecuteInTransaction<TResult>(IReliableStateManager stateManager,
Func<ITransaction, CancellationToken, object, Task<TResult>> operation,
object state = null,
CancellationToken cancellationToken = default(CancellationToken),
int maxAttempts = DefaultMaxAttempts,
TimeSpan? initialDelay = null)
{
if (stateManager == null) throw new ArgumentNullException(nameof(stateManager));
Expand Down Expand Up @@ -68,10 +68,10 @@ public static async Task<TResult> ExecuteInTransaction<TResult>(IReliableStateMa
/// <param name="operation">Operation to execute with retry.</param>
/// <param name="state">State passed to callback. (optional)</param>
/// <param name="cancellationToken">Cancellation support. (optional)</param>
/// <param name="maxAttempts">#Attempts to execute <paramref name="operation"/> (optional)</param>
/// <param name="maxAttempts">#Attempts to execute <paramref name="operation"/> (optional)</param>
/// <param name="initialDelay">First delay between attempts. Later on this will be exponentially grow. (optional)</param>
/// <returns></returns>
public static async Task ExecuteInTransaction(IReliableStateManager stateManager, Func<ITransaction, CancellationToken,
public static async Task ExecuteInTransaction(IReliableStateManager stateManager, Func<ITransaction, CancellationToken,
object, Task> operation,
object state = null,
CancellationToken cancellationToken = default(CancellationToken),
Expand Down Expand Up @@ -112,11 +112,11 @@ public static async Task ExecuteInTransaction(IReliableStateManager stateManager
/// <param name="operation">Operation to execute with retry.</param>
/// <param name="state">State passed to callback. (optional)</param>
/// <param name="cancellationToken">Cancellation support. (optional)</param>
/// <param name="maxAttempts">#Attempts to execute <paramref name="operation"/> (optional)</param>
/// <param name="maxAttempts">#Attempts to execute <paramref name="operation"/> (optional)</param>
/// <param name="initialDelay">First delay between attempts. Later on this will be exponentially grow. (optional)</param>
/// <returns></returns>
public static async Task<TResult> Execute<TResult>(Func<CancellationToken, object, Task<TResult>> operation,
object state = null,
public static async Task<TResult> Execute<TResult>(Func<CancellationToken, object, Task<TResult>> operation,
object state = null,
CancellationToken cancellationToken = default(CancellationToken),
int maxAttempts = DefaultMaxAttempts,
TimeSpan? initialDelay = null)
Expand All @@ -136,7 +136,7 @@ public static async Task<TResult> Execute<TResult>(Func<CancellationToken, objec
}
catch (TimeoutException)
{
if (attempts == DefaultMaxAttempts)
if (attempts == maxAttempts)
{
throw;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<Description>SoCreate.ServiceFabric.PubSub adds pub/sub behaviour to your Reliable Actors and Services in Service Fabric. Documentation: http://service-fabric-pub-sub.socreate.it</Description>
<Copyright>© SoCreate. All rights reserved.</Copyright>
<AssemblyTitle>SoCreate.ServiceFabric.PubSub</AssemblyTitle>
<Version>9.0.0</Version>
<Version>9.1.0</Version>
<Company>SoCreate</Company>
<Authors>Loek Duys, SoCreate</Authors>
<TargetFramework>netstandard2.0</TargetFramework>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,25 @@ protected SubscriberStatefulServiceBase(StatefulServiceContext serviceContext,
}

/// <inheritdoc/>
protected override Task OnOpenAsync(ReplicaOpenMode openMode, CancellationToken cancellationToken)
protected override async Task OnOpenAsync(ReplicaOpenMode openMode, CancellationToken cancellationToken)
{
return Subscribe();
for (var attempt = 0; ; attempt++)
{
try
{
await Subscribe();
break;
}
catch (BrokerNotFoundException)
{
if (attempt > 10)
{
throw;
}

await Task.Delay(TimeSpan.FromSeconds(10), cancellationToken);
}
}
}

/// <summary>
Expand Down Expand Up @@ -84,6 +100,7 @@ protected virtual async Task Subscribe()
catch (Exception ex)
{
LogMessage($"Failed to register Service:'{Context.ServiceName}' as Subscriber of {handler.Key}. Error:'{ex.Message}'.");
throw;
}
}
}
Expand Down
Loading

0 comments on commit 3543510

Please sign in to comment.