Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
</scm>
<properties>
<titan.compatible.versions />
<tinkerpop.version>3.2.0-incubating</tinkerpop.version>
<tinkerpop.version>3.2.3</tinkerpop.version>
<junit.version>4.12</junit.version>
<mrunit.version>1.1.0</mrunit.version>
<cassandra.version>2.1.9</cassandra.version>
Expand Down Expand Up @@ -275,6 +275,9 @@
<exclude>**/*</exclude>
</excludes> -->
<skipTests>${test.skip.tp}</skipTests>
<systemPropertyVariables>
<build.dir>${project.build.directory}</build.dir>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class BerkeleyGraphComputerProvider extends AbstractTitanGraphComputerPro

@Override
public ModifiableConfiguration getTitanConfiguration(String graphName, Class<?> test, String testMethodName) {
ModifiableConfiguration config = BerkeleyStorageSetup.getBerkeleyJEConfiguration(StorageSetup.getHomeDir(graphName));
ModifiableConfiguration config = super.getTitanConfiguration(graphName, test, testMethodName);
config.setAll(BerkeleyStorageSetup.getBerkeleyJEConfiguration(StorageSetup.getHomeDir(graphName)).getAll());
config.set(GraphDatabaseConfiguration.IDAUTHORITY_WAIT, Duration.ofMillis(20));
config.set(GraphDatabaseConfiguration.STORAGE_TRANSACTIONAL,false);
return config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.thinkaurelius.titan.CassandraStorageSetup;
import com.thinkaurelius.titan.blueprints.AbstractTitanGraphComputerProvider;
import com.thinkaurelius.titan.blueprints.AbstractTitanGraphProvider;
import com.thinkaurelius.titan.diskstorage.configuration.ModifiableConfiguration;
import com.thinkaurelius.titan.graphdb.olap.computer.FulgoraGraphComputer;
import org.apache.tinkerpop.gremlin.GraphProvider;
Expand All @@ -16,7 +15,9 @@ public class ThriftGraphComputerProvider extends AbstractTitanGraphComputerProvi
@Override
public ModifiableConfiguration getTitanConfiguration(String graphName, Class<?> test, String testMethodName) {
CassandraStorageSetup.startCleanEmbedded();
return CassandraStorageSetup.getCassandraThriftConfiguration(graphName);
ModifiableConfiguration config = super.getTitanConfiguration(graphName, test, testMethodName);
config.setAll(CassandraStorageSetup.getCassandraThriftConfiguration(graphName).getAll());
return config;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@
test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest",
method = "shouldProcessResultGraphNewWithPersistVertexProperties",
reason = "The result graph should return an empty iterator when vertex.edges() or vertex.vertices() is called.")
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.structure.io.IoTest$GraphMLTest",
method = "shouldReadGraphMLWithNoEdgeLabels",
reason = "Titan does not support default edge label (edge) used when GraphML is missing edge labels.")
@Graph.OptOut(
test = "org.apache.tinkerpop.gremlin.process.computer.GraphComputerTest",
method = "shouldSupportGraphFilter",
reason = "Titan currently does not support graph filters but does not throw proper exception because doing so breaks numerous tests in gremlin-test ProcessComputerSuite.")
public interface TitanGraph extends TitanGraphTransaction {

/* ---------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.thinkaurelius.titan.diskstorage.keycolumnvalue.cache.KCVSCache;
import com.thinkaurelius.titan.diskstorage.log.kcvs.ExternalCachePersistor;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -425,7 +426,14 @@ public String toString() {


private final <V> V executeRead(Callable<V> exe) throws TitanException {
return BackendOperation.execute(exe, maxReadTime);
try {
return BackendOperation.execute(exe, maxReadTime);
} catch (TitanException e) {
// support traversal interruption
// TODO: Refactor to allow direct propagation of underlying interrupt exception
if (Thread.interrupted()) throw new TraversalInterruptedException();
throw e;
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public static final<V> V executeDirect(Callable<V> exe, Duration totalWaitTime)
try {
Thread.sleep(waitTime.toMillis());
} catch (InterruptedException r) {
// added thread interrupt signal to support traversal interruption
Thread.currentThread().interrupt();
throw new PermanentBackendException("Interrupted while waiting to retry failed backend operation", r);
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.thinkaurelius.titan.graphdb.types.TypeDefinitionCategory;
import com.thinkaurelius.titan.graphdb.types.TypeDefinitionDescription;

import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -119,6 +120,8 @@ public StandardSerializer() {
registerClassInternal(64,Duration.class, new DurationSerializer());
registerClassInternal(65,Instant.class, new InstantSerializer());
registerClassInternal(66,StandardTransactionId.class, new StandardTransactionIdSerializer());
registerClassInternal(67,TraverserSet.class, new SerializableSerializer());
registerClassInternal(68,HashMap.class, new SerializableSerializer());

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.thinkaurelius.titan.graphdb.database.serialize.attribute;

import com.thinkaurelius.titan.core.attribute.AttributeSerializer;
import com.thinkaurelius.titan.diskstorage.ScanBuffer;
import com.thinkaurelius.titan.diskstorage.WriteBuffer;
import com.thinkaurelius.titan.graphdb.database.serialize.DataOutput;
import com.thinkaurelius.titan.graphdb.database.serialize.Serializer;
import com.thinkaurelius.titan.graphdb.database.serialize.SerializerInjected;
import org.apache.commons.lang3.SerializationUtils;

import java.io.Serializable;
import java.util.HashMap;

public class SerializableSerializer<T extends Serializable> implements AttributeSerializer<T>, SerializerInjected {

private Serializer serializer;

@Override
public T read(ScanBuffer buffer) {
byte[] data = serializer.readObjectNotNull(buffer,byte[].class);
return (T) SerializationUtils.deserialize(data);
}

@Override
public void write(WriteBuffer buffer, T attribute) {
DataOutput out = (DataOutput) buffer;
out.writeObjectNotNull(SerializationUtils.serialize(attribute));
}

@Override
public void setSerializer(Serializer serializer) {
this.serializer = serializer;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ protected VertexJobConverter(TitanGraph graph, VertexScanJob job) {
}

protected VertexJobConverter(VertexJobConverter copy) {
this.graph = new GraphProvider();
if (copy.graph.isProvided()) this.graph.setGraph(copy.graph.get());
this.graph = copy.graph;
this.job = copy.job.clone();
this.tx = copy.tx;
this.idManager = copy.idManager;
}

public static ScanJob convert(TitanGraph graph, VertexScanJob vertexJob) {
Expand All @@ -82,18 +83,22 @@ public static StandardTitanTx startTransaction(StandardTitanGraph graph) {

@Override
public void workerIterationStart(Configuration jobConfig, Configuration graphConfig, ScanMetrics metrics) {
graph.initializeGraph(graphConfig);
idManager = graph.get().getIDManager();
try {
tx = startTransaction(graph.get());
open(graphConfig);
job.workerIterationStart(graph.get(), jobConfig, metrics);
} catch (Throwable e) {
close();
throw e;
}
}

private void close() {
protected void open(Configuration graphConfig) {
graph.initializeGraph(graphConfig);
idManager = graph.get().getIDManager();
tx = startTransaction(graph.get());
}

protected void close() {
if (null != tx && tx.isOpen())
tx.rollback();
graph.close();
Expand Down
Loading