Python Installation
Install MatsushibaDB for Python applications using pip. It is well suited to data science, web applications, and Python-based services.
Installation
Basic Installation
pip install matsushibadb
With Virtual Environment (Recommended)
# Create virtual environment
python -m venv matsushiba-env
# Activate virtual environment
# On Windows:
matsushiba-env\Scripts\activate
# On macOS/Linux:
source matsushiba-env/bin/activate
# Install MatsushibaDB
pip install matsushibadb
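To verify the installation from inside the virtual environment, try importing the package (the __version__ attribute is an assumption and may not be exposed, hence the fallback):
python -c "import matsushibadb; print(getattr(matsushibadb, '__version__', 'installed'))"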
Development Installation
# From a local checkout of the source (editable install)
pip install -e .
Specific Version
pip install matsushibadb==1.0.9
Quick Start
import matsushibadb
# Create a new database
db = matsushibadb.MatsushibaDB('app.db')
# Create a table
db.execute('''
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
''')

# Insert data
db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
           ('John Doe', '[email protected]'))
# Query data
users = db.execute('SELECT * FROM users').fetchall()
print('Users:', users)
# Close the database
db.close()
Configuration Options
Basic Configuration
import matsushibadb
db = matsushibadb.MatsushibaDB('app.db', {
    # Enable WAL mode for better concurrency
    'enable_wal': True,
    # Set cache size (in pages)
    'cache_size': 2000,
    # Synchronous mode
    'synchronous': 'NORMAL',
    # Journal mode
    'journal_mode': 'WAL',
    # Temporary storage
    'temp_store': 'MEMORY',
    # Foreign key constraints
    'foreign_keys': True
})
Advanced Configuration
import os

db = matsushibadb.MatsushibaDB('app.db', {
    # Performance settings
    'performance': {
        'enable_wal': True,
        'cache_size': 5000,
        'synchronous': 'NORMAL',
        'journal_mode': 'WAL',
        'temp_store': 'MEMORY',
        'optimize_queries': True,
        'prepared_statement_cache': True,
        'prepared_statement_cache_size': 100
    },
    # Memory optimization
    'memory': {
        'mmap_size': 268435456,  # 256MB
        'optimize_memory': True,
        'gc_threshold': 1000
    },
    # I/O optimization
    'io': {
        'direct_io': True,
        'optimize_io': True,
        'buffer_size': 65536  # 64KB
    },
    # Security settings
    'security': {
        'enable_encryption': True,
        'encryption_key': os.getenv('DB_ENCRYPTION_KEY'),
        'enable_audit_logging': True
    }
})
Async Support
MatsushibaDB provides full async/await support with asyncio:
import asyncio
import matsushibadb
async def async_example():
    db = matsushibadb.MatsushibaDB('async-app.db')
    try:
        # Async operations
        await db.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                price DECIMAL(10,2),
                stock INTEGER DEFAULT 0
            )
        ''')

        # Insert with async
        result = await db.execute('''
            INSERT INTO products (name, price, stock)
            VALUES (?, ?, ?)
        ''', ('Laptop', 999.99, 10))
        print('Product created with ID:', result.lastrowid)

        # Query with async
        products = await db.execute('SELECT * FROM products WHERE stock > 0').fetchall()
        print('Available products:', products)

        # Get single record with async
        product = await db.execute('SELECT * FROM products WHERE id = ?', (result.lastrowid,)).fetchone()
        print('Product details:', product)
    except Exception as error:
        print('Database error:', error)
    finally:
        await db.close()

# Run async example
asyncio.run(async_example())
Connection Pooling
For production applications, use connection pooling:
from matsushibadb import MatsushibaPool
pool = MatsushibaPool(
    database='app.db',
    min_connections=5,
    max_connections=20,
    acquire_timeout=30,
    create_timeout=30,
    destroy_timeout=5,
    idle_timeout=30,
    reap_interval=1,
    create_retry_interval=0.2,
    propagate_create_error=False
)

# Use the pool
async def get_users():
    async with pool.get_connection() as conn:
        users = await conn.execute('SELECT * FROM users WHERE active = 1').fetchall()
        return users

# Transaction with pool
async def create_user_with_profile(user_data, profile_data):
    async with pool.get_connection() as conn:
        try:
            await conn.begin_transaction()
            user_result = await conn.execute('''
                INSERT INTO users (name, email, password_hash)
                VALUES (?, ?, ?)
            ''', (user_data['name'], user_data['email'], user_data['password_hash']))
            profile_result = await conn.execute('''
                INSERT INTO profiles (user_id, bio, avatar_url)
                VALUES (?, ?, ?)
            ''', (user_result.lastrowid, profile_data['bio'], profile_data['avatar_url']))
            await conn.commit()
            return {'user_id': user_result.lastrowid, 'profile_id': profile_result.lastrowid}
        except Exception as e:
            await conn.rollback()
            raise e
Framework Integrations
FastAPI Integration
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import matsushibadb
import os
from datetime import datetime, timezone
app = FastAPI(title="MatsushibaDB FastAPI App", version="1.0.0")
# Database connection
db = matsushibadb.MatsushibaDB('fastapi-app.db')
# Middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Pydantic models
class UserCreate(BaseModel):
    name: str
    email: str
    password: str

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    created_at: str

class PostCreate(BaseModel):
    title: str
    content: str
    published: bool = False

class PostResponse(BaseModel):
    id: int
    title: str
    content: str
    author_id: int
    published: bool
    created_at: str
# Database initialization
def init_database():
    db.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    db.execute('''
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY,
            title TEXT NOT NULL,
            content TEXT,
            author_id INTEGER,
            published BOOLEAN DEFAULT 0,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (author_id) REFERENCES users(id)
        )
    ''')

    # Create indexes
    db.execute('CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)')
    db.execute('CREATE INDEX IF NOT EXISTS idx_posts_author ON posts(author_id)')
# Routes
@app.get("/api/health")
async def health_check():
return {
"status": "healthy",
"timestamp": "2024-01-15T10:30:00Z",
"database": "connected"
}
@app.post("/api/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
# Check if user exists
existing_user = db.execute('SELECT * FROM users WHERE email = ?', (user.email,)).fetchone()
if existing_user:
raise HTTPException(
status_code=409,
detail="User with this email already exists"
)
# Hash password
import bcrypt
password_hash = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt())
# Create user
result = db.execute('''
INSERT INTO users (name, email, password_hash)
VALUES (?, ?, ?)
''', (user.name, user.email, password_hash))
# Get created user
new_user = db.execute('SELECT * FROM users WHERE id = ?', (result.lastrowid,)).fetchone()
return UserResponse(
id=new_user[0],
name=new_user[1],
email=new_user[2],
created_at=new_user[4]
)
@app.get("/api/posts", response_model=list[PostResponse])
async def get_posts(published: bool = True):
posts = db.execute('''
SELECT p.*, u.name as author_name
FROM posts p
JOIN users u ON p.author_id = u.id
WHERE p.published = ?
ORDER BY p.created_at DESC
''', (published,)).fetchall()
return [
PostResponse(
id=post[0],
title=post[1],
content=post[2],
author_id=post[3],
published=bool(post[4]),
created_at=post[5]
) for post in posts
]
if __name__ == "__main__":
init_database()
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
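With the server running, the user endpoint can be exercised from the command line; a quick check with curl, assuming the defaults above (localhost, port 8000):
curl -X POST http://localhost:8000/api/users \
  -H "Content-Type: application/json" \
  -d '{"name": "John Doe", "email": "[email protected]", "password": "password123"}'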
Django Integration
# settings.py
import os
import matsushibadb
# Database configuration
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': 'django-app.db',
    }
}

# Custom database backend
class MatsushibaDBBackend:
    def __init__(self, database_path):
        self.db = matsushibadb.MatsushibaDB(database_path)

    def execute(self, query, params=None):
        return self.db.execute(query, params or [])

    def fetchone(self, query, params=None):
        return self.db.execute(query, params or []).fetchone()

    def fetchall(self, query, params=None):
        return self.db.execute(query, params or []).fetchall()
# models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.utils import timezone
class User(AbstractUser):
    email = models.EmailField(unique=True)
    created_at = models.DateTimeField(default=timezone.now)
    updated_at = models.DateTimeField(auto_now=True)

class Post(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    published = models.BooleanField(default=False)
    created_at = models.DateTimeField(default=timezone.now)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-created_at']

    def __str__(self):
        return self.title
# views.py
from django.shortcuts import render, get_object_or_404
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.contrib.auth import authenticate, login
from django.contrib.auth.decorators import login_required
from django.utils.decorators import method_decorator
from django.views import View
from django.utils import timezone
import json
import matsushibadb
# Custom database connection
db = matsushibadb.MatsushibaDB('django-app.db')
class HealthCheckView(View):
    def get(self, request):
        return JsonResponse({
            'status': 'healthy',
            'timestamp': timezone.now().isoformat(),
            'database': 'connected'
        })
@method_decorator(csrf_exempt, name='dispatch')
class UserListView(View):
    def get(self, request):
        users = db.execute('''
            SELECT id, username, email, date_joined
            FROM auth_user
            ORDER BY date_joined DESC
        ''').fetchall()
        return JsonResponse([
            {
                'id': user[0],
                'username': user[1],
                'email': user[2],
                'date_joined': user[3]
            } for user in users
        ], safe=False)

    def post(self, request):
        data = json.loads(request.body)

        # Check if user exists
        existing_user = db.execute(
            'SELECT * FROM auth_user WHERE email = ?',
            (data['email'],)
        ).fetchone()
        if existing_user:
            return JsonResponse(
                {'error': 'User with this email already exists'},
                status=409
            )

        # Create user (simplified - in production, use Django's user creation)
        result = db.execute('''
            INSERT INTO auth_user (username, email, password, is_active, date_joined)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            data['username'],
            data['email'],
            'hashed_password',  # In production, properly hash the password
            1,
            timezone.now().isoformat()
        ))
        return JsonResponse({
            'id': result.lastrowid,
            'username': data['username'],
            'email': data['email']
        }, status=201)
# urls.py
from django.urls import path
from . import views
urlpatterns = [
    path('api/health/', views.HealthCheckView.as_view(), name='health'),
    path('api/users/', views.UserListView.as_view(), name='user-list'),
]
Flask Integration
from flask import Flask, request, jsonify, g
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import matsushibadb
import jwt
import bcrypt
from datetime import datetime, timedelta
import os
app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key')
# Extensions
CORS(app)
limiter = Limiter(
    app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"]
)

# Database connection
def get_db():
    if 'db' not in g:
        g.db = matsushibadb.MatsushibaDB('flask-app.db')
    return g.db

@app.teardown_appcontext
def close_db(error):
    db = g.pop('db', None)
    if db is not None:
        db.close()
# Authentication
def create_token(user_id):
    payload = {
        'sub': user_id,
        'exp': datetime.utcnow() + timedelta(hours=24)
    }
    return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')

def verify_token(token):
    try:
        payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
        return payload['sub']
    except jwt.PyJWTError:
        return None

def require_auth(f):
    from functools import wraps

    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'error': 'Token required'}), 401
        try:
            token = token.split(' ')[1]  # Remove 'Bearer ' prefix
            user_id = verify_token(token)
            if not user_id:
                return jsonify({'error': 'Invalid token'}), 401
        except IndexError:
            return jsonify({'error': 'Invalid token'}), 401
        return f(user_id, *args, **kwargs)
    return decorated
# Routes
@app.route('/api/health')
def health_check():
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
        'database': 'connected'
    })

@app.route('/api/users', methods=['POST'])
@limiter.limit("10 per minute")
def create_user():
    data = request.get_json()
    if not all(k in data for k in ('name', 'email', 'password')):
        return jsonify({'error': 'Name, email, and password required'}), 400

    db = get_db()

    # Check if user exists
    existing_user = db.execute('SELECT * FROM users WHERE email = ?', (data['email'],)).fetchone()
    if existing_user:
        return jsonify({'error': 'User with this email already exists'}), 409

    # Hash password
    password_hash = bcrypt.hashpw(data['password'].encode('utf-8'), bcrypt.gensalt()).decode('utf-8')

    # Create user
    result = db.execute('''
        INSERT INTO users (name, email, password_hash)
        VALUES (?, ?, ?)
    ''', (data['name'], data['email'], password_hash))

    # Generate token
    token = create_token(result.lastrowid)

    return jsonify({
        'id': result.lastrowid,
        'name': data['name'],
        'email': data['email'],
        'token': token
    }), 201
@app.route('/api/posts', methods=['GET'])
def get_posts():
    db = get_db()
    published = request.args.get('published', 'true').lower() == 'true'
    posts = db.execute('''
        SELECT p.*, u.name as author_name
        FROM posts p
        JOIN users u ON p.author_id = u.id
        WHERE p.published = ?
        ORDER BY p.created_at DESC
    ''', (1 if published else 0,)).fetchall()
    return jsonify([
        {
            'id': post[0],
            'title': post[1],
            'content': post[2],
            'author_id': post[3],
            'published': bool(post[4]),
            'created_at': post[5],
            'author_name': post[7]
        } for post in posts
    ])
@app.route('/api/posts', methods=['POST'])
@require_auth
def create_post(user_id):
    data = request.get_json()
    if not all(k in data for k in ('title', 'content')):
        return jsonify({'error': 'Title and content required'}), 400

    db = get_db()
    result = db.execute('''
        INSERT INTO posts (title, content, author_id, published)
        VALUES (?, ?, ?, ?)
    ''', (data['title'], data['content'], user_id, data.get('published', False)))

    return jsonify({
        'id': result.lastrowid,
        'title': data['title'],
        'content': data['content'],
        'published': data.get('published', False)
    }), 201

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)
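The protected post endpoint expects a Bearer token in the Authorization header. An example call with curl, assuming the Flask defaults above (localhost, port 5000) and <token> taken from the /api/users response:
curl -X POST http://localhost:5000/api/posts \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer <token>" \
  -d '{"title": "Hello", "content": "First post", "published": true}'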
Testing
Unit Testing with pytest
import pytest
import matsushibadb
import tempfile
import os
@pytest.fixture
def db():
    # Create a temporary database file for testing
    fd, db_path = tempfile.mkstemp(suffix='.db')
    os.close(fd)
    db = matsushibadb.MatsushibaDB(db_path)

    # Create test table
    db.execute('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL
        )
    ''')

    yield db

    # Cleanup
    db.close()
    os.unlink(db_path)
def test_create_user(db):
    result = db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
                        ('John Doe', '[email protected]'))
    assert result.lastrowid == 1
    assert result.rowcount == 1

def test_retrieve_users(db):
    db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
               ('John Doe', '[email protected]'))
    users = db.execute('SELECT * FROM users').fetchall()
    assert len(users) == 1
    assert users[0][1] == 'John Doe'
    assert users[0][2] == '[email protected]'

def test_unique_constraint(db):
    db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
               ('John Doe', '[email protected]'))
    with pytest.raises(Exception):
        db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
                   ('Jane Doe', '[email protected]'))

def test_transaction(db):
    try:
        db.execute('BEGIN TRANSACTION')
        db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
                   ('Alice', '[email protected]'))
        db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
                   ('Bob', '[email protected]'))
        db.execute('COMMIT')
        users = db.execute('SELECT * FROM users').fetchall()
        assert len(users) == 2
    except Exception as e:
        db.execute('ROLLBACK')
        raise e
Integration Testing
import pytest
from fastapi.testclient import TestClient
from app import app, db
@pytest.fixture
def client():
    return TestClient(app)

@pytest.fixture
def test_db():
    # Setup test database
    db.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL
        )
    ''')

    yield db

    # Cleanup
    db.execute('DELETE FROM users')

def test_create_user_api(client, test_db):
    response = client.post('/api/users', json={
        'name': 'John Doe',
        'email': '[email protected]',
        'password': 'password123'
    })
    assert response.status_code == 201
    data = response.json()
    assert data['name'] == 'John Doe'
    assert data['email'] == '[email protected]'

def test_get_users_api(client, test_db):
    # Create test user
    test_db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
                    ('John Doe', '[email protected]'))
    response = client.get('/api/users')
    assert response.status_code == 200
    users = response.json()
    assert len(users) == 1
    assert users[0]['name'] == 'John Doe'
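Run the tests with pytest from the project root; the file name below is illustrative:
python -m pytest test_app.py -v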
Performance Optimization
Query Optimization
# Use prepared statements for repeated queries
get_user_stmt = db.prepare('SELECT * FROM users WHERE id = ?')
user = get_user_stmt.fetchone(1)
# Use indexes for better performance
db.execute('CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)')
db.execute('CREATE INDEX IF NOT EXISTS idx_users_name ON users(name)')
# Batch operations
insert_user_stmt = db.prepare('INSERT INTO users (name, email) VALUES (?, ?)')
users = [
    ('John Doe', '[email protected]'),
    ('Jane Smith', '[email protected]'),
    ('Bob Johnson', '[email protected]')
]

def insert_many(users):
    for user in users:
        insert_user_stmt.execute(user)

insert_many(users)
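For larger batches, wrapping the inserts in a single transaction avoids committing each row individually; a minimal sketch, assuming the same execute-based BEGIN/COMMIT/ROLLBACK statements used in the testing examples above:
def insert_many_transactional(db, users):
    # One transaction for the whole batch instead of a commit per row
    try:
        db.execute('BEGIN TRANSACTION')
        for name, email in users:
            db.execute('INSERT INTO users (name, email) VALUES (?, ?)', (name, email))
        db.execute('COMMIT')
    except Exception:
        db.execute('ROLLBACK')
        raise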
Memory Management
# Use context managers for automatic cleanup
with db.get_connection() as conn:
    users = conn.execute('SELECT * FROM users').fetchall()
    # Connection automatically closed

# Use generators for large datasets
def get_users_generator():
    stmt = db.prepare('SELECT * FROM users')
    for row in stmt:
        yield row
    stmt.close()

# Process users one at a time
for user in get_users_generator():
    process_user(user)
Production Deployment
Environment Configuration
# config/database.py
import os
import matsushibadb
class DatabaseConfig:
    def __init__(self):
        self.environment = os.getenv('ENVIRONMENT', 'development')
        self.config = self._get_config()

    def _get_config(self):
        configs = {
            'development': {
                'database': 'dev.db',
                'enable_wal': True,
                'cache_size': 1000
            },
            'production': {
                'database': os.getenv('DATABASE_PATH', '/data/production.db'),
                'enable_wal': True,
                'cache_size': 5000,
                'synchronous': 'NORMAL',
                'journal_mode': 'WAL',
                'temp_store': 'MEMORY',
                'security': {
                    'enable_encryption': True,
                    'encryption_key': os.getenv('DB_ENCRYPTION_KEY')
                }
            }
        }
        return configs.get(self.environment, configs['development'])

    def create_database(self):
        return matsushibadb.MatsushibaDB(self.config['database'], self.config)

# Usage
db_config = DatabaseConfig()
db = db_config.create_database()
Process Management with Gunicorn
# gunicorn.conf.py
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
preload_app = True
timeout = 30
keepalive = 2
# Environment variables
raw_env = [
    "ENVIRONMENT=production",
    "DATABASE_PATH=/data/production.db",
    "DB_ENCRYPTION_KEY=your-encryption-key"
]
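Start the application with this configuration file; the app:app target assumes the FastAPI instance from the earlier example lives in app.py:
gunicorn -c gunicorn.conf.py app:app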
CLI Tools
MatsushibaDB includes powerful CLI tools:
# Initialize a new database
matsushiba init my-database.db
# Create a table
matsushiba create-table users "id INTEGER PRIMARY KEY, name TEXT, email TEXT"
# Insert data
matsushiba insert users "name='John Doe', email='[email protected]'"
# Query data
matsushiba query "SELECT * FROM users"
# Export data
matsushiba export users --format json --output users.json
# Import data
matsushiba import users --format json --input users.json
# Backup database
matsushiba backup my-database.db --output backup.db
# Restore database
matsushiba restore backup.db --output restored.db
# Performance analysis
matsushiba analyze my-database.db
# Database statistics
matsushiba stats my-database.db
Troubleshooting
Common Issues
Database Locked Error:
import time

def safe_execute(db, query, params=None, max_retries=3):
    for attempt in range(max_retries):
        try:
            return db.execute(query, params)
        except Exception as e:
            if 'database is locked' in str(e) and attempt < max_retries - 1:
                time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
                continue
            raise e
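Usage mirrors a plain execute call; for example, with the db handle from the Quick Start:
# Retry the write if the database is temporarily locked
result = safe_execute(db, 'INSERT INTO users (name, email) VALUES (?, ?)',
                      ('Jane Smith', '[email protected]'))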
High Memory Usage:
import psutil
import gc

def monitor_memory():
    process = psutil.Process()
    memory_info = process.memory_info()
    print(f'Memory usage: {memory_info.rss / 1024 / 1024:.2f} MB')

    # Force garbage collection
    gc.collect()
Best Practices
Use Virtual Environments
Always use virtual environments to isolate dependencies and avoid conflicts.
Implement Error Handling
Wrap database operations in try/except blocks and handle specific error types rather than catching and ignoring everything, as shown below.
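A minimal sketch, assuming locked-database and unique-constraint failures surface as generic exceptions whose messages can be inspected (MatsushibaDB may also expose more specific exception classes):
import logging

def save_user(db, name, email):
    try:
        return db.execute('INSERT INTO users (name, email) VALUES (?, ?)', (name, email))
    except Exception as e:
        message = str(e)
        if 'UNIQUE constraint failed' in message:
            # Duplicate email: turn the low-level error into a domain error
            raise ValueError(f'User with email {email} already exists') from e
        if 'database is locked' in message:
            # Transient contention: let the caller retry (see safe_execute above)
            raise
        logging.exception('Unexpected database error')
        raise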
For production applications, always use virtual environments, implement proper error handling, and monitor database performance. Consider using environment-specific configurations and proper logging.
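For the logging side, a small sketch that ties verbosity to the same ENVIRONMENT variable used by DatabaseConfig above:
import logging
import os

# Verbose logging in development, warnings and errors only in production
level = logging.DEBUG if os.getenv('ENVIRONMENT', 'development') == 'development' else logging.WARNING
logging.basicConfig(level=level, format='%(asctime)s %(levelname)s %(name)s: %(message)s')
logger = logging.getLogger('matsushibadb.app')
logger.info('Starting with %s configuration', os.getenv('ENVIRONMENT', 'development'))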