View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.util;
19  
20  import static org.tikv.common.key.Key.toRawKey;
21  import static org.tikv.common.util.KeyRangeUtils.formatByteString;
22  import static org.tikv.common.util.KeyRangeUtils.makeCoprocRange;
23  
24  import com.google.common.collect.ImmutableList;
25  import com.google.protobuf.ByteString;
26  import gnu.trove.list.array.TLongArrayList;
27  import gnu.trove.map.hash.TLongObjectHashMap;
28  import java.io.Serializable;
29  import java.util.*;
30  import org.tikv.common.key.RowKey;
31  import org.tikv.common.pd.PDUtils;
32  import org.tikv.common.region.RegionManager;
33  import org.tikv.common.region.TiRegion;
34  import org.tikv.common.region.TiStore;
35  import org.tikv.common.region.TiStoreType;
36  import org.tikv.kvproto.Coprocessor.KeyRange;
37  
38  public class RangeSplitter {
39    private final RegionManager regionManager;
40  
41    private RangeSplitter(RegionManager regionManager) {
42      this.regionManager = regionManager;
43    }
44  
45    public static RangeSplitter newSplitter(RegionManager mgr) {
46      return new RangeSplitter(mgr);
47    }
48  
49    /**
50     * Group by a list of handles by the handles' region, handles will be sorted.
51     *
52     * @param tableId Table id used for the handle
53     * @param handles Handle list
54     * @return {@code <Region, HandleList>} map
55     */
56    public Map<Pair<TiRegion, TiStore>, TLongArrayList> groupByAndSortHandlesByRegionId(
57        long tableId, TLongArrayList handles) {
58      TLongObjectHashMap<TLongArrayList> regionHandles = new TLongObjectHashMap<>();
59      TLongObjectHashMap<Pair<TiRegion, TiStore>> idToRegionStorePair = new TLongObjectHashMap<>();
60      Map<Pair<TiRegion, TiStore>, TLongArrayList> result = new HashMap<>();
61      handles.sort();
62  
63      byte[] endKey = null;
64      TiRegion curRegion = null;
65      TLongArrayList handlesInCurRegion = new TLongArrayList();
66      for (int i = 0; i < handles.size(); i++) {
67        long curHandle = handles.get(i);
68        RowKey key = RowKey.toRowKey(tableId, curHandle);
69        if (endKey == null
70            || (endKey.length != 0 && FastByteComparisons.compareTo(key.getBytes(), endKey) >= 0)) {
71          if (curRegion != null) {
72            regionHandles.put(curRegion.getId(), handlesInCurRegion);
73            handlesInCurRegion = new TLongArrayList();
74          }
75          Pair<TiRegion, TiStore> regionStorePair =
76              regionManager.getRegionStorePairByKey(ByteString.copyFrom(key.getBytes()));
77          curRegion = regionStorePair.first;
78          idToRegionStorePair.put(curRegion.getId(), regionStorePair);
79          endKey = curRegion.getEndKey().toByteArray();
80        }
81        handlesInCurRegion.add(curHandle);
82      }
83      if (!handlesInCurRegion.isEmpty() && curRegion != null) {
84        regionHandles.put(curRegion.getId(), handlesInCurRegion);
85      }
86      regionHandles.forEachEntry(
87          (k, v) -> {
88            Pair<TiRegion, TiStore> regionStorePair = idToRegionStorePair.get(k);
89            result.put(regionStorePair, v);
90            return true;
91          });
92      return result;
93    }
94  
95    public List<RegionTask> splitAndSortHandlesByRegion(List<Long> ids, TLongArrayList handles) {
96      Set<RegionTask> regionTasks = new HashSet<>();
97      for (Long id : ids) {
98        regionTasks.addAll(splitAndSortHandlesByRegion(id, handles));
99      }
100     return new ArrayList<>(regionTasks);
101   }
102 
103   /**
104    * Build region tasks from handles split by region, handles will be sorted.
105    *
106    * @param tableId Table ID
107    * @param handles Handle list
108    * @return A list of region tasks
109    */
110   private List<RegionTask> splitAndSortHandlesByRegion(long tableId, TLongArrayList handles) {
111     // Max value for current index handle range
112     ImmutableList.Builder<RegionTask> regionTasks = ImmutableList.builder();
113 
114     Map<Pair<TiRegion, TiStore>, TLongArrayList> regionHandlesMap =
115         groupByAndSortHandlesByRegionId(tableId, handles);
116 
117     regionHandlesMap.forEach((k, v) -> createTask(0, v.size(), tableId, v, k, regionTasks));
118 
119     return regionTasks.build();
120   }
121 
122   private void createTask(
123       int startPos,
124       int endPos,
125       long tableId,
126       TLongArrayList handles,
127       Pair<TiRegion, TiStore> regionStorePair,
128       ImmutableList.Builder<RegionTask> regionTasks) {
129     List<KeyRange> newKeyRanges = new ArrayList<>(endPos - startPos + 1);
130     long startHandle = handles.get(startPos);
131     long endHandle = startHandle;
132     for (int i = startPos + 1; i < endPos; i++) {
133       long curHandle = handles.get(i);
134       if (endHandle + 1 == curHandle) {
135         endHandle = curHandle;
136       } else {
137         newKeyRanges.add(
138             makeCoprocRange(
139                 RowKey.toRowKey(tableId, startHandle).toByteString(),
140                 RowKey.toRowKey(tableId, endHandle + 1).toByteString()));
141         startHandle = curHandle;
142         endHandle = startHandle;
143       }
144     }
145     newKeyRanges.add(
146         makeCoprocRange(
147             RowKey.toRowKey(tableId, startHandle).toByteString(),
148             RowKey.toRowKey(tableId, endHandle + 1).toByteString()));
149     regionTasks.add(new RegionTask(regionStorePair.first, regionStorePair.second, newKeyRanges));
150   }
151 
152   /**
153    * Split key ranges into corresponding region tasks and group by their region id
154    *
155    * @param keyRanges List of key ranges
156    * @param storeType Store type, null or TiKV for TiKV(leader), otherwise TiFlash(learner)
157    * @return List of RegionTask, each task corresponds to a different region.
158    */
159   public List<RegionTask> splitRangeByRegion(List<KeyRange> keyRanges, TiStoreType storeType) {
160     if (keyRanges == null || keyRanges.size() == 0) {
161       return ImmutableList.of();
162     }
163 
164     int i = 0;
165     KeyRange range = keyRanges.get(i++);
166     Map<Long, List<KeyRange>> idToRange = new HashMap<>(); // region id to keyRange list
167     Map<Long, Pair<TiRegion, TiStore>> idToRegion = new HashMap<>();
168 
169     while (true) {
170       Pair<TiRegion, TiStore> regionStorePair =
171           regionManager.getRegionStorePairByKey(range.getStart(), storeType);
172 
173       if (regionStorePair == null) {
174         throw new NullPointerException(
175             "fail to get region/store pair by key " + formatByteString(range.getStart()));
176       }
177       TiRegion region = regionStorePair.first;
178       idToRegion.putIfAbsent(region.getId(), regionStorePair);
179 
180       // both key range is close-opened
181       // initial range inside PD is guaranteed to be -INF to +INF
182       // Both keys are at right hand side and then always not -INF
183       if (toRawKey(range.getEnd()).compareTo(toRawKey(region.getEndKey())) > 0) {
184         // current region does not cover current end key
185         KeyRange cutRange =
186             KeyRange.newBuilder().setStart(range.getStart()).setEnd(region.getEndKey()).build();
187 
188         List<KeyRange> ranges = idToRange.computeIfAbsent(region.getId(), k -> new ArrayList<>());
189         ranges.add(cutRange);
190 
191         // cut new remaining for current range
192         range = KeyRange.newBuilder().setStart(region.getEndKey()).setEnd(range.getEnd()).build();
193       } else {
194         // current range covered by region
195         List<KeyRange> ranges = idToRange.computeIfAbsent(region.getId(), k -> new ArrayList<>());
196         ranges.add(range);
197         if (i >= keyRanges.size()) {
198           break;
199         }
200         range = keyRanges.get(i++);
201       }
202     }
203 
204     ImmutableList.Builder<RegionTask> resultBuilder = ImmutableList.builder();
205     idToRange.forEach(
206         (k, v) -> {
207           Pair<TiRegion, TiStore> regionStorePair = idToRegion.get(k);
208           resultBuilder.add(new RegionTask(regionStorePair.first, regionStorePair.second, v));
209         });
210     return resultBuilder.build();
211   }
212 
213   /**
214    * Split key ranges into corresponding region tasks and group by their region id
215    *
216    * @param keyRanges List of key ranges
217    * @return List of RegionTask, each task corresponds to a different region.
218    */
219   public List<RegionTask> splitRangeByRegion(List<KeyRange> keyRanges) {
220     return splitRangeByRegion(keyRanges, TiStoreType.TiKV);
221   }
222 
223   public static class RegionTask implements Serializable {
224     private final TiRegion region;
225     private final TiStore store;
226     private final List<KeyRange> ranges;
227     private final String host;
228 
229     RegionTask(TiRegion region, TiStore store, List<KeyRange> ranges) {
230       this.region = region;
231       this.store = store;
232       this.ranges = ranges;
233       String host = null;
234       try {
235         host = PDUtils.addrToUri(store.getStore().getAddress()).getHost();
236       } catch (Exception ignored) {
237       }
238       this.host = host;
239     }
240 
241     public static RegionTask newInstance(TiRegion region, TiStore store, List<KeyRange> ranges) {
242       return new RegionTask(region, store, ranges);
243     }
244 
245     public TiRegion getRegion() {
246       return region;
247     }
248 
249     public TiStore getStore() {
250       return store;
251     }
252 
253     public List<KeyRange> getRanges() {
254       return ranges;
255     }
256 
257     public String getHost() {
258       return host;
259     }
260 
261     @Override
262     public String toString() {
263       StringBuilder sb = new StringBuilder();
264       sb.append(String.format("Region [%s]", region));
265       sb.append(" ");
266 
267       for (KeyRange range : ranges) {
268         sb.append(
269             String.format(
270                 "Range Start: [%s] Range End: [%s]",
271                 formatByteString(range.getStart()), formatByteString(range.getEnd())));
272       }
273 
274       return sb.toString();
275     }
276   }
277 }