Skip to content

Commit

Permalink
Add support for response builders on IRequestConfiguration (#134)
Browse files Browse the repository at this point in the history
This allows consumers to provide builders on a per-request basis, which
is required in ingest for streaming bulk response.

- Fixes potential null ref exceptions when disposing of streams.
- Refactors `IRequestConfiguration` and `RequestConfigurationDescriptor`
into their own files.
- Pins the SDK to 8.x until we're ready for 9.x.
  • Loading branch information
stevejgordon authored Nov 14, 2024
1 parent 1fbc364 commit 7d3a633
Show file tree
Hide file tree
Showing 11 changed files with 550 additions and 488 deletions.
2 changes: 1 addition & 1 deletion global.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"sdk": {
"version": "8.0.100",
"rollForward": "latestMajor",
"rollForward": "latestMinor",
"allowPrerelease": false
}
}
41 changes: 32 additions & 9 deletions src/Elastic.Transport/Components/Pipeline/RequestData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System.Collections.Specialized;
using System.Security.Cryptography.X509Certificates;
using Elastic.Transport.Extensions;
using Elastic.Transport.Products;

namespace Elastic.Transport;

Expand Down Expand Up @@ -41,9 +40,6 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local
ProxyPassword = global.ProxyPassword;
DisableAutomaticProxyDetection = global.DisableAutomaticProxyDetection;
UserAgent = global.UserAgent;
ResponseBuilders = global.ResponseBuilders;
ProductResponseBuilders = global.ProductRegistration.ResponseBuilders;

KeepAliveInterval = (int)(global.KeepAliveInterval?.TotalMilliseconds ?? 2000);
KeepAliveTime = (int)(global.KeepAliveTime?.TotalMilliseconds ?? 2000);
RunAs = local?.RunAs ?? global.RunAs;
Expand Down Expand Up @@ -90,13 +86,36 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local
Headers ??= [];
Headers.Add(OpaqueIdHeader, local.OpaqueId);
}
}

/// <inheritdoc cref="ITransportConfiguration.ResponseBuilders"/>
public IReadOnlyCollection<IResponseBuilder> ProductResponseBuilders { get; }
// If there are builders set at the transport level and on the request config, we combine them,
// prioritising the request config response builders as most specific.
if (local is not null && local.ResponseBuilders.Count > 0 && global.ResponseBuilders.Count > 0)
{
var builders = new IResponseBuilder[local.ResponseBuilders.Count + global.ResponseBuilders.Count];

/// <inheritdoc cref="ITransportConfiguration.ResponseBuilders"/>
public IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }
var counter = 0;
foreach (var builder in local.ResponseBuilders)
{
builders[counter++] = builder;
}
foreach (var builder in global.ResponseBuilders)
{
builders[counter++] = builder;
}

ResponseBuilders = builders;
}
else if (local is not null && local.ResponseBuilders.Count > 0)
{
ResponseBuilders = local.ResponseBuilders;
}
else
{
ResponseBuilders = global.ResponseBuilders;
}

ProductResponseBuilders = global.ProductRegistration.ResponseBuilders;
}

/// <inheritdoc cref="ITransportConfiguration.MemoryStreamFactory"/>
public MemoryStreamFactory MemoryStreamFactory { get; }
Expand Down Expand Up @@ -168,4 +187,8 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local
public bool DisableSniff { get; }
/// <inheritdoc cref="IRequestConfiguration.DisablePings"/>
public bool DisablePings { get; }
/// <inheritdoc cref="IRequestConfiguration.ResponseBuilders"/>
public IReadOnlyCollection<IResponseBuilder> ProductResponseBuilders { get; }
/// <inheritdoc cref="IRequestConfiguration.ResponseBuilders"/>
public IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
}
else
{
responseStream.Dispose();
receivedResponse.Dispose();
responseStream?.Dispose();
receivedResponse?.Dispose();
}

if (!OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners || (!(Activity.Current?.IsAllDataRequested ?? false)))
Expand All @@ -218,8 +218,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
catch
{
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
responseStream.Dispose();
receivedResponse.Dispose();
responseStream?.Dispose();
receivedResponse?.Dispose();
throw;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
}
else
{
responseStream.Dispose();
receivedResponse.Dispose();
responseStream?.Dispose();
receivedResponse?.Dispose();
}

if (OpenTelemetry.CurrentSpanIsElasticTransportOwnedAndHasListeners && (Activity.Current?.IsAllDataRequested ?? false))
Expand All @@ -208,8 +208,8 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(bool isAsync, End
catch
{
// if there's an exception, ensure we always release the stream and response so that the connection is freed.
responseStream.Dispose();
receivedResponse.Dispose();
responseStream?.Dispose();
receivedResponse?.Dispose();
throw;
}
}
Expand Down
155 changes: 155 additions & 0 deletions src/Elastic.Transport/Configuration/IRequestConfiguration.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Security.Cryptography.X509Certificates;

namespace Elastic.Transport;

/// <summary>
/// Allows you to inject per request overrides to the current <see cref="ITransportConfiguration"/>.
/// </summary>
public interface IRequestConfiguration
{
/// <summary>
/// Force a different Accept header on the request
/// </summary>
string? Accept { get; }

/// <summary>
/// Treat the following statuses (on top of the 200 range) NOT as error.
/// </summary>
IReadOnlyCollection<int>? AllowedStatusCodes { get; }

/// <summary> Provide an authentication header override for this request </summary>
AuthorizationHeader? Authentication { get; }

/// <summary>
/// Use the following client certificates to authenticate this single request
/// </summary>
X509CertificateCollection? ClientCertificates { get; }

/// <summary>
/// Force a different Content-Type header on the request
/// </summary>
string? ContentType { get; }

/// <summary>
/// Whether to buffer the request and response bytes for the call
/// </summary>
bool? DisableDirectStreaming { get; }

/// <summary>
/// Whether to disable the audit trail for the request.
/// </summary>
bool? DisableAuditTrail { get; }

/// <summary>
/// Under no circumstance do a ping before the actual call. If a node was previously dead a small ping with
/// low connect timeout will be tried first in normal circumstances
/// </summary>
bool? DisablePings { get; }

/// <summary>
/// Forces no sniffing to occur on the request no matter what configuration is in place
/// globally
/// </summary>
bool? DisableSniff { get; }

/// <summary>
/// Whether or not this request should be pipelined. http://en.wikipedia.org/wiki/HTTP_pipelining defaults to true
/// </summary>
bool? HttpPipeliningEnabled { get; }

/// <summary>
/// Enable gzip compressed requests and responses
/// </summary>
bool? EnableHttpCompression { get; }

/// <summary>
/// This will force the operation on the specified node, this will bypass any configured connection pool and will no retry.
/// </summary>
Uri? ForceNode { get; }

/// <summary>
/// When a retryable exception occurs or status code is returned this controls the maximum
/// amount of times we should retry the call to Elasticsearch
/// </summary>
int? MaxRetries { get; }

/// <summary>
/// Limits the total runtime including retries separately from <see cref="IRequestConfiguration.RequestTimeout" />
/// <pre>
/// When not specified defaults to <see cref="IRequestConfiguration.RequestTimeout" /> which itself defaults to 60 seconds
/// </pre>
/// </summary>
TimeSpan? MaxRetryTimeout { get; }

/// <summary>
/// Associate an Id with this user-initiated task, such that it can be located in the cluster task list.
/// Valid only for Elasticsearch 6.2.0+
/// </summary>
string? OpaqueId { get; }

/// <summary> Determines whether to parse all HTTP headers in the request. </summary>
bool? ParseAllHeaders { get; }

/// <summary>
/// The ping timeout for this specific request
/// </summary>
TimeSpan? PingTimeout { get; }

/// <summary>
/// The timeout for this specific request, takes precedence over the global timeout init
/// </summary>
TimeSpan? RequestTimeout { get; }

/// <summary>
/// Additional response builders to apply.
/// </summary>
IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }

/// <summary> Specifies the headers from the response that should be parsed. </summary>
HeadersList? ResponseHeadersToParse { get; }

/// <summary>
/// Submit the request on behalf in the context of a different shield user
/// <pre />https://www.elastic.co/guide/en/shield/current/submitting-requests-for-other-users.html
/// </summary>
string? RunAs { get; }

/// <summary>
/// Instead of following a c/go like error checking on response.IsValid do throw an exception (except when <see cref="ApiCallDetails.SuccessOrKnownError"/> is false)
/// on the client when a call resulted in an exception on either the client or the Elasticsearch server.
/// <para>Reasons for such exceptions could be search parser errors, index missing exceptions, etc...</para>
/// </summary>
bool? ThrowExceptions { get; }

/// <summary>
/// Whether the request should be sent with chunked Transfer-Encoding.
/// </summary>
bool? TransferEncodingChunked { get; }

/// <summary>
/// Try to send these headers for this single request
/// </summary>
NameValueCollection? Headers { get; }

/// <summary>
/// Enable statistics about TCP connections to be collected when making a request
/// </summary>
bool? EnableTcpStats { get; }

/// <summary>
/// Enable statistics about thread pools to be collected when making a request
/// </summary>
bool? EnableThreadPoolStats { get; }

/// <summary>
/// Holds additional meta data about the request.
/// </summary>
RequestMetaData? RequestMetaData { get; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,4 @@ public interface ITransportConfiguration : IRequestConfiguration, IDisposable
/// about the client and runtime.
/// </summary>
bool DisableMetaHeader { get; }

/// <summary>
/// Additional response builders to apply.
/// </summary>
IReadOnlyCollection<IResponseBuilder> ResponseBuilders { get; }
}
Loading

0 comments on commit 7d3a633

Please sign in to comment.