from streamlit import runtime
import grp
import os
import sys
import pwd
from collections import deque
import streamlit as st
import requests
from collections import defaultdict
from typing import Optional
# Bounded history of recent connections: once 1000 entries are held the deque
# discards the oldest one on each append.
connections = deque([], maxlen=1000)

# Abnormal traffic thresholds, compared against the per-source counters below.
THRESHOLD_PACKETS = 1000  # packets from one source above which it is reported
THRESHOLD_PORTS = 500  # distinct ports towards one destination above which a scan is reported
MONITOR_INTERVAL = 1  # window length in seconds quoted in the alert messages


def _ports_by_destination():
    """Build the inner table of port_scan_count: destination IP -> set of ports."""
    return defaultdict(set)


# source IP -> number of packets counted for it
ip_packet_count = defaultdict(int)
# source IP -> set of targets (not read by any function visible in this file section)
ip_target_count = defaultdict(set)
# source IP -> destination IP -> set of ports seen
port_scan_count = defaultdict(_ports_by_destination)
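def detect_anomalies() -> list:
    """
    Function: detect_anomalies
    Input:
        - None. Reads the module-level counters ip_packet_count and port_scan_count.
    Output:
        - alerts: list, one message per threshold breach found in this pass.
    Description:
        Walks every source IP present in ip_packet_count. A source with more than
        THRESHOLD_PORTS distinct ports recorded against a single destination is reported
        as a possible port scan. A source with more than THRESHOLD_PACKETS packets that
        was not already reported as a scanner is reported as a possible DDoS.
        Each alert is shown through st.error and appended to the returned list.

        The whole table is examined before returning, so one noisy source cannot hide
        other offending sources seen in the same pass.

        NOTE(review): both thresholds are applied per source IP, so traffic spread over
        many sources that each stay below THRESHOLD_PACKETS is not flagged here.
        NOTE(review): this function does not reset the counters; they are presumably
        cleared elsewhere every MONITOR_INTERVAL seconds - confirm against the caller.
    """
    alerts = []
    detected_ips = set()
    for ip, count in ip_packet_count.items():
        # .get() reads without inserting: indexing the defaultdict would create an empty
        # entry in port_scan_count for every source that never touched a port.
        for dst_ip, ports in port_scan_count.get(ip, {}).items():
            if len(ports) > THRESHOLD_PORTS:
                alert = f"ALERT: Possible port scan detected from {ip} targeting {dst_ip} with {len(ports)} ports scanned in {MONITOR_INTERVAL} seconds."
                st.error(alert)
                alerts.append(alert)
                detected_ips.add(ip)
        # A source already reported as a scanner is not reported a second time as a flood.
        if count > THRESHOLD_PACKETS and ip not in detected_ips:
            alert = f"ALERT: Possible DDoS detected from {ip} with {count} packets in {MONITOR_INTERVAL} seconds."
            st.error(alert)
            alerts.append(alert)
    return alerts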
def is_streamlit() -> bool:
    """
    Function: is_streamlit
    Input:
        - None
    Output:
        - boolean, the value returned by streamlit's runtime.exists().
    Description:
        Reports whether a Streamlit runtime exists in the current process, which the
        other helpers use to choose between Streamlit widgets and console output.
    """
    runtime_present = runtime.exists()
    return runtime_present
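def check_permissions():
    """
    Function: check_permissions
    Input:
        - None
    Output:
        - None
    Description:
        Checks whether the current user belongs to the 'wireshark' group, which is
        typically required to sniff packets with PyShark.
        Membership counts both when the user is listed as a member of the group and when
        'wireshark' is the user's primary group: the member list of a group entry does
        not include users who hold it as their primary group.
        If the user is in the group:
            - In Streamlit, a success message is displayed.
            - In a standard environment, a success message is printed to the console.
        If the user is not in the group:
            - In Streamlit, an error message is displayed and execution continues.
            - In a standard environment, an error message is printed and the program
              exits with status code 1.
        If any error occurs while checking:
            - In Streamlit, an error message with the exception details is displayed.
            - In a standard environment, the exception details are printed and the
              program exits with status code 1.

        NOTE(review): the lookup reads the group database, not the credentials of the
        running process, so a membership added after the session started is reported as
        present here even though the session may not hold it yet. Treat the result as a
        hint for the operator, not as proof that capture will work.
    """
    try:
        account = pwd.getpwuid(os.getuid())
        # gr_mem only lists supplementary members, so the primary group is matched by gid.
        in_wireshark_group = any(
            g.gr_name == 'wireshark'
            and (account.pw_name in g.gr_mem or g.gr_gid == account.pw_gid)
            for g in grp.getgrall()
        )
        if in_wireshark_group:
            if is_streamlit():
                st.success("✅ The user has the necessary permissions to capture packets.")
            else:
                print("The user has the necessary permissions to capture packets.\n---")
        else:
            if is_streamlit():
                st.error("🚨 The user does not have the necessary permissions. Please add the user to the 'wireshark' group.")
            else:
                print("The user does not have the necessary permissions. Please add the user to the 'wireshark' group.")
                # SystemExit is not an Exception subclass, so the handler below lets it through.
                sys.exit(1)
    except Exception as e:
        if is_streamlit():
            st.error(f"Error checking permissions: {e}")
        else:
            print(f"Error checking permissions: {e}")
            sys.exit(1)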
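def load_lottieurl(url: str) -> Optional[dict]:
    """
    Function: load_lottieurl
    Input:
        - url: str, URL pointing to the Lottie animation JSON file.
    Output:
        - Optional[dict], the decoded JSON body when the request succeeds with status
          code 200. None when the request fails, times out, returns another status
          code, or the body is not valid JSON.
    Description:
        Downloads the Lottie animation description. It only loads the data; rendering
        is left to the caller.
        The request is bounded by a timeout so that a stalled or unreachable server
        cannot block the application, and a failed download is reported as None rather
        than as an exception.
        NOTE(review): the body is returned as received; the URL should point to a
        source the operator trusts.
    """
    try:
        # Without a timeout, requests waits indefinitely on a server that stops answering.
        response = requests.get(url, timeout=10)
    except requests.RequestException:
        return None
    if response.status_code != 200:
        return None
    try:
        return response.json()
    except ValueError:
        # The server answered 200 but the body is not JSON.
        return None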