الإنتاج والمؤسسات
أفضل ممارسات الأمان
4 دقيقة للقراءة
الوكيل مع الوصول لأنظمتك أداة قوية—وسطح هجوم قوي. الأمان ليس اختيارياً؛ إنه وجودي.
نموذج التهديد
| التهديد | مثال | التخفيف |
|---|---|---|
| حقن البرومبت | المستخدم يخدع الوكيل لتشغيل كود ضار | تنقية المدخلات، صندوق رمل |
| تسريب البيانات | الوكيل يرسل بيانات حساسة لخدمة خارجية | سياسات الشبكة، تصفية المخرجات |
| تصعيد الامتيازات | الوكيل يكتسب وصولاً خارج نطاقه | أقل امتياز، رموز القدرة |
| إساءة استخدام الموارد | الوكيل يشغل حلقات لانهائية، يستهلك كل الرموز | حدود المعدل، ميزانيات، مهلات |
| إساءة استخدام الأداة | الوكيل يحذف ملفات خاطئة، يسقط جداول | تدفقات التأكيد، أوضاع التشغيل الجاف |
تنقية المدخلات
لا تثق أبداً بمدخلات المستخدم—أو المحتوى المولّد من الوكيل:
class InputSanitizer:
def __init__(self):
self.dangerous_patterns = [
r"ignore previous instructions",
r"system prompt",
r"<\|.*?\|>", # رموز خاصة
r"```.*?exec\(.*?```", # تنفيذ كود
]
def sanitize(self, text: str) -> tuple[str, list[str]]:
"""إزالة الأنماط الخطرة وإرجاع تحذيرات."""
warnings = []
for pattern in self.dangerous_patterns:
if re.search(pattern, text, re.IGNORECASE | re.DOTALL):
warnings.append(f"نمط محظور: {pattern}")
text = re.sub(pattern, "[محظور]", text, flags=re.IGNORECASE | re.DOTALL)
return text, warnings
def validate_tool_args(self, tool_name: str, args: dict) -> tuple[bool, str]:
"""التحقق من معاملات الأداة قبل التنفيذ."""
if tool_name == "run_command":
cmd = args.get("command", "")
# حظر الأوامر الخطرة
blocked = ["rm -rf", "sudo", "chmod 777", "curl | bash", "eval"]
for b in blocked:
if b in cmd:
return False, f"نمط أمر محظور: {b}"
if tool_name == "write_file":
path = args.get("path", "")
# منع اجتياز المسار
if ".." in path or path.startswith("/etc") or path.startswith("/root"):
return False, f"مسار محظور: {path}"
return True, ""
التنفيذ في صندوق رمل
شغّل الكود في بيئات معزولة:
import docker
class SandboxExecutor:
def __init__(self, image: str = "python:3.11-slim"):
self.client = docker.from_env()
self.image = image
self.timeout = 30
self.memory_limit = "256m"
self.network_disabled = True
async def execute(self, code: str) -> dict:
"""تشغيل الكود في حاوية معزولة."""
container = self.client.containers.run(
self.image,
command=["python", "-c", code],
detach=True,
mem_limit=self.memory_limit,
network_disabled=self.network_disabled,
read_only=True,
security_opt=["no-new-privileges:true"]
)
try:
result = container.wait(timeout=self.timeout)
logs = container.logs().decode()
return {
"success": result["StatusCode"] == 0,
"output": logs,
"exit_code": result["StatusCode"]
}
except Exception as e:
return {"success": False, "error": str(e)}
finally:
container.remove(force=True)
تصفية المخرجات
منع تسريب البيانات الحساسة:
class OutputFilter:
def __init__(self):
self.patterns = {
"api_key": r"(sk-[a-zA-Z0-9]{48}|api[_-]?key['\"]?\s*[:=]\s*['\"][^'\"]+)",
"password": r"password['\"]?\s*[:=]\s*['\"][^'\"]+",
"jwt": r"eyJ[a-zA-Z0-9_-]+\.eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+",
"private_key": r"-----BEGIN (RSA |EC )?PRIVATE KEY-----",
"aws_key": r"AKIA[0-9A-Z]{16}",
}
def filter_output(self, text: str) -> tuple[str, list[str]]:
"""تنقيح المعلومات الحساسة من مخرج الوكيل."""
redacted = []
for name, pattern in self.patterns.items():
matches = re.findall(pattern, text, re.IGNORECASE)
if matches:
redacted.append(f"تم تنقيح {len(matches)} نمط {name}")
text = re.sub(pattern, f"[منقح_{name.upper()}]", text, flags=re.IGNORECASE)
return text, redacted
def validate_urls(self, text: str, allowed_domains: list[str]) -> tuple[str, list[str]]:
"""حظر الطلبات للنطاقات غير المصرح بها."""
blocked = []
urls = re.findall(r'https?://([^/\s]+)', text)
for url in urls:
domain = url.split(':')[0] # إزالة المنفذ
if not any(domain.endswith(d) for d in allowed_domains):
blocked.append(domain)
text = text.replace(url, "[نطاق_محظور]")
return text, blocked
التأكيد للإجراءات المدمرة
طلب موافقة صريحة للعمليات الخطرة:
class ConfirmationGate:
def __init__(self, dangerous_tools: list[str]):
self.dangerous_tools = dangerous_tools
self.pending_confirmations = {}
async def check_tool_call(self, tool_name: str, args: dict, context: dict) -> dict:
"""اعتراض استدعاءات الأدوات الخطرة للتأكيد."""
if tool_name not in self.dangerous_tools:
return {"approved": True}
# توليد طلب تأكيد
confirmation_id = str(uuid.uuid4())
self.pending_confirmations[confirmation_id] = {
"tool": tool_name,
"args": args,
"context": context,
"created_at": datetime.now()
}
return {
"approved": False,
"requires_confirmation": True,
"confirmation_id": confirmation_id,
"message": f"الإجراء '{tool_name}' يتطلب موافقة. المعاملات: {args}"
}
async def confirm(self, confirmation_id: str, approved: bool) -> dict:
"""معالجة قرار التأكيد."""
if confirmation_id not in self.pending_confirmations:
return {"error": "معرف تأكيد غير معروف"}
pending = self.pending_confirmations.pop(confirmation_id)
if approved:
return {"approved": True, "tool": pending["tool"], "args": pending["args"]}
else:
return {"approved": False, "reason": "المستخدم رفض"}
تسجيل التدقيق
سجل كل شيء للطب الشرعي:
class SecurityAuditLog:
def __init__(self, log_file: str):
self.log_file = log_file
def log_event(self, event_type: str, details: dict):
"""إدخال سجل تدقيق غير قابل للتغيير."""
entry = {
"timestamp": datetime.now().isoformat(),
"event_type": event_type,
"details": details,
"checksum": None
}
# إضافة checksum للسلامة
entry["checksum"] = hashlib.sha256(
json.dumps(entry, sort_keys=True).encode()
).hexdigest()
with open(self.log_file, "a") as f:
f.write(json.dumps(entry) + "\n")
# تسجيل جميع تنفيذات الأدوات
audit.log_event("tool_execution", {
"tool": "run_command",
"args": {"command": "ls -la"},
"user_id": "user_123",
"agent_session": "session_456",
"result": "success"
})
ملاحظة نيردية: الدفاع في العمق. افترض أن كل طبقة ستفشل وأضف طبقة أخرى. التحقق من المدخلات + صندوق الرمل + تصفية المخرجات + التأكيد + تسجيل التدقيق.
التالي: إدارة من يمكنه الوصول لماذا. :::