T
- The type of the elements in this queueAsyncCloseable
, AsyncIterator<T>
public interface AsyncQueue<T> extends AsyncIterator<T>
This class provides a queue abstraction that allows multiple senders to place values into the
queue synchronously, and a single consumer to consume values as they become available
asynchronously. You can construct an AsyncQueue
with the static methods on
AsyncQueues
.
This interface represents an unbounded queue, meaning there is no mechanism to notify
the sender that the queue is "full" (nor is there a notion of the queue being full to begin
with). The queue will continue to accept values as fast as the senders can send(T)
them,
regardless of the rate at which the values are being consumed. If senders produce a lot of values
much faster than the consumption rate, it will lead to an out of memory error, so users are
responsible for enforcing that the queue does not grow too large. If you would like a queue
abstraction that provides backpressure, see BoundedAsyncQueue
.
This queue can be terminated by someone calling terminate()
, which can be called by
consumers or senders. It is strongly recommended that all instances of this class eventually be
terminated. Most terminal operations on AsyncIterator
return
CompletionStages
whose stage will not complete until
the queue is terminated. After the queue is terminated, subsequent send(T)
s are rejected,
though consumers of the queue will still receive any values that were sent before the
termination.
Typically you'll want to use a queue when you have some "source" of items, and want to consume
them asynchronously as the become available. Some examples of sources could be a collection of
CompletionStages
, bytes off of a socket, results
produced by dedicated worker threads, etc. Suppose you had a scenario where you had many threads
doing some CPU intensive computation, and you'd send their answers off to some server one at a
time.
AsyncQueue<Integer> queue = AsyncQueues.unbounded();
for (i = 0; i < numThreads; i++) {
// spawn threads that send results to queue
threadpool.submit(() -> {
while (canStillCompute) {
int num = computeReallyExpensiveThing();
queue.send(num);
}
});
}
//consumer of queue, sending numbers to a server one at a time
queue
// lazily map numbers to send
.thenCompose(number -> sendToServer(number))
// consume all values
.consume()
// iteration stopped (meaning queue was terminated)
.thenAccept(ig -> sendToServer("no more numbers!");
threadpool.awaitTermination();
// terminate the queue, done computing
queue.terminate();
It is also convenient to use a queue to merge many AsyncIterator
s together. Consider the
destination server in the previous example, now with many compute servers sending the numbers
they were computing. If we used AsyncIterator.concat(java.util.Iterator<? extends com.ibm.asyncutil.iteration.AsyncIterator<T>>)
in the following example, we would
wait until we got all the work from the first iterator to move onto the next. With a queue we
instead process each number as soon as it becomes available.
AsyncIterator<Integer> getNumbersFrom(ServerLocation ip);
AsyncQueue queue = AsyncQueues.unbounded();
futures = ips.stream()
// get an AsyncIterator of numbers from each server
.map(this::getNumbersFrom)
// send each number on each iterator into the queue as they arrive
.forEach(asyncIterator -> asyncIterator.forEach(t -> queue.send(t)))
// bundle futures into a list
.collect(Collectors.toList());
// terminate the queue whenever we're done sending
Combinators.allOf(futures).thenAccept(ignore -> queue.terminate());
// prints each number returned by servers as they arrive
queue
.forEach(num -> System.out.println(num))
.thenAccept(ig -> System.out.println("finished getting all numbers")));
A reminder, all topics addressed in the documentation of AsyncIterator
apply to this
interface as well. Most importantly this means:
AsyncQueues
,
BoundedAsyncQueue
AsyncIterator.End
Modifier and Type | Method | Description |
---|---|---|
Optional<T> |
poll() |
Gets a result from the queue if one is immediately available.
|
boolean |
send(T item) |
Sends a value into this queue that can be consumed via the
AsyncIterator interface. |
void |
terminate() |
Terminates the queue, disabling
send(T) . |
batch, batch, close, collect, collect, concat, concat, consume, empty, error, exceptionally, filter, filterApply, filterCompose, find, fold, fold, forEach, fromIterator, fuse, generate, infiniteRange, nextStage, once, range, repeat, supply, take, takeWhile, thenApply, thenApplyAsync, thenApplyAsync, thenCompose, thenComposeAhead, thenComposeAsync, thenComposeAsync, thenFlatten, thenFlattenAhead, unfold, unordered, zipWith
boolean send(T item)
AsyncIterator
interface.
This method is thread safe - multiple threads can send values into this queue concurrently. This queue is unbounded, so it will continue to accept new items immediately and store them in memory until they can be consumed. If you are sending work faster than you can consume it, this can easily lead to an out of memory condition.
item
- the item to be sent into the queuevoid terminate()
send(T)
.
After the queue is terminated all subsequent sends will be rejected, returning false. After the consumer consumes whatever was sent before the terminate, the consumer will receive an end of iteration notification.
This method is thread-safe, and can be called multiple times. An attempt to terminate after termination has already occurred is a no-op.
Optional<T> poll()
This method consumes parts of the queue, so like the consumption methods on
AsyncIterator
, this method is not thread-safe and should be used in a single threaded
fashion. After terminate()
is called and all outstanding results are consumed, poll
will always return empty. This method should not be used if there are null values in
the queue.
Notice that the queue being closed is indistinguishable from the queue being transiently empty.
To discover that no more results will ever be available, you must use the normal means on
AsyncIterator
: either calling AsyncIterator.nextStage()
and seeing if the result indicates
an end of iteration when the future completes, or using one of the consumer methods that only
complete once the queue has been closed.
NullPointerException
- if the polled result is nullCopyright © 2018. All rights reserved.