Let's scan NIP-05 members for #spambots
Better coders can fine-tune this, I suppose.
Assumptions and Setup
Nostr Relay: You have a WebSocket connection to a Nostr relay (e.g., wss://relay.damus.io) to fetch events.
NIP-05 Provider Data: You have a database or API with member public keys and their NIP-05 registration timestamps.
Libraries: Uses websocket (from the websocket-client package) for Nostr relay communication, json for event parsing, random for sampling replies, sqlite3 for member storage, and requests for the NIP-05 verification check.
Database: A simple SQLite database is assumed for storing member data, but you can adapt to your setup (e.g., MySQL, API).
Bot Detection: Basic bot detection checks for identical or nonsensical replies using simple text comparison. You can enhance this with NLP libraries like nltk or textblob for advanced analysis.
Script
import websocket
import json
import time
import random
import sqlite3
from datetime import datetime, timedelta
import requests
import re
from collections import Counter
# Configuration
# WebSocket endpoint of the Nostr relay to query (replace with your relay).
RELAY_URL = "wss://relay.damus.io"

# Path of the SQLite database file that stores the NIP-05 members.
DB_PATH = "nip05_members.db"

# Number of days to look back when checking for recent activity.
TIME_WINDOW = 7

# Upper bound on how many replies are analyzed per member.
REPLY_SAMPLE_SIZE = 5

# Jaccard similarity threshold used to treat two replies as identical.
SIMILARITY_THRESHOLD = 0.9
# Initialize database
def init_db():
    """Open the member database, make sure its table exists, and seed one row.

    The ``members`` table is created when missing, and a single example
    member (registered 30 days before "now") is inserted unless a row with
    that pubkey is already present.

    Returns:
        A ``(connection, cursor)`` tuple for the SQLite file at ``DB_PATH``.
    """
    create_members_sql = (
        "\n"
        "CREATE TABLE IF NOT EXISTS members (\n"
        "pubkey TEXT PRIMARY KEY,\n"
        "nip05_address TEXT,\n"
        "registration_timestamp INTEGER\n"
        ")\n"
    )

    connection = sqlite3.connect(DB_PATH)
    db_cursor = connection.cursor()
    db_cursor.execute(create_members_sql)

    # Example: dummy member data (replace with your actual data).
    example_member = (
        "b0635d6a9851d3aed0cd6c495b282167acf761729078d975fc341b22650b07b9",
        "bob@example.com",
        int(time.time() - 86400 * 30),
    )
    db_cursor.execute(
        "INSERT OR IGNORE INTO members VALUES (?, ?, ?)",
        example_member,
    )

    connection.commit()
    return connection, db_cursor
# Connect to Nostr relay
def connect_to_relay():
    """Create a WebSocket, connect it to ``RELAY_URL``, and return it."""
    relay_socket = websocket.WebSocket()
    relay_socket.connect(RELAY_URL)
    return relay_socket
# Fetch events from relay
def fetch_events(ws, pubkey, since_timestamp, kind=1):
    """Fetch one author's stored events from the relay.

    Sends a ``REQ`` for events authored by *pubkey* of the given *kind*
    created at or after *since_timestamp*, collects the matching ``EVENT``
    messages, and finally sends ``CLOSE`` for the subscription.

    Collection stops as soon as the relay signals end-of-stored-events
    (``EOSE``) or closes the subscription (``CLOSED``), or when the overall
    10-second deadline expires, whichever comes first.

    Args:
        ws: Connected relay socket providing ``send``, ``recv``,
            ``gettimeout`` and ``settimeout``.
        pubkey: Author public key; its first 8 characters also name the
            subscription.
        since_timestamp: Lower bound passed as the filter's ``since`` value.
        kind: Event kind to request (1 for posts/replies).

    Returns:
        List of event objects received for this subscription, in arrival order.
    """
    subscription_id = f"scan_{pubkey[:8]}"
    event_filter = {
        "authors": [pubkey],
        "kinds": [kind],  # kind=1 for posts/replies
        "since": since_timestamp,
    }
    ws.send(json.dumps(["REQ", subscription_id, event_filter]))

    events = []
    deadline = time.monotonic() + 10  # overall 10-second budget
    previous_timeout = ws.gettimeout()
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Without a socket timeout recv() blocks indefinitely and the
            # deadline above would never be enforced on a silent relay.
            ws.settimeout(remaining)
            try:
                raw = ws.recv()
            except websocket.WebSocketTimeoutException:
                break

            try:
                message = json.loads(raw)
            except ValueError:
                continue  # ignore frames that are not valid JSON
            if not isinstance(message, list) or len(message) < 2:
                continue
            if message[1] != subscription_id:
                continue  # NOTICE or traffic for another subscription

            if message[0] == "EVENT" and len(message) >= 3:
                events.append(message[2])
            elif message[0] in ("EOSE", "CLOSED"):
                # All stored events were delivered (or the relay ended the
                # subscription); waiting longer only burns the time budget.
                break
    finally:
        # The socket is shared by the caller; restore its original timeout.
        ws.settimeout(previous_timeout)

    ws.send(json.dumps(["CLOSE", subscription_id]))
    return events
# Calculate Jaccard similarity between two texts
def jaccard_similarity(text1, text2):
    """Return the Jaccard similarity of the word sets of two texts.

    Each text is lowercased and split into ``\\w+`` tokens; the score is the
    size of the shared token set divided by the size of the combined set.
    An empty/missing text, or two texts without any tokens, score 0.0.
    """
    if not (text1 and text2):
        return 0.0

    words_a, words_b = (
        set(re.findall(r'\w+', source.lower())) for source in (text1, text2)
    )
    combined = words_a | words_b
    if not combined:
        return 0.0
    return len(words_a & words_b) / len(combined)
# Check if text appears nonsensical (basic heuristic)
def is_nonsense(text):
    """Heuristically decide whether a reply text looks like nonsense.

    A text is flagged when it is shorter than 10 characters or when it is
    one single character repeated over the whole text.
    """
    too_short = len(text) < 10
    if too_short:
        return True

    # One character repeated from start to end (at least four times).
    repeated_run = re.match(r'^(.)\1{3,}$', text)
    # Further checks (e.g., entropy, common spam phrases) could be added here.
    return repeated_run is not None
# Check NIP-05 registration status
def check_nip05_status(pubkey, nip05_address):
    """Check whether a NIP-05 identifier currently resolves to *pubkey*.

    Requests ``https://<domain>/.well-known/nostr.json?name=<local-part>``
    and compares the key listed under ``names[<local-part>]`` with *pubkey*.

    Args:
        pubkey: Public key expected to be published for the identifier.
        nip05_address: Identifier of the form ``<local-part>@<domain>``.

    Returns:
        True only when the endpoint answers 200 with a JSON document that
        maps the local part to *pubkey*; False for malformed identifiers,
        network/HTTP failures, invalid JSON, or a mismatching key.
    """
    # Reject malformed identifiers up front instead of relying on a
    # catch-all exception handler.
    if not isinstance(nip05_address, str) or nip05_address.count("@") != 1:
        print(f"Error checking NIP-05 for {nip05_address}: malformed identifier")
        return False
    local_part, domain = nip05_address.split("@")
    if not local_part or not domain:
        print(f"Error checking NIP-05 for {nip05_address}: malformed identifier")
        return False

    url = f"https://{domain}/.well-known/nostr.json"
    try:
        response = requests.get(
            url,
            params={"name": local_part},  # let requests URL-encode the value
            timeout=5,
            # NIP-05 requires clients to ignore redirects from this endpoint.
            allow_redirects=False,
        )
        if response.status_code != 200:
            return False
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        print(f"Error checking NIP-05 for {nip05_address}: {e}")
        return False

    names = data.get("names") if isinstance(data, dict) else None
    if not isinstance(names, dict):
        return False
    return names.get(local_part) == pubkey
# Main scanning function
def scan_for_spam_bots():
    """Scan every stored NIP-05 member for spam-bot-like reply behaviour.

    For each member the function fetches kind-1 events from the last
    ``TIME_WINDOW`` days and keeps those carrying an ``e`` tag (replies).
    Members without replies are logged as such.  Otherwise up to
    ``REPLY_SAMPLE_SIZE`` random replies are sampled and the member is
    marked "Suspicious" when two sampled replies are near-identical or any
    sampled reply looks nonsensical, else "Clean".  The NIP-05 status is
    checked for logging only.

    Returns:
        The list of per-member log entries (dicts).  The same list is
        written to ``spam_scan_log.json``.
    """
    conn, cursor = init_db()
    try:
        # Get all members
        cursor.execute("SELECT pubkey, nip05_address, registration_timestamp FROM members")
        members = cursor.fetchall()
    finally:
        # The database is only needed for this query; close it even on error.
        conn.close()

    # Lower bound for event timestamps: TIME_WINDOW days ago.
    since_timestamp = int((datetime.now() - timedelta(days=TIME_WINDOW)).timestamp())
    log = []

    ws = connect_to_relay()
    try:
        for pubkey, nip05_address, reg_timestamp in members:
            print(f"Scanning {nip05_address} ({pubkey[:8]}...)")
            entry = {"pubkey": pubkey, "nip05_address": nip05_address}

            # Step 1: look for at least one reply inside the time window.
            events = fetch_events(ws, pubkey, since_timestamp, kind=1)
            replies = [event for event in events if _is_reply(event)]

            if not replies:
                entry["status"] = "No replies in past 7 days"
            else:
                # Step 2: analyze a random sample of the replies.
                sample_replies = random.sample(
                    replies, min(len(replies), REPLY_SAMPLE_SIZE)
                )
                # Events without content are treated as empty text
                # instead of raising KeyError.
                reply_texts = [reply.get("content") or "" for reply in sample_replies]
                suspicious = _looks_suspicious(reply_texts)
                entry["status"] = "Suspicious" if suspicious else "Clean"
                entry["reply_count"] = len(replies)

            # Step 3: NIP-05 status (for logging only).
            entry["nip05_active"] = check_nip05_status(pubkey, nip05_address)
            entry["registration_date"] = _registration_date(reg_timestamp)
            log.append(entry)
    finally:
        # Release the relay connection even when a member scan fails.
        ws.close()

    # Save log to file
    with open("spam_scan_log.json", "w", encoding="utf-8") as f:
        json.dump(log, f, indent=2)
    return log


def _is_reply(event):
    """Return True when *event* has at least one tag whose name is ``e``."""
    # Tags are lists such as ["e", "<event id>", ...]; the tag name is the
    # first element, so a plain `"e" in tags` test can never match.
    tags = event.get("tags") or []
    return any(isinstance(tag, list) and tag and tag[0] == "e" for tag in tags)


def _looks_suspicious(reply_texts):
    """Return True when replies are near-duplicates or any looks nonsensical."""
    has_near_duplicates = any(
        jaccard_similarity(first, second) > SIMILARITY_THRESHOLD
        for index, first in enumerate(reply_texts)
        for second in reply_texts[index + 1:]
    )
    if has_near_duplicates:
        return True
    return any(is_nonsense(text) for text in reply_texts)


def _registration_date(reg_timestamp):
    """Format a registration timestamp as ISO text, or None when it is missing."""
    if reg_timestamp is None:
        return None
    return datetime.fromtimestamp(reg_timestamp).isoformat()
# Run the script
if __name__ == "__main__":
    # Run the full scan, then echo every log entry to the console.
    results = scan_for_spam_bots()
    print("Scan complete. Results saved to spam_scan_log.json")
    for entry in results:
        print(entry)