尊龙凯时外语学习交流中心

导航切换

联系电话:
020-88888888     13988889999

尊龙凯时外语学习交流中心

尊龙凯时外语学习交流中心

Apache Calcite 优化器详解(三)

作者: 佚名 浏览:   日期:2024-07-29

介绍完 HepPlanner 之后,接下来再来看下基于成本优化(CBO)模型在 Calcite 中是如何实现、如何落地的,关于 Volcano 理论内容建议先看下相关理论知识,否则直接看实现的话可能会有一些头大。从 Volcano 模型的理论落地到实践是有很大区别的,这里先看一张 VolcanoPlanner 整体实现图,如下所示(图片来自?Cost-based Query Optimization in Apache Phoenix using Apache Calcite):

640?wx_fmt=png

?

上面基本展现了 VolcanoPlanner 内部实现的流程,也简单介绍了 VolcanoPlanner 在实现中的一些关键点(有些概念暂时不了解也不要紧,后面会介绍):

  1. Add Rule matches to Queue:向 Rule Match Queue 中添加相应的 Rule Match;

  2. Apply Rule match transformations to plan gragh:应用 Rule Match 对 plan graph 做 transformation 优化(Rule specifies an Operator sub-graph to match and logic to generate equivalent better sub-graph);

  3. Iterate for fixed iterations or until cost doesn’t change:进行相应的迭代,直到 cost 不再变化或者 Rule Match Queue 中 rule match 已经全部应用完成;

  4. Match importance based on cost of RelNode and height:Rule Match 的 importance 依赖于 RelNode 的 cost 和深度。

使用 VolcanoPlanner 实现的完整代码见?SqlVolcanoTest。

下面来看下 VolcanoPlanner 实现具体的细节。

VolcanoPlanner 在实现中引入了一些基本概念,先明白这些概念对于理解 VolcanoPlanner 的实现非常有帮助。

RelSet

关于 RelSet,源码中介绍如下:

RelSet is an equivalence-set of expressions that is, a set of expressions which have?identical semantics.
We are generally interested in using the expression which has?the lowest cost.
All of the expressions in an RelSet have the?same calling convention.

它有以下特点:

  1. 描述一组等价 Relation Expression,所有的 RelNode 会记录在中;

  2. have the same calling convention;

  3. 具有相同物理属性的 Relational Expression 会记录在其成员变量中.

RelSet 中比较重要成员变量如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class RelSet {
   // 记录属于这个 RelSet 的所有 RelNode
  final List<RelNode> rels = new ArrayList<>();
  
  final List<RelNode> parents = new ArrayList<>();
  //note: 具体相同物理属性的子集合(本质上 RelSubset 并不记录 RelNode,也是通过 RelSet 按物理属性过滤得到其 RelNode 子集合,见下面的 RelSubset 部分)
  final List<RelSubset> subsets = new ArrayList<>();

  
  final List<AbstractConverter> abstractConverters = new ArrayList<>();

  
  RelSet equivalentSet;
  RelNode rel;

  
  final Set<CorrelationId> variablesPropagated;

  
  final Set<CorrelationId> variablesUsed;
  final int id;

  
  boolean inMetadataQuery;
}

关于 RelSubset,源码中介绍如下:

Subset of an equivalence class where all relational expressions have the same physical properties.

它的特点如下:

  1. 描述一组物理属性相同的等价 Relation Expression,即它们具有相同的 Physical Properties;

  2. 每个 RelSubset 都会记录其所属的 RelSet;

  3. RelSubset 继承自 AbstractRelNode,它也是一种 RelNode,物理属性记录在其成员变量 traitSet 中。

RelSubset 一些比较重要的成员变量如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class RelSubset extends AbstractRelNode {
  
  RelOptCost bestCost;

  
  final RelSet set;

  
  RelNode best;

  
  boolean boosted;

  //~ Constructors -----------------------------------------------------------
  RelSubset(
      RelOptCluster cluster,
      RelSet set,
      RelTraitSet traits) {
    super(cluster, traits); // 继承自 AbstractRelNode,会记录其相应的 traits 信息
    this.set = set;
    this.boosted = false;
    assert traits.allSimple();
    computeBestCost(cluster.getPlanner()); //note: 计算 best
    recomputeDigest(); //note: 计算 digest
  }
}

每个 RelSubset 都将会记录其最佳 plan()和最佳 plan 的 cost()信息。

RuleMatch

RuleMatch 是这里对 Rule 和 RelSubset 关系的一个抽象,它会记录这两者的信息。

A match of a rule to a particular set of target relational expressions, frozen in time.

importance

importance 决定了在进行 Rule 优化时 Rule 应用的顺序,它是一个相对概念,在 VolcanoPlanner 中有两个 importance,分别是 RelSubset 和 RuleMatch 的 importance,这里先提前介绍一下。

RelSubset 的 importance

RelSubset importance 计算方法见其 api 定义(图中的 sum 改成 Math.max{}这个地方有误):

640?wx_fmt=png

omputeImportance

举个例子:假设一个 RelSubset(记为?s0s0) 的 cost 是3,对应的 importance 是0.5,这个 RelNode 有两个输入(inputs),对应的 RelSubset 记为?s1s1、s2s2(假设?s1s1、s2s2?不再有输入 RelNode),其 cost 分别为 2和5,那么?s1s1?的 importance 为

Importance of?s1s1?=?23+2+523+2+50.5 = 0.1

Importance of?s2s2?=?53+2+553+2+50.5 = 0.25

其中,2代表的是?s1s1?的 cost,3+2+53+2+5?代表的是?s0s0?的 cost(本节点的 cost 加上其所有 input 的 cost)。下面看下其具体的代码实现(调用 RuleQueue 中的计算其 importance):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
//org.apache.calcite.plan.volcano.RuleQueue

public void recompute(RelSubset subset, boolean force) {
  Double previousImportance = subsetImportances.get(subset);
  if (previousImportance == null) { //note: subset 还没有注册的情况下
    if (!force) { //note: 如果不是强制,可以直接先返回
      // Subset has not been registered yet. Don't worry about it.
      return;
    }

    previousImportance = Double.NEGATIVE_INFINITY;
  }

  //note: 计算器 importance 值
  double importance = computeImportance(subset);
  if (previousImportance == importance) {
    return;
  }

  //note: 缓存中更新其 importance
  updateImportance(subset, importance);
}


// 计算一个节点的 importance
double computeImportance(RelSubset subset) {
  double importance;
  if (subset == planner.root) {
    // The root always has importance = 1
    //note: root RelSubset 的 importance 为1
    importance = 1.0;
  } else {
    final RelMetadataQuery mq = subset.getCluster().getMetadataQuery();

    // The importance of a subset is the max of its importance to its
    // parents
    //note: 计算其相对于 parent 的最大 importance,多个 parent 的情况下,选择一个最大值
    importance = 0.0;
    for (RelSubset parent : subset.getParentSubsets(planner)) {
      //note: 计算这个 RelSubset 相对于 parent 的 importance
      final double childImportance =
          computeImportanceOfChild(mq, subset, parent);
      //note: 选择最大的 importance
      importance = Math.max(importance, childImportance);
    }
  }
  LOGGER.trace("Importance of [{}] is {}", subset, importance);
  return importance;
}

//note:根据 cost 计算 child 相对于 parent 的 importance(这是个相对值)
private double computeImportanceOfChild(RelMetadataQuery mq, RelSubset child,
    RelSubset parent) {
  //note: 获取 parent 的 importance
  final double parentImportance = getImportance(parent);
  //note: 获取对应的 cost 信息
  final double childCost = toDouble(planner.getCost(child, mq));
  final double parentCost = toDouble(planner.getCost(parent, mq));
  double alpha = childCost / parentCost;
  if (alpha >= 1.0) {
    // child is always less important than parent
    alpha = 0.99;
  }
  //note: 根据 cost 比列计算其 importance
  final double importance = parentImportance * alpha;
  LOGGER.trace("Importance of [{}] to its parent [{}] is {} (parent importance={}, child cost={},"
      + " parent cost={})", child, parent, importance, parentImportance, childCost, parentCost);
  return importance;
}

在中计算 RelSubset 相对于 parent RelSubset 的 importance 时,一个比较重要的地方就是如何计算 cost,关于 cost 的计算见:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//org.apache.calcite.plan.volcano.VolcanoPlanner
//note: Computes the cost of a RelNode.
public RelOptCost getCost(RelNode rel, RelMetadataQuery mq) {
  assert rel != null : "pre-condition: rel != null";
  if (rel instanceof RelSubset) { //note: 如果是 RelSubset,证明是已经计算 cost 的 subset
    return ((RelSubset) rel).bestCost;
  }
  if (rel.getTraitSet().getTrait(ConventionTraitDef.INSTANCE)
      == Convention.NONE) {
    return costFactory.makeInfiniteCost(); //note: 这种情况下也会返回 infinite Cost
  }
  //note: 计算其 cost
  RelOptCost cost = mq.getNonCumulativeCost(rel);
  if (!zeroCost.isLt(cost)) { //note: cost 比0还小的情况
    // cost must be positive, so nudge it
    cost = costFactory.makeTinyCost();
  }
  //note: RelNode 的 cost 会把其 input 全部加上
  for (RelNode input : rel.getInputs()) {
    cost = cost.plus(getCost(input, mq));
  }
  return cost;
}

上面就是 RelSubset importance 计算的代码实现,从实现中可以发现这个特点:

  1. 越靠近 root 的 RelSubset,其 importance 越大,这个带来的好处就是在优化时,会尽量先优化靠近 root 的 RelNode,这样带来的收益也会最大。

RuleMatch 的 importance

RuleMatch 的 importance 定义为以下两个中比较大的一个(如果对应的 RelSubset 有 importance 的情况下):

  1. 这个 RuleMatch 对应 RelSubset(这个 rule match 的 RelSubset)的 importance;

  2. 输出的 RelSubset(taget RelSubset)的 importance(如果这个 RelSubset 在 VolcanoPlanner 的缓存中存在的话)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//org.apache.calcite.plan.volcano.VolcanoRuleMatch

double computeImportance() {
  assert rels[0] != null; //note: rels[0] 这个 Rule Match 对应的 RelSubset
  RelSubset subset = volcanoPlanner.getSubset(rels[0]);
  double importance = 0;
  if (subset != null) {
    //note: 获取 RelSubset 的 importance
    importance = volcanoPlanner.ruleQueue.getImportance(subset);
  }
  //note: Returns a guess as to which subset the result of this rule will belong to.
  final RelSubset targetSubset = guessSubset();
  if ((targetSubset != null) && (targetSubset != subset)) {
    // If this rule will generate a member of an equivalence class
    // which is more important, use that importance.
    //note: 获取 targetSubset 的 importance
    final double targetImportance =
        volcanoPlanner.ruleQueue.getImportance(targetSubset);
    if (targetImportance > importance) {
      importance = targetImportance;

      // If the equivalence class is cheaper than the target, bump up
      // the importance of the rule. A converter is an easy way to
      // make the plan cheaper, so we'd hate to miss this opportunity.
      //
      // REVIEW: jhyde, 2007/12/21: This rule seems to make sense, but
      // is disabled until it has been proven.
      //
      // CHECKSTYLE: IGNORE 3
      if ((subset != null)
          && subset.bestCost.isLt(targetSubset.bestCost)
          && false) { //note: 肯定不会进入
        importance *=
            targetSubset.bestCost.divideBy(subset.bestCost);
        importance = Math.min(importance, 0.99);
      }
    }
  }

  return importance;
}

RuleMatch 的 importance 主要是决定了在选择 RuleMatch 时,应该先处理哪一个?它本质上还是直接用的 RelSubset 的 importance。

VolcanoPlanner 处理流程

还是以前面的示例,只不过这里把优化器换成 VolcanoPlanner 来实现,通过这个示例来详细看下 VolcanoPlanner 内部的实现逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//1. 初始化 VolcanoPlanner 对象,并添加相应的 Rule
VolcanoPlanner planner = new VolcanoPlanner();
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
planner.addRelTraitDef(RelDistributionTraitDef.INSTANCE);
// 添加相应的 rule
planner.addRule(FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN);
planner.addRule(ReduceExpressionsRule.PROJECT_INSTANCE);
planner.addRule(PruneEmptyRules.PROJECT_INSTANCE);
// 添加相应的 ConverterRule
planner.addRule(EnumerableRules.ENUMERABLE_MERGE_JOIN_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_VALUES_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
//2. Changes a relational expression to an equivalent one with a different set of traits.
RelTraitSet desiredTraits =
    relNode.getCluster().traitSet().replace(EnumerableConvention.INSTANCE);
relNode = planner.changeTraits(relNode, desiredTraits);
//3. 通过 VolcanoPlanner 的 setRoot 方法注册相应的 RelNode,并进行相应的初始化操作
planner.setRoot(relNode);
//4. 通过动态规划算法找到 cost 最小的 plan
relNode = planner.findBestExp();

优化后的结果为:

1
2
3
4
5
6
7
EnumerableSort(sort0=[$0], dir0=[ASC])
  EnumerableProject(USER_ID=[$0], USER_NAME=[$1], USER_COMPANY=[$5], USER_AGE=[$2])
    EnumerableMergeJoin(condition=[=($0, $3)], joinType=[inner])
      EnumerableFilter(condition=[>($2, 30)])
        EnumerableTableScan(table=[[USERS]])
      EnumerableFilter(condition=[>($0, 10)])
        EnumerableTableScan(table=[[JOBS]])

在应用 VolcanoPlanner 时,整体分为以下四步:

  1. 初始化 VolcanoPlanner,并添加相应的 Rule(包括 ConverterRule);

  2. 对 RelNode 做等价转换,这里只是改变其物理属性();

  3. 通过 VolcanoPlanner 的方法注册相应的 RelNode,并进行相应的初始化操作;

  4. 通过动态规划算法找到 cost 最小的 plan;

下面来分享一下上面的详细流程。

1. VolcanoPlanner 初始化

在这里总共有三步,分别是 VolcanoPlanner 初始化,?添加 RelTraitDef,添加 rule,先看下 VolcanoPlanner 的初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//org.apache.calcite.plan.volcano.VolcanoPlanner

public VolcanoPlanner() {
  this(null, null);
}


public VolcanoPlanner(RelOptCostFactory costFactory, //
    Context externalContext) {
  super(costFactory == null ? VolcanoCost.FACTORY : costFactory, //
      externalContext);
  this.zeroCost = this.costFactory.makeZeroCost();
}

这里其实并没有做什么,只是做了一些简单的初始化,如果要想设置相应 RelTraitDef 的话,需要调用进行添加,其实现如下:

1
2
3
4
5
//org.apache.calcite.plan.volcano.VolcanoPlanner
//note: 添加 RelTraitDef
@Override public boolean addRelTraitDef(RelTraitDef relTraitDef) {
  return !traitDefs.contains(relTraitDef) && traitDefs.add(relTraitDef);
}

如果要给 VolcanoPlanner 添加 Rule 的话,需要调用进行添加,在这个方法里重点做的一步是将具体的 RelNode 与 RelOptRuleOperand 之间的关系记录下来,记录到中,相当于在优化时,哪个 RelNode 可以应用哪些 Rule 都是记录在这个缓存里的。其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//org.apache.calcite.plan.volcano.VolcanoPlanner
//note: 添加 rule
public boolean addRule(RelOptRule rule) {
  if (locked) {
    return false;
  }
  if (ruleSet.contains(rule)) {
    // Rule already exists.
    return false;
  }
  final boolean added = ruleSet.add(rule);
  assert added;

  final String ruleName = rule.toString();
  //note: 这里的 ruleNames 允许重复的 key 值,但是这里还是要求 rule description 保持唯一的,与 rule 一一对应
  if (ruleNames.put(ruleName, rule.getClass())) {
    Set<Class> x = ruleNames.get(ruleName);
    if (x.size() > 1) {
      throw new RuntimeException("Rule description '" + ruleName
          + "' is not unique; classes: " + x);
    }
  }

  //note: 注册一个 rule 的 description(保存在 mapDescToRule 中)
  mapRuleDescription(rule);

  // Each of this rule's operands is an 'entry point' for a rule call. Register each operand against all concrete sub-classes that could match it.
  //note: 记录每个 sub-classes 与 operand 的关系(如果能 match 的话,就记录一次)。一个 RelOptRuleOperand 只会有一个 class 与之对应,这里找的是 subclass
  for (RelOptRuleOperand operand : rule.getOperands()) {
    for (Class<? extends RelNode> subClass
        : subClasses(operand.getMatchedClass())) {
      classOperands.put(subClass, operand);
    }
  }

  // If this is a converter rule, check that it operates on one of the
  // kinds of trait we are interested in, and if so, register the rule
  // with the trait.
  //note: 对于 ConverterRule 的操作,如果其 ruleTraitDef 类型包含在我们初始化的 traitDefs 中,
  //note: 就注册这个 converterRule 到 ruleTraitDef 中
  //note: 如果不包含 ruleTraitDef,这个 ConverterRule 在本次优化的过程中是用不到的
  if (rule instanceof ConverterRule) {
    ConverterRule converterRule = (ConverterRule) rule;

    final RelTrait ruleTrait = converterRule.getInTrait();
    final RelTraitDef ruleTraitDef = ruleTrait.getTraitDef();
    if (traitDefs.contains(ruleTraitDef)) { //note: 这里注册好像也没有用到
      ruleTraitDef.registerConverterRule(this, converterRule);
    }
  }

  return true;
}

2. RelNode changeTraits

这里分为两步:

  1. 通过 RelTraitSet 的方法,将 RelTraitSet 中对应的 RelTraitDef 做对应的更新,其他的 RelTrait 不变;

  2. 这一步简单来说就是:Changes a relational expression to an equivalent one with a different set of traits,对相应的 RelNode 做 converter 操作,这里实际上也会做很多的内容,这部分会放在第三步讲解,主要是方法的实现。

3. VolcanoPlanner setRoot

VolcanoPlanner 会调用方法注册相应的 Root RelNode,并进行一系列 Volcano 必须的初始化操作,很多的操作都是在这里实现的,这里来详细看下其实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//org.apache.calcite.plan.volcano.VolcanoPlanner
public void setRoot(RelNode rel) {
  // We're registered all the rules, and therefore RelNode classes,
  // we're interested in, and have not yet started calling metadata providers.
  // So now is a good time to tell the metadata layer what to expect.
  registerMetadataRels();

  //note: 注册相应的 RelNode,会做一系列的初始化操作, RelNode 会有对应的 RelSubset
  this.root = registerImpl(rel, null);
  if (this.originalRoot == null) {
    this.originalRoot = rel;
  }

  // Making a node the root changes its importance.
  //note: 重新计算 root subset 的 importance
  this.ruleQueue.recompute(this.root);
  //Ensures that the subset that is the root relational expression contains converters to all other subsets in its equivalence set.
  ensureRootConverters();
}

对于方法来说,核心的处理流程是在方法中,在这个方法会进行相应的初始化操作(包括 RelNode 到 RelSubset 的转换、计算 RelSubset 的 importance 等),其他的方法在上面有相应的备注,这里我们看下具体做了哪些事情:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
//org.apache.calcite.plan.volcano.VolcanoPlanner

private RelSubset registerImpl(
    RelNode rel,
    RelSet set) {
  if (rel instanceof RelSubset) { //note: 如果是 RelSubset 类型,已经注册过了
    return registerSubset(set, (RelSubset) rel); //note: 做相应的 merge
  }

  assert !isRegistered(rel) : "already been registered: " + rel;
  if (rel.getCluster().getPlanner() != this) { //note: cluster 中 planner 与这里不同
    throw new AssertionError("Relational expression " + rel
        + " belongs to a different planner than is currently being used.");
  }

  // Now is a good time to ensure that the relational expression
  // implements the interface required by its calling convention.
  //note: 确保 relational expression 可以实施其 calling convention 所需的接口
  //note: 获取 RelNode 的 RelTraitSet
  final RelTraitSet traits = rel.getTraitSet();
  //note: 获取其 ConventionTraitDef
  final Convention convention = traits.getTrait(ConventionTraitDef.INSTANCE);
  assert convention != null;
  if (!convention.getInterface().isInstance(rel)
      && !(rel instanceof Converter)) {
    throw new AssertionError("Relational expression " + rel
        + " has calling-convention " + convention
        + " but does not implement the required interface '"
        + convention.getInterface() + "' of that convention");
  }
  if (traits.size() != traitDefs.size()) {
    throw new AssertionError("Relational expression " + rel
        + " does not have the correct number of traits: " + traits.size()
        + " != " + traitDefs.size());
  }

  // Ensure that its sub-expressions are registered.
  //note: 其实现在 AbstractRelNode 对应的方法中,实际上调用的还是 ensureRegistered 方法进行注册
  //note: 将 RelNode 的所有 inputs 注册到 planner 中
  //note: 这里会递归调用 registerImpl 注册 relNode 与 RelSet,直到其 inputs 全部注册
  //note: 返回的是一个 RelSubset 类型
  rel = rel.onRegister(this);

  // Record its provenance. (Rule call may be null.)
  //note: 记录 RelNode 的来源
  if (ruleCallStack.isEmpty()) { //note: 不知道来源时
    provenanceMap.put(rel, Provenance.EMPTY);
  } else { //note: 来自 rule 触发的情况
    final VolcanoRuleCall ruleCall = ruleCallStack.peek();
    provenanceMap.put(
        rel,
        new RuleProvenance(
            ruleCall.rule,
            ImmutableList.copyOf(ruleCall.rels),
            ruleCall.id));
  }

  // If it is equivalent to an existing expression, return the set that
  // the equivalent expression belongs to.
  //note: 根据 RelNode 的 digest(摘要,全局唯一)判断其是否已经有对应的 RelSubset,有的话直接放回
  String key = rel.getDigest();
  RelNode equivExp = mapDigestToRel.get(key);
  if (equivExp == null) { //note: 还没注册的情况
    // do nothing
  } else if (equivExp == rel) {//note: 已经有其缓存信息
    return getSubset(rel);
  } else {
    assert RelOptUtil.equal(
        "left", equivExp.getRowType(),
        "right", rel.getRowType(),
        Litmus.THROW);
    RelSet equivSet = getSet(equivExp); //note: 有 RelSubset 但对应的 RelNode 不同时,这里对其 RelSet 做下 merge
    if (equivSet != null) {
      LOGGER.trace(
          "Register: rel#{} is equivalent to {}", rel.getId(), equivExp.getDescription());
      return registerSubset(set, getSubset(equivExp));
    }
  }

  //note: Converters are in the same set as their children.
  if (rel instanceof Converter) {
    final RelNode input = ((Converter) rel).getInput();
    final RelSet childSet = getSet(input);
    if ((set != null)
        && (set != childSet)
        && (set.equivalentSet == null)) {
      LOGGER.trace(
          "Register #{} {} (and merge sets, because it is a conversion)",
          rel.getId(), rel.getDigest());
      merge(set, childSet);
      registerCount++;

      // During the mergers, the child set may have changed, and since
      // we're not registered yet, we won't have been informed. So
      // check whether we are now equivalent to an existing
      // expression.
      if (fixUpInputs(rel)) {
        rel.recomputeDigest();
        key = rel.getDigest();
        RelNode equivRel = mapDigestToRel.get(key);
        if ((equivRel != rel) && (equivRel != null)) {
          assert RelOptUtil.equal(
              "rel rowtype",
              rel.getRowType(),
              "equivRel rowtype",
              equivRel.getRowType(),
              Litmus.THROW);

          // make sure this bad rel didn't get into the
          // set in any way (fixupInputs will do this but it
          // doesn't know if it should so it does it anyway)
          set.obliterateRelNode(rel);

          // There is already an equivalent expression. Use that
          // one, and forget about this one.
          return getSubset(equivRel);
        }
      }
    } else {
      set = childSet;
    }
  }

  // Place the expression in the appropriate equivalence set.
  //note: 把 expression 放到合适的 等价集 中
  //note: 如果 RelSet 不存在,这里会初始化一个 RelSet
  if (set == null) {
    set = new RelSet(
        nextSetId++,
        Util.minus(
            RelOptUtil.getVariablesSet(rel),
            rel.getVariablesSet()),
        RelOptUtil.getVariablesUsed(rel));
    this.allSets.add(set);
  }

  // Chain to find 'live' equivalent set, just in case several sets are
  // merging at the same time.
  //note: 递归查询,一直找到最开始的 语义相等的集合,防止不同集合同时被 merge
  while (set.equivalentSet != null) {
    set = set.equivalentSet;
  }

  // Allow each rel to register its own rules.
  registerClass(rel);

  registerCount++;
  //note: 初始时是 0
  final int subsetBeforeCount = set.subsets.size();
  //note: 向等价集中添加相应的 RelNode,并更新其 best 信息
  RelSubset subset = addRelToSet(rel, set);

  //note: 缓存相关信息,返回的 key 之前对应的 value
  final RelNode xx = mapDigestToRel.put(key, rel);
  assert xx == null || xx == rel : rel.getDigest();

  LOGGER.trace("Register {} in {}", rel.getDescription(), subset.getDescription());

  // This relational expression may have been registered while we
  // recursively registered its children. If this is the case, we're done.
  if (xx != null) {
    return subset;
  }

  // Create back-links from its children, which makes children more
  // important.
  //note: 如果是 root,初始化其 importance 为 1.0
  if (rel == this.root) {
    ruleQueue.subsetImportances.put(
        subset,
        1.0); // todo: remove
  }
  //note: 将 Rel 的 input 对应的 RelSubset 的 parents 设置为当前的 Rel
  //note: 也就是说,一个 RelNode 的 input 为其对应 RelSubset 的 children 节点
  for (RelNode input : rel.getInputs()) {
    RelSubset childSubset = (RelSubset) input;
    childSubset.set.parents.add(rel);

    // Child subset is more important now a new parent uses it.
    //note: 重新计算 RelSubset 的 importance
    ruleQueue.recompute(childSubset);
  }
  if (rel == this.root) {// TODO: 2019-03-11 这里为什么要删除呢?
    ruleQueue.subsetImportances.remove(subset);
  }

  // Remember abstract converters until they're satisfied
  //note: 如果是 AbstractConverter 示例,添加到 abstractConverters 集合中
  if (rel instanceof AbstractConverter) {
    set.abstractConverters.add((AbstractConverter) rel);
  }

  // If this set has any unsatisfied converters, try to satisfy them.
  //note: check set.abstractConverters
  checkForSatisfiedConverters(set, rel);

  // Make sure this rel's subset importance is updated
  //note: 强制更新(重新计算) subset 的 importance
  ruleQueue.recompute(subset, true);

  //note: 触发所有匹配的 rule,这里是添加到对应的 RuleQueue 中
  // Queue up all rules triggered by this relexp's creation.
  fireRules(rel, true);

  // It's a new subset.
  //note: 如果是一个 new subset,再做一次触发
  if (set.subsets.size() > subsetBeforeCount) {
    fireRules(subset, true);
  }

  return subset;
}

?处理流程比较复杂,其方法实现,可以简单总结为以下几步:

  1. 在经过最上面的一些验证之后,会通过这步操作,递归地调用 VolcanoPlanner 的方法对其RelNode 进行注册,最后还是调用方法先注册叶子节点,然后再父节点,最后到根节点;

  2. 根据 RelNode 的 digest 信息(一般这个对于 RelNode 来说是全局唯一的),判断其是否已经存在缓存中,如果存在的话,那么判断会 RelNode 是否相同,如果相同的话,证明之前已经注册过,直接通过返回其对应的 RelSubset 信息,否则就对其 RelSubset 做下 merge;

  3. 如果 RelNode 对应的 RelSet 为 null,这里会新建一个 RelSet,并通过将 RelNode 添加到 RelSet 中,并且更新 VolcanoPlanner 的缓存记录(RelNode 与 RelSubset 的对应关系),在的最后还会更新 RelSubset 的 best plan 和 best cost(每当往一个 RelSubset 添加相应的 RelNode 时,都会判断这个 RelNode 是否代表了 best plan,如果是的话,就更新);

  4. 将这个 RelNode 的 inputs 设置为其对应 RelSubset 的 children 节点(实际的操作时,是在 RelSet 的中记录其父节点);

  5. 强制重新计算当前 RelNode 对应 RelSubset 的 importance;

  6. 如果这个 RelSubset 是新建的,会再触发一次方法(会先对 RelNode 触发一次),遍历找到所有可以 match 的 Rule,对每个 Rule 都会创建一个 VolcanoRuleMatch 对象(会记录 RelNode、RelOptRuleOperand 等信息,RelOptRuleOperand 中又会记录 Rule 的信息),并将这个 VolcanoRuleMatch 添加到对应的 RuleQueue 中(就是前面图中的那个 RuleQueue)。

这里,来看下方法的实现,它的目的是把配置的 RuleMatch 添加到 RuleQueue 中,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//org.apache.calcite.plan.volcano.VolcanoPlanner

void fireRules(
    RelNode rel,
    boolean deferred) {
  for (RelOptRuleOperand operand : classOperands.get(rel.getClass())) {
    if (operand.matches(rel)) { //note: rule 匹配的情况
      final VolcanoRuleCall ruleCall;
      if (deferred) { //note: 这里默认都是 true,会把 RuleMatch 添加到 queue 中
        ruleCall = new DeferringRuleCall(this, operand);
      } else {
        ruleCall = new VolcanoRuleCall(this, operand);
      }
      ruleCall.match(rel);
    }
  }
}


private static class DeferringRuleCall extends VolcanoRuleCall {
  DeferringRuleCall(
      VolcanoPlanner planner,
      RelOptRuleOperand operand) {
    super(planner, operand);
  }

  
  protected void onMatch() {
    final VolcanoRuleMatch match =
        new VolcanoRuleMatch(
            volcanoPlanner,
            getOperand0(), //note: 其实就是 operand
            rels,
            nodeInputs);
    volcanoPlanner.ruleQueue.addMatch(match);
  }
}

在上面的方法中,对于匹配的 Rule,将会创建一个 VolcanoRuleMatch 对象,之后再把这个 VolcanoRuleMatch 对象添加到对应的 RuleQueue 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//org.apache.calcite.plan.volcano.RuleQueue

void addMatch(VolcanoRuleMatch match) {
  final String matchName = match.toString();
  for (PhaseMatchList matchList : matchListMap.values()) {
    if (!matchList.names.add(matchName)) {
      // Identical match has already been added.
      continue;
    }

    String ruleClassName = match.getRule().getClass().getSimpleName();

    Set<String> phaseRuleSet = phaseRuleMapping.get(matchList.phase);
    //note: 如果 phaseRuleSet 不为 ALL_RULES,并且 phaseRuleSet 不包含这个 ruleClassName 时,就跳过(其他三个阶段都属于这个情况)
    //note: 在添加 rule match 时,phaseRuleSet 可以控制哪些 match 可以添加、哪些不能添加
    //note: 这里的话,默认只有处在 OPTIMIZE 阶段的 PhaseMatchList 可以添加相应的 rule match
    if (phaseRuleSet != ALL_RULES) {
      if (!phaseRuleSet.contains(ruleClassName)) {
        continue;
      }
    }

    LOGGER.trace("{} Rule-match queued: {}", matchList.phase.toString(), matchName);

    matchList.list.add(match);

    matchList.matchMap.put(
        planner.getSubset(match.rels[0]), match);
  }
}

到这里 VolcanoPlanner 需要初始化的内容都初始化完成了,下面就到了具体的优化部分。

4. VolcanoPlanner findBestExp

VolcanoPlanner 的是具体进行优化的地方,先介绍一下这里的优化策略(每进行一次迭代,?加1,它记录了总的迭代次数):

  1. 第一次找到可执行计划的迭代次数记为?,其对应的 Cost 暂时记为 BestCost;

  2. 制定下一次优化要达到的目标为?,再根据及当前的迭代次数计算?,这个值代表的意思是:如果迭代次数超过这个值还没有达到优化目标,那么将会放弃迭代,认为当前的 plan 就是 best plan;

  3. 如果 RuleQueue 中 RuleMatch 为空,那么也会退出迭代,认为当前的 plan 就是 best plan;

  4. 在每次迭代时都会从 RuleQueue 中选择一个 RuleMatch,策略是选择一个最高 importance 的 RuleMatch,可以保证在每次规则优化时都是选择当前优化效果最好的 Rule 去优化;

  5. 最后根据 best plan,构建其对应的 RelNode。

上面就是主要设计理念,这里来看其具体的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
//org.apache.calcite.plan.volcano.VolcanoPlanner

public RelNode findBestExp() {
  //note: 确保 root relational expression 的 subset(RelSubset)在它的等价集(RelSet)中包含所有 RelSubset 的 converter
  //note: 来保证 planner 从其他的 subsets 找到的实现方案可以转换为 root,否则可能因为 convention 不同,无法实施
  ensureRootConverters();
  //note: materialized views 相关,这里可以先忽略~
  registerMaterializations();
  int cumulativeTicks = 0; //note: 四个阶段通用的变量
  //note: 不同的阶段,总共四个阶段,实际上只有 OPTIMIZE 这个阶段有效,因为其他阶段不会有 RuleMatch
  for (VolcanoPlannerPhase phase : VolcanoPlannerPhase.values()) {
    //note: 在不同的阶段,初始化 RelSubSets 相应的 importance
    //note: root 节点往下子节点的 importance 都会被初始化
    setInitialImportance();

    //note: 默认是 VolcanoCost
    RelOptCost targetCost = costFactory.makeHugeCost();
    int tick = 0;
    int firstFiniteTick = -1;
    int splitCount = 0;
    int giveUpTick = Integer.MAX_VALUE;

    while (true) {
      ++tick;
      ++cumulativeTicks;
      //note: 第一次运行是 false,两个不是一个对象,一个是 costFactory.makeHugeCost, 一个是 costFactory.makeInfiniteCost
      //note: 如果低于目标 cost,这里再重新设置一个新目标、新的 giveUpTick
      if (root.bestCost.isLe(targetCost)) {
        //note: 本阶段第一次运行,目的是为了调用 clearImportanceBoost 方法,清除相应的 importance 信息
        if (firstFiniteTick < 0) {
          firstFiniteTick = cumulativeTicks;

          //note: 对于那些手动提高 importance 的 RelSubset 进行重新计算
          clearImportanceBoost();
        }
        if (ambitious) {
          // Choose a slightly more ambitious target cost, and
          // try again. If it took us 1000 iterations to find our
          // first finite plan, give ourselves another 100
          // iterations to reduce the cost by 10%.
          //note: 设置 target 为当前 best cost 的 0.9,调整相应的目标,再进行优化
          targetCost = root.bestCost.multiplyBy(0.9);
          ++splitCount;
          if (impatient) {
            if (firstFiniteTick < 10) {
              // It's possible pre-processing can create
              // an implementable plan -- give us some time
              // to actually optimize it.
              //note: 有可能在 pre-processing 阶段就实现一个 implementable plan,所以先设置一个值,后面再去优化
              giveUpTick = cumulativeTicks + 25;
            } else {
              giveUpTick =
                  cumulativeTicks
                      + Math.max(firstFiniteTick / 10, 25);
            }
          }
        } else {
          break;
        }
      //note: 最近没有任何进步(超过 giveUpTick 限制,还没达到目标值),直接采用当前的 best plan
      } else if (cumulativeTicks > giveUpTick) {
        // We haven't made progress recently. Take the current best.
        break;
      } else if (root.bestCost.isInfinite() && ((tick % 10) == 0)) {
        injectImportanceBoost();
      }

      LOGGER.debug("PLANNER = {}; TICK = {}/{}; PHASE = {}; COST = {}",
          this, cumulativeTicks, tick, phase.toString(), root.bestCost);

      VolcanoRuleMatch match = ruleQueue.popMatch(phase);
      //note: 如果没有规则,会直接退出当前的阶段
      if (match == null) {
        break;
      }

      assert match.getRule().matches(match);
      //note: 做相应的规则匹配
      match.onMatch();

      // The root may have been merged with another
      // subset. Find the new root subset.
      root = canonize(root);
    }

    //note: 当期阶段完成,移除 ruleQueue 中记录的 rule-match list
    ruleQueue.phaseCompleted(phase);
  }
  if (LOGGER.isTraceEnabled()) {
    StringWriter sw = new StringWriter();
    final PrintWriter pw = new PrintWriter(sw);
    dump(pw);
    pw.flush();
    LOGGER.trace(sw.toString());
  }
  //note: 根据 plan 构建其 RelNode 树
  RelNode cheapest = root.buildCheapestPlan(this);
  if (LOGGER.isDebugEnabled()) {
    LOGGER.debug(
        "Cheapest plan:
{}", RelOptUtil.toString(cheapest, SqlExplainLevel.ALL_ATTRIBUTES));

    LOGGER.debug("Provenance:
{}", provenance(cheapest));
  }
  return cheapest;
}

整体的流程正如前面所述,这里来看下 RuleQueue 中方法的实现,它的目的是选择 the highest importance 的 RuleMatch,这个方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
//org.apache.calcite.plan.volcano.RuleQueue

VolcanoRuleMatch popMatch(VolcanoPlannerPhase phase) {
  dump();

  //note: 选择当前阶段对应的 PhaseMatchList
  PhaseMatchList phaseMatchList = matchListMap.get(phase);
  if (phaseMatchList == null) {
    throw new AssertionError("Used match list for phase " + phase
        + " after phase complete");
  }

  final List<VolcanoRuleMatch> matchList = phaseMatchList.list;
  VolcanoRuleMatch match;
  for (;;) {
    //note: 按照前面的逻辑只有在 OPTIMIZE 阶段,PhaseMatchList 才不为空,其他阶段都是空
    // 参考 addMatch 方法
    if (matchList.isEmpty()) {
      return null;
    }
    if (LOGGER.isTraceEnabled()) {
      matchList.sort(MATCH_COMPARATOR);
      match = matchList.remove(0);

      StringBuilder b = new StringBuilder();
      b.append("Sorted rule queue:");
      for (VolcanoRuleMatch match2 : matchList) {
        final double importance = match2.computeImportance();
        b.append("
");
        b.append(match2);
        b.append(" importance ");
        b.append(importance);
      }

      LOGGER.trace(b.toString());
    } else { //note: 直接遍历找到 importance 最大的 match(上面先做排序,是为了输出日志)
      // If we're not tracing, it's not worth the effort of sorting the
      // list to find the minimum.
      match = null;
      int bestPos = -1;
      int i = -1;
      for (VolcanoRuleMatch match2 : matchList) {
        ++i;
        if (match == null
            || MATCH_COMPARATOR.compare(match2, match) < 0) {
          bestPos = i;
          match = match2;
        }
      }
      match = matchList.remove(bestPos);
    }

    if (skipMatch(match)) {
      LOGGER.debug("Skip match: {}", match);
    } else {
      break;
    }
  }

  // A rule match's digest is composed of the operand RelNodes' digests,
  // which may have changed if sets have merged since the rule match was
  // enqueued.
  //note: 重新计算一下这个 RuleMatch 的 digest
  match.recomputeDigest();

  //note: 从 phaseMatchList 移除这个 RuleMatch
  phaseMatchList.matchMap.remove(
      planner.getSubset(match.rels[0]), match);

  LOGGER.debug("Pop match: {}", match);
  return match;
}

到这里,我们就把 VolcanoPlanner 的优化讲述完了,当然并没有面面俱到所有的细节,VolcanoPlanner 的整体处理图如下:

640?wx_fmt=png


VolcanoPlanner 整体处理流程

一些思考

1. 初始化 RuleQueue 时,添加的 one useless rule name 有什么用?

在初始化 RuleQueue 时,会给 VolcanoPlanner 的四个阶段都初始化一个 PhaseMatchList 对象(记录这个阶段对应的 RuleMatch),这时候会给其中的三个阶段添加一个 useless rule,如下所示:

1
2
3
4
5
6
7
8
9
10
protected VolcanoPlannerPhaseRuleMappingInitializer
    getPhaseRuleMappingInitializer() {
  return phaseRuleMap -> {
    // Disable all phases except OPTIMIZE by adding one useless rule name.
    //note: 通过添加一个无用的 rule name 来 disable 优化器的其他三个阶段
    phaseRuleMap.get(VolcanoPlannerPhase.PRE_PROCESS_MDR).add("xxx");
    phaseRuleMap.get(VolcanoPlannerPhase.PRE_PROCESS).add("xxx");
    phaseRuleMap.get(VolcanoPlannerPhase.CLEANUP).add("xxx");
  };
}

开始时还困惑这个什么用?后来看到下面的代码基本就明白了

1
2
3
4
5
6
7
8
for (VolcanoPlannerPhase phase : VolcanoPlannerPhase.values()) {
  // empty phases get converted to "all rules"
  //note: 如果阶段对应的 rule set 为空,那么就给这个阶段对应的 rule set 添加一个 【ALL_RULES】
  //也就是只有 OPTIMIZE 这个阶段对应的会添加 ALL_RULES
  if (phaseRuleMapping.get(phase).isEmpty()) {
    phaseRuleMapping.put(phase, ALL_RULES);
  }
}

后面在调用 RuleQueue 的方法会做相应的判断,如果 phaseRuleSet 不为 ALL_RULES,并且 phaseRuleSet 不包含这个 ruleClassName 时,那么就跳过这个 RuleMatch,也就是说实际上只有?OPTIMIZE?这个阶段是发挥作用的,其他阶段没有添加任何 RuleMatch。

2. 四个 phase 实际上只用了 1个阶段,为什么要设置4个阶段?

VolcanoPlanner 的四个阶段?,实际只有进行真正的优化操作,其他阶段并没有,这里自己是有一些困惑的:

  1. 为什么要分为4个阶段,在添加 RuleMatch 时,是向四个阶段同时添加,这个设计有什么好处?为什么要优化四次?

  2. 设计了4个阶段,为什么默认只用了1个?

这两个问题,暂时也没有头绪,有想法的,欢迎交流。

平台注册入口