func UploadLogsCallback()

in Solutions/BloodHound Enterprise/Data Connectors/pkg/connector/main.go [538:710]


func UploadLogsCallback(bloodhoundClient *sdk.ClientWithResponses, bloodhoundServer string, lastRun *time.Time, azLogsClient *azlogs.Client, ruleId string, maxUploadSize int64) ([]string, error) {
	// TODO is there a generic sdk client type

	if lastRun == nil {
		log.Print("Starting log processing lastRun is nil")
	} else {
		log.Printf("Starting log processing lastRun is %v", lastRun)
	}
	var responseLogs = make([]string, 0)

	bloodhoundRecordData := make(map[string][]BloodhoundEnterpriseData)

	lastAnalysisTime, err := bloodhound.GetLastAnalysisTime(bloodhoundClient)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("failed to get last analysis time %v", err))
		return responseLogs, err
	}

	if lastRun != nil {
		if lastRun.Compare(*lastAnalysisTime) == +1 {
			responseLogs = append(responseLogs, fmt.Sprintf("last ingest time %v after last analysis time %v.  We will skip ingest", lastRun, lastAnalysisTime))
			return responseLogs, nil
		} else {
			responseLogs = append(responseLogs, fmt.Sprintf("last ingest time %v before last analysis time %v.  We will continue", lastRun, lastAnalysisTime))
		}
	}

	mapping, err := bloodhound.GetDomainMapping(bloodhoundClient)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("failed to get domain mapping %v", err))
		return responseLogs, err
	}
	responseLogs = append(responseLogs, fmt.Sprintf("got %d domain mappings", len(*mapping)))

	pathMap, err := getAttackPathTitles(bloodhoundServer, bloodhoundClient, responseLogs)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("failed to get attack path titles %v", err))
		// We will continue without the path titles
	}
	responseLogs = append(responseLogs, fmt.Sprintf("got %d attack path titles", len(pathMap)))

	domainIds := make([]string, 0, len(*mapping))
	for k, _ := range *mapping {
		domainIds = append(domainIds, k)
	}

	// Posture Data
	// Get postureData from Bloodhound using bloodhoundClient
	postureData, err := bloodhound.GetPostureData(bloodhoundClient, lastRun)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("failed to getposture postureData %v", err))
		return responseLogs, err
	}
	responseLogs = append(responseLogs, fmt.Sprintf("got %d posture records", len(*postureData)))

	// Transform postureData
	postureBHERecords, err := transformPostureDataArray(mapping, postureData, lastRun)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("failed to transform postureData %v", err))
		return responseLogs, err
	}

	bloodhoundRecordData["postureData"] = postureBHERecords

	// Get Audit lgos
	auditLogData, err := bloodhound.GetAuditLog(bloodhoundClient, lastRun)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("failed to get audit logs, skipping %v", err))
	}
	log.Printf("Received %d audit log rows.", len(auditLogData))

	// Transform audit logs to BHE sentinel records
	auditLogBHERecords, err := transformAudiLogs(auditLogData)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("failed to get audit logs, skipping %v", err))
	}
	log.Printf("Transformed %d audit log records.", len(auditLogBHERecords))

	bloodhoundRecordData["auditLogs"] = auditLogBHERecords

	// Attack Path

	// Supporting finding data for paths
	findingsPerDomain, err := bloodhound.GetAttackPathTypesForDomain(bloodhoundClient, domainIds)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("error getting attack path types %v", err))
		return responseLogs, err
	}
	responseLogs = append(responseLogs, fmt.Sprintf("got %d attack path types", len(findingsPerDomain)))

	// Get Attack Path Data
	attackPathData, err := bloodhound.GetAttackPathData(bloodhoundClient, domainIds, findingsPerDomain)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("error getting attack path data %v", err))
		return responseLogs, err
	}
	responseLogs = append(responseLogs, fmt.Sprintf("got %d attack path data", len(attackPathData)))
	// Transform attack path data
	attackPathBHERecords, err := transformAttackPathData(mapping, attackPathData, pathMap)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("error transforming attack path data %v", err))
	} else {
		bloodhoundRecordData["attackPathData"] = attackPathBHERecords
	}

	// Get Attack Path Aggregate Data
	aggregatorData, err := bloodhound.GetAttackPathAggregatorData(bloodhoundClient, lastRun, domainIds, findingsPerDomain)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("Error getting attack path aggregator data %v", err))
		return responseLogs, err
	}
	responseLogs = append(responseLogs, fmt.Sprintf("got %d attack path aggregator records", len(aggregatorData)))

	attackPathAggregateBHERecords, err := transformAttackAggregator(mapping, aggregatorData, &pathMap)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("Error transforming attack path aggregator data %v", err))
		return responseLogs, err
	}

	if len(attackPathAggregateBHERecords) > 0 {
		responseLogs = append(responseLogs, fmt.Sprintf("Got %d attack path aggregator records PathTitle of first is %s", len(attackPathAggregateBHERecords), attackPathAggregateBHERecords[0].PathTitle))
	} else {
		responseLogs = append(responseLogs, fmt.Sprintf("Got 0 attack path aggregator records"))
	}

	bloodhoundRecordData["attackPathAggregateData"] = attackPathAggregateBHERecords

	var tierZeroData []sdk.ModelAssetGroupMember = make([]sdk.ModelAssetGroupMember, 0)

	// Get Tier Zero asset group and then its members
	tierZeroGroup, err := bloodhound.GetTierZeroGroup(bloodhoundClient)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("Error getting tier zero group skipping %v", err))
	} else {
		tierZeroData, err = bloodhound.GetTierZeroPrincipals(bloodhoundClient, tierZeroGroup)
		if err != nil {
			responseLogs = append(responseLogs, fmt.Sprintf("Error getting tier zero principals skipping %v", err))
		}
		responseLogs = append(responseLogs, fmt.Sprintf("Got %d cypher query graph nodes", len(tierZeroData)))	
	}


	tier0BHERecords, err := transformTierZeroPrincipal(tierZeroData, *mapping)
	if err != nil {
		responseLogs = append(responseLogs, fmt.Sprintf("Error transforming tier zero principal data %v", err))
		return responseLogs, err
	}
	responseLogs = append(responseLogs, fmt.Sprintf("Got %d tier zero principals", len(tier0BHERecords)))

	bloodhoundRecordData["tier0"] = tier0BHERecords

	var lastError error
	for kind, recordList := range bloodhoundRecordData {
		log.Printf("About to upload %s data %d records ", kind, len(recordList))

		recordsJSON, err := CreateJsonBatches(recordList, maxUploadSize)
		if err != nil {
			responseLogs = append(responseLogs, fmt.Sprintf("failed to generate batched json for %s data Error: %v", kind, err))
			lastError = err
			continue
		}
		for _, jsonBatch := range recordsJSON {
			_, err = azLogsClient.Upload(context.TODO(), ruleId, "Custom-BloodHoundLogs_CL", jsonBatch, nil)
			if err != nil {
				responseLogs = append(responseLogs, fmt.Sprintf("failed to upload %s data Error: %v", kind, err))
				lastError = err
				continue
			}
			responseLogs = append(responseLogs, fmt.Sprintf("Uploaded %s data size(bytes): %d", kind, len(jsonBatch)))
		}
	}
	return responseLogs, lastError
}