面试知识点总结
1. 一篇文章说清 netty 的线程模型 2. 一篇说尽 java 线程池 3. 一篇就够了系列 - LinkedHashMap 4. 使用LinkedHashMap构建LRU的Cache 5. 并发编程之-Excutor框架 6. Java工具类提供的排序功能 7. Java的众多log库都是什么关系? 8. Java常用日志框架历史 9. 网页特殊符号(HTML字符实体)大全 10. JavaFX项目打包为独立的macOS应用程序和dmg文件 11. Java 11 支持的 基于 HTTP/2 的响应式请求 12. Java 11 中 HttpClient 的使用(HTTP/2协议) 13. Java 11 模块化入门教程 14. 五五面试网-带你 理解 java 模块系统 (一) 15. 五五面试网-带你 理解 java 模块系统 (二) 16. 五五面试网-带你 理解 java 模块系统 (三) 手动新建一个java模块 17. Java中的TreeMap 18. gradle:现代高效的java构建工具 19. Spring + MyBatis 框架下处理数据库异常 20. 通过开源项目,免费获取Idea的开源授权 21. IntelliJ IDEA 2020最新激活码(亲测有效,可激活至 2089 年)

Java 11 支持的 基于 HTTP/2 的响应式请求

使用Java 11的新HTTP API,您可以做的不仅仅是HTTP/2和异步请求—您还可以以反应式方式处理请求和响应体,这使您可以完全控制通过网络传输的字节:您可以进行节流,可以流式传输(以节省内存),你可以在发现结果后立即公开(而不是等待整个Body的到来)。

在这篇文章中,我们将研究流请求和响应体,因为这需要对反应流(在Java9中作为流API引入)有一个实际的理解,所以我们也将快速讨论它们——如果您已经知道它们是如何工作的,请跳到流式处理请求正文。以后的文章可能会研究反应式web套接字API。

HTTP/2 API使用   响应式流( reactive streams ) 来处理请求和响应主体。完全可以使用响应式流( reactive streams )来构建类似于Java8流的管道:从源代码开始,定义一组操作来处理源代码包含/发出的每个项。

不过,有一些重要的区别,最显著的区别是项目在管道中的移动方式。对于Java8流,源代码包含这些项,终端操作通过管道将它们拉入(想象一下您想要处理的tweet集合)。在响应式流 中,源代码生成项目,并希望通过管道推送它们(想想一个实时发出tweets的Twitter API,您希望处理它们)——“愿意”,因为订阅者可以向后推以确保它们不会被压垮。

虽然可以使用反应流来构建强大的管道,但是HTTP/2api使用它们的方式要简单得多。这很好,因为JDK只包含连接更大管道的两个步骤所需的构建块——像RxJava或projectreactor这样的库提供了构建在它们之上的高级功能。以下是涉及的三种类型:

  • Publisher 生成要使用的项目并可以订阅。(例如,HTTP响应可以在字节到达时发布字节。)
  • Subscriber 订阅发布服务器,并提供onNext方法以供新项目使用,onError用于发布服务器遇到的错误,onComplete方法用于发布服务器完成后调用。(例如,JSON解析器可以订阅响应。)
  • Subscription 是发布服务器和订阅服务器之间的连接,可用于请求项目或取消订阅

程序流程如下:

  • 创建并订阅:
    • you need a Publisher pub and a Subscriber sub
    • call pub.subscribe(sub)
    • pub creates Subscription script and calls sub.onSubscription(script)
    • sub stores script
  • 流:
    • sub calls script.request(n) (where n is a positive int)
    • pub calls sub.onNext(item) to push items (max n times)
    • this continues for as long as publisher and subscriber want and there is no error
  • 取消:
    • pub may call sub.OnError(err) or sub.onComplete()
    • sub may call script.cancel()

 流式请求:

如果POST/PUT/…请求有一个很大的主体,则可能不希望将其全部加载到内存中。有了Java新的反应式HTTP API,您就不必这么做了!

例如,在创建POST请求时,需要提供主体,但不必以字符串或byte[]的形式提供。从形式上讲,你必须交出一个BodyPublisher,它本质上是一个Publisher<ByteBuffer>,即它发布字节块。然后,HTTP请求将订阅该发布服务器并请求通过网络发送的字节。

我们可以通过为接口BodyPublisher、Subscriber和Subscription创建decorator来观察这种行为,这些接口将登录到standard out,然后将它们注入到HTTP请求生成器中:

HttpClient client = HttpClient.newBuilder().build();

HttpRequest post = HttpRequest

    .newBuilder(POSTMAN_POST)

    // this is where the magic happens!

    .POST(new LoggingBodyPublisher(BodyPublishers.ofFile(LARGE_JSON)))

    .header("Content-Type", "application/json")

    .build();

HttpResponse<String> response = client

    .send(post, BodyHandlers.ofString());

LARGE_JSON是一个路径,BodyPublishers::ofFile为它创建一个BodyPublisher,根据需要遍历文件。我们将其包装到日志装饰器中,并将请求传递给客户机。流媒体启动后,您可以看到以下输出:

# the HTTP client created a subscriber and now registers it with the

# file publisher by calling `Publisher::subscribe`

[subscribe   ] Subscriber registered:

    jdk.internal.net.http.Http1Request$FixedContentSubscriber@70ede696

# the file publisher created the subscription and passes it to the

# HTTP client by calling `Subscriber::onSubscribe`

[onSubscribe ] Subscription registered:

    jdk.internal.net.http.PullPublisher$Subscription@4adbc393

# the "handshake" is complete and the HTTP client starts requesting

# items by calling `Subscription::request`

[request     ] Items requested: 1 ↺ 1

# the file publisher received the request for the first item and

# fulfills it by calling `Subscriber::onNext` on the HTTP client

# with a single `ByteBuffer` instance (of 16kb length)

[onNext      ] Bytes passed: 16384 ↺ 16384

# the `request/onNext` cycle continues

[request     ] Items requested: 1 ↺ 2

[onNext      ] Bytes passed: 16384 ↺ 32768

[request     ] Items requested: 1 ↺ 3

[onNext      ] Bytes passed: 16384 ↺ 49152

[... snip ...]

[request     ] Items requested: 1 ↺ 85

[onNext      ] Bytes passed: 16384 ↺ 1392640

# the file publisher realizes that there are no more bytes to

# publish and calls `Subscriber::onComplete` on the HTTP client

[onComplete  ] Publishing completed

有趣的是,BodyPublishers::ofFile返回的BodyPublisher是懒惰的(它读取的内容永远不会超过完成下一个请求所需的量),并且HTTP客户端只会在最后一个字节通过网络发送后请求新的字节。这意味着无论文件有多大,您永远不需要在内存中存储超过16kb的文件。

 

很容易与该逻辑集成,作为一个更详细的例子,创建一个发布服务器,它连接到数据库并使用分页,在任何时候都只在内存中保存整个结果的一个小窗口,同时将其转换为合理的表示,并将其作为请求主体的一部分流式传输。

反应流的另一个很好的用例是响应体的实时处理。接下来就是这个了!

 流式响应消息体:

当BodyPublisher负责发布通过线路发送的字节时,BodySubscriber<T>订阅作为响应一部分接收的字节,并将它们收集到T类型的实例中。这些字节以字节缓冲区列表的形式出现,这意味着BodySubscriber扩展了Subscriber<List<ByteBuffer>>。实现这一点意味着从缓冲区提取字节,了解字符集,决定将结果字符串拆分到哪里等等……总之,这是一件痛苦的事。所以我们不会这么做。

BodyHandlers 和 BodySubscribers:

我们可以实现Subscriber<String>并将其传递给BodyHandlers::fromLineSubscriber:

static CompletableFuture<Void> reactiveSearch(

        HttpClient client, URI url, String term) {

    HttpRequest request = HttpRequest.newBuilder(url).GET().build();

    // TODO: we need a subscriber

    Subscriber<String> stringFinder = // ...

    // TODO: we need to do something with the `CompletableFuture`

    client.sendAsync(request, BodyHandlers.fromLineSubscriber(finder));

    // TODO: we need to get the result out of the `stringFinder`

    //       and return it here as a `CompletableFuture`

    return // ...

}

什么是BodyHandlers?

BodyHandler<T>负责评估响应的状态代码、HTTP版本和头行,并为响应的字节创建BodySubscriber<T>。泛型类型T表示这些字节最终将转换为什么,并确定HttpResponse<T>中的T,从而确定HttpResponse::body的返回类型。在HTTP/2教程以及前面的流式处理请求体的示例中,我们调用了BodyHandlers::ofString来获取一个BodyHandler<String>,它将整个响应体表示为单个字符串,并将其传递给客户机的send方法。

这次我们要打电话BodyHandlers.fromLineSubscriber为什么订阅者给了我们更多的自由?稍后!)它将字节缓冲区的列表聚合为字符串,在新行中将它们拆分,然后期望订阅服务器处理这些单独的响应行。作为回报,我们不需要等到整个尸体到达我们才能处理它。

现在您已经知道订户必须遵循的协议,所以让我们快速实现一个简单的变体:

 

private static class StringFinder implements Subscriber<String> {

 

    private final String term;

    private Subscription subscription;

 

    private StringFinder(String term) {

        this.term = term;

    }

 

    @Override

    public void onSubscribe(Subscription subscription) {

        this.subscription = subscription;

        this.subscription.request(1);

    }

 

    @Override

    public void onNext(String line) {

        // TODO: scan the line and, if found, expose positive result

        subscription.request(1);

    }

 

    @Override

    public void onError(Throwable ex) {

        // TODO: expose the error

    }

 

    @Override

    public void onComplete() {

        // entire body was processed, but term was not found;

        // TODO: expose negative result

    }

 

}

 

如您所见,StringFinder通过存储订阅、请求项(在本例中是一行一行)并处理它们来实现反应式订阅者契约。

 

用CompletableFuture公开结果 

正如您还可以看到的,还有一些待办事项–它们都围绕如何公开结果展开,可以是:

  • positive: term found in body
  • negative: term not found in body
  • error: HTTP client reported an exception

我们可以用一个完整的future<Boolean>覆盖这三个用例:

private static class StringFinder implements Subscriber<String> {

 

    private final CompletableFuture<Boolean> found =

        new CompletableFuture<>()

 

    // [... other fields, constructor, `onSubscribe` as above...]

 

    @Override

    public void onNext(String line) {

        if (!found.isDone() && line.contains(term))

            found.complete(true);

        subscription.request(1);

    }

 

    @Override

    public void onError(Throwable ex) {

        found.completeExceptionally(ex);

    }

 

    @Override

    public void onComplete() {

        found.complete(false);

    }

 

    public CompletableFuture<Boolean> found() {

        return found;

    }

 

}

让我们将StringFinder插入reactiveSearch,看看我们得到了什么:

static CompletableFuture<Void> reactiveSearch(

        HttpClient client, URI url, String term) {

    HttpRequest request = HttpRequest.newBuilder(url).GET().build();

    StringFinder finder = new StringFinder(term);

    // DANGER ZONE!

    client.sendAsync(request, BodyHandlers.fromLineSubscriber(finder));

    return finder

        .found()

        .exceptionally(__ -> false)

        .thenAccept(found -> System.out.println(

            "Completed " + url + " / found: " + found));

}

 

我们将StringFinder传递给fromLineSubscriber,fromLineSubscriber将它包装成一个BodyHandler,然后返回finder公开的CompletableFuture。不过,有点不对劲: sendAsync 返回的future是怎么回事?我们不也需要吗?有点…但我们得绕道去。

fromLineSubscriber有一个重载,其语义是,一旦我们的订阅者处理了整个主体,它的结果就可以作为响应的主体使用。给定一个提取结果的函数<Subscriber<String>,T>(称为finisher),它将创建一个BodyHandler<T>,从而导致HttpResponse<T>。

我们调用的fromLineSubscriber有不同的语义,很难理解:所提供的订阅者可以随心所欲地处理主体,而不必在之后使其可用。因此,它返回一个BodyHandler<Void>,导致HttpResponse<Void>,这意味着 sendAsync 返回一个CompletableFuture,在响应完全处理时完成,但从不公开主体。

如果这听起来只是忽略 sendAsync 返回值的另一个原因,那么我已经成功地将您引入了我遵循的相同错误思想。我甚至可能会以这种方式发表这篇文章,如果没有互联网连接,我就不会在离地面3.5万英尺的地方工作。

错误处理:

 

虽然StringFinder通过其completeFuture公开错误来正确地处理错误,但这些并不是所有可能发生的错误。相反,这些只是在主体从服务器流到客户端时可能发生的一小部分错误(例如,连接丢失)。

但是有很多原因我们甚至不能流尸体!例如,无法建立连接。例如,因为你在空中。在这种情况下,StringFinder从不订阅任何内容,它的CompletableFuture永远不会完成,等待它永远阻塞。我们哪里出错了?这些错误在哪里出现?

这里是 sendAsync 返回的CompletableFuture的位置。是它暴露了这样的错误!因此,我们需要关注它的异常处理,让我们的finder的未来在同样的异常中完成:

StringFinder finder = new StringFinder(term);

client

    .sendAsync(request, BodyHandlers.fromLineSubscriber(finder))

    // this is a `CompletableFuture<Void>` ...

    .exceptionally(ex -> {

        finder.onError(ex);

        // ... which is why we need to return `null`

        return null;

    });

这样,StringFinder返回的CompletableFuture<Boolean>显示了在部署HTTP请求时可能出现的所有可能结果:

一旦找到这个词,它就会用true完成

如果整个Body都被扫描的话,它会以false结束

如果出现任何问题(包括在实体流式传输之前发生的问题),它将完成,但有例外情况

整洁!

还记得上一篇文章吗,在维基百科搜索十篇最长的文章“Foo”时,异步调用所用的时间大约是阻塞调用所用时间的80%。流式处理实体而不是将其整体组合在一起会减少内存占用,这通常会导致更长的运行时间。只是,尽管如此,仍然有大约85%的阻塞呼叫。

取消流:

只剩下一点:我们总是流整个Body,甚至在我们找到搜索词之后。一旦我们完成了,我们不能中止流吗?从技术上讲,是的,它甚至不复杂–我们只需要在StringFinder::onNext中添加一行:

@Override

public void onNext(String line) {

    if (line.contains(search.term())) {

        found.complete(true);

        subscription.cancel();

    }

    requestLine();

}

通过取消订阅,我们将不会收到更多的行,这在测量运行时时确实显示了:这需要阻塞调用所用时间的45%,即异步方法的55%。但请记住,这种加速在很大程度上取决于搜索词找到的速度!如果你搜索的是“Foobar”而不是“Foo”,那么十个站点中都没有包含它(真可惜),性能会回到运行时而不会被取消。

关于取消,还有两个细节我要提一下。第一种情况是,取消订阅会导致客户端调用onError,出现如下异常:

java.util.concurrent.CompletionException:

java.io.IOException: Stream 47 cancelled

自从一个错误的电话异常地找到了.complete,则将来必须已经完成(或者结果总是错误而不是true)。这就是为什么找到了。完成了(正确)必须在前面订阅.取消()!

最后,总结一下: 

  • 响应式流 有两个活动的参与者:发布者和订阅者,前者发布后者使用的项目
  • 要流式传输请求正文,您需要一个发布服务器<ByteBuffer>,HTTP客户机将订阅该发布服务器,然后通过网络发送请求的字节
  • 要流式处理响应体,通常将实现一个Subscriber<String>,客户机向传入的响应字节订阅该订阅服务器<String>,它将其转换为一个字符串,并逐行中断一部分供订阅服务器使用
  • 注意正确处理错误

学习响应式流 以及Java新的HTTP/2API如何使用它,不是一件容易的事。如果你自己玩它,它会变得更清晰,所以我想最后一次给你演示:克隆它,玩它,破坏它,修复它。最好的学习方式。