Batch Writes and Idempotency
This guide explains how to efficiently write large volumes of data to Papr Memory and ensure operations are idempotent (can be safely retried without creating duplicates).
Batch Processing Overview
Batch operations allow you to:
- Add multiple memories in a single API call
- Delete all memories for a user in a single operation
- Reduce network overhead for large data imports
- Improve throughput when processing datasets
- Track success/failure status for individual items
Using Batch Memory Endpoints
The batch memory endpoint allows you to submit multiple memory items in a single request:
curl -X POST https://memory.papr.ai/v1/memory/batch \
-H "X-API-Key: YOUR_API_KEY" \
-H "Content-Type: application/json" \
-H "X-Client-Type: curl" \
-d '{
"memories": [
{
"content": "First memory content",
"type": "text",
"metadata": {
"topics": ["notes", "important"],
"hierarchical_structures": "Notes/Important"
}
},
{
"content": "Second memory content",
"type": "text",
"metadata": {
"topics": ["meeting", "product"],
"hierarchical_structures": "Meetings/Product"
}
},
{
"content": "Third memory content",
"type": "text",
"metadata": {
"topics": ["research", "development"],
"hierarchical_structures": "Research/Development"
}
}
],
"batch_size": 10
}'
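If you prefer to stay in Python, the same request can be sent with the requests library. This is a minimal sketch that mirrors the curl call above; the endpoint, headers, and payload are the ones shown there, while the X-Client-Type value and timeout are illustrative choices.

import requests

API_KEY = "YOUR_API_KEY"

payload = {
    "memories": [
        {
            "content": "First memory content",
            "type": "text",
            "metadata": {
                "topics": ["notes", "important"],
                "hierarchical_structures": "Notes/Important"
            }
        },
        {
            "content": "Second memory content",
            "type": "text",
            "metadata": {
                "topics": ["meeting", "product"],
                "hierarchical_structures": "Meetings/Product"
            }
        }
    ],
    "batch_size": 10
}

response = requests.post(
    "https://memory.papr.ai/v1/memory/batch",
    headers={
        "X-API-Key": API_KEY,
        "Content-Type": "application/json",
        "X-Client-Type": "python-requests"  # illustrative client label
    },
    json=payload,
    timeout=60
)
response.raise_for_status()
print(response.json())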
Using External User IDs
Papr Memory supports adding and retrieving memories using external user IDs without requiring you to create users first. The system will automatically create users in the background as needed.
Adding Memories with External User IDs
You can specify an external user ID when adding memories in batch:
curl -X POST https://memory.papr.ai/v1/memory/batch \
-H "X-API-Key: YOUR_API_KEY" \
-H "Content-Type: application/json" \
-H "X-Client-Type: curl" \
-d '{
"external_user_id": "customer_123456",
"memories": [
{
"content": "Customer meeting notes",
"type": "text",
"metadata": {
"topics": ["customer", "meeting"],
"hierarchical_structures": "Customers/Meetings"
}
}
]
}'
Error Handling
Always handle errors for individual items in batch operations:
response = client.memory.add_batch(memories=memories, batch_size=10)

if response.total_failed > 0:
    # Log errors
    for error in response.errors:
        print(f"Error at index {error.index}: {error.error}")

    # Retry only the failed memories if needed
    failed_indices = [err.index for err in response.errors]
    failed_memories = [memories[idx] for idx in failed_indices]

    # Consider retrying later
    # retry_response = client.memory.add_batch(memories=failed_memories)
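When failures persist after the first retry, back off between attempts rather than hammering the endpoint. The sketch below reuses the client.memory.add_batch call from above and resubmits only the items that failed; the delay schedule (1s, 2s, 4s, ...) is an illustrative starting point, not a documented limit.

import time

def add_batch_with_retry(client, memories, batch_size=10, max_retries=3):
    """Retry only the failed items, backing off exponentially between attempts."""
    pending = memories
    for attempt in range(max_retries + 1):
        response = client.memory.add_batch(memories=pending, batch_size=batch_size)
        if response.total_failed == 0:
            return response

        # Keep only the items that failed this round
        failed_indices = [err.index for err in response.errors]
        pending = [pending[idx] for idx in failed_indices]

        if attempt < max_retries:
            delay = 2 ** attempt  # 1s, 2s, 4s, ...
            print(f"{len(pending)} items failed; retrying in {delay}s")
            time.sleep(delay)

    return response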
Avoiding Duplicates with Idempotency
To avoid creating duplicate memories when retrying operations, use metadata fields to create natural idempotency keys:
import hashlib

def generate_idempotency_key(content):
    """Generate a consistent key for the same content"""
    return hashlib.md5(content.encode()).hexdigest()

# When creating memories, add the idempotency key to metadata
memories = []
for content in content_list:
    idempotency_key = generate_idempotency_key(content)
    memories.append({
        "content": content,
        "type": "text",
        "metadata": {
            "idempotency_key": idempotency_key,
            "topics": ["meeting", "notes"],
            "hierarchical_structures": "Meetings/Notes"
        }
    })

# Then if you need to retry, the server can detect duplicates
response = client.memory.add_batch(memories=memories)
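As an extra client-side safeguard, you can also track locally which idempotency keys have already been accepted and skip them on subsequent runs, so a restarted import script never resends content that already succeeded. This bookkeeping is independent of any server-side duplicate detection; persisting submitted_keys between runs (for example in a file or database) is left to you.

submitted_keys = set()  # persist this between runs to survive restarts

def filter_new_memories(memories, submitted_keys):
    """Return only the memories whose idempotency keys have not been submitted yet."""
    fresh = []
    for memory in memories:
        key = memory["metadata"]["idempotency_key"]
        if key not in submitted_keys:
            fresh.append(memory)
    return fresh

to_submit = filter_new_memories(memories, submitted_keys)
if to_submit:
    response = client.memory.add_batch(memories=to_submit)
    # Record keys only for the items that succeeded
    failed_indices = {err.index for err in response.errors}
    for idx, memory in enumerate(to_submit):
        if idx not in failed_indices:
            submitted_keys.add(memory["metadata"]["idempotency_key"])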
Batch Processing Best Practices
Use reasonable batch sizes: Start with 10-20 items per batch and adjust based on your use case (a chunking sketch follows this list).
Group similar content together: Process similar types of memories in the same batch for better performance.
Handle partial successes: Your code should handle cases where some memories succeed and others fail.
Implement exponential backoff: For retrying failed batches, use increasing delays between attempts.
Include idempotency keys: Adding unique keys in metadata helps avoid duplicates on retry.
Monitor processing statistics: Track total_processed, total_successful, and total_failed to ensure your batch operations are working efficiently.
Consider memory size limits: Batch requests are still subject to overall size limits, so very large content may need to be processed differently.
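A minimal sketch of the chunking approach referenced above: split a large import into fixed-size chunks and aggregate total_processed, total_successful, and total_failed across them. The chunk size of 20 is an arbitrary starting point to tune for your workload.

def import_in_chunks(client, memories, chunk_size=20):
    """Submit memories in fixed-size chunks and aggregate the batch statistics."""
    totals = {"processed": 0, "successful": 0, "failed": 0}
    for start in range(0, len(memories), chunk_size):
        chunk = memories[start:start + chunk_size]
        response = client.memory.add_batch(memories=chunk, batch_size=chunk_size)
        totals["processed"] += response.total_processed
        totals["successful"] += response.total_successful
        totals["failed"] += response.total_failed
    return totals

stats = import_in_chunks(client, memories)
print(f"Processed {stats['processed']}: {stats['successful']} ok, {stats['failed']} failed")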
Performance Considerations
- Use Accept-Encoding: gzip header to reduce network transfer size
- Schedule large batch operations during off-peak hours
- Process batches in parallel for higher throughput, with rate-limit awareness (see the sketch after this list)
- Use external_user_id for consistent user association across operations
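The following sketch illustrates the parallel-processing point from the list above: chunks are submitted concurrently with a small worker pool so throughput improves without overwhelming your rate limits. The pool size and chunk size are assumptions to adjust for your account.

from concurrent.futures import ThreadPoolExecutor, as_completed

def submit_chunk(client, chunk):
    return client.memory.add_batch(memories=chunk, batch_size=len(chunk))

def parallel_import(client, memories, chunk_size=20, max_workers=4):
    """Submit chunks concurrently; keep max_workers small to respect rate limits."""
    chunks = [memories[i:i + chunk_size] for i in range(0, len(memories), chunk_size)]
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(submit_chunk, client, chunk) for chunk in chunks]
        for future in as_completed(futures):
            results.append(future.result())
    return results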
Using Webhooks for Batch Completion Notifications
You can set up a webhook to receive notifications when batch processing is complete, which is especially useful for large batches that may take time to process.
Configuring a Webhook
When submitting a batch request, include the webhook_url parameter to receive a notification when processing is complete:
curl -X POST https://memory.papr.ai/v1/memory/batch \
-H "X-API-Key: YOUR_API_KEY" \
-H "Content-Type: application/json" \
-H "X-Client-Type: curl" \
-d '{
"memories": [
{
"content": "First memory content",
"type": "text",
"metadata": {
"topics": ["notes", "important"]
}
},
{
"content": "Second memory content",
"type": "text",
"metadata": {
"topics": ["meeting", "product"]
}
}
],
"batch_size": 10,
"webhook_url": "https://your-server.com/webhooks/papr-batch-complete"
}'
Webhook Implementation
Your webhook endpoint should:
- Accept POST requests with JSON content
- Verify the webhook signature (if applicable)
- Process the batch completion information
- Return a 200 OK status code to acknowledge receipt
Example webhook handler:
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/webhooks/papr-batch-complete', methods=['POST'])
def handle_batch_webhook():
    data = request.json

    # Log the completion event
    batch_id = data.get('batch_id')
    status = data.get('status')
    total_successful = data.get('total_successful')
    total_failed = data.get('total_failed')
    print(f"Batch {batch_id} {status}: {total_successful} succeeded, {total_failed} failed")

    # Update your application state or notify users
    # ...

    return jsonify({"status": "received"}), 200
Security Considerations
- Use HTTPS for your webhook endpoint
- Consider implementing signature verification to ensure requests come from Papr (see the sketch after this list)
- Store webhook URLs securely and rotate them periodically
- Implement retry logic in case your endpoint is temporarily unavailable
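If your webhook registration includes a shared secret, signature verification typically amounts to recomputing an HMAC over the raw request body and comparing it to a header value. The sketch below extends the earlier Flask handler with such a check; the X-Papr-Signature header name and the HMAC-SHA256 scheme are assumptions for illustration, so confirm the actual mechanism in the API Reference before relying on it.

import hmac
import hashlib
from flask import Flask, request, abort, jsonify

app = Flask(__name__)
WEBHOOK_SECRET = b"your-shared-secret"  # assumption: a secret you receive when registering the webhook

@app.route('/webhooks/papr-batch-complete', methods=['POST'])
def handle_batch_webhook():
    # Assumption: the signature is an HMAC-SHA256 hex digest of the raw request body
    signature = request.headers.get("X-Papr-Signature", "")
    expected = hmac.new(WEBHOOK_SECRET, request.get_data(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(signature, expected):
        abort(401)

    data = request.json
    print(f"Batch {data.get('batch_id')} finished with status {data.get('status')}")
    return jsonify({"status": "received"}), 200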
Bulk Delete Operations
Delete All Memories for a User
⚠️ WARNING: This operation cannot be undone. All memories for the specified user will be permanently deleted.
The delete all endpoint allows you to remove all memories for a specific user or for the authenticated developer:
# Delete all memories for a specific external user
curl -X DELETE "https://memory.papr.ai/v1/memory/all?external_user_id=customer_123456" \
-H "X-API-Key: YOUR_API_KEY" \
-H "X-Client-Type: curl"
# Delete all memories for the authenticated developer (no user specified)
curl -X DELETE "https://memory.papr.ai/v1/memory/all" \
-H "X-API-Key: YOUR_API_KEY" \
-H "X-Client-Type: curl"
Bulk Delete Best Practices
Confirm Before Deletion: Always implement confirmation dialogs or processes before calling delete all operations.
Backup Critical Data: Consider creating backups of important memories before performing bulk deletions.
Use External User IDs: When working with multiple users, always specify the external_user_id to avoid accidentally deleting the wrong user's data.
Monitor Deletion Status: Check the response for partial failures and handle them appropriately.
Implement Audit Logging: Log all bulk deletion operations for compliance and debugging purposes.
Example implementation with safety checks:
def safe_delete_all_memories(client, external_user_id=None, confirm=False):
    """Safely delete all memories with confirmation and logging"""
    if not confirm:
        print("⚠️ WARNING: This will permanently delete all memories!")
        if external_user_id:
            print(f"   Target user: {external_user_id}")
        else:
            print("   Target: All memories for authenticated developer")

        confirmation = input("Type 'DELETE ALL' to confirm: ")
        if confirmation != "DELETE ALL":
            print("Operation cancelled.")
            return None

    # Perform the deletion
    try:
        if external_user_id:
            response = client.memory.delete_all(external_user_id=external_user_id)
        else:
            response = client.memory.delete_all()

        # Log the operation
        print("Deletion completed:")
        print(f"  - Status: {response.status}")
        print(f"  - Total processed: {response.total_processed}")
        print(f"  - Successful: {response.total_successful}")
        print(f"  - Failed: {response.total_failed}")

        if response.total_failed > 0:
            print("  - Errors:")
            for error in response.errors:
                print(f"    * {error.error} (Memory ID: {error.memory_id})")

        return response
    except Exception as e:
        print(f"Error during deletion: {e}")
        return None

# Usage
response = safe_delete_all_memories(client, external_user_id="customer_123")
Next Steps
- Learn about Memory Management
- Explore Retrieval Strategies
- See the complete API Reference