public abstract class FileCommitProtocol extends Object implements Logging
1. Implementations must be serializable, as the committer instance instantiated on the driver will be used for tasks on executors.
2. Implementations should have a constructor with 2 or 3 arguments: (jobId: String, path: String) or (jobId: String, path: String, dynamicPartitionOverwrite: Boolean).
3. A committer should not be reused across multiple Spark jobs.
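A minimal sketch of a committer satisfying requirements 1 and 2, assuming it extends the built-in HadoopMapReduceCommitProtocol; the class name and the logging hook are illustrative only:

```scala
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical committer (the name is made up). It keeps the required
// (jobId, path, dynamicPartitionOverwrite) constructor shape so that
// instantiate() can create it reflectively, and it remains serializable
// because the driver-side instance is shipped to executors.
class AuditingCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {

  // Driver-side hook invoked after each successful task commit.
  override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
    logInfo(s"Task commit received for job $jobId: $taskCommit")
    super.onTaskCommit(taskCommit)
  }
}
```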
The proper call sequence is:
1. The driver calls setupJob.
2. As part of each task's execution, the executor calls setupTask and then commitTask (or abortTask if the task failed).
3. When all necessary tasks have completed successfully, the driver calls commitJob. If the job failed to execute (e.g. too many failed tasks), the driver should call abortJob.
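A schematic sketch of this sequence; the committer, the Hadoop contexts, and the actual data-writing step are assumed to be supplied by the caller, and in Spark the per-task block runs on executors rather than in a driver-side loop:

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch only: the happy path plus the abort calls on failure.
def runWriteJob(
    committer: FileCommitProtocol,
    jobContext: JobContext,
    taskContexts: Seq[TaskAttemptContext]): Unit = {
  committer.setupJob(jobContext)                          // 1. driver
  try {
    val taskCommits = taskContexts.map { taskContext =>   // 2. per task, on an executor
      committer.setupTask(taskContext)
      try {
        // ... the task writes its output files here ...
        committer.commitTask(taskContext)
      } catch {
        case t: Throwable =>
          committer.abortTask(taskContext)
          throw t
      }
    }
    committer.commitJob(jobContext, taskCommits)          // 3. driver, on success
  } catch {
    case t: Throwable =>
      committer.abortJob(jobContext)                      // driver, on failure
      throw t
  }
}
```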
Modifier and Type | Class and Description |
---|---|
static class | FileCommitProtocol.EmptyTaskCommitMessage$ |
static class | FileCommitProtocol.TaskCommitMessage |
Constructor and Description |
---|
FileCommitProtocol() |
Modifier and Type | Method and Description |
---|---|
abstract void | abortJob(org.apache.hadoop.mapreduce.JobContext jobContext): Aborts a job after the writes fail. |
abstract void | abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext): Aborts a task after the writes have failed. |
abstract void | commitJob(org.apache.hadoop.mapreduce.JobContext jobContext, scala.collection.Seq<FileCommitProtocol.TaskCommitMessage> taskCommits): Commits a job after the writes succeed. |
abstract FileCommitProtocol.TaskCommitMessage | commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext): Commits a task after the writes succeed. |
boolean | deleteWithJob(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path, boolean recursive): Specifies that a file should be deleted with the commit of this job. |
static FileCommitProtocol | instantiate(String className, String jobId, String outputPath, boolean dynamicPartitionOverwrite): Instantiates a FileCommitProtocol using the given className. |
abstract String | newTaskTempFile(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext, scala.Option<String> dir, String ext): Notifies the commit protocol to add a new file, and gets back the full path that should be used. |
abstract String | newTaskTempFileAbsPath(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext, String absoluteDir, String ext): Similar to newTaskTempFile(), but allows files to be committed to an absolute output location. |
void | onTaskCommit(FileCommitProtocol.TaskCommitMessage taskCommit): Called on the driver after a task commits. |
static void | org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1) |
static org.slf4j.Logger | org$apache$spark$internal$Logging$$log_() |
abstract void | setupJob(org.apache.hadoop.mapreduce.JobContext jobContext): Sets up a job. |
abstract void | setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext): Sets up a task within a job. |
Methods inherited from class java.lang.Object: equals, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.apache.spark.internal.Logging: initializeLogging, initializeLogIfNecessary, initializeLogIfNecessary, isTraceEnabled, log, logDebug, logDebug, logError, logError, logInfo, logInfo, logName, logTrace, logTrace, logWarning, logWarning
public static FileCommitProtocol instantiate(String className, String jobId, String outputPath, boolean dynamicPartitionOverwrite)
Parameters:
className - (undocumented)
jobId - (undocumented)
outputPath - (undocumented)
dynamicPartitionOverwrite - (undocumented)
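A minimal usage sketch, assuming the built-in org.apache.spark.internal.io.HadoopMapReduceCommitProtocol as the class to instantiate; the job id and output path are placeholders:

```scala
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch only: reflectively create a committer for one write job.
val committer: FileCommitProtocol = FileCommitProtocol.instantiate(
  className = "org.apache.spark.internal.io.HadoopMapReduceCommitProtocol",
  jobId = "job-0001",
  outputPath = "hdfs:///tmp/example-output",
  dynamicPartitionOverwrite = false)
```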
public static org.slf4j.Logger org$apache$spark$internal$Logging$$log_()
public static void org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger x$1)
public abstract void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext)
Parameters:
jobContext - (undocumented)

public abstract void commitJob(org.apache.hadoop.mapreduce.JobContext jobContext, scala.collection.Seq<FileCommitProtocol.TaskCommitMessage> taskCommits)
Parameters:
jobContext - (undocumented)
taskCommits - (undocumented)

public abstract void abortJob(org.apache.hadoop.mapreduce.JobContext jobContext)
Calling this function is a best-effort attempt, because it is possible that the driver crashes (or is killed) before it can call abort.
Parameters:
jobContext - (undocumented)

public abstract void setupTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
Parameters:
taskContext - (undocumented)

public abstract String newTaskTempFile(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext, scala.Option<String> dir, String ext)
Note that the returned temp file may have an arbitrary path. The commit protocol only promises that the file will be at the location specified by the arguments after job commit.
A full file path consists of the following parts:
1. the base path
2. some sub-directory within the base path, used to specify partitioning
3. file prefix, usually some unique job id with the task id
4. bucket id
5. source-specific file extension, e.g. ".snappy.parquet"
The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest are left to the commit protocol implementation to decide.
Important: it is the caller's responsibility to add uniquely identifying content to "ext" if a task is going to write out multiple files to the same dir. The file commit protocol only guarantees that files written by different tasks will not conflict.
Parameters:
taskContext - (undocumented)
dir - (undocumented)
ext - (undocumented)
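A sketch of how "dir" and "ext" map onto parts 2, 4 and 5 above; the partition value and the suffix are placeholders:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch only: ask the committer where this task should write one file
// for the partition sub-directory "dt=2024-01-01".
def nextPartitionFile(
    committer: FileCommitProtocol,
    taskContext: TaskAttemptContext): String =
  committer.newTaskTempFile(
    taskContext,
    dir = Some("dt=2024-01-01"),     // part 2: partition sub-directory
    ext = "-c000.snappy.parquet")    // parts 4 and 5: unique suffix + file extension
```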
public abstract String newTaskTempFileAbsPath(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext, String absoluteDir, String ext)
Important: it is the caller's responsibility to add uniquely identifying content to "ext" if a task is going to write out multiple files to the same dir. The file commit protocol only guarantees that files written by different tasks will not conflict.
Parameters:
taskContext - (undocumented)
absoluteDir - (undocumented)
ext - (undocumented)
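A companion sketch for the absolute-path variant, where the target directory lies outside the job's base path; the directory is a placeholder:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch only: request a temp file that will be committed to an absolute
// output directory rather than under the job's base path.
def nextAbsolutePathFile(
    committer: FileCommitProtocol,
    taskContext: TaskAttemptContext): String =
  committer.newTaskTempFileAbsPath(
    taskContext,
    absoluteDir = "/warehouse/external/dt=2024-01-01",
    ext = "-c000.snappy.parquet")
```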
public abstract FileCommitProtocol.TaskCommitMessage commitTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
Parameters:
taskContext - (undocumented)

public abstract void abortTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
Calling this function is a best-effort attempt, because it is possible that the executor crashes (or is killed) before it can call abort.
Parameters:
taskContext - (undocumented)

public boolean deleteWithJob(org.apache.hadoop.fs.FileSystem fs, org.apache.hadoop.fs.Path path, boolean recursive)
Parameters:
fs - (undocumented)
path - (undocumented)
recursive - (undocumented)
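A sketch of handing an existing output directory to the committer for deletion as part of this job; the directory is a placeholder, and whether the delete happens immediately or at job commit is implementation-dependent:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch only: in an overwrite-style write, let the committer own the
// deletion of the previous output instead of deleting it directly.
def clearPreviousOutput(
    committer: FileCommitProtocol,
    fs: FileSystem,
    previousOutput: Path): Boolean =
  committer.deleteWithJob(fs, previousOutput, recursive = true)
```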
public void onTaskCommit(FileCommitProtocol.TaskCommitMessage taskCommit)
Parameters:
taskCommit - (undocumented)