好久没写代码了,折腾了两个晚上总算基于一个开源项目(nostr-ai-bot,GitHub上找到的)搞了个自己的aibot,账号懒得建新的,用的鸟巢那个。
大模型用的是deepseek-r1,目前用的是公有云送的免费额度。
可能会出现不稳定回答慢(ds-r1本来就慢😇)的情况,也可能随时下线这个机器人,有问题可以at我。
### aibot源代码第一部分
```python
import requests
import time
import ssl
import os
import json
import uuid
from pynostr.event import Event, EventKind
from pynostr.relay_manager import RelayManager
from pynostr.message_type import ClientMessageType
from pynostr.key import PrivateKey
from pynostr.filters import FiltersList, Filters
from pynostr.encrypted_dm import EncryptedDirectMessage
from pynostr.utils import get_timestamp
from openai import OpenAI
import gc
from datetime import datetime
import re
def add_relay(relay_manager, relays):
for relay in relays.split(","):
print("Adding relay: " + relay)
relay_manager.add_relay(relay)
print("Added relay: " + relay)
return True
relays = "wss://nostr.data.haus,wss://relay.nostr.net,wss://n.ok0.org,wss://ditto.pub/relay,wss://purplerelay.com,wss://relay.nostr.band,wss://relay.damus.io"
relay_manager_for_pubkey = RelayManager(timeout=5)
add_relay(relay_manager_for_pubkey, relays)
relay_manager_for_ids = RelayManager(timeout=5)
add_relay(relay_manager_for_ids, relays)
openai_client = OpenAI(
# 请用API Key将下行替换为:api_key="sk-xxx", base_url替换为你的大模型服务方url
api_key="sk-xxx",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
def extract_referenced_event_ids(event):
referenced_event_ids = []
for tag in event.tags:
if tag[0] == 'e':
referenced_event_ids.append(tag[1])
return referenced_event_ids
def process_event(event):
# 提取引用的事件ID
referenced_event_ids = extract_referenced_event_ids(event)
# 获取引用的事件内容
referenced_events = get_referenced_events(referenced_event_ids, current_event_id=event.id)
# 组合上下文信息
ref_contents = []
if not referenced_events:
return ref_contents
for ref_event in referenced_events:
if ref_event.pubkey == get_keypair()[1]:
ref_contents.append(f"我(大鸟)之前回复的内容: {ref_event.content}")
else:
content = re.sub(r'\b(nostr:)?(nprofile|npub)[0-9a-z]+[\s]*', '', ref_event.content)
ref_contents.append(f"用户之前提问或者交流的内容: {content}")
return ref_contents
def get_referenced_events(event_ids, current_event_id=None):
# 从Nostr网络查询引用的事件
filters = FiltersList([
Filters(ids=event_ids)
])
subscription_id = uuid.uuid1().hex
relay_manager_for_ids.add_subscription_on_all_relays(subscription_id, filters)
relay_manager_for_ids.run_sync()
referenced_events = []
processed_event_ids = set() # 用于存储已处理的事件ID
while relay_manager_for_ids.message_pool.has_events():
event_msg = relay_manager_for_ids.message_pool.get_event()
if event_msg.event.id not in processed_event_ids and event_msg.event.id != current_event_id:
# 过滤掉当前事件ID
# 过滤掉已处理的事件
referenced_events.append(event_msg.event)
processed_event_ids.add(event_msg.event.id)
# 关闭所有连接
relay_manager_for_ids.close_all_relay_connections()
return referenced_events
def system_message():
system_message = f"你叫大鸟,家住鸟巢(鸟巢是你使用的Nostr社交账号名称)。你是一个AI全能助手。你要尽量全面准确的回答用户问题,并且在回答问题时保持与用户提问使用的语种一致。你在回答的问题的时候要适当参考上下文信息(其中包含你之前的回复和用户之前的提问或交流内容),并关注你之前的回答内容避免重复回答(如有用户的问题有重复你可以指出之前已经回答过),如果你不清楚的问题请直接告知用户而不是胡编乱造。"
return(system_message)
def respond(message, ref_contents=None):
messages = [{"role": "user", "content": message}]
sys_message = {"role": "system", "content": system_message()}
if ref_contents and len(ref_contents) > 0:
context_content = "\n".join(ref_contents)
sys_message["content"] += (f"上下文信息:\n{context_content}")
messages.append(sys_message)
print(messages)
completion = openai_client.chat.completions.create(
model="deepseek-r1", # 此处以 deepseek-r1 为例,可按需更换模型名称。
messages=messages
)
thinking = completion.choices[0].message.reasoning_content
answer = completion.choices[0].message.content
response = "### 思考过程:\n\n" + thinking + "\n\n----------\n\n### 正式回答:\n\n" + answer
print(response)
return response
def get_keypair():
nsec_key = "nsec124d4tdzgr7pe0cljzu6wk9xknz0s3mtht8p8cd52kywxj4rh42rs7txxns"
private_key = PrivateKey.from_nsec(nsec_key)
pubkey_hex = private_key.public_key.hex()
return private_key, pubkey_hex
```
quoting### aibot源代码第二部分
nevent1q…q6wv
```python
def run():
messages_done = []
private_key, pubkey_hex = get_keypair()
print("Pubkey: " + private_key.public_key.bech32())
print("Pubkey (hex): " + pubkey_hex)
start_timestamp = get_timestamp()
while(True):
filters = FiltersList([
Filters(pubkey_refs=[pubkey_hex],
kinds=[EventKind.ENCRYPTED_DIRECT_MESSAGE, EventKind.TEXT_NOTE],
since=start_timestamp)
])
subscription_id = uuid.uuid1().hex
relay_manager_for_pubkey.add_subscription_on_all_relays(subscription_id, filters)
relay_manager_for_pubkey.run_sync()
while relay_manager_for_pubkey.message_pool.has_notices():
notice_msg = relay_manager_for_pubkey.message_pool.get_notice()
print("Notice: " + notice_msg.content)
while relay_manager_for_pubkey.message_pool.has_events():
event_msg = relay_manager_for_pubkey.message_pool.get_event()
# is message too old?
# we don't need this anymore, we filter events and then remember processed events
#if(time.time() - 60 > event_msg.event.created_at):
# continue
# has it already been processed?
if(event_msg.event.id in messages_done):
continue
messages_done.append(event_msg.event.id)
recipient_pubkey = event_msg.event.pubkey
if event_msg.event.kind == EventKind.ENCRYPTED_DIRECT_MESSAGE:
msg_decrypted = EncryptedDirectMessage()
msg_decrypted.decrypt(private_key_hex=private_key.hex(), encrypted_message=event_msg.event.content, public_key_hex=event_msg.event.pubkey)
if (len(msg_decrypted.cleartext_content) < 4):
continue
print ("Private message '" +msg_decrypted.cleartext_content + "' from " + event_msg.event.pubkey)
response = respond(msg_decrypted.cleartext_content)
# print("--> " + response)
print("Sending response to " + recipient_pubkey)
dm = EncryptedDirectMessage()
dm.encrypt(private_key.hex(),
recipient_pubkey=recipient_pubkey,
cleartext_content=response,
)
dm_event = dm.to_event()
dm_event.sign(private_key.hex())
relay_manager_for_pubkey.publish_event(dm_event)
print("Response sent to " + recipient_pubkey)
elif event_msg.event.kind == EventKind.TEXT_NOTE:
print(f"Received public note: {event_msg.event.content}")
content = re.sub(r'\b(nostr:)?(nprofile|npub)[0-9a-z]+[\s]*', '', event_msg.event.content)
if (len(content) < 4):
continue
# 获取引用的上下文
ref_contents = process_event(event_msg.event)
print(f"Received public note: {content}")
if recipient_pubkey != pubkey_hex:
print("Responding...")
reply = Event(
content=respond(content, ref_contents),
)
reply.add_event_ref(event_msg.event.id)
reply.add_pubkey_ref(event_msg.event.pubkey)
reply.sign(private_key.hex())
relay_manager_for_pubkey.publish_event(reply)
print("Public response sent.")
gc.collect()
time.sleep(10)
relay_manager_for_pubkey.close_all_relay_connections()
try:
run()
except KeyboardInterrupt:
print("KeyboardInterrupt")
relay_manager_for_pubkey.close_all_relay_connections()
relay_manager_for_ids.close_all_relay_connections()
exit(1)
except:
print("Exception")
relay_manager_for_pubkey.close_all_relay_connections()
relay_manager_for_ids.close_all_relay_connections()
run()
```