1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.tikv.common.operation.iterator;
19
20 import gnu.trove.list.array.TLongArrayList;
21 import java.util.ArrayList;
22 import java.util.Iterator;
23 import java.util.List;
24 import java.util.NoSuchElementException;
25 import java.util.concurrent.ExecutorCompletionService;
26 import org.tikv.common.Snapshot;
27 import org.tikv.common.TiConfiguration;
28 import org.tikv.common.TiSession;
29 import org.tikv.common.exception.TiClientInternalException;
30 import org.tikv.common.meta.TiDAGRequest;
31 import org.tikv.common.row.Row;
32 import org.tikv.common.util.RangeSplitter;
33 import org.tikv.common.util.RangeSplitter.RegionTask;
34
35 public class IndexScanIterator implements Iterator<Row> {
36 private final Iterator<Long> handleIterator;
37 private final TiDAGRequest dagReq;
38 private final Snapshot snapshot;
39 private final ExecutorCompletionService<Iterator<Row>> completionService;
40 private final int batchSize;
41 private Iterator<Row> rowIterator;
42 private int batchCount = 0;
43
44 public IndexScanIterator(Snapshot snapshot, TiDAGRequest req, Iterator<Long> handleIterator) {
45 TiSession session = snapshot.getSession();
46 TiConfiguration conf = session.getConf();
47 this.dagReq = req;
48 this.handleIterator = handleIterator;
49 this.snapshot = snapshot;
50 this.batchSize = conf.getIndexScanBatchSize();
51 this.completionService = new ExecutorCompletionService<>(session.getThreadPoolForIndexScan());
52 }
53
54 private TLongArrayList feedBatch() {
55 TLongArrayList handles = new TLongArrayList(512);
56 while (handleIterator.hasNext()) {
57 handles.add(handleIterator.next());
58 if (batchSize <= handles.size()) {
59 break;
60 }
61 }
62 return handles;
63 }
64
65 @Override
66 public boolean hasNext() {
67 try {
68 if (rowIterator == null) {
69 TiSession session = snapshot.getSession();
70 while (handleIterator.hasNext()) {
71 TLongArrayList handles = feedBatch();
72 batchCount++;
73 completionService.submit(
74 () -> {
75 List<RegionTask> tasks = new ArrayList<>();
76 List<Long> ids = dagReq.getPrunedPhysicalIds();
77 tasks.addAll(
78 RangeSplitter.newSplitter(session.getRegionManager())
79 .splitAndSortHandlesByRegion(ids, handles));
80
81 return org.tikv.common.operation.iterator.CoprocessorIterator.getRowIterator(
82 dagReq, tasks, session);
83 });
84 }
85 while (batchCount > 0) {
86 rowIterator = completionService.take().get();
87 batchCount--;
88
89 if (rowIterator.hasNext()) {
90 return true;
91 }
92 }
93 }
94 if (rowIterator == null) {
95 return false;
96 }
97 } catch (Exception e) {
98 throw new TiClientInternalException("Error reading rows from handle", e);
99 }
100 return rowIterator.hasNext();
101 }
102
103 @Override
104 public Row next() {
105 if (hasNext()) {
106 return rowIterator.next();
107 } else {
108 throw new NoSuchElementException();
109 }
110 }
111 }