main.go (228 lines of code) (raw):

package main import ( "database/sql" "flag" "fmt" "github.com/go-pg/pg" "github.com/go-redis/redis/v7" "github.com/go-redis/redis_rate/v8" "io/ioutil" "net/http" "os" "strconv" "strings" "time" ) type ProductAnalyticsEvent struct { AppId sql.NullString `db:"app_id"` Platform sql.NullString `db:"platform"` CollectorTstamp time.Time `db:"collector_tstamp"` DvceCreatedTstamp time.Time `db:"dvce_created_tstamp"` Event sql.NullString `db:"event"` EventId string `db:"event_id"` NameTracker sql.NullString `db:"name_tracker"` VTracker sql.NullString `db:"v_tracker"` VCollector string `db:"v_collector"` VEtl string `db:"v_elt"` UserId sql.NullInt64 `db:"user_id"` UserIpaddress sql.NullString `db:"user_ipaddress"` UserFingerprint sql.NullString `db:"user_fingerprint"` DomainUserid sql.NullString `db:"domain_userid"` DomainSessionid sql.NullString `db:"domain_sessionid"` DomainSessionidx int `db:"domain_sessionidx"` GeoCountry sql.NullString `db:"geo_country"` GeoRegion sql.NullString `db:"geo_region"` GeoCity sql.NullString `db:"geo_city"` GeoZipcode sql.NullString `db:"geo_zipcode"` GeoLatitude float32 `db:"geo_latitude"` GeoLongitude float32 `db:"geo_longitude"` GeoRegionName sql.NullString `db:"geo_region_name"` PageUrl sql.NullString `db:"page_url"` PageTitle sql.NullString `db:"page_title"` PageReferrer sql.NullString `db:"page_referrer"` PageUrlhost sql.NullString `db:"page_urlhost"` PageUrlpath sql.NullString `db:"page_urlpath"` PageUrlquery sql.NullString `db:"page_urlquery"` RefrUrlhost sql.NullString `db:"refr_urlhost"` RefrUrlpath sql.NullString `db:"refr_urlpath"` RefrUrlquery sql.NullString `db:"refr_urlquery"` RefrMedium sql.NullString `db:"refr_medium"` RefrSource sql.NullString `db:"refr_source"` RefrTerm sql.NullString `db:"refr_term"` MktContent sql.NullString `db:"mkt_content"` MktCampaign sql.NullString `db:"mkt_campaign"` SeCategory sql.NullString `db:"se_category"` SeAction sql.NullString `db:"se_action"` SeLabel sql.NullString `db:"se_label"` SeProperty sql.NullString `db:"se_property"` SeValue sql.NullFloat64 `db:"se_value"` Useragent sql.NullString `db:"useragent"` BrName sql.NullString `db:"br_name"` BrFamily sql.NullString `db:"br_family"` BrVersion sql.NullString `db:"br_version"` BrType sql.NullString `db:"br_type"` BrRenderengine sql.NullString `db:"br_renderengine"` BrLang sql.NullString `db:"br_lang"` BrFeaturesPdf bool `db:"br_features_pdf"` BrFeaturesFlash bool `db:"br_features_flash"` BrFeaturesJava bool `db:"br_features_java"` BrFeaturesDirector bool `db:"br_features_director"` BrFeaturesQuicktime bool `db:"br_features_quicktime"` BrFeaturesRealplayer bool `db:"br_features_realplayer"` BrFeaturesWindowsmedia bool `db:"br_features_windowsmedia"` BrFeaturesGears bool `db:"br_features_gears"` BrFeaturesSilverlight bool `db:"br_features_silverlight"` BrCookies bool `db:"br_cookies"` BrColordepth sql.NullInt64 `db:"br_colordepth"` BrViewwidth int `db:"br_viewwidth"` BrViewheight int `db:"br_viewheight"` OsName sql.NullString `db:"os_name"` OsFamily sql.NullString `db:"os_family"` OsManufacturer sql.NullString `db:"os_manufacturer"` OsTimezone sql.NullString `db:"os_timezone"` DvceType sql.NullString `db:"dvce_type"` DvceIsmobile bool `db:"dvce_ismobile"` DvceScreenwidth int `db:"dvce_screenwidth"` DvceScreenheight int `db:"dvce_screenheight"` DocCharset sql.NullString `db:"doc_charset"` DocWidth int `db:"doc_width"` DocHeight int `db:"doc_height"` } type Persistance struct { db *pg.DB } var version string var limiter *redis_rate.Limiter func SetVersion() { if os.Getenv("GITLAB_VERSION") != "" { version = "GitLab " + os.Getenv("GITLAB_VERSION") } else if os.Getenv("CI_SERVER_VERSION") != "" { version = "GitLab test server " + os.Getenv("CI_SERVER_VERSION") } if version == "" { // If GITLAB_VERSION is still blank, try to read from rails root. // This will work only if collector is inside gitlab-development-kit dir. contents, _ := ioutil.ReadFile("../gitlab/VERSION") if len(contents) > 0 { version = "GitLab " + strings.TrimSuffix(string(contents), "\n") } else { panic("No GitLab version file found.") } } fmt.Println("Running the Product Analytics Collector.", version) } func SetLimiter() { rdb := redis.NewClient(&redis.Options{ Network: "unix", Addr: os.Getenv("REDIS_SOCKET"), Password: "", DB: 1, }) _ = rdb.FlushDB().Err() limiter = redis_rate.NewLimiter(rdb) } func main() { SetVersion() SetLimiter() persistance := &Persistance{db: pg.Connect(&pg.Options{ Network: os.Getenv("PA_NETWORK"), User: os.Getenv("PA_USER"), Password: os.Getenv("PA_PASSWORD"), Database: os.Getenv("PA_DATABASE"), Addr: os.Getenv("PA_ADDR"), })} defer persistance.db.Close() http.HandleFunc("/", persistance.HelloServer) http.ListenAndServe(os.Getenv("PA_PORT"), nil) } func NullFromParams(r *http.Request, param string) sql.NullString { keys, _ := r.URL.Query()[param] if keys == nil { return sql.NullString{"", false} } else { return sql.NullString{keys[0], true} } } func StringFromParams(r *http.Request, param string) string { keys, _ := r.URL.Query()[param] return keys[0] } func BooleanFromParams(r *http.Request, param string) bool { keys, _ := r.URL.Query()[param] if len(keys) == 0 { return false } else { return keys[0] == "1" } } func NullIntFromParams(r *http.Request, param string) sql.NullInt64 { keys, _ := r.URL.Query()[param] i, _ := strconv.ParseInt(keys[0], 10, 64) return sql.NullInt64{i, true} } func TimeFromParams(r *http.Request, param string) time.Time { keys, _ := r.URL.Query()[param] i, _ := strconv.ParseInt(keys[0], 10, 64) return time.Unix(i, 0) } func IntFromParams(r *http.Request, param string) int { keys, _ := r.URL.Query()[param] i, _ := strconv.Atoi(keys[0]) return i } func (persistance *Persistance) HelloServer(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/i" { // For the map see: https://github.com/snowplow/snowplow/blob/3b8d9cc839e4af0b97c68477fb1c9f484de233e2/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala#L802 // But we didn't add everything yet and we did it manually. // TODO Items to still parse: // "res=3008x1692", see https://github.com/snowplow/snowplow/blob/3b8d9cc839e4af0b97c68477fb1c9f484de233e2/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/ClientEnrichments.scala#L41 // "ds=1235x1429" // and a ton more that are in type ProductAnalyticsEvent struct and should be aded below as well event := ProductAnalyticsEvent{ AppId: NullFromParams(r, "aid"), Platform: NullFromParams(r, "p"), CollectorTstamp: time.Now(), EventId: StringFromParams(r, "eid"), VTracker: NullFromParams(r, "tv"), VCollector: version, VEtl: version, OsTimezone: NullFromParams(r, "tz"), NameTracker: NullFromParams(r, "tna"), BrLang: NullFromParams(r, "lang"), DocCharset: NullFromParams(r, "cs"), BrFeaturesPdf: BooleanFromParams(r, "f_pdf"), BrFeaturesFlash: BooleanFromParams(r, "f_fla"), BrFeaturesJava: BooleanFromParams(r, "f_java"), BrFeaturesDirector: BooleanFromParams(r, "f_dir"), BrFeaturesQuicktime: BooleanFromParams(r, "f_qt"), BrFeaturesRealplayer: BooleanFromParams(r, "f_realp"), BrFeaturesWindowsmedia: BooleanFromParams(r, "f_wma"), BrFeaturesGears: BooleanFromParams(r, "f_gears"), BrFeaturesSilverlight: BooleanFromParams(r, "f_ag"), BrColordepth: NullIntFromParams(r, "cd"), BrCookies: BooleanFromParams(r, "cookie"), DvceCreatedTstamp: TimeFromParams(r, "dtm"), BrViewheight: IntFromParams(r, "vp"), DomainSessionidx: IntFromParams(r, "vid"), DomainSessionid: NullFromParams(r, "sid"), DomainUserid: NullFromParams(r, "duid"), UserFingerprint: NullFromParams(r, "fp"), PageReferrer: NullFromParams(r, "refr"), PageUrl: NullFromParams(r, "url"), } res := Allow(limiter, event.AppId) if res.Allowed { if flag.Lookup("test.v") == nil { // Normal run err := persistance.db.Insert(&event) if err != nil { fmt.Println(err) w.WriteHeader(http.StatusInternalServerError) } } } else { fmt.Println("Too Many Requests.") w.WriteHeader(http.StatusTooManyRequests) } } else { fmt.Fprintf(w, "Non Snowplow URL.Path = %q\n", r.URL.Path) // Setting a status here gives a warning "superfluous response.WriteHeader call from main", not sure why. } } func Allow(limiter *redis_rate.Limiter, app_id sql.NullString) *redis_rate.Result { res, err := limiter.Allow(app_id.String, redis_rate.PerMinute(100)) if err != nil { panic(err) } // fmt.Println(res.Allowed, res.Remaining) // Output: true 9 return res }