网站首页 > 博客文章 正文
通过分析源码的方式了解了无状态操作和有状态操作的区别,每一个中间操作方法是如何实现的,Stream是惰性流,调用中间操作比如filter()、map()等方法不会立即执行声明的lambda表达式,只有通过调用终止操作才会处理Stream中的元素。本章我们将分析终止操作相关源码,深入了解内部原理。
终止操作
在Stream API中有一个接口TerminalOp代表终止操作。
interface TerminalOp<E_IN, R> {
default StreamShape inputShape() { return StreamShape.REFERENCE; }
default int getOpFlags() { return 0; }
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
return evaluateSequential(helper, spliterator);
}
<P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
Spliterator<P_IN> spliterator);
}
复制代码
接口上定义了两个泛型类型:
- E_IN:输入的元素类型
- R:结果类型
来看下方法定义:
- inputShape():获取操作的输入元素类型,返回枚举StreamShape,译为Stream的形状。可以看到定义了四种枚举,正好对应Stream、IntStream、LongStream、DoubleStream。
enum StreamShape {
REFERENCE,
INT_VALUE,
LONG_VALUE,
DOUBLE_VALUE
}
复制代码
- evaluateParallel():Stream是并行时,终止操作最终调用这个方法处理数据,我们不关心。
- evaluateSequential():串行流时最终调用这个方法处理数据。
通过继承关系可以看到,只有四个类直接实现TerminalOp接口,Stream有这么多终止方法,而实际上可以归类为4个,单独说明一下:toArray()方法也是终止操作,但是与其它不同,没有使用TerminalOp。
现在来看下终止操作划分:
操作类型 | 方法 |
非短路操作 | forEach() forEachOrdered() toArray() reduce() collect() min() max() count() |
短路操作 | anyMatch() allMatch() noneMatch() findFirst() findAny() |
非短路操作
非短路操作主要有ForEachOp和ReduceOp两个TerminalOp实现,其实toArray()也属于非短路操作,但是它没有依靠实现TerminalOp来完成相应的功能,所以不作讲解,感兴趣的小伙伴可以自己看一下相关源码。
ForEachOp
用于遍历Stream元素的forEach()和forEachOrdered()方法都是通过ForEachOp的子类完成相应工作的。
ForEachOp的继承结构比较复杂,除了实现上面的TerminalOp外,还实现了TerminalSink。TerminalSink聚合了Sink和Supplier接口,这两个大家应该都不陌生,Sink在前面的文章有详细讲解,Supplier是一个函数式接口,用于提供一个结果给调用者。
通过前面的文章和继承结构,我们可以大胆猜测:ForEachOp除了具备终止操作的能力,在数据处理之前,自己还会作为一个sink,与中间操作中的sink实例组成sink链表,通过责任链模式依次处理Stream中的元素。到底是不是这样呢?我们进入源码求证。
//省略了并行处理相关的代码
//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值
static abstract class ForEachOp<T>
implements TerminalOp<T, Void>, TerminalSink<T, Void> {
//odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注
private final boolean ordered;
protected ForEachOp(boolean ordered) {
this.ordered = ordered;
}
// TerminalOp
@Override
public int getOpFlags() {
return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
}
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
// TerminalSink
@Override
public Void get() {
return null;
}
}
复制代码
重点关注evaluateSequential(),内部会调用PipelineHelper#wrapAndCopyInto()方法,看到这个方法是不是很熟悉,没错就是前面文章中多次提到的处理数据的方法,它的职责是将传入sink与中间操作产生的sink组合成链表,然后调用源Spliterator的方法,发送Stream元素给sink链处理。
我们进入这个方法看一下,它是在AbstractPipeline中实现的:
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
复制代码
可以看到首先调用wrapSink()方法,将封装有终止操作逻辑的sink再次包装:
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
复制代码
wrapSink()方法前面也有多次讲到,Stream经过一系列中间操作调用,返回的实际上是一个Stream链表,结构如下:
这里的逻辑就是从后向前遍历这个链表,调用每一个节点上的AbstractPipeline#opWrapSink()方法,将除开Head节点之外,每一个中间操作当中的sink,与后一个中间操作的sink节点连接,最后一个节点就是代表终止操作的sink。
拿到sink链表之后,再来看看copyInto()方法是如何执行数据处理逻辑的:
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
//非短路操作
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
//1.begin()方法调用
wrappedSink.begin(spliterator.getExactSizeIfKnown());
//2.forEachRemaining()方法调用,发送元素
spliterator.forEachRemaining(wrappedSink);
//3.end()方法调用,通知结束
wrappedSink.end();
}
//短路操作
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
复制代码
因为ForEachOp是非短路操作,所以必定走第一个分支,现在只分析这个逻辑:
- 在发送元素之前,调用begin()方法,将元素大小传入sink链表,经过有状态中间操作时会初始化相应的变量,这在前一章有详细分析,最终调用终止操作sink。
- 调用Spliterator#forEachRemaining()方法遍历元素,内部会调用Sink#accept()方法传入每一个元素,同样元素会在sink链表上经过中间操作处理,最后到达终止操作。
- end()方法,同理通知结束,做清理工作。
forEach()方法
forEach()方法,调用action函数处理Stream上传递过来的每一个元素,注意在并行流中,元素实际被处理的顺序可能不是Stream的元素顺序。ReferencePipelinet#forEach():
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
复制代码
ForEachOps#makeRef()方法:
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
复制代码
创建并返回OfRef对象,可以看到它继承自ForEachOp,实现了accept()方法,也就是元素到达终止操作sink时,会调用lambda表达式。
static final class OfRef<T> extends ForEachOp<T> {
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
@Override
public void accept(T t) {
consumer.accept(t);
}
}
复制代码
最后再来看一下AbstractPipeline#evaluate(),每一个终止操作方法内部都会调用evaluate()方法触发Stream上的逻辑执行,在它的内部,又会调用TerminalOp#evaluateSequential()方法:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
//触发真正的逻辑
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
复制代码
所以通过前面的分析,forEach()方法的流程应该很清晰了:先创建继承自ForEachOp的OfRef对象,然后通过调用AbstractPipeline#evaluate()方法,间接调用ForEachOp#evaluateSequential()方法,将OfRef作为终止操作的sink与中间操作的sink构建成链表,最后通过调用Spliterator#forEachRemaining()方法遍历元素将元素传递给sink链, 而代表forEach()终止操作sink的OfRef对象每接收到上一个sink传递过来的元素都会调用声明的lambda表达式进行处理。
forEachOrdered()方法
forEachOrdered()方法,调用action函数处理Stream上传递过来的每一个元素,在并行流中也能保证元素处理顺序和Stream一致。在串行流中行为和forEach()表现一致。
public void forEachOrdered(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, true));
}
复制代码
ReduceOp
ReduceOp是用于聚合Stream中元素的终止操作,ReduceOp应该是工作中被开发者使用最多的终止操作,比如:reduce()、collect()、max()等等。
//省略了并行处理的方法
private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
implements TerminalOp<T, R> {
private final StreamShape inputShape;
ReduceOp(StreamShape shape) {
inputShape = shape;
}
public abstract S makeSink();
@Override
public StreamShape inputShape() {
return inputShape;
}
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
}
复制代码
可以看到ReduceOp继承关系比ForEachOp更简单,ReduceOp没有实现TerminalSink接口,所以它自身不包含终止操作的逻辑,是通过调用重写的makeSink()方法拿到TerminalSink,再传入AbstractPipeline#wrapAndCopyInto()进行处理的,具体的逻辑还得在各个终止操作方法中查看。
reduce()方法
reduce()有三个重载方法,我们选择两个重点分析:
- reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator):
public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
}
复制代码
ReduceOps#makeRef():
public static <T, U> TerminalOp<T, U>
makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
Objects.requireNonNull(reducer);
Objects.requireNonNull(combiner);
//实现了AccumulatingSink接口,其实AccumulatingSink接口继承自TerminalSink
//然后扩展了一个方法combine(),这个方法是用于并行计算的,不用关心
//ReducingSink还继承了Box,Box中只保存了一个state初始变量,
//用于与Stream中的每一个元素进行聚合计算
class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
@Override
public void begin(long size) {
//初始变量state,这里赋值的就是Stream#reduce()方法传入的identity
state = seed;
}
@Override
public void accept(T t) {
//应用reducer函数将元素和state计算,产生一个新的结果并覆盖之前的结果
state = reducer.apply(state, t);
}
//用于并行计算
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
//创建上面的内部类实例
return new ReducingSink();
}
};
}
复制代码
reduce()方法确实是创建了一个 ReduceOp实例,它利用ReducingSink来保存初始结果,并且与每一个元素计算之后覆盖之前的结果。最后通过ReducingSink#get()方法作为Stream终止操作的结果返回。
- reduce(BinaryOperator<P_OUT> accumulator):
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(accumulator));
}
复制代码
ReduceOps#makeRef():
public static <T> TerminalOp<T, Optional<T>>
makeRef(BinaryOperator<T> operator) {
Objects.requireNonNull(operator);
class ReducingSink
implements AccumulatingSink<T, Optional<T>, ReducingSink> {
//sink初始化时为空,也就是没有计算结果
private boolean empty;
//保存计算结果
private T state;
public void begin(long size) {
//初始状态
empty = true;
state = null;
}
@Override
public void accept(T t) {
if (empty) {
//Stream中的第一个元素到来,直接作为结果保存到state
empty = false;
state = t;
} else {
//之后的元素,都应用operator函数与之前的结果计算,产生一个新的结果
state = operator.apply(state, t);
}
}
@Override
public Optional<T> get() {
//返回结果的方法,如果Stream中没有元素到达终止操作,则不存在结果,所以返回Optional
return empty ? Optional.empty() : Optional.of(state);
}
@Override
public void combine(ReducingSink other) {
if (!other.empty)
accept(other.state);
}
}
return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
复制代码
可以看到,与上面的reduce()方法不同的是,它没有一个初始结果,所以在ReducingSink中,使用了两个变量来保存状态,第一个元素作为初始结果与后面的元素计算,并且方法声明的返回值是Optional类型,代表这个操作可能是没有结果的。
max()方法
max()方法,使用比较器比较元素中最大的值并返回。max()和min()方法都是对reduce()方法的特殊应用。
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
return reduce(BinaryOperator.maxBy(comparator));
}
复制代码
max()方法调用了reduce()方法,我们看下BinaryOperator#maxBy()返回什么:
public static <T> BinaryOperator<T> maxBy(Comparator<? super T> comparator) {
Objects.requireNonNull(comparator);
return (a, b) -> comparator.compare(a, b) >= 0 ? a : b;
}
复制代码
逻辑很简单,就是利用比较器比较当前元素和前面所有元素的较大值,并作为最终结果返回。
min()方法
min()方法,使用比较器比较元素中最小的值并返回。逻辑跟max()方法基本完全一样。
public static <T> BinaryOperator<T> minBy(Comparator<? super T> comparator) {
Objects.requireNonNull(comparator);
return (a, b) -> comparator.compare(a, b) <= 0 ? a : b;
}
复制代码
collect()方法
上面说ReduceOp是工作中用到的最多的终止操作,那collect()方法应该就是ReduceOp操作中使用最频繁的终止操作方法了。collect()也有两个重载方法,我们重点看带Collector参数的方法。
Collector中文意思是收集器,它提供了获取Supplier、BiConsumer和BinaryOperator等函数的方法,Collector常用在分组、规约、聚合等操作当中。关于Collector这个强大的收集器,我打算放到下一章详细讲解,本章我们先看下collect()是如何使用ReduceOp的。
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
//并行处理,不关注
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
//重点在这里
container = evaluate(ReduceOps.makeRef(collector));
}
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
复制代码
进入ReduceOps#makeRef()方法:
public static <T, I> TerminalOp<T, I>
makeRef(Collector<? super T, I, ?> collector) {
//提供结果的Supplier函数
Supplier<I> supplier = Objects.requireNonNull(collector).supplier();】
//用于计算累加结果的函数
BiConsumer<I, ? super T> accumulator = collector.accumulator();
BinaryOperator<I> combiner = collector.combiner();
class ReducingSink extends Box<I>
implements AccumulatingSink<T, I, ReducingSink> {
@Override
public void begin(long size) {
//获取和初始化结果
state = supplier.get();
}
@Override
public void accept(T t) {
//应用accumulator函数计算结果
accumulator.accept(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
@Override
public int getOpFlags() {
return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
? StreamOpFlag.NOT_ORDERED
: 0;
}
};
}
复制代码
可以看到跟reduce()方法一样,collect()也是利用ReduceOp来实现的,不过它返回的ReducingSink有些细微的差别,collect()中保持结果的state是由Collector中的supplier提供的,并且处理Stream元素时不会覆盖之前的结果,而是使用Collector#accumulator()方法提供的BiConsumer函数修改state的内部数据。
Stream API通过提供Collector接口,使开发者可以更灵活的处理数据,甚至可以自定义Collector实现类以满足功能。
短路操作
短路操作跟非短路操作的主要区别在于:如果是短路操作,那么不一定需要完整的遍历整个Stream的元素,在某些条件下,可以提前得到结果,提前结束遍历过程。短路操作有MatchOp和FindOp两个TerminalOp实现,分别表示匹配和查找。
MatchOp
MatchOp是一种通过Predicate函数与元素匹配的终止操作,匹配类型包含:ALL、ANY和NONE。
private static final class MatchOp<T> implements TerminalOp<T, Boolean> {
private final StreamShape inputShape;
final MatchKind matchKind;
//创建TerminalSink的函数
final Supplier<BooleanTerminalSink<T>> sinkSupplier;
MatchOp(StreamShape shape,
MatchKind matchKind,
Supplier<BooleanTerminalSink<T>> sinkSupplier) {
this.inputShape = shape;
this.matchKind = matchKind;
this.sinkSupplier = sinkSupplier;
}
@Override
public int getOpFlags() {
return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;
}
@Override
public StreamShape inputShape() {
return inputShape;
}
@Override
public <S> Boolean evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
//通过sink#getAndClearState()返回匹配结果
return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
}
}
复制代码
可以看到实现基本上与其它的TerminalOp相同,只是这里要注意两点:
- 它通过sinkSupplier工厂方法函数提供的封装终止操作逻辑的sink,讲方法时会详解。
- getOpFlags()方法返回StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED,代表短路操作。在触发数据处理逻辑时,通过这个标志走短路的遍历逻辑。
AbstractPipeline#copyInto():
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
//短路操作走这里
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
复制代码
AbstractPipeline#copyIntoWithCancel():
final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
@SuppressWarnings({"rawtypes","unchecked"})
//拿到pipeline链表上的Head对象
AbstractPipeline p = AbstractPipeline.this;
while (p.depth > 0) {
p = p.previousStage;
}
wrappedSink.begin(spliterator.getExactSizeIfKnown());
//注意这里与非短路操作的区别
p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end();
}
复制代码
短路操作通过遍历pipeline链表,拿到Head节点对象,然后调用它的forEachWithCancel()方法,当然也会在此前后调用begin()和end()方法,就不再展开了。
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}
复制代码
其实forEachWithCancel()就是在do-while循环中调用spliterator#tryAdvance()方法遍历元素,不过每遍历一个元素之前都要满足一个条件:!sink.cancellationRequested(),表示请求没有被取消。下面讲解方法时会细说。
anyMatch()方法
anyMatch()方法,Stream中的元素与Predicate函数匹配,如果有任何一个元素匹配则返回true,否则返回false。
下面进入源码,分析这个方法具体如何实现的,ReferencePipeline#anyMatch():
public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
}
复制代码
同样的套路,通过工厂方法MatchOps#makeRef()创建MatchOp对象:
public static <T> TerminalOp<T, Boolean> makeRef(Predicate<? super T> predicate,
MatchKind matchKind) {
Objects.requireNonNull(predicate);
Objects.requireNonNull(matchKind);
class MatchSink extends BooleanTerminalSink<T> {
MatchSink() {
super(matchKind);
}
@Override
public void accept(T t) {
//流程未终止且元素匹配结果符合条件
//则标识流程终止,并将matchKind.shortCircuitResult赋值给匹配结果
if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
stop = true;
value = matchKind.shortCircuitResult;
}
}
}
//这里通过传入的sinkSupplier函数创建MatchSink实例
return new MatchOp<>(StreamShape.REFERENCE, matchKind, MatchSink::new);
}
复制代码
MatchSink的父类BooleanTerminalSink:
private static abstract class BooleanTerminalSink<T> implements Sink<T> {
boolean stop;
boolean value;
BooleanTerminalSink(MatchKind matchKind) {
value = !matchKind.shortCircuitResult;
}
public boolean getAndClearState() {
return value;
}
@Override
public boolean cancellationRequested() {
//匹配成功之后返回true,实现短路
return stop;
}
}
复制代码
可以看到返回的确实是MatchOp实例,MatchSink是封装终止操作逻辑的sink。先看一下MatchKind枚举类:
enum MatchKind {
ANY(true, true),
ALL(false, false),
NONE(true, false);
private final boolean stopOnPredicateMatches;
private final boolean shortCircuitResult;
private MatchKind(boolean stopOnPredicateMatches,
boolean shortCircuitResult) {
this.stopOnPredicateMatches = stopOnPredicateMatches;
this.shortCircuitResult = shortCircuitResult;
}
}
复制代码
MatchKind定义了三种枚举类型ANY、ALL、NONE,这里着重关注ANY:字段stopOnPredicateMatches的值为true,如果Predicate函数与元素匹配返回true,则流程终止;字段shortCircuitResult为true,表示符合匹配规则时,返回给调用方的结果为true,否则返回false。
MatchSink通过stop和value变量,以及MatchKind枚举来控制短路和保存匹配结果。
allMatch()方法
allMatch()方法,Stream中的元素与Predicate函数匹配,如果有任何一个元素不匹配,则返回false,否则就表示所有元素都匹配,返回true。注意这里说的比较拗口,主要是为了体现这个方法也是一个短路操作。
ReferencePipeline#allMatch()方法跟anyMatch()差不多,只是传入的MatchKind是ALL类型,就不再展示说明了。
public final boolean allMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
}
复制代码
noneMatch()方法
noneMatch()方法,其实就是allMatch()方法的否定,Stream中的元素与Predicate函数匹配,如果有任何一个元素匹配,则返回false,否则就表示所有元素都不匹配,返回true。
ReferencePipeline#noneMatch()方法,创建MatchOp实例时传入的是NONE类型。
public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
}
复制代码
FindOp
FindOp是一种查找Stream中元素的短路操作,当找到一个结果时,终止元素遍历,直接返回结果。
private static final class FindOp<T, O> implements TerminalOp<T, O> {
//查找的元素的类型
private final StreamShape shape;
//是否必须满足查找的是元素中的第一个元素
final boolean mustFindFirst;
//Stream中没有元素时,返回的值
final O emptyValue;
//判断元素存在的函数
final Predicate<O> presentPredicate;
//提供创建封装终止操作逻辑的工厂方法函数
final Supplier<TerminalSink<T, O>> sinkSupplier;
FindOp(boolean mustFindFirst,
StreamShape shape,
O emptyValue,
Predicate<O> presentPredicate,
Supplier<TerminalSink<T, O>> sinkSupplier) {
this.mustFindFirst = mustFindFirst;
this.shape = shape;
this.emptyValue = emptyValue;
this.presentPredicate = presentPredicate;
this.sinkSupplier = sinkSupplier;
}
@Override
public int getOpFlags() {
return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
}
@Override
public StreamShape inputShape() {
return shape;
}
@Override
public <S> O evaluateSequential(PipelineHelper<T> helper,
Spliterator<S> spliterator) {
//数据处理逻辑,并且调用sink#get()方法获取查找结果
O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
return result != null ? result : emptyValue;
}
}
复制代码
咋一看,这个类的成员变量非常多,控制逻辑应该很复杂,其实mustFindFirst和presentPredicate都是用于并行流处理过程的。剩下的跟MatchOp非常相似,下面分析具体方法的时候再详解。
findFirst()方法
findFirst()方法,返回一个Optional,这个Optional中的值是Stream的第一个元素,如果Stream中没有元素,则返回Optional#empty()。
在ReferencePipeline#findFirst()方法中,通过FindOps#makeRef()创建FindOp对象:
public final Optional<P_OUT> findFirst() {
return evaluate(FindOps.makeRef(true));
}
复制代码
FindOps#makeRef()方法:
public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(),
Optional::isPresent, FindSink.OfRef::new);
}
复制代码
这里的sinkSupplier创建的是FindSink.OfRef实例:
static final class OfRef<T> extends FindSink<T, Optional<T>> {
@Override
public Optional<T> get() {
return hasValue ? Optional.of(value) : null;
}
}
复制代码
FindSink.OfRef只是实现了get()方法,用于获取查找结果,但是它继承了FindSink:
private static abstract class FindSink<T, O> implements TerminalSink<T, O> {
boolean hasValue;
T value;
FindSink() {} // Avoid creation of special accessor
@Override
public void accept(T value) {
if (!hasValue) {
hasValue = true;
this.value = value;
}
}
@Override
public boolean cancellationRequested() {
return hasValue;
}
}
复制代码
FindSink通过变量hasValue控制短路流程,当接收到一个元素时,使用value保存结果,并且终止遍历。
findAny()方法
findAny()方法,返回一个Optional,这个Optional中的值是Stream中的一个元素,如果Stream中没有元素,则返回Optional#empty()。注意在并行流调用中,返回的不一定是Stream的第一个元素,但如果是串行流,则行为跟findFirst()一样,Optional中的值就是Stream第一个元素。
public final Optional<P_OUT> findFirst() {
return evaluate(FindOps.makeRef(true));
}
复制代码
总结
本文先解释了Stream调用过程中,终止操作是所声明的lambda表达式的触发时机,讲解了中间操作调用时的链表封装和终止操作的调用流程。依据是否完整遍历Stream元素,将终止操作划分为非短路操作和短路操作,通过带领大家阅读源码,深入分析了ForEachOp、ReduceOp、MatchOp和FindOp四种终止操作的实现原理,以及每一个终止操作方法是如何实现的。
写在最后
- 本篇文章是本系列的第四章,相信大家完整阅读之后,对Stream的整体工作流程和原理有了更深入的理解。
- 在讲解ReduceOp时说到collect()方法拥有分组规约的能力,能够帮助开发者简化很多流程代码、提升开发效率,下一章我们讲深入其中,看看它为何具有如此强大的功能,以及当Collectors中提供的功能不能满足我们的需要时,如何扩展相应的能力。
- 上一篇: Java foreach语句的用法
- 下一篇: Java 8 之 Stream
猜你喜欢
- 2024-12-26 Java 8 Stream 处理大数据集:实战与优化
- 2024-12-26 面试官:Java8 lambda 表达式 forEach 如何提前终止?
- 2024-12-26 Javascript中,forEach和map到底有什么区别?
- 2024-12-26 Excel VBA之For Each遍历循环的应用
- 2024-12-26 为什么建议使用 for…of 循环而不是 foreach 循环呢
- 2024-12-26 前端开发map和foreach区别,map遍历方式用法介绍
- 2024-12-26 Rust语言从入门到精通系列 - 零基础掌握Stream流迭代器
- 2024-12-26 Map遍历的四种方法效率对比
- 2024-12-26 java集合类之java中集合类有哪些?如何分类?
- 2024-12-26 【一分钟学Java】之List
你 发表评论:
欢迎- 367℃用AI Agent治理微服务的复杂性问题|QCon
- 358℃初次使用IntelliJ IDEA新建Maven项目
- 356℃手把手教程「JavaWeb」优雅的SpringMvc+Mybatis整合之路
- 351℃Maven技术方案最全手册(mavena)
- 348℃安利Touch Bar 专属应用,让闲置的Touch Bar活跃起来!
- 346℃InfoQ 2024 年趋势报告:架构篇(infoq+2024+年趋势报告:架构篇分析)
- 345℃IntelliJ IDEA 2018版本和2022版本创建 Maven 项目对比
- 342℃从头搭建 IntelliJ IDEA 环境(intellij idea建包)
- 最近发表
- 标签列表
-
- powershellfor (55)
- messagesource (56)
- aspose.pdf破解版 (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- vue数组concat (56)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)