Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
2aae2cb
TemplateTap had been deprecated for a while, dropped now. Drop Templa…
Oct 21, 2015
873c650
a few .gitignores (test data)
Oct 21, 2015
69b8dec
Begin attempting the jump to Cascading 3.0.2 !
Oct 21, 2015
28600d1
One property disappeared, got renamed. Follow suit + @deprecated
Oct 21, 2015
97463cc
Tap-n-Scheme generic definition slightly changed upstream, follow.
Oct 21, 2015
7552303
CascadingStats is now CascadingStats[_]
Oct 21, 2015
f8b5125
FlowStep generic definition changed as well
Oct 21, 2015
9282448
(possibly reckless) attempt at telling Scala which macro interface we…
Oct 21, 2015
f37cc26
Revert "(possibly reckless) attempt at telling Scala which macro inte…
Oct 23, 2015
9dc3c23
Cascading 2→3 generic API changes
Oct 23, 2015
8a68c5c
TemplateSource had to go, so should the test
Oct 23, 2015
24f4fbe
Can't reference changed jgrapht internals anymore
Oct 23, 2015
9275149
steps no longer have numbers, they have an ID
Oct 23, 2015
b3e1adc
(not sure about 2.10) scala complains about "useless" @transient decl…
Oct 23, 2015
5301f1c
Adjust to some Cascading API changes (getGraph→getElementGraph)
Oct 23, 2015
96c29a1
actually use the setMapSideAggregationThreshold→Capacity terminology …
Oct 23, 2015
24ddd8c
reference cascading3-enabled serialization libraries (all pull reques…
Oct 23, 2015
b8723e3
(JobConf)→(_ <: Configuration)
Oct 26, 2015
b08811d
Update some expected error messages coming from Cascading 3.0
Oct 26, 2015
d614c35
Reinstate scala 2.10.5 cross-build
Oct 26, 2015
fa3cfa8
Record the Elephant-bird snapshot's pull request
Oct 26, 2015
af482f6
Updating some upstream deps versions + exposing prebuilt dependencies…
Dec 7, 2015
be20c6e
Following advice from cwensel, depending on altered groupId/artifactI…
Dec 8, 2015
b29be5a
temporary groupId, yes, but correct temporary groupId
Dec 8, 2015
a351589
(rollbacking the temp.cchepelov.* groupId namespace, causes too much …
Dec 8, 2015
c63ed49
Bumping source-compatibility to Java 8 was as bad an idea as ninja-dr…
Dec 8, 2015
cd82ac8
(wip) add travis_wait calls to dodge 10-minute timeouts
Dec 8, 2015
1dad073
(wut? travis_wait is a bash function it seems)
Dec 8, 2015
8dc8ebe
Revert "(wut? travis_wait is a bash function it seems)"
Dec 8, 2015
9c4c252
Revert "(wip) add travis_wait calls to dodge 10-minute timeouts"
Dec 8, 2015
697b401
updating to latest released version of Cascading 3.0.x
Dec 8, 2015
e05c248
new InvalidSourceTap also needs Tap interface changes
Jan 7, 2016
45ade96
new: ConfigBridge, in order to deal with the various [Config] types F…
Jan 13, 2016
835cf37
cascading 3.0: Flow[JobConf] → Flow[_] where _ can be JobConf, Config…
Jan 13, 2016
ef28440
Adjust how Pipe description texts are carried + update tests
Jan 13, 2016
6f0f5ee
Adjust how the numReducers property is accessed (cascading 3.0)
Jan 13, 2016
caf4f5f
cascading 3.0 flow step id is now alphanumeric, not numeric
Jan 13, 2016
8e850ec
update to currently locally-built cascading WIP. FIXME ASAP: use WIP …
Jan 13, 2016
537240e
Cheat a bit on the dfs-datastores VersionedKeyValSourceTest
Jan 14, 2016
408f2e8
Reductor estimate test failed, as Hadoop2.6 understands the legacy pr…
Jan 14, 2016
68356d6
Adjusting dependencies (new locally-built cascading-3.1-wip-dev)
cchepelov Jan 15, 2016
338117e
per @posco's suggestion, FlowStep[_ <: Any] not FlowStep[_]
cchepelov Jan 15, 2016
24dfb7c
RFC: merge of grouped++streamed no longer supported by Cascading 3.0.…
cchepelov Jan 18, 2016
16c4ff8
Using cascading-3.1-wip-47
cchepelov Jan 18, 2016
214c59d
Adjusting expected test line numbers (yearning for C++'s good old __L…
Jan 18, 2016
15047dc
bump Hadoop to 2.7.1, pulled in by Cascading anyway
Jan 18, 2016
a7602d5
(WIP: opening up hadoop2-mr1 and tez tests)
Jan 18, 2016
b752423
(WIP; fixing some compile errors)
cchepelov Jan 18, 2016
f100094
Disable tests which will not work in Tez
Jan 19, 2016
62de186
Revisit fabric-switching logic to also work in Mode#openForRead (requ…
Jan 19, 2016
8eec3bf
The Tez fabric can't determine whether Maple tap/inputformats content…
Jan 19, 2016
c39c6b3
PlatformTest.IterableSource test: remove incorrect implicit ordering …
Jan 19, 2016
fd4d7f4
Ensure at least SOME test also run on Tez
Jan 19, 2016
e861b08
Reduce the amount of logging noise from Tez during tests
Jan 19, 2016
734c282
Add cascading-hadoop back to the assembly used by REPL
Jan 19, 2016
4189f69
tracking Cascading 3.1-wip-48
Jan 20, 2016
9dcbd7f
more test output into .gitignore
Jan 20, 2016
b228b73
(-commons: post-merge with upstream/develop, lost a "provided" dep on…
Jan 21, 2016
a691774
re-C3'ing the VersionedTap/KeyValueByteScheme that were absorbed from…
Jan 21, 2016
eb7546f
(follow with Cascading, 3.1-wip-52 + tez 0.8.2; and with Scalding/dev…
Jan 26, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ tutorial/data/rightDiff.tsv
tutorial/data/tmp3.tsv
tutorial/data/jsonoutput0.tsv
tutorial/data/avrooutput0.avro
tutorial/data/execution_output.txt
.scalding_repl
scalding-hadoop-test/NOTICE
NOTICE
*~
build/
.staging
16 changes: 16 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ matrix:
env: BUILD="base" TEST_TARGET="scalding-hadoop-test"
script: "scripts/run_test.sh"

- scala: 2.10.6
env: BUILD="base" TEST_TARGET="scalding-hadoop2-mr1-test"
script: "scripts/run_test.sh"

- scala: 2.11.7
env: BUILD="base" TEST_TARGET="scalding-hadoop2-mr1-test"
script: "scripts/run_test.sh"

- scala: 2.10.6
env: BUILD="base" TEST_TARGET="scalding-hadoop2-tez-test"
script: "scripts/run_test.sh"

- scala: 2.11.7
env: BUILD="base" TEST_TARGET="scalding-hadoop2-tez-test"
script: "scripts/run_test.sh"

- scala: 2.10.6
env: BUILD="base" TEST_TARGET="scalding-serialization"
script: "scripts/run_test.sh"
Expand Down
31 changes: 16 additions & 15 deletions maple/src/main/java/com/etsy/cascading/tap/local/LocalTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Properties;
import java.util.logging.Logger;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
Expand Down Expand Up @@ -45,18 +46,18 @@ public class LocalTap<SourceCtx, SinkCtx> extends Tap<Properties, RecordReader,
private JobConf defaults;
private Lfs lfs;

public LocalTap(String path, Scheme<JobConf, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme,
public LocalTap(String path, Scheme<Configuration, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme,
SinkMode sinkMode) {
super(new LocalScheme<SourceCtx, SinkCtx>(scheme), sinkMode);
setup(path, scheme);
}

public LocalTap(String path, Scheme<JobConf, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
public LocalTap(String path, Scheme<Configuration, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
super(new LocalScheme<SourceCtx, SinkCtx>(scheme));
setup(path, scheme);
}

private void setup(String path, Scheme<JobConf, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
private void setup(String path, Scheme<Configuration, RecordReader, OutputCollector, SourceCtx, SinkCtx> scheme) {
this.path = path;

/*
Expand Down Expand Up @@ -90,13 +91,13 @@ public String getIdentifier() {
}

@Override
public TupleEntryIterator openForRead(FlowProcess<Properties> flowProcess, RecordReader input) throws IOException {
public TupleEntryIterator openForRead(FlowProcess<? extends Properties> flowProcess, RecordReader input) throws IOException {
JobConf jobConf = mergeDefaults("LocalTap#openForRead", flowProcess.getConfigCopy(), defaults);
return lfs.openForRead(new HadoopFlowProcess(jobConf));
}

@Override
public TupleEntryCollector openForWrite(FlowProcess<Properties> flowProcess, OutputCollector output)
public TupleEntryCollector openForWrite(FlowProcess<? extends Properties> flowProcess, OutputCollector output)
throws IOException {
JobConf jobConf = mergeDefaults("LocalTap#openForWrite", flowProcess.getConfigCopy(), defaults);
return lfs.openForWrite(new HadoopFlowProcess(jobConf));
Expand Down Expand Up @@ -141,11 +142,11 @@ private static class LocalScheme<SourceContext, SinkContext> extends
Scheme<Properties, RecordReader, OutputCollector, SourceContext, SinkContext> {
private static final long serialVersionUID = 5710119342340369543L;

private Scheme<JobConf, RecordReader, OutputCollector, SourceContext, SinkContext> scheme;
private Scheme<Configuration, RecordReader, OutputCollector, SourceContext, SinkContext> scheme;
private JobConf defaults;
private Lfs lfs;

public LocalScheme(Scheme<JobConf, RecordReader, OutputCollector, SourceContext, SinkContext> scheme) {
public LocalScheme(Scheme<Configuration, RecordReader, OutputCollector, SourceContext, SinkContext> scheme) {
super(scheme.getSourceFields(), scheme.getSinkFields());
this.scheme = scheme;
}
Expand All @@ -159,53 +160,53 @@ private void setLfs(Lfs lfs) {
}

@Override
public Fields retrieveSourceFields(FlowProcess<Properties> flowProcess,
public Fields retrieveSourceFields(FlowProcess<? extends Properties> flowProcess,
Tap tap) {
return scheme.retrieveSourceFields(new HadoopFlowProcess(defaults), lfs);
}

@Override
public void presentSourceFields(FlowProcess<Properties> flowProcess,
public void presentSourceFields(FlowProcess<? extends Properties> flowProcess,
Tap tap, Fields fields) {
scheme.presentSourceFields(new HadoopFlowProcess(defaults), lfs, fields);
}

@Override
public void sourceConfInit(FlowProcess<Properties> flowProcess,
public void sourceConfInit(FlowProcess<? extends Properties> flowProcess,
Tap<Properties, RecordReader, OutputCollector> tap, Properties conf) {
JobConf jobConf = mergeDefaults("LocalScheme#sourceConfInit", conf, defaults);
scheme.sourceConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf);
overwriteProperties(conf, jobConf);
}

@Override
public Fields retrieveSinkFields(FlowProcess<Properties> flowProcess,
public Fields retrieveSinkFields(FlowProcess<? extends Properties> flowProcess,
Tap tap) {
return scheme.retrieveSinkFields(new HadoopFlowProcess(defaults), lfs);
}

@Override
public void presentSinkFields(FlowProcess<Properties> flowProcess,
public void presentSinkFields(FlowProcess<? extends Properties> flowProcess,
Tap tap, Fields fields) {
scheme.presentSinkFields(new HadoopFlowProcess(defaults), lfs, fields);
}

@Override
public void sinkConfInit(FlowProcess<Properties> flowProcess,
public void sinkConfInit(FlowProcess<? extends Properties> flowProcess,
Tap<Properties, RecordReader, OutputCollector> tap, Properties conf) {
JobConf jobConf = mergeDefaults("LocalScheme#sinkConfInit", conf, defaults);
scheme.sinkConfInit(new HadoopFlowProcess(jobConf), lfs, jobConf);
overwriteProperties(conf, jobConf);
}

@Override
public boolean source(FlowProcess<Properties> flowProcess, SourceCall<SourceContext, RecordReader> sourceCall)
public boolean source(FlowProcess<? extends Properties> flowProcess, SourceCall<SourceContext, RecordReader> sourceCall)
throws IOException {
throw new RuntimeException("LocalTap#source is never called");
}

@Override
public void sink(FlowProcess<Properties> flowProcess, SinkCall<SinkContext, OutputCollector> sinkCall)
public void sink(FlowProcess<? extends Properties> flowProcess, SinkCall<SinkContext, OutputCollector> sinkCall)
throws IOException {
throw new RuntimeException("LocalTap#sink is never called");
}
Expand Down
12 changes: 6 additions & 6 deletions maple/src/main/java/com/twitter/maple/hbase/HBaseScheme.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public String[] getFamilyNames() {
}

@Override
public void sourcePrepare(FlowProcess<JobConf> flowProcess,
public void sourcePrepare(FlowProcess<? extends JobConf> flowProcess,
SourceCall<Object[], RecordReader> sourceCall) {
Object[] pair =
new Object[]{sourceCall.getInput().createKey(), sourceCall.getInput().createValue()};
Expand All @@ -163,13 +163,13 @@ public void sourcePrepare(FlowProcess<JobConf> flowProcess,
}

@Override
public void sourceCleanup(FlowProcess<JobConf> flowProcess,
public void sourceCleanup(FlowProcess<? extends JobConf> flowProcess,
SourceCall<Object[], RecordReader> sourceCall) {
sourceCall.setContext(null);
}

@Override
public boolean source(FlowProcess<JobConf> flowProcess,
public boolean source(FlowProcess<? extends JobConf> flowProcess,
SourceCall<Object[], RecordReader> sourceCall) throws IOException {
Tuple result = new Tuple();

Expand Down Expand Up @@ -206,7 +206,7 @@ public boolean source(FlowProcess<JobConf> flowProcess,
}

@Override
public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall)
public void sink(FlowProcess<? extends JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall)
throws IOException {
TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
OutputCollector outputCollector = sinkCall.getOutput();
Expand All @@ -231,7 +231,7 @@ public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputColl
}

@Override
public void sinkConfInit(FlowProcess<JobConf> process,
public void sinkConfInit(FlowProcess<? extends JobConf> process,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setOutputFormat(TableOutputFormat.class);

Expand All @@ -240,7 +240,7 @@ public void sinkConfInit(FlowProcess<JobConf> process,
}

@Override
public void sourceConfInit(FlowProcess<JobConf> process,
public void sourceConfInit(FlowProcess<? extends JobConf> process,
Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
conf.setInputFormat(TableInputFormat.class);

Expand Down
8 changes: 4 additions & 4 deletions maple/src/main/java/com/twitter/maple/hbase/HBaseTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected HBaseAdmin getHBaseAdmin(JobConf conf) throws MasterNotRunningExceptio
}

@Override
public void sinkConfInit(FlowProcess<JobConf> process, JobConf conf) {
public void sinkConfInit(FlowProcess<? extends JobConf> process, JobConf conf) {
if(quorumNames != null) {
conf.set("hbase.zookeeper.quorum", quorumNames);
}
Expand Down Expand Up @@ -178,12 +178,12 @@ public String getIdentifier() {
}

@Override
public TupleEntryIterator openForRead(FlowProcess<JobConf> jobConfFlowProcess, RecordReader recordReader) throws IOException {
public TupleEntryIterator openForRead(FlowProcess<? extends JobConf> jobConfFlowProcess, RecordReader recordReader) throws IOException {
return new HadoopTupleEntrySchemeIterator(jobConfFlowProcess, this, recordReader);
}

@Override
public TupleEntryCollector openForWrite(FlowProcess<JobConf> jobConfFlowProcess, OutputCollector outputCollector) throws IOException {
public TupleEntryCollector openForWrite(FlowProcess<? extends JobConf> jobConfFlowProcess, OutputCollector outputCollector) throws IOException {
HBaseTapCollector hBaseCollector = new HBaseTapCollector( jobConfFlowProcess, this );
hBaseCollector.prepare();
return hBaseCollector;
Expand Down Expand Up @@ -230,7 +230,7 @@ public long getModifiedTime(JobConf jobConf) throws IOException {
}

@Override
public void sourceConfInit(FlowProcess<JobConf> process, JobConf conf) {
public void sourceConfInit(FlowProcess<? extends JobConf> process, JobConf conf) {
// a hack for MultiInputFormat to see that there is a child format
FileInputFormat.setInputPaths( conf, getPath() );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class HBaseTapCollector extends TupleEntrySchemeCollector<JobConf, TupleE
/** Field writer */
private RecordWriter writer;
/** Field flowProcess */
private final FlowProcess<JobConf> hadoopFlowProcess;
private final FlowProcess<? extends JobConf> hadoopFlowProcess;
/** Field tap */
private final Tap<JobConf, RecordReader, OutputCollector> tap;
/** Field reporter */
Expand All @@ -58,7 +58,7 @@ public class HBaseTapCollector extends TupleEntrySchemeCollector<JobConf, TupleE
* @throws IOException
* when fails to initialize
*/
public HBaseTapCollector(FlowProcess<JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap) throws IOException {
public HBaseTapCollector(FlowProcess<? extends JobConf> flowProcess, Tap<JobConf, RecordReader, OutputCollector> tap) throws IOException {
super(flowProcess, tap.getScheme());
this.hadoopFlowProcess = flowProcess;
this.tap = tap;
Expand Down
3 changes: 2 additions & 1 deletion maple/src/main/java/com/twitter/maple/tap/MemorySinkTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

import java.io.File;
Expand Down Expand Up @@ -43,7 +44,7 @@ public static String getTempDir() {
}

@Override
public boolean commitResource(JobConf conf) throws java.io.IOException {
public boolean commitResource(Configuration conf) throws java.io.IOException {
TupleEntryIterator it = new HadoopFlowProcess(conf).openTapForRead(this);

boolean first_time = true;
Expand Down
14 changes: 7 additions & 7 deletions maple/src/main/java/com/twitter/maple/tap/MemorySourceTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,21 @@ public List<Tuple> getTuples() {
}

@Override
public void sourceConfInit(FlowProcess<JobConf> flowProcess,
public void sourceConfInit(FlowProcess<? extends JobConf> flowProcess,
Tap<JobConf, RecordReader<TupleWrapper, NullWritable>, Void> tap, JobConf conf) {
FileInputFormat.setInputPaths(conf, this.id);
conf.setInputFormat(TupleMemoryInputFormat.class);
TupleMemoryInputFormat.storeTuples(conf, TupleMemoryInputFormat.TUPLES_PROPERTY, this.tuples);
}

@Override
public void sinkConfInit(FlowProcess<JobConf> flowProcess,
public void sinkConfInit(FlowProcess<? extends JobConf> flowProcess,
Tap<JobConf, RecordReader<TupleWrapper, NullWritable>, Void> tap, JobConf conf) {
throw new UnsupportedOperationException("Not supported yet.");
}

@Override
public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[],
public void sourcePrepare( FlowProcess<? extends JobConf> flowProcess, SourceCall<Object[],
RecordReader<TupleWrapper, NullWritable>> sourceCall ) {
sourceCall.setContext( new Object[ 2 ] );

Expand All @@ -69,7 +69,7 @@ public void sourcePrepare( FlowProcess<JobConf> flowProcess, SourceCall<Object[]
}

@Override
public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[],
public boolean source(FlowProcess<? extends JobConf> flowProcess, SourceCall<Object[],
RecordReader<TupleWrapper, NullWritable>> sourceCall) throws IOException {
TupleWrapper key = (TupleWrapper) sourceCall.getContext()[ 0 ];
NullWritable value = (NullWritable) sourceCall.getContext()[ 1 ];
Expand All @@ -84,13 +84,13 @@ public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[],
}

@Override
public void sourceCleanup( FlowProcess<JobConf> flowProcess, SourceCall<Object[],
public void sourceCleanup( FlowProcess<? extends JobConf> flowProcess, SourceCall<Object[],
RecordReader<TupleWrapper, NullWritable>> sourceCall ) {
sourceCall.setContext( null );
}

@Override
public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Void, Void> sinkCall ) throws IOException {
public void sink(FlowProcess<? extends JobConf> flowProcess, SinkCall<Void, Void> sinkCall ) throws IOException {
throw new UnsupportedOperationException("Not supported.");
}

Expand Down Expand Up @@ -127,7 +127,7 @@ public boolean equals(Object object) {
}

@Override
public TupleEntryIterator openForRead( FlowProcess<JobConf> flowProcess, RecordReader<TupleWrapper,
public TupleEntryIterator openForRead( FlowProcess<? extends JobConf> flowProcess, RecordReader<TupleWrapper,
NullWritable> input ) throws IOException {
// input may be null when this method is called on the client side or cluster side when accumulating
// for a HashJoin
Expand Down
3 changes: 2 additions & 1 deletion maple/src/main/java/com/twitter/maple/tap/StdoutTap.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntryIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

import java.io.File;
Expand Down Expand Up @@ -32,7 +33,7 @@ public static String getTempDir() {
}

@Override
public boolean commitResource(JobConf conf) throws java.io.IOException {
public boolean commitResource(Configuration conf) throws java.io.IOException {
TupleEntryIterator it = new HadoopFlowProcess(conf).openTapForRead(this);
System.out.println("");
System.out.println("");
Expand Down
Loading