Batch Writes and Idempotency
This guide explains how to efficiently write large volumes of data to Papr Memory and ensure operations are idempotent (can be safely retried without creating duplicates).
Batch Processing Overview
Batch operations allow you to:
- Add multiple memories in a single API call
- Reduce network overhead for large data imports
- Improve throughput when processing datasets
- Track success/failure status for individual items
Using Batch Memory Endpoints
The batch memory endpoint allows you to submit multiple memory items in a single request:
curl -X POST https://memory.papr.ai/v1/memory/batch \
  -H "X-API-Key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -H "X-Client-Type: curl" \
  -d '{
    "memories": [
      {
        "content": "First memory content",
        "type": "text",
        "metadata": {
          "topics": "notes, important",
          "hierarchical_structures": "Notes/Important"
        }
      },
      {
        "content": "Second memory content",
        "type": "text",
        "metadata": {
          "topics": "meeting, product",
          "hierarchical_structures": "Meetings/Product"
        }
      },
      {
        "content": "Third memory content",
        "type": "text",
        "metadata": {
          "topics": "research, development",
          "hierarchical_structures": "Research/Development"
        }
      }
    ],
    "batch_size": 10
  }'
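If you use the Python SDK instead of raw HTTP, the same import can be expressed with the add_batch method shown later in this guide. A minimal sketch; the constructor argument name is an assumption, so check the SDK reference for the exact signature:

from papr import Papr

# Assumption: the client constructor accepts your API key directly;
# consult the SDK reference for the exact argument name.
client = Papr(api_key="YOUR_API_KEY")

response = client.memory.add_batch(
    memories=[
        {
            "content": "First memory content",
            "type": "text",
            "metadata": {"topics": "notes, important"}
        },
        {
            "content": "Second memory content",
            "type": "text",
            "metadata": {"topics": "meeting, product"}
        }
    ]
)
print(response)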
Batch Size Optimization
The batch_size parameter controls how many items are processed in parallel. The optimal batch size depends on your data:
- Small batch sizes (1-5): Good for memory items with complex relationships or large content
- Medium batch sizes (5-20): Good balance for most use cases
- Large batch sizes (20-50): Efficient for small, simple memory items
The maximum allowed batch size is 50. The default is 10 if not specified.
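As a client-side starting point, you can split your records into request-sized chunks before submitting them, as the async example later in this guide does. A generic sketch; it assumes the 50-item cap also bounds how many memories you include per request:

def chunk_memories(records, batch_size=10):
    # Clamp to the API's documented limits: maximum 50, default 10
    size = min(max(batch_size, 1), 50)
    for i in range(0, len(records), size):
        yield records[i:i + size]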
Partial Success Handling
Batch operations can result in partial success. The response clearly indicates which items succeeded and which failed:
{
  "code": 200,
  "status": "success",
  "successful": [...],   // Array of successfully processed items
  "errors": [            // Array of failed items with details
    {
      "index": 2,        // The index in the original request
      "error": "Content size exceeds maximum limit"
    }
  ],
  "total_processed": 3,
  "total_successful": 2,
  "total_failed": 1,
  "total_content_size": 45,
  "total_storage_size": 128
}
Your application should check the errors array and handle failed items appropriately (a sketch follows this list):
- Retry with modified content (e.g., if content was too large)
- Log errors for manual review
- Skip items that cannot be processed
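A minimal sketch of that pattern, assuming memories is the list you submitted and response is the parsed JSON shown above. The truncation length is a placeholder, not a documented limit:

def collect_retryable_items(memories, response):
    retryable = []
    for error in response.get("errors", []):
        item = memories[error["index"]]  # "index" maps back to the original request
        if "exceeds maximum limit" in error["error"]:
            # Recoverable: shrink the content and queue the item for a retry
            retryable.append({**item, "content": item["content"][:10000]})
        else:
            # Not automatically recoverable; log it for manual review
            print(f"Skipping item {error['index']}: {error['error']}")
    return retryable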
Idempotency
Idempotency ensures that an operation produces the same result regardless of how many times it's performed. This is critical for handling network issues or retries.
Using Idempotency Keys
To make batch operations idempotent, use consistent and unique identifiers in your metadata:
curl -X POST https://memory.papr.ai/v1/memory \
  -H "X-API-Key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -H "X-Client-Type: curl" \
  -d '{
    "content": "Important information to remember",
    "type": "text",
    "metadata": {
      "idempotency_key": "unique-id-12345",
      "topics": "important, system",
      "hierarchical_structures": "System/Important"
    }
  }'
When Papr Memory receives a request with an idempotency key it has seen before, it will return the same response as the original request without creating duplicate data.
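This makes a plain retry loop safe: as long as each retry reuses the same key, a request that actually reached the server cannot create a duplicate. A minimal sketch using the requests library (the wrapper function is hypothetical, not part of the SDK):

import requests

def add_memory_idempotent(api_key, content, idempotency_key, attempts=3):
    payload = {
        "content": content,
        "type": "text",
        "metadata": {"idempotency_key": idempotency_key},
    }
    for attempt in range(attempts):
        try:
            resp = requests.post(
                "https://memory.papr.ai/v1/memory",
                headers={"X-API-Key": api_key, "Content-Type": "application/json"},
                json=payload,
                timeout=30,
            )
            resp.raise_for_status()
            return resp.json()
        except (requests.Timeout, requests.ConnectionError):
            # Safe to retry: the unchanged idempotency key guarantees no
            # duplicate even if the first attempt actually landed server-side.
            if attempt == attempts - 1:
                raise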
Idempotency Best Practices
- Use unique, deterministic IDs - UUIDs or hashes of your data make good idempotency keys (see the key-generation sketch after this list).
- Include timestamps in your metadata to track when the data was originally created.
- Store idempotency keys in your application to enable safe retries of failed operations.
- Set reasonable timeout and backoff policies for retries to avoid overwhelming the API.
- Include version information if the same content might be updated over time.
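For example, hashing the source record together with a version field yields a key that is stable across retries but changes when the content legitimately changes. A sketch; the field names are illustrative:

import hashlib
import json

def make_idempotency_key(record, version=1):
    # Deterministic: the same record + version always hashes to the same key,
    # so retries dedupe, while a version bump produces a new key for updates.
    canonical = json.dumps({"record": record, "version": version}, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()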
Large Dataset Import
For importing very large datasets (thousands to millions of records), follow these guidelines:
1. Break data into manageable batches - process up to 50 items per batch (the documented maximum).
2. Implement exponential backoff for rate limit handling:
import time
import random
from papr import Papr

def import_with_backoff(client, memories, max_retries=5):
    retries = 0
    while retries < max_retries:
        try:
            return client.memory.add_batch(memories=memories)
        except Exception as e:
            if "rate limit exceeded" in str(e).lower():
                # Exponential backoff with jitter: wait 2^retries seconds
                # plus a random fraction to avoid synchronized retries
                sleep_time = (2 ** retries) + random.random()
                print(f"Rate limit hit, retrying in {sleep_time:.2f} seconds")
                time.sleep(sleep_time)
                retries += 1
            else:
                raise
    raise Exception("Max retries exceeded")
3. Track progress and resume capability - store the index or ID of the last successfully processed batch so a failed import can resume, as sketched below.
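One way to do this is a small checkpoint file that records the last batch that fully succeeded. A sketch; the file name and format are placeholders:

import json
import os

CHECKPOINT_FILE = "import_checkpoint.json"  # hypothetical local path

def load_checkpoint():
    # Returns the index of the last fully processed batch, or -1 if none
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE) as f:
            return json.load(f)["last_completed_batch"]
    return -1

def save_checkpoint(batch_index):
    with open(CHECKPOINT_FILE, "w") as f:
        json.dump({"last_completed_batch": batch_index}, f)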
4. Consider using async processing for large imports:
import asyncio
import aiohttp
import os

async def import_batch(session, api_key, batch):
    # Submit one chunk of memories to the batch endpoint
    url = "https://memory.papr.ai/v1/memory/batch"
    headers = {
        "X-API-Key": api_key,
        "Content-Type": "application/json"
    }
    payload = {"memories": batch, "batch_size": 10}
    async with session.post(url, headers=headers, json=payload) as response:
        return await response.json()

async def import_dataset(api_key, all_memories, concurrency=3):
    batch_size = 50  # stay at the documented per-request maximum
    batches = [all_memories[i:i + batch_size]
               for i in range(0, len(all_memories), batch_size)]
    results = []
    async with aiohttp.ClientSession() as session:
        tasks = []
        for batch in batches:
            tasks.append(asyncio.ensure_future(import_batch(session, api_key, batch)))
            if len(tasks) >= concurrency:
                # Wait for the oldest tasks to finish before queueing more,
                # keeping at most `concurrency` requests in flight
                results.extend(await asyncio.gather(*tasks[:concurrency]))
                tasks = tasks[concurrency:]
        # Wait for any remaining tasks
        if tasks:
            results.extend(await asyncio.gather(*tasks))
    return results
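A typical entry point would then look like the following. The environment variable name is an assumption; supply the key however your application manages secrets:

all_memories = [
    {"content": "Example memory", "type": "text"},
    # ... the rest of your dataset
]
# Assumption: the API key is provided via an environment variable you define
results = asyncio.run(import_dataset(os.environ["PAPR_API_KEY"], all_memories, concurrency=3))
print(f"Submitted {len(results)} batches")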
Next Steps
- Learn about Search Tuning
- Explore Content Ingestion
- See the API Reference for detailed parameter information