Skip to content

Commit

Permalink
Merge pull request #1382 from HubSpot/max_per_req
Browse files Browse the repository at this point in the history
Add maxTasksPerOffer at request level
  • Loading branch information
ssalinas authored Jan 3, 2017
2 parents aaa5c5d + 5890b66 commit 2ab38a7
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class SingularityRequest {
private final Optional<Boolean> taskLogErrorRegexCaseSensitive;

private final Optional<Double> taskPriorityLevel;
private final Optional<Integer> maxTasksPerOffer;

private final Optional<Boolean> allowBounceToSameHost;

Expand All @@ -75,7 +76,7 @@ public SingularityRequest(@JsonProperty("id") String id, @JsonProperty("requestT
@JsonProperty("emailConfigurationOverrides") Optional<Map<SingularityEmailType, List<SingularityEmailDestination>>> emailConfigurationOverrides,
@JsonProperty("daemon") @Deprecated Optional<Boolean> daemon, @JsonProperty("hideEvenNumberAcrossRacks") Optional<Boolean> hideEvenNumberAcrossRacksHint,
@JsonProperty("taskLogErrorRegex") Optional<String> taskLogErrorRegex, @JsonProperty("taskLogErrorRegexCaseSensitive") Optional<Boolean> taskLogErrorRegexCaseSensitive,
@JsonProperty("taskPriorityLevel") Optional<Double> taskPriorityLevel, @JsonProperty("allowBounceToSameHost") Optional<Boolean> allowBounceToSameHost) {
@JsonProperty("taskPriorityLevel") Optional<Double> taskPriorityLevel, @JsonProperty("maxTasksPerOffer") Optional<Integer> maxTasksPerOffer, @JsonProperty("allowBounceToSameHost") Optional<Boolean> allowBounceToSameHost) {
this.id = checkNotNull(id, "id cannot be null");
this.owners = owners;
this.numRetriesOnFailure = numRetriesOnFailure;
Expand Down Expand Up @@ -104,6 +105,7 @@ public SingularityRequest(@JsonProperty("id") String id, @JsonProperty("requestT
this.taskLogErrorRegex = taskLogErrorRegex;
this.taskLogErrorRegexCaseSensitive = taskLogErrorRegexCaseSensitive;
this.taskPriorityLevel = taskPriorityLevel;
this.maxTasksPerOffer = maxTasksPerOffer;
this.allowBounceToSameHost = allowBounceToSameHost;
if (requestType == null) {
this.requestType = RequestType.fromDaemonAndScheduleAndLoadBalanced(schedule, daemon, loadBalanced);
Expand Down Expand Up @@ -141,6 +143,7 @@ public SingularityRequestBuilder toBuilder() {
.setTaskLogErrorRegex(taskLogErrorRegex)
.setTaskLogErrorRegexCaseSensitive(taskLogErrorRegexCaseSensitive)
.setTaskPriorityLevel(taskPriorityLevel)
.setMaxTasksPerOffer(maxTasksPerOffer)
.setAllowBounceToSameHost(allowBounceToSameHost);
}

Expand Down Expand Up @@ -234,6 +237,11 @@ public Optional<Map<String, String>> getAllowedSlaveAttributes() {
return allowedSlaveAttributes;
}

@ApiModelProperty(required=false, value="Do not schedule more than this many tasks using a single offer from a single mesos slave")
public Optional<Integer> getMaxTasksPerOffer() {
return maxTasksPerOffer;
}

@ApiModelProperty(required=false, value="If set to true, allow tasks to be scheduled on the same host as an existing active task when bouncing")
public Optional<Boolean> getAllowBounceToSameHost() {
return allowBounceToSameHost;
Expand Down Expand Up @@ -373,6 +381,7 @@ public String toString() {
.add("taskLogErrorRegex", taskLogErrorRegex)
.add("taskLogErrorRegexCaseSensitive", taskLogErrorRegexCaseSensitive)
.add("taskPriorityLevel", taskPriorityLevel)
.add("maxTasksPerOffer", maxTasksPerOffer)
.add("allowBounceToSameHost", allowBounceToSameHost)
.toString();
}
Expand Down Expand Up @@ -414,11 +423,12 @@ public boolean equals(Object o) {
Objects.equals(taskLogErrorRegex, request.taskLogErrorRegex) &&
Objects.equals(taskLogErrorRegexCaseSensitive, request.taskLogErrorRegexCaseSensitive) &&
Objects.equals(taskPriorityLevel, request.taskPriorityLevel) &&
Objects.equals(maxTasksPerOffer, request.maxTasksPerOffer) &&
Objects.equals(allowBounceToSameHost, request.allowBounceToSameHost);
}

@Override
public int hashCode() {
return Objects.hash(id, requestType, owners, numRetriesOnFailure, schedule, quartzSchedule, scheduleTimeZone, scheduleType, killOldNonLongRunningTasksAfterMillis, taskExecutionTimeLimitMillis, scheduledExpectedRuntimeMillis, waitAtLeastMillisAfterTaskFinishesForReschedule, instances, rackSensitive, rackAffinity, slavePlacement, requiredSlaveAttributes, allowedSlaveAttributes, loadBalanced, group, readWriteGroups, readOnlyGroups, bounceAfterScale, emailConfigurationOverrides, hideEvenNumberAcrossRacksHint, taskLogErrorRegex, taskLogErrorRegexCaseSensitive, taskPriorityLevel, allowBounceToSameHost);
return Objects.hash(id, requestType, owners, numRetriesOnFailure, schedule, quartzSchedule, scheduleTimeZone, scheduleType, killOldNonLongRunningTasksAfterMillis, taskExecutionTimeLimitMillis, scheduledExpectedRuntimeMillis, waitAtLeastMillisAfterTaskFinishesForReschedule, instances, rackSensitive, rackAffinity, slavePlacement, requiredSlaveAttributes, allowedSlaveAttributes, loadBalanced, group, readWriteGroups, readOnlyGroups, bounceAfterScale, emailConfigurationOverrides, hideEvenNumberAcrossRacksHint, taskLogErrorRegex, taskLogErrorRegexCaseSensitive, taskPriorityLevel, maxTasksPerOffer, allowBounceToSameHost);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class SingularityRequestBuilder {
private Optional<String> taskLogErrorRegex;
private Optional<Boolean> taskLogErrorRegexCaseSensitive;
private Optional<Double> taskPriorityLevel;
private Optional<Integer> maxTasksPerOffer;
private Optional<Boolean> allowBounceToSameHost;

public SingularityRequestBuilder(String id, RequestType requestType) {
Expand Down Expand Up @@ -80,13 +81,14 @@ public SingularityRequestBuilder(String id, RequestType requestType) {
this.taskLogErrorRegex = Optional.absent();
this.taskLogErrorRegexCaseSensitive = Optional.absent();
this.taskPriorityLevel = Optional.absent();
this.maxTasksPerOffer = Optional.absent();
this.allowBounceToSameHost = Optional.absent();
}

public SingularityRequest build() {
return new SingularityRequest(id, requestType, owners, numRetriesOnFailure, schedule, instances, rackSensitive, loadBalanced, killOldNonLongRunningTasksAfterMillis, taskExecutionTimeLimitMillis, scheduleType, quartzSchedule, scheduleTimeZone,
rackAffinity, slavePlacement, requiredSlaveAttributes, allowedSlaveAttributes, scheduledExpectedRuntimeMillis, waitAtLeastMillisAfterTaskFinishesForReschedule, group, readWriteGroups, readOnlyGroups,
bounceAfterScale, skipHealthchecks, emailConfigurationOverrides, Optional.<Boolean>absent(), hideEvenNumberAcrossRacksHint, taskLogErrorRegex, taskLogErrorRegexCaseSensitive, taskPriorityLevel, allowBounceToSameHost);
bounceAfterScale, skipHealthchecks, emailConfigurationOverrides, Optional.<Boolean>absent(), hideEvenNumberAcrossRacksHint, taskLogErrorRegex, taskLogErrorRegexCaseSensitive, taskPriorityLevel, maxTasksPerOffer, allowBounceToSameHost);
}

public Optional<Boolean> getSkipHealthchecks() {
Expand Down Expand Up @@ -327,6 +329,15 @@ public SingularityRequestBuilder setTaskPriorityLevel(Optional<Double> taskPrior
return this;
}

public Optional<Integer> getMaxTasksPerOffer() {
return maxTasksPerOffer;
}

public SingularityRequestBuilder setMaxTasksPerOffer(Optional<Integer> maxTasksPerOffer) {
this.maxTasksPerOffer = maxTasksPerOffer;
return this;
}

public Optional<Boolean> getAllowBounceToSameHost() {
return allowBounceToSameHost;
}
Expand Down Expand Up @@ -368,6 +379,7 @@ public String toString() {
", taskLogErrorRegex=" + taskLogErrorRegex +
", taskLogErrorRegexCaseSensitive=" + taskLogErrorRegexCaseSensitive +
", taskPriorityLevel=" + taskPriorityLevel +
", maxTasksPerOffer=" + maxTasksPerOffer +
", allowBounceToSameHost=" + allowBounceToSameHost +
']';
}
Expand Down Expand Up @@ -410,6 +422,7 @@ public boolean equals(Object o) {
Objects.equals(taskLogErrorRegex, that.taskLogErrorRegex) &&
Objects.equals(taskLogErrorRegexCaseSensitive, that.taskLogErrorRegexCaseSensitive) &&
Objects.equals(taskPriorityLevel, that.taskPriorityLevel) &&
Objects.equals(maxTasksPerOffer, that.maxTasksPerOffer) &&
Objects.equals(allowBounceToSameHost, that.allowBounceToSameHost);
}

Expand All @@ -418,7 +431,7 @@ public int hashCode() {
return Objects.hash(id, requestType, owners, numRetriesOnFailure, schedule, quartzSchedule, scheduleTimeZone, scheduleType, killOldNonLongRunningTasksAfterMillis,
taskExecutionTimeLimitMillis, scheduledExpectedRuntimeMillis, waitAtLeastMillisAfterTaskFinishesForReschedule, instances, rackSensitive, rackAffinity, slavePlacement,
requiredSlaveAttributes, allowedSlaveAttributes, loadBalanced, group, readOnlyGroups, readWriteGroups, bounceAfterScale, skipHealthchecks, emailConfigurationOverrides,
hideEvenNumberAcrossRacksHint, taskLogErrorRegex, taskLogErrorRegexCaseSensitive, taskPriorityLevel, allowBounceToSameHost);
hideEvenNumberAcrossRacksHint, taskLogErrorRegex, taskLogErrorRegexCaseSensitive, taskPriorityLevel, maxTasksPerOffer, allowBounceToSameHost);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ public class SingularityConfiguration extends Configuration {

private int maxTasksPerOffer = 0;

private int maxTasksPerOfferPerRequest = 0;

private int maxRequestIdSize = 100;

private int maxUserIdSize = 100;
Expand Down Expand Up @@ -576,6 +578,10 @@ public int getMaxTasksPerOffer() {
return maxTasksPerOffer;
}

public int getMaxTasksPerOfferPerRequest() {
return maxTasksPerOfferPerRequest;
}

public MesosConfiguration getMesosConfiguration() {
return mesosConfiguration;
}
Expand Down Expand Up @@ -917,6 +923,10 @@ public void setMaxTasksPerOffer(int maxTasksPerOffer) {
this.maxTasksPerOffer = maxTasksPerOffer;
}

public void setMaxTasksPerOfferPerRequest(int maxTasksPerOfferPerRequest) {
this.maxTasksPerOfferPerRequest = maxTasksPerOfferPerRequest;
}

public void setMesosConfiguration(MesosConfiguration mesosConfiguration) {
this.mesosConfiguration = mesosConfiguration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ public SingularityRequest checkSingularityRequest(SingularityRequest request, Op
checkBadRequest(!request.getInstances().isPresent(), "one-off requests can not define a # of instances");
}

if (request.getMaxTasksPerOffer().isPresent()) {
checkBadRequest(request.getMaxTasksPerOffer().get() > 0, "maxTasksPerOffer must be positive");
}

return request.toBuilder().setQuartzSchedule(Optional.fromNullable(quartzSchedule)).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;

import javax.inject.Singleton;
Expand Down Expand Up @@ -149,7 +151,7 @@ public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
numDueTasks = taskRequests.size();

final List<SingularityOfferHolder> offerHolders = Lists.newArrayListWithCapacity(offers.size());

final Map<String, Map<String, Integer>> tasksPerOfferPerRequest = new HashMap<>();
for (Protos.Offer offer : offers) {
offerHolders.add(new SingularityOfferHolder(offer, numDueTasks));
}
Expand All @@ -166,7 +168,7 @@ public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers) {
continue;
}

Optional<SingularityTask> accepted = match(taskRequests, stateCache, offerHolder);
Optional<SingularityTask> accepted = match(taskRequests, stateCache, offerHolder, tasksPerOfferPerRequest);
if (accepted.isPresent()) {
offerHolder.addMatchedTask(accepted.get());
addedTaskInLastLoop = true;
Expand Down Expand Up @@ -226,9 +228,14 @@ private double getWeightedPriority(SingularityTaskRequest taskRequest, long now)
return overdueMillis * Math.pow(requestPriority, configuration.getSchedulerPriorityWeightFactor());
}

private Optional<SingularityTask> match(Collection<SingularityTaskRequest> taskRequests, SingularitySchedulerStateCache stateCache, SingularityOfferHolder offerHolder) {

private Optional<SingularityTask> match(Collection<SingularityTaskRequest> taskRequests, SingularitySchedulerStateCache stateCache, SingularityOfferHolder offerHolder, Map<String, Map<String, Integer>> tasksPerOfferPerRequest) {
String offerId = offerHolder.getOffer().getId().getValue();
for (SingularityTaskRequest taskRequest : taskRequests) {
if (tooManyTasksPerOfferForRequest(tasksPerOfferPerRequest, offerId, taskRequest)) {
LOG.debug("Skipping task request for request id {}, too many tasks already scheduled using offer {}", taskRequest.getRequest().getId(), offerId);
continue;
}

final Resources taskResources = taskRequest.getPendingTask().getResources().or(taskRequest.getDeploy().getResources()).or(defaultResources);

// only factor in executor resources if we're running a custom executor
Expand Down Expand Up @@ -259,6 +266,7 @@ private Optional<SingularityTask> match(Collection<SingularityTaskRequest> taskR
taskManager.createTaskAndDeletePendingTask(zkTask);

stateCache.getActiveTaskIds().add(task.getTaskId());
addRequestToMapByOfferId(tasksPerOfferPerRequest, offerId, taskRequest.getRequest().getId());
stateCache.getScheduledTasks().remove(taskRequest.getPendingTask());

return Optional.of(task);
Expand All @@ -271,6 +279,35 @@ private Optional<SingularityTask> match(Collection<SingularityTaskRequest> taskR
return Optional.absent();
}

private void addRequestToMapByOfferId(Map<String, Map<String, Integer>> tasksPerOfferPerRequest, String offerId, String requestId) {
if (tasksPerOfferPerRequest.containsKey(offerId)) {
if (tasksPerOfferPerRequest.get(offerId).containsKey(requestId)) {
int count = tasksPerOfferPerRequest.get(offerId).get(requestId);
tasksPerOfferPerRequest.get(offerId).put(requestId, count + 1);
} else {
tasksPerOfferPerRequest.get(offerId).put(requestId, 0);
}
} else {
tasksPerOfferPerRequest.put(offerId, new HashMap<String, Integer>());
tasksPerOfferPerRequest.get(offerId).put(requestId, 1);
}
}

private boolean tooManyTasksPerOfferForRequest(Map<String, Map<String, Integer>> tasksPerOfferPerRequest, String offerId, SingularityTaskRequest taskRequest) {
if (!tasksPerOfferPerRequest.containsKey(offerId)) {
return false;
}
if (!tasksPerOfferPerRequest.get(offerId).containsKey(taskRequest.getRequest().getId())) {
return false;
}

int maxPerOfferPerRequest = taskRequest.getRequest().getMaxTasksPerOffer().or(configuration.getMaxTasksPerOfferPerRequest());
if (!(maxPerOfferPerRequest > 0)) {
return false;
}
return tasksPerOfferPerRequest.get(offerId).get(taskRequest.getRequest().getId()) > maxPerOfferPerRequest;
}

@Override
public void offerRescinded(SchedulerDriver driver, Protos.OfferID offerId) {
LOG.info("Offer {} rescinded", offerId);
Expand Down
12 changes: 12 additions & 0 deletions SingularityUI/app/components/requestForm/RequestForm.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,17 @@ const RequestForm = (props) => {
/>
);

const maxTasksPerOffer = (
<TextFormGroup
id="max-per-offer"
onChange={event => updateField('maxTasksPerOffer', event.target.value)}
value={getValue('maxTasksPerOffer')}
label="Schedule at most this many tasks using a single offer form a single slave"
required={INDEXED_FIELDS.maxTasksPerOffer.required}
feedback={feedback('maxTasksPerOffer')}
/>
);

const taskLogErrorRegex = (
<TextFormGroup
id="task-log-error-regex"
Expand Down Expand Up @@ -661,6 +672,7 @@ const RequestForm = (props) => {
{ shouldRenderField('group') && group }
{ shouldRenderField('readOnlyGroups') && readOnlyGroups }
{ shouldRenderField('readWriteGroups') && readWriteGroups }
{ shouldRenderField('maxTasksPerOffer') && maxTasksPerOffer }
{ shouldRenderField('taskLogErrorRegex') && taskLogErrorRegex }
{ shouldRenderField('taskLogErrorRegexCaseSensitive') && taskLogErrorRegexCaseSensitive }
{ shouldRenderField('emailConfigurationOverrides') && emailConfigurationOverrides }
Expand Down
1 change: 1 addition & 0 deletions SingularityUI/app/components/requestForm/fields.es6
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export const FIELDS_BY_REQUEST_TYPE = {
}
},
{id: 'group', type: 'string'},
{id: 'maxTasksPerOffer', type: 'number'},
{id: 'taskLogErrorRegex', type: 'string'},
{id: 'taskLogErrorRegexCaseSensitive', type: 'bool'},
{
Expand Down

0 comments on commit 2ab38a7

Please sign in to comment.