
Add worker manager and workers #230

Merged · 2 commits · Sep 20, 2019

Conversation

@peterhuffer (Contributor) commented Aug 29, 2019

What does this PR do?

Builds off of #229

Only review 18a33a3. Other commits are from PR #229.

Adds a manager which monitors sites and spins workers up/down based on the site's configuration.
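For illustration only, a rough sketch of the spin-up/spin-down idea (the class and method names below are hypothetical stand-ins, not the actual classes added in this PR):

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: periodically reconcile one worker pool per site with
// that site's configured parallelism.
public class WorkerManagerSketch {

  // Minimal stand-in for a site's replication configuration.
  public static final class SiteConfig {
    final String id;
    final int parallelism; // desired number of workers for this site

    public SiteConfig(String id, int parallelism) {
      this.id = id;
      this.parallelism = parallelism;
    }
  }

  private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();

  // Called on each monitoring pass: spin pools up for newly configured sites
  // and shut them down for sites that were removed or disabled.
  public void reconcile(List<SiteConfig> sites) {
    for (SiteConfig site : sites) {
      pools.computeIfAbsent(site.id, id -> Executors.newFixedThreadPool(site.parallelism));
    }
    pools.keySet().removeIf(id -> {
      boolean stillConfigured = sites.stream().anyMatch(s -> s.id.equals(id));
      if (!stillConfigured) {
        pools.get(id).shutdownNow(); // interrupt that site's workers
      }
      return !stillConfigured;
    });
  }
}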

Who is reviewing it (please choose AT LEAST two reviewers that need to approve the PR before it can get merged)?

@paouelle @clockard @kcover

How should this be tested? (List steps with links to updated documentation)

CI Build.

Any background context you want to provide?

Checklist:

  • Documentation Updated
  • Update / Add Unit Tests
  • Update / Add Integration Tests

@peterhuffer (Contributor Author)

I need to make NodeAdapters into an interface until I can move my code into a separate module.

@peterhuffer force-pushed the DITTO-140 branch 4 times, most recently from 21e4207 to 43c42e0 on August 29, 2019 at 23:13
@@ -21,7 +21,10 @@
SITE_TIMEOUT(true),
UPSTREAM_SERVICE_UNAVAILABLE(true),
UPSTREAM_SERVICE_TIMEOUT(true),
UNKNOWN_ERROR(false);
UNKNOWN_ERROR(false),
UNSUPPORTED_ERROR(true),
Contributor

❓ Why would `UNSUPPORTED_ERROR` be retryable?

Contributor Author

I was using this for a worker that received an unknown operation type, but I will switch it to unlock the task and get rid of this, based on the earlier discussions.
❓ How will the queue know not to send the same task back to the worker that just received it even though the worker could not handle it?

Contributor

Really good question, and for now it won't. For the monolithic service, that will never happen. For the microservices one, the deployment sequence should already have started newer workers and would most likely be in the process of shutting down the older ones. So the worst-case scenario is that it gets picked up again by the same worker until that worker is shut down.

Once we have integrated with the external queue, we can see if there are ways to mark the task so that it wouldn't be picked up again by the same worker, but based on how we would get there and what I said above, I am not sure we should worry about it.

@paouelle (Contributor) commented Sep 4, 2019

I think moving the worker into a separate module can be done once we work on microservices for them, at which point they will actually move to a different repo.

return;
}

final DDFMetadataInfo info = infos.get(0);
Contributor

❓ Should this worker be more generic and not care about DDF metadata info, but instead pass that info to the adapter and let the adapter handle it properly? A DDF adapter would care and, on a query, would simply return it back, whereas another one would perform its own query to get the data.

Contributor Author

Yeah I think eventually we will want that, but I'd like to avoid modifications to the adapter API in this PR if we can defer it.

.collect(Collectors.toList());

final DDFMetadataInfo info = infos.get(0);
final Metadata metadata = new MetadataImpl(((Map) info.getData()).get("raw-metadata"),
Member

This will change when #215 goes in. It should simplify this I think.


URI uri = task.getResource().flatMap(ResourceInfo::getResourceUri).orElse(null);
if (uri != null) {
ResourceRequest request = new ResourceRequestImpl(metadata);
Member

This is the only request we should support right now when dealing with Ion and existing DDFs.

switch (type) {
case HARVEST:
case REPLICATE_FROM:
doCreate(task, metadata, destination, local);
Member

If the query service still works the way I think it does, then we will also get updates via the queue. I assume we are expecting the create to fail if we have already created it (based on correlation id). How are we going to handle updates that come through and just keep failing? It might not be a concern for the worker, but we need to make sure we handle that. Ion probably needs to handle updates to solve this.

Contributor Author

AFAIK updates will not be supported in this iteration. The query service does not have an appropriate operation type to use.

Contributor

Updates should be treated down the road as upserts. This is purposely why we don't differentiate updates from creates in the operation types. For DDF, it would be up to the worker to handle that by updating first and, if that fails, creating (or vice versa). We cannot really know in advance, since the data can always arrive at a destination via other sites in the ecosystem.
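A rough sketch of the upsert fallback being described; the Destination interface and method names here are illustrative, not the real adapter API:

// Try the update first; if nothing existed to update at the destination,
// fall back to a create (or do it the other way around).
interface Destination {
  boolean update(Object metadata); // returns false if the item does not exist yet
  void create(Object metadata);
}

static void upsert(Destination destination, Object metadata) {
  if (!destination.update(metadata)) {
    destination.create(metadata);
  }
}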

@peterhuffer force-pushed the DITTO-140 branch 3 times, most recently from a1f2836 to 9b29f0c on September 10, 2019 at 21:19
UNKNOWN_ERROR(false);

private final boolean shouldBeRetried;

private ErrorCode(boolean shouldBeRetried) {
ErrorCode(boolean shouldBeRetried) {
Contributor

❓ Why are we removing private?

Contributor Author

It's redundant because enum constructors are implicitly private.
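For reference, a trimmed-down version of the enum in this diff showing why the modifier can be dropped:

public enum ErrorCode {
  UNKNOWN_ERROR(false);

  private final boolean shouldBeRetried;

  // No modifier needed: enum constructors are implicitly private, and
  // declaring them public or protected is a compile error.
  ErrorCode(boolean shouldBeRetried) {
    this.shouldBeRetried = shouldBeRetried;
  }
}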

@@ -27,12 +27,12 @@
*
* @return the class for the raw data
*/
public Class<T> getDataClass();
Class<T> getDataClass();
Contributor

😆

"Metadata deleted from site %s before able to create", source.getSystemName()));
return;
}
} catch (AdapterException e) {
Contributor

❗️ Need to handle AdapterInterruptedException on all methods from adapters

Contributor Author

I didn't think we would handle them specially, so they are just caught with the general AdapterException. Should I be handling it specially?

Contributor

Yes. They are interruptions, and you should try to stop without failing as soon as possible and propagate the interruption up your thread, either by re-interrupting the thread or by throwing an InterruptedException out. Speaking of which, in the places where you are currently handling interrupted exceptions, you need to make sure you propagate the interruption state as I just described. Basically, anywhere you can catch InterruptedException, an IOException where you need to handle InterruptedIOException, or this new AdapterInterruptedException, you have to handle it appropriately.
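A minimal, self-contained sketch of that propagation rule (JDK types only; the project's AdapterInterruptedException would go down the same branch as InterruptedIOException here):

import java.io.IOException;
import java.io.InterruptedIOException;

public final class InterruptionHandlingSketch {

  interface BlockingStep {
    void run() throws InterruptedException, IOException;
  }

  // Returns true only if the step completed; any interrupted flavour of
  // exception restores the thread's interrupt flag so the worker loop can
  // notice it and wind down cleanly.
  static boolean runStep(BlockingStep step) {
    try {
      step.run();
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // propagate the interruption
      return false;
    } catch (InterruptedIOException e) {
      Thread.currentThread().interrupt(); // I/O variant of an interruption
      return false;
    } catch (IOException e) {
      // genuine failure: fail or unlock the task, keep the worker alive
      return false;
    }
  }
}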

import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

public class WorkerTest {
Contributor

❗️ I would want to see test cases that show that if the worker is interrupted, it stops its current processing properly. Interruption is part of the contract for a worker, so we need to make sure, for one, that the task currently being handled is unlocked, completed, or failed, and that the interrupted state of the thread is propagated once the loop finishes.
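Something along these lines, perhaps (the worker body here is a hypothetical stand-in, not the real Worker class):

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;

class WorkerInterruptionSketchTest {

  @Test
  void interruptionUnlocksTaskAndStopsLoop() throws Exception {
    AtomicBoolean taskUnlocked = new AtomicBoolean();
    CountDownLatch started = new CountDownLatch(1);

    // Stand-in for the worker loop: blocks "taking" a task until interrupted,
    // then unlocks the in-flight task and exits with the interrupt flag set.
    Thread worker = new Thread(() -> {
      started.countDown();
      try {
        Thread.sleep(Long.MAX_VALUE); // simulates waiting on the queue
      } catch (InterruptedException e) {
        taskUnlocked.set(true);             // "unlock" the current task
        Thread.currentThread().interrupt(); // propagate the interruption
      }
    });

    worker.start();
    started.await();
    worker.interrupt();
    worker.join(5_000);

    assertFalse(worker.isAlive(), "worker loop should have exited");
    assertTrue(taskUnlocked.get(), "in-flight task should be unlocked on interruption");
  }
}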

@peterhuffer changed the title from "[WIP] Add worker manager and workers" to "Add worker manager and workers" on Sep 16, 2019
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>${junit-jupiter.version}</version>
Contributor

❓ I think you mentioned that this lib runs JUnit 4 tests as well; does that mean we can remove the JUnit 4 dependency and just use this?

Contributor Author

I didn't really want to affect the rest of the project. I figured fully transitioning over to JUnit 5 could be a separate issue.

final String metadataId = task.getId();
final String operation = task.getOperation().name();

try {
Contributor

✏️ The try-catch blocks for this method seem pretty similar; could they be extracted?

@paouelle (Contributor), Sep 17, 2019

I am not sure it can be, since the AdapterInterruptedException would be caught by the catch (AdapterException). The only way I can think of is to create a functional interface that declares that it throws InterruptedException and returns some generic type. Then call whatever method throws an AdapterException through a new method that receives that functional interface, actually does the call, and unwraps the AdapterInterruptedException. Something like this:

@FunctionalInterface
public interface InterruptibleSupplier<T> {
    /**
     * Gets a result.
     *
     * @return a result
     */
    T get() throws InterruptedException;
}

private <T> T doInterruptibly(InterruptibleSupplier<T> s) throws InterruptedException {
  try {
    return s.get();
  } catch (AdapterInterruptedException e) {
    // unwrap and rethrow as a standard interruption
    InterruptedException ie = new InterruptedException(e.getMessage());
    ie.initCause(e);
    throw ie;
  }
}

Then wrap one of the calls below like this:

try {
  if (!doInterruptibly(() -> source.exists(metadata))) {
    ...
  }
} catch (AdapterException e) {
  ...
}

You could even extend the doInterruptibly() method to receive additional info, or a Runnable to be called if an AdapterException is thrown before rethrowing it, thus removing the need for a try/catch here altogether. But since pretty much all of them handle it the same way, you could get away with just passing info on how to fail the task and how to log the error before rethrowing it. Then you could let that error bubble out of this method and handle it above, as opposed to returning.
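Roughly, that extension could look like this (still a sketch on top of the code above):

// The onFailure callback fails/logs the task; the AdapterException is then
// rethrown and handled once, further up, instead of in every call site.
private <T> T doInterruptibly(InterruptibleSupplier<T> s, Runnable onFailure)
    throws InterruptedException {
  try {
    return s.get();
  } catch (AdapterInterruptedException e) {
    InterruptedException ie = new InterruptedException(e.getMessage());
    ie.initCause(e);
    throw ie;
  } catch (AdapterException e) {
    onFailure.run(); // e.g. fail the task and log the error
    throw e;
  }
}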

} catch (InterruptedException e) {
if (task != null) {
try {
task.unlock();
Contributor

✏️ To avoid the nested try/catch, it might be worthwhile to add a new unlockUninterruptibly() method to the task which would do exactly that, or rename the existing unlock() to unlockInterruptibly() and add a new unlock() that would do exactly that.
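For example, something like this on the task (assuming the existing unlock() throws InterruptedException; that and the interface shape are assumptions here):

public interface Task {
  void unlock() throws InterruptedException;

  // Suggested helper: absorbs the interruption but restores the thread's
  // interrupt flag, so callers no longer need a nested try/catch.
  default void unlockUninterruptibly() {
    try {
      unlock();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}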

() -> {
while (true) {
if (queue.size() == 3) {
break;
Contributor

✏️ If I understand what's going on here, Patrick added a method to the queue that does the same thing and might look a little neater. Take a look at MemorySiteQueue#waitForPendingSizeToReach

Contributor Author

This one is waiting for the total size. I think he was going to add an additional method for size.

queue.put(taskInfo2);
queue.put(taskInfo3);

assertTimeoutPreemptively(
Contributor

❓ Why the timeout here? The implementation of the queue and the fact that you have not started any workers guarantees that the size will be 3 here.

}
}
});
}
Contributor

❗️ Missing validation of the outcome from your component, e.g. that the tasks end up completed, failed, or re-queued, and that the data in the tasks is sent out to the destination site.


workerManager.init();

assertTimeoutPreemptively(
Contributor

✏️ Can be replaced with the new queue.waitForSizeToReach()

@paouelle (Contributor) left a comment

waiting for the last rebase to approve. But it is looking very good 🎉


ArgumentCaptor<CreateStorageRequest> createStorageRequestCaptor = ArgumentCaptor
Contributor

I can't wait to have our endpoint stubs so we can replace all that mocking with simple checks of what they received.

}

@Test
public void testMultipleTasksCompleted() throws Exception {
Contributor

✏️ It would be nice to test with one task that you know you will ignore and unlock, and one task that fails.

Contributor Author

I did this test mostly to check that a worker will continue to take from the queue after processing one. I can add separate tests for an ignored and a failed task, though.

Contributor

I didn't mean in this test. For sure, additional tests that verify that would be great.

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class WorkerThreadPoolTest {
Contributor

✏️ public

Contributor Author

When I upgraded this module to use JUnit 5, it was no longer required, so I just ended up removing it.
