public void write()

in stack/services/src/main/java/org/apache/usergrid/services/assets/data/AWSBinaryStore.java [122:289]
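
Buffers the first 5 MB of the incoming stream in memory. If the stream ends within that buffer, the asset is stored with a single S3 PUT and the entity's file metadata (content length, checksum, ETag) is updated. Otherwise the method switches to an S3 multipart upload, spooling successive 5 MB chunks to temporary files; if the accumulated size exceeds the configured usergrid.binary.max-size-mb limit, the upload is aborted and an error is recorded on the entity.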


    public void write( final UUID appId, final Entity entity, InputStream inputStream ) throws Exception {

        String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        long written = IOUtils.copyLarge( inputStream, baos, 0, FIVE_MB );

        byte[] data = baos.toByteArray();

        InputStream awsInputStream = new ByteArrayInputStream(data);

        final Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity );
        fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() );

        String mimeType = AssetMimeHandler.get().getMimeType( entity, data );

        Boolean overSizeLimit = false;

        EntityManager em = emf.getEntityManager( appId );


        if ( written < FIVE_MB ) { // total is smaller than 5 MB: upload with a single PUT

            ObjectMetadata om = new ObjectMetadata();
            om.setContentLength(written);
            om.setContentType( mimeType );
            PutObjectResult result = getS3Client().putObject( bucketName, uploadFileName, awsInputStream, om );

            // the returned MD5 is base64-encoded; convert it to hex, guarding against a missing value
            String contentMd5 = result.getContentMd5();
            String md5sum = contentMd5 == null ? null : Hex.encodeHexString( Base64.decodeBase64( contentMd5 ) );
            String eTag = result.getETag();

            fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );

            if ( md5sum != null ) {
                fileMetadata.put( AssetUtils.CHECKSUM, md5sum );
            }
            fileMetadata.put( AssetUtils.E_TAG, eTag );

            em.update( entity );

        }
        else { // larger than 5 MB: spool 5 MB temp files and upload them as multipart parts
            written = 0; // reset written; nothing has actually been uploaded yet
            int partNumber = 1;
            int firstByte = 0;
            Boolean isFirstChunck = true;
            List<PartETag> partETags = new ArrayList<PartETag>();


            // make sure the S3 client is initialized before starting the multipart request
            getS3Client();
            InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest( bucketName, uploadFileName );
            InitiateMultipartUploadResult initResponse = getS3Client().initiateMultipartUpload( initRequest );


            // the 5 MB already buffered in memory becomes the first part; the pushback
            // stream (1-byte buffer) lets us peek a single byte ahead to detect end-of-stream
            InputStream firstChunck = new ByteArrayInputStream(data);
            PushbackInputStream chunckableInputStream = new PushbackInputStream(inputStream, 1);

            // determine the maximum allowed file size, defaulting to 50 MB
            long maxSizeBytes = 50 * FileUtils.ONE_MB;
            String maxSizeMbString = properties.getProperty( "usergrid.binary.max-size-mb", "50" );
            if ( StringUtils.isNumeric( maxSizeMbString )) {
                maxSizeBytes = Long.parseLong( maxSizeMbString ) * FileUtils.ONE_MB;
            }

            // always allow files up to 5 MB
            if (maxSizeBytes < 5 * FileUtils.ONE_MB ) {
                maxSizeBytes = 5 * FileUtils.ONE_MB;
            }
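            // (illustrative note, not in the original source) the limit above comes from the
            // usergrid.binary.max-size-mb property; e.g. usergrid.binary.max-size-mb=100 in the
            // Usergrid properties would allow assets up to 100 MB, clamped to a 5 MB minimum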

            while (-1 != (firstByte = chunckableInputStream.read())) {
                long partSize = 0;
                chunckableInputStream.unread(firstByte);
                File tempFile = File.createTempFile( entity.getUuid().toString().concat("-part").concat(String.valueOf(partNumber)), "tmp" );

                tempFile.deleteOnExit();
                OutputStream os = null;
                try {
                    os = new BufferedOutputStream( new FileOutputStream( tempFile.getAbsolutePath() ) );

                    if ( isFirstChunck ) {
                        partSize = IOUtils.copyLarge( firstChunck, os, 0, FIVE_MB );
                        isFirstChunck = false;
                    }
                    else {
                        partSize = IOUtils.copyLarge( chunckableInputStream, os, 0, FIVE_MB );
                    }
                    written += partSize;

                    if ( written > maxSizeBytes ) {
                        // set the flag and break out of the loop so the upload can be aborted below
                        overSizeLimit = true;
                        logger.error( "OVERSIZED FILE ({}). STARTING ABORT", written );
                        break;
                    }
                }
                finally {
                    IOUtils.closeQuietly( os );
                }

                FileInputStream chunk = new FileInputStream(tempFile);

                // peek one byte ahead: end-of-stream means this is the last part
                Boolean isLastPart = -1 == (firstByte = chunckableInputStream.read());
                if (!isLastPart) {
                    chunckableInputStream.unread(firstByte);
                }

                UploadPartRequest uploadRequest = new UploadPartRequest().withUploadId(initResponse.getUploadId())
                                                                         .withBucketName(bucketName)
                                                                         .withKey(uploadFileName)
                                                                         .withInputStream(chunk)
                                                                         .withPartNumber(partNumber)
                                                                         .withPartSize(partSize)
                                                                         .withLastPart(isLastPart);
                partETags.add( getS3Client().uploadPart(uploadRequest).getPartETag() );
                partNumber++;
            }

            //check for flag here then abort.
            if(overSizeLimit) {

                AbortMultipartUploadRequest abortRequest =
                    new AbortMultipartUploadRequest( bucketName, uploadFileName, initResponse.getUploadId() );

                ListMultipartUploadsRequest listRequest = new ListMultipartUploadsRequest( bucketName );

                MultipartUploadListing listResult = getS3Client().listMultipartUploads( listRequest );

                // update the entity with the error
                try {
                    logger.error("starting update of entity due to oversized asset");
                    fileMetadata.put( "error", "Asset size is larger than max size of " + maxSizeBytes );
                    em.update( entity );
                }
                catch ( Exception e ) {
                    logger.error( "Error updating entity with error message", e );
                }

                int timesIterated = 20;
                //loop and abort all the multipart uploads
                while ( listResult.getMultipartUploads().size() != 0 && timesIterated > 0 ) {

                    getS3Client().abortMultipartUpload( abortRequest );
                    Thread.sleep( 1000 );
                    timesIterated--;
                    listResult = getS3Client().listMultipartUploads( listRequest );
                    if (logger.isDebugEnabled()) {
                        for ( MultipartUpload upload : listResult.getMultipartUploads() ) {
                            logger.debug( "Upload not yet aborted: {}", upload.getKey() );
                        }
                    }

                }
                if ( timesIterated == 0 ){
                    logger.error( "Files parts that couldn't be aborted in 20 seconds are:" );
                    Iterator<MultipartUpload> multipartUploadIterator = listResult.getMultipartUploads().iterator();
                    while(multipartUploadIterator.hasNext()){
                        logger.error( multipartUploadIterator.next().getKey() );
                    }
                }
            }
            else {
                CompleteMultipartUploadRequest request =
                    new CompleteMultipartUploadRequest( bucketName, uploadFileName, initResponse.getUploadId(),
                        partETags );
                CompleteMultipartUploadResult amazonResult = getS3Client().completeMultipartUpload( request );
                fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
                fileMetadata.put( AssetUtils.E_TAG, amazonResult.getETag() );
                em.update( entity );
            }
        }
    }
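
The multipart branch above decides whether a part is the last one by reading a single byte ahead on the PushbackInputStream and pushing it back if more data remains. Below is a minimal, self-contained sketch of that peek-one-byte pattern; the class name and part size are illustrative and not part of the Usergrid source, and the real method copies parts with IOUtils.copyLarge rather than a plain read:

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.PushbackInputStream;

    public class PeekLastPartExample {

        public static void main( String[] args ) throws IOException {
            byte[] source = "some data to split into parts".getBytes();
            // 1-byte pushback buffer: just enough to probe for end-of-stream
            PushbackInputStream in = new PushbackInputStream( new ByteArrayInputStream( source ), 1 );

            final int partSize = 10; // stands in for FIVE_MB
            int partNumber = 1;
            int firstByte;

            while ( -1 != ( firstByte = in.read() ) ) {
                in.unread( firstByte ); // push the probed byte back so the part copy sees it

                byte[] part = new byte[partSize];
                int read = in.read( part, 0, partSize ); // copy up to one part

                // peek one byte ahead: end-of-stream means this was the last part
                int next = in.read();
                boolean isLastPart = ( next == -1 );
                if ( !isLastPart ) {
                    in.unread( next );
                }

                System.out.println( "part " + partNumber + ": " + read + " bytes, last=" + isLastPart );
                partNumber++;
            }
        }
    }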