Python Installation
Install MatsushibaDB for Python applications using pip. Perfect for data science, web applications, and Python-based services.
Basic Installation
Copy
pip install matsushibadb
With Virtual Environment (Recommended)
Copy
# Create virtual environment
python -m venv matsushiba-env
# Activate virtual environment
# On Windows:
matsushiba-env\Scripts\activate
# On macOS/Linux:
source matsushiba-env/bin/activate
# Install MatsushibaDB
pip install matsushibadb
Development Installation
Copy
pip install -e .
Specific Version
Copy
pip install matsushibadb==1.0.9
Quick Start
Copy
import matsushibadb

# Create (or open) the database file
db = matsushibadb.MatsushibaDB('app.db')

try:
    # Create a table
    db.execute('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')

    # Insert data (parameterized — never build SQL by string concatenation)
    db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
               ('John Doe', '[email protected]'))

    # Query data
    users = db.execute('SELECT * FROM users').fetchall()
    print('Users:', users)
finally:
    # Close the handle even if one of the statements above raised.
    db.close()
Configuration Options
Basic Configuration
Copy
import matsushibadb

# Connection options for a typical single-process application.
options = {
    'enable_wal': True,       # WAL mode for better concurrency
    'cache_size': 2000,       # page cache size (in pages)
    'synchronous': 'NORMAL',  # durability/speed trade-off
    'journal_mode': 'WAL',    # journal mode
    'temp_store': 'MEMORY',   # keep temporary storage in RAM
    'foreign_keys': True,     # enforce foreign key constraints
}

db = matsushibadb.MatsushibaDB('app.db', options)
Advanced Configuration
Copy
import os  # was missing — os.getenv is used for the encryption key below

db = matsushibadb.MatsushibaDB('app.db', {
    # Performance settings
    'performance': {
        'enable_wal': True,
        'cache_size': 5000,
        'synchronous': 'NORMAL',
        'journal_mode': 'WAL',
        'temp_store': 'MEMORY',
        'optimize_queries': True,
        'prepared_statement_cache': True,
        'prepared_statement_cache_size': 100
    },
    # Memory optimization
    'memory': {
        'mmap_size': 268435456,  # 256MB
        'optimize_memory': True,
        'gc_threshold': 1000
    },
    # I/O optimization
    'io': {
        'direct_io': True,
        'optimize_io': True,
        'buffer_size': 65536  # 64KB
    },
    # Security settings — key comes from the environment, never hard-coded
    'security': {
        'enable_encryption': True,
        'encryption_key': os.getenv('DB_ENCRYPTION_KEY'),
        'enable_audit_logging': True
    }
})
Async Support
MatsushibaDB provides full async/await support with asyncio:
import asyncio
import matsushibadb

async def async_example():
    """Create a table, insert a row, and read it back with async/await."""
    db = matsushibadb.MatsushibaDB('async-app.db')
    try:
        await db.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                price DECIMAL(10,2),
                stock INTEGER DEFAULT 0
            )
        ''')

        # Insert with async
        result = await db.execute('''
            INSERT INTO products (name, price, stock)
            VALUES (?, ?, ?)
        ''', ('Laptop', 999.99, 10))
        print('Product created with ID:', result.lastrowid)

        # Await execute() first, *then* fetch. The unparenthesized form
        # `await db.execute(...).fetchall()` calls fetchall() on the
        # coroutine object itself and raises AttributeError.
        products = (await db.execute('SELECT * FROM products WHERE stock > 0')).fetchall()
        print('Available products:', products)

        product = (await db.execute('SELECT * FROM products WHERE id = ?',
                                    (result.lastrowid,))).fetchone()
        print('Product details:', product)
    except Exception as error:
        print('Database error:', error)
    finally:
        # Always release the connection, even on failure.
        await db.close()

# Run async example
asyncio.run(async_example())
Connection Pooling
For production applications, use connection pooling:
from matsushibadb import MatsushibaPool

# Pool sized for a typical production web service.
pool = MatsushibaPool(
    database='app.db',
    min_connections=5,
    max_connections=20,
    acquire_timeout=30,
    create_timeout=30,
    destroy_timeout=5,
    idle_timeout=30,
    reap_interval=1,
    create_retry_interval=0.2,
    propagate_create_error=False
)

async def get_users():
    """Fetch all active users using a pooled connection."""
    async with pool.get_connection() as conn:
        # Await execute() before fetching (see async example above).
        users = (await conn.execute('SELECT * FROM users WHERE active = 1')).fetchall()
        return users

async def create_user_with_profile(user_data, profile_data):
    """Create a user and their profile atomically in one transaction."""
    async with pool.get_connection() as conn:
        try:
            await conn.begin_transaction()
            user_result = await conn.execute('''
                INSERT INTO users (name, email, password_hash)
                VALUES (?, ?, ?)
            ''', (user_data['name'], user_data['email'], user_data['password_hash']))
            profile_result = await conn.execute('''
                INSERT INTO profiles (user_id, bio, avatar_url)
                VALUES (?, ?, ?)
            ''', (user_result.lastrowid, profile_data['bio'], profile_data['avatar_url']))
            await conn.commit()
            return {'user_id': user_result.lastrowid, 'profile_id': profile_result.lastrowid}
        except Exception:
            # Undo the partial write, then re-raise with the original
            # traceback (bare `raise` preserves it; `raise e` does not).
            await conn.rollback()
            raise
Framework Integrations
FastAPI Integration
Copy
from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import matsushibadb
import os

app = FastAPI(title="MatsushibaDB FastAPI App", version="1.0.0")

# Database connection — one shared handle for the whole app (demo setup).
db = matsushibadb.MatsushibaDB('fastapi-app.db')

# Middleware
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers per the CORS spec — restrict origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Pydantic request/response models

class UserCreate(BaseModel):
    """Payload for registering a user."""
    name: str
    email: str
    password: str

class UserResponse(BaseModel):
    """User as returned by the API (password hash deliberately excluded)."""
    id: int
    name: str
    email: str
    created_at: str

class PostCreate(BaseModel):
    """Payload for creating a post."""
    title: str
    content: str
    published: bool = False

class PostResponse(BaseModel):
    """Post as returned by the API."""
    id: int
    title: str
    content: str
    author_id: int
    published: bool
    created_at: str
# Database initialization
def init_database():
    """Create the users/posts tables and supporting indexes if missing."""
    db.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    db.execute('''
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY,
            title TEXT NOT NULL,
            content TEXT,
            author_id INTEGER,
            published BOOLEAN DEFAULT 0,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (author_id) REFERENCES users(id)
        )
    ''')
    # Indexes speed up login-by-email and listing a user's posts.
    db.execute('CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)')
    db.execute('CREATE INDEX IF NOT EXISTS idx_posts_author ON posts(author_id)')
# Routes

@app.get("/api/health")
async def health_check():
    """Liveness probe."""
    from datetime import datetime, timezone
    return {
        "status": "healthy",
        # Report the actual time (the original returned a hard-coded sample).
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "database": "connected"
    }

@app.post("/api/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    """Register a new user; 409 if the email is already taken."""
    existing_user = db.execute('SELECT * FROM users WHERE email = ?', (user.email,)).fetchone()
    if existing_user:
        raise HTTPException(
            status_code=409,
            detail="User with this email already exists"
        )
    # Hash password (bcrypt generates and embeds the salt)
    import bcrypt
    password_hash = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt())
    result = db.execute('''
        INSERT INTO users (name, email, password_hash)
        VALUES (?, ?, ?)
    ''', (user.name, user.email, password_hash))
    # Re-read the row so DB-generated defaults (created_at) are included.
    new_user = db.execute('SELECT * FROM users WHERE id = ?', (result.lastrowid,)).fetchone()
    # Column order: id, name, email, password_hash, created_at, updated_at.
    return UserResponse(
        id=new_user[0],
        name=new_user[1],
        email=new_user[2],
        created_at=new_user[4]
    )

@app.get("/api/posts", response_model=list[PostResponse])
async def get_posts(published: bool = True):
    """List posts filtered by the published flag, newest first."""
    posts = db.execute('''
        SELECT p.*, u.name as author_name
        FROM posts p
        JOIN users u ON p.author_id = u.id
        WHERE p.published = ?
        ORDER BY p.created_at DESC
    ''', (published,)).fetchall()
    return [
        PostResponse(
            id=post[0],
            title=post[1],
            content=post[2],
            author_id=post[3],
            published=bool(post[4]),
            created_at=post[5]
        ) for post in posts
    ]

if __name__ == "__main__":
    init_database()
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
Django Integration
Copy
# settings.py
import os
import matsushibadb

# Django still needs a DATABASES entry for the ORM; MatsushibaDB is
# SQLite-compatible, so the stock sqlite3 backend is configured here.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': 'django-app.db',
    }
}

class MatsushibaDBBackend:
    """Thin wrapper exposing execute/fetchone/fetchall over one database."""

    def __init__(self, database_path):
        self.db = matsushibadb.MatsushibaDB(database_path)

    def execute(self, query, params=None):
        return self.db.execute(query, params or [])

    def fetchone(self, query, params=None):
        return self.db.execute(query, params or []).fetchone()

    def fetchall(self, query, params=None):
        return self.db.execute(query, params or []).fetchall()
# models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.utils import timezone

class User(AbstractUser):
    """Auth user with a unique email and audit timestamps."""
    email = models.EmailField(unique=True)
    created_at = models.DateTimeField(default=timezone.now)
    updated_at = models.DateTimeField(auto_now=True)

class Post(models.Model):
    """A blog post owned by a User; deleted together with its author."""
    title = models.CharField(max_length=200)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    published = models.BooleanField(default=False)
    created_at = models.DateTimeField(default=timezone.now)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ['-created_at']  # newest first by default

    def __str__(self):
        return self.title
# views.py
from django.shortcuts import render, get_object_or_404
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.contrib.auth import authenticate, login
from django.contrib.auth.decorators import login_required
from django.utils.decorators import method_decorator
from django.utils import timezone  # was missing — used by both views below
from django.views import View
import json
import matsushibadb

# Custom database connection
db = matsushibadb.MatsushibaDB('django-app.db')

class HealthCheckView(View):
    """Liveness probe."""

    def get(self, request):
        return JsonResponse({
            'status': 'healthy',
            'timestamp': timezone.now().isoformat(),
            'database': 'connected'
        })

# csrf_exempt must wrap dispatch (via method_decorator) to take effect on a
# class-based view; decorating the `post` method directly does nothing.
@method_decorator(csrf_exempt, name='dispatch')
class UserListView(View):
    """List users (GET) and create users (POST)."""

    def get(self, request):
        users = db.execute('''
            SELECT id, username, email, date_joined
            FROM auth_user
            ORDER BY date_joined DESC
        ''').fetchall()
        return JsonResponse([
            {
                'id': user[0],
                'username': user[1],
                'email': user[2],
                'date_joined': user[3]
            } for user in users
        ], safe=False)

    def post(self, request):
        data = json.loads(request.body)
        # Reject duplicate emails up front.
        existing_user = db.execute(
            'SELECT * FROM auth_user WHERE email = ?',
            (data['email'],)
        ).fetchone()
        if existing_user:
            return JsonResponse(
                {'error': 'User with this email already exists'},
                status=409
            )
        # Create user (simplified - in production, use Django's user creation)
        result = db.execute('''
            INSERT INTO auth_user (username, email, password, is_active, date_joined)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            data['username'],
            data['email'],
            'hashed_password',  # In production, properly hash password
            1,
            timezone.now().isoformat()
        ))
        return JsonResponse({
            'id': result.lastrowid,
            'username': data['username'],
            'email': data['email']
        }, status=201)
# urls.py
# Route table mapping API paths to the class-based views above.
from django.urls import path
from . import views

urlpatterns = [
    path('api/health/', views.HealthCheckView.as_view(), name='health'),
    path('api/users/', views.UserListView.as_view(), name='user-list'),
]
Flask Integration
Copy
from flask import Flask, request, jsonify, g
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import matsushibadb
import jwt
import bcrypt
from datetime import datetime, timedelta
import os

app = Flask(__name__)
# Fall back to a dev key only outside production.
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key')

# Extensions
CORS(app)
# NOTE(review): Limiter(app, key_func=...) is the legacy flask-limiter
# signature; v3+ expects Limiter(key_func, app=app) — confirm pinned version.
limiter = Limiter(
    app,
    key_func=get_remote_address,
    default_limits=["200 per day", "50 per hour"]
)

def get_db():
    """Return the per-request database handle, opening it on first use."""
    if 'db' not in g:
        g.db = matsushibadb.MatsushibaDB('flask-app.db')
    return g.db

@app.teardown_appcontext
def close_db(error):
    """Close the request's database handle, if one was opened."""
    db = g.pop('db', None)
    if db is not None:
        db.close()
# Authentication helpers

def create_token(user_id):
    """Return a signed JWT for user_id that expires in 24 hours."""
    payload = {
        'sub': user_id,
        'exp': datetime.utcnow() + timedelta(hours=24)
    }
    return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')

def verify_token(token):
    """Return the user id encoded in token, or None if invalid/expired."""
    try:
        payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
        return payload['sub']
    except jwt.PyJWTError:
        return None

def require_auth(f):
    """Decorator: require a valid `Authorization: Bearer <token>` header.

    The wrapped view receives the authenticated user_id as its first argument.
    """
    from functools import wraps

    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'error': 'Token required'}), 401
        try:
            token = token.split(' ')[1]  # Remove 'Bearer ' prefix
        except IndexError:  # header had no space — was a bare `except:`
            return jsonify({'error': 'Invalid token'}), 401
        user_id = verify_token(token)
        if not user_id:
            return jsonify({'error': 'Invalid token'}), 401
        return f(user_id, *args, **kwargs)
    return decorated
# Routes

@app.route('/api/health')
def health_check():
    """Liveness probe."""
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
        'database': 'connected'
    })

@app.route('/api/users', methods=['POST'])
@limiter.limit("10 per minute")
def create_user():
    """Register a user and return a fresh auth token; 409 on duplicate email."""
    data = request.get_json()
    if not all(k in data for k in ('name', 'email', 'password')):
        return jsonify({'error': 'Name, email, and password required'}), 400
    db = get_db()
    existing_user = db.execute('SELECT * FROM users WHERE email = ?', (data['email'],)).fetchone()
    if existing_user:
        return jsonify({'error': 'User with this email already exists'}), 409
    # bcrypt salts and hashes in one call.
    password_hash = bcrypt.hashpw(data['password'].encode('utf-8'), bcrypt.gensalt())
    result = db.execute('''
        INSERT INTO users (name, email, password_hash)
        VALUES (?, ?, ?)
    ''', (data['name'], data['email'], password_hash))
    token = create_token(result.lastrowid)
    return jsonify({
        'id': result.lastrowid,
        'name': data['name'],
        'email': data['email'],
        'token': token
    }), 201

@app.route('/api/posts', methods=['GET'])
def get_posts():
    """List posts; ?published=true|false filters on the published flag."""
    db = get_db()
    published = request.args.get('published', 'true').lower() == 'true'
    posts = db.execute('''
        SELECT p.*, u.name as author_name
        FROM posts p
        JOIN users u ON p.author_id = u.id
        WHERE p.published = ?
        ORDER BY p.created_at DESC
    ''', (1 if published else 0,)).fetchall()
    # Indexing assumes posts has 7 columns so author_name lands at [7] —
    # TODO confirm against the actual posts schema.
    return jsonify([
        {
            'id': post[0],
            'title': post[1],
            'content': post[2],
            'author_id': post[3],
            'published': bool(post[4]),
            'created_at': post[5],
            'author_name': post[7]
        } for post in posts
    ])

@app.route('/api/posts', methods=['POST'])
@require_auth
def create_post(user_id):
    """Create a post owned by the authenticated user."""
    data = request.get_json()
    if not all(k in data for k in ('title', 'content')):
        return jsonify({'error': 'Title and content required'}), 400
    db = get_db()
    result = db.execute('''
        INSERT INTO posts (title, content, author_id, published)
        VALUES (?, ?, ?, ?)
    ''', (data['title'], data['content'], user_id, data.get('published', False)))
    return jsonify({
        'id': result.lastrowid,
        'title': data['title'],
        'content': data['content'],
        'published': data.get('published', False)
    }), 201

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)
Testing
Unit Testing with pytest
Copy
import pytest
import matsushibadb
import tempfile
import os

@pytest.fixture
def db():
    """Yield a MatsushibaDB backed by a temp file; clean up afterwards."""
    # mkstemp (unlike the deprecated, race-prone mktemp) creates the file
    # securely; close the OS-level fd since the DB opens its own handle.
    fd, db_path = tempfile.mkstemp(suffix='.db')
    os.close(fd)
    db = matsushibadb.MatsushibaDB(db_path)
    # Create test table
    db.execute('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL
        )
    ''')
    yield db
    # Cleanup
    db.close()
    os.unlink(db_path)
def test_create_user(db):
    """A single insert gets rowid 1 and affects one row."""
    result = db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
                        ('John Doe', '[email protected]'))
    assert result.lastrowid == 1
    assert result.rowcount == 1

def test_retrieve_users(db):
    """Inserted rows come back with the expected column values."""
    db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
               ('John Doe', '[email protected]'))
    users = db.execute('SELECT * FROM users').fetchall()
    assert len(users) == 1
    assert users[0][1] == 'John Doe'
    assert users[0][2] == '[email protected]'

def test_unique_constraint(db):
    """A duplicate email violates the UNIQUE constraint."""
    db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
               ('John Doe', '[email protected]'))
    with pytest.raises(Exception):
        db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
                   ('Jane Doe', '[email protected]'))

def test_transaction(db):
    """Two inserts inside an explicit transaction both commit."""
    try:
        db.execute('BEGIN TRANSACTION')
        db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
                   ('Alice', '[email protected]'))
        db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
                   ('Bob', '[email protected]'))
        db.execute('COMMIT')
        users = db.execute('SELECT * FROM users').fetchall()
        assert len(users) == 2
    except Exception:
        # Undo the partial transaction, then re-raise with traceback intact.
        db.execute('ROLLBACK')
        raise
Integration Testing
Copy
import pytest
from fastapi.testclient import TestClient
from app import app, db

@pytest.fixture
def client():
    """HTTP test client bound to the FastAPI app."""
    return TestClient(app)

@pytest.fixture
def test_db():
    """Ensure the users table exists; wipe its rows after each test."""
    db.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL
        )
    ''')
    yield db
    # Cleanup
    db.execute('DELETE FROM users')

def test_create_user_api(client, test_db):
    """POST /api/users returns 201 and echoes the created user."""
    response = client.post('/api/users', json={
        'name': 'John Doe',
        'email': '[email protected]',
        'password': 'password123'
    })
    assert response.status_code == 201
    data = response.json()
    assert data['name'] == 'John Doe'
    assert data['email'] == '[email protected]'

def test_get_users_api(client, test_db):
    """GET /api/users returns rows inserted directly into the table."""
    # NOTE(review): the FastAPI example above defines POST /api/users only —
    # confirm the app also exposes a GET /api/users route.
    test_db.execute('INSERT INTO users (name, email) VALUES (?, ?)',
                    ('John Doe', '[email protected]'))
    response = client.get('/api/users')
    assert response.status_code == 200
    users = response.json()
    assert len(users) == 1
    assert users[0]['name'] == 'John Doe'
Performance Optimization
Query Optimization
Copy
# Use prepared statements for repeated queries
get_user_stmt = db.prepare('SELECT * FROM users WHERE id = ?')
user = get_user_stmt.fetchone(1)

# Use indexes for better performance
db.execute('CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)')
db.execute('CREATE INDEX IF NOT EXISTS idx_users_name ON users(name)')

# Batch operations: prepare once, execute per row
insert_user_stmt = db.prepare('INSERT INTO users (name, email) VALUES (?, ?)')
users = [
    ('John Doe', '[email protected]'),
    ('Jane Smith', '[email protected]'),
    ('Bob Johnson', '[email protected]')
]

def insert_many(rows):
    """Insert each (name, email) pair via the cached prepared statement."""
    # Parameter renamed from `users` so it no longer shadows the module-level
    # list of the same name.
    for row in rows:
        insert_user_stmt.execute(row)

insert_many(users)
Memory Management
Copy
# Use context managers for automatic cleanup
with db.get_connection() as conn:
    users = conn.execute('SELECT * FROM users').fetchall()
# Connection automatically closed

# Use generators for large datasets
def get_users_generator():
    """Yield user rows one at a time without materializing the full set."""
    stmt = db.prepare('SELECT * FROM users')
    try:
        for row in stmt:
            yield row
    finally:
        # try/finally guarantees the statement is closed even when the
        # consumer stops iterating early (the original leaked it then).
        stmt.close()

# Process users one at a time
for user in get_users_generator():
    process_user(user)
Production Deployment
Environment Configuration
Copy
# config/database.py
import os
import matsushibadb

class DatabaseConfig:
    """Select per-environment connection options from the ENVIRONMENT var."""

    def __init__(self):
        self.environment = os.getenv('ENVIRONMENT', 'development')
        self.config = self._get_config()

    def _get_config(self):
        """Return the options dict for the current environment.

        Unknown environment names fall back to the development profile.
        """
        configs = {
            'development': {
                'database': 'dev.db',
                'enable_wal': True,
                'cache_size': 1000
            },
            'production': {
                'database': os.getenv('DATABASE_PATH', '/data/production.db'),
                'enable_wal': True,
                'cache_size': 5000,
                'synchronous': 'NORMAL',
                'journal_mode': 'WAL',
                'temp_store': 'MEMORY',
                'security': {
                    'enable_encryption': True,
                    'encryption_key': os.getenv('DB_ENCRYPTION_KEY')
                }
            }
        }
        return configs.get(self.environment, configs['development'])

    def create_database(self):
        """Open a MatsushibaDB using the selected configuration."""
        return matsushibadb.MatsushibaDB(self.config['database'], self.config)

# Usage
db_config = DatabaseConfig()
db = db_config.create_database()
Process Management with Gunicorn
Copy
# gunicorn.conf.py
bind = "0.0.0.0:8000"          # listen on all interfaces, port 8000
workers = 4                    # number of worker processes
worker_class = "uvicorn.workers.UvicornWorker"  # ASGI workers (FastAPI)
worker_connections = 1000      # max simultaneous clients per worker
max_requests = 1000            # recycle workers to curb memory growth
max_requests_jitter = 100      # stagger recycling across workers
preload_app = True             # load app before forking workers
timeout = 30                   # kill workers silent for this many seconds
keepalive = 2                  # seconds to hold keep-alive connections

# Environment variables passed to the workers
raw_env = [
    "ENVIRONMENT=production",
    "DATABASE_PATH=/data/production.db",
    "DB_ENCRYPTION_KEY=your-encryption-key"
]
CLI Tools
MatsushibaDB includes powerful CLI tools:
# Initialize a new database file
matsushiba init my-database.db
# Create a table (column definitions as a quoted SQL fragment)
matsushiba create-table users "id INTEGER PRIMARY KEY, name TEXT, email TEXT"
# Insert a row (column=value pairs)
matsushiba insert users "name='John Doe', email='[email protected]'"
# Run an ad-hoc query
matsushiba query "SELECT * FROM users"
# Export a table's data
matsushiba export users --format json --output users.json
# Import data into a table
matsushiba import users --format json --input users.json
# Back up the database to a file
matsushiba backup my-database.db --output backup.db
# Restore a backup into a new database file
matsushiba restore backup.db --output restored.db
# Run performance analysis
matsushiba analyze my-database.db
# Show database statistics
matsushiba stats my-database.db
Troubleshooting
Common Issues
Database Locked Error:
import time

def safe_execute(db, query, params=None, max_retries=3):
    """Execute a query, retrying on 'database is locked' with backoff.

    Retries up to max_retries times, sleeping 0.1s, 0.2s, 0.4s, ... between
    attempts (exponential backoff). Any other error — or exhausting the
    retries — re-raises the original exception with its traceback intact.
    """
    for attempt in range(max_retries):
        try:
            return db.execute(query, params)
        except Exception as e:
            if 'database is locked' in str(e) and attempt < max_retries - 1:
                time.sleep(0.1 * (2 ** attempt))  # exponential backoff
                continue
            raise  # bare raise preserves the original traceback
High Memory Usage:
import psutil
import gc

def monitor_memory():
    """Print the process's current RSS in MB, then run a full GC pass."""
    process = psutil.Process()
    memory_info = process.memory_info()
    print(f'Memory usage: {memory_info.rss / 1024 / 1024:.2f} MB')
    # Force garbage collection
    gc.collect()
Best Practices
1
Use Virtual Environments
Always use virtual environments to isolate dependencies and avoid conflicts.
2
Implement Error Handling
Wrap all database operations in try/except blocks and handle specific exception types.
3
Use Context Managers
Use context managers for automatic resource cleanup and connection management.
4
Optimize Queries
Create appropriate indexes and optimize queries for better performance.
5
Monitor Performance
Track query performance and database health metrics in production.
6
Use Type Hints
Use Python type hints for better code documentation and IDE support.
For production applications, always use virtual environments, implement proper error handling, and monitor database performance. Consider using environment-specific configurations and proper logging.