public OutputCommitter getOutputCommitter()

in storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java [84:155]


        /**
         * Returns an {@link OutputCommitter} that wraps the committer supplied by the
         * superclass. Every callback is forwarded to the wrapped committer; in addition,
         * {@code commitJob} bulk-loads the generated HFiles into the table named by
         * {@code HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY}, and the job-level
         * terminal callbacks ({@code commitJob}, {@code abortJob}, {@code cleanupJob})
         * delete the job's HFile output path afterwards.
         *
         * @param context the task attempt context passed through to the superclass
         * @return a delegating committer with bulk-load and scratch-cleanup behaviour
         * @throws IOException if the superclass fails to create its committer
         */
        public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
            final OutputCommitter baseOutputCommitter = super.getOutputCommitter(context);

            return new OutputCommitter() {
                @Override
                public void setupJob(JobContext jobContext) throws IOException {
                    baseOutputCommitter.setupJob(jobContext);
                }

                @Override
                public void setupTask(TaskAttemptContext taskContext) throws IOException {
                    baseOutputCommitter.setupTask(taskContext);
                }

                @Override
                public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
                    return baseOutputCommitter.needsTaskCommit(taskContext);
                }

                @Override
                public void commitTask(TaskAttemptContext taskContext) throws IOException {
                    baseOutputCommitter.commitTask(taskContext);
                }

                @Override
                public void abortTask(TaskAttemptContext taskContext) throws IOException {
                    baseOutputCommitter.abortTask(taskContext);
                }

                /**
                 * Aborts via the wrapped committer, then removes the HFile output path
                 * even if the wrapped abort throws.
                 */
                @Override
                public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
                    try {
                        baseOutputCommitter.abortJob(jobContext, state);
                    } finally {
                        cleanupScratch(jobContext);
                    }
                }

                /**
                 * Commits via the wrapped committer, bulk-loads the HFiles found under
                 * the job's HFile output path into the configured table, and finally
                 * removes that output path whether or not the load succeeded.
                 *
                 * @throws IOException if the wrapped commit fails, or wrapping any
                 *         exception raised while opening the table, loading, or closing
                 *         the table ("BulkLoad failed.")
                 */
                @Override
                public void commitJob(JobContext jobContext) throws IOException {
                    try {
                        baseOutputCommitter.commitJob(jobContext);
                        Configuration conf = jobContext.getConfiguration();
                        try {
                            HTable table = new HTable(conf,
                                conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY));
                            try {
                                //import hfiles
                                new LoadIncrementalHFiles(conf)
                                    .doBulkLoad(HFileOutputFormat.getOutputPath(jobContext), table);
                            } finally {
                                // Release the table handle on both success and failure;
                                // previously the HTable was never closed.
                                table.close();
                            }
                        } catch (Exception e) {
                            throw new IOException("BulkLoad failed.", e);
                        }
                    } finally {
                        cleanupScratch(jobContext);
                    }
                }

                /**
                 * Runs the wrapped committer's cleanup, then removes the HFile output
                 * path even if the wrapped cleanup throws.
                 */
                @Override
                public void cleanupJob(JobContext context) throws IOException {
                    try {
                        baseOutputCommitter.cleanupJob(context);
                    } finally {
                        cleanupScratch(context);
                    }
                }

                /**
                 * Recursively deletes the job's HFile output path on the file system
                 * obtained from the job configuration.
                 *
                 * NOTE: callers invoke this from a finally block, so an exception
                 * thrown here replaces any exception already propagating from the
                 * corresponding try body.
                 */
                private void cleanupScratch(JobContext context) throws IOException {
                    FileSystem fs = FileSystem.get(context.getConfiguration());
                    fs.delete(HFileOutputFormat.getOutputPath(context), true);
                }
            };
        }