Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,6 @@ public TSStatus visitPipeEnrichedDeleteDataNode(

@Override
public TSStatus visitWriteObjectFile(ObjectNode node, DataRegion dataRegion) {
try {
dataRegion.writeObject(node);
dataRegion.insertSeparatorToWAL();
return StatusUtils.OK;
} catch (final Exception e) {
LOGGER.error("Error in executing plan node: {}", node, e);
return RpcUtils.getStatus(TSStatusCode.OBJECT_INSERT_ERROR.getStatusCode(), e.getMessage());
}
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,10 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.storageengine.dataregion.IObjectPath;
import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataInputStream;
import java.io.IOException;
Expand All @@ -45,8 +41,6 @@
import java.util.List;
import java.util.Map;

import static org.apache.iotdb.db.utils.ObjectTypeUtils.generateObjectBinary;

public class RelationalInsertRowsNode extends InsertRowsNode {
// deviceId cache for Table-view insertion
private IDeviceID[] deviceIDs;
Expand Down Expand Up @@ -181,8 +175,6 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
insertRowNode.getDeviceID(),
TimePartitionUtils.getTimePartitionSlot(insertRowNode.getTime()),
analysis.getDatabaseName());
// handle object type
handleObjectValue(insertRowNode, dataRegionReplicaSet, writePlanNodeList);

// Collect redirectInfo
redirectInfo.add(dataRegionReplicaSet.getDataNodeLocations().get(0).getClientRpcEndPoint());
Expand All @@ -202,41 +194,6 @@ public List<WritePlanNode> splitByPartition(IAnalysis analysis) {
return writePlanNodeList;
}

private void handleObjectValue(
InsertRowNode insertRowNode,
TRegionReplicaSet dataRegionReplicaSet,
List<WritePlanNode> writePlanNodeList) {
for (int j = 0; j < insertRowNode.getDataTypes().length; j++) {
if (insertRowNode.getDataTypes()[j] == TSDataType.OBJECT) {
Object[] values = insertRowNode.getValues();
if (values[j] == null) {
continue;
}
byte[] binary = ((Binary) values[j]).getValues();
ByteBuffer buffer = ByteBuffer.wrap(binary);
boolean isEoF = buffer.get() == 1;
long offset = buffer.getLong();
byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining());
IObjectPath relativePath =
IObjectPath.Factory.FACTORY.create(
dataRegionReplicaSet.getRegionId().getId(),
insertRowNode.getTime(),
insertRowNode.getDeviceID(),
insertRowNode.getMeasurements()[j]);
ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath);
objectNode.setDataRegionReplicaSet(dataRegionReplicaSet);
writePlanNodeList.add(objectNode);
if (isEoF) {
((Binary) values[j])
.setValues(generateObjectBinary(offset + content.length, relativePath).getValues());
insertRowNode.setValues(values);
} else {
values[j] = null;
}
}
}
}

public RelationalInsertRowsNode emptyClone() {
return new RelationalInsertRowsNode(this.getPlanNodeId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
import org.apache.iotdb.db.storageengine.dataregion.IObjectPath;
import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
Expand All @@ -58,8 +57,6 @@
import java.util.Map;
import java.util.Map.Entry;

import static org.apache.iotdb.db.utils.ObjectTypeUtils.generateObjectBinary;

public class RelationalInsertTabletNode extends InsertTabletNode {

// deviceId cache for Table-view insertion
Expand Down Expand Up @@ -402,11 +399,6 @@ protected List<WritePlanNode> doSplit(Map<TRegionReplicaSet, List<Integer>> spli
// Avoid using system arraycopy when there is no need to split
setRange(entry.getValue());
setDataRegionReplicaSet(entry.getKey());
for (int i = 0; i < columns.length; i++) {
if (dataTypes[i] == TSDataType.OBJECT) {
handleObjectValue(i, 0, times.length, entry, result);
}
}
result.add(this);
return result;
}
Expand Down Expand Up @@ -443,9 +435,6 @@ private List<WritePlanNode> generateOneSplitList(
System.arraycopy(times, start, subNode.times, destLoc, length);
for (int i = 0; i < subNode.columns.length; i++) {
if (dataTypes[i] != null) {
if (dataTypes[i] == TSDataType.OBJECT) {
handleObjectValue(i, start, end, entry, result);
}
System.arraycopy(columns[i], start, subNode.columns[i], destLoc, length);
}
if (subNode.bitMaps != null && this.bitMaps[i] != null) {
Expand All @@ -461,46 +450,6 @@ private List<WritePlanNode> generateOneSplitList(
return result;
}

private void handleObjectValue(
int column,
int startRow,
int endRow,
Map.Entry<TRegionReplicaSet, List<Integer>> entry,
List<WritePlanNode> result) {
for (int j = startRow; j < endRow; j++) {
if (((Binary[]) columns[column])[j] == null) {
continue;
}
byte[] binary = ((Binary[]) columns[column])[j].getValues();
if (binary == null || binary.length == 0) {
continue;
}
ByteBuffer buffer = ByteBuffer.wrap(binary);
boolean isEoF = buffer.get() == 1;
long offset = buffer.getLong();
byte[] content = ReadWriteIOUtils.readBytes(buffer, buffer.remaining());
IObjectPath relativePath =
IObjectPath.Factory.FACTORY.create(
entry.getKey().getRegionId().getId(), times[j], getDeviceID(j), measurements[column]);
ObjectNode objectNode = new ObjectNode(isEoF, offset, content, relativePath);
objectNode.setDataRegionReplicaSet(entry.getKey());
result.add(objectNode);
if (isEoF) {
((Binary[]) columns[column])[j] =
generateObjectBinary(offset + content.length, relativePath);
} else {
((Binary[]) columns[column])[j] = null;
if (bitMaps == null) {
bitMaps = new BitMap[columns.length];
}
if (bitMaps[column] == null) {
bitMaps[column] = new BitMap(rowCount);
}
bitMaps[column].mark(j);
}
}
}

@Override
public void checkDataType(AbstractMemTable memTable) throws DataTypeInconsistentException {
if (singleDevice) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.ObjectNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy;
Expand Down Expand Up @@ -162,7 +161,6 @@
import org.apache.iotdb.db.utils.EncryptDBUtils;
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.db.utils.ObjectTypeUtils;
import org.apache.iotdb.db.utils.ObjectWriter;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -196,7 +194,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -3817,59 +3814,6 @@ public int compact() {
}
}

public void writeObject(ObjectNode objectNode) throws Exception {
writeLock("writeObject");
try {
String relativeTmpPathString = objectNode.getFilePathString() + ".tmp";
String objectFileDir = null;
File objectTmpFile = null;
for (String objectDir : TierManager.getInstance().getAllObjectFileFolders()) {
File tmpFile = FSFactoryProducer.getFSFactory().getFile(objectDir, relativeTmpPathString);
if (tmpFile.exists()) {
objectFileDir = objectDir;
objectTmpFile = tmpFile;
break;
}
}
if (objectTmpFile == null) {
objectFileDir = TierManager.getInstance().getNextFolderForObjectFile();
objectTmpFile =
FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeTmpPathString);
}
try (ObjectWriter writer = new ObjectWriter(objectTmpFile)) {
writer.write(
objectNode.isGeneratedByRemoteConsensusLeader(),
objectNode.getOffset(),
objectNode.getContent());
}
if (objectNode.isEOF()) {
File objectFile =
FSFactoryProducer.getFSFactory().getFile(objectFileDir, objectNode.getFilePathString());
if (objectFile.exists()) {
String relativeBackPathString = objectNode.getFilePathString() + ".back";
File objectBackFile =
FSFactoryProducer.getFSFactory().getFile(objectFileDir, relativeBackPathString);
Files.move(
objectFile.toPath(), objectBackFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
Files.move(
objectTmpFile.toPath(), objectFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
FileMetrics.getInstance().decreaseObjectFileNum(1);
FileMetrics.getInstance().decreaseObjectFileSize(objectBackFile.length());
Files.delete(objectBackFile.toPath());
} else {
Files.move(
objectTmpFile.toPath(), objectFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
FileMetrics.getInstance().increaseObjectFileNum(1);
FileMetrics.getInstance().increaseObjectFileSize(objectFile.length());
}
getWALNode()
.ifPresent(walNode -> walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST, objectNode));
} finally {
writeUnlock();
}
}

/**
* Load a new tsfile to unsequence dir.
*
Expand Down