in pp3/au/Importer.php [19:189]
public static function import($row, $source) {
//die(var_dump($row));
self::$_source = $source;
// load up catalogs, distros, configs to cache
if (self::$_cacheLoaded == false) {
self::loadCache();
}
// let's import that
$ip = $row['ip'];
$ts = $row['ts'];
$path = $row['path'];
$distro = $row['distro'];
$user_id = $row['user_id'];
$user2_id = $row['user2_id'];
$response = $row['response'];
$size = $row['size'];
// resolve the catalog
if (!self::$catalogCache[$path]) {
Db::query("INSERT INTO catalogs (path, source) VALUES ('$path', '$source')");
$catalogId = mysqli_insert_id(Db::$link);
self::$catalogCache[$path] = $catalogId;
} else {
$catalogId = self::$catalogCache[$path];
}
// resolve the distro
if (preg_match('/(NB[A-Z]*)?_?([_A-Z]+)?/', $distro, $match)) {
$distro_code = "";
$config = "";
$config_sig = 0;
$warning_count = 0;
if (isset($match[1])) {
$distro_code = $match[1];
}
if (isset($match[2])) {
$config = $match[2];
$config_parts = explode("_", $config);
foreach ($config_parts as $part) {
if (isset(self::$_packs[$part])) {
$config_sig += self::$_packs[$part];
} else {
$warning_count++;
Logger::write(Logger::INFO, "Uknown pack ID: $part; $config; $distro_code");
}
}
}
}
if (!self::$distroCache[$distro_code]) {
Db::query("INSERT INTO distros (distro) VALUES ('$distro_code')");
$distroId = mysqli_insert_id(Db::$link);
self::$distroCache[$distro_code] = $distroId;
} else {
$distroId = self::$distroCache[$distro_code];
}
// resolve config
if (!self::$configCache[$config]) {
Db::query("INSERT INTO configs (config, signature) VALUES ('$config', $config_sig)");
$configId = mysqli_insert_id(Db::$link);
self::$configCache[$config] = $configId;
} else {
$configId = self::$configCache[$config];
}
// resolve the IP address
// if (!self::$ipCache[$ip]) {
// // lookup db for it
// $res = Db::query('SELECT id FROM ips WHERE ip="' . $ip . '" LIMIT 1');
// if ($res) {
// if (mysqli_num_rows($res) > 0) {
// $row = mysqli_fetch_assoc($res);
// $ipId = $row['id'];
// self::$ipCache[$ip] = $ipId;
// } else {
// Db::query("INSERT INTO ips (ip, country) VALUES ('$ip' ,'" . getCCByIP($ip) . "')");
// $ipId = mysqli_insert_id();
// self::$ipCache[$ip] = $ipId;
// }
// }
// } else {
// $ipId = self::$ipCache[$ip];
// }
$ipId=0;
// resolve userId
$tsInt = strtotime($ts);
if (!self::$userCache[$user_id]) {
$res = Db::query('SELECT id, since, last_seen FROM users WHERE unique_id="' . $user_id . '"');
if ($res) {
if (mysqli_num_rows($res) > 0) {
$row = mysqli_fetch_assoc($res);
$userId = $row['id'];
// if this ping is newer then one in DB, save it as last seen and calc delay
if ($tsInt > $row['last_seen']) {
// calc the delay since last ping
$delay = round(($tsInt - $row['last_seen']) / (60 * 60 * 24));
// update last seen and delay
if($delay>0) {
Db::query("UPDATE users SET last_seen=$tsInt, catalog_id=$catalogId, delay=$delay WHERE id=" . $userId);
}
}
self::$userCache[$user_id] = array('id' => $userId, 'since' => $row['since'], 'last_seen' => $tsInt, 'catalog_id' => $catalogId, 'delay' => $delay);
} else {
Db::query("INSERT INTO users (unique_id, since, last_seen, catalog_id, delay) VALUES ('$user_id', '$ts', $tsInt, $catalogId, 9999)");
$userId = mysqli_insert_id(Db::$link);
self::$userCache[$user_id] = array('id' => $userId, 'since' => $ts, 'last_seen' => $tsInt, 'catalog_id' => $catalogId, 'delay' => 9999);
}
}
} else {
$userId=self::$userCache[$user_id]['id'];
// check if the hit from cache has newer timestamp, if so, mark user for update her timestamp
if (self::$userCache[$user_id]['since'] > $ts) {
$updateUsers = array('id' => self::$userCache[$user_id]['id'], 'since' => $ts);
}
}
// resolve userId
if (!self::$user2Cache[$user2_id]) {
$res = Db::query('SELECT id, since, last_seen FROM users2 WHERE unique_id="' . $user2_id . '"');
if ($res) {
if (mysqli_num_rows($res) > 0) {
$row = mysqli_fetch_assoc($res);
$user2Id = $row['id'];
// if this ping is newer then one in DB, save it as last seen and calc delay
if ($tsInt > $row['last_seen']) {
// calc the delay since last ping
$delay = round(($tsInt - $row['last_seen']) / (60 * 60 * 24));
// update last seen and delay
if($delay>0) {
Db::query("UPDATE users2 SET last_seen=$tsInt, delay=$delay WHERE id=" . $user2Id);
}
}
self::$user2Cache[$user2_id] = array('id' => $user2Id, 'since' => $row['since'], 'last_seen' => $tsInt, 'delay' => $delay);
} else {
Db::query("INSERT INTO users2 (unique_id, since, last_seen, delay) VALUES ('$user2_id', '$ts', $tsInt,9999)");
$user2Id = mysqli_insert_id(Db::$link);
self::$user2Cache[$user2_id] = array('id' => $user2Id, 'since' => $ts, 'last_seen' => $tsInt, 'delay' => 9999);
}
}
} else {
$user2Id=self::$user2Cache[$user2_id]['id'];
// check if the hit from cache has newer timestamp, if so, mark user for update her timestamp
if (self::$user2Cache[$user2_id]['since'] > $ts) {
$updateUsers2 = array('id' => self::$user2Cache[$user2_id]['id'], 'since' => $ts);
}
}
// now save it to hits table finally
$res = Db::query("INSERT INTO pings (ip_id, ts, path_id, distro_id, config_id, user_id, user2_id, response, size) VALUES ($ipId, '$ts', $catalogId, $distroId, $configId, $userId, $user2Id, $response, $size)");
if ($res) {
$ret = true;
} else {
$ret = false;
}
// now update users since timestamps (might be case if we back import older logs)
if (!empty($updateUsers)) {
foreach ($updateUsers as $u) {
Db::query("UPDATE users SET since='" . $u['since'] . "' WHERE id=" . $u['id']);
}
}
if (!empty($updateUsers2)) {
foreach ($updateUsers2 as $u) {
Db::query("UPDATE users2 SET since='" . $u['since'] . "' WHERE id=" . $u['id']);
}
}
return $ret;
}