
Python Installation

Install MatsushibaDB for Python applications using pip. Perfect for data science, web applications, and Python-based services.

Installation

Basic Installation

pip install matsushibadb

# Recommended: create and activate a virtual environment first
python -m venv matsushiba-env

# Activate virtual environment
# On Windows:
matsushiba-env\Scripts\activate
# On macOS/Linux:
source matsushiba-env/bin/activate

# Install MatsushibaDB
pip install matsushibadb

Development Installation

# Install in editable mode from a local clone of the source tree
pip install -e .

Specific Version

pip install matsushibadb==1.0.9

Quick Start

import matsushibadb

# Create a new database
db = matsushibadb.MatsushibaDB('app.db')

# Create a table
db.execute('''
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        created_at DATETIME DEFAULT CURRENT_TIMESTAMP
    )
''')

# Insert data
db.execute('INSERT INTO users (name, email) VALUES (?, ?)', 
           ('John Doe', 'john@example.com'))

# Query data
users = db.execute('SELECT * FROM users').fetchall()
print('Users:', users)

# Close the database
db.close()

Configuration Options

Basic Configuration

import matsushibadb

db = matsushibadb.MatsushibaDB('app.db', {
    # Enable WAL mode for better concurrency
    'enable_wal': True,
    
    # Set cache size (in pages)
    'cache_size': 2000,
    
    # Synchronous mode
    'synchronous': 'NORMAL',
    
    # Journal mode
    'journal_mode': 'WAL',
    
    # Temporary storage
    'temp_store': 'MEMORY',
    
    # Foreign key constraints
    'foreign_keys': True
})

Advanced Configuration

import os

db = matsushibadb.MatsushibaDB('app.db', {
    # Performance settings
    'performance': {
        'enable_wal': True,
        'cache_size': 5000,
        'synchronous': 'NORMAL',
        'journal_mode': 'WAL',
        'temp_store': 'MEMORY',
        'optimize_queries': True,
        'prepared_statement_cache': True,
        'prepared_statement_cache_size': 100
    },
    
    # Memory optimization
    'memory': {
        'mmap_size': 268435456,  # 256MB
        'optimize_memory': True,
        'gc_threshold': 1000
    },
    
    # I/O optimization
    'io': {
        'direct_io': True,
        'optimize_io': True,
        'buffer_size': 65536  # 64KB
    },
    
    # Security settings
    'security': {
        'enable_encryption': True,
        'encryption_key': os.getenv('DB_ENCRYPTION_KEY'),
        'enable_audit_logging': True
    }
})

Async Support

MatsushibaDB provides full async/await support with asyncio:
import asyncio
import matsushibadb

async def async_example():
    db = matsushibadb.MatsushibaDB('async-app.db')
    
    try:
        # Async operations
        await db.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                price DECIMAL(10,2),
                stock INTEGER DEFAULT 0
            )
        ''')
        
        # Insert with async
        result = await db.execute('''
            INSERT INTO products (name, price, stock)
            VALUES (?, ?, ?)
        ''', ('Laptop', 999.99, 10))
        
        print('Product created with ID:', result.lastrowid)
        
        # Query with async: await execute, then fetch from the returned cursor
        cursor = await db.execute('SELECT * FROM products WHERE stock > 0')
        products = cursor.fetchall()
        print('Available products:', products)
        
        # Get single record with async
        cursor = await db.execute('SELECT * FROM products WHERE id = ?', (result.lastrowid,))
        product = cursor.fetchone()
        print('Product details:', product)
        
    except Exception as error:
        print('Database error:', error)
    finally:
        await db.close()

# Run async example
asyncio.run(async_example())
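
Since each execute call is awaitable, independent reads can also be issued concurrently with asyncio.gather. A minimal sketch, assuming the products table above and that the driver allows concurrent statements on a single connection:

async def concurrent_reads(db):
    # Start both queries, then wait for the two cursors together
    in_stock, low_stock = await asyncio.gather(
        db.execute('SELECT * FROM products WHERE stock > 0'),
        db.execute('SELECT * FROM products WHERE stock < 5')
    )
    return in_stock.fetchall(), low_stock.fetchall()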

Connection Pooling

For production applications, use connection pooling:
from matsushibadb import MatsushibaPool

pool = MatsushibaPool(
    database='app.db',
    min_connections=5,
    max_connections=20,
    acquire_timeout=30,
    create_timeout=30,
    destroy_timeout=5,
    idle_timeout=30,
    reap_interval=1,
    create_retry_interval=0.2,
    propagate_create_error=False
)

# Use the pool
async def get_users():
    async with pool.get_connection() as conn:
        cursor = await conn.execute('SELECT * FROM users WHERE active = 1')
        return cursor.fetchall()

# Transaction with pool
async def create_user_with_profile(user_data, profile_data):
    async with pool.get_connection() as conn:
        try:
            await conn.begin_transaction()
            
            user_result = await conn.execute('''
                INSERT INTO users (name, email, password_hash)
                VALUES (?, ?, ?)
            ''', (user_data['name'], user_data['email'], user_data['password_hash']))
            
            profile_result = await conn.execute('''
                INSERT INTO profiles (user_id, bio, avatar_url)
                VALUES (?, ?, ?)
            ''', (user_result.lastrowid, profile_data['bio'], profile_data['avatar_url']))
            
            await conn.commit()
            return {'user_id': user_result.lastrowid, 'profile_id': profile_result.lastrowid}
        except Exception as e:
            await conn.rollback()
            raise e
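
A minimal entry point that exercises these helpers (the sample values are illustrative placeholders):

import asyncio

async def main():
    users = await get_users()
    print('Active users:', users)

    created = await create_user_with_profile(
        {'name': 'John Doe', 'email': 'john@example.com', 'password_hash': '...'},
        {'bio': 'Hello!', 'avatar_url': None}
    )
    print('Created:', created)

asyncio.run(main())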

Framework Integrations

FastAPI Integration

from fastapi import FastAPI, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import matsushibadb
import os
from datetime import datetime

app = FastAPI(title="MatsushibaDB FastAPI App", version="1.0.0")

# Database connection
db = matsushibadb.MatsushibaDB('fastapi-app.db')

# Middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Pydantic models
class UserCreate(BaseModel):
    name: str
    email: str
    password: str

class UserResponse(BaseModel):
    id: int
    name: str
    email: str
    created_at: str

class PostCreate(BaseModel):
    title: str
    content: str
    published: bool = False

class PostResponse(BaseModel):
    id: int
    title: str
    content: str
    author_id: int
    published: bool
    created_at: str

# Database initialization
def init_database():
    db.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            password_hash TEXT NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    
    db.execute('''
        CREATE TABLE IF NOT EXISTS posts (
            id INTEGER PRIMARY KEY,
            title TEXT NOT NULL,
            content TEXT,
            author_id INTEGER,
            published BOOLEAN DEFAULT 0,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (author_id) REFERENCES users(id)
        )
    ''')
    
    # Create indexes
    db.execute('CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)')
    db.execute('CREATE INDEX IF NOT EXISTS idx_posts_author ON posts(author_id)')

# Routes
@app.get("/api/health")
async def health_check():
    return {
        "status": "healthy",
        "timestamp": "2024-01-15T10:30:00Z",
        "database": "connected"
    }

@app.post("/api/users", response_model=UserResponse, status_code=201)
async def create_user(user: UserCreate):
    # Check if user exists
    existing_user = db.execute('SELECT * FROM users WHERE email = ?', (user.email,)).fetchone()
    if existing_user:
        raise HTTPException(
            status_code=409,
            detail="User with this email already exists"
        )
    
    # Hash password
    import bcrypt
    password_hash = bcrypt.hashpw(user.password.encode('utf-8'), bcrypt.gensalt())
    
    # Create user
    result = db.execute('''
        INSERT INTO users (name, email, password_hash)
        VALUES (?, ?, ?)
    ''', (user.name, user.email, password_hash))
    
    # Get created user
    new_user = db.execute('SELECT * FROM users WHERE id = ?', (result.lastrowid,)).fetchone()
    
    return UserResponse(
        id=new_user[0],
        name=new_user[1],
        email=new_user[2],
        created_at=new_user[4]
    )

@app.get("/api/posts", response_model=list[PostResponse])
async def get_posts(published: bool = True):
    posts = db.execute('''
        SELECT p.*, u.name as author_name
        FROM posts p
        JOIN users u ON p.author_id = u.id
        WHERE p.published = ?
        ORDER BY p.created_at DESC
    ''', (published,)).fetchall()
    
    return [
        PostResponse(
            id=post[0],
            title=post[1],
            content=post[2],
            author_id=post[3],
            published=bool(post[4]),
            created_at=post[5]
        ) for post in posts
    ]

if __name__ == "__main__":
    init_database()
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
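
When the app is served directly by uvicorn or gunicorn, the __main__ block above never runs, so the schema is not created. A minimal sketch that initializes the database on startup instead, using FastAPI's startup event hook:

@app.on_event("startup")
async def on_startup():
    # Make sure tables and indexes exist before the first request is handled
    init_database()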

Django Integration

# settings.py
import os
import matsushibadb

# Database configuration
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': 'django-app.db',
    }
}

# Custom database backend
class MatsushibaDBBackend:
    def __init__(self, database_path):
        self.db = matsushibadb.MatsushibaDB(database_path)
    
    def execute(self, query, params=None):
        return self.db.execute(query, params or [])
    
    def fetchone(self, query, params=None):
        return self.db.execute(query, params or []).fetchone()
    
    def fetchall(self, query, params=None):
        return self.db.execute(query, params or []).fetchall()

# models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.utils import timezone

class User(AbstractUser):
    email = models.EmailField(unique=True)
    created_at = models.DateTimeField(default=timezone.now)
    updated_at = models.DateTimeField(auto_now=True)

class Post(models.Model):
    title = models.CharField(max_length=200)
    content = models.TextField()
    author = models.ForeignKey(User, on_delete=models.CASCADE)
    published = models.BooleanField(default=False)
    created_at = models.DateTimeField(default=timezone.now)
    updated_at = models.DateTimeField(auto_now=True)
    
    class Meta:
        ordering = ['-created_at']
    
    def __str__(self):
        return self.title

# views.py
from django.shortcuts import render, get_object_or_404
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from django.contrib.auth import authenticate, login
from django.contrib.auth.decorators import login_required
from django.utils.decorators import method_decorator
from django.views import View
from django.utils import timezone
import json
import matsushibadb

# Custom database connection
db = matsushibadb.MatsushibaDB('django-app.db')

class HealthCheckView(View):
    def get(self, request):
        return JsonResponse({
            'status': 'healthy',
            'timestamp': timezone.now().isoformat(),
            'database': 'connected'
        })

class UserListView(View):
    def get(self, request):
        users = db.execute('''
            SELECT id, username, email, date_joined
            FROM auth_user
            ORDER BY date_joined DESC
        ''').fetchall()
        
        return JsonResponse([
            {
                'id': user[0],
                'username': user[1],
                'email': user[2],
                'date_joined': user[3]
            } for user in users
        ], safe=False)
    
    @method_decorator(csrf_exempt)
    def post(self, request):
        data = json.loads(request.body)
        
        # Check if user exists
        existing_user = db.execute(
            'SELECT * FROM auth_user WHERE email = ?', 
            (data['email'],)
        ).fetchone()
        
        if existing_user:
            return JsonResponse(
                {'error': 'User with this email already exists'}, 
                status=409
            )
        
        # Create user (simplified - in production, use Django's user creation)
        result = db.execute('''
            INSERT INTO auth_user (username, email, password, is_active, date_joined)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            data['username'],
            data['email'],
            'hashed_password',  # In production, properly hash password
            1,
            timezone.now().isoformat()
        ))
        
        return JsonResponse({
            'id': result.lastrowid,
            'username': data['username'],
            'email': data['email']
        }, status=201)

# urls.py
from django.urls import path
from . import views

urlpatterns = [
    path('api/health/', views.HealthCheckView.as_view(), name='health'),
    path('api/users/', views.UserListView.as_view(), name='user-list'),
]
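
A quick smoke test for these routes using Django's built-in test client (a sketch; it assumes the urlpatterns above are included in the project's root URLconf):

# tests.py
from django.test import Client, TestCase

class HealthCheckTests(TestCase):
    def test_health_endpoint(self):
        response = Client().get('/api/health/')
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.json()['status'], 'healthy')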

Flask Integration

from flask import Flask, request, jsonify, g
from flask_cors import CORS
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
import matsushibadb
import jwt
import bcrypt
from datetime import datetime, timedelta
import os

app = Flask(__name__)
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key')

# Extensions
CORS(app)
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["200 per day", "50 per hour"]
)

# Database connection
def get_db():
    if 'db' not in g:
        g.db = matsushibadb.MatsushibaDB('flask-app.db')
    return g.db

@app.teardown_appcontext
def close_db(error):
    db = g.pop('db', None)
    if db is not None:
        db.close()

# Authentication
def create_token(user_id):
    payload = {
        'sub': user_id,
        'exp': datetime.utcnow() + timedelta(hours=24)
    }
    return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')

def verify_token(token):
    try:
        payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256'])
        return payload['sub']
    except jwt.PyJWTError:
        return None

def require_auth(f):
    from functools import wraps
    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'error': 'Token required'}), 401
        
        try:
            token = token.split(' ')[1]  # Remove 'Bearer ' prefix
            user_id = verify_token(token)
            if not user_id:
                return jsonify({'error': 'Invalid token'}), 401
        except IndexError:
            return jsonify({'error': 'Invalid token'}), 401
        
        return f(user_id, *args, **kwargs)
    return decorated

# Routes
@app.route('/api/health')
def health_check():
    return jsonify({
        'status': 'healthy',
        'timestamp': datetime.utcnow().isoformat(),
        'database': 'connected'
    })

@app.route('/api/users', methods=['POST'])
@limiter.limit("10 per minute")
def create_user():
    data = request.get_json()
    
    if not all(k in data for k in ('name', 'email', 'password')):
        return jsonify({'error': 'Name, email, and password required'}), 400
    
    db = get_db()
    
    # Check if user exists
    existing_user = db.execute('SELECT * FROM users WHERE email = ?', (data['email'],)).fetchone()
    if existing_user:
        return jsonify({'error': 'User with this email already exists'}), 409
    
    # Hash password
    password_hash = bcrypt.hashpw(data['password'].encode('utf-8'), bcrypt.gensalt())
    
    # Create user
    result = db.execute('''
        INSERT INTO users (name, email, password_hash)
        VALUES (?, ?, ?)
    ''', (data['name'], data['email'], password_hash))
    
    # Generate token
    token = create_token(result.lastrowid)
    
    return jsonify({
        'id': result.lastrowid,
        'name': data['name'],
        'email': data['email'],
        'token': token
    }), 201

@app.route('/api/posts', methods=['GET'])
def get_posts():
    db = get_db()
    published = request.args.get('published', 'true').lower() == 'true'
    
    posts = db.execute('''
        SELECT p.*, u.name as author_name
        FROM posts p
        JOIN users u ON p.author_id = u.id
        WHERE p.published = ?
        ORDER BY p.created_at DESC
    ''', (1 if published else 0,)).fetchall()
    
    return jsonify([
        {
            'id': post[0],
            'title': post[1],
            'content': post[2],
            'author_id': post[3],
            'published': bool(post[4]),
            'created_at': post[5],
            'author_name': post[7]
        } for post in posts
    ])

@app.route('/api/posts', methods=['POST'])
@require_auth
def create_post(user_id):
    data = request.get_json()
    
    if not all(k in data for k in ('title', 'content')):
        return jsonify({'error': 'Title and content required'}), 400
    
    db = get_db()
    
    result = db.execute('''
        INSERT INTO posts (title, content, author_id, published)
        VALUES (?, ?, ?, ?)
    ''', (data['title'], data['content'], user_id, data.get('published', False)))
    
    return jsonify({
        'id': result.lastrowid,
        'title': data['title'],
        'content': data['content'],
        'published': data.get('published', False)
    }), 201

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)
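
A minimal smoke test for the Flask routes above, using Flask's built-in test client (a sketch; it assumes the application module is saved as app.py):

from app import app

def test_health_endpoint():
    client = app.test_client()
    response = client.get('/api/health')
    assert response.status_code == 200
    assert response.get_json()['status'] == 'healthy'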

Testing

Unit Testing with pytest

import pytest
import matsushibadb
import tempfile
import os

@pytest.fixture
def db():
    # Create a temporary database file for testing
    tmp_file = tempfile.NamedTemporaryFile(suffix='.db', delete=False)
    tmp_file.close()
    db_path = tmp_file.name
    db = matsushibadb.MatsushibaDB(db_path)
    
    # Create test table
    db.execute('''
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL
        )
    ''')
    
    yield db
    
    # Cleanup
    db.close()
    os.unlink(db_path)

def test_create_user(db):
    result = db.execute('INSERT INTO users (name, email) VALUES (?, ?)', 
                       ('John Doe', 'john@example.com'))
    
    assert result.lastrowid == 1
    assert result.rowcount == 1

def test_retrieve_users(db):
    db.execute('INSERT INTO users (name, email) VALUES (?, ?)', 
               ('John Doe', 'john@example.com'))
    
    users = db.execute('SELECT * FROM users').fetchall()
    assert len(users) == 1
    assert users[0][1] == 'John Doe'
    assert users[0][2] == 'john@example.com'

def test_unique_constraint(db):
    db.execute('INSERT INTO users (name, email) VALUES (?, ?)', 
               ('John Doe', 'john@example.com'))
    
    with pytest.raises(Exception):
        db.execute('INSERT INTO users (name, email) VALUES (?, ?)', 
                   ('Jane Doe', 'john@example.com'))

def test_transaction(db):
    try:
        db.execute('BEGIN TRANSACTION')
        
        db.execute('INSERT INTO users (name, email) VALUES (?, ?)', 
                   ('Alice', 'alice@example.com'))
        db.execute('INSERT INTO users (name, email) VALUES (?, ?)', 
                   ('Bob', 'bob@example.com'))
        
        db.execute('COMMIT')
        
        users = db.execute('SELECT * FROM users').fetchall()
        assert len(users) == 2
        
    except Exception as e:
        db.execute('ROLLBACK')
        raise e

Integration Testing

import pytest
from fastapi.testclient import TestClient
from app import app, db

@pytest.fixture
def client():
    return TestClient(app)

@pytest.fixture
def test_db():
    # Setup test database
    db.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT UNIQUE NOT NULL,
            password_hash TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')
    yield db
    # Cleanup
    db.execute('DELETE FROM users')

def test_create_user_api(client, test_db):
    response = client.post('/api/users', json={
        'name': 'John Doe',
        'email': 'john@example.com',
        'password': 'password123'
    })
    
    assert response.status_code == 201
    data = response.json()
    assert data['name'] == 'John Doe'
    assert data['email'] == 'john@example.com'

def test_get_users_api(client, test_db):
    # Create test user
    test_db.execute('INSERT INTO users (name, email) VALUES (?, ?)', 
                    ('John Doe', 'john@example.com'))
    
    response = client.get('/api/users')
    assert response.status_code == 200
    users = response.json()
    assert len(users) == 1
    assert users[0]['name'] == 'John Doe'

Performance Optimization

Query Optimization

# Use prepared statements for repeated queries
get_user_stmt = db.prepare('SELECT * FROM users WHERE id = ?')
user = get_user_stmt.fetchone(1)

# Use indexes for better performance
db.execute('CREATE INDEX IF NOT EXISTS idx_users_email ON users(email)')
db.execute('CREATE INDEX IF NOT EXISTS idx_users_name ON users(name)')

# Batch operations
insert_user_stmt = db.prepare('INSERT INTO users (name, email) VALUES (?, ?)')
users = [
    ('John Doe', 'john@example.com'),
    ('Jane Smith', 'jane@example.com'),
    ('Bob Johnson', 'bob@example.com')
]

def insert_many(users):
    for user in users:
        insert_user_stmt.execute(user)

insert_many(users)
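
For larger batches, wrapping the inserts in a single transaction avoids a commit per row. A sketch using the same BEGIN/COMMIT statements shown in the testing section above:

def insert_many_in_transaction(db, users):
    # One transaction for the whole batch instead of one commit per row
    db.execute('BEGIN TRANSACTION')
    try:
        for name, email in users:
            db.execute('INSERT INTO users (name, email) VALUES (?, ?)', (name, email))
        db.execute('COMMIT')
    except Exception:
        db.execute('ROLLBACK')
        raise

insert_many_in_transaction(db, users)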

Memory Management

# Use context managers for automatic cleanup
with db.get_connection() as conn:
    users = conn.execute('SELECT * FROM users').fetchall()
    # Connection automatically closed

# Use generators for large datasets
def get_users_generator():
    stmt = db.prepare('SELECT * FROM users')
    try:
        for row in stmt:
            yield row
    finally:
        stmt.close()

# Process users one at a time
for user in get_users_generator():
    process_user(user)

Production Deployment

Environment Configuration

# config/database.py
import os
import matsushibadb

class DatabaseConfig:
    def __init__(self):
        self.environment = os.getenv('ENVIRONMENT', 'development')
        self.config = self._get_config()
    
    def _get_config(self):
        configs = {
            'development': {
                'database': 'dev.db',
                'enable_wal': True,
                'cache_size': 1000
            },
            'production': {
                'database': os.getenv('DATABASE_PATH', '/data/production.db'),
                'enable_wal': True,
                'cache_size': 5000,
                'synchronous': 'NORMAL',
                'journal_mode': 'WAL',
                'temp_store': 'MEMORY',
                'security': {
                    'enable_encryption': True,
                    'encryption_key': os.getenv('DB_ENCRYPTION_KEY')
                }
            }
        }
        return configs.get(self.environment, configs['development'])
    
    def create_database(self):
        return matsushibadb.MatsushibaDB(self.config['database'], self.config)

# Usage
db_config = DatabaseConfig()
db = db_config.create_database()

Process Management with Gunicorn

# gunicorn.conf.py
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000
max_requests = 1000
max_requests_jitter = 100
preload_app = True
timeout = 30
keepalive = 2

# Environment variables
raw_env = [
    "ENVIRONMENT=production",
    "DATABASE_PATH=/data/production.db",
    "DB_ENCRYPTION_KEY=your-encryption-key"
]

CLI Tools

MatsushibaDB includes powerful CLI tools:
# Initialize a new database
matsushiba init my-database.db

# Create a table
matsushiba create-table users "id INTEGER PRIMARY KEY, name TEXT, email TEXT"

# Insert data
matsushiba insert users "name='John Doe', email='john@example.com'"

# Query data
matsushiba query "SELECT * FROM users"

# Export data
matsushiba export users --format json --output users.json

# Import data
matsushiba import users --format json --input users.json

# Backup database
matsushiba backup my-database.db --output backup.db

# Restore database
matsushiba restore backup.db --output restored.db

# Performance analysis
matsushiba analyze my-database.db

# Database statistics
matsushiba stats my-database.db

Troubleshooting

Common Issues

Database Locked Error:
import time

def safe_execute(db, query, params=None, max_retries=3):
    for attempt in range(max_retries):
        try:
            return db.execute(query, params)
        except Exception as e:
            if 'database is locked' in str(e) and attempt < max_retries - 1:
                time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
                continue
            raise e
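
safe_execute is a drop-in replacement for a normal execute call, for example:

users = safe_execute(db, 'SELECT * FROM users').fetchall()
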
Memory Issues:
import psutil
import gc

def monitor_memory():
    process = psutil.Process()
    memory_info = process.memory_info()
    print(f'Memory usage: {memory_info.rss / 1024 / 1024:.2f} MB')
    
    # Force garbage collection
    gc.collect()

Best Practices

1. Use Virtual Environments: Always use virtual environments to isolate dependencies and avoid conflicts.

2. Implement Error Handling: Wrap all database operations in try/except blocks and handle specific error types.

3. Use Context Managers: Use context managers for automatic resource cleanup and connection management.

4. Optimize Queries: Create appropriate indexes and optimize queries for better performance.

5. Monitor Performance: Track query performance and database health metrics in production.

6. Use Type Hints: Use Python type hints for better code documentation and IDE support (see the sketch below).
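
A short sketch combining error handling, a context manager, and type hints, using the get_connection context manager shown in the Memory Management section (the Optional return type and the broad exception handler are illustrative; real code should catch the library's specific error types):

from typing import Optional

def find_user_by_email(db, email: str) -> Optional[tuple]:
    """Look up a single user, returning None when no row matches or an error occurs."""
    try:
        with db.get_connection() as conn:  # connection is released automatically
            return conn.execute('SELECT * FROM users WHERE email = ?', (email,)).fetchone()
    except Exception as error:
        print('Database error:', error)
        return None
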
For production applications, always use virtual environments, implement proper error handling, and monitor database performance. Consider using environment-specific configurations and proper logging.