in stack/services/src/main/java/org/apache/usergrid/services/assets/data/AWSBinaryStore.java [122:289]
public void write( final UUID appId, final Entity entity, InputStream inputStream ) throws Exception {
String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
ByteArrayOutputStream baos = new ByteArrayOutputStream();
long written = IOUtils.copyLarge( inputStream, baos, 0, FIVE_MB );
byte[] data = baos.toByteArray();
InputStream awsInputStream = new ByteArrayInputStream(data);
final Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity );
fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() );
String mimeType = AssetMimeHandler.get().getMimeType( entity, data );
Boolean overSizeLimit = false;
EntityManager em = emf.getEntityManager( appId );
if ( written < FIVE_MB ) { // total smaller than 5mb
ObjectMetadata om = new ObjectMetadata();
om.setContentLength(written);
om.setContentType( mimeType );
PutObjectResult result = null;
result = getS3Client().putObject( bucketName, uploadFileName, awsInputStream, om );
String md5sum = Hex.encodeHexString( Base64.decodeBase64(result.getContentMd5()) );
String eTag = result.getETag();
fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
if(md5sum != null)
fileMetadata.put( AssetUtils.CHECKSUM, md5sum );
fileMetadata.put( AssetUtils.E_TAG, eTag );
em.update( entity );
}
else { // bigger than 5mb... dump 5 mb tmp files and upload from them
written = 0; //reset written to 0, we still haven't wrote anything in fact
int partNumber = 1;
int firstByte = 0;
Boolean isFirstChunck = true;
List<PartETag> partETags = new ArrayList<PartETag>();
//get the s3 client in order to initialize the multipart request
getS3Client();
InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest( bucketName, uploadFileName );
InitiateMultipartUploadResult initResponse = getS3Client().initiateMultipartUpload( initRequest );
InputStream firstChunck = new ByteArrayInputStream(data);
PushbackInputStream chunckableInputStream = new PushbackInputStream(inputStream, 1);
// determine max size file allowed, default to 50mb
long maxSizeBytes = 50 * FileUtils.ONE_MB;
String maxSizeMbString = properties.getProperty( "usergrid.binary.max-size-mb", "50" );
if ( StringUtils.isNumeric( maxSizeMbString )) {
maxSizeBytes = Long.parseLong( maxSizeMbString ) * FileUtils.ONE_MB;
}
// always allow files up to 5mb
if (maxSizeBytes < 5 * FileUtils.ONE_MB ) {
maxSizeBytes = 5 * FileUtils.ONE_MB;
}
while (-1 != (firstByte = chunckableInputStream.read())) {
long partSize = 0;
chunckableInputStream.unread(firstByte);
File tempFile = File.createTempFile( entity.getUuid().toString().concat("-part").concat(String.valueOf(partNumber)), "tmp" );
tempFile.deleteOnExit();
OutputStream os = null;
try {
os = new BufferedOutputStream( new FileOutputStream( tempFile.getAbsolutePath() ) );
if(isFirstChunck == true) {
partSize = IOUtils.copyLarge( firstChunck, os, 0, ( FIVE_MB ) );
isFirstChunck = false;
}
else {
partSize = IOUtils.copyLarge( chunckableInputStream, os, 0, ( FIVE_MB ) );
}
written += partSize;
if(written> maxSizeBytes){
overSizeLimit = true;
logger.error( "OVERSIZED FILE ({}). STARTING ABORT", written );
break;
//set flag here and break out of loop to run abort
}
}
finally {
IOUtils.closeQuietly( os );
}
FileInputStream chunk = new FileInputStream(tempFile);
Boolean isLastPart = -1 == (firstByte = chunckableInputStream.read());
if(!isLastPart)
chunckableInputStream.unread(firstByte);
UploadPartRequest uploadRequest = new UploadPartRequest().withUploadId(initResponse.getUploadId())
.withBucketName(bucketName)
.withKey(uploadFileName)
.withInputStream(chunk)
.withPartNumber(partNumber)
.withPartSize(partSize)
.withLastPart(isLastPart);
partETags.add( getS3Client().uploadPart(uploadRequest).getPartETag() );
partNumber++;
}
//check for flag here then abort.
if(overSizeLimit) {
AbortMultipartUploadRequest abortRequest =
new AbortMultipartUploadRequest( bucketName, uploadFileName, initResponse.getUploadId() );
ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest( bucketName );
MultipartUploadListing listResult = getS3Client().listMultipartUploads( listRequest );
//upadte the entity with the error.
try {
logger.error("starting update of entity due to oversized asset");
fileMetadata.put( "error", "Asset size is larger than max size of " + maxSizeBytes );
em.update( entity );
}
catch ( Exception e ) {
logger.error( "Error updating entity with error message", e );
}
int timesIterated = 20;
//loop and abort all the multipart uploads
while ( listResult.getMultipartUploads().size()!=0 && timesIterated > 0) {
getS3Client().abortMultipartUpload( abortRequest );
Thread.sleep( 1000 );
timesIterated--;
listResult = getS3Client().listMultipartUploads( listRequest );
if (logger.isDebugEnabled()) {
logger.debug("Files that haven't been aborted are: {}", listResult.getMultipartUploads().listIterator().toString());
}
}
if ( timesIterated == 0 ){
logger.error( "Files parts that couldn't be aborted in 20 seconds are:" );
Iterator<MultipartUpload> multipartUploadIterator = listResult.getMultipartUploads().iterator();
while(multipartUploadIterator.hasNext()){
logger.error( multipartUploadIterator.next().getKey() );
}
}
}
else {
CompleteMultipartUploadRequest request =
new CompleteMultipartUploadRequest( bucketName, uploadFileName, initResponse.getUploadId(),
partETags );
CompleteMultipartUploadResult amazonResult = getS3Client().completeMultipartUpload( request );
fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
fileMetadata.put( AssetUtils.E_TAG, amazonResult.getETag() );
em.update( entity );
}
}
}