Package java.util.stream
用于支持元素流上的功能样式操作的类,例如集合上的map-reduce转换。 例如:
int sum = widgets.stream() .filter(b -> b.getColor() == RED) .mapToInt(b -> b.getWeight()) .sum();
这里我们使用widgets ,一个Collection
此包中引入的关键抽象是流 。 类Stream , IntStream , LongStream ,和DoubleStream超过目的和原始流int , long和double类型。 Streams在几个方面与集合不同:
没有存储空间 流不是存储元素的数据结构; 相反,它通过计算操作管道传递来自诸如数据结构,数组,生成器函数或I / O通道的源的元素。
功能性。 对流的操作会产生结果,但不会修改其源。 例如,过滤从集合中获取的Stream会生成不带过滤元素的新Stream ,而不是从源集合中删除元素。
懒惰寻求。 许多流操作(例如过滤,映射或重复删除)可以懒惰地实现,从而暴露出优化的机会。 例如,“找到带有三个连续元音的第一个String ”不需要检查所有输入字符串。 流操作分为中间( Stream生产)操作和终端(产生价值或副作用)操作。 中间操作总是很懒惰。
可能是无限的。 虽然集合的大小有限,但流不需要。 短路操作(例如limit(n)或findFirst()可以允许无限流上的计算在有限时间内完成。
耗材。 流的元素仅在流的生命期间访问过一次。 与Iterator类似 ,必须生成新流以重新访问源的相同元素。
流可以通过多种方式获得。 一些例子包括:
从Collection通过stream()和parallelStream()方法;
从阵列通过Arrays.stream(Object[]) ;
从上流类静态工厂的方法,例如Stream.of(Object[]) , IntStream.range(int, int)或Stream.iterate(Object, UnaryOperator) ;
文件的行可以从BufferedReader.lines()获得;
文件路径的流可以从Files中的方法获得;
随机数的流可以从Random.ints()获得;
许多其它的数据流的方法的轴承在JDK,包括BitSet.stream() , Pattern.splitAsStream(java.lang.CharSequence) ,和JarFile.stream() 。
第三方库可以使用these techniques提供其他流源。
Stream operations and pipelines
流操作分为中间操作和终端操作,并组合成流管道 。 流管道由源(例如Collection ,阵列,生成器功能或I / O通道)组成; 然后是零个或多个中间操作,如Stream.filter或Stream.map ; 和终端操作,如Stream.forEach或Stream.reduce 。
中间操作返回一个新流。 他们总是懒惰 ; 执行诸如filter()类的中间操作实际上不执行任何过滤,而是创建一个新流,当遍历时,该流包含与给定谓词匹配的初始流的元素。 在执行管道的终端操作之前,不会开始遍历管道源。
终端操作(例如Stream.forEach或IntStream.sum )可以遍历流以产生结果或副作用。 在执行终端操作之后,流管道被认为已消耗,并且不能再使用; 如果需要再次遍历同一数据源,则必须返回数据源以获取新流。 在几乎所有情况下,终端操作都很渴望 ,在返回之前完成数据源的遍历和管道的处理。 只有终端操作iterator()和spliterator()不是; 这些是作为“逃生舱口”提供的,以便在现有操作不足以执行任务时启用任意客户端控制的管道遍历。
懒惰地处理流可以显着提高效率; 在诸如上面的filter-map-sum示例的流水线中,过滤,映射和求和可以融合到数据上的单个传递中,具有最小的中间状态。 懒惰还允许在不必要时避免检查所有数据; 对于诸如“查找超过1000个字符的第一个字符串”之类的操作,只需要检查足够的字符串以找到具有所需特征的字符串,而不检查源中可用的所有字符串。 (当输入流是无限的而不仅仅是大的时候,这种行为变得更加重要。)
中间操作进一步分为无 状态操作和有状态操作。 无状态操作(例如filter和map在处理新元素时不保留先前看到的元素的状态 - 每个元素可以独立于其他元素上的操作进行处理。 有状态操作(例如distinct和sorted )可以在处理新元素时包含先前看到的元素的状态。
有状态操作可能需要在生成结果之前处理整个输入。 例如,在查看流的所有元素之前,不能通过对流进行排序来产生任何结果。 因此,在并行计算下,某些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓冲重要数据。 仅包含无状态中间操作的管道可以在一次通过中处理,无论是顺序还是并行,具有最小的数据缓冲。
此外,一些操作被认为是短路操作。 如果在呈现无限输入时,它可能产生有限流,则中间操作是短路的。 如果在呈现无限输入时它可以在有限时间内终止,则终端操作是短路的。 在流水线中进行短路操作是处理无限流以在有限时间内正常终止的必要但不充分的条件。
Parallelism
具有显式for-循环的处理元素本质上是串行的。 Streams通过将计算重新定义为聚合操作的流水线而不是作为每个单独元素的命令操作来促进并行执行。 所有流操作都可以串行或并行执行。 除非明确请求并行性,否则JDK中的流实现会创建串行流。 例如, Collection具有方法Collection.stream()和Collection.parallelStream() ,它们分别产生顺序和并行流; 其他流方法(如IntStream.range(int, int))生成顺序流,但这些流可以通过调用它们的BaseStream.parallel()方法进行有效并行化。 要并行执行先前的“窗口小部件权重总和”查询,我们会这样做:
int sumOfWeights = widgets.parallelStream() .filter(b -> b.getColor() == RED) .mapToInt(b -> b.getWeight()) .sum();
此示例的串行和并行版本之间的唯一区别是使用“ parallelStream() ”而不是“ stream() ”创建初始流。 根据调用终端操作的流的模式,顺序地或并行地执行流管道。 可以使用BaseStream.isParallel()方法确定流的顺序或并行模式,并且可以使用BaseStream.sequential()和BaseStream.parallel()操作修改流的模式。 最近的顺序或并行模式设置适用于整个流管道的执行。
除了标识为明确不确定的操作(例如findAny() ,流是顺序执行还是并行执行不应更改计算结果。
大多数流操作接受描述用户指定行为的参数,这些参数通常是lambda表达式。 为了保持正确的行为,这些行为参数必须是无干扰的 ,并且在大多数情况下必须是无状态的 。 这些参数始终是functional interface的实例,例如Function ,并且通常是lambda表达式或方法引用。
Non-interference
Streams使您能够在各种数据源上执行可能并行的聚合操作,甚至包括非线程安全的集合,例如ArrayList 。 只有在执行流管道期间我们能够防止干扰数据源时,才有可能实现这一点。 除了逃逸舱口操作iterator()和spliterator() ,在终端操作被调用时开始执行,并且在终端操作完成时结束。 对于大多数数据源,防止干扰意味着确保在流管道的执行期间根本不修改数据源。 值得注意的例外是其源是并发集合的流,这些集合专门用于处理并发修改。 并发流源是Spliterator报告CONCURRENT特征的那些源。
因此,源流可能不是并发的流管道中的行为参数永远不应修改流的数据源。 如果行为参数修改或导致修改流的数据源,则该行为参数会干扰非并发数据源。 不干涉的需要适用于所有管道,而不仅仅是并行管道。 除非流源是并发的,否则在执行流管道期间修改流的数据源可能会导致异常,错误答案或不一致的行为。 对于性能良好的流源,可以在终端操作开始之前修改源,并且这些修改将反映在所覆盖的元素中。 例如,请考虑以下代码:
List
首先创建一个包含两个字符串的列表:“one”; 和“两个”。 然后从该列表创建流。 接下来,通过添加第三个字符串来修改列表:“three”。 最后,收集流的元素并将它们连接在一起。 由于在终端collect操作开始之前修改了列表,结果将是一串“一二三”。 从JDK集合和大多数其他JDK类返回的所有流都以这种方式表现良好; 对于其他库生成的流,请参阅Low-level stream construction以了解构建行为良好的流的要求。
Stateless behaviors
如果流操作的行为参数是有状态的,则流管道结果可能是不确定的或不正确的。 有状态lambda(或实现适当功能接口的其他对象)的结果取决于在流管道执行期间可能发生变化的任何状态。 有状态lambda的一个示例是map()的参数:
Set
这里,如果映射操作是并行执行的,则由于线程调度差异,相同输入的结果可能因运行而异,而对于无状态lambda表达式,结果将始终相同。
另请注意,尝试从行为参数访问可变状态会使您在安全性和性能方面做出错误的选择; 如果您没有同步对该状态的访问,则会出现数据争用,因此您的代码已损坏,但如果您同步访问该状态,则存在争用的风险会破坏您希望从中受益的并行性。 最好的方法是避免有状态的行为参数完全流动操作; 通常有一种方法可以重构流管道以避免状态。
Side-effects
通常,不鼓励行为参数对流操作的副作用,因为它们通常会导致无意中违反无国籍要求以及其他线程安全危险。
如果行为参数确实有副作用,除非明确说明,否则不保证:
visibility对其他线程的副作用;
对同一流管道中“相同”元素的不同操作在同一个线程中执行; 和
总是调用行为参数,因为流实现可以自由地从流管道中删除操作(或整个阶段),如果它可以证明它不会影响计算结果。
副作用的排序可能令人惊讶。 即使管道被约束以产生与流源的遭遇顺序一致的结果 (例如, IntStream.range(0,5).parallel().map(x -> x*2).toArray()必须产生[0, 2, 4, 6, 8] ),也不保证将映射器功能应用于各个元素的顺序,或者在什么线程中为给定元素执行任何行为参数。
消除副作用也可能令人惊讶。 除了终端操作forEach和forEachOrdered之外 ,当流实现可以优化掉行为参数的执行而不影响计算结果时,可能不总是执行行为参数的副作用。 (有关具体示例,请参阅count操作中记录的API说明。)
许多可能试图使用副作用的计算可以更安全和有效地表达而没有副作用,例如使用reduction而不是可变累加器。 但是,诸如使用println()进行调试的副作用通常是无害的。 少数流操作,例如forEach()和peek() ,只能通过副作用运行; 这些应该小心使用。
作为如何将不适当地使用副作用的流管道转换为不使用副作用的流管道的示例,以下代码在字符串流中搜索与给定正则表达式匹配的那些,并将匹配放在列表中。
ArrayList
此代码不必要地使用副作用。 如果并行执行, ArrayList的非线程安全性将导致不正确的结果,并且添加所需的同步将导致争用,从而破坏并行性的好处。 此外,在这里使用副作用是完全没有必要的; 可以简单地将forEach()替换为更安全,更高效且更易于并行化的还原操作:
List
Ordering
流可能有也可能没有已定义的遭遇顺序 。 流是否具有遭遇顺序取决于源和中间操作。 某些流源(例如List或数组)本质上是有序的,而其他流(例如HashSet )则不是。 某些中间操作(例如sorted() )可能会在其他无序流上强制执行遭遇顺序,而其他中间操作可能会呈现无序的有序流,例如BaseStream.unordered() 。 此外,一些终端操作可以忽略遭遇顺序,例如forEach() 。
如果订购了流,则大多数操作都被约束为对其遭遇顺序中的元素进行操作; 如果流的源是List含有[1, 2, 3] ,然后执行的结果map(x -> x*2)必须是[2, 4, 6] 。 但是,如果源没有定义的遭遇顺序,则值[2, 4, 6]任何排列都将是有效结果。
对于顺序流,遭遇顺序的存在与否不会影响性能,只影响确定性。 如果订购了流,则在相同的源上重复执行相同的流管道将产生相同的结果; 如果没有订购,重复执行可能会产生不同的结果。
对于并行流,放宽排序约束有时可以实现更高效的执行。 如果元素的排序不相关,则可以更有效地实现某些聚合操作,例如过滤重复( distinct() )或分组缩减( Collectors.groupingBy() )。 类似地,与遇到订单本质上相关的操作(例如limit() )可能需要缓冲以确保正确排序,从而破坏并行性的好处。 在流具有遭遇顺序但用户不特别关心该遭遇顺序的情况下,使用unordered()明确地对流进行排序可以改善某些有状态或终端操作的并行性能。 然而,大多数流管道,例如上面的“块的权重总和”示例,即使在排序约束下仍然有效地并行化。
Reduction operations
缩减操作(也称为折叠 )采用一系列输入元素,并通过重复应用组合操作将它们组合成单个汇总结果,例如查找一组数字的总和或最大值,或将元素累积到列表中。 该流的类具有普遍减少操作,所谓的多种形式reduce()和collect() ,以及多个专业化还原的形式,如sum() , max() ,或count() 。
当然,这样的操作可以很容易地实现为简单的顺序循环,如:
int sum = 0; for (int x : numbers) { sum += x; }
然而,有充分理由优先考虑减少操作而不是如上所述的变异累积。 简化不仅“更抽象” - 它作为一个整体而不是单个元素在整个流上运行 - 但正确构造的reduce操作本质上是可并行化的,只要用于处理元素的函数是associative和stateless 。 例如,给定我们想要找到总和的数字流,我们可以写:
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
要么:
int sum = numbers.stream().reduce(0, Integer::sum);
这些减少操作可以安全地并行运行,几乎不需要修改:
int sum = numbers.parallelStream().reduce(0, Integer::sum);
减少并行很好,因为实现可以并行地对数据的子集进行操作,然后将中间结果组合以获得最终的正确答案。 (即使语言具有“并行for-each”结构,变异累积方法仍然需要开发人员为共享累积变量sum提供线程安全更新,并且所需的同步可能会消除并行性能带来的任何性能提升。)使用reduce()代替了并行化还原操作的所有负担,并且库可以提供有效的并行实现,而无需额外的同步。
前面显示的“小部件”示例显示了简化如何与其他操作结合使用批量操作替换循环。 如果widgets是Widget对象的集合,其具有getWeight方法,我们可以找到最重的小部件:
OptionalInt heaviest = widgets.parallelStream() .mapToInt(Widget::getWeight) .max();
在其更一般的形式,一个reduce上类型的元素的操作
U reduce(U identity, BiFunction accumulator, BinaryOperator combiner);
这里, identity元素既是缩减的初始种子值,也是没有输入元素的默认结果。 累加器函数获取部分结果和下一个元素,并产生新的部分结果。 组合器功能组合了两个部分结果以产生新的部分结果。 (组合器在并行缩减中是必要的,其中输入被分区,为每个分区计算部分累积,然后组合部分结果以产生最终结果。)
更正式地说, identity值必须是组合器函数的标识 。 这意味着,对于所有u , combiner.apply(identity, u)等于u 。 此外, combiner功能必须是associative ,并必须与兼容accumulator功能:对所有u和t , combiner.apply(u, accumulator.apply(identity, t))必须equals()至accumulator.apply(u, t) 。
三参数形式是两参数形式的概括,将映射步骤结合到累积步骤中。 我们可以使用更通用的形式重新构建简单的权重总和示例,如下所示:
int sumOfWeights = widgets.stream() .reduce(0, (sum, b) -> sum + b.getWeight(), Integer::sum);
虽然显式的map-reduce形式更具可读性,因此通常应该是首选。 通过将映射和缩减组合成单个函数,可以优化远离重要工作的情况提供通用形式。
Mutable reduction
可变减少操作将输入元素累积到可变结果容器中,例如Collection或StringBuilder ,因为它处理流中的元素。
如果我们想要获取字符串流并将它们连接成一个长字符串,我们可以通过普通减少来实现:
String concatenated = strings.reduce("", String::concat)
我们会得到理想的结果,甚至可以并行工作。 但是,我们可能对性能不满意! 这样的实现会进行大量的字符串复制,并且运行时间的字符数为O(n ^ 2) 。 更高效的方法是将结果累积到StringBuilder中 ,这是一个用于累积字符串的可变容器。 我们可以使用相同的技术来并行化可变缩减,就像我们使用普通缩减一样。
可变还原操作称为collect() ,因为它将所需结果收集到结果容器(例如Collection 。 collect操作需要三个功能:构造结果容器的新实例的供应商函数,将输入元素合并到结果容器中的累加器函数,以及将一个结果容器的内容合并到另一个中的合并函数。 这种形式与普通减少的一般形式非常相似:
与reduce() ,以这种抽象方式表达collect的好处是它直接适用于并行化:我们可以并行累积部分结果然后将它们组合,只要累积和组合函数满足适当的要求即可。 例如,要将流中元素的String表示形式收集到ArrayList ,我们可以为每个表单编写明显的顺序:
ArrayList
或者我们可以使用可并行化的收集表单:
ArrayList
或者,将映射操作从累加器函数中拉出来,我们可以更简洁地表达它:
List
在这里,我们的供应商只是ArrayList constructor ,累加器将字符串化元素添加到ArrayList ,组合器只使用addAll将字符串从一个容器复制到另一个容器。
collect的三个方面 - 供应商,累加器和组合器 - 紧密耦合。 我们可以使用Collector的抽象来捕获所有这三个方面。 以上用于将字符串收集到List可以使用标准Collector重写为:
List
将可变缩减打包到收集器中具有另一个优点:可组合性。 类Collectors包含许多用于收集器的预定义工厂,包括将一个收集器转换为另一个收集器的组合器。 例如,假设我们有一个收集器来计算员工流的工资总和,如下所示:
Collector
(第二个类型参数的?仅表示我们不关心此收集器使用的中间表示。)如果我们想创建一个收集器以按部门列出工资总额,我们可以使用summingSalaries重用summingSalaries :
Map
与常规还原操作一样,如果满足适当的条件,则只能并行化collect()操作。 对于任何部分累积的结果,将其与空结果容器组合必须产生等效结果。 也就是说,对于部分累积的结果p ,它是任何一系列累加器和组合器调用的结果, p必须等效于combiner.apply(p, supplier.get()) 。
此外,然而,计算是分开的,它必须产生相同的结果。 对于任何输入元素t1和t2 ,以下计算中的结果r1和r2必须是等效的:
A a1 = supplier.get(); accumulator.accept(a1, t1); accumulator.accept(a1, t2); R r1 = finisher.apply(a1); // result without splitting A a2 = supplier.get(); accumulator.accept(a2, t1); A a3 = supplier.get(); accumulator.accept(a3, t2); R r2 = finisher.apply(combiner.apply(a2, a3)); // result with splitting
这里,等价通常意味着根据Object.equals(Object) 。 但在某些情况下,可以放宽等同性以解释顺序的差异。
Reduction, concurrency, and ordering
对于一些复杂的归约操作,例如collect()产生一个Map ,如:
Map<Buyer, List<Transaction>> salesByBuyer = txns.parallelStream() .collect(Collectors.groupingBy(Transaction::getBuyer));
`
实际上并行执行操作可能会适得其反。 这是因为所述组合步骤(合并一个Map到另一个由键)可以是用于某些昂贵Map实现。
但是,假设此减少中使用的结果容器是可同时修改的集合 - 例如ConcurrentHashMap 。 在这种情况下,累加器的并行调用实际上可以将它们的结果同时存入同一个共享结果容器中,从而消除了组合器合并不同结果容器的需要。 这可能会提升并行执行性能。 我们称之为同时减少。
甲Collector支持并发还原标有Collector.Characteristics.CONCURRENT特性。 但是,并发收集也有缺点。 如果多个线程同时将结果存入共享容器,则存储结果的顺序是不确定的。 因此,只有在对正在处理的流不重要的情况下,才能实现并发减少。 Stream.collect(Collector)实现只会执行并发减少
流是平行的;
收集器具有Collector.Characteristics.CONCURRENT特性,并且;
流是无序的,或者收集器具有Collector.Characteristics.UNORDERED特性。
您可以使用BaseStream.unordered()方法确保流无序。 例如:
Map
(其中Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>)是并发等效groupingBy )。
请注意,如果给定键的元素按照它们在源中出现的顺序出现很重要,那么我们就不能使用并发缩减,因为排序是并发插入的牺牲品之一。 然后,我们将被限制为实现顺序缩减或基于合并的并行缩减。
Associativity
如果满足以下op则运算符或函数op是关联的:
(a op b) op c == a op (b op c)
如果我们将其扩展为四个术语,可以看出这对并行评估的重要性:
a op b op c op d == (a op b) op (c op d)
因此我们可以与(c op d)并行评估(a op b) ,然后在结果上调用op 。
关联操作的示例包括数字加法,最小值和最大值以及字符串连接。
Low-level stream construction
到目前为止,所有流示例都使用了Collection.stream()或Arrays.stream(Object[])等方法来获取流。 这些流式方法是如何实现的?
StreamSupport类有许多用于创建流的低级方法,所有这些方法都使用某种形式的Spliterator 。 分裂器是Iterator的并行模拟器 ; 它描述了一个(可能是无限的)元素集合,支持顺序前进,批量遍历,并将输入的某些部分分成另一个可以并行处理的分裂器。 在最低级别,所有流都由分裂器驱动。
在实现spliterator时有许多实现选择,几乎所有这些都是使用该spliterator在实现的简单性和流的运行时性能之间进行权衡。 创建spliterator的最简单但性能最差的方法是使用Spliterators.spliteratorUnknownSize(java.util.Iterator, int)从迭代器创建一个。 虽然这样的分裂器可以工作,但它可能会提供较差的并行性能,因为我们丢失了大小调整信息(底层数据集有多大),以及被限制为简单的分裂算法。
更高质量的分裂器将提供平衡和已知大小的分割,准确的大小调整信息,以及可以由实现用于优化执行的分裂器或数据的许多其他characteristics 。
可变数据源的分裂器还有一个挑战; 绑定到数据的时间,因为数据可能在创建分裂器的时间和流管道的执行时间之间发生变化。 理想情况下,流的分裂器会报告IMMUTABLE或CONCURRENT的特征; 如果不是它应该是late-binding 。 如果某个来源无法直接提供推荐的分裂器,则可以使用Supplier间接提供分裂器,并通过Supplier接受版本的stream()构建流。 仅在流管道的终端操作开始之后才从供应商获得分离器。
这些要求显着减少了流源突变和流管道执行之间潜在干扰的范围。 基于具有所需特征的分裂器的流或使用基于供应商的工厂形式的流不受在终端操作开始之前对数据源的修改的影响(假设流操作的行为参数满足非操作的要求标准)干涉和无国籍)。 有关详细信息,请参阅Non-Interference 。