Library to prioritize streams of data. If you have several producers that yield the same kind of data, but you want to consume from some of them sooner or more often than from others, this library lets you do that.
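The simplest way of prioritizing producers is to assign shares. A producer is invoked in proportion to its shares relative to the total. In the following example the shares total 90, so producer A is invoked about 78% of the time, producer B about 17% and producer C about 6%.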
    Scheduler().schedule(
        producers = listOf(
            PriorityConfiguration(shares = 70.0) to producer("A"),
            PriorityConfiguration(shares = 15.0) to producer("B"),
            PriorityConfiguration(shares = 5.0) to producer("C")
        )
    )
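To make the share arithmetic concrete, here is a minimal sketch that is independent of the library. It converts shares into fractions and picks an index with probability proportional to its shares. The names shareFractions and pickByShares are illustrative only, and the library's own selection logic may work differently.

```kotlin
import kotlin.random.Random

// Illustrative helper, not part of the library:
// the fraction of invocations each producer would receive.
fun shareFractions(shares: List<Double>): List<Double> {
    val total = shares.sum()
    require(total > 0.0) { "total shares must be positive" }
    return shares.map { it / total }
}

// Illustrative weighted pick: returns index i with probability shares[i] / total.
fun pickByShares(shares: List<Double>, random: Random = Random.Default): Int {
    val total = shares.sum()
    require(total > 0.0) { "total shares must be positive" }
    var remaining = random.nextDouble() * total
    for ((index, share) in shares.withIndex()) {
        remaining -= share
        if (remaining < 0.0) return index
    }
    // Only reached through floating-point rounding at the upper edge.
    return shares.indexOfLast { it > 0.0 }
}
```

With shares of 70, 15 and 5, shareFractions yields roughly 0.78, 0.17 and 0.06.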
You can define penalties for producers that apply when they produce null. This is useful if you are reading from different queues and one of them is empty for a while. In the following example, the share distribution is initially 15% (15 shares), 15% (15 shares), 70% (70 shares). When producer C returns null, its shares drop by 40 to 30 and the new distribution is 25% (15 shares), 25% (15 shares), 50% (30 shares). Once producer C returns a non-null value, its shares are restored and the original distribution applies again.
    Scheduler().schedule(
        producers = listOf(
            PriorityConfiguration(shares = 15.0) to producer("A"),
            PriorityConfiguration(shares = 15.0) to producer("B"),
            PriorityConfiguration(shares = 70.0, possiblePenalty = 40.0) to producer("C"),
        )
    )
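The penalty arithmetic can be checked with a small standalone sketch. ShareState, effectiveShares and distribution are hypothetical names used for illustration and are not the library's API. Clamping at zero is an assumption, since the behaviour for penalties larger than the shares is not described here.

```kotlin
// Hypothetical model of one producer's configuration, mirroring the example values.
data class ShareState(val shares: Double, val possiblePenalty: Double = 0.0)

// Shares after applying the penalty to every producer whose index is in `penalized`.
// Assumption: shares never drop below zero.
fun effectiveShares(states: List<ShareState>, penalized: Set<Int>): List<Double> =
    states.mapIndexed { index, state ->
        if (index in penalized) maxOf(state.shares - state.possiblePenalty, 0.0)
        else state.shares
    }

// Each producer's fraction of the total shares (all zeros if the total is zero).
fun distribution(shares: List<Double>): List<Double> {
    val total = shares.sum()
    return shares.map { if (total > 0.0) it / total else 0.0 }
}
```

For the configuration (15, 15, 70 with a possible penalty of 40), penalizing the third producer gives effective shares of 15, 15 and 30, which is the 25% / 25% / 50% split.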
You can use fair to give every producer an equal number of shares:
    Schedulers.fair(
        producer("A"),
        producer("B"),
        producer("C"),
        producer("D")
    )
You can also consume from producers in round-robin order by selecting it as the work strategy:
    Scheduler().schedule(
        producers = ...,
        strategy = WorkStrategy.ROUND_ROBIN
    )
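For readers unfamiliar with the term, round-robin means visiting each producer in turn and wrapping around after the last one. The following is a generic sketch of that idea with a hypothetical RoundRobin class. It does not show how WorkStrategy.ROUND_ROBIN is implemented inside the library.

```kotlin
// Illustrative round-robin cursor over a fixed list of items.
class RoundRobin<T>(private val items: List<T>) {
    init {
        require(items.isNotEmpty()) { "need at least one item" }
    }

    private var cursor = 0

    // Returns the next item in order, wrapping around after the last one.
    fun next(): T {
        val item = items[cursor]
        cursor = (cursor + 1) % items.size
        return item
    }
}
```

Calling next() five times on RoundRobin(listOf("A", "B", "C")) visits A, B, C, A, B.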
The library's main abstraction for retrieving data is the Producer interface.
If your data source provides data in batches, you can use the flatten function so that each element of a batch is treated as a single entity when determining priority.
    val batchProducer: Producer<Set<MyObject>> = ... // e.g. read from queue
    Scheduler().schedule(producers = batchProducer.flatten(), ...)
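Conceptually, flattening buffers one batch and hands out its elements one at a time. The sketch below illustrates that idea with a stand-in interface, ItemSource, and a hypothetical flattenSketch function. The library's real Producer interface and flatten implementation may differ, and treating null as "nothing available right now" is an assumption carried over from the penalty description.

```kotlin
// Stand-in for the library's Producer; the real interface may look different.
fun interface ItemSource<T> {
    fun next(): T? // null means "nothing available right now" (assumption)
}

// Wraps a batch source so that each call returns one element of the current batch.
// A new batch is fetched only when the buffer has been drained.
fun <T> flattenSketch(batches: ItemSource<out Collection<T>>): ItemSource<T> {
    val buffer = ArrayDeque<T>()
    return ItemSource<T> {
        if (buffer.isEmpty()) {
            batches.next()?.let { buffer.addAll(it) }
        }
        buffer.removeFirstOrNull()
    }
}
```

With this shape, an empty or missing batch surfaces as a single null from the flattened source, so a scheduler could apply a penalty per element request rather than per batch.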