Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,16 @@
package org.apache.hadoop.mapred.split;

import java.io.IOException;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.tez.mapreduce.grouper.TezSplitGrouper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
Expand Down Expand Up @@ -129,14 +137,58 @@ public class TezGroupedSplitsRecordReader implements RecordReader<K, V> {
// Position within groupedSplit.wrappedSplits: initNextRecordReader compares it with the
// number of wrapped splits and increments it once a reader has been obtained.
int idx = 0;
// NOTE(review): presumably the progress accumulated from already-finished splits; its
// updates are not visible in this part of the file -- confirm.
long progress;
// Reader currently being consumed; (re)assigned by initNextRecordReader.
RecordReader<K, V> curReader;

// Next index into groupedSplit.wrappedSplits to hand to a pre-initialization task;
// advanced with getAndIncrement().
final AtomicInteger initIndex;
// Number of record readers to initialize ahead of time, read in the constructor from
// TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS.
final int numReaders;
// Fixed-size pool of daemon threads, created in the constructor, that runs the reader
// initialization tasks; initNextRecordReader calls shutdownNow() on it once every
// wrapped split has been processed.
final ExecutorService initReaderExecService;
// Futures of readers being initialized in the background; filled by preInitReaders and
// drained one at a time (poll().get()) by initNextRecordReader.
final Queue<Future<RecordReader<K,V>>> initedReaders;

public TezGroupedSplitsRecordReader(TezGroupedSplit split, JobConf job,
Reporter reporter) throws IOException {
this.groupedSplit = split;
this.job = job;
this.reporter = reporter;
this.initIndex = new AtomicInteger(0);
int numThreads = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS,
TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT);
this.numReaders = conf.getInt(TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS,
TezSplitGrouper.TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT);
this.initReaderExecService = Executors.newFixedThreadPool(numThreads,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will want to either make this static (so that it is shared between all usages of TezGroupedSplitsRecordReader) or figure out how to call shutdown in a reliable manner. Otherwise, I think a long-lived process that uses multiple TezGroupedSplitsRecordReaders throughout its life will end up with a large number of unused threads, since the thread pools will not shut down automatically and thus will not be garbage collected.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html
See finalization section.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about the context, but if the goal is to properly shut down a global resource, it can be done by the shutdown handler, somewhere here: https://github.com/apache/tez/blob/master/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java#L954

new ThreadFactoryBuilder()
.setDaemon(true)
.setPriority(Thread.MAX_PRIORITY)
.setNameFormat("TEZ-Split-Init-Thread-%d")
.build());
this.initedReaders = new LinkedList<>();
preInitReaders();
initNextRecordReader();
}

/**
 * Tops up the queue of record readers that are being initialized in the background.
 *
 * <p>Submits initialization tasks to {@code initReaderExecService} until either
 * {@code numReaders} futures are outstanding in {@code initedReaders} or every wrapped
 * split of {@code groupedSplit} has been handed out. It is meant to be called from the
 * thread that consumes the readers (the constructor and {@code initNextRecordReader}).
 *
 * <p>The split index is claimed here, on the calling thread, rather than inside the task.
 * That guarantees that the n-th future in the queue always yields the reader of the n-th
 * split, and that no future ever completes with {@code null}; claiming the index inside
 * the task let concurrently started tasks swap indices, so a {@code null} result could
 * be queued ahead of a real reader.
 *
 * <p>A task that fails rethrows its cause wrapped in a {@link RuntimeException}; the
 * failure reaches the consumer through {@code Future.get()}. The task deliberately does
 * not touch {@code initedReaders} itself: the queue is a plain {@code LinkedList} that
 * the consumer thread mutates, so iterating it from a pool thread is not safe.
 */
private void preInitReaders() {
  if (initReaderExecService == null) {
    LOG.info("Init record reader threadpool is not initialized");
    return;
  }
  final int numSplits = groupedSplit.wrappedSplits.size();
  // Keep at least one reader in flight so the consumer's poll() never comes back empty
  // while splits remain, even if the look-ahead was configured as zero or negative.
  final int lookAhead = Math.max(1, numReaders);
  while (initedReaders.size() < lookAhead && initIndex.get() < numSplits) {
    // Claimed at submission time so that queue order equals split order.
    final int index = initIndex.getAndIncrement();
    initedReaders.offer(this.initReaderExecService.submit(() -> {
      try {
        InputSplit s = groupedSplit.wrappedSplits.get(index);
        RecordReader<K, V> reader = wrappedInputFormat.getRecordReader(s, job, reporter);
        LOG.debug("Init Thread processed reader number {} initialization", index);
        return reader;
      } catch (Exception e) {
        if (e instanceof InterruptedException) {
          // Preserve the interrupt status for the pool thread.
          Thread.currentThread().interrupt();
        }
        throw new RuntimeException(e);
      }
    }));
  }
}

@Override
public boolean next(K key, V value) throws IOException {
Expand Down Expand Up @@ -183,6 +235,8 @@ protected boolean initNextRecordReader() throws IOException {

// if all chunks have been processed, nothing more to do.
if (idx == groupedSplit.wrappedSplits.size()) {
LOG.info("Shutting down the init record reader threadpool");
initReaderExecService.shutdownNow();
return false;
}

Expand All @@ -193,15 +247,25 @@ protected boolean initNextRecordReader() throws IOException {

// get a record reader for the idx-th chunk
try {
curReader = wrappedInputFormat.getRecordReader(
groupedSplit.wrappedSplits.get(idx), job, reporter);
curReader = initedReaders.poll().get();
preInitReaders();
} catch (Exception e) {
throw new RuntimeException (e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
cancelsFutures();
throw new RuntimeException(e);
}
idx++;
return true;
}

/**
 * Requests cancellation of every future still held in {@code initedReaders}, interrupting
 * tasks that are already running. The futures are left in the queue.
 */
private void cancelsFutures() {
  initedReaders.forEach(pending -> pending.cancel(true));
}

@Override
public long getPos() throws IOException {
long subprogress = 0; // bytes processed in current split
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ public abstract class TezSplitGrouper {
public static final String TEZ_GROUPING_NODE_LOCAL_ONLY = "tez.grouping.node.local.only";
public static final boolean TEZ_GROUPING_NODE_LOCAL_ONLY_DEFAULT = false;

/**
 * Configuration key: number of threads used to initialize the grouped splits, i.e. to
 * asynchronously open their record readers.
 */
public static final String TEZ_GROUPING_SPLIT_INIT_THREADS = "tez.grouping.split.init-threads";
/** Default for {@link #TEZ_GROUPING_SPLIT_INIT_THREADS}: 4 threads. */
public static final int TEZ_GROUPING_SPLIT_INIT_THREADS_DEFAULT = 4;

/**
 * Configuration key: number of record readers to initialize asynchronously and
 * proactively, ahead of the one currently being read.
 */
public static final String TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS = "tez.grouping.split.init.num-recordreaders";
/** Default for {@link #TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS}: 10 record readers. */
public static final int TEZ_GROUPING_SPLIT_INIT_NUM_RECORDREADERS_DEFAULT = 10;

static class LocationHolder {
List<SplitContainer> splits;
Expand Down