-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add worker manager and workers #230
Conversation
I need to make NodeAdapters into an interface until I can move my code into a separate module. |
21e4207
to
43c42e0
Compare
replication-api/src/main/java/com/connexta/replication/api/data/ErrorCode.java
Show resolved
Hide resolved
@@ -21,7 +21,10 @@ | |||
SITE_TIMEOUT(true), | |||
UPSTREAM_SERVICE_UNAVAILABLE(true), | |||
UPSTREAM_SERVICE_TIMEOUT(true), | |||
UNKNOWN_ERROR(false); | |||
UNKNOWN_ERROR(false), | |||
UNSUPPORTED_ERROR(true), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ Why would `UNSUPPORTED_ERROR be retryable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was using this for a worker that received an unknown operation type, but will switch it to unlock the task and get rid of this based on discussions earlier.
❓ How will the queue know not to send the same task back to the worker that just received it even though the worker could not handle it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really good question and for now, it won't. For the monolithic service, that will never happen. For the micro services one, the deployment sequences should already have started newer workers and would most likely be in the process shutting down the older ones. So worst case scenario, it gets picked up again by the same until shut down.
Once we have integrated with the external queue, we can see if there are ways to mark it as such that it wouldn't be picked up again by the same worker but based on how we would get there and what I said above, I am not sure we should worry about it.
replication-api/src/main/java/com/connexta/replication/api/data/ErrorCode.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
I think moving the worker into a separate module can be done once we work on micro services for them at which point they will actually move to a different repo. |
replication-api/src/main/java/com/connexta/replication/api/data/ddf/DDFMetadataInfo.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
return; | ||
} | ||
|
||
final DDFMetadataInfo info = infos.get(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ Should this worker be more generic and not care about DDF metadata info but instead pass that info to the adapter and let the adapter handle it properly. A ddf adapter would care and on a query, it would simply return it back whereas as another one would perform its query to get the data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I think eventually we will want that, but I'd like to avoid modifications to the adapter API in this PR if we can defer it.
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/WorkerManager.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/WorkerManager.java
Outdated
Show resolved
Hide resolved
...cation-api-impl/src/main/java/com/connexta/replication/api/impl/worker/WorkerThreadPool.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/WorkerManager.java
Outdated
Show resolved
Hide resolved
...cation-api-impl/src/main/java/com/connexta/replication/api/impl/worker/WorkerThreadPool.java
Outdated
Show resolved
Hide resolved
43c42e0
to
d11311f
Compare
.collect(Collectors.toList()); | ||
|
||
final DDFMetadataInfo info = infos.get(0); | ||
final Metadata metadata = new MetadataImpl(((Map) info.getData()).get("raw-metadata"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will change when #215 goes in. It should simplify this I think.
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
|
||
URI uri = task.getResource().flatMap(ResourceInfo::getResourceUri).orElse(null); | ||
if (uri != null) { | ||
ResourceRequest request = new ResourceRequestImpl(metadata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the only request we should support right now when dealing with Ion and existing DDFs.
switch (type) { | ||
case HARVEST: | ||
case REPLICATE_FROM: | ||
doCreate(task, metadata, destination, local); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the query service still works how I think it is then we will also get updates via the queue as well. I assume we are expecting the create to fail if we have already created it (based on correlation id). How are we going to handle updates that come through and just keep failing? Might not be a concern for the worker but we need to make sure we handle that. Ion needs to probably handle updates to solve this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AFAIK updates will not be supported in this iteration. The query service does not have an appropriate operation type to use.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updates should be treated down the road as upserts. This is purposely why we don't differentiate updates from creates in the type of operations. For DDF, it would be up to the worker to handle that by updating first and if it fails creating or vice versa. We cannot really know in advance since the data can always arrive at a destination via other sites in the ecosystem.
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
a1f2836
to
9b29f0c
Compare
UNKNOWN_ERROR(false); | ||
|
||
private final boolean shouldBeRetried; | ||
|
||
private ErrorCode(boolean shouldBeRetried) { | ||
ErrorCode(boolean shouldBeRetried) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ Why are we removing private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's redundant cause enum ctors are private
@@ -27,12 +27,12 @@ | |||
* | |||
* @return the class for the raw data | |||
*/ | |||
public Class<T> getDataClass(); | |||
Class<T> getDataClass(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😆
...cation-api-impl/src/main/java/com/connexta/replication/api/impl/worker/WorkerThreadPool.java
Show resolved
Hide resolved
...cation-api-impl/src/main/java/com/connexta/replication/api/impl/worker/WorkerThreadPool.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/WorkerManager.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
"Metadata deleted from site %s before able to create", source.getSystemName())); | ||
return; | ||
} | ||
} catch (AdapterException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❗️ Need to handle AdapterInterruptedExeption
on all methods from adapters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't think we would handle them specially, so they are just caught with the general AdapterException. Should I be handling it specially?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. They are interruptions and you should be trying to stop without failing as soon as possible and propagating the interruption up your thread either by re-interrupting the thread or throwing an InterruptedException out. Speaking of which, in the places you are currently handling interrupted exceptions, you need to make sure you propagate the interruption state of things as I just described. Basically, anywhere you can catch InterruptedException, an IOException where you need to handle InterruptedIOException and this new AdapterInterruptedException, you have to handle it appropriately.
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/test/java/com/connexta/replication/api/impl/worker/WorkerTest.java
Outdated
Show resolved
Hide resolved
import org.junit.jupiter.params.provider.EnumSource; | ||
import org.junit.jupiter.params.provider.MethodSource; | ||
|
||
public class WorkerTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❗️ I would want to see tests cases that shows that if the worker is interrupted, it stops its current processing properly. Interruption is part of the contract for a worker so we need to make sure that for one, the task currently being handled is unlocked, completed, or failed and that the interrupted state of the thread is propagated once the loop finishes.
c5f0ce3
to
18a33a3
Compare
<dependency> | ||
<groupId>org.junit.vintage</groupId> | ||
<artifactId>junit-vintage-engine</artifactId> | ||
<version>${junit-jupiter.version}</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ I think you mentioned that this lib runs junit 4 tests as well, does that mean we can remove the junit 4 dependency and just use this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't really want to affect the rest of the project. I figured fully transitioning over to Junit5 could be a separate issue.
final String metadataId = task.getId(); | ||
final String operation = task.getOperation().name(); | ||
|
||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✏️ the try-catch blocks for this method seem pretty similar could they be extracted?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure it can be since the AdapterInterruptedException
would be caught by the catch(AdapterException)
. The only way I can think of is to create a functional interface that declares it throws InterruptedException
and return some generic type. Then call whatever method throws an AdapterException
through a new method that receives that functional interface that actually does the call and unwraps the AdapterInterruptedException
. Something like this:
@FunctionalInterface
public interface InterruptableSupplier<T> {
/**
* Gets a result.
*
* @return a result
*/
T get() throws InterruptedException;
}
private <T> T doInterruptibly(InterruptibleSupplier<T> s) {
try {
return s.get();
} catch (AdapterInterrutedException e) {
throw new InterruptedException(e);
}
}
Then wrap one of the call below like this:
try {
if (!doInterruptibly(() -> source.exists(metadata)) {
...
} catch (AdapterException e) {
...
}
You could even extend the doInterruptibly()
method to receive additional info or a Runnable
to be called if an AdapterException
is thrown before throwing back, thus removing the need to put a try/catch altogether here. But since pretty much all of them handle it the same way, you could get away with just passing info on how to fail the task and how to log the error before rethrowing it. Then you could let that error bubble out of this method and handle it above as opposed to returning.
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Show resolved
Hide resolved
...cation-api-impl/src/main/java/com/connexta/replication/api/impl/worker/WorkerThreadPool.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Outdated
Show resolved
Hide resolved
} catch (InterruptedException e) { | ||
if (task != null) { | ||
try { | ||
task.unlock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✏️ To avoid the nested try/catch, it might be worthwhile to add a new unlockUnterruptibly()
method to the task which would do exactly that or again renamed the existing unlock()
to unlockInterruptibly()
and add a new unlock()
that would do exactly that
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Show resolved
Hide resolved
() -> { | ||
while (true) { | ||
if (queue.size() == 3) { | ||
break; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✏️ If I understand what's going on here, Patrick added a method to the queue that does the same thing and might look a little neater. Take a look at MemorySiteQueue#waitForPendingSizeToReach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is waiting for the total size. I think he was going to add an additional method for size.
queue.put(taskInfo2); | ||
queue.put(taskInfo3); | ||
|
||
assertTimeoutPreemptively( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❓ Why the timeout here? The implementation of the queue and the fact that you have not started any workers guarantees that the size will be 3 here.
replication-api-impl/src/main/java/com/connexta/replication/api/impl/worker/Worker.java
Show resolved
Hide resolved
} | ||
} | ||
}); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❗️ Missing validation of outcome from your component. Like the fact that the tasks should be completed, failed, or re-queued. Like the fact that data in the tasks should be sent out to the destination site.
|
||
workerManager.init(); | ||
|
||
assertTimeoutPreemptively( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✏️ Can be replaced with the new queue.waitForSizeToReach()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
waiting for the last rebase to approve. But it is looking very good 🎉
...-impl/src/test/java/com/connexta/replication/api/impl/worker/WorkerManagerComponentTest.java
Outdated
Show resolved
Hide resolved
...-impl/src/test/java/com/connexta/replication/api/impl/worker/WorkerManagerComponentTest.java
Outdated
Show resolved
Hide resolved
|
||
ArgumentCaptor<CreateStorageRequest> createStorageRequestCaptor = ArgumentCaptor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't wait to have our endpoint stubs so we can replace all that mocking with simple checks of what they received.
...-impl/src/test/java/com/connexta/replication/api/impl/worker/WorkerManagerComponentTest.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Test | ||
public void testMultipleTasksCompleted() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✏️ It be nice to test with one task that you know you will ignore and unlock and one task that fails
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did this test mostly to check that a worker will continue to take from the queue after processing 1. I can add separate tests for an ignored and failed task though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't mean in this test. For sure additional tests that verifies that would be great.
0569001
to
9fc3d6a
Compare
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.Test; | ||
|
||
class WorkerThreadPoolTest { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✏️ public
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I upgraded this module to use Junit5, it was no longer required so I just ended up removing it
4c986fc
to
a368c20
Compare
What does this PR do?
Builds off of #229
Only review 18a33a3 . Other commits are from PR 229.
Adds a manager which monitors sites and spins up/down workers based on the site's configuration
Who is reviewing it (please choose AT LEAST two reviewers that need to approve the PR before it can get merged)?
@paouelle @clockard @kcover
How should this be tested? (List steps with links to updated documentation)
CI Build.
Any background context you want to provide?
Checklist: