Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,24 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.*;

public class LocalOutgoingChunkedConnector implements OutgoingChunkedConnector {

private String resourcePath;

private boolean dmaFlag;

private int buffLen;

private static final Logger logger = LoggerFactory.getLogger(LocalOutgoingChunkedConnector.class);


@Override
public void init(ConnectorConfig connectorConfig) throws Exception {
this.resourcePath = connectorConfig.getResourcePath();
this.dmaFlag = connectorConfig.getBooleanTransportProperty(ConnectorConfig.LocalConfigs.DMA_ENABLED, false);
this.buffLen = connectorConfig.getIntTransportProperty(ConnectorConfig.LocalConfigs.BUFF_LEN, 16 * 1024 * 1024);
}

@Override
Expand All @@ -52,55 +55,60 @@ public void failed() throws Exception {
@Override
public void uploadChunk(int chunkId, long startByte, long endByte, String uploadFile) throws Exception {

FileInputStream from = new FileInputStream(new File(uploadFile));
FileOutputStream to = new FileOutputStream(new File(this.resourcePath));

final int buffLen = 1024;

byte[] buf = new byte[buffLen];

from.skip(startByte);

long fileSize = endByte - startByte + 1;

while (true) {
int bufSize = 0;

if (buffLen < fileSize) {
bufSize = buffLen;
} else {
bufSize = (int) fileSize;
logger.info("Uploading chunk {} with start byte {} and end byte {} to file {} from upload file {}",
chunkId, startByte, endByte, this.resourcePath, uploadFile);

if (dmaFlag) {
try {
FileInputStream from = new FileInputStream(uploadFile);
RandomAccessFile file = new RandomAccessFile(this.resourcePath, "rw");
file.seek(startByte);
FileOutputStream to = new FileOutputStream(file.getFD());
from.getChannel().transferTo(0, endByte - startByte, to.getChannel());
file.close();
from.close();
to.close();
} catch (Exception e) {
logger.error("Unexpected error occurred while uploading chunk {} to file {} from upload file {}",
chunkId, this.resourcePath, uploadFile, e);
throw e;
}

bufSize = (int) from.read(buf, 0, bufSize);

if (bufSize < 0) {
break;
}

to.write(buf, 0, bufSize);
to.flush();

fileSize -= bufSize;

if (fileSize == 0L) {
break;
} else {
try {
RandomAccessFile file = new RandomAccessFile(this.resourcePath, "rw");
file.seek(startByte);
BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(file.getFD()));
BufferedInputStream bis = new BufferedInputStream(new FileInputStream(uploadFile), buffLen);
byte[] buffer = new byte[buffLen];
int write = 0;
long totalWritten = 0l;
while ((write = bis.read(buffer, 0, Math.min(buffLen, (int) (endByte - totalWritten)))) > 0) {
bos.write(buffer, (int) 0, write);
totalWritten += write;
}
bis.close();
bos.close();
file.close();
} catch (Exception e) {
logger.error("Unexpected error occurred while uploading chunk {} to file {} from upload file {}",
chunkId, this.resourcePath, uploadFile, e);
throw e;
}
}

from.close();
to.close();
}

@Override
public void uploadChunk(int chunkId, long startByte, long endByte, InputStream inputStream) throws Exception {

logger.info("Uploading chunk {} with start byte {} and end byte {} to file {} from inputStream {}",
chunkId, startByte, endByte, this.resourcePath, "test");

FileOutputStream outputStream = new FileOutputStream(new File(this.resourcePath));
RandomAccessFile file = new RandomAccessFile(this.resourcePath, "rw");
file.seek(startByte);
FileOutputStream outputStream = new FileOutputStream(file.getFD());

byte[] buffer = new byte[1024];
byte[] buffer = new byte[buffLen];
int bytesRead;
inputStream.skip(startByte);
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
Expand Down