-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocessor.py
98 lines (78 loc) · 3.7 KB
/
processor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# coding: utf-8
from datetime import datetime
import geoip2.database
from geoip2.errors import GeoIP2Error
import geoip2.webservice
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import os.path
import re
import shelve
# Access-log line pattern. Adjacent raw literals are concatenated by the
# parser, so this is one regex. The single `%s` placeholder is filled with
# `cfg.logs.domain_rx` (via `%`) before compilation in Processor.__init__.
NPM_LOG_LINE_REGEX = (
    r'\[(?P<datetime>.*?)\]'      # leading bracketed timestamp, e.g. [10/Oct/2021:13:55:36 +0000]
    r'.*(?P<scheme>http[s]?)'     # last "http"/"https" token that is followed by ...
    r'\s(?P<domain>%s)'           # ... whitespace and a domain matching the configured regex
    r'.*\[Client (?P<ip>.*?)\]'   # bracketed client address, e.g. [Client 1.2.3.4]
    r'.*'                         # rest of the line
)
# strptime format of the `datetime` group above, e.g. "10/Oct/2021:13:55:36 +0000".
NPM_LOG_DATETIME_FMT = '%d/%b/%Y:%H:%M:%S %z'
class Processor:
    """Feed access-log entries into InfluxDB, enriched with GeoIP2 location data.

    State is kept between calls in two ``shelve`` files under ``cfg.cache.dir``:

    * ``cfg.cache.log_seek``: byte offset already consumed in each log file,
      so each call only reads newly appended lines (like ``tail -f``);
    * ``cfg.cache.domain_dt``: timestamp of the last entry recorded for each
      domain, so entries that are read a second time are not inserted again.

    Call :meth:`close` (or use the instance as a context manager) to release
    the caches and clients deterministically; ``__del__`` does it as a fallback.
    """

    def __init__(self, cfg):
        """Open the caches, compile the log regex and create the GeoIP2/InfluxDB clients.

        :param cfg: configuration object exposing the ``cache``, ``logs``,
            ``maxmind`` and ``influxdb`` attribute groups read below.
        """
        self._cfg = cfg
        # Cache files for log file position (like tail -f) and to avoid redundant inserts.
        self._log_seek_pos_cache = shelve.open(os.path.join(cfg.cache.dir, cfg.cache.log_seek))
        self._domain_datetime_cache = shelve.open(os.path.join(cfg.cache.dir, cfg.cache.domain_dt))
        # Compiled once because it is applied to every log line.
        self._rx_access_infos = re.compile(NPM_LOG_LINE_REGEX % cfg.logs.domain_rx)
        # GeoIP2: local database when preferred and present, otherwise the GeoLite web service.
        if cfg.maxmind.prefer_db and os.path.exists(cfg.maxmind.db):
            self._maxmind_client = geoip2.database.Reader(cfg.maxmind.db)
        else:
            self._maxmind_client = geoip2.webservice.Client(
                cfg.maxmind.id, cfg.maxmind.pk, host='geolite.info')
        # InfluxDB client with a SYNCHRONOUS write API.
        self._influxdb_client = InfluxDBClient(
            url=cfg.influxdb.host, token=cfg.influxdb.token, org=cfg.influxdb.org)
        self._influxdb_write_api = self._influxdb_client.write_api(SYNCHRONOUS)

    def close(self):
        """Close the caches and clients.

        Safe to call more than once and on an instance whose ``__init__``
        failed part-way: attributes that were never set are skipped.
        """
        # The write API is closed before the client that created it.
        for attr in ('_log_seek_pos_cache', '_domain_datetime_cache',
                     '_influxdb_write_api', '_influxdb_client', '_maxmind_client'):
            resource = getattr(self, attr, None)
            if resource is None:
                continue
            setattr(self, attr, None)
            # NOTE(review): not every client version is assumed to expose close().
            closer = getattr(resource, 'close', None)
            if closer is not None:
                closer()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

    def __del__(self):
        self.close()

    def analyze_log_file(self, log_path, from_tail=True):
        """Process the complete lines appended to *log_path* since the previous call.

        :param log_path: path of the access log (a ``str``: it is also the key
            of the seek-position cache).
        :param from_tail: for a file with no cached position, start reading at
            its current end (``True``) or at its beginning (``False``).

        The new position is persisted even if processing a line raises; that
        line is then read again on the next call.
        """
        current_size = os.path.getsize(log_path)
        # Nothing to read; returning keeps the cached position untouched.
        if current_size == 0:
            return
        seek_pos = self._log_seek_pos_cache.get(log_path, None)
        # First time this file is seen.
        if seek_pos is None:
            seek_pos = current_size if from_tail else 0
        # File shorter than what was already read: assume rotation/truncation and restart.
        # NOTE: a rotated file that has already grown past the old offset is not detected.
        if seek_pos > current_size:
            seek_pos = 0
        # Binary mode: offsets are exact byte counts (comparable with getsize()) and
        # undecodable bytes cannot abort the read.
        with open(log_path, 'rb') as f_log:
            f_log.seek(seek_pos)
            try:
                for raw_line in f_log:
                    # No trailing newline: the writer is mid-line; re-read it next call.
                    if not raw_line.endswith(b'\n'):
                        break
                    self._process_line(raw_line.decode('utf-8', errors='replace'))
                    seek_pos += len(raw_line)
            finally:
                # Persist progress so lines already handled are not read again.
                self._log_seek_pos_cache[log_path] = seek_pos

    def _process_line(self, line):
        """Record *line* if it is an access-log entry newer than the last one stored for its domain.

        Lines that do not match the access-log regex are ignored; lines whose
        timestamp cannot be parsed are reported and skipped.
        """
        match = self._rx_access_infos.match(line)
        if match is None:
            return
        try:
            dt = datetime.strptime(match.group('datetime'), NPM_LOG_DATETIME_FMT)
        except ValueError as e:
            print(e)
            return
        domain = match.group('domain')
        last_dt_access = self._domain_datetime_cache.get(domain, None)
        # Entries not strictly newer than the last recorded one are treated as already
        # inserted; with second-resolution timestamps this also drops a second entry
        # for the same domain within the same second.
        if last_dt_access is not None and dt <= last_dt_access:
            return
        self._record_log_entry(dt, match.group('scheme'), domain, match.group('ip'))
        # Marked only after the entry was handled, so an entry whose write raised is retried.
        self._domain_datetime_cache[domain] = dt

    def _get_location_from_ip(self, ip):
        """Return ``(country, city, latitude, longitude)`` from a GeoIP2 City lookup of *ip*.

        NOTE(review): any element may presumably be ``None`` when the record lacks
        that data; callers must not assume numeric coordinates.
        """
        response = self._maxmind_client.city(ip)
        return (response.country.name, response.city.name,
                response.location.latitude, response.location.longitude)

    def _record_log_entry(self, logts, scheme, domain, ip):
        """Geolocate *ip* and write one ``Access`` point stamped *logts* to InfluxDB.

        Lookup failures (``GeoIP2Error``, or ``ValueError`` for an address the
        lookup rejects) are printed and the entry is skipped. InfluxDB write
        errors propagate to the caller.
        """
        try:
            country, city, latitude, longitude = self._get_location_from_ip(ip)
        except (GeoIP2Error, ValueError) as e:
            print(e)
            return
        point = (Point("Access")
                 .tag('domain', domain)
                 .tag('scheme', scheme)
                 .field('ip', ip)
                 .field('city', city)
                 .field('country', country))
        # Coordinates may be missing; float(None) would raise TypeError.
        if latitude is not None:
            point = point.field('latitude', float(latitude))
        if longitude is not None:
            point = point.field('longitude', float(longitude))
        point = point.time(logts)
        self._influxdb_write_api.write(bucket=self._cfg.influxdb.bucket, record=point)