-
Notifications
You must be signed in to change notification settings - Fork 41
Expand file tree
/
Copy pathCompact.java
More file actions
583 lines (481 loc) · 24.8 KB
/
Compact.java
File metadata and controls
583 lines (481 loc) · 24.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
package com.github.KeithSSmith.spark_compaction;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import org.apache.commons.cli.*;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SQLContext;
public class Compact {
// Utility that compacts many small HDFS files into fewer, block-sized output
// files via Spark, preserving (or converting) serialization and compression.
// Shared Hadoop configuration; the codec factory is built from it in the constructor.
private static Configuration conf = new Configuration();
private static CompressionCodecFactory codecFactory = null;
// File system handle and cached glob listing of the input path;
// both are populated by setInputPath() and reused by most other methods.
FileSystem fs = null;
FileStatus[] fsArray = null;
// Serialization / compression format names used as CLI values and map keys.
private static final String AVRO = "avro";
private static final String BLOCK = "BLOCK";
private static final String BZ2 = "bzip2";
private static final String GZIP = "gzip";
private static final String LZO = "lzo";
private static final String NONE = "none";
private static final String PARQUET = "parquet";
private static final String SNAPPY = "snappy";
// System property names that steer Hadoop/Spark output compression
// (consumed in outputCompressionProperties()).
private static final String SHOULD_COMPRESS_OUTPUT = "spark.hadoop.mapred.output.compress";
private static final String OUTPUT_COMPRESSION_CODEC = "spark.hadoop.mapred.output.compression.codec";
private static final String COMPRESSION_TYPE = "spark.hadoop.mapred.output.compression.type";
private static final String SPARK_PARQUET_COMPRESSION_CODEC = "spark.sql.parquet.compression.codec";
private static final String SPARK_AVRO_COMPRESSION_CODEC = "spark.sql.avro.compression.codec";
private static final String AVRO_COMPRESSION_CODEC = "avro.output.codec";
private static final String TEXT = "text";
private static final String TRUE = "true";
// Long names of the CLI options (registered in initializeOptions()).
private static final String INPUT_PATH = "input-path";
private static final String OUTPUT_PATH = "output-path";
private static final String INPUT_COMPRESSION = "input-compression";
private static final String INPUT_SERIALIZATION = "input-serialization";
private static final String OUTPUT_COMPRESSION = "output-compression";
private static final String OUTPUT_SERIALIZATION = "output-serialization";
// Rough compression ratios used to estimate uncompressed data size when
// computing the output split count. These are heuristics, not measurements.
private static final double SNAPPY_RATIO = 1.7; // (100 / 1.7) = 58.8 ~ 40% compression rate on text
private static final double LZO_RATIO = 2.0; // (100 / 2.0) = 50.0 ~ 50% compression rate on text
private static final double GZIP_RATIO = 2.5; // (100 / 2.5) = 40.0 ~ 60% compression rate on text
private static final double BZ2_RATIO = 3.33; // (100 / 3.3) = 30.3 ~ 70% compression rate on text
private static final double AVRO_RATIO = 1.6; // (100 / 1.6) = 62.5 ~ 40% compression rate on text
private static final double PARQUET_RATIO = 2.0; // (100 / 2.0) = 50.0 ~ 50% compression rate on text
private static final double TEXT_RATIO = 1.0;
// CLI option definitions, built lazily by initializeOptions().
private static Options options;
// Lookup tables built in the constructor: "<serialization>_<compression>" -> ratio,
// codec instance -> codec name, serialization name -> file extension.
private HashMap<String, Double> compressionRatios;
private HashMap<CompressionCodec, String> compressionTypes;
private HashMap<String, String> serializationExtensions;
// Per-run state, derived from CLI flags and/or inspection of the input files.
private String inputPath;
private long inputPathSize;
private String inputCompression;
private String inputSerialization;
private String outputPath;
private String outputCompression;
private String outputSerialization;
private double outputBlockSize;
private int splitSize;
private double inputCompressionRatio;
private double outputCompressionRatio;
private Path inputCompressionPath;
// Initializes defaults and builds the lookup tables used by the rest of the
// class. Loads the cluster's core-site/hdfs-site configuration from the
// conventional /etc/hadoop/conf location (missing files are ignored by
// Configuration.addResource), then constructs the codec factory from it.
public Compact() {
// Defining HDFS Configuration and File System definition.
conf.addResource(new Path("file:///etc/hadoop/conf/core-site.xml"));
conf.addResource(new Path("file:///etc/hadoop/conf/hdfs-site.xml"));
codecFactory = new CompressionCodecFactory(conf);
// Neutral defaults: uncompressed text, with sizes/ratios to be derived later.
this.setInputPathSize(0);
this.setInputCompression(NONE);
this.setInputSerialization(TEXT);
this.setOutputCompression(NONE);
this.setOutputSerialization(TEXT);
this.setOutputBlockSize(0);
this.setSplitSize(0);
this.setInputCompressionRatio(0);
this.setOutputCompressionRatio(0);
this.setInputCompressionPath((Path) null);
// Combined ratio table keyed by makeKey(serialization, compression).
// The null-compression entries produce keys like "avro_" (see makeKey()).
compressionRatios = new HashMap<>();
compressionRatios.put(makeKey(AVRO, null), AVRO_RATIO * 1.0);
compressionRatios.put(makeKey(AVRO, BZ2), AVRO_RATIO * BZ2_RATIO);
compressionRatios.put(makeKey(AVRO, GZIP), AVRO_RATIO * GZIP_RATIO);
compressionRatios.put(makeKey(AVRO, LZO), AVRO_RATIO * LZO_RATIO);
compressionRatios.put(makeKey(AVRO, NONE), AVRO_RATIO);
compressionRatios.put(makeKey(AVRO, SNAPPY), AVRO_RATIO * SNAPPY_RATIO);
compressionRatios.put(makeKey(PARQUET, null), PARQUET_RATIO * 1.0);
compressionRatios.put(makeKey(PARQUET, BZ2), PARQUET_RATIO * BZ2_RATIO);
compressionRatios.put(makeKey(PARQUET, GZIP), PARQUET_RATIO * GZIP_RATIO);
compressionRatios.put(makeKey(PARQUET, LZO), PARQUET_RATIO * LZO_RATIO);
compressionRatios.put(makeKey(PARQUET, NONE), PARQUET_RATIO);
compressionRatios.put(makeKey(PARQUET, SNAPPY), PARQUET_RATIO * SNAPPY_RATIO);
compressionRatios.put(makeKey(TEXT, null), TEXT_RATIO * 1.0);
compressionRatios.put(makeKey(TEXT, BZ2), TEXT_RATIO * BZ2_RATIO);
compressionRatios.put(makeKey(TEXT, GZIP), TEXT_RATIO * GZIP_RATIO);
compressionRatios.put(makeKey(TEXT, LZO), TEXT_RATIO * LZO_RATIO);
compressionRatios.put(makeKey(TEXT, NONE), TEXT_RATIO);
compressionRatios.put(makeKey(TEXT, SNAPPY), TEXT_RATIO * SNAPPY_RATIO);
// Maps the codec instances resolved by the factory back to their CLI names;
// the null key lets an "uncompressed" lookup resolve to NONE.
compressionTypes = new HashMap<>();
compressionTypes.put(null, NONE);
compressionTypes.put(codecFactory.getCodecByName(BZ2), BZ2);
compressionTypes.put(codecFactory.getCodecByName(GZIP), GZIP);
// compressionTypes.put(codecFactory.getCodecByName(LZO), LZO); // This is not a default codec and will not be supported at this time.
compressionTypes.put(codecFactory.getCodecByName(SNAPPY), SNAPPY);
// File extensions used to recognize serialized input (text has no extension).
serializationExtensions = new HashMap<>();
serializationExtensions.put(PARQUET, ".parquet");
serializationExtensions.put(AVRO, ".avro");
}
/**
 * Sets the JVM system properties that steer Hadoop/Spark output compression
 * for the requested codec (none, snappy, gzip or bzip2, case-insensitive).
 * Any other value is silently ignored, leaving the current properties as-is.
 *
 * @param outputCompression codec name, case-insensitive
 */
public void outputCompressionProperties(String outputCompression) {
    // Normalize once instead of re-lowercasing in every branch.
    String codec = outputCompression.toLowerCase();
    if (codec.equals(NONE)) {
        System.setProperty(SHOULD_COMPRESS_OUTPUT, "false");
        System.setProperty(SPARK_PARQUET_COMPRESSION_CODEC, "uncompressed");
    } else if (codec.equals(SNAPPY)) {
        System.setProperty(SHOULD_COMPRESS_OUTPUT, TRUE);
        System.setProperty(OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.SnappyCodec");
        System.setProperty(COMPRESSION_TYPE, BLOCK);
        System.setProperty(SPARK_PARQUET_COMPRESSION_CODEC, SNAPPY);
        System.setProperty(SPARK_AVRO_COMPRESSION_CODEC, SNAPPY);
        System.setProperty(AVRO_COMPRESSION_CODEC, SNAPPY);
    } else if (codec.equals(GZIP)) {
        System.setProperty(SHOULD_COMPRESS_OUTPUT, TRUE);
        System.setProperty(OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.GzipCodec");
        System.setProperty(COMPRESSION_TYPE, BLOCK);
        System.setProperty(SPARK_PARQUET_COMPRESSION_CODEC, GZIP);
        System.setProperty(SPARK_AVRO_COMPRESSION_CODEC, GZIP);
        System.setProperty(AVRO_COMPRESSION_CODEC, GZIP);
    } else if (codec.equals(BZ2)) {
        System.setProperty(SHOULD_COMPRESS_OUTPUT, TRUE);
        System.setProperty(OUTPUT_COMPRESSION_CODEC, "org.apache.hadoop.io.compress.BZip2Codec");
        System.setProperty(COMPRESSION_TYPE, BLOCK);
        // These two throw at write time for Parquet/Avro + BZ2 because BZ2 is
        // not supported by the upstream packages; kept for parity with text output.
        System.setProperty(SPARK_PARQUET_COMPRESSION_CODEC, BZ2);
        System.setProperty(SPARK_AVRO_COMPRESSION_CODEC, BZ2);
        System.setProperty(AVRO_COMPRESSION_CODEC, BZ2);
    }
    // LZO intentionally unsupported: com.hadoop.compression.lzo.LzoCodec is not
    // a default Hadoop codec, so no branch is provided for it here.
}
/**
 * Compacts the files under {@code inputPath} into {@code outputPath}, deriving
 * compression and serialization settings from the input files themselves.
 *
 * @param inputPath  source directory/glob to compact
 * @param outputPath destination directory for the coalesced output
 * @throws IOException if the file system cannot be inspected
 */
public void compact(String inputPath, String outputPath) throws IOException {
    this.setCompressionAndSerializationOptions(inputPath, outputPath);
    this.outputCompressionProperties(this.outputCompression);
    this.runCompaction();
}

/**
 * Compacts using options parsed from the command-line arguments
 * (see initializeOptions() for the accepted flags).
 *
 * @param args raw CLI arguments
 * @throws IOException if the file system cannot be inspected
 */
public void compact(String[] args) throws IOException {
    this.setCompressionAndSerializationOptions(this.parseCli(args));
    this.outputCompressionProperties(this.outputCompression);
    this.runCompaction();
}

/**
 * Shared driver for both public overloads (previously duplicated verbatim):
 * reads the input files with the configured serialization, coalesces them to
 * the pre-computed split count and writes them to the output path.
 * For Avro input the files must end in .avro; note that when compressing,
 * the codec extension is not appended to the output file names.
 */
private void runCompaction() throws IOException {
    // Generic Spark configuration; the master is taken from spark-submit.
    SparkConf sparkConf = new SparkConf().setAppName("Spark Compaction");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    try {
        if (this.outputSerialization.equals(TEXT)) {
            JavaRDD<String> textFile = sc.textFile(this.concatInputPath(this.inputPath));
            textFile.coalesce(this.splitSize).saveAsTextFile(this.outputPath);
        } else if (this.outputSerialization.equals(PARQUET)) {
            SQLContext sqlContext = new SQLContext(sc);
            Dataset parquetFile = sqlContext.read().parquet(this.concatInputPath(this.inputPath));
            parquetFile.coalesce(this.splitSize).write().parquet(this.outputPath);
        } else if (this.outputSerialization.equals(AVRO)) {
            SQLContext sqlContext = new SQLContext(sc);
            Dataset avroFile = sqlContext.read().format("com.databricks.spark.avro").load(this.concatInputPath(this.inputPath));
            avroFile.coalesce(this.splitSize).write().format("com.databricks.spark.avro").save(this.outputPath);
        } else {
            System.out.println("Did not match any serialization type: text, parquet, or avro. Received: " +
                    this.outputSerialization);
        }
    } finally {
        // The original leaked the context; stop it so resources are released
        // and successive compact() calls can create a fresh context.
        sc.stop();
    }
}
/**
 * CLI entry point: builds a Compact instance and runs a compaction using the
 * supplied command-line arguments.
 */
public static void main(String[] args) throws IOException {
    Compact compactor = new Compact();
    compactor.compact(args);
    // Programmatic alternative to the CLI:
    // compactor.compact("hdfs:///landing/compaction/input", "hdfs:///landing/compaction/output_text_none");
}
/**
 * Builds the compression-ratio lookup key: "&lt;serialization&gt;_&lt;compression&gt;",
 * or just "&lt;serialization&gt;_" when the compression type is null.
 */
private String makeKey(String serializationType, String compressionType) {
    String suffix = (compressionType == null) ? "" : compressionType;
    return serializationType + "_" + suffix;
}
/**
 * Builds a comma-separated list of the data files previously discovered for
 * the input path, skipping hidden and Hadoop metadata entries ("_SUCCESS",
 * ".crc", ...). Relies on fsArray having been populated by setInputPath();
 * the inputPath parameter is retained only for interface compatibility.
 *
 * @param inputPath unused; the listing comes from fsArray
 * @return comma-joined file paths suitable for sc.textFile()/read()
 */
private String concatInputPath(String inputPath) throws IOException {
    List<String> paths = new ArrayList<String>();
    for (FileStatus fileStatus : fsArray) {
        String name = fileStatus.getPath().getName();
        if (name.startsWith("_") || name.startsWith(".")) {
            continue; // skip metadata / hidden files
        }
        paths.add(fileStatus.getPath().toString());
    }
    // java.lang.String.join replaces the commons-lang StringUtils dependency here.
    return String.join(",", paths);
}
/**
 * Defines the CLI options accepted by this tool. The repetitive
 * build-set-add sequence (previously copy/pasted six times) is factored
 * into addCliOption().
 */
private void initializeOptions() {
    options = new Options();
    addCliOption("i", INPUT_PATH, true,
            "The input file path where files need to be compacted\n(required : true)");
    addCliOption("o", OUTPUT_PATH, true,
            "The output directory where the files will be compacted to\n(required : true)");
    addCliOption("is", INPUT_SERIALIZATION, false,
            "The serialization used on the files for the input path provided\n(avro, parquet, text)\n(required : false)");
    addCliOption("ic", INPUT_COMPRESSION, false,
            "The compression used on the files for the input path provided\n(none, snappy, gzip, bzip2)\n(required : false)");
    addCliOption("os", OUTPUT_SERIALIZATION, false,
            "The serialization used on the files generated by the compaction process\n(avro, parquet, text)\n(required : false)");
    addCliOption("oc", OUTPUT_COMPRESSION, false,
            "The compression used on the files generated by the compaction process, (none, snappy, gzip, bzip2), (required : false)");
}

// Registers a single-argument option under both its short and long name.
private void addCliOption(String shortName, String longName, boolean required, String description) {
    Option option = new Option(shortName, longName, true, description);
    option.setRequired(required);
    options.addOption(option);
}
// Prints the usage text (with the given message appended to the usage line)
// and terminates the JVM with exit code 1. NOTE: this method never returns.
private void printHelp(String additionalMessage) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("SparkCompaction [options] " + additionalMessage, options);
System.exit(1);
}
/**
 * Parses the CLI arguments, records the input/output paths plus the output
 * block size, and returns the parsed line for further option lookups.
 * On a parse failure the usage text is printed and the JVM exits via
 * printHelp(), so the dereference of {@code line} below is only reached on
 * success.
 *
 * @param args raw CLI arguments
 * @return the parsed command line (never null on return)
 */
private CommandLine parseCli(String[] args) throws IllegalArgumentException, IOException {
    this.initializeOptions();
    CommandLineParser parser = new GnuParser();
    CommandLine line = null;
    try {
        line = parser.parse(options, args);
    } catch (ParseException e) {
        // Surface the actual parse error instead of discarding it
        // (previously the message was dropped and only usage was shown).
        printHelp(e.getMessage());
    }
    this.setInputPath(line.getOptionValue(INPUT_PATH));
    this.setOutputPath(line.getOptionValue(OUTPUT_PATH));
    this.setOutputBlockSize(this.outputPath);
    return line;
}
/**
 * Applies the compression/serialization CLI options: input compression is
 * auto-detected from the file extensions when not given explicitly; output
 * options default to the corresponding input-side values. Finishes by
 * validating the resulting combination (which may exit via printHelp()).
 *
 * @param line the parsed command line
 */
private void setCompressionAndSerializationOptions(CommandLine line) throws IOException {
    String ic = line.getOptionValue(INPUT_COMPRESSION);
    if (ic == null) {
        // Auto-detect the input compression (and serialization) from the files.
        this.setInputCompression(new Path(this.getInputPath()));
    } else {
        // Bug fix: an explicitly supplied -ic value was previously ignored,
        // leaving the constructor default in place. Honor it like -oc/-os.
        this.setInputCompression(ic);
    }
    String is = line.getOptionValue(INPUT_SERIALIZATION);
    if (is != null) {
        this.setInputSerialization(is);
    }
    String oc = line.getOptionValue(OUTPUT_COMPRESSION);
    if (oc == null) {
        this.setOutputCompression(this.getInputCompression()); // Defaults to the input compression.
    } else {
        this.setOutputCompression(oc);
    }
    String os = line.getOptionValue(OUTPUT_SERIALIZATION);
    if (os == null) {
        this.setOutputSerialization(this.getInputSerialization()); // Defaults to the input serialization.
    } else {
        this.setOutputSerialization(os);
    }
    this.validateCompressionAndSerializationOptions();
}
// API-path variant of the options setup: derives everything from the input
// files. Order matters here — setInputPath() must run first because it
// populates fs/fsArray, which setInputCompression(Path) and the validation
// step depend on.
private void setCompressionAndSerializationOptions(String inputPath, String outputPath) throws IOException {
this.setInputPath(inputPath);
this.setInputCompression(new Path(this.getInputPath()));
this.setOutputPath(outputPath);
this.setOutputBlockSize(outputPath);
this.setOutputCompression(this.getInputCompression()); // Output Compression will be the same as the Input if left null.
this.setOutputSerialization(this.getInputSerialization()); // Output Serialization will be the same as the Input if left null.
this.validateCompressionAndSerializationOptions();
}
/**
 * Validates the four compression/serialization settings — reporting the
 * FIRST invalid one via printHelp(), which exits the JVM — and then derives
 * the compression ratios, total input size and split count for the run.
 */
private void validateCompressionAndSerializationOptions() throws IllegalArgumentException, IOException {
    String errorMsg = null;
    String ic = this.getInputCompression();
    if (null == errorMsg && !ic.equals(BZ2) && !ic.equals(GZIP) && !ic.equals(NONE) && !ic.equals(SNAPPY)) {
        errorMsg = "Invalid input compression format specified!";
    }
    String is = this.getInputSerialization();
    // Guard added for consistency with the other three checks: previously a
    // serialization error could overwrite an earlier compression error.
    if (null == errorMsg && !is.equals(AVRO) && !is.equals(PARQUET) && !is.equals(TEXT)) {
        errorMsg = "Invalid input serialization format specified!";
    }
    String oc = this.getOutputCompression();
    if (null == errorMsg && !oc.equals(BZ2) && !oc.equals(GZIP) && !oc.equals(NONE) && !oc.equals(SNAPPY)) {
        errorMsg = "Invalid output compression format specified!";
    }
    String os = this.getOutputSerialization();
    if (null == errorMsg && !os.equals(AVRO) && !os.equals(PARQUET) && !os.equals(TEXT)) {
        errorMsg = "Invalid output serialization format specified!";
    }
    if (null != errorMsg) {
        printHelp(errorMsg); // never returns
    }
    // Options are valid; derive the ratios, input size and split count.
    this.setInputCompressionRatio(this.inputCompression, this.inputSerialization);
    this.setOutputCompressionRatio(this.outputCompression, this.outputSerialization);
    this.setInputPathSize(this.inputPath);
    this.setSplitSize(this.outputPath);
}
/** Returns the (ratio-adjusted) total input size in bytes. */
public long getInputPathSize() {
    return this.inputPathSize;
}

/** Directly records the input size in bytes. */
public void setInputPathSize(long inputSize) {
    this.inputPathSize = inputSize;
}
/**
 * Estimates the uncompressed input size: sums the space consumed by the
 * previously-discovered input files and scales by the current input
 * compression ratio. The inputPath parameter is retained only for interface
 * compatibility; the file listing comes from fsArray (set by setInputPath()).
 * NOTE(review): getSpaceConsumed() includes HDFS replication — confirm that
 * is the intended basis for the split-count estimate.
 */
public void setInputPathSize(String inputPath) throws IOException {
    this.inputPathSize = (long) (this.sumVisibleFileSizes() * this.inputCompressionRatio);
}

/**
 * Same estimate, but first derives the input compression ratio from the
 * supplied compression and serialization types.
 */
public void setInputPathSize(String inputPath, String inputCompression, String inputSerialization) throws IOException {
    this.setInputCompressionRatio(inputCompression, inputSerialization);
    this.inputPathSize = (long) (this.sumVisibleFileSizes() * this.inputCompressionRatio);
}

// Sums getSpaceConsumed() over the visible (non-hidden, non-metadata) input
// files; shared by both overloads above (previously duplicated).
private long sumVisibleFileSizes() throws IOException {
    long fileSize = 0;
    for (FileStatus fileStatus : this.fsArray) {
        String name = fileStatus.getPath().getName();
        if (name.startsWith("_") || name.startsWith(".")) {
            continue;
        }
        fileSize += this.fs.getContentSummary(fileStatus.getPath()).getSpaceConsumed();
    }
    return fileSize;
}
/** Returns the number of output splits to coalesce down to. */
public int getSplitSize() {
    return this.splitSize;
}

/** Directly sets the output split count. */
public void setSplitSize(int splitSize) {
    this.splitSize = splitSize;
}
/**
 * Computes the split count from the current input size and output
 * compression ratio. Delegates to the explicit-parameter overload
 * (previously a duplicated copy of the same arithmetic).
 */
public void setSplitSize(String outputPath) throws IOException {
    this.setSplitSize(outputPath, this.inputPathSize, this.outputCompressionRatio);
}

/**
 * Split count = estimated compressed output size / output block size,
 * floored and incremented by one so at least one split is always produced.
 */
public void setSplitSize(String outputPath, long inputPathSize, double outputCompressionRatio) throws IOException {
    this.setOutputBlockSize(outputPath);
    double inputPathSizeDouble = (double) inputPathSize;
    this.splitSize = (int) (Math.floor(((inputPathSizeDouble / outputCompressionRatio) / this.outputBlockSize)) + 1.0);
}
/** Returns the assumed compression ratio of the input data. */
public double getInputCompressionRatio() {
    return this.inputCompressionRatio;
}

/** Directly sets the input compression ratio. */
public void setInputCompressionRatio(double inputCompressionRatio) {
    this.inputCompressionRatio = inputCompressionRatio;
}

/** Looks up the input ratio from the serialization/compression pair. */
public void setInputCompressionRatio(String compressionType, String serializationType) {
    String key = this.makeKey(serializationType, compressionType);
    this.inputCompressionRatio = this.compressionRatios.get(key);
}

/** Returns the assumed compression ratio of the output data. */
public double getOutputCompressionRatio() {
    return this.outputCompressionRatio;
}

/** Directly sets the output compression ratio. */
public void setOutputCompressionRatio(double outputCompressionRatio) {
    this.outputCompressionRatio = outputCompressionRatio;
}

/** Looks up the output ratio from the serialization/compression pair. */
public void setOutputCompressionRatio(String compressionType, String serializationType) {
    String key = this.makeKey(serializationType, compressionType);
    this.outputCompressionRatio = this.compressionRatios.get(key);
}
// Returns the raw input path string as provided by the caller/CLI.
public String getInputPath() {
return inputPath;
}
// Records the input path and eagerly resolves its FileSystem handle and glob
// listing. Most other methods (size estimation, compression detection, path
// concatenation) depend on fs/fsArray being populated here first.
public void setInputPath(String inputPath) throws IllegalArgumentException, IOException {
this.inputPath = inputPath;
this.fs = new Path(this.inputPath).getFileSystem(conf);
this.fsArray = fs.globStatus(new Path(this.inputPath));
}
/** Returns the detected or explicitly-set input compression codec name. */
public String getInputCompression() {
    return this.inputCompression;
}
/**
 * Detects the input compression by probing file extensions under the given
 * path. If the first listing entry is a directory, the listing is refreshed
 * to its children. The first visible file with a recognized codec extension
 * wins; hidden/metadata files are skipped. As a side effect each probe also
 * updates inputSerialization and inputCompressionPath via
 * getInputSerialization(String).
 *
 * @param inputPath path whose children are listed when the glob hit a directory
 */
public void setInputCompression(Path inputPath) throws IOException {
    if (fsArray == null || fsArray.length == 0) {
        // Nothing to inspect (empty glob); keep the current default instead of
        // throwing ArrayIndexOutOfBoundsException as the original did.
        return;
    }
    if (fsArray[0].isDirectory()) {
        fsArray = fs.listStatus(inputPath);
    }
    for (FileStatus fileStatus : fsArray) {
        String name = fileStatus.getPath().getName();
        if (name.startsWith("_") || name.startsWith(".")) {
            continue;
        }
        this.inputCompressionPath = (Path) null;
        // Strips any known serialization extension; side effect: sets inputCompressionPath.
        this.getInputSerialization(fileStatus.getPath().toString());
        CompressionCodec fileCodec = codecFactory.getCodec(this.inputCompressionPath);
        if (fileCodec != null) {
            // Reuse the codec just resolved instead of a redundant second factory lookup.
            this.inputCompression = this.compressionTypes.get(fileCodec);
            break;
        }
    }
}
/** Records an explicitly-named input compression codec, normalized to lower case. */
public void setInputCompression(String inputCompression) {
    this.inputCompression = inputCompression.toLowerCase();
}

/** Records the input compression from a resolved Hadoop codec instance. */
public void setInputCompression(CompressionCodec inputCompression) {
    this.inputCompression = compressionTypes.get(inputCompression);
}
// Returns the currently recorded input serialization format.
public String getInputSerialization() {
return inputSerialization;
}
// Stateful probe: infers the serialization format from the file extension
// and, as a side effect, records the path with that extension stripped in
// inputCompressionPath so a trailing codec extension (e.g. ".gz") can be
// matched afterwards by the codec factory. If no known extension matches,
// the full path is recorded instead (only when inputCompressionPath was
// reset to null by the caller beforehand).
// NOTE(review): despite the getter-style name this mutates two fields;
// callers in setInputCompression(Path) rely on that ordering.
public String getInputSerialization(String serializationPath) {
for (Entry<String, String> serialization : serializationExtensions.entrySet()) {
if (serializationPath.endsWith(serialization.getValue())) {
this.inputSerialization = serialization.getKey();
this.setInputCompressionPath(serializationPath.substring(0, serializationPath.length() - serialization.getValue().length()));
break;
}
}
if (inputCompressionPath == null) {
this.setInputCompressionPath(serializationPath);
}
return inputSerialization;
}
/** Records the input serialization format, normalized to lower case. */
public void setInputSerialization(String inputSerialization) {
    this.inputSerialization = inputSerialization.toLowerCase();
}

/** Returns the destination path for the compacted output. */
public String getOutputPath() {
    return this.outputPath;
}

/** Records the destination path for the compacted output. */
public void setOutputPath(String outputPath) {
    this.outputPath = outputPath;
}

/** Returns the compression codec name used for the output files. */
public String getOutputCompression() {
    return this.outputCompression;
}

/** Records the output compression codec name, normalized to lower case. */
public void setOutputCompression(String outputCompression) {
    this.outputCompression = outputCompression.toLowerCase();
}

/** Returns the serialization format used for the output files. */
public String getOutputSerialization() {
    return this.outputSerialization;
}

/** Records the output serialization format, normalized to lower case. */
public void setOutputSerialization(String outputSerialization) {
    this.outputSerialization = outputSerialization.toLowerCase();
}
/** Returns the path used for codec-extension probing (may be null). */
public Path getInputCompressionPath() {
    return this.inputCompressionPath;
}

/** Records the probe path from a string. */
public void setInputCompressionPath(String inputCompressionPath) {
    this.inputCompressionPath = new Path(inputCompressionPath);
}

/** Records the probe path directly (null resets it). */
public void setInputCompressionPath(Path inputCompressionPath) {
    this.inputCompressionPath = inputCompressionPath;
}
/** Returns the output block size in bytes (double, for the split arithmetic). */
public double getOutputBlockSize() {
    return outputBlockSize;
}

/** Directly sets the output block size in bytes. */
public void setOutputBlockSize(double outputBlockSize) {
    this.outputBlockSize = outputBlockSize;
}

/**
 * Records the default block size of the file system at the given path.
 * Uses a primitive widening cast instead of the deprecated
 * {@code new Long(...).doubleValue()} boxing round-trip.
 */
public void setOutputBlockSize(Path outputPath) {
    this.outputBlockSize = (double) fs.getDefaultBlockSize(outputPath);
}

/** String-path convenience overload of the above. */
public void setOutputBlockSize(String outputPath) {
    this.outputBlockSize = (double) fs.getDefaultBlockSize(new Path(outputPath));
}
}