Article: Comprehensive Log Analysis with Python for Nginx and Apache Logs
Monitoring logs is crucial to understanding the behavior of your web application and identifying potential security threats. This article explains how to build a Python-based solution that allows you to:
- Analyze Nginx or Apache log files.
- Whitelist specific IPs or subnets to filter out known or trusted IP addresses.
- Check suspicious IPs against AbuseIPDB, an extensive database of known malicious IPs.
The article walks through each step, detailing the purpose of each script, how they work together, and how to use them effectively.
Overview of the Workflow
- Log Parsing (log_check.py): The main script parses Nginx or Apache logs, identifies the top IP addresses by hit count within a specified time frame, and filters out whitelisted IPs.
- Whitelist Management (whitelist.txt): A simple text file listing the IPs or subnets that should be ignored during log analysis.
- AbuseIPDB Integration: After identifying the top IPs, the script checks them against the AbuseIPDB API using the log_analyzer.py script.
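The three pieces live side by side in one working directory. A minimal layout might look like this (the directory name is illustrative):

```
log-tools/
├── log_check.py      # main log parser (shown below)
├── log_analyzer.py   # AbuseIPDB lookup helper
└── whitelist.txt     # trusted IPs/subnets to ignore
```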
Step 1: Parsing and Analyzing Logs with log_check.py
The log_check.py script reads log files from either Nginx or Apache, extracts IP addresses, and counts how often they appear within a specific timeframe. It supports filtering based on whitelisted IPs stored in whitelist.txt.
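For reference, the default Nginx and Apache access-log formats both begin with the client IP and carry the timestamp in the fourth whitespace-separated field, which is why the script can split each line on spaces and read log_parts[3]. A typical combined-format line looks like:

```
203.0.113.45 - - [10/Oct/2023:13:55:36 +0000] "GET /index.html HTTP/1.1" 200 2326 "-" "curl/8.1.2"
```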
Script: log_check.py
import argparse
import datetime
import re
from collections import Counter
import subprocess
# Regular expression patterns for extracting IPv4 addresses. The default
# Nginx and Apache access-log formats both begin with the client IP, so
# the same pattern works for either log type.
nginx_log_pattern = re.compile(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
apache_log_pattern = re.compile(r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})')
# Define text colors for better console output
class colors:
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
# Function to check if an IP address matches a whitelist entry; entries may
# use '*' as a per-octet wildcard (e.g. 192.168.1.*)
def is_matching_ip(ip_address, whitelist):
    ip_parts = ip_address.split('.')
    for pattern in whitelist:
        pattern_parts = pattern.split('.')
        if len(pattern_parts) != len(ip_parts):
            continue  # skip malformed or non-IPv4 entries
        match = True
        for i in range(len(ip_parts)):
            if pattern_parts[i] != '*' and pattern_parts[i] != ip_parts[i]:
                match = False
                break
        if match:
            return True
    return False
# Function to filter log entries based on time range
def filter_log_entries(file_path, log_type, start_time, whitelist):
filtered_ips = Counter()
log_pattern = nginx_log_pattern if log_type == "nginx" else apache_log_pattern
try:
with open(file_path, 'r') as file:
for line in file:
# Extract date and time from log
log_parts = line.split()
try:
if log_type == "nginx":
entry_time_str = log_parts[3][1:]
entry_time = datetime.datetime.strptime(entry_time_str, '%d/%b/%Y:%H:%M:%S')
elif log_type == "apache":
entry_time_str = log_parts[3][1:21]
entry_time = datetime.datetime.strptime(entry_time_str, '%d/%b/%Y:%H:%M:%S')
if entry_time >= start_time:
# Search for IP addresses
match = log_pattern.search(line)
if match:
ip_address = match.group(1)
if not is_matching_ip(ip_address, whitelist):
filtered_ips[ip_address] += 1
except (IndexError, ValueError):
continue
    except FileNotFoundError:
        print(f"{colors.FAIL}Log file {file_path} not found.{colors.ENDC}")
    return filtered_ips
# Function to read the whitelist from a file
def read_whitelist(file_path):
whitelist = []
try:
with open(file_path, 'r') as file:
for line in file:
line = line.strip()
if line and not line.startswith('#'):
whitelist.append(line)
except FileNotFoundError:
print(f"{colors.FAIL}Whitelist file {file_path} not found.{colors.ENDC}")
return whitelist
# Function to run abuseipdb check using log_analyzer.py script
def run_abuseipdb_check(ip_list):
for ip in ip_list:
print(f"Checking IP {ip} against AbuseIPDB...")
# Pass the IP to the log_analyzer.py script using subprocess
subprocess.run(['python', 'log_analyzer.py', '--ip', ip])
# Main function to process the log file and return top IP addresses
def main():
# Argument parsing
    parser = argparse.ArgumentParser(
        description='Log Analyzer for Nginx and Apache logs',
        formatter_class=argparse.RawTextHelpFormatter,
        add_help=False  # disable the built-in -h/--help so our custom --help flag does not conflict
    )
    parser.add_argument('--log', type=str, help='Path to log file (e.g., /var/log/nginx/access.log)')
    parser.add_argument('--nginx', action='store_true', help='Flag to specify Nginx log format')
    parser.add_argument('--apache', action='store_true', help='Flag to specify Apache log format')
    parser.add_argument('--time', type=str, help='Time frame (5m, 30m, 1h, 3h, 6h, 12h)')
    parser.add_argument('--whitelist', type=str, default='whitelist.txt', help='Path to the whitelist file (default: whitelist.txt)')
    parser.add_argument('--help', action='store_true', help='Display help with usage examples')
    args = parser.parse_args()
    # Display help and examples if --help is passed
    if args.help:
        print(f"""{colors.OKGREEN}
Usage Examples:
---------------
1. Analyze Nginx logs for the last 1 hour, using a specific whitelist file:
   python log_check.py --log /var/log/nginx/access.log --nginx --time 1h --whitelist /path/to/whitelist.txt
2. Analyze Apache logs for the last 30 minutes, using the default whitelist file:
   python log_check.py --log /var/log/apache2/access.log --apache --time 30m
3. Analyze Nginx logs for the last 5 minutes and check top IPs against AbuseIPDB:
   python log_check.py --log /var/log/nginx/access.log --nginx --time 5m
{colors.ENDC}
""")
        return
    # Validate required arguments (checked manually so that --help works on its own)
    if not args.log or not args.time:
        print(f"{colors.FAIL}Error: Both --log and --time are required.{colors.ENDC}")
        return
    # Ensure either --nginx or --apache is specified
    if not (args.nginx or args.apache):
        print(f"{colors.FAIL}Error: You must specify either --nginx or --apache.{colors.ENDC}")
        return
# Determine the log type
log_type = "nginx" if args.nginx else "apache"
# Calculate the start time based on the --time flag
time_map = {
'5m': 5,
'30m': 30,
'1h': 60,
'3h': 180,
'6h': 360,
'12h': 720
}
if args.time not in time_map:
print(f"{colors.FAIL}Error: Invalid time format. Use one of the following: 5m, 30m, 1h, 3h, 6h, 12h.{colors.ENDC}")
return
current_time = datetime.datetime.now()
start_time = current_time - datetime.timedelta(minutes=time_map[args.time])
# Read the whitelist from file
whitelist = read_whitelist(args.whitelist)
# Parse the log file and get top IP hits
ip_hits = filter_log_entries(args.log, log_type, start_time, whitelist)
# Sort IPs by hit count in descending order
sorted_ips = sorted(ip_hits.items(), key=lambda x: x[1], reverse=True)
# Display the top 5 IPs
print(f"{colors.OKGREEN}Top 5 IP Addresses in the last {args.time}:{colors.ENDC}")
top_ips = [ip for ip, count in sorted_ips[:5]]
for i, (ip, count) in enumerate(sorted_ips[:5]):
print(f"{i+1}. IP: {ip}, Hits: {count}")
# Pass top IPs to log_analyzer.py for AbuseIPDB check
if top_ips:
run_abuseipdb_check(top_ips)
if __name__ == "__main__":
main()
Step 2: Creating and Managing the whitelist.txt File
The whitelist.txt file stores the IPs or subnets you want to exclude from the analysis. Each IP or subnet should be on a separate line. You can also add comments by using the # symbol.
Example: whitelist.txt
# Trusted IPs/Subnets
192.168.1.*
10.0.0.*
203.0.113.45
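To make the wildcard semantics concrete, this self-contained sketch reproduces the matching logic from log_check.py (condensed with all() for brevity) against the example whitelist above:

```python
# Wildcard matcher mirroring log_check.py: a whitelist entry like
# '192.168.1.*' matches any IP whose non-wildcard octets agree.
def is_matching_ip(ip_address, whitelist):
    ip_parts = ip_address.split('.')
    for pattern in whitelist:
        pattern_parts = pattern.split('.')
        if len(pattern_parts) != len(ip_parts):
            continue  # ignore malformed or non-IPv4 entries
        if all(p == '*' or p == q for p, q in zip(pattern_parts, ip_parts)):
            return True
    return False

whitelist = ['192.168.1.*', '10.0.0.*', '203.0.113.45']
print(is_matching_ip('192.168.1.50', whitelist))   # True  (wildcard match)
print(is_matching_ip('203.0.113.45', whitelist))   # True  (exact match)
print(is_matching_ip('198.51.100.7', whitelist))   # False (not whitelisted)
```

Note that this scheme only supports per-octet wildcards, not CIDR notation; an entry like 192.0.2.0/24 would simply never match.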
Step 3: Checking IPs Against AbuseIPDB with log_analyzer.py
Once log_check.py identifies the top IPs, they are passed to the log_analyzer.py script to check them against the AbuseIPDB database. This script fetches detailed information about each IP, including
whether the IP has been reported for abusive behavior.
Key Functions of log_analyzer.py
- IP Parsing: The script processes the IPs passed from log_check.py.
- AbuseIPDB Integration: It checks each IP against the AbuseIPDB API, which provides details such as the abuse confidence score, report counts, and other relevant data.
You can refer to the log_analyzer.py script provided earlier for the full implementation.
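Since the full log_analyzer.py is not reprinted here, the sketch below shows the general shape an AbuseIPDB lookup could take using only the standard library. The endpoint URL, header names, and query parameters follow the AbuseIPDB v2 API; the summarize_check helper and its field selection are illustrative, not the article's actual implementation:

```python
import json
import os
import urllib.parse
import urllib.request

def check_ip(ip_address, api_key, max_age_days=90):
    """Query the AbuseIPDB v2 /check endpoint for a single IP."""
    params = urllib.parse.urlencode(
        {'ipAddress': ip_address, 'maxAgeInDays': max_age_days})
    request = urllib.request.Request(
        f'https://api.abuseipdb.com/api/v2/check?{params}',
        headers={'Key': api_key, 'Accept': 'application/json'},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)

def summarize_check(payload):
    """Reduce an AbuseIPDB response to the fields worth reporting."""
    data = payload.get('data', {})
    return {
        'ip': data.get('ipAddress'),
        'score': data.get('abuseConfidenceScore', 0),
        'reports': data.get('totalReports', 0),
        'country': data.get('countryCode'),
    }

if __name__ == '__main__':
    # Expects an API key in the ABUSEIPDB_API_KEY environment variable.
    key = os.environ.get('ABUSEIPDB_API_KEY')
    if key:
        result = summarize_check(check_ip('203.0.113.45', key))
        print(f"{result['ip']}: score {result['score']}, "
              f"{result['reports']} reports")
```

A score near 100 indicates high confidence that the IP is abusive; how you threshold and act on it is a policy decision left to the operator.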
Step 4: Using the Scripts Together
Once the scripts are set up, you can start analyzing your logs, filtering out trusted IPs, and checking the top IPs against AbuseIPDB.
Example Workflow
1. Analyze Nginx logs for the past 1 hour, filtering whitelisted IPs:

python log_check.py --log /var/log/nginx/access.log --nginx --time 1h --whitelist /path/to/whitelist.txt

2. Example output:

Top 5 IP Addresses in the last 1h:
1. IP: 203.0.113.45, Hits: 150
2. IP: 198.51.100.23, Hits: 120
3. IP: 192.0.2.12, Hits: 95
4. IP: 203.0.113.101, Hits: 70
5. IP: 198.51.100.56, Hits: 65
Checking IP 203.0.113.45 against AbuseIPDB...
Checking IP 198.51.100.23 against AbuseIPDB...
Checking IP 192.0.2.12 against AbuseIPDB...
Checking IP 203.0.113.101 against AbuseIPDB...
Checking IP 198.51.100.56 against AbuseIPDB...
This workflow demonstrates how the scripts integrate to provide a seamless log analysis and IP checking process.
Conclusion
In this article, we've walked through the full process of log analysis using Python. The log_check.py script parses Nginx or Apache logs, filtering out whitelisted IPs and identifying top IPs based on hits. The integration with AbuseIPDB via log_analyzer.py helps in checking those IPs for potential malicious behavior.
This solution is well suited to anyone looking to automate log analysis and security checks in a Linux-based environment.
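For hands-off operation, the analysis can be scheduled with cron. The entry below is a sketch: the /opt/log-tools path, the interpreter location, and the report log path are all assumptions to adapt to your own setup:

```
# m   h  dom mon dow  command
*/30  *  *   *   *    /usr/bin/python3 /opt/log-tools/log_check.py --log /var/log/nginx/access.log --nginx --time 30m >> /var/log/log_check_report.log 2>&1
```

Matching the cron interval to the --time window (here, every 30 minutes with --time 30m) keeps each run focused on traffic that arrived since the previous one.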