in tools/mongodb-hybrid-dlp/dlpfunction.go [133:213]
func init() {
mongoScanner = MongoScanner{
LastChanges: make(map[MongoSource]MongoChangeSource, 0),
}
if os.Getenv("RUN_PERIOD") != "" {
runPeriod, err := time.ParseDuration(os.Getenv("RUN_PERIOD"))
if err != nil {
log.Fatal().Err(err).Msgf("Failed to parse RUN_PERIOD: %s", os.Getenv("RUN_PERIOD"))
}
mongoScanner.RunPeriod = runPeriod
} else {
mongoScanner.RunPeriod, _ = time.ParseDuration("10m")
}
if os.Getenv("MONGO_CONNECTION_STRING") != "" {
mongoScanner.ConnectionString = os.Getenv("MONGO_CONNECTION_STRING")
} else {
log.Fatal().Msg("No MONGO_CONNECTION_STRING specified!")
}
if os.Getenv("MONGO_USERNAME") != "" {
mongoScanner.Username = os.Getenv("MONGO_USERNAME")
}
if os.Getenv("MONGO_PASSWORD") != "" {
mongoScanner.Password = os.Getenv("MONGO_PASSWORD")
}
if os.Getenv("MONGO_DEPLOYMENTS") != "" {
deployments := strings.Split(os.Getenv("MONGO_DEPLOYMENTS"), ",")
for _, d := range deployments {
mongoScanner.Deployments = append(mongoScanner.Deployments, MongoChangeSource{Source: MongoSource{Deployment: strings.TrimSpace(d)}})
}
}
if os.Getenv("MONGO_DATABASES") != "" {
databases := strings.Split(os.Getenv("MONGO_DATABASES"), ",")
for _, d := range databases {
mongoScanner.Databases = append(mongoScanner.Databases, MongoChangeSource{Source: MongoSource{Database: strings.TrimSpace(d)}})
}
}
if os.Getenv("MONGO_COLLECTIONS") != "" {
collections := strings.Split(os.Getenv("MONGO_COLLECTIONS"), ",")
for _, d := range collections {
c := strings.SplitN(d, ".", 2)
mongoScanner.Collections = append(mongoScanner.Collections, MongoChangeSource{Source: MongoSource{Database: strings.TrimSpace(c[0]), Collection: strings.TrimSpace(c[1])}})
}
}
if len(mongoScanner.Deployments) == 0 && len(mongoScanner.Databases) == 0 && len(mongoScanner.Deployments) == 0 {
log.Fatal().Msg("No sources for change streams specified, set at least one of: MONGO_COLLECTIONS, MONGO_DATABASES, MONGO_DEPLOYMENTS")
}
if os.Getenv("DLP_TRIGGER_NAME") != "" {
mongoScanner.GcpDlpTriggerName = os.Getenv("DLP_TRIGGER_NAME")
} else {
log.Fatal().Msg("No DLP_TRIGGER_NAME specified!")
}
if os.Getenv("PROJECT_ID") != "" {
mongoScanner.GcpBillingProject = os.Getenv("PROJECT_ID")
} else {
log.Fatal().Msg("No PROJECT_ID specified!")
}
if os.Getenv("DLP_ENDPOINT") != "" {
mongoScanner.GcpDlpEndpoint = os.Getenv("DLP_ENDPOINT")
}
if os.Getenv("STATE_FILE") != "" {
if !strings.HasPrefix(os.Getenv("STATE_FILE"), "gs://") {
log.Fatal().Msg("State file should start with gs://!")
}
url, err := url.Parse(os.Getenv("STATE_FILE"))
if err != nil {
log.Fatal().Err(err).Msg("Failed to parse STATE_FILE location!")
}
mongoScanner.StateFile = url
} else {
log.Fatal().Msg("No STATE_FILE specified!")
}
functions.HTTP("DLPFunctionHTTP", DLPFunctionHTTP)
}