- java.lang.Object
-
- java.util.concurrent.SubmissionPublisher<T>
-
- 参数类型
-
T
- 已发布的项目类型
- All Implemented Interfaces:
-
AutoCloseable
,Flow.Publisher<T>
public class SubmissionPublisher<T> extends Object implements Flow.Publisher<T>, AutoCloseable
AFlow.Publisher
异步地将提交的(非空)项目发送给当前订户,直到关闭。 每个当前用户以相同的顺序接收新提交的项目,除非遇到丢失或异常。 使用提交发布器允许项目生成器作为兼容性reactive-streams发布者依赖于丢弃处理和/或阻止流控制。提交发布者使用其构造函数中提供的
Executor
来传递给订阅者。 执行人员的最佳选择取决于预期用途。 如果提交的项目的生成器以不同的线程运行,并且可以估计订户的数量,请考虑使用Executors.newFixedThreadPool(int)
。 否则考虑使用默认值,通常是ForkJoinPool.commonPool()
。缓冲允许生产者和消费者以不同的速率暂时运作。 每个用户使用独立的缓冲区。 缓冲区是在首次使用时创建的,并根据需要扩展到给定的最大值。 (强制执行的容量可以四舍五入为最接近的权力和/或由此实施支持的最大值限制。)
request
的调用不直接导致缓冲区扩展,但如果未填充的请求超过最大容量,那么风险将会饱和。 默认值Flow.defaultBufferSize()
可能为基于预期利率,资源和用途选择容量提供了有用的起点。出版方法支持关于缓冲区饱和时要做什么的不同策略。 方法
submit
块,直到资源可用。 这是最简单的,但响应不大。offer
方法可能会丢弃项目(立即或有限超时),但提供插入处理程序然后重试的机会。如果任何订阅者方法抛出异常,其订阅将被取消。 如果处理程序是作为一个构造函数的参数,它会被取消之前在方法中的异常调用
onNext
,但在方法例外onSubscribe
,onError
和onComplete
不入账或取消之前办理。 如果提供的Executor在尝试执行任务时抛出RejectedExecutionException
(或任何其他RuntimeException或Error),或者在处理丢弃的项目时,丢弃处理程序会引发异常,则会重新引发异常。 在这些情况下,并非所有订阅者都将发布已发布的项目。 在这些情况下通常是closeExceptionally
的良好做法。方法
consume(Consumer)
简化了对常见情况的支持,其中订户的唯一动作是使用所提供的功能请求和处理所有项目。此类也可以作为生成项目的子类的便利基础,并使用此类中的方法发布它们。 例如,这里是定期发布从供应商生成的项目的类。 (实际上,您可以添加独立启动和停止生成的方法,在发布者之间共享Executors等等,或者使用SubmissionPublisher作为组件而不是超类)。
class PeriodicPublisher<T> extends SubmissionPublisher<T> { final ScheduledFuture<?> periodicTask; final ScheduledExecutorService scheduler; PeriodicPublisher(Executor executor, int maxBufferCapacity, Supplier<? extends T> supplier, long period, TimeUnit unit) { super(executor, maxBufferCapacity); scheduler = new ScheduledThreadPoolExecutor(1); periodicTask = scheduler.scheduleAtFixedRate( () -> submit(supplier.get()), 0, period, unit); } public void close() { periodicTask.cancel(false); scheduler.shutdown(); super.close(); } }
这是一个
Flow.Processor
实现的例子。 为了简化说明,它向其发布者使用单步请求。 更自适应的版本可以使用从submit
返回的滞后估计以及其他实用方法来监视流量。class TransformProcessor<S,T> extends SubmissionPublisher<T> implements Flow.Processor<S,T> { final Function<? super S, ? extends T> function; Flow.Subscription subscription; TransformProcessor(Executor executor, int maxBufferCapacity, Function<? super S, ? extends T> function) { super(executor, maxBufferCapacity); this.function = function; } public void onSubscribe(Flow.Subscription subscription) { (this.subscription = subscription).request(1); } public void onNext(S item) { subscription.request(1); submit(function.apply(item)); } public void onError(Throwable ex) { closeExceptionally(ex); } public void onComplete() { close(); } }
- 从以下版本开始:
- 9
-
-
构造方法摘要
构造方法 Constructor 描述 SubmissionPublisher()
使用ForkJoinPool.commonPool()
创建一个新的SubmissionPublisher,用于向用户进行异步传递(除非它不支持至少两个并行级别,在这种情况下,创建一个新的线程来运行每个任务),最大缓冲区容量为Flow.defaultBufferSize()
,否则方法onNext
中订阅者异常的处理程序。SubmissionPublisher(Executor executor, int maxBufferCapacity)
使用给定的Executor创建一个新的SubmissionPublisher,为订阅者提供异步传递,每个订户的给定最大缓冲区大小,方法onNext
中没有用户订户异常的处理程序。SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
使用给定的Executor创建一个新的提交发布者,以便为订阅者提供异步传递,给定每个用户的最大缓冲区大小,如果非空,则在任何订阅者在方法onNext
中引发异常时调用给定的处理程序。
-
方法摘要
所有方法 接口方法 具体的方法 Modifier and Type 方法 描述 void
close()
除非已经关闭,问题onComplete
向当前用户发出信号,并且不允许随后的尝试发布。void
closeExceptionally(Throwable error)
除非已经关闭,否则问题onError
以给定错误向当前用户发出信号,并且不允许随后的尝试发布。CompletableFuture<Void>
consume(Consumer<? super T> consumer)
使用给定的Consumer功能处理所有已发布的项目。int
estimateMaximumLag()
返回所有当前订阅者中生成但尚未消费的最大项目数量的估计值。long
estimateMinimumDemand()
在所有当前订阅者中,返回所要求的最小数量(通过request
)但尚未生成的估计。Throwable
getClosedException()
返回与closeExceptionally
相关联的异常,如果未关闭或正常关闭,则为null。Executor
getExecutor()
返回用于异步传递的执行程序。int
getMaxBufferCapacity()
返回最大每用户缓冲区容量。int
getNumberOfSubscribers()
返回当前订阅者的数量。List<Flow.Subscriber<? super T>>
getSubscribers()
返回用于监视和跟踪目的的当前用户列表,而不是在订阅者上调用Flow.Subscriber
方法。boolean
hasSubscribers()
如果此发布者有任何订阅者,则返回true。boolean
isClosed()
如果此发布商不接受提交,则返回true。boolean
isSubscribed(Flow.Subscriber<? super T> subscriber)
如果给定订阅者当前订阅,则返回true。int
offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
如果可能,通过异步调用其onNext
方法来发布给定的项目,阻止当任何订阅的资源不可用时,直到指定的超时或直到调用者线程被中断,此时给定的处理程序-null)被调用,如果返回true,则重试一次。int
offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
如果可能,通过异步调用其onNext
方法将给定项目发布给每个当前用户。int
submit(T item)
通过异步调用其onNext
方法将给定项目发布给每个当前用户,阻止不中断,而任何用户的资源不可用。void
subscribe(Flow.Subscriber<? super T> subscriber)
添加给定订阅者,除非已经订阅。
-
-
-
构造方法详细信息
-
SubmissionPublisher
public SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)
使用给定的Executor创建一个新的SubmissionPublisher,为订户提供异步传递,给定每个订户的最大缓冲区大小,如果非空,则在任何Subscriber在方法onNext
中引发异常时调用给定的处理程序。- 参数
-
executor
- 执行器用于异步传递,支持创建至少一个独立线程 -
maxBufferCapacity
- 每个用户缓冲区的最大容量(强制容量可以舍入到最接近的两个幂并且/或由该实现支持的最大值限定);方法getMaxBufferCapacity()
返回实际值) -
handler
- 如果非空,则在方法onNext
抛出异常时调用的过程 - 异常
-
NullPointerException
- 如果executor为null -
IllegalArgumentException
- 如果maxBufferCapacityIllegalArgumentException
-
SubmissionPublisher
public SubmissionPublisher(Executor executor, int maxBufferCapacity)
使用给定的Executor创建一个新的SubmissionPublisher,以便为订户提供异步传递,给定每个订户的最大缓冲区大小,并且在方法onNext
中没有用于订阅者异常的处理程序。- 参数
-
executor
- 执行器用于异步传递,支持创建至少一个独立线程 -
maxBufferCapacity
- 每个用户缓冲区的最大容量(强制容量可以舍入到最接近的两个幂并且/或由该实现支持的最大值限定);方法getMaxBufferCapacity()
返回实际值) - 异常
-
NullPointerException
- 如果executor为null -
IllegalArgumentException
- 如果maxBufferCapacity不为正
-
SubmissionPublisher
public SubmissionPublisher()
创建一个新的SubmissionPublisher,使用ForkJoinPool.commonPool()
进行异步传递给用户(除非它不支持至少两个并行性级别,在这种情况下,创建一个新的线程来运行每个任务),最大缓冲容量为Flow.defaultBufferSize()
,否则方法onNext
中订阅者异常的处理程序。
-
-
方法详细信息
-
subscribe
public void subscribe(Flow.Subscriber<? super T> subscriber)
添加给定订阅者,除非已经订阅。 如果已经订阅,订阅者的onError
方法将在现有订阅中使用IllegalStateException
进行调用 。 否则,成功后,订阅者的onSubscribe
方法将与新的Flow.Subscription
异步调用。 如果onSubscribe
引发异常,订阅将被取消。 否则,如果此提交发布者异常关闭,则用户的onError
方法将被调用与相应的异常,或者如果关闭无一例外,则调用订户的onComplete
方法。 订阅者可以通过调用新订阅的request
方法来启用接收项,并可通过调用其cancel
方法取消订阅。- Specified by:
-
subscribe
在接口Flow.Publisher<T>
- 参数
-
subscriber
- 用户 - 异常
-
NullPointerException
- 如果用户为空
-
submit
public int submit(T item)
通过异步调用其onNext
方法将给定项目发布给每个当前用户,阻止不间断,而任何用户的资源不可用。 此方法返回所有当前订阅者中最大滞后(已提交但尚未消费的项目数)的估计。 如果有任何用户,此值至少为一(占该提交的项目),否则为零。如果此发布者的Executor在尝试异步通知订阅者时引发了RejectedExecutionException(或任何其他RuntimeException或Error),则会重新抛出此异常,在这种情况下,并非所有订阅者都将被发出此项。
- 参数
-
item
- 要发布的(非空)项 - 结果
- 用户估计最大滞后
- 异常
-
IllegalStateException
- 如果关闭 -
NullPointerException
- 如果item为null -
RejectedExecutionException
- 如果由Executor抛出
-
offer
public int offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
如果可能,通过异步调用其onNext
方法将给定项目发布给每个当前用户。 如果超出资源限制,该项目可能被一个或多个订户丢弃,在这种情况下调用给定的处理程序(如果非空),如果返回true,则重试一次。 在处理程序被调用时,阻止其他线程在此类中对方法的其他调用。 除非确保恢复,否则选项通常限于记录错误和/或向用户发出onError
信号。此方法返回一个状态指示器:如果为负,则表示(负)数量的丢弃(尝试将订单发送给订户失败)。 否则,是所有当前订阅者中最大滞后(已提交但尚未消费的项目数)的估计。 如果有任何用户,此值至少为一(占该提交的项目),否则为零。
如果此发布者的Executor在尝试异步通知订阅者时抛出了RejectedExecutionException(或任何其他RuntimeException或Error),或者在处理丢弃的项目时,drop handler抛出异常,则会重新抛出此异常。
- 参数
-
item
- 要发布的(非空)项 -
onDrop
- 如果非空,则在删除订户时调用该处理程序,其参数为订阅者和项目; 如果返回true,则重新尝试(一次) - 结果
- 如果为负,则(负)滴数; 否则估计最大滞后
- 异常
-
IllegalStateException
- 如果关闭 -
NullPointerException
- 如果项目为空 -
RejectedExecutionException
- 如果由Executor抛出
-
offer
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)
如果可能,通过异步调用其onNext
方法将给定项目发布给每个当前订户,阻止任何订阅的资源不可用,直到指定的超时或直到调用者线程中断,此时给定的处理程序(如果非-null)被调用,如果返回true,则重试一次。 (丢弃处理程序可以通过检查当前线程是否中断来区分超时与中断)。在调用处理程序时,阻止其他线程在此类中对方法的其他调用。 除非确定恢复,否则选项通常限于记录错误和/或向用户发出onError
信号。此方法返回一个状态指示器:如果为负,则表示(负)数量的丢弃(尝试将订单发送给订户失败)。 否则,是所有当前订阅者中最大滞后(已提交但尚未消费的项目数)的估计。 如果有任何用户,此值至少为一(占该提交的项目),否则为零。
如果此发布者的Executor在尝试异步通知订阅者时抛出了RejectedExecutionException(或任何其他RuntimeException或Error),或者在处理丢弃的项目时,drop handler抛出异常,则会重新抛出此异常。
- 参数
-
item
- 要发布的(非空)项 -
timeout
-等待多久资源对于任何用户放弃,在单位前unit
-
unit
- aTimeUnit
确定如何解释timeout
参数 -
onDrop
- 如果非空,则在删除订户时调用该处理程序,其参数为订阅者和项目; 如果返回true,则重新尝试(一次) - 结果
- 如果为负,则(负)滴数; 否则估计最大滞后
- 异常
-
IllegalStateException
- 如果关闭 -
NullPointerException
- 如果item为null -
RejectedExecutionException
- 如果由Executor抛出
-
close
public void close()
除非已经关闭,否则问题onComplete
向当前用户发出信号,并且不允许随后的尝试发布。 返回时,这种方法并不能保证所有的用户都尚未完成。- Specified by:
-
close
在接口AutoCloseable
-
closeExceptionally
public void closeExceptionally(Throwable error)
- 参数
-
error
- 发送给订阅者的onError
参数 - 异常
-
NullPointerException
- 如果错误为null
-
isClosed
public boolean isClosed()
如果此发布商不接受提交,则返回true。- 结果
- 如果关闭则为true
-
getClosedException
public Throwable getClosedException()
返回与closeExceptionally
相关联的异常,如果未关闭或正常关闭,则为null。- 结果
- 异常,如果没有则为null
-
hasSubscribers
public boolean hasSubscribers()
如果此发布者有任何订阅者,则返回true。- 结果
- 如果此发布商有任何订阅者,则为true
-
getNumberOfSubscribers
public int getNumberOfSubscribers()
返回当前订阅者的数量。- 结果
- 当前订阅者的数量
-
getExecutor
public Executor getExecutor()
返回用于异步传递的执行程序。- 结果
- Executor用于异步传送
-
getMaxBufferCapacity
public int getMaxBufferCapacity()
返回最大每用户缓冲区容量。- 结果
- 最大每用户缓冲区容量
-
getSubscribers
public List<Flow.Subscriber<? super T>> getSubscribers()
返回用于监视和跟踪目的的当前用户列表,而不是在订阅者上调用Flow.Subscriber
方法。- 结果
- 当前订阅者列表
-
isSubscribed
public boolean isSubscribed(Flow.Subscriber<? super T> subscriber)
如果给定订阅者当前订阅,则返回true。- 参数
-
subscriber
- 用户 - 结果
- 如果当前订阅,则为true
- 异常
-
NullPointerException
- 如果用户为空
-
estimateMinimumDemand
public long estimateMinimumDemand()
在所有当前订阅者中,返回所要求的最小数量(通过request
)但尚未生成的估计。- 结果
- 估计,如果没有订阅者,则为零
-
estimateMaximumLag
public int estimateMaximumLag()
返回所有当前订阅者中生成但尚未消费的最大项目数量的估计值。- 结果
- 估计
-
consume
public CompletableFuture<Void> consume(Consumer<? super T> consumer)
使用给定的Consumer功能处理所有已发布的项目。 返回一个CompletableFuture,当发布商发信号为onComplete
,或者在任何错误时异常地完成,或者消费者抛出异常,或者退回的CompletableFuture被取消,在这种情况下,没有进一步的处理。- 参数
-
consumer
- 应用于每个onNext项目的函数 - 结果
- 一个CompletedFuture,当发布商发出信号onComplete时正常完成,特别是在任何错误或取消时
- 异常
-
NullPointerException
- 如果消费者为空
-
-