Reactive Streams:一种支持背压的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。
Reactive Streams主要解决背压(back-pressure)问题。当传入的任务速率大于系统处理能力时,数据处理将会对未处理数据产生一个缓冲区。
背压依我的理解来说,是指订阅者能和发布者交互(通过代码里面的调用request和cancel方法交互),可以调节发布者发布数据的速率,解决把订阅者压垮的问题。关键在于上面例子里面的订阅关系Subscription这个接口,他有request和cancel 2个方法,用于通知发布者需要数据和通知发布者不再接受数据。
我们重点理解背压在jdk9里面是如何实现的。关键在于发布者Publisher的实现类SubmissionPublisher的submit方法是block方法。订阅者会有一个缓冲池,默认为Flow.defaultBufferSize() = 256。当订阅者的缓冲池满了之后,发布者调用submit方法发布数据就会被阻塞,发布者就会停(慢)下来;订阅者消费了数据之后(调用Subscription.request方法),缓冲池有位置了,submit方法就会继续执行下去,就是通过这样的机制,实现了调节发布者发布数据的速率,消费得快,生成就快,消费得慢,发布者就会被阻塞,当然就会慢下来了。
单线程版本:
一个生产者,一个消费者
import lombok.SneakyThrows; import java.util.ArrayList; import java.util.List; import java.util.Random; public class BackpressureExample { public static void main(String[] args) throws InterruptedException { BackpressureSubscriber subscriber = new BackpressureSubscriber(); BackpressurePublisher publisher = new BackpressurePublisher(subscriber); publisher.start(); subscriber.start(); // 为了演示效果,这里让主线程休眠一段时间 Thread.sleep(50000); publisher.stop(); subscriber.stop(); } @SneakyThrows public static void processDataLogic(List<Integer> batch) { //模拟任务执行 int r = new Random().nextInt(3000); Thread.sleep(r); System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r); } static class BackpressurePublisher { private final BackpressureSubscriber subscriber; private volatile boolean running; public BackpressurePublisher(BackpressureSubscriber subscriber) { this.subscriber = subscriber; this.running = true; } public void start() { Thread thread = new Thread(() -> { int item = 1; while (running) { List<Integer> batch = new ArrayList<>(); for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + "-----produce data = " + item); batch.add(item++); } while (!subscriber.accept(batch)) { if (!running) { break; } } } }); thread.start(); } public void stop() { running = false; } } static class BackpressureSubscriber { private volatile boolean running; public BackpressureSubscriber() { this.running = true; } public boolean accept(List<Integer> batch) { if (running) { processDataLogic(batch); return true; } else { return false; } } public void start() { // Subscriber 在 JDK 8 中没有异步处理的能力,因此不需要单独开启线程 } public void stop() { running = false; } } }
多线程版本
一个生产者,多个消费者
import lombok.SneakyThrows; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class BackpressureExample { public static void main(String[] args) throws InterruptedException { BackpressureSubscriber subscriber = new BackpressureSubscriber(); BackpressurePublisher publisher = new BackpressurePublisher(subscriber); publisher.start(); subscriber.start(); // 为了演示效果,这里让主线程休眠一段时间 Thread.sleep(50000); publisher.stop(); subscriber.stop(); } @SneakyThrows public static void processDataLogic(List<Integer> batch) { //模拟任务执行 int r = new Random().nextInt(3000); Thread.sleep(r); System.out.println(Thread.currentThread().getName() + ",Received batch: " + batch + ",sleep ms = " + r); } static class BackpressurePublisher { private final BackpressureSubscriber subscriber; private volatile boolean running; public BackpressurePublisher(BackpressureSubscriber subscriber) { this.subscriber = subscriber; this.running = true; } public void start() { Thread thread = new Thread(() -> { int item = 1; while (running) { List<Integer> batch = new ArrayList<>(); for (int i = 0; i < 5; i++) { System.out.println(Thread.currentThread().getName() + "-----produce data = " + item); batch.add(item++); } while (!subscriber.accept(batch)) { if (!running) { break; } } } }); thread.start(); } public void stop() { running = false; } } static class BackpressureSubscriber { private volatile boolean running; private final ExecutorService executor; private final int workerSize = 2; private final List<Future> futures; public BackpressureSubscriber() { this.running = true; this.executor = Executors.newFixedThreadPool(workerSize); futures = new ArrayList<>(workerSize); } public boolean accept(List<Integer> batch) { if (running) { Future f = executor.submit(() -> processDataLogic(batch)); futures.add(f); waitForTaskDone(futures); return true; } else { return false; } } public void waitForTaskDone(List<Future> futures) { while (futures.size() >= workerSize) { for (Future future : futures) { if (future.isDone()) { // 只要有一个worker是空闲就重新获取任务 futures.remove(future); return; } } } } public void start() { // Subscriber 在 JDK 8 中没有异步处理的能力,因此不需要单独开启线程 } public void stop() { running = false; executor.shutdown(); } } }
到此这篇关于用JAVA自己实现一套背压机制的文章就介绍到这了,更多相关java背压机制内容请搜索好代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持好代码网!