原来你是这样的 Stream —— 浅析 Java Stream 实现原理

Stream 为什么会出现?

Stream 出现之前,遍历一个集合最传统的做法大概是用 Iterator,或者 for 循环。这种两种方式都属于外部迭代,然而外部迭代存在着一些问题。

  • 开发者需要自己手写迭代的逻辑,虽然大部分场景迭代逻辑都是每个元素遍历一次。
  • 如果存在像排序这样的有状态的中间操作,不得不进行多次迭代。
  • 多次迭代会增加临时变量,从而导致内存的浪费。

虽然 Java 5 引入的 foreach 解决了部分问题,但也引入了新的问题。

  • foreach 遍历不能对元素进行赋值操作
  • 遍历的时候,只有当前被遍历的元素可见,其他不可见

随着大数据的兴起,传统的遍历方式已经无法满足开发者的需求。
就像小作坊发展到一定程度要变成大工厂才能满足市场需求一样。大工厂和小作坊除了规模变大、工人不多之外,最大的区别就是多了流水线。流水线可以将工人们更高效的组织起来,使得生产力有质的飞跃。

所以不安于现状的开发者们想要开发一种更便捷,更实用的特性。

  • 它可以像流水线一样来处理数据
  • 它应该兼容常用的集合
  • 它的编码应该更简洁
  • 它应该具有更高的可读性
  • 它可以提供对数据集合的常规操作
  • 它可以拼装不同的操作

经过不懈的能力,Stream 就诞生了。加上 lambda 表达式的加成,简直是如虎添翼。

你可以用 Stream 干什么?

下面以简单的需求为例,看一下 Stream 的优势:

从一列单词中选出以字母a开头的单词,按字母排序后返回前3个。

外部迭代实现方式

List<String> list = Lists.newArrayList("are", "where", "advance", "anvato", "java", "abc");
List<String> tempList = Lists.newArrayList();
List<String> result = Lists.newArrayList();
for( int i = 0; i < list.size(); i++) {
    if (list.get(i).startsWith("a")) {
        tempList.add(list.get(i));
    }
}
tempList.sort(Comparator.naturalOrder());
result = tempList.subList(0,3);
result.forEach(System.out::println);

stream实现方式

List<String> list = Lists.newArrayList("are", "where", "anvato", "java", "abc");
list.stream().filter(s -> s.startsWith("a")).sorted().limit(3)
                .collect(Collectors.toList()).forEach(System.out::println);

Stream 是怎么实现的?

需要解决的问题:

  • 如何定义流水线?
  • 原料如何流入?
  • 如何让流水线上的工人将处理过的原料交给下一个工人?
  • 流水线何时开始运行?
  • 流水线何时结束运行?

总观全局

Stream 处理数据的过程可以类别成工厂的流水线。数据可以看做流水线上的原料,对数据的操作可以看做流水线上的工人对原料的操作。

事实上 Stream 只是一个接口,并没有操作的缺省实现。最主要的实现是 ReferencePipeline,而 ReferencePipeline 继承自 AbstractPipelineAbstractPipeline 实现了 BaseStream 接口并实现了它的方法。但 ReferencePipeline 仍然是一个抽象类,因为它并没有实现所有的抽象方法,比如 AbstractPipeline 中的 opWrapSinkReferencePipeline内部定义了三个静态内部类,分别是:Head, StatelessOp, StatefulOp,但只有 Head 不再是抽象类。

图1

流水线的结构有点像双向链表,节点之间通过引用连接。节点可以分为三类,控制数据输入的节点、操作数据的中间节点和控制数据输出的节点。

ReferencePipeline 包含了控制数据流入的 Head ,中间操作 StatelessOp, StatefulOp,终止操作 TerminalOp

Stream 常用的流操作包括:
* 中间操作(Intermediate Operations)
* 无状态(Stateless)操作:每个数据的处理是独立的,不会影响或依赖之前的数据。如 filter()flatMap()flatMapToDouble()flatMapToInt()flatMapToLong()map()mapToDouble()mapToInt()mapToLong()peek()unordered()
* 有状态(Stateful)操作:处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如 distinct()sorted()sorted(comparator)limit()skip()
* 终止操作(Terminal Operations)
* 非短路操作:处理完所有数据才能得到结果。如 collect()count()forEach()forEachOrdered()max()min()reduce()toArray() 等。
* 短路(short-circuiting)操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如 anyMatch()allMatch()noneMatch()findFirst()findAny() 等。

源码分析

了解了流水线的结构和定义,接下来我们基于上面的例子逐步看一下源代码。

定义输入源

stream() 是 Collection 中的 default 方法,实际上调用的是 StreamSupport.stream() 方法,返回的是 ReferencePipeline.Head的实例。

ReferencePipeline.Head 的构造函数传递是 ArrayList 中实现的 spliterator 。常用的集合都实现了 Spliterator 接口以支持 Stream。可以这样理解,Spliterator 定义了数据集合流入流水线的方式。

定义流水线节点

filter() 是 Stream 中定义的方法,在 ReferencePipeline 中实现,返回 StatelessOp 的实例。

可以看到 filter() 接收的参数是谓词,可以用 lambda 表达式。StatelessOp的构造函数接收了 this,也就是 ReferencePipeline.Head 实例的引用。并且实现了 AbstractPipeline 中定义的 opWrapSink 方法。

@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}

sorted()limit() 的返回值和也都是 Stream 的实现类,并且都接收了 this 。不同的是 sorted() 返回的是 ReferencePipeline.StatefulOp 的子类 SortedOps.OfRef 的实例。limit() 返回的 ReferencePipeline.StatefulOp 的实例。

现在可以粗略地看到,这些中间操作(不管是无状态的 filter(),还是有状态的 sorted()limit() 都只是返回了一个包含上一节点引用的中间节点。有点像 HashMap 中的反向单向链表。就这样把一个个中间操作拼接到了控制数据流入的 Head 后面,但是并没有开始做任何数据处理的动作

这也就是 Stream 延时执行的特性原因之所在。

参见附录I会发现 StatelessOp 和StatefulOp 初始化的时候还会将当前节点的引用传递给上一个节点。

previousStage.nextStage = this;

所以各个节点组成了一个双向链表的结构。

组装流水线

最后来看一下终止操作 .collect() 接收的是返回类型对应的 Collector。

此例中的 Collectors.toList() 是 Collectors 针对 ArrayList 的创建的 CollectorImpl 的实例。

@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
        && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
        && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));//1
    }
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
        ? (R) container
        : collector.finisher().apply(container);
}

先忽略并行的情况,来看一下加注释了1的代码:

  1. ReduceOps.makeRef 接收此 Collector 返回了一个 ReduceOp(实现了 TerminalOp 接口)的实例。
  2. 返回的 ReduceOp 实例又被传递给 AbstractPipeline 中的 evaluate() 方法。
  3. evaluate 中,调用了 ReduceOp实例的 evaluateSequential 方法,并将上流水线上最后一个节点的引用和 sourceSpliterator 传递进去。
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
    return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
  1. 然后调用 ReduceOp 实例的 makeSink() 方法返回其 makeRef() 方法内部类 ReducingSink 的实例。
  2. 接着 ReducingSink 的实例作为参数和 spliterator 一起传入最后一个节点的 wrapAndCopyInto() 方法,返回值是 Sink 。

启动流水线

流水线组装好了,现在就该启动流水线了。这里的核心方法是 wrapAndCopyInto,根据方法名也能看出来这里应该做了两件事,wrapSink()copyInto()

wrapSink()

将最后一个节点创建的 Sink 传入,并且看到里面有个 for 循环。参见附录I可以发现

每个节点都记录了上一节点的引用( previousStage )和每一个节点的深度( depth )。

所以这个 for 循环是从最后一个节点开始,到第二个节点结束。每一次循环都是将上一节点的 combinedFlags 和当前的 Sink 包起来生成一个新的 Sink 。这和前面拼接各个操作很类似,只不过拼接的是 Sink 的实现类的实例,方向相反。

(Head.combinedFlags, (StatelessOp.combinedFlags, (StatefulOp.combinedFlags,(StatefulOp.combinedFlags ,TerminalOp.sink)))

@Override
@SuppressWarnings("unchecked")
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
   Objects.requireNonNull(sink);

   for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
       sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
   }
   return (Sink<P_IN>) sink;
}

copyInto()

终于到了要真正开始迭代的时候,这个方法接收两个参数 Sink<P_IN> wrappedSink, Spliterator<P_IN> spliteratorwrappedSink对应的是 Head节点后面的第一个操作节点(它相当于这串 Sink 的头),spliterator 对应着数据源。

这个时候我们回过头看一下 Sink 这个接口,它继承自 Consumer 接口,又定义了 begin()end()cancellationRequested() 方法。Sink 直译过来是水槽,如果把数据流比作水,那水槽就是水会流过的地方。begin() 用于通知水槽的水要过来了,里面会做一些准备工作,同样 end() 是做一些收尾工作。cancellationRequested() 是原来判断是不是可以停下来了。Consumer 里的accept() 是消费数据的地方。

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
   Objects.requireNonNull(wrappedSink);
   if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
       wrappedSink.begin(spliterator.getExactSizeIfKnown());//1
       spliterator.forEachRemaining(wrappedSink);//2
       wrappedSink.end();//3
   }
   else {
       copyIntoWithCancel(wrappedSink, spliterator);
   }
}

有了完整的水槽链,就可以让水流进去了。copyInto() 里做了三个动作:

  1. 通知第一个水槽(Sink)水要来了,准备一下。
  2. 让水流进水槽(Sink)里。
  3. 通知第一个水槽(Sink)水流完了,该收尾了。

突然想到宋丹丹老师的要把大象放冰箱要几步?

注:图中蓝色线表示数据实际的处理流程。

每一个 Sink 都有自己的职责,但具体表现各有不同。无状态操作的 Sink 接收到通知或者数据,处理完了会马上通知自己的 下游。有状态操作的 Sink 则像有一个缓冲区一样,它会等要处理的数据处理完了才开始通知下游,并将自己处理的结果传递给下游。

例如 sorted() 就是一个有状态的操作,一般会有一个属于自己的容器,用来记录处自己理过的数据的状态。sorted() 是在执行 begin 的时候初始化这个容器,在执行 accept 的时候把数据放到容器中,最后在执行 end 方法时才正在开始排序。排序之后再将数据,采用同样的方式依次传递给下游节点。

最后数据流到终止节点,终止节点将数据收集起来就结束了。

然后就没有然后了,copyInto() 返回类型是 void ,没有返回值。

wrapAndCopyInto() 返回了 TerminalOps 创建的 Sink,这时候它里面已经包含了最终处理的结果。调用它的 get() 方法就获得了最终的结果。

回顾

再来回顾一下整个过程。首先是将 Collection 转化为 Stream,也就是流水线的头。然后将各个中间操作节点像拼积木一样拼接起来。每个中间操作节点都定义了自己对应的 Sink,并重写了 makeSink() 方法用来返回自己的 Sink 实例。直到终止操作节点出现时才开始将 Sink 实例化并串起来。然后就是上面提到的那三步:通知、数据流入、结束。

本文介绍和分析了最常规的 stream 用法和实现,实际上 stream 还有很多高阶用法,比如利用协程实现的并行流。感兴趣的同学可以研究一下。当然既然是高阶用法,用的时候一定要多注意。

参考

附录I

以下是初始化 Head 节点和中间操作的实现。

/**
 * Constructor for the head of a stream pipeline.
 *
 * @param source {@code Spliterator} describing the stream source
 * @param sourceFlags the source flags for the stream source, described in
 * {@link StreamOpFlag}
 * @param parallel {@code true} if the pipeline is parallel
 */
//初始化Head节点的时候会执行
AbstractPipeline(Spliterator<?> source,
                 int sourceFlags, boolean parallel) {
    this.previousStage = null;
    this.sourceSpliterator = source;
    this.sourceStage = this;
    this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
    // The following is an optimization of:
    // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
    this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
    this.depth = 0;
    this.parallel = parallel;
}

/**
 * Constructor for appending an intermediate operation stage onto an
 * existing pipeline.
 *
 * @param previousStage the upstream pipeline stage
 * @param opFlags the operation flags for the new stage, described in
 * {@link StreamOpFlag}
 */
//初始化中间操作StatelessOp和StatefulOp的时候会执行
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;

    this.previousStage = previousStage;
    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}
0

ConcurrentHashMap 的 size 方法原理分析

前言

JAVA 语言提供了大量丰富的集合, 比如 List, Set, Map 等。其中 Map 是一个常用的一个数据结构,HashMap 是基于 Hash 算法实现 Map 接口而被广泛使用的集类。HashMap 里面是一个数组,然后数组中每个元素是一个单向链表。但是 HashMap 并不是线程安全的, 在多线程场景下使用存在并发和死循环问题。HashMap 结构如图所示:

线程安全的解决方案

线程安全的 Map 的实现有 HashTable 和 ConcurrentHashMap 等。HashTable 对集合读写操作通过 Synchronized 同步保障线程安全, 整个集合只有一把锁, 对集合的操作只能串行执行,性能不高。ConcurrentHashMap 是另一个线程安全的 Map, 通常来说他的性能优于 HashTable。 ConcurrentHashMap 的实现在 JDK1.7 和 JDK 1.8 有所不同。

在 JDK1.7 版本中,ConcurrentHashMap 的数据结构是由一个 Segment 数组和多个 HashEntry 组成。简单理解就是ConcurrentHashMap 是一个 Segment 数组,Segment 通过继承 ReentrantLock 来进行加锁,所以每次需要加锁的操作锁住的是一个 Segment,这样只要保证每个 Segment 是线程安全的,也就实现了全局的线程安全。

JDK1.8 的实现已经摒弃了 Segment 的概念,而是直接用 Node 数组 + 链表 + 红黑树的数据结构来实现,并发控制使用 Synchronized 和 CAS 来操作,整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。 通过 HashMap 查找的时候,根据 hash 值能够快速定位到数组的具体下标,如果发生 Hash 碰撞,需要顺着链表一个个比较下去才能找到我们需要的,时间复杂度取决于链表的长度,为 O(n)。为了降低这部分的开销,在 Java8 中,当链表中的元素超过了 8 个以后,会将链表转换为红黑树,在这些位置进行查找的时候可以降低时间复杂度为 O(logN)。

如何计算 ConcurrentHashMap Size

由上面分析可知,ConcurrentHashMap 更适合作为线程安全的 Map。在实际的项目过程中,我们通常需要获取集合类的长度, 那么计算 ConcurrentHashMap 的元素大小就是一个有趣的问题,因为他是并发操作的,就是在你计算 size 的时候,它还在并发的插入数据,可能会导致你计算出来的 size 和你实际的 size 有差距。本文主要分析下 JDK1.8 的实现。 关于 JDK1.7 简单提一下。

在 JDK1.7 中,第一种方案他会使用不加锁的模式去尝试多次计算 ConcurrentHashMap 的 size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的。 第二种方案是如果第一种方案不符合,他就会给每个 Segment 加上锁,然后计算 ConcurrentHashMap 的 size 返回。其源码实现:

public int size() {
  final Segment<K,V>[] segments = this.segments;
  int size;
  boolean overflow; // true if size overflows 32 bits
  long sum;         // sum of modCounts
  long last = 0L;   // previous sum
  int retries = -1; // first iteration isn't retry
  try {
    for (;;) {
      if (retries++ == RETRIES_BEFORE_LOCK) {
        for (int j = 0; j < segments.length; ++j)
          ensureSegment(j).lock(); // force creation
      }
      sum = 0L;
      size = 0;
      overflow = false;
      for (int j = 0; j < segments.length; ++j) {
        Segment<K,V> seg = segmentAt(segments, j);
        if (seg != null) {
          sum += seg.modCount;
          int c = seg.count;
          if (c < 0 || (size += c) < 0)
            overflow = true;
        }
      }
      if (sum == last)
        break;
      last = sum;
    }
  } finally {
    if (retries > RETRIES_BEFORE_LOCK) {
      for (int j = 0; j < segments.length; ++j)
        segmentAt(segments, j).unlock();
    }
  }
  return overflow ? Integer.MAX_VALUE : size;
}

JDK1.8 实现相比 JDK 1.7 简单很多,只有一种方案,我们直接看 size() 代码:

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
    }

最大值是 Integer 类型的最大值,但是 Map 的 size 可能超过 MAX_VALUE, 所以还有一个方法 mappingCount(),JDK 的建议使用 mappingCount() 而不是 size()mappingCount() 的代码如下:

   public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; // ignore transient negative values
    }

以上可以看出,无论是 size() 还是 mappingCount(), 计算大小的核心方法都是 sumCount()sumCount() 的代码如下:

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

分析一下 sumCount() 代码。ConcurrentHashMap 提供了 baseCount、counterCells 两个辅助变量和一个 CounterCell 辅助内部类。sumCount() 就是迭代 counterCells 来统计 sum 的过程。 put 操作时,肯定会影响 size(),在 put() 方法最后会调用 addCount() 方法。

addCount() 代码如下:
- 如果 counterCells == null, 则对 baseCount 做 CAS 自增操作。

  • 如果并发导致 baseCount CAS 失败了使用 counterCells。

  • 如果counterCells CAS 失败了,在 fullAddCount 方法中,会继续死循环操作,直到成功。

然后,CounterCell 这个类到底是什么?我们会发现它使用了 @sun.misc.Contended 标记的类,内部包含一个 volatile 变量。@sun.misc.Contended 这个注解标识着这个类防止需要防止 "伪共享"。那么,什么又是伪共享呢?

缓存系统中是以缓存行(cache line)为单位存储的。缓存行是2的整数幂个连续字节,一般为32-256个字节。最常见的缓存行大小是64个字节。当多线程修改互相独立的变量时,如果这些变量共享同一个缓存行,就会无意中影响彼此的性能,这就是伪共享。

CounterCell 代码如下:

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

总结

  • JDK1.7 和 JDK1.8 对 size 的计算是不一样的。 1.7 中是先不加锁计算三次,如果三次结果不一样在加锁。
  • JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。
  • JDK 8 推荐使用mappingCount 方法,因为这个方法的返回值是 long 类型,不会因为 size 方法是 int 类型限制最大值。
0

Common Pool2 对象池应用浅析

我们系统中一般都会存在很多可重用并长期使用的对象,比如线程、TCP 连接、数据库连接等。虽然我们可以简单的在使用这些对象时进行创建、使用结束后销毁,但初始化和销毁对象的操作会造成一些资源消耗。我们可以使用对象池将这些对象集中管理,减少对象初始化和销毁的次数以节约资源消耗。

顾名思义,对象池简单来说就是存放对象的池子,可以存放任何对象,并对这些对象进行管理。它的优点就是可以复用池中的对象,避免了分配内存和创建堆中对象的开销;避免了释放内存和销毁堆中对象的开销,进而减少垃圾收集器的负担;避免内存抖动,不必重复初始化对象状态。对于构造和销毁比较耗时的对象来说非常合适。

当然,我们可以自己去实现一个对象池,不过要实现的比较完善还是要花上不少精力的。所幸的是, Apache 提供了一个通用的对象池技术的实现: Common Pool2,可以很方便的实现自己需要的对象池。Jedis 的内部对象池就是基于 Common Pool2 实现的。

核心接口

Common Pool2 的核心部分比较简单,围绕着三个基础接口和相关的实现类来实现:

  • ObjectPool:对象池,持有对象并提供取/还等方法。
  • PooledObjectFactory:对象工厂,提供对象的创建、初始化、销毁等操作,由 Pool 调用。一般需要使用者自己实现这些操作。
  • PooledObject:池化对象,对池中对象的封装,封装对象的状态和一些其他信息。

Common Pool2 提供的最基本的实现就是由 Factory 创建对象并使用 PooledObject 封装对象放入 Pool 中。

对象池实现

对象池有两个基础的接口 ObjectPoolKeyedObjectPool, 持有的对象都是由 PooledObject 封装的池化对象。 KeyedObjectPool 的区别在于其是用键值对的方式维护对象。

ObjectPoolKeyedObjectPool 分别有一个默认的实现类 GenericObjectPoolGenericKeyedObjectPool 可以直接使用,他们的公共部分和配置被抽取到了 BaseGenericObjectPool 中。

SoftReferenceObjectPool 是一个比较特殊的实现,在这个对象池实现中,每个对象都会被包装到一个SoftReference中。SoftReference允许垃圾回收机制在需要释放内存时回收对象池中的对象,可以避免一些内存泄露的问题。

ObjectPool

下面简单介绍一下 ObjectPool 接口的核心方法,KeyedObjectPoolObjectPool 类似,区别在于方法多了个参数: K key

public interface ObjectPool<T> {

    // 从池中获取一个对象,客户端在使用完对象后必须使用 returnObject 方法返还获取的对象
    T borrowObject() throws Exception, NoSuchElementException,
            IllegalStateException;

    // 将对象返还到池中。对象必须是从 borrowObject 方法获取到的
    void returnObject(T obj) throws Exception;

    // 使池中的对象失效,当获取到的对象被确定无效时(由于异常或其他问题),应该调用该方法
    void invalidateObject(T obj) throws Exception;

    // 池中当前闲置的对象数量
    int getNumIdle();

    // 当前从池中借出的对象的数量
    int getNumActive();

    // 清除池中闲置的对象
    void clear() throws Exception, UnsupportedOperationException;

    // 关闭这个池,并释放与之相关的资源
    void close();

    ...
}

PooledObjectFactory

对象工厂,负责对象的创建、初始化、销毁和验证等工作。Factory 对象由ObjectPool持有并使用。

public interface PooledObjectFactory<T> {

    // 创建一个池对象
    PooledObject<T> makeObject() throws Exception;

    // 销毁对象
    void destroyObject(PooledObject<T> p) throws Exception;

    // 验证对象是否可用
    boolean validateObject(PooledObject<T> p);

    // 激活对象,从池中取对象时会调用此方法
    void activateObject(PooledObject<T> p) throws Exception;

    // 钝化对象,向池中返还对象时会调用此方法
    void passivateObject(PooledObject<T> p) throws Exception;
}

Common Pool2 并没有提供 PooledObjectFactory 可以直接使用的子类实现,因为对象的创建、初始化、销毁和验证的工作无法通用化,需要由使用方自己实现。不过它提供了一个抽象子类 BasePooledObjectFactory,实现自己的工厂时可以继承 BasePooledObjectFactory,就只需要实现 createwrap 两个方法了。

PooledObject

PooledObject 有两个实现类,DefaultPooledObject 是普通通用的实现,PooledSoftReference 使用 SoftReference 封装了对象,供 SoftReferenceObjectPool 使用。

下面是 PooledObject 接口的一些核心方法:

public interface PooledObject<T> extends Comparable<PooledObject<T>> {

    // 获取封装的对象
    T getObject();

    // 对象创建的时间
    long getCreateTime();

    // 对象上次处于活动状态的时间
    long getActiveTimeMillis();

    // 对象上次处于空闲状态的时间
    long getIdleTimeMillis();

    // 对象上次被借出的时间
    long getLastBorrowTime();

    // 对象上次返还的时间
    long getLastReturnTime();

    // 对象上次使用的时间
    long getLastUsedTime();

    // 将状态置为 PooledObjectState.INVALID
    void invalidate();

    // 更新 lastUseTime
    void use();

    // 获取对象状态
    PooledObjectState getState();

    // 将状态置为 PooledObjectState.ABANDONED
    void markAbandoned();

    // 将状态置为 PooledObjectState.RETURNING
    void markReturning();
}

对象池配置

对象池配置提供了对象池初始化所需要的参数,Common Pool2 中的基础配置类是 BaseObjectPoolConfig。其有两个实现类分别为 GenericObjectPoolConfigGenericKeyedObjectPoolConfig,分别为 GenericObjectPoolGenericKeyedObjectPool 所使用。

下面是一些重要的配置项:

  • lifo 连接池放池对象的方式,true:放在空闲队列最前面,false:放在空闲队列最后面,默认为 true
  • fairness 从池中获取/返还对象时是否使用公平锁机制,默认为 false
  • maxWaitMillis 获取资源的等待时间。blockWhenExhausted 为 true 时有效。-1 代表无时间限制,一直阻塞直到有可用的资源
  • minEvictableIdleTimeMillis 对象空闲的最小时间,达到此值后空闲对象将可能会被移除。-1 表示不移除;默认 30 分钟
  • softMinEvictableIdleTimeMillis 同上,额外的条件是池中至少保留有 minIdle 所指定的个数的对象
  • numTestsPerEvictionRun 资源回收线程执行一次回收操作,回收资源的数量。默认 3
  • evictionPolicyClassName 资源回收策略,默认值 org.apache.commons.pool2.impl.DefaultEvictionPolicy
  • testOnCreate 创建对象时是否调用 factory.validateObject 方法,默认 false
  • testOnBorrow 取对象时是否调用 factory.validateObject 方法,默认 false
  • testOnReturn 返还对象时是否调用 factory.validateObject 方法,默认 false
  • testWhileIdle 池中的闲置对象是否由逐出器验证。无法验证的对象将从池中删除销毁。默认 false
  • timeBetweenEvictionRunsMillis 回收资源线程的执行周期,默认 -1 表示不启用回收资源线程
  • blockWhenExhausted 资源耗尽时,是否阻塞等待获取资源,默认 true

池化对象的状态

池化对象的状态定义在 PooledObjectState 枚举中,有以下值:

  • IDLE 在池中,处于空闲状态
  • ALLOCATED 被使用中
  • EVICTION 正在被逐出器验证
  • VALIDATION 正在验证
  • INVALID 驱逐测试或验证失败并将被销毁
  • ABANDONED 对象被客户端拿出后,长时间未返回池中,或没有调用 use 方法,即被标记为抛弃的

这些状态的转换逻辑大致如下图:

状态流转图

Demo

最后,我们来实现一个简单的 Demo 来上手 Common Pool2 的使用,这是一个 StringBuffer 的对象池的使用。

首先要实现工厂的创建、封装和销毁操作。对象池和池化对象封装使用默认实现就可以了。

public class StringBufferFactory extends BasePooledObjectFactory<StringBuffer> {
    // 创建一个新的对象
    @Override
    public StringBuffer create() {
        return new StringBuffer();
    }

    // 封装为池化对象
    @Override
    public PooledObject<StringBuffer> wrap(StringBuffer buffer) {
        return new DefaultPooledObject<>(buffer);
    }

    // 使用完返还对象时将 StringBuffer 清空
    @Override
    public void passivateObject(PooledObject<StringBuffer> pooledObject) {
        pooledObject.getObject().setLength(0);
    }
}

然后就可以使用对象池了,基本的操作就是获取、返还和标记失效等。

// 创建对象池配置
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
// 创建对象工厂
PooledObjectFactory factory = new StringBufferFactory();
// 创建对象池
ObjectPool<StringBuffer> pool = new GenericObjectPool<>(factory, config);

StringReader in = new StringReader("abcdefg");

StringBuffer buf = null;
try {
    // 从池中获取对象
    buf = pool.borrowObject();

    // 使用对象
    for (int c = in.read(); c != -1; c = in.read()) {
        buf.append((char) c);
    }
    return buf.toString();
} catch (Exception e) {
    try {
        // 出现错误将对象置为失效
        pool.invalidateObject(buf);
        // 避免 invalidate 之后再 return 抛异常
        buf = null; 
    } catch (Exception ex) {
        // ignored
    }

    throw e;
} finally {
    try {
        in.close();
    } catch (Exception e) {
        // ignored
    }

    try {
        if (null != buf) {
            // 使用完后必须 returnObject
            pool.returnObject(buf);
        }
    } catch (Exception e) {
        // ignored
    }
}

总结

Common Pool2 的应用非常广泛,在日常的开发工作中也有很多使用场景。它的整体架构也并不复杂,可以将其简单划分为 3 个角色和相关的配置、状态,掌握起来比较简单。而且 Common Pool2 官方也提供了一些通用的实现,有特殊的开发需求时也可以简单的扩展其提供的抽象类,可以满足大部分的日常开发需求。

0

简单聊聊各种语言的函数扩展

背景

最近有同事反应,我们运营后台下载的 CSV 文件出现错乱的情况。问题的原因是原始数据中有 CSV 中非法的字符,比如说姓名字段,因为是用户填写的,内容有可能包含了 ," 等字符,会导致 CSV 文件内容错乱。

于是我就想用一个简单的方式来解决这个问题。一个简单粗暴的解决方案就是导出时对字符串进行处理,将一些特殊字符替换掉,或者前后用"包起来。但是这样的话,需要所有下载 CSV 的地方都要改写,会比较麻烦。如果我们可以简单的给 String 增加一个方法(如 String.csv())直接就把字符串处理成 CSV 兼容的格式,就会方便很多。我们的运营后台是使用 Scala 语言开发的,所幸的是,Scala 里提供了一个非常强大的功能,可以满足我们的需求,那就是隐式转换。

Scala 的隐式转换

在 Scala 里可以通过 implicit 隐式转换来实现函数扩展。

编译器在碰到类型不匹配或是调用一个不存在的方法的时候,会去搜索符合条件的隐式类型转换,如果找不到合适的隐式转换方法则会报错。

下面是处理 CSV 下载字符串的代码:

trait CsvHelper {
  implicit def stringToCsvString(s: String) = new CsvString(s)
}
class CsvString(val s: String){
  def csv = s"""${s.replaceAll(",", " ").replaceAll("\"", "'")}"""
}

class Controller extends CsvHelper {
    def dowload(){
        ...
        ",foo,".csv //foo
    }
}

Controller 中我调用 String.csv 方法,但是 String 没有 csv 方法。这时候编译器就会去找 Controller 中有没有隐式转换的方法,发现在其父类 CsvHelper 中有方法把 String 转换成 CsvString,而 CsvString 中实现了 csv 方法。所以编译器最终会调用到 CsvString.csv 这个方法。

隐式转换是一个很强大,但是也很容易误用的功能。Scala 里隐式转换有一些基本规则:

  • 优先规则:如果存在两个或者多个符合条件的隐式转换,如果编译器不能选择一条最优的隐式转换,则提示错误。具体的规则是:当前类中的隐式转换优先级大于父类中的隐式转换;多个隐式转换返回的类型有父子关系的时候,子类优先级大于父类。
  • 隐式转换只会隐式的调用一次,编译器不会调用多个隐式方法,不会产生调用链。
  • 如果当期代码已经是合法的,不需要隐式转换则不会使用隐式转换。

Java 的动态扩展

我们再来看看我们熟悉的 Java 语言。Java 是一门静态语言,本身没有直接提供动态扩展的方法,但是我们可以通过 AOP 动态代理的方式来修改一个方法,从而间接的实现方法的动态扩展。

下面就是一个我们就用 AspectJ 来实现一个动态扩展,用于分页查询后获取数据的总条数。

@Aspect
@Component
public class PaginationAspect {
    @AfterReturning(
        pointcut = "execution(* com.xingren..*.*ByPage(..))",
        returning = "result"
    )
    public void afterByPage(JoinPoint joinPoint, Object result) {
        //根据result获取sql信息,再查询总条数封装到result中。
    }
}

其中 AfterReturning 注解表明在被注解方法返回后的一些后续动作。pointcut 定义切点的表达式,可以用通配符 * 表示;returning 指定返回的参数名。然后就可以对返回的结果进行处理。这样就可以达到动态的修改原始函数功能。

当然除了 AspectJ 也可以使用 CGLib 来代理来实现简单的 AOP。

public class FooService {
    public Page findByPage(){
        return new Page();
    }
    public Page findPage(){
        return new Page();
    }
}
@Data
public class Page {
    private String sql = "";
    private List<Object> content = new ArrayList();
    private Integer size = 0;
    private Integer page = 0;
    private Integer total = 0;
}

创建一个对象 FooService 用来模拟查询分页方法。

public class CGLibProxyFactory implements MethodInterceptor {

    private Object object;

    public CGLibProxyFactory(Object object){
        this.object = object;
    }

    @Override
    public Object intercept(Object o, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
        System.out.println("before method! do something...");

        Object result = methodProxy.invoke(object, objects);
        //进行方法判断,是否需要处理
        if (method.getName().contains("ByPage")) {
            if (result instanceof Page) {
                System.out.println("after method! do something...");
                ((Page) result).setTotal(100);
            }
        }
        return result;
    }
}

创建一个代理类实现 MethodInterceptor 接口,手动调用 invoke 方法,用来动态的修改被代理的实现方法。可以在执行之前做一些参数校验,或者一些参数的预处理。也可以获取修改执行的结果,或者干脆不调用 invoke 方法,自定义实现。也可以在调用后做一些后续动作。

public class ObjectFactoryUtils {
    public static <T> Optional<T> getProxyObject(Class<T> clazz) {
        try {
            T obj = clazz.newInstance();
            CGLibProxyFactory factory = new CGLibProxyFactory(obj);
            Enhancer enhancer=new Enhancer();//利用`Enhancer`来创建被代理类的代理实例
            enhancer.setSuperclass(clazz);//设置目标class
            enhancer.setCallback(factory);//设置回调代理类
            return Optional.of((T)enhancer.create());
        } catch (InstantiationException | IllegalAccessException e) {
            e.printStackTrace();
        }
        return Optional.empty();
    }
}

public static void main(String[] args) {
        Optional<FooService> proxyObject = ObjectFactoryUtils.getProxyObject(FooService.class);
        if(proxyObject.isPresent()) {
            FooService foo = proxyObject.get();
            System.out.println("findByPage:");
            System.out.println(foo.findByPage().getTotal());
            System.out.println("findPage:");
            System.out.println(foo.findPage().getTotal());
        }
}

最后打印的输出是:

findByPage:
before method! do something...
after method! do something...
100
findPage:
before method! do something...
0

当然除了 CGLIB 代理也可以使用 Proxy 动态代理,同样的逻辑也可以达到动态的修改原始方法的目的,从而间接的实现函数扩展。不过 Proxy 动态代理是基于接口的代理。

其它语言的函数扩展

其实除了 Scala 的隐式转换和 Java 的动态代理,其他很多语言也能支持各种不同的函数扩展。

Swift

在 Swift 中可以通过关键词 extension 对已有的类进行扩展,可以扩展方法、属性、下标、构造器等等。

extension Int {
    func times(task: () -> Void) {
        for _ in 0..<self {
            task()
        } 
    }
}

比如说我给 Int 增加一个 times 方法。即执行任务的次数。就可以如下使用:

2.times({
    print("Hello!")
})

上面的代码会执行 2 次打印方法。

Go

在 Go 中可以通过在方法名前面加上一个变量,这个附加的参数会将该函数附加到这种类型上。即给一个方法加上接收器。

func (s string) toUpper() string {
    return strings.ToUpper(s)
}

"aaaaa".toUpper //输出 AAAAA

Kotlin

Kotlin 的函数扩展非常简单,就是定义的时候,函数名写成 接收器 + . + 方法名 就行了。

class C {

}
fun C.foo() { println("extension") }

C().foo() //输出extension

注意当给一个类扩展已有的方法的时候,默认使用的是类自带的成员函数。如下:

class C {
    fun foo() { println("member") }
}

fun C.foo() { println("extension") }

C().foo() //输出member

可以通过函数重载的方式区分成员函数(fun C.foo(i:Int) { println("extension") }),在调用的地方显示的区分。

JavaScript

在 JavaScript 中也可以很方便的给一个对象扩展函数。写法就是 对象 + . + 函数名

var date = new Date();
date.format = function() {
    return this.toISOString().slice(0, 10);
}
date.format(); //"2017-11-29"

也可以给一个 Object 进行扩展:

Date.prototype.format = function() {
     return this.toISOString().slice(0, 10);
}
new Date().format(); //"2017-11-29"

总结

其实了解不同语言对于函数扩展的实现挺有意思的,本文只是粗略的介绍了一下。合理的使用这些语言的扩展,可以帮助我们提高代码质量和工作效率。我们还可以通过函数扩展来对第三方类库进行修改或者扩展,从而更灵活的调用第三方类库。

0

JVM 揭秘:一个 class 文件的前世今生

引子:我们都知道,要运行一个包含 main 方法的 java 文件,首先要将其编译成 class 文件,然后加载 JVM 中,就可以运行了,但是这里存在一些疑问,比如编译之后的 class 文件中到底是什么东西呢?JVM 是如何执行 class 文件的呢?下面我们就以一个很简单的例子来看一下 JVM 到底是如何运行的。

1. 准备

后面所介绍的内容都以下面的 java 文件和 class 文件为例子:

java 文件:

HelloWorld-Java-File

class 文件:

HelloWorld-Class-File

2. class 文件的结构

从上面可以看到,class 文件确实和它的另一个名字字节码文件一样是由一个个的字节码组成的。这里要注意的是因为 class 文件是由一个个字节组成的,所以如果当一个数据大于一个字节的时候,是使用无符号大端模式进行存储的,大小端模式的区别可以参考这里。那么这些字节表示什么意思呢?JVM 是如何解析这些字节数据的呢?我们到 oracle 的官方文档上看一下他们是如何定义 class 文件的结构的:

Class-File-Structure

从上面可以看到,一个 Class 文件中的每一个字节都有指定的意义,比如一开始的 4 个字节代表的是 magic number,这个值对所有的 Class 文件都一样,就是 CAFEBABE,接下来的 2 个字节是次版本号。再比如 cp_info,这是一个非常重要的字段,就是后面要着重介绍的常量池。

如果需要看每个字段的代表的意思可以看一下Java Language and Virtual Machine Specifications

上面的结构看起来可以比较抽象,那么可以看一下下面这张示意图:

Class-File-Format

现在大家应该可以想到了,实际上 class 文件中的所有的字节都代表了固定的信息,所以 JVM 只要根据 class 文件的格式就可以知道这个 class 文件中的存放了什么内容了,比如说方法的信息,字段信息等。

3. class文件的重要组成

现在我们已经知道 class 文件的结构,现在来介绍一下 class 文件中一些重要组成部分。

3.1 常量池

常量池就是前面看到的 ClassFile 里的 cp_info 字段。我们先来直观的看一下常量池到底长什么样子:

Constant-Pool-Structure

上面就是 HelloWorld.class 的常量池。常量池的头两个字节表明了常量池中常量项的个数,因为只有两个字节所以常量项是有数量限制的。具体多少个可以自行计算。常量项个数后面紧跟的就是各个常量项了。每个常量项都有一个 1 个字节的 tag 标志位,用于表示这个常量项具体代表的内容,从图中可以看到如果 tag 是 0A 的话就表示这是一个 MethodRef 的常量项,从名字就可以看出来这是一个表示 Method 信息的常量项。

用专业一点的术语描述的话常量池中保存的内容就是字面量和符号引用。字面量就像类似于文本字符串,或者声明为 final 的常量值。符号引用包括 3 类常量类和接口的全限定名,字段名称和描述符,方法名称和描述符。

特别要注意的一点是常量池中的常量项的索引是从 1 开始的,这样做的目的是满足后面其他结构中需要表明不引用任何一个常量项的含义,这个时候就将索引值置为 0。

从前面的描述可以总结出来,所有的常量池项都具有如下通用格式:

cp_info {
   u1 tag;
   u1 info[];
}

常量池中,每个 cp_info 项(也就是常量项)的格式必须相同,它们都以一个表示 cp_info 类型的单字节 tag 项开头。后面 info[] 项的内容由tag的类型所决定。

tag 的类型有如下几种:

Constant-Pool-Tag

一些常见的常量项:

Class Info:

CONSTANT_Class_Info {
    u1 tag;
    u2 name_index;
}
  • tag 的值为 7
  • name_index 指向了常量池中索引为 name_index 的常量项

UTF8 Info:

CONSTANT_UTF8_Info {
    u1 tag;
    u2 length;
    u1 bytes[length];
}
  • tag 的值为 1
  • length 表示这个 UTF8 编码的字符串的字节数
  • bytes[length] 表示 length 长度的具体的字符串数据

注意:因为 class 文件中的方法名,字段名等都是要引用 UTF8 Info 的,但是 UTF8 Info 的数据长度就是2个字节,所以方法名,字段名的长度最大就是65535。

String Info:

CONSTANT_String_INFO {
    u1 tag;
    u2 string_index;
}
  • tag 的值为 8
  • string_index 指向了常量池中索引为送 string_index 的常量项

Field_Ref Info:

CONSTANT_Fieldref_Info {
    u1 tag;
    u2 class_index;
    u2 name_and_type_index;
}
  • tag 的值为 9
  • class_index 指向了常量池中索引为 class_index 的常量项,且这个常量项必须为 Class Info 类型
  • name_and_type_index 指向了常量池中索引为 name_and_type_index 的常量项,且这个常量项必须为 Name And Type Info 类型

Method_Ref Info:

CONSTANT_Methodref_Info {
    u1 tag;
    u2 class_index;
    u2 name_and_type_index;
}
  • tag 的值为 10
  • class_index 指向了常量池中索引为 class_index 的常量项,且这个常量项必须为 Class Info 类型
  • name_and_type_index 指向了常量池中索引为 name_and_type_index 的常量项,且这个常量项必须为 Name And Type Info 类型

NameAndType Info:

CONSTANT_NameAndType_Info {
    u1 tag;
    u2 name_index;
    u2 descriptor_index;
}
  • tag 的值为 12
  • name_index 指向了常量池中索引为 name_index 的常量项
  • descriptor_index 指向了常量池中索引为 descriptor_index 的常量项

3.2 字段

和之前的常量池一样,因为每个 class 中字段的数量是不确定的,所以字段部分的开头两个字节用于表示当前 class 文件中的字段的个数,紧跟着的才是具体的字段。

先来看一下字段的结构

    Field_Info {
        u2 access_flag;
        u2 name_index;
        u2 descriptor_index;
        u2 attribute_count;
        attribute_info attributes[attribute_count];
    }
  • access_flag 表示该字段的访问修饰符,字段的访问修饰符和类的表示方式相似,但是具体的内容不一样

    字段的访问标识

    FIELD-ACCESS-FLAG

  • name_index 指向常量池中的 name_index 索引的常量项

  • descriptor_index 指向常量池中的 descriptor_index 索引的常量项
  • attribute_count 表示该字段的属性个数
  • attributes[attribute_count] 表示该字段的具体的属性

注意:这里字段的 descriptor 代表的字段的类型,但是类型不是写代码的时候 int,String 这样整个单词的,它是一些字符的简写,如下:

DESCRIPTOR

所以,举个例子如果字段是 String 类型,那么它的 descriptor 就是Ljava/lang/Object; 如果字段是 int[][],那么它的 descriptor 就是[[I

字段的属性和下面介绍的方法的属性是一样的,下文统一介绍。

3.3 方法

方法和字段一样,也需要有一个表示方法个数的字段,同时这个字段后面紧跟的就是具体的方法

同样,来看一下方法的结构:

    Method_Info {
        u2 access_flag;
        u2 name_index;
        u2 descriptor_index;
        u2 attribute_count;
        attribute_info attributes[attribute_count]
    }
  • access_flag 的意义和之前field一样,只不过取值不同,method 的access flag 可以取的值如下:

    method-access-flag

  • name_index 的意义和 field 的也一样,表示了方法的名称

  • descriptor_index 的意义和 field 也一样,只不过其表示方法不同,让我们来看一下它是如何表示的:

    method 的 descriptor 由两部分组成,一部分是参数的 descriptor,一部分是返回值的 descriptor,所以 method 的 descriptor 的形式如下:

    ( ParameterDescriptor* ) ReturnDescriptor
    

    而参数的 descriptor 就是 field 的 descriptor,返回值的descriptor 也是 field 的 descriptor 但是多了一个类型就是 void 类型,其的 descriptor 如下:

        VoidDescriptor:V
    

    所以举个例子,如果一个方法的签名是

    Object m(int i, double d, Thread t) {..}
    

    那么它的 descriptor 就是

    (IDLjava/lang/Thread;)Ljava/lang/Object;
    
  • attribute_count 的意义和 field 一样表示属性的个数
  • attributes[attribute_count] 和 field 也一样表示具体的属性,属性的个数由 attribute_count 决定

3.4 属性

3.4.1 属性结构

属性这个数据结构可以出现在 class 文件,字段表,方法表中。有些属性是特有的,有些属性是三个共有的。

属性的描述如下:

Attribute

这里我们就不详细解释每一个属性了,我们来看一个方法表中最重要的属性,即 Code Attribute。为什么说它重要,因为我们的函数的代码就是在 Code Attribute 中(实际上存储的是指令)。其他属性的一些解释可以参考 Oracle 的 JVM 规范中的描述

3.4.2 Code Attribute

首先来看一下 Code Attribute 的结构

Code_attribute {
    u2 attribute_name_index;
    u4 attribute_length;
    u2 max_stack;
    u2 max_locals;
    u4 code_length;
    u1 code[code_length];
    u2 exception_table_length;
    {   u2 start_pc;
        u2 end_pc;
        u2 handler_pc;
        u2 catch_type;
    } exception_table[exception_table_length];
    u2 attributes_count;
    attribute_info attributes[attributes_count];
}

可以看到 Code Attribute 属性是非常复杂的,下面我们简单解释一下每个成员的含义:

  • attribute_name_index 指向的常量池中常量项的索引,而且这个常量项的类型必须是 UTF8 Info,值必须是 "Code"
  • attribute_length 表示这个属性的长度,但是不包括开始的 6 个字节
  • max_stack 表示 Code 属性所在的方法在运行时形成的函数栈帧中的操作数栈的最大深度
  • max_locals 表示最大局部变量表的长度
  • code_length表示 Code 属性所在的方法的长度(这个长度是方法代码编译成字节后字节的长度)
  • code[length]表示的就是具体的代码,所以说 java 函数的代码长度是有限制的,编译出来的字节指令的长度只能是 4 个字节所能代表的最大值。所以一个函数的代码不能太长,否者是不能编译的
  • exception_table_length 表示方法会抛出的异常数量
  • exception_table[exception_table_length] 表示具体的异常
  • attributes_count 表示 Code 属性中子属性的长度,之所以说属性复杂就是因为属性中还可以嵌套属性
  • attributes[attributes_count] 代表具体的属性

现在来直观的看一下 Code Attribute 的组成,下面就是 HelloWorld.class 中的 Code Attribute 属性:

Code-Attribute

3.4.3 Code Attribute的两个子属性

这里额外提一个 Code Attribute 中的两个子属性。不知道大家有没有想过为什么我们用 IDE 运行程序出错时,IDE 可以准确的定位到是哪一行代码出错了? 为什么我们在 IDE 中使用一个方法的时候可以看到这个方法的参数名,并且调试的时候可以根据参数名获取变量值?很关键的原因就在于 Code 属性的这两个子属性。

LineNumberTable

LineNumberTable的结构

LineNumberTable_attribute {
    u2 attribute_name_index;
    u4 attribute_length;
    u2 line_number_table_length;
    {   u2 start_pc;
        u2 line_number;
    } line_number_table[line_number_table_length];
}

我们着重要看的是 line_number_table 这个成员,可以看到这个成员表示的就是字节码指令和源码的对应关系,其中 start_pc 是 Code Attribute 中的 code[] 数组的索引值,line_number 是源文件的行号

LocalVariableTable

LocalVariableTable 的结构

LocalVariableTable_attribute {
    u2 attribute_name_index;
    u4 attribute_length;
    u2 local_variable_table_length;
    {   u2 start_pc;
        u2 length;
        u2 name_index;
        u2 descriptor_index;
        u2 index;
    } local_variable_table[local_variable_table_length];
}

其中最关键的成员大家也可以想到,肯定是 local_variable_table[local_variable_table_length],它里面属性的意义如下:

  • start_pc 和 length 表示局部变量的索引范围([start_pc, start_pc + length))
  • name_index 表示变量名在常量池中的索引
  • descriptor_index 表示变量描述符在常量池中的索引
  • index 表示此局部变量在局部变量表中的索引

LocalVariableTable 属性实际上是用于描述栈帧中局部变量表中的变量与 Java 源码中定义的变量之间的关系,所以根据这个属性,其他人引用这个方法时就可以知道这个方法的属性名,并且可以在调试的时候根据参数名称从上下文中获得参数值。

4. 执行引擎

前面讲的是 class 文件的静态结构,当 JVM 解析完 class 文件之后就会将其转成运行时结构,并将其存放在方法区中(也就是常说的永久代),然后会创建类对象(也就是 Class 对象)提供访问类数据的接口。

执行的时候 JVM 总是会先从 main 方法开始执行,其实就是从 Class 的所有方法中找到 main 方法,然后从 main 方法的 Code Attribute 中找到方法体的字节码然后调用执行引擎执行。所以要知道 JVM 是如何执行代码的就要了解一些字节码的内容。

4.1 运行时栈帧结构

先来看一看JVM的运行时结构

RUNTIME_STRUCT

因为JVM是一个基于栈的虚拟机,所以基本上所有的操作都是需要通过对栈的操作完成的。执行的过程就是从 main 函数开始(一开始就会为 main 函数创建一个函数栈帧),执行 main 函数的指令(在 Code Attribute 中),如果要调用方法就创建一个新的函数栈帧,如果函数执行完成就弹出第一个函数栈帧。

4.2 JVM的指令

不管你在 java 源文件中写了什么函数,用了什么高深的算法,经过编译器的编译,到了 class 文件中都是一个个的字节,而 Code Attribute 中的code[] 字段中的字节就是函数翻译过来的字节码指令。

JVM 支持的指令大致上可以分成 3 种:没有操作数的、一个操作数的和;两个操作数的。因为 JVM 用一个字节来表示指令,所以指令的最多只有 256 个。

JVM指令通用形式如下:

INSTRUCTION

4.3 几个常用的指令解析

因为 JVM 的指令太多了,在这里不可能全部都解析一遍,所以就选择了几个指令进行解析。

4.3.1 invokespecial

INVOKESPECIAL

说明:invokespecial 用于调用实例方法,专门用来处理调用超类方法、私有方法和实例初始化方法。

indexByte1 和indexByte2 用于组成常量池中的索引((indexbyte1 << 8)|indexbyte2)。所指向的常量项必须是 MethodRef Info 类型。同时该条指令还会创建一个函数栈帧,然后从当前的操作数栈中出栈被调用的方法的参数,并且将其放到被调用方法的函数栈帧的本地变量表中。

4.3.2 aload_n

ALOAD_N

说明:aload_n 从局部变量表加载一个 reference 类型值到操作数栈中,至于从当前函数栈帧的本地变量表中加载哪个变量是有N的值决定的。

4.3.3 astore_n

ASTORE_N

说明:将一个 reference 类型数据保存到局部变量表中,至于保存在局部变量表的哪个位置就由 N 的值决定。

好了,指令就介绍到这里,要看所有指令的说明可以看 Oracle 的 JVM 指令集,里面有对每一个指令的详细说明。

所以执行引擎要做的工作就是根据每一个指令要执行的功能进行对应的实现。

5. 总结

因为 JVM 的内容太过于丰富,这里只分析了 JVM 执行的主要的流程,还有些内容比如:类加载,类的链接(验证,准备,解析),初始化等过程没有说明。不是说这些内容不重要而是我们平时写代码的时候可以更加关注上面所介绍的一些内容。这里我也针对上面的内容写了一个可以运行的例子, 可以在这里找到。

6. 参考

  1. The Java® Virtual Machine Specification

  2. 深入理解Java虚拟机:JVM 高级特性与最佳实践(第2版)

  3. 深入java虚拟机第二版

0

响应式编程(下):Spring 5

引子:被誉为“中国大数据第一人”的涂子沛先生在其成名作《数据之巅》里提到,摩尔定律、社交媒体、数据挖掘是大数据的三大成因。IBM 的研究称,整个人类文明所获得的全部数据中,有 90% 是过去两年内产生的。在此背景下,包括NoSQL、Hadoop、Spark、Storm、Kylin在内的大批新技术应运而生。其中以 RxJavaReactor 为代表的响应式(Reactive)编程技术针对的就是经典的大数据4V定义(Volume,Variety,Velocity,Value)中的 Velocity,即高并发问题,而在刚刚发布的 Spring 5 中,也引入了响应式编程的支持。我将分上下两篇与你分享与响应式编程有关的一些学习心得。本篇是下篇,对刚刚发布的 Spring 5 中有关响应式编程的支持做一些简单介绍,并详解一个完整的 Spring 5 示例应用。

1. Spring 5 中的响应式编程

作为 Java 世界首个响应式 Web 框架,Spring 5 最大的亮点莫过于提供了完整的端到端响应式编程的支持。

图片出处:Spring Framework Reference Documentation

左侧是传统的基于 Servlet 的 Spring Web MVC 框架,右侧是 5.0 版本新引入的基于 Reactive Streams 的 Spring WebFlux 框架,从上到下依次是 Router Functions,WebFlux,Reactive Streams 三个新组件。

  • Router Functions: 对标 @Controller、@RequestMapping 等标准的 Spring MVC 注解,提供一套函数式风格的 API,用于创建 Router,Handler 和 Filter。
  • WebFlux: 核心组件,协调上下游各个组件提供响应式编程支持。
  • Reactive Streams: 一种支持背压(Backpressure)的异步数据流处理标准,主流实现有 RxJava 和 Reactor,Spring WebFlux 默认集成的是 Reactor。

在Web容器的选择上,Spring WebFlux 既支持像 Tomcat,Jetty 这样的的传统容器(前提是支持 Servlet 3.1 Non-Blocking IO API),又支持像 Netty,Undertow 那样的异步容器。不管是何种容器,Spring WebFlux 都会将其输入输出流适配成 Flux<DataBuffer> 格式,以便进行统一处理。

值得一提的是,除了新的 Router Functions 接口,Spring WebFlux 同时支持使用老的 Spring MVC 注解声明Reactive Controller。和传统的 MVC Controller 不同,Reactive Controller 操作的是非阻塞的ServerHttpRequest 和 ServerHttpResponse,而不再是 Spring MVC 里的 HttpServletRequest 和 HttpServletResponse。

2. 实战

下面我将以一个简单的 Spring 5 应用为例,介绍如何使用 Spring 5 快速搭建一个响应式Web应用(以下简称 RP 应用)。

2.1 环境准备

首先,从 GitHub 下载我的这个示例应用,地址是https://github.com/emac/spring5-features-demo

然后,从 MongoDB 官网下载最新版本的MongoDB,然后在命令行下运行 mongod & 启动服务。

现在,可以先试着跑一下项目中自带的测试用例。

./gradlew clean build

2.2 依赖介绍

接下来,看一下这个示例应用里的和响应式编程相关的依赖。

compile('org.springframework.boot:spring-boot-starter-webflux')
compile('org.springframework.boot:spring-boot-starter-data-mongodb-reactive')
  • spring-boot-starter-webflux: 启用 Spring 5 的 RP(Reactive Programming)支持,这是使用 Spring 5 开发 RP 应用的必要条件,就好比 spring-boot-starter-web 之于传统的 Spring MVC 应用。
  • spring-boot-starter-data-mongodb-reactive: Spring 5 中新引入的针对 MongoDB 的 Reactive Data 扩展库,允许通过统一的 RP 风格的API操作 MongoDB。

2.3 第一种方式:MVC 注解

Spring 5 提供了 Spring MVC 注解和 Router Functions 两种方式来编写 RP 应用。首先,我先用大家最熟悉的MVC注解来展示如何编写一个最简单的 RP Controller。

示例代码

@RestController
public class RestaurantController {

    /**
     * 扩展ReactiveCrudRepository接口,提供基本的CRUD操作
     */
    private final RestaurantRepository restaurantRepository;

    /**
     * spring-boot-starter-data-mongodb-reactive提供的通用模板
     */
    private final ReactiveMongoTemplate reactiveMongoTemplate;

    public RestaurantController(RestaurantRepository restaurantRepository, ReactiveMongoTemplate reactiveMongoTemplate) {
        this.restaurantRepository = restaurantRepository;
        this.reactiveMongoTemplate = reactiveMongoTemplate;
    }

    @GetMapping("/reactive/restaurants")
    public Flux<Restaurant> findAll() {
        return restaurantRepository.findAll();
    }

    @GetMapping("/reactive/restaurants/{id}")
    public Mono<Restaurant> get(@PathVariable String id) {
        return restaurantRepository.findById(id);
    }

    @PostMapping("/reactive/restaurants")
    public Flux<Restaurant> create(@RequestBody Flux<Restaurant> restaurants) {
        return restaurants
                .buffer(10000)
                .flatMap(rs -> reactiveMongoTemplate.insert(rs, Restaurant.class));
    }

    @DeleteMapping("/reactive/restaurants/{id}")
    public Mono<Void> delete(@PathVariable String id) {
        return restaurantRepository.deleteById(id);
    }
}

可以看到,实现一个 RP Controller 和一个普通的 Controller 是非常类似的,最核心的区别是,优先使用 RP 中最基础的两种数据类型,Flux(对应多值)和 Mono(单值),尤其是方法的参数和返回值。即便是空返回值,也应封装为 Mono<Void>。这样做的目的是,使得应用能够以一种统一的符合 RP 规范的方式处理数据,最理想的情况是从最底层的数据库(或者其他系统外部调用),到最上层的 Controller 层,所有数据都不落地,经由各种 FluxMono 铺设的“管道”,直供调用端。就像农夫山泉那句著名的广告词,我们不生产水,我们只是大自然的搬运工。

单元测试

和非 RP 应用的单元测试相比,RP 应用的单元测试主要是使用了一个 Spring 5 新引入的测试工具类,WebTestClient,专门用于测试 RP 应用。

@RunWith(SpringRunner.class)
@SpringBootTest
public class RestaurantControllerTests {

    @Test
    public void testNormal() throws InterruptedException {
        // start from scratch
        restaurantRepository.deleteAll().block();

        // prepare
        WebTestClient webClient = WebTestClient.bindToController(new RestaurantController(restaurantRepository, reactiveMongoTemplate)).build();
        Restaurant[] restaurants = IntStream.range(0, 100)
                .mapToObj(String::valueOf)
                .map(s -> new Restaurant(s, s, s))
                .toArray(Restaurant[]::new);

        // create
        webClient.post().uri("/reactive/restaurants")
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .syncBody(restaurants)
                .exchange()
                .expectStatus().isOk()
                .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                .expectBodyList(Restaurant.class)
                .hasSize(100)
                .consumeWith(rs -> Flux.fromIterable(rs.getResponseBody())
                        .log()
                        .subscribe(r1 -> {
                            // get
                            webClient.get()
                                    .uri("/reactive/restaurants/{id}", r1.getId())
                                    .accept(MediaType.APPLICATION_JSON_UTF8)
                                    .exchange()
                                    .expectStatus().isOk()
                                    .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
                                    .expectBody(Restaurant.class)
                                    .consumeWith(r2 -> Assert.assertEquals(r1, r2));
                        })
                );
    }
}

创建 WebTestClient 实例时,首先要绑定一下待测试的 RP Controller。可以看到,和业务类一样,编写 RP 应用的单元测试,同样也是数据不落地的流式风格。

2.4 第二种方式:Router Functions

接着介绍实现 RP 应用的另一种实现方式 —— Router Functions。

Router Functions 是 Spring 5 新引入的一套 Reactive 风格(基于 Flux 和 Mono)的函数式接口,主要包括RouterFunctionHandlerFunctionHandlerFilterFunction,分别对应 Spring MVC 中的 @RequestMapping@ControllerHandlerInterceptor(或者 Servlet 规范中的 Filter)。

和 Router Functions 搭配使用的是两个新的请求/响应模型,ServerRequestServerResponse,这两个模型同样提供了 Reactive 风格的接口

示例代码

自定义 RouterFunction 和 HandlerFilterFunction
@Configuration
public class RestaurantServer implements CommandLineRunner {

    @Autowired
    private RestaurantHandler restaurantHandler;

    /**
     * 注册自定义RouterFunction
     */
    @Bean
    public RouterFunction<ServerResponse> restaurantRouter() {
        RouterFunction<ServerResponse> router = route(GET("/reactive/restaurants").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::findAll)
                .andRoute(GET("/reactive/delay/restaurants").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::findAllDelay)
                .andRoute(GET("/reactive/restaurants/{id}").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::get)
                .andRoute(POST("/reactive/restaurants").and(accept(APPLICATION_JSON_UTF8)).and(contentType(APPLICATION_JSON_UTF8)), restaurantHandler::create)
                .andRoute(DELETE("/reactive/restaurants/{id}").and(accept(APPLICATION_JSON_UTF8)), restaurantHandler::delete)
                // 注册自定义HandlerFilterFunction
                .filter((request, next) -> {
                    if (HttpMethod.PUT.equals(request.method())) {
                        return ServerResponse.status(HttpStatus.BAD_REQUEST).build();
                    }
                    return next.handle(request);
                });
        return router;
    }

    @Override
    public void run(String... args) throws Exception {
        RouterFunction<ServerResponse> router = restaurantRouter();
        // 转化为通用的Reactive HttpHandler
        HttpHandler httpHandler = toHttpHandler(router);
        // 适配成Netty Server所需的Handler
        ReactorHttpHandlerAdapter httpAdapter = new ReactorHttpHandlerAdapter(httpHandler);
        // 创建Netty Server
        HttpServer server = HttpServer.create("localhost", 9090);
        // 注册Handler并启动Netty Server
        server.newHandler(httpAdapter).block();
    }
}

可以看到,使用 Router Functions 实现 RP 应用时,你需要自己创建和管理容器,也就是说 Spring 5 并没有针对 Router Functions 提供 IoC 支持,这是 Router Functions 和 Spring MVC 相比最大的不同。除此之外,你需要通过 RouterFunction 的 API(而不是注解)来配置路由表和过滤器。对于简单的应用,这样做问题不大,但对于上规模的应用,就会导致两个问题:1)Router 的定义越来越庞大;2)由于 URI 和 Handler 分开定义,路由表的维护成本越来越高。那为什么 Spring 5 会选择这种方式定义 Router 呢?接着往下看。

自定义 HandlerFunction
@Component
public class RestaurantHandler {

    /**
     * 扩展ReactiveCrudRepository接口,提供基本的CRUD操作
     */
    private final RestaurantRepository restaurantRepository;

    /**
     * spring-boot-starter-data-mongodb-reactive提供的通用模板
     */
    private final ReactiveMongoTemplate reactiveMongoTemplate;

    public RestaurantHandler(RestaurantRepository restaurantRepository, ReactiveMongoTemplate reactiveMongoTemplate) {
        this.restaurantRepository = restaurantRepository;
        this.reactiveMongoTemplate = reactiveMongoTemplate;
    }

    public Mono<ServerResponse> findAll(ServerRequest request) {
        Flux<Restaurant> result = restaurantRepository.findAll();
        return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
    }

    public Mono<ServerResponse> findAllDelay(ServerRequest request) {
        Flux<Restaurant> result = restaurantRepository.findAll().delayElements(Duration.ofSeconds(1));
        return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
    }

    public Mono<ServerResponse> get(ServerRequest request) {
        String id = request.pathVariable("id");
        Mono<Restaurant> result = restaurantRepository.findById(id);
        return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
    }

    public Mono<ServerResponse> create(ServerRequest request) {
        Flux<Restaurant> restaurants = request.bodyToFlux(Restaurant.class);
        Flux<Restaurant> result = restaurants
                .buffer(10000)
                .flatMap(rs -> reactiveMongoTemplate.insert(rs, Restaurant.class));
        return ok().contentType(APPLICATION_JSON_UTF8).body(result, Restaurant.class);
    }

    public Mono<ServerResponse> delete(ServerRequest request) {
        String id = request.pathVariable("id");
        Mono<Void> result = restaurantRepository.deleteById(id);
        return ok().contentType(APPLICATION_JSON_UTF8).build(result);
    }
}

对比前面的 RestaurantController,由于去除了路由信息,RestaurantHandler 变得非常函数化,可以说就是一组相关的 HandlerFunction 的集合,同时各个方法的可复用性也大为提升。这就回答了上一小节提出的疑问,即以牺牲可维护性为代价,换取更好的函数特性。

单元测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class RestaurantHandlerTests extends BaseUnitTests {

    @Autowired
    private RouterFunction<ServerResponse> restaurantRouter;

    @Override
    protected WebTestClient prepareClient() {
        WebTestClient webClient = WebTestClient.bindToRouterFunction(restaurantRouter)
                .configureClient().baseUrl("http://localhost:9090").responseTimeout(Duration.ofMinutes(1)).build();
        return webClient;
    }
}

和针对 Controller 的单元测试相比,编写 Handler 的单元测试的主要区别在于初始化 WebTestClient 方式的不同,测试方法的主体可以完全复用。

3 小结

到此,有关响应式编程的介绍就暂且告一段落。回顾这两篇文章,我先是从响应式宣言说起,然后介绍了响应式编程的基本概念和关键特性,并且详解了 Spring 5 中和响应式编程相关的新特性,最后以一个示例应用结尾。希望读完这些文章,对你理解响应式编程能有所帮助。

4 参考

0

响应式编程(上):总览

引子:被誉为“中国大数据第一人”的涂子沛先生在其成名作《数据之巅》里提到,摩尔定律、社交媒体、数据挖掘是大数据的三大成因。IBM 的研究称,整个人类文明所获得的全部数据中,有 90% 是过去两年内产生的。在此背景下,包括 NoSQL、Hadoop、Spark、Storm、Kylin 在内的大批新技术应运而生。其中以 RxJavaReactor 为代表的响应式(Reactive)编程技术针对的就是经典的大数据 4V 定义(Volume,Variety,Velocity,Value)中的 Velocity,即高并发问题,而在刚刚发布的 Spring 5 中,也引入了响应式编程的支持。我将分上下两篇与你分享与响应式编程有关的一些学习心得。本篇是上篇,以 Reactor 框架为例介绍响应式编程的几个关键特性。

1. 响应式宣言

敏捷宣言一样,说起响应式编程,必先提到响应式宣言。

We want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems. - The Reactive Manifesto

图片出处:The Reactive Manifesto

不知道是不是为了向敏捷宣言致敬,响应式宣言中也包含了 4 组关键词:

  • Responsive:可响应的。要求系统尽可能做到在任何时候都能及时响应。
  • Resilient:可恢复的。要求系统即使出错了,也能保持可响应性。
  • Elastic:可伸缩的。要求系统在各种负载下都能保持可响应性。
  • Message Driven:消息驱动的。要求系统通过异步消息连接各个组件。

可以看到,对于任何一个响应式系统,首先要保证的就是可响应性,否则就称不上是响应式系统。从这个意义上来说,动不动就蓝屏的 Windows 系统显然不是一个响应式系统。

PS: 如果你赞同响应式宣言,不妨到官网上留下的你电子签名,我的编号是 18989,试试看能不能找到我。

2. 响应式编程

In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. - Reactive programming - Wikipedia

在上述响应式编程(后面简称 RP)的定义中,除了异步编程,还包含两个重要的关键词:

  • Data streams:即数据流,分为静态数据流(比如数组,文件)和动态数据流(比如事件流,日志流)两种。基于数据流模型,RP 得以提供一套统一的 Stream 风格的数据处理接口。和 Java 8 中的 Stream API 相比,RP API 除了支持静态数据流,还支持动态数据流,并且允许复用和同时接入多个订阅者。
  • The propagation of change:变化传播,简单来说就是以一个数据流为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。这就有点像函数式编程中的组合函数,将多个函数串联起来,把一组输入数据转化为格式迥异的输出数据。

一个容易混淆的概念是响应式设计,虽然它的名字中也包含了“响应式”三个字,但其实和 RP 完全是两码事。响应式设计是指网页能够自动调整布局和样式以适配不同尺寸的屏幕,属于网站设计的范畴,而 RP 是一种关注系统可响应性,面向数据流的编程思想或者说编程框架。

特性

从本质上说,RP 是一种异步编程框架,和其他框架相比,RP 至少包含了以下三个特性:

  • 描述而非执行:在你最终调用 subscribe() 方法之前,从发布端到订阅端,没有任何事会发生。就好比无论多长的水管,只要水龙头不打开,水管里的水就不会流动。为了提高描述能力,RP 提供了比 Stream 丰富的多的多的API,比如 buffer()merge()onErrorMap() 等。
  • 提高吞吐量:类似于 HTTP/2 中的连接复用,RP 通过线程复用来提高吞吐量。在传统的Servlet容器中,每来一个请求就会发起一个线程进行处理。受限于机器硬件资源,单台服务器所能支撑的线程数是存在一个上限的,假设为T,那么应用同时能处理的请求数(吞吐量)必然也不会超过T。但对于一个使用 Spring 5 开发的 RP 应用,如果运行在像 Netty 这样的异步容器中,无论有多少个请求,用于处理请求的线程数是相对固定的,因此最大吞吐量就有可能超过T。
  • 背压(Backpressure)支持:简单来说,背压就是一种反馈机制。在一般的 Push 模型中,发布者既不知道也不关心订阅者的处理速度,当数据的发布速度超过处理速度时,需要订阅者自己决定是缓存还是丢弃。如果使用 RP,决定权就交回给发布者,订阅者只需要根据自己的处理能力问发布者请求相应数量的数据。你可能会问这不就是 Pull 模型吗?其实是不同的。在 Pull 模型中,订阅者每次处理完数据,都要重新发起一次请求拉取新的数据,而使用背压,订阅者只需要发起一次请求,就能连续不断的重复请求数据。

适用场景

了解了 RP 的这些特性,你可能已经猜想到 RP 有哪些适用场景了。一般来说,RP 适用于高并发、带延迟操作的场景,比如以下这些情况(的组合):

  • 一次请求涉及多次外部服务调用
  • 非可靠的网络传输
  • 高并发下的消息处理
  • 弹性计算网络

代价

Every coin has two sides.

和任何框架一样,有优势必然就有劣势。RP 的两个比较大的问题是:

  • 虽然复用线程有助于提高吞吐量,但一旦在某个回调函数中线程被卡住,那么这个线程上所有的请求都会被阻塞,最严重的情况,整个应用会被拖垮。
  • 难以调试。由于 RP 强大的描述能力,在一个典型的 RP 应用中,大部分代码都是以链式表达式的形式出现,比如flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe(),一旦出错,你将很难定位到具体是哪个环节出了问题。所幸的是,RP 框架一般都会提供一些工具方法来辅助进行调试。

3. Reactor 实战

为了帮助你理解上面说的一些概念,下面我就通过几个测试用例,演示 RP 的两个关键特性:提高吞吐量和背压。完整的代码可参见我 GitHub 上的示例工程

提高吞吐量

    @Test
    public void testImperative() throws InterruptedException {
        _runInParallel(CONCURRENT_SIZE, () -> {
            ImperativeRestaurantRepository.INSTANCE.insert(load);
        });
    }

    private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        for (int i = 0; i < nThreads; i++) {
            executorService.submit(task);
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }

    @Test
    public void testReactive() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);
        for (int i = 0; i < CONCURRENT_SIZE; i++) {
            ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> {
            }, e -> latch.countDown(), latch::countDown);
        }
        latch.await();
    }

用例解读:

  • 第一个测试用例使用的是多线程 + MongoDB Driver,同时起 100 个线程,每个线程往 MongoDB 中插入 10000 条数据,总共 100 万条数据,平均用时15秒左右。
  • 第二个测试用例使用的是 Reactor + MongoDB Reactive Streams Driver,同样是插入 100 万条数据,平均用时不到 10 秒,吞吐量提高了
    50%!

背压

在演示测试用例之前,先看两张图,帮助你更形象的理解什么是背压。

图片出处:Dataflow and simplified reactive programming

两张图乍一看没啥区别,但其实是完全两种不同的背压策略。第一张图,发布速度(100/s)远大于订阅速度(1/s),但由于背压的关系,发布者严格按照订阅者的请求数量发送数据。第二张图,发布速度(1/s)小于订阅速度(100/s),当订阅者请求100个数据时,发布者会积满所需个数的数据再开始发送。可以看到,通过背压机制,发布者可以根据各个订阅者的能力动态调整发布速度。

    @BeforeEach
    public void beforeEach() {
        // initialize publisher
        AtomicInteger count = new AtomicInteger();
        timerPublisher = Flux.create(s ->
                new Timer().schedule(new TimerTask() {
                    @Override
                    public void run() {
                        s.next(count.getAndIncrement());
                        if (count.get() == 10) {
                            s.complete();
                        }
                    }
                }, 100, 100)
        );
    }

    @Test
    public void testNormal() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        timerPublisher
                .subscribe(r -> System.out.println("Continuous consuming " + r),
                        e -> latch.countDown(),
                        latch::countDown);
        latch.await();
    }

    @Test
    public void testBackpressure() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Subscription> timerSubscription = new AtomicReference<>();
        Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                timerSubscription.set(subscription);
            }

            @Override
            protected void hookOnNext(Integer value) {
                System.out.println("consuming " + value);
            }

            @Override
            protected void hookOnComplete() {
                latch.countDown();
            }

            @Override
            protected void hookOnError(Throwable throwable) {
                latch.countDown();
            }
        };
        timerPublisher.onBackpressureDrop().subscribe(subscriber);
        new Timer().schedule(new TimerTask() {
            @Override
            public void run() {
                timerSubscription.get().request(1);
            }
        }, 100, 200);
        latch.await();
    }

用例解读:

  • 第一个测试用例演示了在理想情况下,即订阅者的处理速度能够跟上发布者的发布速度(以 100ms 为间隔产生 10 个数字),控制台从 0 打印到 9,一共 10 个数字,和发布端一致。
  • 第二个测试用例故意调慢了订阅者的处理速度(每 200ms 处理一个数字),同时发布者采用了 Drop 的背压策略,结果控制台只打印了一半的数字(0,2,4,6,8),另外一半的数字由于背压的原因被发布者 Drop 掉了,并没有发给订阅者。

4 小结

通过上面的介绍,不难看出 RP 实际上是一种内置了发布者订阅者模型的异步编程框架,包含了线程复用,背压等高级特性,特别适用于高并发、有延迟的场景。

下篇我将对刚刚发布的 Spring 5 中有关响应式编程的支持做一些简单介绍,并详解一个完整的 Spring 5 示例应用,敬请期待。

5 参考

0

乐高式微服务化改造(下)—— 注册中心、配置中心和授权中心

上篇讲了杏仁微服务化改造的项目背景和基本框架,这篇我将进一步介绍其中的三大核心组件,即注册中心,配置中心和授权中心。
- 注册中心:所有服务注册到 Consul 集群,然后通过 Consul Template 刷新Nginx配置实现负载均衡
- 配置中心:使用自研的 Matrix 系统,通过自定义构建插件覆写配置,最小化对已有应用的侵入性
- 授权中心:基于 Spring Security OAuth,同时支持基于微信企业号的 SSO

1. 注册中心

作为微服务架构最基础也是最重要的组件之一,服务注册中心本质上是为了解耦服务提供者和服务消费者。对于任何一个微服务,原则上都应存在或者支持多个提供者,这是由微服务的分布式属性决定的。更进一步,为了支持弹性扩缩容特性,一个微服务的提供者的数量和分布往往是动态变化的,也是无法预先确定的。因此,原本在单体应用阶段常用的静态 LB 机制就不再适用了,需要引入额外的组件来管理微服务提供者的注册与发现,而这个组件就是服务注册中心。

设计或者选型一个服务注册中心,首先要考虑的就是服务注册与发现机制。纵观当下各种主流的服务注册中心解决方案,大致可归为三类:

  • 应用内:直接集成到应用中,依赖于应用自身完成服务的注册与发现,最典型的是 Netflix 提供的 Eureka
  • 应用外:把应用当成黑盒,通过应用外的某种机制将服务注册到注册中心,最小化对应用的侵入性,比如 Airbnb 的 SmartStack,HashiCorp 的 Consul
  • DNS:将服务注册为 DNS 的 SRV 记录,严格来说,是一种特殊的应用外注册方式,SkyDNS 是其中的代表

注1:对于第一类注册方式,除了 Eureka 这种一站式解决方案,还可以基于 ZooKeeper 或者 Etcd 自行实现一套服务注册机制,这在大公司比较常见,但对于小公司而言显然性价比太低。

注2:由于 DNS 固有的缓存缺陷,这里不对第三类注册方式作深入探讨。

除了基本的服务注册与发现机制,从开发和运维角度,至少还要考虑如下五个方面:

  • 测活:服务注册之后,如何对服务进行测活以保证服务的可用性?
  • 负载均衡:当存在多个服务提供者时,如何均衡各个提供者的负载?
  • 集成:在服务提供端或者调用端,如何集成注册中心?
  • 运行时依赖:引入注册中心之后,对应用的运行时环境有何影响?
  • 可用性:如何保证注册中心本身的可用性,特别是消除单点故障?

下面就围绕这几个方面,简单分析一下 Eureka,SmartStack,Consul 的利弊。

Eureka

eureka

从设计角度来看,Eureka 可以说是无懈可击,注册中心、提供者、调用者边界清晰,通过去中心化的集群支持保证了注册中心的整体可用性,但缺点是
Eureka 属于应用内的注册方式,对应用的侵入性太强,且只支持 Java 应用。

SmartStack

smartstack

SmartStack 可以说是三种方案中最复杂的,涉及了 ZooKeeper、HAProxy、Nerve 和 Synapse 四种异构组件,对运维提出了很高的要求。它最大的好处是对应用零侵入,且适用于任意类型的应用。

Consul

Consul 本质上属于应用外的注册方式,但可以通过集成 Consul SDK 加上本地 Agent的 方式简化注册流程。当服务以容器方式运行时,可以更进一步通过Registrator 实现自动注册。服务调用端的服务发现默认依赖于 SDK,但可以通过 Consul Template 去除 SDK 依赖。

最终方案

最终我们选择了 Consul 作为服务注册中心的实现方案,主要原因有两点:

  1. 最小化对已有应用的侵入性,这也是贯穿我们整个微服务化改造的原则之一。
  2. 降低运维的复杂度,通过使用 Registrator 和 Consul Template 实现服务自注册和自发现。

image_1bokoa1n8g8nhga1phjdcs1s2s13

2. 配置中心

我们知道,大至一个 PaaS 平台,小至一个缓存框架,一般都依赖于特定的配置以正常提供服务,微服务也不例外。

配置分类

  • 按配置的来源划分,主要有源代码(俗称 hard-code),文件,数据库和远程调用。
  • 按配置的适用环境划分,可分为开发环境,测试环境,预发布环境,生产环境等。
  • 按配置的集成阶段划分,可分为编译时,打包时和运行时。编译时,最常见的有两种,一是源代码级的配置,二是把配置文件和源代码一起提交到代码仓库中。打包时,即在应用打包阶段通过某种方式将配置(一般是文件形式)打入最终的应用包中。运行时,是指应用启动前并不知道具体的配置,而是在启动时,先从本地或者远程获取配置,然后再正常启动。
  • 按配置的加载方式划分,可分为单次加载型配置和动态加载型配置。

演变

随着业务复杂度的上升和技术架构的演变,对应用的配置方式也提出了越来越高的要求。一个典型的演变过程往往是这样的,起初所有配置跟源代码一起放在代码仓库中;之后出于安全性的考虑,将配置文件从代码仓库中分离出来,或者放在 CI 服务器上通过打包脚本打入应用包中,或者直接放到运行应用的服务器的特定目录下,剩下的非文件形式的关键配置则存入数据库中。上述这种方式,在单体应用阶段非常常见,也往往可以运行的很好,但到了微服务阶段,面对爆发式增长的应用数量和服务器数量,就显得无能为力了。这时,就轮到配置中心大显身手了。那什么是配置中心?简单来说,就是一种统一管理各种应用配置的基础服务组件。

框架选型

选型一个合格的配置中心,至少需要满足如下 4 个核心需求:

  • 非开发环境下应用配置的保密性,避免将关键配置写入源代码
  • 不同部署环境下应用配置的隔离性,比如非生产环境的配置不能用于生产环境
  • 同一部署环境下的服务器应用配置的一致性,即所有服务器使用同一份配置
  • 分布式环境下应用配置的可管理性,即提供远程管理配置的能力

现在开源社区主流的配置中心框架有 Spring Cloud Config 和 disconf,两者都满足了上述4个核心需求,但又有所区别。

Spring Cloud Config

spring-cloud-config

Spring Cloud Config 可以说是一个为 Spring 量身定做的轻量级配置中心,巧妙的将应用运行环境映射为 profile,应用版本映射为 label。在服务端,基于特定的外部系统(Git、文件系统或者 Vault)存储和管理应用配置;在客户端,利用强大的 Spring 配置系统,在运行时加载应用配置。

disconf

disconf

disconf是前百度资深研发工程师廖绮绮的开源作品。在服务端,提供了完善的操作界面管理各种运行环境,应用和配置文件;在客户端,深度集成Spring,通过Spring AOP实现应用配置的自动加载和刷新。

最终方案

不管是 Spring Cloud Config 还是 disconf,默认提供的客户端都深度绑定了 Spring 框架,这对非 Spring 应用而言无疑增加了集成成本,即便它们都提供了获取应用配置的 API。最终我们还是选用了微服务化改造之前自研的 Matrix 作为配置中心,一方面,可以保持新老系统使用同一套配置服务,降低维护成本,另一方面,在满足 4 个核心需求的前提下,Matrix 还提供了一些独有的能力。

  • 分离配置文件和配置项。对于配置文件,通过各类配套打包插件(sbt,maven,gradle),在打包时将配置文件打入应用包中,同时最小化对CI的侵入性;对于配置项,提供 SDK,帮助应用从服务端获取配置项,同时支持简单的缓存机制。
  • 增加应用版本维度,即对于同一应用,可以在服务端针对不同版本或版本区间维护不同的应用配置。
  • 应用配置的版本化支持,类似于 Git,可以将任一应用配置回退到任一历史版本。

3. 授权中心

有了服务注册中心和配置中心,下一步应该就可以发起服务调用了吧?Wait,还有一个关键问题要解决。不同于单体应用内部的方法调用,服务调用存在一个服务授权的概念。打个比方,原本一家三兄弟住一屋,每次上山打猎喊一声就行,后来三兄弟分了家,再打猎就要挨家挨户敲门了。这一敲一应就是所谓的服务授权。

严格来说,服务授权包含鉴权(Authentication)和授权(Authorization)两部分。鉴权解决的是调用方身份识别的问题,即敲门的是谁。授权解决的是调用是否被允许的问题,即让不让进门。两者一先一后,缺一不可。为避免歧义,如不特殊指明,下文所述授权都是宽泛意义上的授权,即包含了鉴权。

常见的服务授权有三种,简单授权,协议授权和中央授权。

  • 简单授权:服务提供方并不进行真正的授权,而是依赖于外部环境进行自动授权,比如IP地址白名单,内网域名等。这就好比三兄弟互相留了一个后门。
  • 协议授权:服务提供方和服务调用方事先约定一个密钥,服务调用方每次发起服务调用请求时,用约定的密钥对请求内容进行加密生成鉴权头(包含调用方唯一识别 ID),服务提供方收到请求后,根据鉴权头找到相应的密钥对请求进行鉴权,鉴权通过后再决定是否授权此次调用。这就好比三兄弟之间约定敲一声是大哥,敲两声是二哥,敲三声是三弟。
  • 中央授权:引入独立的授权中心,服务调用方每次发起服务调用请求时,先从授权中心获取一个授权码,然后附在原始请求上一起发给服务提供方,提供方收到请求后,先通过授权中心将授权码还原成调用方身份信息和相应的权限列表,然后决定是否授权此次调用。这就好比三兄弟每家家门口安装了一个110联网的指纹识别器,通过远程指纹识别敲门人的身份。

一般来说,简单授权在业务规则简单、安全性要求不高的场景下用的比较多。而协议授权,比较适用于点对点或者 C/S 架构的服务调用场景,比如 Amazon S3 API。对于网状结构的微服务而言,中央授权是三种方式中最适合也是最灵活的选择:

  1. 简化了服务提供方的实现,让提供方专注于权限设计而非实现。
  2. 更重要的是提供了一套独立于服务提供方和服务调用方的授权机制,无需重新发布服务,只要在授权中心修改服务授权规则,就可以影响后续的服务调用。

OAuth

说起具体的授权协议,很多人第一反应就是OAuth。事实上也的确如此,很多互联网公司的开放平台都是基于 OAuth 协议实现的,比如 Google APIs微信网页授权接口。一次标准的 OAuth 授权过程如下:

image_1bokoo1kq103u1smr19f13rtie61g

对应到微服务场景,服务提供方相当于上图中的 Resource Server,服务调用方相当于 Client,而授权中心相当于 Authorization Server 和 Resource Owner 的合体。

Beared Token

在标准的 OAuth 授权过程中,Resource Server 收到 Client 发来的请求后,需要到 Authorization Server 验证 Access Token,并获取 Client 的进一步信息。通过 OAuth 2.0 版本引入中的 Beared Token,我们可以省去这一次调用,将 Client 信息存入 Access Token,并在 Resource Server 端完成 Access Token 的鉴权。主流的 Beared Token 有 SAMLJWT 两种格式,SAML 基于 XML,而 JWT 基于 JSON。由于大多数微服务都使用 JSON 作为序列化格式,JWT 使用的更为广泛。

框架选型

在选型OAuth框架时,我主要调研了 CASApache OltuSpring Security OAuthOAuth-Apis,对比如下:

image_1bokoq7rq5gk1vnp1sbtvi9uud1t

不考虑实际业务场景,CAS 和 Spring Security OAuth 相对另外两种框架,无论是集成成本还是可扩展性,都有明显优势。前文提到,由于我们选用了 Spring Boot 作为统一的微服务实现框架,Spring Security OAuth 是更自然的选择,并且维护成本相对低一些(服务端)。

最终方案

最后我们基于 Spring Security OAuth 框架实现了自己的服务授权中心,鉴权部分目前支持私网认证,Scope 校验和域名校验。大致的服务授权流程如下:

image_1bokpif4c178t1m131olg2c4r8m3h

4. 更多

微服务是一个很大的话题,自 Martin Fowler 在 2014 年 3 月提出以来,愈演愈热,并跟另一个话题容器化一起开创了一个全新的 DevOps 时代,引领了国内外大大小小各个互联网公司的技术走向,也影响了我们这一代程序员尤其是后端和运维的思维方式。希望我的这两篇文章能给你带来一些新的启发和思考,欢迎留言交流。

少年读书如隙中窥月,中年读书如庭中望月,老年读书如台上玩月,皆以阅历之浅深为所得之浅深耳。-- 张潮 《幽梦影》

2+

喜欢该文章的用户:

  • avatar

乐高式微服务化改造(上)—— 微服务简介

技术圈流行一句话,凡脱离业务谈架构的,都是耍流氓。当新需求响应越来越慢,当加班成为家常便饭,你可曾怀念当年一下午徒手写一千行代码的爽快?面对一个不断吞噬团队时间的庞然大物(单体应用),分而治之往往是最有效的方法。今天我就和大家聊聊我对小公司如何进行微服务化改造的理解和一手经验。

1. 微服务简介

有关微服务的定义,最权威的版本莫属微服务之父 Martin Fowler 在 Microservices Resource Guide 一文中所述:

In short, the microservice architectural style is an approach to developing a single application as a suite of small services, each running in its own process and communicating with lightweight mechanisms, often an HTTP resource API. These services are built around business capabilities and independently deployable by fully automated deployment machinery. -- James Lewis and Martin Fowler

注意其中有3个关键词:small,independently deployable 和 automated deployment。Small 对应的就是微服务的微,很多初次接触微服务的同学对微的理解往往会停留在实现层面,以为代码少就是微,但实际上,这里的微更多的是体现在逻辑层面。微服务的一个重要设计原则是 share as little as possible,什么意思呢?就是说每个微服务应该设计成边界清晰不重叠,数据独享不共享,也就是我们常说的高内聚、低耦合。保证了 small,才能做到 independently deployable。而实现 automated deployment 的关键是 DevOps 文化,可参见 Fowler 另一篇谈 DevOps 的文章。需要提醒的是,随着业务复杂度的上升,一个微服务可能需要拆分为更多更细粒度的微服务,比方说,一开始只是一个简单的订单服务,后面逐步拆分出清算,支付,结算,对账等其他服务。

从本质上来看,相对单体应用,微服务是以牺牲强一致性、提高部署复杂性为代价,换取更彻底的分布式特性,比如异构性和强隔离性。对应 CAP 理论,就是用 Consistency 换 Partition。异构性比较容易理解,通过定义统一的 API 规范(一般采用 REST 风格),每个微服务团队可以根据各自的能力矩阵选用最适合的技术栈,而不是所有人必须使用相同的技术栈。强隔离性指的是,对于一个典型的单体应用,隔离性最高只能体现到模块级别,由于共享同一个代码仓库,模块的边界往往比较模糊,需要人为定义很多规范来保证良好的隔离性,但无论如何强调,稍一疏忽,就会产生“越界”行为,时间愈长,维护隔离性的成本愈高。而到了微服务阶段,自带应用级别的隔离性,“越界”的成本大大提升,无需任何规范,架构本身就保证了隔离性。

另一方面,由于采用了分布式架构,微服务无法再简单的通过数据库事务来保证强一致性,而是通过消息中间件或者某种事务补偿机制来保证最终一致性,比如微信朋友圈的点赞,淘宝订单的物流状态。其次,在微服务阶段,随着应用数量的激增,一次发布往往涉及多个应用,加上异构性带来的部署方式的多样性,对团队的运维水平尤其是自动化水平提出了更高的要求,运维和开发的边界进一步模糊。

sketch

讲完这些有关微服务的背景知识之后,现在就切入今天的正题,面对快速增长的业务需求,小公司如何进行微服务化改造?下面就以我在杏仁主导实施的微服务化改造的全过程为背景,给大家简单说一下我们微服务化改造的总体思路和核心中间件的技术选型过程。

2. 项目背景

首先介绍一下微服务化改造的背景。去年年初,在历经2年多的产品迭代之后,整个后台应用越来越庞大,已经成为一个典型意义上的 monolithic application:1.各个业务模块犬牙交错,重复代码随处可见,补丁代码越打越多。2.任何一个改动都需要一次全量发布,哪怕是修改一句文案,极大的拖慢了迭代速度。

与此同时,由于公司电商业务变得越来越复杂,老的业务模型越来越难以满足新的需求,急需对原有的订单模块进行重构,或者抽取一个独立的订单服务来进行支撑。反复考量之后,我们选择了后者。由于是团队第一次试水微服务,并且初期人员有限(一人主导,多人配合),最后我们决定走一条比较实用的改良式路线:

  • 最小化对已有应用的侵入性
  • 偏好主流的微服务框架
  • 只做必要的微服务治理

第一条定下了此次改造的基调,降低了方案无法落地的风险,确保了项目的整体可行性。第二条让我们站在巨人的肩膀上,不重复造轮子,聚焦在问题本身,而不是工具。第三条缩减项目范围,避免过度工程,以战养兵,不打无用之仗。

下图展示了目前杏仁微服务的整体架构。

image_1bokpsoft19241ap2i1qmjfich3u

3. 基本框架

基本框架我们选择的是 Spring Boot。Spring Boot 是 Spring开源社区提供的一个去容器、去XML配置的应用框架。和标准的基于 war 包的 Web 应用相比,Spring Boot 应用可以直接以 java -jar 的方式运行,也就是说不再需要部署到一个独立的 Web 容器(比如
Tomcat)中才能运行。其背后的运行机制简单来说就是,当一个 Spring Boot 应用启动时,在加载完核心框架类之后,会启动一个内嵌的 Web 容器(默认是 Tomcat),然后再加载应用本身的各种配置类和 Bean。也就是说不再是容器包应用,而是应用包容器。正是由于这个特性,Spring Boot 非常适用于开发微服务,毫不夸张的说 Spring Boot 就是为微服务而生。

image_1bokn9bli1st585p1q4515kjqsf9

有同学可能会问,不是还有 DubboSpring Cloud 吗?Dubbo 是阿里开源的第一代 RPC 框架,早在 2011 年就已经发布了 2.0 版本,三年后也就是 2014 年,Martin Fowler 才提出了微服务的概念。虽然用 Dubbo 也能开发微服务,但这就好比用EJB的规范去开发 Spring Bean,怎么用怎么别扭。Dubbo 最大的问题是升级缓慢,最近一次发布还是 2014 年 10 月,支持的 Spring 版本是 2.5.6.SEC03,要知道 Spring 5 都快出来了。

Spring Cloud 可以说是目前 Java 社区最好最完整的微服务框架(没有之一),底层用的也是 Spring Boot,照着 Spring Cloud 的新手指南,分分钟就可以搭建出一整套微服务应用,非常适合改革式但不是改良式的微服务改造,因为非 Spring 应用难以集成。

作为硬币的另一面,选用 Spring Boot,意味着我们需要做大量的自定义工作,以弥补 Spring Boot 在微服务治理方面所欠缺的能力,比如即将在下篇介绍的注册中心、配置中心和授权中心。欢迎留言交流你的心得和见解。

3+

喜欢该文章的用户:

  • avatar