Type Parameters:
T - the type of the items sent and consumed from this queue
All Superinterfaces:
AsyncCloseable, AsyncIterator<T>
public interface BoundedAsyncQueue<T> extends AsyncIterator<T>
A version of AsyncQueue that provides a mechanism for backpressure.
The documentation from AsyncQueue largely applies here. Backpressure refers to the signal sent to
senders that the queue is "full" and the sender should stop sending values for some period of
time. Typically a queue becomes "full" because values are being sent into the queue faster than
the consumer is capable of consuming them. Without backpressure, the senders could cause an out of
memory condition if they eventually sent too many messages into the queue. Users are expected to
respect backpressure by refraining from making a subsequent call to send(T) until the
CompletionStage returned by the previous call completes.
Currently you can produce a bounded queue with AsyncQueues.bounded() or AsyncQueues.buffered(int).
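To make the contract concrete, here is a minimal sketch of the signalling described above, written against the JDK only. ToyBoundedQueue and its capacity rule are hypothetical stand-ins for illustration, not the library's implementation: the toy always buffers the item and completes the returned future only once the queue is back within its capacity.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Hypothetical toy model of a bounded queue; not the library's implementation. */
final class ToyBoundedQueue<T> {
  private final int capacity;
  private final ArrayDeque<T> items = new ArrayDeque<>();
  // futures handed to senders whose items pushed the queue over capacity, in send order
  private final ArrayDeque<CompletableFuture<Boolean>> waiting = new ArrayDeque<>();

  ToyBoundedQueue(int capacity) {
    this.capacity = capacity;
  }

  /** Always buffers the item; the returned future completes once there is room again. */
  CompletableFuture<Boolean> send(T item) {
    CompletableFuture<Boolean> ready = new CompletableFuture<>();
    boolean hasRoom;
    synchronized (this) {
      items.add(item);
      hasRoom = items.size() <= capacity;
      if (!hasRoom) {
        waiting.add(ready);
      }
    }
    if (hasRoom) {
      ready.complete(true); // completed outside the lock so callbacks never run under it
    }
    return ready;
  }

  /** Removes one item if present and releases the longest-waiting sender, if any. */
  Optional<T> poll() {
    T item;
    CompletableFuture<Boolean> released;
    synchronized (this) {
      item = items.poll();
      released = (item == null) ? null : waiting.poll();
    }
    if (released != null) {
      released.complete(true);
    }
    return Optional.ofNullable(item);
  }
}
```

With a capacity of 2, the first two sends return completed futures, the third returns a pending one, and a single poll() releases it. A sender that waits for each future before sending again can therefore overshoot the capacity by at most one item.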
Consider this example implemented without backpressure:
AsyncIterator<Integer> produce() {
  final AsyncQueue<Integer> queue = AsyncQueues.unbounded();
  pool.submit(() -> {
    int i = 0;
    // nothing slows this loop down: every send is accepted immediately
    while (keepGoing) {
      queue.send(i++);
    }
    queue.terminate();
  });
  return queue;
}
produce().forEach(i -> {
  slowWriteToDisk(i);
});
Because generating work is a cheap in-memory operation but consuming it is a slow IO operation,
the sender will dramatically outpace the consumer in this case. Soon, the process will run out of
memory, as the sender continues to queue ints for the consumer to write. Instead we can use a
bounded queue:
AsyncIterator<Integer> produce() {
  final BoundedAsyncQueue<Integer> queue = AsyncQueues.bounded();
  // blocking sends on pool
  pool.submit(() -> {
    int i = 0;
    while (shouldContinue()) {
      // block until the queue is ready to accept another value
      queue.send(i++).toCompletableFuture().join();
    }
    queue.terminate();
  });
  return queue;
}
// consumer doesn't know or care queue is bounded
produce().forEach(i -> {
  slowWriteToDisk(i);
});
Senders of course can be implemented without blocking while still respecting backpressure:
AsyncIterator<Integer> produce() {
  final BoundedAsyncQueue<Integer> queue = AsyncQueues.bounded();
  // alternative approach to sending: async sender
  AsyncIterators
      .iterate(i -> i + 1)
      // send to queue, waiting for each send's stage before the next send
      .thenCompose(i -> queue.send(i))
      // consumes send results one by one
      .takeWhile(ig -> shouldContinue())
      .consume()
      // finished, terminate queue
      .thenRun(() -> queue.terminate());
  return queue;
}
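The same non-blocking discipline can also be sketched with plain CompletionStage chaining, independent of any iterator API. In this sketch, AsyncSenderSketch, sendRange and demo are hypothetical names, and the send function is a stand-in with the same shape as send(T). Each send is issued only after the stage returned by the previous send has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

/** Hypothetical sketch of a non-blocking sender that respects backpressure. */
final class AsyncSenderSketch {
  /**
   * Sends next, next + 1, ... up to limit (exclusive). The following send is issued only after
   * the stage from the previous send completes; sending stops early if a send is rejected.
   */
  static CompletionStage<Void> sendRange(
      Function<Integer, CompletionStage<Boolean>> send, int next, int limit) {
    if (next >= limit) {
      return CompletableFuture.completedFuture(null);
    }
    return send.apply(next).thenCompose(accepted -> {
      if (accepted) {
        return sendRange(send, next + 1, limit);
      }
      // rejected: the queue was terminated, so stop sending
      return CompletableFuture.<Void>completedFuture(null);
    });
  }

  /** Demo with a stand-in send function that records items and is always ready. */
  static List<Integer> demo() {
    List<Integer> received = new ArrayList<>();
    sendRange(i -> {
      received.add(i);
      return CompletableFuture.completedFuture(true);
    }, 0, 5).toCompletableFuture().join();
    return received;
  }
}
```

With a real queue, the send argument would be something like queue::send. Note that when stages complete synchronously, this recursion nests one stack frame per item, so a production sender over a long sequence would need an iterative or trampolined loop instead.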
An important point is that trying to send is the only way to be notified that the queue is full. In practice, this means that if your number of senders is very large you can still consume too much memory even if you are respecting the send interface.
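A rough illustration of that caveat, using a hypothetical toy queue (ManySendersSketch is not part of the library and assumes a queue that buffers every item it is sent): each sender issues exactly one send and would then wait, yet the buffered count still grows with the number of senders rather than with the capacity.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch; the toy queue below is not the library's implementation. */
final class ManySendersSketch {
  private final int capacity;
  private final ArrayDeque<Integer> items = new ArrayDeque<>();

  ManySendersSketch(int capacity) {
    this.capacity = capacity;
  }

  /** Buffers the item; the future is left incomplete when the queue is over capacity. */
  synchronized CompletableFuture<Boolean> send(int item) {
    items.add(item);
    if (items.size() <= capacity) {
      return CompletableFuture.completedFuture(true);
    }
    return new CompletableFuture<>(); // never completed here: no consumer runs in this sketch
  }

  synchronized int buffered() {
    return items.size();
  }

  /** Each well-behaved sender issues one send and then waits on the returned future. */
  static int bufferedAfterOneSendEach(int senders, int capacity) {
    ManySendersSketch queue = new ManySendersSketch(capacity);
    for (int s = 0; s < senders; s++) {
      queue.send(s); // one outstanding send per sender
    }
    return queue.buffered();
  }
}
```

Under these assumptions, bufferedAfterOneSendEach(1000, 1) reports 1000 buffered items even though every sender stopped after its first send. Bounding memory therefore also requires bounding the number of concurrent senders.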
See Also:
AsyncIterator, AsyncQueue, AsyncQueues
Nested classes/interfaces inherited from interface AsyncIterator:
AsyncIterator.End
Modifier and Type | Method | Description |
---|---|---|
Optional<T> | poll() | Gets a result from the queue if one is immediately available. |
CompletionStage<Boolean> | send(T item) | Sends a value into this queue that can be consumed via the AsyncIterator interface. |
CompletionStage<Void> | terminate() | Terminates the queue. |
batch, batch, close, collect, collect, concat, concat, consume, empty, error, exceptionally, filter, filterApply, filterCompose, find, fold, fold, forEach, fromIterator, fuse, generate, infiniteRange, nextStage, once, range, repeat, supply, take, takeWhile, thenApply, thenApplyAsync, thenApplyAsync, thenCompose, thenComposeAhead, thenComposeAsync, thenComposeAsync, thenFlatten, thenFlattenAhead, unfold, unordered, zipWith
CompletionStage<Boolean> send(T item)
Sends a value into this queue that can be consumed via the AsyncIterator interface.
This method is thread-safe: multiple threads can send values into this queue concurrently.
This queue is bounded, so after a call to send a CompletionStage is returned to the sender. When
the stage finishes, consumption has progressed enough that the queue is again willing to accept
messages. The implementation decides when a queue is writable: it could require that all
outstanding values are consumed by the consumer, it could allow a certain number of values to be
buffered before applying backpressure, or it could use some out-of-band metric to decide.
Parameters:
item - element to send into the queue
Returns:
a CompletionStage that completes when the queue is ready to accept another message. It completes
with true if the item was accepted, false if it was rejected because the queue has already been
terminated.
See Also:
AsyncQueue.send(T)
CompletionStage<Void> terminate()
Terminates the queue. All subsequent calls to send(T) into the queue will fail.
After the queue is terminated, all subsequent sends will return stages that will complete with
false. After the consumer consumes whatever was sent before the terminate, the consumer will
receive an AsyncIterator.End marker. When the CompletionStage returned by this method completes,
no more messages will ever make it into the queue. Equivalently, all stages generated by send(T)
that will complete with true will have been completed by the time the returned stage completes.
Returns:
a CompletionStage that indicates when all sends that were sent before the terminate have made it
into the queue
See Also:
AsyncQueue.terminate()
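The ordering described above can be illustrated with a small JDK-only model. TerminateSketch is a hypothetical toy, not the library's implementation: it does not model the bound, and its isExhausted() method stands in for the consumer receiving the AsyncIterator.End marker.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

/** Hypothetical toy model of terminate semantics; no capacity bound is modelled. */
final class TerminateSketch {
  private final ArrayDeque<String> items = new ArrayDeque<>();
  private boolean terminated;

  /** Completes with true if the item was accepted, false if the queue is terminated. */
  synchronized CompletableFuture<Boolean> send(String item) {
    if (terminated) {
      return CompletableFuture.completedFuture(false);
    }
    items.add(item);
    return CompletableFuture.completedFuture(true);
  }

  /** In this toy every accepted item is already buffered, so the result is complete at once. */
  synchronized CompletableFuture<Void> terminate() {
    terminated = true;
    return CompletableFuture.completedFuture(null);
  }

  synchronized Optional<String> poll() {
    return Optional.ofNullable(items.poll());
  }

  /** Stand-in for the end marker: terminated and fully drained. */
  synchronized boolean isExhausted() {
    return terminated && items.isEmpty();
  }
}
```

In this model, an item sent before terminate() is still delivered afterwards, a send issued after terminate() completes with false, and the end condition only becomes visible once the earlier items have been consumed.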
Optional<T> poll()
Gets a result from the queue if one is immediately available.
This method consumes parts of the queue, so like the consumption methods on AsyncIterator, this
method is not thread-safe and should be used in a single-threaded fashion. After terminate() is
called and all outstanding results are consumed, poll will always return empty. This method
should not be used if there are null values in the queue.
Notice that the queue being closed is indistinguishable from the queue being transiently empty.
To discover that no more results will ever be available, you must use the normal means on
AsyncIterator: either calling AsyncIterator.nextStage() and seeing if the result indicates an end
of iteration when the stage completes, or using one of the consumer methods that only complete
once the queue has been closed.
Throws:
NullPointerException - if the polled result is null
See Also:
AsyncQueue.poll()
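One common use of a poll-shaped method is to drain whatever is immediately available before falling back to the asynchronous path. The helper below is a hypothetical sketch (PollSketch and drainAvailable are not part of the library) that works against any supplier with the same shape as poll().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper; works with any poll-shaped supplier, not only this queue. */
final class PollSketch {
  /**
   * Collects every result that is immediately available, stopping at the first empty poll.
   * An empty poll only means "nothing right now"; it does not mean the source is closed.
   */
  static <T> List<T> drainAvailable(Supplier<Optional<T>> poll) {
    List<T> drained = new ArrayList<>();
    Optional<T> next;
    while ((next = poll.get()).isPresent()) {
      drained.add(next.get());
    }
    return drained;
  }
}
```

Since poll() returns Optional<T>, a call such as PollSketch.drainAvailable(queue::poll) should fit, made from the single consuming thread. An empty list from this helper says nothing about termination, so the end of the queue must still be detected through the AsyncIterator methods.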
Copyright © 2018. All rights reserved.