Skip to content

Commit

Permalink
feature: modified executors (#23)
Browse files Browse the repository at this point in the history
  • Loading branch information
YarikRevich authored Jun 4, 2024
1 parent 81f836d commit a8ccfd5
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,44 +38,45 @@ public class TelemetryService {

private final ConcurrentLinkedQueue<Runnable> additionalContentUploadQueue = new ConcurrentLinkedQueue<>();

private final static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(6);
private final static ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory());

/**
* Starts telemetries listener, which handles incoming telemetries to be processed in a sequential way.
*/
@PostConstruct
private void configure() {
executorService.scheduleWithFixedDelay(() -> {
scheduledExecutorService.scheduleWithFixedDelay(() -> {
if (!apiServerHealthCheckQueue.isEmpty()) {
apiServerHealthCheckQueue.poll().run();
}
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);

executorService.scheduleWithFixedDelay(() -> {
scheduledExecutorService.scheduleWithFixedDelay(() -> {
if (!clusterHealthCheckQueue.isEmpty()) {
clusterHealthCheckQueue.poll().run();
}
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);

executorService.scheduleWithFixedDelay(() -> {
scheduledExecutorService.scheduleWithFixedDelay(() -> {
if (!clusterStateQueue.isEmpty()) {
clusterStateQueue.poll().run();
}
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);

executorService.scheduleWithFixedDelay(() -> {
scheduledExecutorService.scheduleWithFixedDelay(() -> {
if (!clusterDownloadQueue.isEmpty()) {
clusterDownloadQueue.poll().run();
}
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);

executorService.scheduleWithFixedDelay(() -> {
scheduledExecutorService.scheduleWithFixedDelay(() -> {
if (!rawContentUploadQueue.isEmpty()) {
rawContentUploadQueue.poll().run();
}
}, 0, properties.getDiagnosticsScrapeDelay(), TimeUnit.MILLISECONDS);

executorService.scheduleWithFixedDelay(() -> {
scheduledExecutorService.scheduleWithFixedDelay(() -> {
if (!additionalContentUploadQueue.isEmpty()) {
additionalContentUploadQueue.poll().run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ public class LoggingStateService {
@Autowired
private PropertiesEntity properties;

private final ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory());
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

/**
* Performs application exit if the required state has been changed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,19 @@ public class SchedulerConfigService {
@Autowired
private ApiServerCommunicationResource apiServerCommunicationResource;

private final ExecutorService starterExecutorService = Executors.newVirtualThreadPerTaskExecutor();

private final ScheduledExecutorService operationScheduledExecutorService =
Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory());

/**
* Performs configuration of RepoAchiever Cluster workers.
*
* @throws SchedulerPeriodRetrievalFailureException if scheduler period retrieval process failed.
*/
@PostConstruct
private void process() throws SchedulerPeriodRetrievalFailureException {
ExecutorService starterExecutorService =
Executors.newFixedThreadPool(configService.getConfig().getContent().getLocations().size());

ScheduledExecutorService operationScheduledExecutorService =
Executors.newScheduledThreadPool(configService.getConfig().getContent().getLocations().size());

Long period;

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ public class GitGitHubVendorService {

private String document;

private final ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory());
private ScheduledExecutorService scheduledExecutorService;

/**
* Performs initial GraphQL client and HTTP client configuration.
Expand All @@ -67,6 +66,10 @@ public class GitGitHubVendorService {
*/
@PostConstruct
private void configure() throws GitHubGraphQlClientDocumentNotFoundException {
scheduledExecutorService =
Executors.newScheduledThreadPool(
configService.getConfig().getContent().getLocations().size() * 2);

if (configService.getConfig().getService().getProvider() == ConfigEntity.Service.Provider.GIT_GITHUB) {
WebClient client = WebClient.builder()
.baseUrl(properties.getGraphQlClientGitHubUrl())
Expand Down

0 comments on commit a8ccfd5

Please sign in to comment.