public class MessageRepeater<T> extends java.lang.Object implements Reschedulable<T>
ReschedulingSink
to deliver the same message to sink's
consumers multiple times at predefined intervals for a finite number of
times.
A repeater is configured with a consumer and a sequence of n > 0
Duration
s d[0], .., d[n-1]
. When the first message m
comes in at time t0
, it is delivered to the consumer. If the consumer
returns Stop
no further action is taken; if it
returns Repeat
instead, then m
will be
delivered again to the consumer at time t1 = t0 + d[0]
. When given
the message again at time t1
, the consumer can either Stop
or Repeat
, in which case
m
will be delivered again to the consumer at time t2 = t1 +
d[1]
. And so on for at most n
deliveries, past which point m
is given to a configured exceeded re-delivery handler.
A typical use case for a repeater is that of retrying failed actions.
The consumer would carry out some task using the message as input. If a
transient error occurs, the consumer would ask to repeat
the delivery later; if the error is permanent the consumer would just
stop
instead. The consumer can retry up to n
times; past that, the message is fed into the exceeded re-delivery handler
which would be some kind of permanent failure handler in this scenario.
Constructor and Description |
---|
MessageRepeater(RepeatConsumer<T> consumer,
java.util.stream.Stream<java.time.Duration> repeatIntervals,
java.util.function.Consumer<T> exceededRedeliveryHandler)
Creates a new instance.
|
Modifier and Type | Method and Description |
---|---|
java.util.Optional<Schedule<T>> |
consume(CountedSchedule current,
T data)
Consumes the output of the channel this instance is bound to.
|
public MessageRepeater(RepeatConsumer<T> consumer, java.util.stream.Stream<java.time.Duration> repeatIntervals, java.util.function.Consumer<T> exceededRedeliveryHandler)
consumer
- consumes the message output from the channel and returns
an indication of whether the same message should be delivered again.repeatIntervals
- intervals at which to re-deliver the message.exceededRedeliveryHandler
- is given the message after n
re-deliveries, where n
is the number of repeat intervals.java.lang.NullPointerException
- if any argument is null
.java.lang.IllegalArgumentException
- if the retry intervals stream is empty
or contains null
.public java.util.Optional<Schedule<T>> consume(CountedSchedule current, T data)
Reschedulable
consume
in interface Reschedulable<T>
current
- information about this invocation's schedule.data
- the data received from the channel.null
.