public void parse()

in usage/src/main/java/com/cloud/usage/UsageManagerImpl.java [480:952]


    public void parse(UsageJobVO job, long startDateMillis, long endDateMillis) {
        // TODO: Shouldn't we also allow parsing by the type of usage?

        boolean success = false;
        long timeStart = System.currentTimeMillis();
        try {
            if ((endDateMillis == 0) || (endDateMillis > timeStart)) {
                endDateMillis = timeStart;
            }

            long lastSuccess = _usageJobDao.getLastJobSuccessDateMillis();
            if (lastSuccess != 0) {
                startDateMillis = lastSuccess + 1; // 1 millisecond after
            }

            if (startDateMillis >= endDateMillis) {
                if (logger.isInfoEnabled()) {
                    logger.info("not parsing usage records since start time mills (" + startDateMillis + ") is on or after end time millis (" + endDateMillis + ")");
                }

                TransactionLegacy jobUpdateTxn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
                try {
                    jobUpdateTxn.start();
                    // everything seemed to work...set endDate as the last success date
                    _usageJobDao.updateJobSuccess(job.getId(), startDateMillis, endDateMillis, System.currentTimeMillis() - timeStart, success);

                    // create a new job if this is a recurring job
                    if (job.getJobType() == UsageJobVO.JOB_TYPE_RECURRING) {
                        _usageJobDao.createNewJob(_hostname, _pid, UsageJobVO.JOB_TYPE_RECURRING);
                    }
                    jobUpdateTxn.commit();
                } finally {
                    jobUpdateTxn.close();
                }

                return;
            }
            Date startDate = new Date(startDateMillis);
            Date endDate = new Date(endDateMillis);
            logger.info("Parsing usage records between [{}] and [{}].", DateUtil.displayDateInTimezone(usageAggregationTimeZone, startDate),
                    DateUtil.displayDateInTimezone(usageAggregationTimeZone, endDate));

            List<AccountVO> accounts = null;
            List<UserStatisticsVO> userStats = null;
            Map<String, UsageNetworkVO> networkStats = null;
            List<VmDiskStatisticsVO> vmDiskStats = null;
            Map<String, UsageVmDiskVO> vmDiskUsages = null;
            List<BucketStatisticsVO> bucketStats = null;
            TransactionLegacy userTxn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
            try {
                Long limit = Long.valueOf(500);
                Long offset = Long.valueOf(0);
                Long lastAccountId = _usageDao.getLastAccountId();
                if (lastAccountId == null) {
                    lastAccountId = Long.valueOf(0);
                }

                do {
                    Filter filter = new Filter(AccountVO.class, "id", true, offset, limit);

                    accounts = _accountDao.findActiveAccounts(lastAccountId, filter);

                    if ((accounts != null) && !accounts.isEmpty()) {
                        // now update the accounts in the cloud_usage db
                        _usageDao.updateAccounts(accounts);
                    }
                    offset = new Long(offset.longValue() + limit.longValue());
                } while ((accounts != null) && !accounts.isEmpty());

                // reset offset
                offset = Long.valueOf(0);

                do {
                    Filter filter = new Filter(AccountVO.class, "id", true, offset, limit);

                    accounts = _accountDao.findRecentlyDeletedAccounts(lastAccountId, startDate, filter);

                    if ((accounts != null) && !accounts.isEmpty()) {
                        // now update the accounts in the cloud_usage db
                        _usageDao.updateAccounts(accounts);
                    }
                    offset = new Long(offset.longValue() + limit.longValue());
                } while ((accounts != null) && !accounts.isEmpty());

                // reset offset
                offset = Long.valueOf(0);

                do {
                    Filter filter = new Filter(AccountVO.class, "id", true, offset, limit);

                    accounts = _accountDao.findNewAccounts(lastAccountId, filter);

                    if ((accounts != null) && !accounts.isEmpty()) {
                        // now copy the accounts to cloud_usage db
                        _usageDao.saveAccounts(accounts);
                    }
                    offset = new Long(offset.longValue() + limit.longValue());
                } while ((accounts != null) && !accounts.isEmpty());

                // reset offset
                offset = Long.valueOf(0);

                // get all the user stats to create usage records for the network usage
                Long lastUserStatsId = _usageDao.getLastUserStatsId();
                if (lastUserStatsId == null) {
                    lastUserStatsId = Long.valueOf(0);
                }

                SearchCriteria<UserStatisticsVO> sc2 = _userStatsDao.createSearchCriteria();
                sc2.addAnd("id", SearchCriteria.Op.LTEQ, lastUserStatsId);
                do {
                    Filter filter = new Filter(UserStatisticsVO.class, "id", true, offset, limit);

                    userStats = _userStatsDao.search(sc2, filter);

                    if ((userStats != null) && !userStats.isEmpty()) {
                        // now copy the accounts to cloud_usage db
                        _usageDao.updateUserStats(userStats);
                    }
                    offset = new Long(offset.longValue() + limit.longValue());
                } while ((userStats != null) && !userStats.isEmpty());

                // reset offset
                offset = Long.valueOf(0);

                sc2 = _userStatsDao.createSearchCriteria();
                sc2.addAnd("id", SearchCriteria.Op.GT, lastUserStatsId);
                do {
                    Filter filter = new Filter(UserStatisticsVO.class, "id", true, offset, limit);

                    userStats = _userStatsDao.search(sc2, filter);

                    if ((userStats != null) && !userStats.isEmpty()) {
                        // now copy the accounts to cloud_usage db
                        _usageDao.saveUserStats(userStats);
                    }
                    offset = new Long(offset.longValue() + limit.longValue());
                } while ((userStats != null) && !userStats.isEmpty());

                // reset offset
                offset = Long.valueOf(0);

                // get all the vm network stats to create usage_VM_network records for the vm network usage
                Long lastVmDiskStatsId = _usageDao.getLastVmDiskStatsId();
                if (lastVmDiskStatsId == null) {
                    lastVmDiskStatsId = Long.valueOf(0);
                }
                SearchCriteria<VmDiskStatisticsVO> sc4 = _vmDiskStatsDao.createSearchCriteria();
                sc4.addAnd("id", SearchCriteria.Op.LTEQ, lastVmDiskStatsId);
                do {
                    Filter filter = new Filter(VmDiskStatisticsVO.class, "id", true, offset, limit);

                    vmDiskStats = _vmDiskStatsDao.search(sc4, filter);

                    if ((vmDiskStats != null) && !vmDiskStats.isEmpty()) {
                        // now copy the accounts to cloud_usage db
                        _usageDao.updateVmDiskStats(vmDiskStats);
                    }
                    offset = new Long(offset.longValue() + limit.longValue());
                } while ((vmDiskStats != null) && !vmDiskStats.isEmpty());

                // reset offset
                offset = Long.valueOf(0);

                sc4 = _vmDiskStatsDao.createSearchCriteria();
                sc4.addAnd("id", SearchCriteria.Op.GT, lastVmDiskStatsId);
                do {
                    Filter filter = new Filter(VmDiskStatisticsVO.class, "id", true, offset, limit);

                    vmDiskStats = _vmDiskStatsDao.search(sc4, filter);

                    if ((vmDiskStats != null) && !vmDiskStats.isEmpty()) {
                        // now copy the accounts to cloud_usage db
                        _usageDao.saveVmDiskStats(vmDiskStats);
                    }
                    offset = new Long(offset.longValue() + limit.longValue());
                } while ((vmDiskStats != null) && !vmDiskStats.isEmpty());

                // reset offset
                offset = Long.valueOf(0);

                // get all the user stats to create usage records for the bucket usage
                Long lastBucketStatsId = _usageDao.getLastBucketStatsId();
                if (lastBucketStatsId == null) {
                    lastBucketStatsId = Long.valueOf(0);
                }

                SearchCriteria<BucketStatisticsVO> sc5 = _bucketStatisticsDao.createSearchCriteria();
                sc5.addAnd("id", SearchCriteria.Op.LTEQ, lastBucketStatsId);
                do {
                    Filter filter = new Filter(BucketStatisticsVO.class, "id", true, offset, limit);

                    bucketStats = _bucketStatisticsDao.search(sc5, filter);

                    if ((bucketStats != null) && !bucketStats.isEmpty()) {
                        // now copy the accounts to cloud_usage db
                        _usageDao.updateBucketStats(bucketStats);
                    }
                    offset = new Long(offset.longValue() + limit.longValue());
                } while ((bucketStats != null) && !bucketStats.isEmpty());

                // reset offset
                offset = Long.valueOf(0);

                sc5 = _bucketStatisticsDao.createSearchCriteria();
                sc5.addAnd("id", SearchCriteria.Op.GT, lastBucketStatsId);
                do {
                    Filter filter = new Filter(BucketStatisticsVO.class, "id", true, offset, limit);

                    bucketStats = _bucketStatisticsDao.search(sc5, filter);

                    if ((bucketStats != null) && !bucketStats.isEmpty()) {
                        // now copy the accounts to cloud_usage db
                        _usageDao.saveBucketStats(bucketStats);
                    }
                    offset = new Long(offset.longValue() + limit.longValue());
                } while ((bucketStats != null) && !bucketStats.isEmpty());

            } finally {
                userTxn.close();
            }

            // TODO:  Fetch a maximum number of events and process them before moving on to the next range of events

            // - get a list of the latest events
            // - insert the latest events into the usage.events table
            List<UsageEventVO> events = _usageEventDao.getRecentEvents(new Date(endDateMillis));

            TransactionLegacy usageTxn = TransactionLegacy.open(TransactionLegacy.USAGE_DB);
            try {
                usageTxn.start();

                // make sure start date is before all of our un-processed events (the events are ordered oldest
                // to newest, so just test against the first event)
                if ((events != null) && (events.size() > 0)) {
                    Date oldestEventDate = events.get(0).getCreateDate();
                    if (oldestEventDate.getTime() < startDateMillis) {
                        startDateMillis = oldestEventDate.getTime();
                        startDate = new Date(startDateMillis);
                    }

                    // - loop over the list of events and create entries in the helper tables
                    // - create the usage records using the parse methods below
                    for (UsageEventVO event : events) {
                        event.setProcessed(true);
                        _usageEventDao.update(event.getId(), event);
                        createHelperRecord(event);
                    }
                }

                // TODO:  Fetch a maximum number of user stats and process them before moving on to the next range of user stats

                // get user stats in order to compute network usage
                networkStats = _usageNetworkDao.getRecentNetworkStats();

                Calendar recentlyDeletedCal = Calendar.getInstance(usageAggregationTimeZone);
                recentlyDeletedCal.setTimeInMillis(startDateMillis);
                recentlyDeletedCal.add(Calendar.MINUTE, -1 * THREE_DAYS_IN_MINUTES);
                Date recentlyDeletedDate = recentlyDeletedCal.getTime();

                // Keep track of user stats for an account, across all of its public IPs
                Map<String, UserStatisticsVO> aggregatedStats = new HashMap<String, UserStatisticsVO>();
                int startIndex = 0;
                do {
                    userStats = _userStatsDao.listActiveAndRecentlyDeleted(recentlyDeletedDate, startIndex, 500);

                    if (userStats != null) {
                        for (UserStatisticsVO userStat : userStats) {
                            if (userStat.getDeviceId() != null) {
                                String hostKey = userStat.getDataCenterId() + "-" + userStat.getAccountId() + "-Host-" + userStat.getDeviceId() + "-Network-" + userStat.getNetworkId();
                                UserStatisticsVO hostAggregatedStat = aggregatedStats.get(hostKey);
                                if (hostAggregatedStat == null) {
                                    hostAggregatedStat =
                                            new UserStatisticsVO(userStat.getAccountId(), userStat.getDataCenterId(), userStat.getPublicIpAddress(), userStat.getDeviceId(),
                                                    userStat.getDeviceType(), userStat.getNetworkId());
                                }

                                hostAggregatedStat.setAggBytesSent(hostAggregatedStat.getAggBytesSent() + userStat.getAggBytesSent());
                                hostAggregatedStat.setAggBytesReceived(hostAggregatedStat.getAggBytesReceived() + userStat.getAggBytesReceived());
                                aggregatedStats.put(hostKey, hostAggregatedStat);
                            }
                        }
                    }
                    startIndex += 500;
                } while ((userStats != null) && !userStats.isEmpty());

                // loop over the user stats, create delta entries in the usage_network helper table
                int numAcctsProcessed = 0;
                usageNetworks.clear();
                for (String key : aggregatedStats.keySet()) {
                    UsageNetworkVO currentNetworkStats = null;
                    if (networkStats != null) {
                        currentNetworkStats = networkStats.get(key);
                    }

                    createNetworkHelperEntry(aggregatedStats.get(key), currentNetworkStats, endDateMillis);
                    numAcctsProcessed++;
                }
                _usageNetworkDao.saveUsageNetworks(usageNetworks);

                if (logger.isDebugEnabled()) {
                    logger.debug("created network stats helper entries for " + numAcctsProcessed + " accts");
                }

                // get vm disk stats in order to compute vm disk usage
                vmDiskUsages = _usageVmDiskDao.getRecentVmDiskStats();

                // Keep track of user stats for an account, across all of its public IPs
                Map<String, VmDiskStatisticsVO> aggregatedDiskStats = new HashMap<String, VmDiskStatisticsVO>();
                startIndex = 0;
                do {
                    vmDiskStats = _vmDiskStatsDao.listActiveAndRecentlyDeleted(recentlyDeletedDate, startIndex, 500);

                    if (vmDiskUsages != null) {
                        for (VmDiskStatisticsVO vmDiskStat : vmDiskStats) {
                            if (vmDiskStat.getVmId() != null) {
                                String hostKey =
                                        vmDiskStat.getDataCenterId() + "-" + vmDiskStat.getAccountId() + "-Vm-" + vmDiskStat.getVmId() + "-Disk-" + vmDiskStat.getVolumeId();
                                VmDiskStatisticsVO hostAggregatedStat = aggregatedDiskStats.get(hostKey);
                                if (hostAggregatedStat == null) {
                                    hostAggregatedStat =
                                            new VmDiskStatisticsVO(vmDiskStat.getAccountId(), vmDiskStat.getDataCenterId(), vmDiskStat.getVmId(), vmDiskStat.getVolumeId());
                                }

                                hostAggregatedStat.setAggIORead(hostAggregatedStat.getAggIORead() + vmDiskStat.getAggIORead());
                                hostAggregatedStat.setAggIOWrite(hostAggregatedStat.getAggIOWrite() + vmDiskStat.getAggIOWrite());
                                hostAggregatedStat.setAggBytesRead(hostAggregatedStat.getAggBytesRead() + vmDiskStat.getAggBytesRead());
                                hostAggregatedStat.setAggBytesWrite(hostAggregatedStat.getAggBytesWrite() + vmDiskStat.getAggBytesWrite());
                                aggregatedDiskStats.put(hostKey, hostAggregatedStat);
                            }
                        }
                    }
                    startIndex += 500;
                } while ((userStats != null) && !userStats.isEmpty());

                // loop over the user stats, create delta entries in the usage_disk helper table
                numAcctsProcessed = 0;
                usageVmDisks.clear();
                for (String key : aggregatedDiskStats.keySet()) {
                    UsageVmDiskVO currentVmDiskStats = null;
                    if (vmDiskStats != null) {
                        currentVmDiskStats = vmDiskUsages.get(key);
                    }

                    createVmDiskHelperEntry(aggregatedDiskStats.get(key), currentVmDiskStats, endDateMillis);
                    numAcctsProcessed++;
                }
                _usageVmDiskDao.saveUsageVmDisks(usageVmDisks);

                if (logger.isDebugEnabled()) {
                    logger.debug("created vm disk stats helper entries for " + numAcctsProcessed + " accts");
                }

                // commit the helper records, then start a new transaction
                usageTxn.commit();
                usageTxn.start();

                boolean parsed = false;
                numAcctsProcessed = 0;

                Date currentStartDate = startDate;
                Date currentEndDate = endDate;
                Date tempDate = endDate;

                Calendar aggregateCal = Calendar.getInstance(usageAggregationTimeZone);

                while ((tempDate.after(startDate)) && ((tempDate.getTime() - startDate.getTime()) > 60000)) {
                    currentEndDate = tempDate;
                    aggregateCal.setTime(tempDate);
                    aggregateCal.add(Calendar.MINUTE, -_aggregationDuration);
                    tempDate = aggregateCal.getTime();
                }

                while (!currentEndDate.after(endDate) || (currentEndDate.getTime() - endDate.getTime() < 60000)) {
                    Long offset = Long.valueOf(0);
                    Long limit = Long.valueOf(500);

                    do {
                        Filter filter = new Filter(AccountVO.class, "id", true, offset, limit);
                        accounts = _accountDao.listAll(filter);
                        if ((accounts != null) && !accounts.isEmpty()) {
                            for (AccountVO account : accounts) {
                                parsed = parseHelperTables(account, currentStartDate, currentEndDate);
                                numAcctsProcessed++;
                            }
                        }
                        offset = new Long(offset.longValue() + limit.longValue());
                    } while ((accounts != null) && !accounts.isEmpty());

                    if (logger.isDebugEnabled()) {
                        logger.debug("processed VM/Network Usage for " + numAcctsProcessed + " ACTIVE accts");
                    }
                    numAcctsProcessed = 0;

                    // reset offset
                    offset = Long.valueOf(0);

                    do {
                        Filter filter = new Filter(AccountVO.class, "id", true, offset, limit);

                        accounts = _accountDao.findRecentlyDeletedAccounts(null, recentlyDeletedDate, filter);

                        if ((accounts != null) && !accounts.isEmpty()) {
                            for (AccountVO account : accounts) {
                                parsed = parseHelperTables(account, currentStartDate, currentEndDate);
                                List<Long> publicTemplates = _usageDao.listPublicTemplatesByAccount(account.getId());
                                for (Long templateId : publicTemplates) {
                                    //mark public templates owned by deleted accounts as deleted
                                    List<UsageStorageVO> storageVOs = _usageStorageDao.listById(account.getId(), templateId, StorageTypes.TEMPLATE);
                                    if (storageVOs.size() > 1) {
                                        logger.warn("More that one usage entry for storage: " + templateId + " assigned to account: " + account.getId() +
                                                "; marking them all as deleted...");
                                    }
                                    for (UsageStorageVO storageVO : storageVOs) {
                                        if (logger.isDebugEnabled()) {
                                            logger.debug("deleting template: " + storageVO.getId() + " from account: " + storageVO.getAccountId());
                                        }
                                        storageVO.setDeleted(account.getRemoved());
                                        _usageStorageDao.update(storageVO);
                                    }
                                }
                                numAcctsProcessed++;
                            }
                        }
                        offset = new Long(offset.longValue() + limit.longValue());
                    } while ((accounts != null) && !accounts.isEmpty());

                    currentStartDate = new Date(currentEndDate.getTime() + 1);
                    aggregateCal.setTime(currentEndDate);
                    aggregateCal.add(Calendar.MINUTE, _aggregationDuration);
                    currentEndDate = aggregateCal.getTime();
                }

                if (logger.isDebugEnabled()) {
                    logger.debug("processed Usage for " + numAcctsProcessed + " RECENTLY DELETED accts");
                }

                // FIXME: we don't break the above loop if something fails to parse, so it gets reset every account,
                //        do we want to break out of processing accounts and rollback if there are errors?
                if (!parsed) {
                    usageTxn.rollback();
                } else {
                    success = true;
                }
            } catch (Exception ex) {
                logger.error("Exception in usage manager", ex);
                usageTxn.rollback();
            } finally {
                // everything seemed to work...set endDate as the last success date
                _usageJobDao.updateJobSuccess(job.getId(), startDateMillis, endDateMillis, System.currentTimeMillis() - timeStart, success);

                // create a new job if this is a recurring job
                if (job.getJobType() == UsageJobVO.JOB_TYPE_RECURRING) {
                    _usageJobDao.createNewJob(_hostname, _pid, UsageJobVO.JOB_TYPE_RECURRING);
                }
                usageTxn.commit();
                usageTxn.close();

                // switch back to CLOUD_DB
                TransactionLegacy swap = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
                if (!success) {
                    _alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_USAGE_SERVER_RESULT, 0, new Long(0), "Usage job failed. Job id: " + job.getId(),
                            "Usage job failed. Job id: " + job.getId());
                } else {
                    _alertMgr.clearAlert(AlertManager.AlertType.ALERT_TYPE_USAGE_SERVER_RESULT, 0, 0);
                }
                swap.close();

            }
        } catch (Exception e) {
            logger.error("Usage Manager error", e);
        }
    }