Skip to content

Commit

Permalink
v1.3.1 -fix: ReleaseSocket logic
Browse files Browse the repository at this point in the history
  • Loading branch information
2881099 committed Sep 4, 2024
1 parent f8ba148 commit 63a7836
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/FreeRedis/FreeRedis.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<AssemblyName>FreeRedis</AssemblyName>
<PackageId>FreeRedis</PackageId>
<RootNamespace>FreeRedis</RootNamespace>
<Version>1.3.0</Version>
<Version>1.3.1</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageProjectUrl>https://github.com/2881099/FreeRedis</PackageProjectUrl>
<Description>FreeRedis is .NET redis client, supports cluster, sentinel, master-slave, pipeline, transaction and connection pool.</Description>
Expand Down
80 changes: 68 additions & 12 deletions src/FreeRedis/Internal/DefaultRedisSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class DefaultRedisSocket : IRedisSocket, IRedisSocketModify
{
public static TempProxyRedisSocket CreateTempProxy(IRedisSocket rds, Action dispose)
{
if (rds is TempProxyRedisSocket proxy)
if (rds is TempProxyRedisSocket proxy)
return new TempProxyRedisSocket(proxy._owner, dispose);
return new TempProxyRedisSocket(rds, dispose);
}
Expand Down Expand Up @@ -170,8 +170,19 @@ public void Write(CommandPacket cmd)
{
new RespHelper.Resp3Writer(ms, Encoding, Protocol).WriteCommand(cmd);
ms.Position = 0;
ms.CopyTo(Stream);
ms.Close();
try
{
ms.CopyTo(Stream);
}
catch(Exception ex)
{
ReleaseSocket();
throw new ProtocolViolationException($"Socket.Write error: {ex.Message}");
}
finally
{
ms.Close();
}
}
WriteAfter(cmd);
}
Expand All @@ -181,7 +192,16 @@ public RedisResult Read(CommandPacket cmd)
if (ClientReply == ClientReplyType.on)
{
if (IsConnected == false) Connect();
var rt = Reader.ReadObject(cmd?._flagReadbytes == true ? null : Encoding);
RedisResult rt;
try
{
rt = Reader.ReadObject(cmd?._flagReadbytes == true ? null : Encoding);
}
catch (Exception ex)
{
ReleaseSocket();
throw new ProtocolViolationException($"Socket.Read error: {ex.Message}");
}
rt.Encoding = Encoding;
cmd?.OnDataTrigger(rt);
return rt;
Expand All @@ -193,7 +213,15 @@ public void ReadChunk(Stream destination, int bufferSize = 1024)
if (ClientReply == ClientReplyType.on)
{
if (IsConnected == false) Connect();
Reader.ReadBlobStringChunk(destination, bufferSize);
try
{
Reader.ReadBlobStringChunk(destination, bufferSize);
}
catch (Exception ex)
{
ReleaseSocket();
throw new ProtocolViolationException($"Socket.Read error: {ex.Message}");
}
}
}
#if isasync
Expand All @@ -205,8 +233,19 @@ async public Task WriteAsync(CommandPacket cmd)
{
new RespHelper.Resp3Writer(ms, Encoding, Protocol).WriteCommand(cmd);
ms.Position = 0;
await ms.CopyToAsync(Stream);
ms.Close();
try
{
await ms.CopyToAsync(Stream);
}
catch (Exception ex)
{
ReleaseSocket();
throw new ProtocolViolationException($"Socket.Write error: {ex.Message}");
}
finally
{
ms.Close();
}
}
WriteAfter(cmd);
}
Expand All @@ -216,7 +255,16 @@ async public Task<RedisResult> ReadAsync(CommandPacket cmd)
if (ClientReply == ClientReplyType.on)
{
if (IsConnected == false) Connect();
var rt = await Reader.ReadObjectAsync(cmd?._flagReadbytes == true ? null : Encoding);
RedisResult rt;
try
{
rt = await Reader.ReadObjectAsync(cmd?._flagReadbytes == true ? null : Encoding);
}
catch (Exception ex)
{
ReleaseSocket();
throw new ProtocolViolationException($"Socket.Read error: {ex.Message}");
}
rt.Encoding = Encoding;
cmd?.OnDataTrigger(rt);
return rt;
Expand All @@ -228,7 +276,15 @@ async public Task ReadChunkAsync(Stream destination, int bufferSize = 1024)
if (ClientReply == ClientReplyType.on)
{
if (IsConnected == false) Connect();
await Reader.ReadBlobStringChunkAsync(destination, bufferSize);
try
{
await Reader.ReadBlobStringChunkAsync(destination, bufferSize);
}
catch (Exception ex)
{
ReleaseSocket();
throw new ProtocolViolationException($"Socket.Read error: {ex.Message}");
}
}
}
#endif
Expand All @@ -244,8 +300,8 @@ public void Connect()
(EndPoint)new IPEndPoint(tryip, _port) :
new DnsEndPoint(_ip, _port);

var localSocket = endpoint.AddressFamily == AddressFamily.InterNetworkV6 ?
new Socket(AddressFamily.InterNetworkV6, SocketType.Stream, ProtocolType.Tcp):
var localSocket = endpoint.AddressFamily == AddressFamily.InterNetworkV6 ?
new Socket(AddressFamily.InterNetworkV6, SocketType.Stream, ProtocolType.Tcp) :
new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

try
Expand Down Expand Up @@ -340,7 +396,7 @@ public static KeyValuePair<string, int> SplitHost(string host)
host = host.Trim();
var ipv6 = Regex.Match(host, @"^\[([^\]]+)\]\s*(:\s*(\d+))?$");
if (ipv6.Success) //ipv6+port 格式: [fe80::b164:55b3:4b4f:7ce6%15]:6379
return new KeyValuePair<string, int>(ipv6.Groups[1].Value.Trim(),
return new KeyValuePair<string, int>(ipv6.Groups[1].Value.Trim(),
int.TryParse(ipv6.Groups[3].Value, out var tryint) && tryint > 0 ? tryint : 6379);

var spt = (host ?? "").Split(':');
Expand Down
2 changes: 0 additions & 2 deletions src/FreeRedis/RedisClient/Adapter/ClusterAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ public override TValue AdapterCall<TValue>(CommandPacket cmd, Func<RedisResult,
}
catch (ProtocolViolationException)
{
rds.ReleaseSocket();
cmd._protocolErrorTryCount++;
if (cmd._protocolErrorTryCount <= pool._policy._connectionStringBuilder.Retry)
protocolRetry = true;
Expand Down Expand Up @@ -230,7 +229,6 @@ async public override Task<TValue> AdapterCallAsync<TValue>(CommandPacket cmd, F
}
catch (ProtocolViolationException)
{
rds.ReleaseSocket();
cmd._protocolErrorTryCount++;
if (cmd._protocolErrorTryCount <= pool._policy._connectionStringBuilder.Retry)
protocolRetry = true;
Expand Down
2 changes: 0 additions & 2 deletions src/FreeRedis/RedisClient/Adapter/NormanAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public override TValue AdapterCall<TValue>(CommandPacket cmd, Func<RedisResult,
catch (ProtocolViolationException)
{
var pool = (rds as DefaultRedisSocket.TempProxyRedisSocket)._pool;
rds.ReleaseSocket();
cmd._protocolErrorTryCount++;
if (cmd._protocolErrorTryCount <= pool._policy._connectionStringBuilder.Retry)
protocolRetry = true;
Expand Down Expand Up @@ -180,7 +179,6 @@ async public override Task<TValue> AdapterCallAsync<TValue>(CommandPacket cmd, F
catch (ProtocolViolationException)
{
var pool = (rds as DefaultRedisSocket.TempProxyRedisSocket)._pool;
rds.ReleaseSocket();
cmd._protocolErrorTryCount++;
if (cmd._protocolErrorTryCount <= pool._policy._connectionStringBuilder.Retry)
protocolRetry = true;
Expand Down
2 changes: 0 additions & 2 deletions src/FreeRedis/RedisClient/Adapter/PoolingAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public override TValue AdapterCall<TValue>(CommandPacket cmd, Func<RedisResult,
catch (ProtocolViolationException)
{
var pool = (rds as DefaultRedisSocket.TempProxyRedisSocket)._pool;
rds.ReleaseSocket();
cmd._protocolErrorTryCount++;
if (cmd._protocolErrorTryCount <= pool._policy._connectionStringBuilder.Retry)
protocolRetry = true;
Expand Down Expand Up @@ -123,7 +122,6 @@ public override Task<TValue> AdapterCallAsync<TValue>(CommandPacket cmd, Func<Re
catch (ProtocolViolationException)
{
var pool = (rds as DefaultRedisSocket.TempProxyRedisSocket)._pool;
rds.ReleaseSocket();
cmd._protocolErrorTryCount++;
if (cmd._protocolErrorTryCount <= pool._policy._connectionStringBuilder.Retry)
protocolRetry = true;
Expand Down
2 changes: 0 additions & 2 deletions src/FreeRedis/RedisClient/Adapter/SentinelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ public override TValue AdapterCall<TValue>(CommandPacket cmd, Func<RedisResult,
catch (ProtocolViolationException)
{
var pool = (rds as DefaultRedisSocket.TempProxyRedisSocket)._pool;
rds.ReleaseSocket();
cmd._protocolErrorTryCount++;
if (cmd._protocolErrorTryCount <= pool._policy._connectionStringBuilder.Retry)
protocolRetry = true;
Expand Down Expand Up @@ -163,7 +162,6 @@ public override Task<TValue> AdapterCallAsync<TValue>(CommandPacket cmd, Func<Re
catch (ProtocolViolationException)
{
var pool = (rds as DefaultRedisSocket.TempProxyRedisSocket)._pool;
rds.ReleaseSocket();
cmd._protocolErrorTryCount++;
if (cmd._protocolErrorTryCount <= pool._policy._connectionStringBuilder.Retry)
protocolRetry = true;
Expand Down

0 comments on commit 63a7836

Please sign in to comment.