Class ChangesFollower
There are two modes of operation:
startOneOff()
to fetch the changes from the supplied since sequence until there are no further pending changes.start()
to fetch the changes from the supplied since sequence and then continuing to listen indefinitely for further new changes.
The starting sequence ID can be changed for either mode by using
PostChangesOptions.Builder.since(String)
. By default when using:
startOneOff()
the feed will start from the beginning.start()
the feed will start from "now".
In either mode the Stream
of changes can be terminated early by calling
stop()
.
By default ChangesFollower
will suppress transient errors indefinitely and
endeavour to run to completion or listen forever. For applications where that
behaviour is not desirable an alternate constructor is available where a
Duration
may be specified to limit the time since the last successful
response that transient errors will be suppressed.
It should be noted that errors considered terminal, for example, the database not existing or invalid credentials are never suppressed and will throw an exception immediately.
The PostChangesOptions
model of changes feed options is used to configure
the behaviour of the ChangesFollower
. However, a subset of the options are
invalid as they are configured internally by the implementation and will cause
an IllegalArgumentException
to be thrown if supplied. These invalid
options are:
PostChangesOptions.Builder.descending(Boolean)
PostChangesOptions.Builder.feed(String)
PostChangesOptions.Builder.heartbeat(long)
PostChangesOptions.Builder.lastEventId(String)
(usePostChangesOptions.Builder.since(String)
instead)PostChangesOptions.Builder.timeout(long)
Only the value of _selector
is permitted for the PostChangesOptions.Builder.filter(String)
option.
Selector based filters perform better than JS based filters and using one of the alternative JS based
filter types will cause ChangesFollower
to throw an IllegalArgumentException
.
It should also be noted that the PostChangesOptions.Builder.limit(long)
parameter will truncate the stream at the given number of changes in either
operating mode.
The ChangesFollower
requires the Cloudant
client to have
HTTP call and read timeouts of at least 1 minute. The default client
configuration has sufficiently long timeouts.
-
Constructor Summary
ConstructorDescriptionChangesFollower
(Cloudant client, PostChangesOptions options) Create a newChangesFollower
using the supplied client and options and configured to continually suppress transient errors and retry indefinitely.ChangesFollower
(Cloudant client, PostChangesOptions options, Duration errorTolerance) Create a newChangesFollower
using the supplied client and options that will suppress transient errors and retry for as long as the given duration. -
Method Summary
Modifier and TypeMethodDescriptionstart()
Return all available changes and keep listening for new changes until reaching an end condition.Return all available changes until there are no further changes pending or reaching an end condition.void
stop()
Stop thisChangesFollower
.
-
Constructor Details
-
ChangesFollower
Create a newChangesFollower
using the supplied client and options and configured to continually suppress transient errors and retry indefinitely.- Parameters:
client
- - Cloudant client instance to use to make requestsoptions
- - Changes feed options- Throws:
IllegalArgumentException
- if there are invalid options
-
ChangesFollower
public ChangesFollower(Cloudant client, PostChangesOptions options, Duration errorTolerance) throws IllegalArgumentException Create a newChangesFollower
using the supplied client and options that will suppress transient errors and retry for as long as the given duration.- Parameters:
client
- - Cloudant client instance to use to make requestsoptions
- - Changes feed optionserrorTolerance
- - the duration to suppress errors, measured from the previous successful request. UseDuration.ZERO
to disable error suppression and terminate thisChangesFollower
on any failed request.- Throws:
IllegalArgumentException
- if there are invalid options
-
-
Method Details
-
start
public Stream<ChangesResultItem> start() throws IllegalStateException, com.ibm.cloud.sdk.core.service.exception.ServiceResponseExceptionReturn all available changes and keep listening for new changes until reaching an end condition.The end conditions are:
- a terminal error (e.g. unauthorized client).
- transient errors occur for longer than the error suppression duration.
- the number of changes received reaches the limit specified in the
PostChangesOptions
used to instantiate thisChangesFollower
. stop()
is called.
The same change may be received more than once.
- Returns:
Stream
of at least oneChangesResultItem
per change- Throws:
IllegalStateException
- ifstart()
orstartOneOff()
was already called.com.ibm.cloud.sdk.core.service.exception.ServiceResponseException
- if a terminal error or unsupressed transient error is recevied from the service when fetching changes
-
startOneOff
public Stream<ChangesResultItem> startOneOff() throws IllegalStateException, com.ibm.cloud.sdk.core.service.exception.ServiceResponseExceptionReturn all available changes until there are no further changes pending or reaching an end condition.The end conditions are:
- a terminal error (e.g. unauthorized client).
- transient errors occur for longer than the error suppression duration.
- the number of changes received reaches the limit specified in the
PostChangesOptions
used to instantiate thisChangesFollower
. stop()
is called.
The same change may be received more than once.
- Returns:
Stream
of at least oneChangesResultItem
per change- Throws:
IllegalStateException
- ifstart()
orstartOneOff()
was already called.com.ibm.cloud.sdk.core.service.exception.ServiceResponseException
- if a terminal error or unsupressed transient error is recevied from the service when fetching changes
-
stop
Stop thisChangesFollower
.Note that
Stream
terminal operations block so this stop method needs to be called from a different thread than the terminal operation to have any effect.- Throws:
IllegalStateException
- ifstart()
orstartOneOff()
was not called first.
-