Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
*/
@Private
@Unstable
public class TezSharedExecutor implements TezExecutors {
public class TezSharedExecutor implements TezExecutors, AutoCloseable {

// The shared executor service which will be used to execute all the tasks.
private final ThreadPoolExecutor service;
Expand Down Expand Up @@ -96,6 +96,11 @@ public void shutdownNow() {
poller.interrupt();
}

@Override
public void close() {
this.shutdownNow();
}

@Override
protected void finalize() {
this.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -34,7 +35,12 @@
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.tez.common.MRFrameworkConfigs;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezSharedExecutor;
Expand Down Expand Up @@ -62,7 +68,6 @@
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
Expand All @@ -76,7 +81,7 @@ public class TestMapProcessor {
private static JobConf defaultConf = new JobConf();
private static FileSystem localFs = null;
private static Path workDir = null;
static float progressUpdate = 0.0f;
private static CountDownLatch progressLatch;
static {
try {
defaultConf.set("fs.defaultFS", "file:///");
Expand Down Expand Up @@ -152,49 +157,74 @@ public void testMapProcessor() throws Exception {
OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
.setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);

TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec),
sharedExecutor);
try (TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf)) {
LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(localFs, workDir, jobConf, 0,
new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
Collections.singletonList(mapInputSpec), Collections.singletonList(mapOutputSpec),
sharedExecutor);

task.initialize();
task.run();
task.close();
sharedExecutor.shutdownNow();
task.initialize();
task.run();
task.close();

OutputContext outputContext = task.getOutputContexts().iterator().next();
TezTaskOutput mapOutputs = new TezTaskOutputFiles(
jobConf, outputContext.getUniqueIdentifier(),
outputContext.getDagIdentifier());
OutputContext outputContext = task.getOutputContexts().iterator().next();
TezTaskOutput mapOutputs = new TezTaskOutputFiles(
jobConf, outputContext.getUniqueIdentifier(),
outputContext.getDagIdentifier());


// TODO NEWTEZ FIXME OutputCommitter verification
// TODO NEWTEZ FIXME OutputCommitter verification
// MRTask mrTask = (MRTask)t.getProcessor();
// Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
// .getCommitter().getClass().getName());
// t.close();

Path mapOutputFile = getMapOutputFile(jobConf, outputContext);
LOG.info("mapOutputFile = " + mapOutputFile);
IFile.Reader reader =
new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);
LongWritable key = new LongWritable();
Text value = new Text();
DataInputBuffer keyBuf = new DataInputBuffer();
DataInputBuffer valueBuf = new DataInputBuffer();
long prev = Long.MIN_VALUE;
while (reader.nextRawKey(keyBuf)) {
reader.nextRawValue(valueBuf);
key.readFields(keyBuf);
value.readFields(valueBuf);
if (prev != Long.MIN_VALUE) {
assert(prev <= key.get());
prev = key.get();
Path mapOutputFile = getMapOutputFile(jobConf, outputContext);
LOG.info("mapOutputFile = " + mapOutputFile);
IFile.Reader reader =
new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);
LongWritable key = new LongWritable();
Text value = new Text();
DataInputBuffer keyBuf = new DataInputBuffer();
DataInputBuffer valueBuf = new DataInputBuffer();
long prev = Long.MIN_VALUE;
while (reader.nextRawKey(keyBuf)) {
reader.nextRawValue(valueBuf);
key.readFields(keyBuf);
value.readFields(valueBuf);
if (prev != Long.MIN_VALUE) {
assert(prev <= key.get());
prev = key.get();
}
LOG.info("key = " + key.get() + "; value = " + value);
}
reader.close();
}
}

/**
* A mapper that waits on a latch after the first record, giving the
* ProgressHelper time to report intermediate progress. The latch is
* released by the test once progress has been captured.
*/
public static class LatchMapper extends MapReduceBase
implements Mapper<LongWritable, Text, LongWritable, Text> {
private boolean waited = false;

@Override
public void map(LongWritable key, Text value,
OutputCollector<LongWritable, Text> output, Reporter reporter)
throws IOException {
output.collect(key, value);
if (!waited) {
waited = true;
try {
progressLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
LOG.info("key = " + key.get() + "; value = " + value);
}
reader.close();
}

@Test(timeout = 30000)
Expand All @@ -204,6 +234,9 @@ public void testMapProcessorProgress() throws Exception {
JobConf jobConf = new JobConf(defaultConf);
setUpJobConf(jobConf);

jobConf.setMapperClass(LatchMapper.class);
progressLatch = new CountDownLatch(1);

MRHelpers.translateMRConfToTez(jobConf);
jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);

Expand All @@ -215,7 +248,7 @@ public void testMapProcessorProgress() throws Exception {
Path mapInput = new Path(workDir, "map0");


MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 100000);
MapUtils.generateInputSplit(localFs, workDir, jobConf, mapInput, 100);

InputSpec mapInputSpec = new InputSpec("NullSrcVertex",
InputDescriptor.create(MRInputLegacy.class.getName())
Expand All @@ -229,30 +262,29 @@ public void testMapProcessorProgress() throws Exception {
OutputDescriptor.create(OrderedPartitionedKVOutput.class.getName())
.setUserPayload(TezUtils.createUserPayloadFromConf(jobConf)), 1);

TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask
(localFs, workDir, jobConf, 0,
new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
Collections.singletonList(mapInputSpec),
Collections.singletonList(mapOutputSpec), sharedExecutor);

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
Thread monitorProgress = new Thread(new Runnable() {
@Override
public void run() {
try (TezSharedExecutor sharedExecutor = new TezSharedExecutor(jobConf);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1)) {

final LogicalIOProcessorRuntimeTask task = MapUtils.createLogicalTask(
localFs, workDir, jobConf, 0,
new Path(workDir, "map0"), new TestUmbilical(), dagName, vertexName,
Collections.singletonList(mapInputSpec),
Collections.singletonList(mapOutputSpec), sharedExecutor);

scheduler.scheduleAtFixedRate(() -> {
float prog = task.getProgress();
if(prog > 0.0f && prog < 1.0f)
progressUpdate = prog;
}
});

task.initialize();
scheduler.scheduleAtFixedRate(monitorProgress, 0, 1,
TimeUnit.MILLISECONDS);
task.run();
Assert.assertTrue("Progress Updates should be captured!",
progressUpdate > 0.0f && progressUpdate < 1.0f);
task.close();
sharedExecutor.shutdownNow();
if (prog > 0.0f && prog < 1.0f) {
progressLatch.countDown();
}
}, 0, 1, TimeUnit.MILLISECONDS);

task.initialize();
task.run();

GenericTestUtils.waitFor(() -> progressLatch.getCount() == 0, 10, 5000,
"Progress update between 0.0 and 1.0 was never captured");

task.close();
}
}
}