Skip to content

Commit

Permalink
v3.0.7.2
Browse files Browse the repository at this point in the history
  • Loading branch information
justlive1 committed Nov 24, 2021
1 parent 68ceff7 commit 647d164
Show file tree
Hide file tree
Showing 35 changed files with 906 additions and 449 deletions.
8 changes: 1 addition & 7 deletions oxygen-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>oxygen</artifactId>
<groupId>vip.justlive</groupId>
<version>3.0.7</version>
<version>3.0.7.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand All @@ -26,12 +26,6 @@
<artifactId>cglib-nodep</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,38 @@
@ToString
@RequiredArgsConstructor
public abstract class CoreJobTrigger implements JobTrigger {

protected AtomicLong rounds = new AtomicLong(0);

protected final String key;
protected final String jobKey;

protected Long startTime;
protected Long endTime;

protected Long previousFireTime;
protected Long nextFireTime;
protected Long lastCompletedTime;


protected Integer state;

@Override
public void setLastCompletedTime(Long lastCompletedTime) {
this.lastCompletedTime = lastCompletedTime;
this.rounds.incrementAndGet();
}

@Override
public Long computeNextFireTime(long timestamp) {
public Long triggerFired(long timestamp) {
previousFireTime = nextFireTime;
Long begin = timestamp;
Long next = timestamp;
if (nextFireTime != null) {
begin = nextFireTime;
next = nextFireTime;
}
while (begin != null && begin <= timestamp) {
begin = getFireTimeAfter(begin);
while (next != null && next <= timestamp) {
next = getFireTimeAfter(next);
}
nextFireTime = begin;
nextFireTime = next;
return nextFireTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,27 @@
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class DelayOrRateJobTrigger extends CoreJobTrigger {

private final long initialDelay;
private final long fixedOffset;
private final boolean delay;

public DelayOrRateJobTrigger(String jobKey, long fixedOffset, boolean delay) {
this(jobKey, fixedOffset, fixedOffset, delay);
}

public DelayOrRateJobTrigger(String jobKey, long initialDelay, long fixedOffset, boolean delay) {
this(jobKey, jobKey, initialDelay, fixedOffset, delay);
}

public DelayOrRateJobTrigger(String key, String jobKey, long initialDelay, long fixedOffset,
boolean delay) {
super(key, jobKey);
this.initialDelay = initialDelay;
this.fixedOffset = fixedOffset;
this.delay = delay;
}

@Override
public Long getFireTimeAfter(long timestamp) {
long offset = fixedOffset;
Expand All @@ -62,23 +62,31 @@ public Long getFireTimeAfter(long timestamp) {
}
return time;
}

@Override
public void setLastCompletedTime(Long lastCompletedTime) {
super.setLastCompletedTime(lastCompletedTime);
if (isDelay()) {
if (lastCompletedTime != null && isDelay()) {
nextFireTime = getFireTimeAfter(lastCompletedTime);
}
}

@Override
public Long computeNextFireTime(long timestamp) {
public Long triggerFired(long timestamp) {
if (isDelay() && lastCompletedTime != null && lastCompletedTime < nextFireTime) {
return Long.MAX_VALUE;
}
if (!isDelay() && nextFireTime != null) {
timestamp = nextFireTime;
}
return super.computeNextFireTime(timestamp);
return super.triggerFired(timestamp);
}

@Override
public Long computeNextFireTime() {
if (!isDelay() && nextFireTime != null) {
return super.triggerFired(nextFireTime);
}
return super.computeNextFireTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ public class JobConf {
private int fetchMaxSize = 100;
private long idleWaitTime = 30000;
private int idleWaitRandom = 7 * 1000;
private long misfireThreshold = 60000;
private long lostThreshold = 600000;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package vip.justlive.oxygen.core.job;

import lombok.experimental.UtilityClass;

/**
* job常量
*
* @author wubo
*/
@UtilityClass
public class JobConstants {

public final int STATE_WAITING = 0;
public final int STATE_ACQUIRED = 1;
public final int STATE_COMPLETE = 2;
public final int STATE_PAUSED = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ private void check(Scheduled scheduled) {
count++;
}
if (count > 1 || count == 0) {
throw new IllegalArgumentException(
"Specify 'fixedDelay', 'fixedRate', 'onApplicationStart' or 'cron'");
throw new IllegalArgumentException("Specify 'fixedDelay', 'fixedRate' or 'cron'");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package vip.justlive.oxygen.core.job;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -36,6 +37,7 @@ public class JobResource {
private Signaler signaler;

Map<String, List<WaitingTaskFuture>> futures = new ConcurrentHashMap<>(4);
List<SchedulerPlugin> schedulerPlugins = new ArrayList<>();

static class WaitingTaskFuture {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@
@Getter
@RequiredArgsConstructor
public class JobRunTask implements Runnable {

public static final int NOOP = 0;
public static final int DELETE = 1;

private final Job job;
private final JobContext ctx;
private final JobTrigger trigger;
private final JobResource resource;

@Override
public void run() {
long startTime = System.currentTimeMillis();
Expand All @@ -45,19 +45,19 @@ public void run() {
job.execute(ctx);
} catch (Exception e) {
log.info("job {} throw exception", ctx.getJobInfo().getKey(), e);
} finally {
long lastCompletedTime = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("job {} elapsed {}ms", ctx.getJobInfo().getKey(), lastCompletedTime - startTime);
}

int state = NOOP;
if (trigger.getNextFireTime() == null) {
state = DELETE;
}

resource.getJobStore().triggerCompleted(trigger, state);
resource.getSignaler().triggerCompleted(trigger.getKey(), ctx.getExpectedFireTime());
}

long lastCompletedTime = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("job {} elapsed {}ms", ctx.getJobInfo().getKey(), lastCompletedTime - startTime);
}

int state = NOOP;
if (trigger.getNextFireTime() == null) {
state = DELETE;
}

resource.getJobStore().triggerCompleted(trigger, state);
resource.getSignaler().triggerCompleted(trigger.getKey(), ctx.getExpectedFireTime());
}
}
Loading

0 comments on commit 647d164

Please sign in to comment.