View Javadoc
1   /*
2    * Copyright 2021 TiKV Project Authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    * http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   *
16   */
17  
18  package org.tikv.common.operation.iterator;
19  
20  import gnu.trove.list.array.TLongArrayList;
21  import java.util.ArrayList;
22  import java.util.Iterator;
23  import java.util.List;
24  import java.util.NoSuchElementException;
25  import java.util.concurrent.ExecutorCompletionService;
26  import org.tikv.common.Snapshot;
27  import org.tikv.common.TiConfiguration;
28  import org.tikv.common.TiSession;
29  import org.tikv.common.exception.TiClientInternalException;
30  import org.tikv.common.meta.TiDAGRequest;
31  import org.tikv.common.row.Row;
32  import org.tikv.common.util.RangeSplitter;
33  import org.tikv.common.util.RangeSplitter.RegionTask;
34  
35  public class IndexScanIterator implements Iterator<Row> {
36    private final Iterator<Long> handleIterator;
37    private final TiDAGRequest dagReq;
38    private final Snapshot snapshot;
39    private final ExecutorCompletionService<Iterator<Row>> completionService;
40    private final int batchSize;
41    private Iterator<Row> rowIterator;
42    private int batchCount = 0;
43  
44    public IndexScanIterator(Snapshot snapshot, TiDAGRequest req, Iterator<Long> handleIterator) {
45      TiSession session = snapshot.getSession();
46      TiConfiguration conf = session.getConf();
47      this.dagReq = req;
48      this.handleIterator = handleIterator;
49      this.snapshot = snapshot;
50      this.batchSize = conf.getIndexScanBatchSize();
51      this.completionService = new ExecutorCompletionService<>(session.getThreadPoolForIndexScan());
52    }
53  
54    private TLongArrayList feedBatch() {
55      TLongArrayList handles = new TLongArrayList(512);
56      while (handleIterator.hasNext()) {
57        handles.add(handleIterator.next());
58        if (batchSize <= handles.size()) {
59          break;
60        }
61      }
62      return handles;
63    }
64  
65    @Override
66    public boolean hasNext() {
67      try {
68        if (rowIterator == null) {
69          TiSession session = snapshot.getSession();
70          while (handleIterator.hasNext()) {
71            TLongArrayList handles = feedBatch();
72            batchCount++;
73            completionService.submit(
74                () -> {
75                  List<RegionTask> tasks = new ArrayList<>();
76                  List<Long> ids = dagReq.getPrunedPhysicalIds();
77                  tasks.addAll(
78                      RangeSplitter.newSplitter(session.getRegionManager())
79                          .splitAndSortHandlesByRegion(ids, handles));
80  
81                  return org.tikv.common.operation.iterator.CoprocessorIterator.getRowIterator(
82                      dagReq, tasks, session);
83                });
84          }
85          while (batchCount > 0) {
86            rowIterator = completionService.take().get();
87            batchCount--;
88  
89            if (rowIterator.hasNext()) {
90              return true;
91            }
92          }
93        }
94        if (rowIterator == null) {
95          return false;
96        }
97      } catch (Exception e) {
98        throw new TiClientInternalException("Error reading rows from handle", e);
99      }
100     return rowIterator.hasNext();
101   }
102 
103   @Override
104   public Row next() {
105     if (hasNext()) {
106       return rowIterator.next();
107     } else {
108       throw new NoSuchElementException();
109     }
110   }
111 }