main.go (228 lines of code) (raw):
package main
import (
"database/sql"
"flag"
"fmt"
"github.com/go-pg/pg"
"github.com/go-redis/redis/v7"
"github.com/go-redis/redis_rate/v8"
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
"time"
)
// ProductAnalyticsEvent is one collected tracker event. HelloServer fills it
// from the request's query parameters and inserts it with go-pg.
//
// The sql.Null* fields let an optional value be stored as NULL when the
// tracker did not send it (see NullFromParams / NullIntFromParams).
//
// NOTE(review): go-pg does not appear to read `db` struct tags (it uses its own
// `sql`/`pg` tags depending on version) and derives column names from the
// field names, so these tags may be documentation only — confirm against the
// go-pg version in use. The VEtl tag now reads "v_etl"; it previously read
// "v_elt", which matched neither the field name nor the derived column name.
type ProductAnalyticsEvent struct {
	// Application.
	AppId    sql.NullString `db:"app_id"`
	Platform sql.NullString `db:"platform"`

	// Timestamps.
	CollectorTstamp   time.Time `db:"collector_tstamp"`
	DvceCreatedTstamp time.Time `db:"dvce_created_tstamp"`

	// Event identity.
	Event   sql.NullString `db:"event"`
	EventId string         `db:"event_id"`

	// Tracker / pipeline versions.
	NameTracker sql.NullString `db:"name_tracker"`
	VTracker    sql.NullString `db:"v_tracker"`
	VCollector  string         `db:"v_collector"`
	VEtl        string         `db:"v_etl"`

	// User and session.
	UserId           sql.NullInt64  `db:"user_id"`
	UserIpaddress    sql.NullString `db:"user_ipaddress"`
	UserFingerprint  sql.NullString `db:"user_fingerprint"`
	DomainUserid     sql.NullString `db:"domain_userid"`
	DomainSessionid  sql.NullString `db:"domain_sessionid"`
	DomainSessionidx int            `db:"domain_sessionidx"`

	// Geolocation.
	GeoCountry    sql.NullString `db:"geo_country"`
	GeoRegion     sql.NullString `db:"geo_region"`
	GeoCity       sql.NullString `db:"geo_city"`
	GeoZipcode    sql.NullString `db:"geo_zipcode"`
	GeoLatitude   float32        `db:"geo_latitude"`
	GeoLongitude  float32        `db:"geo_longitude"`
	GeoRegionName sql.NullString `db:"geo_region_name"`

	// Page.
	PageUrl      sql.NullString `db:"page_url"`
	PageTitle    sql.NullString `db:"page_title"`
	PageReferrer sql.NullString `db:"page_referrer"`
	PageUrlhost  sql.NullString `db:"page_urlhost"`
	PageUrlpath  sql.NullString `db:"page_urlpath"`
	PageUrlquery sql.NullString `db:"page_urlquery"`

	// Referrer.
	RefrUrlhost  sql.NullString `db:"refr_urlhost"`
	RefrUrlpath  sql.NullString `db:"refr_urlpath"`
	RefrUrlquery sql.NullString `db:"refr_urlquery"`
	RefrMedium   sql.NullString `db:"refr_medium"`
	RefrSource   sql.NullString `db:"refr_source"`
	RefrTerm     sql.NullString `db:"refr_term"`

	// Marketing.
	MktContent  sql.NullString `db:"mkt_content"`
	MktCampaign sql.NullString `db:"mkt_campaign"`

	// Structured event.
	SeCategory sql.NullString  `db:"se_category"`
	SeAction   sql.NullString  `db:"se_action"`
	SeLabel    sql.NullString  `db:"se_label"`
	SeProperty sql.NullString  `db:"se_property"`
	SeValue    sql.NullFloat64 `db:"se_value"`

	// Browser identity.
	Useragent      sql.NullString `db:"useragent"`
	BrName         sql.NullString `db:"br_name"`
	BrFamily       sql.NullString `db:"br_family"`
	BrVersion      sql.NullString `db:"br_version"`
	BrType         sql.NullString `db:"br_type"`
	BrRenderengine sql.NullString `db:"br_renderengine"`
	BrLang         sql.NullString `db:"br_lang"`

	// Browser features.
	BrFeaturesPdf          bool `db:"br_features_pdf"`
	BrFeaturesFlash        bool `db:"br_features_flash"`
	BrFeaturesJava         bool `db:"br_features_java"`
	BrFeaturesDirector     bool `db:"br_features_director"`
	BrFeaturesQuicktime    bool `db:"br_features_quicktime"`
	BrFeaturesRealplayer   bool `db:"br_features_realplayer"`
	BrFeaturesWindowsmedia bool `db:"br_features_windowsmedia"`
	BrFeaturesGears        bool `db:"br_features_gears"`
	BrFeaturesSilverlight  bool `db:"br_features_silverlight"`
	BrCookies              bool `db:"br_cookies"`

	// Browser display.
	BrColordepth sql.NullInt64 `db:"br_colordepth"`
	BrViewwidth  int           `db:"br_viewwidth"`
	BrViewheight int           `db:"br_viewheight"`

	// Operating system.
	OsName         sql.NullString `db:"os_name"`
	OsFamily       sql.NullString `db:"os_family"`
	OsManufacturer sql.NullString `db:"os_manufacturer"`
	OsTimezone     sql.NullString `db:"os_timezone"`

	// Device.
	DvceType         sql.NullString `db:"dvce_type"`
	DvceIsmobile     bool           `db:"dvce_ismobile"`
	DvceScreenwidth  int            `db:"dvce_screenwidth"`
	DvceScreenheight int            `db:"dvce_screenheight"`

	// Document.
	DocCharset sql.NullString `db:"doc_charset"`
	DocWidth   int            `db:"doc_width"`
	DocHeight  int            `db:"doc_height"`
}
// Persistance carries the go-pg handle that HelloServer inserts events through.
type Persistance struct{ db *pg.DB }
var (
	// version is written by SetVersion and recorded as both VCollector and
	// VEtl on every event HelloServer builds.
	version string
	// limiter is the Redis-backed rate limiter installed by SetLimiter and
	// consulted once per "/i" request.
	limiter *redis_rate.Limiter
)
// SetVersion fills the package-level version string. In order of preference
// it uses:
//
//   - GITLAB_VERSION, as "GitLab <value>";
//   - CI_SERVER_VERSION, as "GitLab test server <value>";
//   - the ../gitlab/VERSION file, as "GitLab <contents>". This only works when
//     the collector runs inside the gitlab-development-kit directory.
//
// If version is already non-empty and neither variable is set, the existing
// value is kept. Surrounding whitespace (including a "\r\n" line ending) is
// trimmed from the file. A missing, unreadable, or blank file panics with
// "No GitLab version file found." rather than producing the version "GitLab ".
func SetVersion() {
	if v := os.Getenv("GITLAB_VERSION"); v != "" {
		version = "GitLab " + v
	} else if v := os.Getenv("CI_SERVER_VERSION"); v != "" {
		version = "GitLab test server " + v
	}
	if version == "" {
		// Neither environment variable supplied a version: fall back to the
		// rails root's VERSION file.
		contents, err := ioutil.ReadFile("../gitlab/VERSION")
		fileVersion := strings.TrimSpace(string(contents))
		if err != nil || fileVersion == "" {
			panic("No GitLab version file found.")
		}
		version = "GitLab " + fileVersion
	}
	fmt.Println("Running the Product Analytics Collector.", version)
}
// SetLimiter creates a Redis client on the unix socket named by REDIS_SOCKET
// (database 1, empty password). It empties that database with FlushDB and
// installs the package-level limiter that HelloServer consults.
//
// The flush is best-effort: if it fails the collector still starts. The
// failure is reported, because counters left over from an earlier run may
// presumably still apply in that case. The original discarded this error
// silently.
func SetLimiter() {
	rdb := redis.NewClient(&redis.Options{
		Network:  "unix",
		Addr:     os.Getenv("REDIS_SOCKET"),
		Password: "",
		DB:       1,
	})
	if err := rdb.FlushDB().Err(); err != nil {
		fmt.Println("Could not flush the rate limiter database:", err)
	}
	limiter = redis_rate.NewLimiter(rdb)
}
// main sets the version and rate limiter, then opens the go-pg connection
// described by the PA_NETWORK / PA_USER / PA_PASSWORD / PA_DATABASE / PA_ADDR
// environment variables. It serves HelloServer on PA_PORT, which is passed
// verbatim as the listen address (presumably of the form ":8080" — confirm).
//
// ListenAndServe only returns on failure, for example when the address is
// already in use. That error used to be discarded and the process exited 0.
// It is now printed and the process exits 1, after closing the database
// explicitly, because os.Exit skips deferred calls.
func main() {
	SetVersion()
	SetLimiter()
	persistance := &Persistance{db: pg.Connect(&pg.Options{
		Network:  os.Getenv("PA_NETWORK"),
		User:     os.Getenv("PA_USER"),
		Password: os.Getenv("PA_PASSWORD"),
		Database: os.Getenv("PA_DATABASE"),
		Addr:     os.Getenv("PA_ADDR"),
	})}
	http.HandleFunc("/", persistance.HelloServer)

	server := &http.Server{
		Addr: os.Getenv("PA_PORT"),
		// Bound how long a slow or stalled client can hold a connection; the
		// default http.Server has no timeouts at all.
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 10 * time.Second,
	}
	err := server.ListenAndServe()
	persistance.db.Close() // best-effort; the original deferred Close also ignored its error
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
}
// NullFromParams returns the first value of query parameter param as a valid
// sql.NullString. It returns an invalid (NULL) value when the parameter is
// absent. A parameter that is present but empty ("?aid=") is valid, with
// String "".
//
// Keyed composite literals are used because go vet's composites check flags
// unkeyed literals of imported struct types.
func NullFromParams(r *http.Request, param string) sql.NullString {
	values := r.URL.Query()[param]
	if len(values) == 0 {
		return sql.NullString{}
	}
	return sql.NullString{String: values[0], Valid: true}
}
// StringFromParams returns the first value of query parameter param, or ""
// when the parameter is absent. The original indexed keys[0] unconditionally,
// so a request missing the parameter (e.g. no "eid") panicked the handler.
func StringFromParams(r *http.Request, param string) string {
	return r.URL.Query().Get(param)
}
// BooleanFromParams reports whether the first value of query parameter param
// is exactly "1". An absent parameter, or any other value, reads as false.
func BooleanFromParams(r *http.Request, param string) bool {
	// url.Values.Get yields "" for an absent key, which is never "1".
	first := r.URL.Query().Get(param)
	return first == "1"
}
// NullIntFromParams parses the first value of query parameter param as a
// base-10 int64. It returns a valid sql.NullInt64 on success.
//
// A missing or non-integer value yields an invalid (NULL) result. The original
// panicked on a missing parameter and marked unparseable input as a valid 0.
func NullIntFromParams(r *http.Request, param string) sql.NullInt64 {
	// Get returns "" for an absent parameter, which ParseInt rejects.
	i, err := strconv.ParseInt(r.URL.Query().Get(param), 10, 64)
	if err != nil {
		return sql.NullInt64{}
	}
	return sql.NullInt64{Int64: i, Valid: true}
}
// TimeFromParams interprets the first value of query parameter param as a Unix
// timestamp in whole seconds.
//
// A missing or non-integer value yields the zero time.Time. Previously a
// missing value panicked, and a malformed one silently became the Unix epoch.
//
// NOTE(review): the visible caller (HelloServer) passes the Snowplow "dtm"
// parameter, which the Snowplow tracker protocol documents in milliseconds. If
// so, this seconds-based conversion lands far in the future. Confirm, and
// switch to time.Unix(0, i*int64(time.Millisecond)) if appropriate.
func TimeFromParams(r *http.Request, param string) time.Time {
	i, err := strconv.ParseInt(r.URL.Query().Get(param), 10, 64)
	if err != nil {
		return time.Time{}
	}
	return time.Unix(i, 0)
}
// IntFromParams converts the first value of query parameter param with
// strconv.Atoi. The original panicked when the parameter was absent; an absent
// parameter now yields 0.
func IntFromParams(r *http.Request, param string) int {
	// Conversion errors are deliberately ignored, matching the original:
	// malformed input keeps whatever Atoi returns (0 for syntax errors).
	i, _ := strconv.Atoi(r.URL.Query().Get(param))
	return i
}
// HelloServer is the collector's HTTP handler for every path.
//
// For "/i" (the Snowplow GET endpoint) it maps tracker query parameters onto a
// ProductAnalyticsEvent, checks the per-app rate limit, and inserts the event
// with go-pg. The response is 200 on success (nothing is written), 500 when
// the insert fails, and 429 when the app is over its limit. Any other path
// gets a short plain-text reply.
func (persistance *Persistance) HelloServer(w http.ResponseWriter, r *http.Request) {
	if r.URL.Path == "/i" {
		// For the map see: https://github.com/snowplow/snowplow/blob/3b8d9cc839e4af0b97c68477fb1c9f484de233e2/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/EnrichmentManager.scala#L802
		// But we didn't add everything yet and we did it manually.
		// TODO Items to still parse:
		// "res=3008x1692", see https://github.com/snowplow/snowplow/blob/3b8d9cc839e4af0b97c68477fb1c9f484de233e2/3-enrich/scala-common-enrich/src/main/scala/com.snowplowanalytics.snowplow.enrich/common/enrichments/ClientEnrichments.scala#L41
		// "ds=1235x1429"
		// and a ton more that are in type ProductAnalyticsEvent struct and should be added below as well

		// "vp" carries the browser viewport as "WIDTHxHEIGHT", the same shape
		// as res/ds above. It used to go through IntFromParams, whose Atoi
		// rejects the "x", so BrViewheight was always stored as 0.
		viewWidth, viewHeight := viewportFromParams(r, "vp")

		event := ProductAnalyticsEvent{
			AppId:                  NullFromParams(r, "aid"),
			Platform:               NullFromParams(r, "p"),
			CollectorTstamp:        time.Now(),
			EventId:                StringFromParams(r, "eid"),
			VTracker:               NullFromParams(r, "tv"),
			VCollector:             version,
			VEtl:                   version,
			OsTimezone:             NullFromParams(r, "tz"),
			NameTracker:            NullFromParams(r, "tna"),
			BrLang:                 NullFromParams(r, "lang"),
			DocCharset:             NullFromParams(r, "cs"),
			BrFeaturesPdf:          BooleanFromParams(r, "f_pdf"),
			BrFeaturesFlash:        BooleanFromParams(r, "f_fla"),
			BrFeaturesJava:         BooleanFromParams(r, "f_java"),
			BrFeaturesDirector:     BooleanFromParams(r, "f_dir"),
			BrFeaturesQuicktime:    BooleanFromParams(r, "f_qt"),
			BrFeaturesRealplayer:   BooleanFromParams(r, "f_realp"),
			BrFeaturesWindowsmedia: BooleanFromParams(r, "f_wma"),
			BrFeaturesGears:        BooleanFromParams(r, "f_gears"),
			BrFeaturesSilverlight:  BooleanFromParams(r, "f_ag"),
			BrColordepth:           NullIntFromParams(r, "cd"),
			BrCookies:              BooleanFromParams(r, "cookie"),
			DvceCreatedTstamp:      TimeFromParams(r, "dtm"),
			BrViewwidth:            viewWidth,
			BrViewheight:           viewHeight,
			DomainSessionidx:       IntFromParams(r, "vid"),
			DomainSessionid:        NullFromParams(r, "sid"),
			DomainUserid:           NullFromParams(r, "duid"),
			UserFingerprint:        NullFromParams(r, "fp"),
			PageReferrer:           NullFromParams(r, "refr"),
			PageUrl:                NullFromParams(r, "url"),
		}
		res := Allow(limiter, event.AppId)
		if res.Allowed {
			// The testing package registers the "test.v" flag, so the database
			// insert is skipped when running under `go test`.
			if flag.Lookup("test.v") == nil { // Normal run
				err := persistance.db.Insert(&event)
				if err != nil {
					fmt.Println(err)
					w.WriteHeader(http.StatusInternalServerError)
				}
			}
		} else {
			fmt.Println("Too Many Requests.")
			w.WriteHeader(http.StatusTooManyRequests)
		}
	} else {
		fmt.Fprintf(w, "Non Snowplow URL.Path = %q\n", r.URL.Path)
		// Writing the body has already sent an implicit 200 OK header, which is
		// why a w.WriteHeader call placed after Fprintf logs "superfluous
		// response.WriteHeader call". To return another status, call
		// w.WriteHeader before writing the body.
	}
}

// viewportFromParams splits a "WIDTHxHEIGHT" query parameter into two ints.
// A missing parameter, a value without "x", or a non-integer side yields 0, 0.
func viewportFromParams(r *http.Request, param string) (int, int) {
	parts := strings.SplitN(r.URL.Query().Get(param), "x", 2)
	if len(parts) != 2 {
		return 0, 0
	}
	wd, errW := strconv.Atoi(parts[0])
	ht, errH := strconv.Atoi(parts[1])
	if errW != nil || errH != nil {
		return 0, 0
	}
	return wd, ht
}
// Allow asks the Redis-backed limiter whether one more event for app_id fits
// within a budget of 100 events per minute. The bucket is keyed solely on
// app_id.String, ignoring Valid, so every invalid NullString with an empty
// String shares one bucket. Presumably that covers requests without an "aid"
// parameter; verify against callers.
//
// A limiter error is not recoverable here: Allow panics with it.
func Allow(limiter *redis_rate.Limiter, app_id sql.NullString) *redis_rate.Result {
	const eventsPerMinute = 100
	key := app_id.String
	result, err := limiter.Allow(key, redis_rate.PerMinute(eventsPerMinute))
	if err == nil {
		return result
	}
	panic(err)
}