# Database Management Scenarios
No summary provided
What Happened:
A production microservice environment began experiencing database connection errors during peak hours. The errors indicated that the maximum number of connections had been reached, despite the connection pool being configured with appropriate limits.
Diagnosis Steps:
Checked active database connections with SELECT * FROM pg_stat_activity.
Monitored connection pool metrics from the application.
Analyzed application logs for connection handling patterns.
Profiled the application to track connection acquisition and release.
Reviewed recent code changes related to database access.
Root Cause:
A recently deployed microservice was not properly closing database connections in error scenarios. Additionally, the service discovery mechanism was causing intermittent timeouts, which triggered connection retries without proper cleanup of the previous connections.
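For illustration, here is a minimal sketch of that failure mode (the class and method names are hypothetical, not taken from the incident code): each retry that abandons a timed-out connection leaks one pool slot, while try-with-resources releases the connection on every path.

// Hypothetical sketch of the leak-on-retry pattern and its fix
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;

public class RetryExample {

    // Leaky pattern (simplified): the failed attempt's connection is never closed
    static void leakyQuery(DataSource ds, int attempts) throws SQLException {
        for (int i = 0; i < attempts; i++) {
            Connection conn = ds.getConnection(); // acquired...
            try {
                runQuery(conn);
                return;
            } catch (SQLException timeout) {
                // ...but abandoned here on retry: one pool slot lost per attempt
            }
        }
    }

    // Safe pattern: each attempt owns its connection for exactly one scope
    static void safeQuery(DataSource ds, int attempts) throws SQLException {
        for (int i = 0; i < attempts; i++) {
            try (Connection conn = ds.getConnection()) {
                runQuery(conn);
                return;
            } catch (SQLException timeout) {
                // the connection was already closed by try-with-resources; safe to retry
            }
        }
    }

    static void runQuery(Connection conn) throws SQLException {
        // placeholder for the actual query
    }
}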
Fix/Workaround:
• Short-term: Increased the PostgreSQL max_connections parameter and restarted the database (max_connections is a postmaster-level setting, so a configuration reload alone cannot apply it):
ALTER SYSTEM SET max_connections = 500;
-- max_connections requires a full server restart; pg_reload_conf() is not sufficient for this parameter.
• Implemented a connection leak detector in the application:
// Connection leak detection in Spring Boot
@Configuration
public class DataSourceConfig {

    @Bean
    public DataSource dataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://db.example.com:5432/mydb");
        config.setUsername("app_user");
        config.setPassword("********");
        config.setMaximumPoolSize(20);
        config.setMinimumIdle(5);
        // Add leak detection: log a warning with a stack trace if a connection
        // is held longer than the threshold
        config.setLeakDetectionThreshold(60000); // 60 seconds
        return new HikariDataSource(config);
    }
}
• Long-term: Fixed the connection handling in the application code:
// Before: Potential connection leak
@Service
public class UserService {

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public UserService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public User findUser(Long id) {
        try {
            // Connection acquired manually, bypassing Spring's management
            Connection conn = jdbcTemplate.getDataSource().getConnection();
            PreparedStatement stmt = conn.prepareStatement("SELECT * FROM users WHERE id = ?");
            stmt.setLong(1, id);
            ResultSet rs = stmt.executeQuery();
            // Process result
            // Missing connection close!
        } catch (SQLException e) {
            log.error("Error finding user", e);
            // Connection not closed in the exception path either!
        }
        return null; // placeholder so the example compiles
    }
}
// After: Proper connection handling
@Service
public class UserService {

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public UserService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public User findUser(Long id) {
        // Let Spring acquire and release the connection
        return jdbcTemplate.queryForObject(
            "SELECT * FROM users WHERE id = ?",
            new Object[]{id},
            (rs, rowNum) -> new User(rs.getLong("id"), rs.getString("name"))
        );
    }
}
• Added database connection monitoring:
-- Create a monitoring view for connection usage
CREATE VIEW connection_usage AS
SELECT
    count(*) AS total_connections,
    count(*) FILTER (WHERE state = 'active') AS active_connections,
    count(*) FILTER (WHERE state = 'idle') AS idle_connections,
    count(*) FILTER (WHERE state = 'idle in transaction') AS idle_in_transaction,
    count(*) FILTER (WHERE state = 'idle in transaction (aborted)') AS idle_in_transaction_aborted,
    (SELECT setting::int FROM pg_settings WHERE name = 'max_connections') AS max_connections
FROM pg_stat_activity;
Lessons Learned:
Database connection management requires careful handling, especially in distributed systems.
How to Avoid:
Use framework-provided abstractions for database access.
Implement connection leak detection in both application and database.
Set appropriate timeouts for database operations (see the sketch after this list).
Monitor connection usage patterns and set alerts.
Use connection pooling with proper configuration.
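As a concrete illustration of the timeout advice above, here is a minimal sketch of wiring operation-level timeouts with HikariCP and Spring's JdbcTemplate; the JDBC URL and every value are examples, not tuned recommendations:

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.springframework.jdbc.core.JdbcTemplate;

public class TimeoutConfigExample {

    public static JdbcTemplate buildTemplate() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:postgresql://db.example.com:5432/mydb"); // example URL
        config.setConnectionTimeout(30_000); // fail fast if no connection is free within 30s
        config.setValidationTimeout(5_000);  // cap time spent validating a connection
        config.setIdleTimeout(300_000);      // retire idle connections after 5 minutes
        config.setMaxLifetime(1_200_000);    // recycle every connection after 20 minutes

        JdbcTemplate template = new JdbcTemplate(new HikariDataSource(config));
        template.setQueryTimeout(10); // per-statement timeout, in seconds
        return template;
    }
}

The per-statement timeout matters as much as the pool-level ones: without it, a slow query holds its connection indefinitely and starves the pool.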
No summary provided
What Happened:
Users reported seeing inconsistent data when refreshing pages or navigating between different parts of the application. Some users would see their changes immediately, while others would see outdated information for several minutes.
Diagnosis Steps:
Monitored replication lag with SHOW SLAVE STATUS\G on the replicas.
Analyzed database load patterns on primary and replicas.
Reviewed application code for read/write splitting logic.
Checked for long-running transactions or large batch operations.
Monitored network performance between database servers.
Root Cause:
A combination of factors led to excessive replication lag:
1. A nightly batch job was performing large write operations without transaction batching.
2. The application was not routing read queries based on consistency requirements.
3. One replica had insufficient resources compared to the others, causing it to fall behind.
Fix/Workaround:
• Short-term: Implemented read consistency in the application:
// Java application code with read consistency
public class DatabaseService {

    private final DataSource primaryDataSource;
    private final DataSource replicaDataSource;
    private final ThreadLocal<Boolean> forceConsistentRead = ThreadLocal.withInitial(() -> false);

    public DatabaseService(DataSource primaryDataSource, DataSource replicaDataSource) {
        this.primaryDataSource = primaryDataSource;
        this.replicaDataSource = replicaDataSource;
    }

    // Force consistent reads (primary) for the duration of a specific operation
    public <T> T withConsistentReads(Supplier<T> operation) {
        boolean previous = forceConsistentRead.get();
        try {
            forceConsistentRead.set(true);
            return operation.get();
        } finally {
            forceConsistentRead.set(previous);
        }
    }

    // Choose a data source based on operation type and consistency requirements
    private DataSource getDataSource(boolean isWrite) {
        if (isWrite || forceConsistentRead.get()) {
            return primaryDataSource;
        } else {
            return replicaDataSource;
        }
    }

    // Example usage
    public User getUserProfile(long userId) {
        DataSource dataSource = getDataSource(false); // read operation -> replica
        // Query user data using dataSource
        return null; // placeholder
    }

    public void updateUserProfile(User user) {
        DataSource dataSource = getDataSource(true); // write operation -> primary
        // Update user data using dataSource
    }

    // For operations requiring read-your-writes consistency
    public OrderDetails getOrderWithItems(long orderId) {
        return withConsistentReads(() -> {
            // This will route to the primary data source
            // Fetch order and items here
            return null; // placeholder
        });
    }
}
• Optimized the batch job to reduce replication impact:
-- Before: Single large transaction
START TRANSACTION;
INSERT INTO processed_events
SELECT * FROM raw_events
WHERE event_date < CURDATE() - INTERVAL 1 DAY;
DELETE FROM raw_events
WHERE event_date < CURDATE() - INTERVAL 1 DAY;
COMMIT;

-- After: Batched processing with sleep
DELIMITER //
CREATE PROCEDURE process_events_in_batches()
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE batch_size INT DEFAULT 1000;
    DECLARE processed INT DEFAULT 0;

    WHILE NOT done DO
        START TRANSACTION;

        -- Copy one batch (ordered so LIMIT is deterministic)
        INSERT INTO processed_events
        SELECT * FROM raw_events
        WHERE event_date < CURDATE() - INTERVAL 1 DAY
        ORDER BY id
        LIMIT batch_size;

        SET processed = ROW_COUNT();

        IF processed > 0 THEN
            -- MySQL disallows LIMIT directly in an IN subquery, so wrap it
            -- in a derived table
            DELETE FROM raw_events
            WHERE id IN (
                SELECT id FROM (
                    SELECT id FROM processed_events
                    ORDER BY id DESC LIMIT batch_size
                ) AS latest_batch
            );
            COMMIT;
            -- Sleep to allow replication to catch up
            DO SLEEP(0.1);
        ELSE
            SET done = TRUE;
            COMMIT;
        END IF;
    END WHILE;
END //
DELIMITER ;
• Added replication lag monitoring and alerting:
#!/bin/bash
# monitor_replication.sh
MAX_LAG_SECONDS=30
REPLICAS=("replica1" "replica2" "replica3")

for replica in "${REPLICAS[@]}"; do
    lag=$(mysql -h "$replica" -u monitor -p"$MYSQL_PASSWORD" -e "SHOW SLAVE STATUS\G" | grep "Seconds_Behind_Master" | awk '{print $2}')
    # Seconds_Behind_Master is NULL when replication is broken; treat that as an alert too
    if [[ -z "$lag" || "$lag" == "NULL" || "$lag" -gt "$MAX_LAG_SECONDS" ]]; then
        echo "ALERT: Replica $replica has replication lag of ${lag:-unknown} seconds"
        # Send alert via preferred method
        curl -X POST "https://alerts.example.com/api/alert" \
            -H "Content-Type: application/json" \
            -d "{\"service\": \"mysql\", \"host\": \"$replica\", \"metric\": \"replication_lag\", \"value\": \"${lag:-NULL}\", \"threshold\": $MAX_LAG_SECONDS}"
    fi
done
Lessons Learned:
Database replication lag requires careful application design and monitoring.
How to Avoid:
Design applications with read consistency requirements in mind.
Implement write batching for large operations.
Monitor replication lag and set appropriate alerts (a lag-aware routing sketch follows this list).
Ensure replicas have sufficient resources.
Consider semi-synchronous replication for critical data.
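Building on the read-routing code above, here is a speculative sketch of lag-aware replica selection; it assumes a MySQL replica reachable over JDBC, the LagAwareRouter name and threshold are hypothetical, and a real implementation would cache the lag value rather than query it on every read:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import javax.sql.DataSource;

public class LagAwareRouter {

    private final DataSource primary;
    private final DataSource replica;
    private final long maxLagSeconds; // e.g. 5; tune to the consistency requirements

    public LagAwareRouter(DataSource primary, DataSource replica, long maxLagSeconds) {
        this.primary = primary;
        this.replica = replica;
        this.maxLagSeconds = maxLagSeconds;
    }

    // Route reads to the replica only when its lag is known and acceptable
    public DataSource dataSourceForRead() {
        try (Connection conn = replica.getConnection();
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SHOW SLAVE STATUS")) {
            if (rs.next()) {
                long lag = rs.getLong("Seconds_Behind_Master");
                // getLong() returns 0 for SQL NULL; wasNull() detects broken replication
                if (!rs.wasNull() && lag <= maxLagSeconds) {
                    return replica;
                }
            }
        } catch (SQLException e) {
            // fall through to the primary on any failure
        }
        return primary; // safe default: consistent reads
    }
}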
No summary provided
What Happened:
During a scheduled maintenance window, a team attempted to apply a complex database schema migration to the production database. The migration included adding new tables, modifying existing columns, and creating new indexes. The migration script ran successfully in development and staging environments but failed halfway through execution in production. This left the database in an inconsistent state, with some changes applied and others not. The application became unusable, affecting thousands of users.
Diagnosis Steps:
Examined database migration logs to identify the failure point.
Analyzed database locks and running queries at the time of failure.
Reviewed differences between production and staging environments.
Checked database resource metrics during the migration.
Compared the data volume and distribution between environments.
Root Cause:
The investigation revealed multiple issues:
1. The production database had significantly more data than staging, causing some operations to time out.
2. A long-running analytical query was holding locks on tables being modified.
3. The migration script didn't include proper transaction handling for atomic operations.
4. Database connection limits were reached during the migration.
5. Index creation on large tables caused excessive disk I/O, triggering AWS RDS performance throttling.
Fix/Workaround:
• Short-term: Implemented an emergency rollback procedure:
-- Rollback script for failed migration
BEGIN;
-- Drop newly created tables
DROP TABLE IF EXISTS customer_segments;
DROP TABLE IF EXISTS marketing_campaigns;
DROP TABLE IF EXISTS user_preferences;
-- Revert column modifications
ALTER TABLE users DROP COLUMN IF EXISTS last_login_ip;
ALTER TABLE orders ALTER COLUMN status TYPE varchar(20);
ALTER TABLE products DROP CONSTRAINT IF EXISTS products_sku_unique;
-- Remove new indexes
DROP INDEX IF EXISTS idx_orders_created_at;
DROP INDEX IF EXISTS idx_products_category_price;
-- Restore original functions
CREATE OR REPLACE FUNCTION calculate_order_total(order_id integer) RETURNS numeric AS $$
BEGIN
-- Qualify the column: an unqualified order_id is ambiguous with the parameter name
RETURN (SELECT SUM(price * quantity) FROM order_items WHERE order_items.order_id = $1);
END;
$$ LANGUAGE plpgsql;
COMMIT;
• Improved the migration script with proper transaction handling and error recovery:
-- Improved migration script with transaction handling and error recovery
DO $$
DECLARE
migration_step integer := 0;
max_retries integer := 3;
current_retry integer := 0;
start_time timestamp;
execution_time interval;
lock_timeout text := '10s';
statement_timeout text := '30s';
original_lock_timeout text;
original_statement_timeout text;
BEGIN
-- Save original timeout settings
SELECT current_setting('lock_timeout') INTO original_lock_timeout;
SELECT current_setting('statement_timeout') INTO original_statement_timeout;
-- Set timeouts for this migration
EXECUTE format('SET lock_timeout TO %L', lock_timeout);
EXECUTE format('SET statement_timeout TO %L', statement_timeout);
-- Log migration start
RAISE NOTICE 'Starting database migration at %', clock_timestamp();
-- Outer block (the whole DO body runs inside a single transaction)
BEGIN
-- Step 1: Create new tables
migration_step := 1;
RAISE NOTICE 'Step %: Creating new tables', migration_step;
start_time := clock_timestamp();
CREATE TABLE IF NOT EXISTS customer_segments (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
description TEXT,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS marketing_campaigns (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
segment_id INTEGER REFERENCES customer_segments(id),
start_date TIMESTAMP NOT NULL,
end_date TIMESTAMP,
status VARCHAR(20) NOT NULL DEFAULT 'DRAFT',
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS user_preferences (
user_id INTEGER PRIMARY KEY REFERENCES users(id),
email_notifications BOOLEAN NOT NULL DEFAULT TRUE,
sms_notifications BOOLEAN NOT NULL DEFAULT FALSE,
theme VARCHAR(20) NOT NULL DEFAULT 'light',
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW()
);
execution_time := clock_timestamp() - start_time;
RAISE NOTICE 'Step % completed in %', migration_step, execution_time;
-- Step 2: Modify existing columns
migration_step := 2;
RAISE NOTICE 'Step %: Modifying existing columns', migration_step;
start_time := clock_timestamp();
-- Add new column with retry logic
current_retry := 0;
WHILE current_retry < max_retries LOOP
BEGIN
ALTER TABLE users ADD COLUMN IF NOT EXISTS last_login_ip VARCHAR(45);
EXIT; -- PL/pgSQL uses EXIT to leave a loop (BREAK is not valid)
EXCEPTION WHEN OTHERS THEN
current_retry := current_retry + 1;
IF current_retry >= max_retries THEN
RAISE EXCEPTION 'Failed to add column last_login_ip after % retries', max_retries;
END IF;
RAISE NOTICE 'Retrying add column operation (attempt %/%)', current_retry, max_retries;
PERFORM pg_sleep(power(2, current_retry)); -- Exponential backoff
END;
END LOOP;
-- Modify column type with retry logic
current_retry := 0;
WHILE current_retry < max_retries LOOP
BEGIN
ALTER TABLE orders ALTER COLUMN status TYPE VARCHAR(30);
EXIT;
EXCEPTION WHEN OTHERS THEN
current_retry := current_retry + 1;
IF current_retry >= max_retries THEN
RAISE EXCEPTION 'Failed to modify column status after % retries', max_retries;
END IF;
RAISE NOTICE 'Retrying modify column operation (attempt %/%)', current_retry, max_retries;
PERFORM pg_sleep(power(2, current_retry)); -- Exponential backoff
END;
END LOOP;
-- Add constraint with retry logic
current_retry := 0;
WHILE current_retry < max_retries LOOP
BEGIN
ALTER TABLE products ADD CONSTRAINT products_sku_unique UNIQUE (sku);
EXIT;
EXCEPTION WHEN OTHERS THEN
current_retry := current_retry + 1;
IF current_retry >= max_retries THEN
RAISE EXCEPTION 'Failed to add constraint after % retries', max_retries;
END IF;
RAISE NOTICE 'Retrying add constraint operation (attempt %/%)', current_retry, max_retries;
PERFORM pg_sleep(power(2, current_retry)); -- Exponential backoff
END;
END LOOP;
execution_time := clock_timestamp() - start_time;
RAISE NOTICE 'Step % completed in %', migration_step, execution_time;
-- Step 3: Create indexes (potentially long-running operations)
migration_step := 3;
RAISE NOTICE 'Step %: Creating indexes', migration_step;
start_time := clock_timestamp();
-- Create indexes with progress reporting
RAISE NOTICE 'Creating index on orders.created_at';
EXECUTE 'SET statement_timeout TO ''5min'''; -- Longer timeout for index creation
-- CREATE INDEX CONCURRENTLY cannot run inside a transaction block, and a DO body
-- is one; build the indexes CONCURRENTLY as separate standalone statements if the
-- tables must remain writable during the build.
CREATE INDEX IF NOT EXISTS idx_orders_created_at ON orders (created_at);
RAISE NOTICE 'Creating index on products.category and products.price';
CREATE INDEX IF NOT EXISTS idx_products_category_price ON products (category, price);
execution_time := clock_timestamp() - start_time;
RAISE NOTICE 'Step % completed in %', migration_step, execution_time;
-- Step 4: Update functions
migration_step := 4;
RAISE NOTICE 'Step %: Updating functions', migration_step;
start_time := clock_timestamp();
-- Use a distinct dollar-quote tag ($fn$) so this body doesn't terminate the outer $$ block
CREATE OR REPLACE FUNCTION calculate_order_total(order_id integer) RETURNS numeric AS $fn$
BEGIN
RETURN (
SELECT SUM(price * quantity) - COALESCE(
(SELECT discount FROM orders WHERE id = $1),
0
)
FROM order_items
WHERE order_items.order_id = $1
);
END;
$fn$ LANGUAGE plpgsql;
execution_time := clock_timestamp() - start_time;
RAISE NOTICE 'Step % completed in %', migration_step, execution_time;
-- Log migration completion
RAISE NOTICE 'Database migration completed successfully at %', clock_timestamp();
EXCEPTION WHEN OTHERS THEN
-- Log failure and rollback
RAISE WARNING 'Migration failed at step % with error: %', migration_step, SQLERRM;
RAISE WARNING 'Rolling back changes';
RAISE;
END;
-- Restore original timeout settings
EXECUTE format('SET lock_timeout TO %L', original_lock_timeout);
EXECUTE format('SET statement_timeout TO %L', original_statement_timeout);
END $$;
• Long-term: Implemented a comprehensive database migration framework in Go:
// db_migration.go - Database migration framework with safety features.
// Declared as package main so the CLI entrypoint below compiles.
package main
import (
"context"
"crypto/sha256"
"database/sql"
"encoding/hex"
"fmt"
"log"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"gopkg.in/yaml.v3"
)
// MigrationConfig holds the configuration for the migration process.
// NOTE: yaml.v3 does not decode duration strings such as "10s" into
// time.Duration by default; express durations as integer nanoseconds in the
// YAML, or add a custom UnmarshalYAML wrapper type.
type MigrationConfig struct {
DatabaseURL string `yaml:"database_url"`
MigrationsDir string `yaml:"migrations_dir"`
LockTimeout time.Duration `yaml:"lock_timeout"`
StatementTimeout time.Duration `yaml:"statement_timeout"`
MaxRetries int `yaml:"max_retries"`
RetryBackoffBase time.Duration `yaml:"retry_backoff_base"`
PreMigrationChecks []string `yaml:"pre_migration_checks"`
PostMigrationChecks []string `yaml:"post_migration_checks"`
DryRun bool `yaml:"dry_run"`
AutomaticRollback bool `yaml:"automatic_rollback"`
LogLevel string `yaml:"log_level"`
NotificationChannels []string `yaml:"notification_channels"`
MaxLockWaitTime time.Duration `yaml:"max_lock_wait_time"`
ConnectionPoolSize int `yaml:"connection_pool_size"`
}
// Migration represents a single database migration
type Migration struct {
Version int `db:"version"`
Name string `db:"name"`
AppliedAt time.Time `db:"applied_at"`
Description string `db:"description"`
UpSQL string `db:"-"`
DownSQL string `db:"-"`
Checksum string `db:"checksum"`
}
// MigrationService handles database migrations
type MigrationService struct {
db *sqlx.DB
config MigrationConfig
logger *log.Logger
}
// NewMigrationService creates a new migration service
func NewMigrationService(configPath string) (*MigrationService, error) {
// Load configuration
config, err := loadConfig(configPath)
if err != nil {
return nil, fmt.Errorf("failed to load config: %w", err)
}
// Set up logger
logger := log.New(os.Stdout, "[MIGRATION] ", log.LstdFlags)
// Connect to database
db, err := sqlx.Connect("postgres", config.DatabaseURL)
if err != nil {
return nil, fmt.Errorf("failed to connect to database: %w", err)
}
// Configure connection pool
db.SetMaxOpenConns(config.ConnectionPoolSize)
db.SetMaxIdleConns(config.ConnectionPoolSize / 2)
db.SetConnMaxLifetime(time.Hour)
return &MigrationService{
db: db,
config: config,
logger: logger,
}, nil
}
// loadConfig loads the migration configuration from a YAML file
func loadConfig(configPath string) (MigrationConfig, error) {
var config MigrationConfig
// Set defaults
config.LockTimeout = 10 * time.Second
config.StatementTimeout = 30 * time.Second
config.MaxRetries = 3
config.RetryBackoffBase = 2 * time.Second
config.AutomaticRollback = true
config.LogLevel = "info"
config.MaxLockWaitTime = 5 * time.Minute
config.ConnectionPoolSize = 5
// Read config file
data, err := os.ReadFile(configPath)
if err != nil {
return config, fmt.Errorf("failed to read config file: %w", err)
}
// Parse YAML
if err := yaml.Unmarshal(data, &config); err != nil {
return config, fmt.Errorf("failed to parse config file: %w", err)
}
return config, nil
}
// Initialize sets up the migrations table if it doesn't exist
func (s *MigrationService) Initialize(ctx context.Context) error {
s.logger.Println("Initializing migration system")
// Create migrations table if it doesn't exist
_, err := s.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
description TEXT,
checksum TEXT NOT NULL
)
`)
if err != nil {
return fmt.Errorf("failed to create migrations table: %w", err)
}
// Create migration_logs table for detailed logging
_, err = s.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS migration_logs (
id SERIAL PRIMARY KEY,
version INTEGER NOT NULL,
operation TEXT NOT NULL,
status TEXT NOT NULL,
message TEXT,
execution_time INTERVAL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
)
`)
if err != nil {
return fmt.Errorf("failed to create migration logs table: %w", err)
}
return nil
}
// LoadMigrations loads all migration files from the migrations directory
func (s *MigrationService) LoadMigrations() ([]Migration, error) {
s.logger.Printf("Loading migrations from %s", s.config.MigrationsDir)
var migrations []Migration
// Read migration files
files, err := os.ReadDir(s.config.MigrationsDir)
if err != nil {
return nil, fmt.Errorf("failed to read migrations directory: %w", err)
}
// Process each file
for _, file := range files {
if file.IsDir() || !strings.HasSuffix(file.Name(), ".sql") {
continue
}
// Parse filename (format: V001__migration_name.sql)
parts := strings.SplitN(file.Name(), "__", 2)
if len(parts) != 2 || !strings.HasPrefix(parts[0], "V") {
s.logger.Printf("Skipping file with invalid format: %s", file.Name())
continue
}
// Extract version number
versionStr := strings.TrimPrefix(parts[0], "V")
var version int
_, err := fmt.Sscanf(versionStr, "%d", &version)
if err != nil {
s.logger.Printf("Skipping file with invalid version: %s", file.Name())
continue
}
// Extract migration name
name := strings.TrimSuffix(parts[1], ".sql")
// Read file content
content, err := os.ReadFile(filepath.Join(s.config.MigrationsDir, file.Name()))
if err != nil {
return nil, fmt.Errorf("failed to read migration file %s: %w", file.Name(), err)
}
// Split content into up and down migrations
sections := strings.Split(string(content), "-- DOWN")
upSQL := sections[0]
downSQL := ""
if len(sections) > 1 {
downSQL = sections[1]
}
// Create migration object
migration := Migration{
Version: version,
Name: name,
Description: extractDescription(upSQL),
UpSQL: strings.TrimSpace(upSQL),
DownSQL: strings.TrimSpace(downSQL),
Checksum: calculateChecksum(upSQL),
}
migrations = append(migrations, migration)
}
// Sort migrations by version
sort.Slice(migrations, func(i, j int) bool {
return migrations[i].Version < migrations[j].Version
})
s.logger.Printf("Loaded %d migrations", len(migrations))
return migrations, nil
}
// extractDescription extracts the description from the migration SQL
func extractDescription(sql string) string {
lines := strings.Split(sql, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "-- Description:") {
return strings.TrimSpace(strings.TrimPrefix(line, "-- Description:"))
}
}
return ""
}
// calculateChecksum calculates a SHA-256 checksum of the migration SQL
func calculateChecksum(sql string) string {
sum := sha256.Sum256([]byte(sql))
return hex.EncodeToString(sum[:])
}
// GetAppliedMigrations returns all migrations that have been applied
func (s *MigrationService) GetAppliedMigrations(ctx context.Context) ([]Migration, error) {
var migrations []Migration
err := s.db.SelectContext(ctx, &migrations, `
SELECT version, name, applied_at, description, checksum
FROM schema_migrations
ORDER BY version
`)
if err != nil {
return nil, fmt.Errorf("failed to get applied migrations: %w", err)
}
return migrations, nil
}
// Migrate applies all pending migrations
func (s *MigrationService) Migrate(ctx context.Context) error {
s.logger.Println("Starting migration process")
// Initialize migration system
if err := s.Initialize(ctx); err != nil {
return err
}
// Load migrations
migrations, err := s.LoadMigrations()
if err != nil {
return err
}
// Get applied migrations
appliedMigrations, err := s.GetAppliedMigrations(ctx)
if err != nil {
return err
}
// Create map of applied migrations for quick lookup
appliedMap := make(map[int]Migration)
for _, m := range appliedMigrations {
appliedMap[m.Version] = m
}
// Run pre-migration checks
if err := s.runPreMigrationChecks(ctx); err != nil {
return fmt.Errorf("pre-migration checks failed: %w", err)
}
// Apply pending migrations
for _, migration := range migrations {
// Skip if already applied
if _, ok := appliedMap[migration.Version]; ok {
s.logger.Printf("Migration V%03d__%s already applied", migration.Version, migration.Name)
continue
}
s.logger.Printf("Applying migration V%03d__%s", migration.Version, migration.Name)
// Start transaction
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}
// Set timeouts
_, err = tx.ExecContext(ctx, fmt.Sprintf("SET LOCAL lock_timeout = '%dms'", s.config.LockTimeout.Milliseconds()))
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to set lock timeout: %w", err)
}
_, err = tx.ExecContext(ctx, fmt.Sprintf("SET LOCAL statement_timeout = '%dms'", s.config.StatementTimeout.Milliseconds()))
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to set statement timeout: %w", err)
}
// Log migration start
_, err = tx.ExecContext(ctx, `
INSERT INTO migration_logs (version, operation, status, message)
VALUES ($1, 'MIGRATE', 'STARTED', $2)
`, migration.Version, fmt.Sprintf("Starting migration V%03d__%s", migration.Version, migration.Name))
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to log migration start: %w", err)
}
// Apply migration with retry logic. Each attempt runs under a savepoint:
// on PostgreSQL a failed statement aborts the whole transaction, so without
// rolling back to a savepoint every retry after the first would fail.
var migrationErr error
startTime := time.Now()
for retry := 0; retry < s.config.MaxRetries; retry++ {
if retry > 0 {
s.logger.Printf("Retrying migration V%03d__%s (attempt %d/%d)", migration.Version, migration.Name, retry+1, s.config.MaxRetries)
time.Sleep(s.config.RetryBackoffBase * time.Duration(1<<uint(retry)))
}
if _, err = tx.ExecContext(ctx, "SAVEPOINT migration_attempt"); err != nil {
migrationErr = err
break
}
// Execute migration SQL
if _, err = tx.ExecContext(ctx, migration.UpSQL); err == nil {
migrationErr = nil
break
}
migrationErr = err
s.logger.Printf("Migration V%03d__%s failed: %v", migration.Version, migration.Name, err)
// Restore the transaction to a usable state before the next attempt
if _, spErr := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT migration_attempt"); spErr != nil {
break
}
}
executionTime := time.Since(startTime)
if migrationErr != nil {
// Log migration failure (lib/pq cannot bind a Go time.Duration to INTERVAL,
// so the elapsed time is passed as seconds via make_interval)
_, _ = tx.ExecContext(ctx, `
INSERT INTO migration_logs (version, operation, status, message, execution_time)
VALUES ($1, 'MIGRATE', 'FAILED', $2, make_interval(secs => $3))
`, migration.Version, migrationErr.Error(), executionTime.Seconds())
tx.Rollback()
return fmt.Errorf("migration V%03d__%s failed after %d retries: %w", migration.Version, migration.Name, s.config.MaxRetries, migrationErr)
}
// Record successful migration
_, err = tx.ExecContext(ctx, `
INSERT INTO schema_migrations (version, name, description, checksum)
VALUES ($1, $2, $3, $4)
`, migration.Version, migration.Name, migration.Description, migration.Checksum)
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to record migration: %w", err)
}
// Log migration success
_, err = tx.ExecContext(ctx, `
INSERT INTO migration_logs (version, operation, status, message, execution_time)
VALUES ($1, 'MIGRATE', 'COMPLETED', $2, make_interval(secs => $3))
`, migration.Version, fmt.Sprintf("Successfully applied migration V%03d__%s", migration.Version, migration.Name), executionTime.Seconds())
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to log migration completion: %w", err)
}
// Commit transaction
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
s.logger.Printf("Successfully applied migration V%03d__%s in %v", migration.Version, migration.Name, executionTime)
}
// Run post-migration checks
if err := s.runPostMigrationChecks(ctx); err != nil {
return fmt.Errorf("post-migration checks failed: %w", err)
}
s.logger.Println("Migration process completed successfully")
return nil
}
// Rollback reverts the last applied migration
func (s *MigrationService) Rollback(ctx context.Context) error {
s.logger.Println("Starting rollback process")
// Get the last applied migration
var lastMigration Migration
err := s.db.GetContext(ctx, &lastMigration, `
SELECT version, name, applied_at, description, checksum
FROM schema_migrations
ORDER BY version DESC
LIMIT 1
`)
if err != nil {
if err == sql.ErrNoRows {
s.logger.Println("No migrations to roll back")
return nil
}
return fmt.Errorf("failed to get last migration: %w", err)
}
// Load the migration file to get the down SQL
migrations, err := s.LoadMigrations()
if err != nil {
return err
}
var migrationToRollback *Migration
for i := range migrations {
if migrations[i].Version == lastMigration.Version {
migrationToRollback = &migrations[i]
break
}
}
if migrationToRollback == nil {
return fmt.Errorf("migration file for version %d not found", lastMigration.Version)
}
if migrationToRollback.DownSQL == "" {
return fmt.Errorf("no down migration defined for version %d", lastMigration.Version)
}
s.logger.Printf("Rolling back migration V%03d__%s", lastMigration.Version, lastMigration.Name)
// Start transaction
tx, err := s.db.BeginTxx(ctx, nil)
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}
// Set timeouts
_, err = tx.ExecContext(ctx, fmt.Sprintf("SET LOCAL lock_timeout = '%dms'", s.config.LockTimeout.Milliseconds()))
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to set lock timeout: %w", err)
}
_, err = tx.ExecContext(ctx, fmt.Sprintf("SET LOCAL statement_timeout = '%dms'", s.config.StatementTimeout.Milliseconds()))
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to set statement timeout: %w", err)
}
// Log rollback start
_, err = tx.ExecContext(ctx, `
INSERT INTO migration_logs (version, operation, status, message)
VALUES ($1, 'ROLLBACK', 'STARTED', $2)
`, lastMigration.Version, fmt.Sprintf("Starting rollback of migration V%03d__%s", lastMigration.Version, lastMigration.Name))
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to log rollback start: %w", err)
}
// Apply rollback with retry logic (one savepoint per attempt; see Migrate for rationale)
var rollbackErr error
startTime := time.Now()
for retry := 0; retry < s.config.MaxRetries; retry++ {
if retry > 0 {
s.logger.Printf("Retrying rollback of V%03d__%s (attempt %d/%d)", lastMigration.Version, lastMigration.Name, retry+1, s.config.MaxRetries)
time.Sleep(s.config.RetryBackoffBase * time.Duration(1<<uint(retry)))
}
if _, err = tx.ExecContext(ctx, "SAVEPOINT rollback_attempt"); err != nil {
rollbackErr = err
break
}
// Execute rollback SQL
if _, err = tx.ExecContext(ctx, migrationToRollback.DownSQL); err == nil {
rollbackErr = nil
break
}
rollbackErr = err
s.logger.Printf("Rollback of V%03d__%s failed: %v", lastMigration.Version, lastMigration.Name, err)
if _, spErr := tx.ExecContext(ctx, "ROLLBACK TO SAVEPOINT rollback_attempt"); spErr != nil {
break
}
}
executionTime := time.Since(startTime)
if rollbackErr != nil {
// Log rollback failure
_, _ = tx.ExecContext(ctx, `
INSERT INTO migration_logs (version, operation, status, message, execution_time)
VALUES ($1, 'ROLLBACK', 'FAILED', $2, make_interval(secs => $3))
`, lastMigration.Version, rollbackErr.Error(), executionTime.Seconds())
tx.Rollback()
return fmt.Errorf("rollback of V%03d__%s failed after %d retries: %w", lastMigration.Version, lastMigration.Name, s.config.MaxRetries, rollbackErr)
}
// Remove migration record
_, err = tx.ExecContext(ctx, `
DELETE FROM schema_migrations
WHERE version = $1
`, lastMigration.Version)
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to remove migration record: %w", err)
}
// Log rollback success
_, err = tx.ExecContext(ctx, `
INSERT INTO migration_logs (version, operation, status, message, execution_time)
VALUES ($1, 'ROLLBACK', 'COMPLETED', $2, make_interval(secs => $3))
`, lastMigration.Version, fmt.Sprintf("Successfully rolled back migration V%03d__%s", lastMigration.Version, lastMigration.Name), executionTime.Seconds())
if err != nil {
tx.Rollback()
return fmt.Errorf("failed to log rollback completion: %w", err)
}
// Commit transaction
if err := tx.Commit(); err != nil {
return fmt.Errorf("failed to commit transaction: %w", err)
}
s.logger.Printf("Successfully rolled back migration V%03d__%s in %v", lastMigration.Version, lastMigration.Name, executionTime)
return nil
}
// runPreMigrationChecks runs checks before migrations are applied
func (s *MigrationService) runPreMigrationChecks(ctx context.Context) error {
s.logger.Println("Running pre-migration checks")
// Check for active connections
var activeConnections int
err := s.db.GetContext(ctx, &activeConnections, `
SELECT COUNT(*)
FROM pg_stat_activity
WHERE datname = current_database()
AND pid <> pg_backend_pid()
AND state = 'active'
`)
if err != nil {
return fmt.Errorf("failed to check active connections: %w", err)
}
s.logger.Printf("Found %d active connections", activeConnections)
// Check for long-running queries
var longRunningQueries int
err = s.db.GetContext(ctx, &longRunningQueries, `
SELECT COUNT(*)
FROM pg_stat_activity
WHERE datname = current_database()
AND state = 'active'
AND pid <> pg_backend_pid()
AND (now() - query_start) > interval '5 minutes'
`)
if err != nil {
return fmt.Errorf("failed to check long-running queries: %w", err)
}
if longRunningQueries > 0 {
s.logger.Printf("WARNING: Found %d long-running queries", longRunningQueries)
}
// Check for locks
var locksCount int
err = s.db.GetContext(ctx, &locksCount, `
SELECT COUNT(*)
FROM pg_locks l
JOIN pg_stat_activity a ON l.pid = a.pid
WHERE a.datname = current_database()
AND l.granted = true
`)
if err != nil {
return fmt.Errorf("failed to check locks: %w", err)
}
s.logger.Printf("Found %d locks", locksCount)
// Check database size
var dbSize string
err = s.db.GetContext(ctx, &dbSize, `
SELECT pg_size_pretty(pg_database_size(current_database()))
`)
if err != nil {
return fmt.Errorf("failed to check database size: %w", err)
}
s.logger.Printf("Current database size: %s", dbSize)
// Run custom pre-migration checks
for _, check := range s.config.PreMigrationChecks {
s.logger.Printf("Running custom pre-migration check: %s", check)
_, err := s.db.ExecContext(ctx, check)
if err != nil {
return fmt.Errorf("custom pre-migration check failed: %w", err)
}
}
return nil
}
// runPostMigrationChecks runs checks after migrations are applied
func (s *MigrationService) runPostMigrationChecks(ctx context.Context) error {
s.logger.Println("Running post-migration checks")
// Check for invalid indexes
var invalidIndexes int
err := s.db.GetContext(ctx, &invalidIndexes, `
SELECT COUNT(*)
FROM pg_class c
JOIN pg_index i ON i.indexrelid = c.oid
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
AND i.indisvalid = false
`)
if err != nil {
return fmt.Errorf("failed to check invalid indexes: %w", err)
}
if invalidIndexes > 0 {
s.logger.Printf("WARNING: Found %d invalid indexes", invalidIndexes)
}
// Check for bloated tables
var bloatedTables int
err = s.db.GetContext(ctx, &bloatedTables, `
SELECT COUNT(*)
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname = 'public'
AND c.relkind = 'r'
AND c.relpages > 0
AND c.reltuples > 0
AND (c.relpages::float / (c.reltuples / (c.relpages::float * 8192 / (c.reltuples * 28 + 8)))) > 2
`)
if err != nil {
return fmt.Errorf("failed to check bloated tables: %w", err)
}
if bloatedTables > 0 {
s.logger.Printf("WARNING: Found %d bloated tables", bloatedTables)
}
// Run custom post-migration checks
for _, check := range s.config.PostMigrationChecks {
s.logger.Printf("Running custom post-migration check: %s", check)
_, err := s.db.ExecContext(ctx, check)
if err != nil {
return fmt.Errorf("custom post-migration check failed: %w", err)
}
}
return nil
}
// Status returns the current migration status
func (s *MigrationService) Status(ctx context.Context) error {
s.logger.Println("Checking migration status")
// Load migrations
migrations, err := s.LoadMigrations()
if err != nil {
return err
}
// Get applied migrations
appliedMigrations, err := s.GetAppliedMigrations(ctx)
if err != nil {
return err
}
// Create map of applied migrations for quick lookup
appliedMap := make(map[int]Migration)
for _, m := range appliedMigrations {
appliedMap[m.Version] = m
}
// Print status
fmt.Println("Migration Status:")
fmt.Println("=================")
fmt.Printf("Total migrations: %d\n", len(migrations))
fmt.Printf("Applied migrations: %d\n", len(appliedMigrations))
fmt.Printf("Pending migrations: %d\n", len(migrations)-len(appliedMigrations))
fmt.Println()
fmt.Println("Migration Details:")
fmt.Println("=================")
fmt.Printf("%-10s %-30s %-20s %s\n", "Version", "Name", "Status", "Applied At")
fmt.Printf("%-10s %-30s %-20s %s\n", "-------", "----", "------", "----------")
for _, m := range migrations {
applied, ok := appliedMap[m.Version]
status := "PENDING"
appliedAt := ""
if ok {
status = "APPLIED"
appliedAt = applied.AppliedAt.Format("2006-01-02 15:04:05")
}
fmt.Printf("V%03d %-30s %-20s %s\n", m.Version, m.Name, status, appliedAt)
}
return nil
}
// Main function for CLI usage
func main() {
if len(os.Args) < 3 {
fmt.Println("Usage: migration [config_file] [command]")
fmt.Println("Commands: migrate, rollback, status")
os.Exit(1)
}
configPath := os.Args[1]
command := os.Args[2]
service, err := NewMigrationService(configPath)
if err != nil {
log.Fatalf("Failed to create migration service: %v", err)
}
ctx := context.Background()
switch command {
case "migrate":
if err := service.Migrate(ctx); err != nil {
log.Fatalf("Migration failed: %v", err)
}
case "rollback":
if err := service.Rollback(ctx); err != nil {
log.Fatalf("Rollback failed: %v", err)
}
case "status":
if err := service.Status(ctx); err != nil {
log.Fatalf("Status check failed: %v", err)
}
default:
log.Fatalf("Unknown command: %s", command)
}
}
Lessons Learned:
Database migrations require careful planning, testing, and safety mechanisms.
How to Avoid:
Implement proper transaction handling in all migration scripts.
Test migrations with production-like data volumes (see the test sketch after this list).
Schedule migrations during low-traffic periods.
Implement automated rollback procedures.
Monitor database resources during migrations.
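One way to act on the "production-like data volumes" advice is a disposable-database test. The following sketch uses Testcontainers' PostgreSQLContainer; the table, the five-million-row seed, the ten-minute budget, and the runMigrations hook are all illustrative assumptions rather than project specifics:

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

class MigrationVolumeTest {

    @Test
    void migrationCompletesOnProductionLikeVolume() throws Exception {
        try (PostgreSQLContainer<?> pg = new PostgreSQLContainer<>("postgres:15")) {
            pg.start();
            try (Connection conn = DriverManager.getConnection(
                    pg.getJdbcUrl(), pg.getUsername(), pg.getPassword())) {

                // Seed a production-like volume (row count is illustrative)
                try (Statement stmt = conn.createStatement()) {
                    stmt.execute("CREATE TABLE orders (id serial PRIMARY KEY, status varchar(20), created_at timestamp DEFAULT now())");
                    stmt.execute("INSERT INTO orders (status) SELECT 'NEW' FROM generate_series(1, 5000000)");
                }

                long start = System.nanoTime();
                runMigrations(conn); // hypothetical hook into the project's migration runner
                long elapsedMs = (System.nanoTime() - start) / 1_000_000;

                // Fail the build if the migration would not fit the maintenance window
                if (elapsedMs > 10 * 60 * 1000L) {
                    throw new AssertionError("Migration took " + elapsedMs + " ms on a production-like volume");
                }
            }
        }
    }

    private void runMigrations(Connection conn) {
        // placeholder: invoke the real migration tooling here
    }
}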
No summary provided
What Happened:
During peak traffic hours, users reported slow response times and occasional errors. Application logs showed database connection timeouts and "too many connections" errors. However, database server monitoring indicated CPU, memory, and I/O were all within normal ranges. The issue worsened over time as more microservices were deployed, eventually leading to a partial outage when connection failures cascaded across multiple services.
Diagnosis Steps:
Analyzed application logs to identify the specific database errors.
Monitored active database connections during peak load.
Reviewed connection pool configurations across all microservices.
Traced connection lifecycle in problematic services.
Examined database server configuration for connection limits.
Root Cause:
The investigation revealed multiple issues with connection pool management:
1. Each microservice had its own connection pool configured with an excessive maximum number of connections.
2. Connection leaks in several services meant connections were not properly closed.
3. Connection pool settings didn't include proper timeout and validation parameters.
4. No circuit breaker pattern was implemented to handle database unavailability.
5. The total potential connections from all services exceeded the database's max_connections setting (for example, 30 service instances each allowed 100 connections can demand 3,000 connections against a 500-connection limit).
Fix/Workaround:
• Short-term: Implemented immediate fixes to prevent connection exhaustion:
# Before: Problematic HikariCP configuration in application.properties
spring.datasource.hikari.maximum-pool-size=100
spring.datasource.hikari.minimum-idle=10
spring.datasource.hikari.idle-timeout=600000
spring.datasource.hikari.max-lifetime=1800000
# After: Optimized HikariCP configuration in application.properties
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.idle-timeout=300000
spring.datasource.hikari.max-lifetime=1200000
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.validation-timeout=5000
spring.datasource.hikari.leak-detection-threshold=60000
spring.datasource.hikari.register-mbeans=true
• Fixed connection leaks in service code:
// Before: Connection leak in service code
@Service
public class UserService {

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public UserService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public User findUserById(Long id) {
        // Connection acquired but not properly released in some error paths
        try {
            return jdbcTemplate.queryForObject(
                "SELECT * FROM users WHERE id = ?",
                new Object[]{id},
                (rs, rowNum) -> {
                    User user = new User();
                    user.setId(rs.getLong("id"));
                    user.setUsername(rs.getString("username"));
                    user.setEmail(rs.getString("email"));
                    return user;
                }
            );
        } catch (Exception e) {
            log.error("Error finding user", e);
            // Connection might not be properly released here
            throw e;
        }
    }
}

// After: Fixed service code with proper connection handling
@Service
public class UserService {

    private final JdbcTemplate jdbcTemplate;

    @Autowired
    public UserService(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    public User findUserById(Long id) {
        try {
            return jdbcTemplate.queryForObject(
                "SELECT * FROM users WHERE id = ?",
                new Object[]{id},
                (rs, rowNum) -> {
                    User user = new User();
                    user.setId(rs.getLong("id"));
                    user.setUsername(rs.getString("username"));
                    user.setEmail(rs.getString("email"));
                    return user;
                }
            );
        } catch (EmptyResultDataAccessException e) {
            log.info("User not found with id: {}", id);
            return null;
        } catch (Exception e) {
            log.error("Error finding user with id: {}", id, e);
            throw e;
        }
    }
}
• Implemented a connection pool metrics collector:
@Configuration
public class DatabaseMetricsConfiguration {
@Bean
public DataSourcePoolMetadataProvider dataSourcePoolMetadataProvider() {
return dataSource -> {
if (dataSource instanceof HikariDataSource) {
return new HikariDataSourcePoolMetadata((HikariDataSource) dataSource);
}
return null;
};
}
@Bean
public MeterBinder hikariMetrics(DataSource dataSource) {
    // Assumes a HikariCP MeterBinder implementation is on the classpath;
    // HikariCP can also publish metrics natively via setMetricsTrackerFactory.
    if (dataSource instanceof HikariDataSource) {
        return new HikariCPMetrics((HikariDataSource) dataSource);
    }
    return null;
}
@Bean
public ConnectionPoolHealthIndicator connectionPoolHealthIndicator(DataSource dataSource) {
return new ConnectionPoolHealthIndicator(dataSource);
}
}
// Note: this class is registered via the @Bean factory above; annotating it
// with @Component as well would create a duplicate health indicator bean.
public class ConnectionPoolHealthIndicator extends AbstractHealthIndicator {
private final DataSource dataSource;
public ConnectionPoolHealthIndicator(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
if (dataSource instanceof HikariDataSource) {
HikariDataSource hikariDataSource = (HikariDataSource) dataSource;
HikariPoolMXBean poolMXBean = hikariDataSource.getHikariPoolMXBean();
int active = poolMXBean.getActiveConnections();
int idle = poolMXBean.getIdleConnections();
int total = poolMXBean.getTotalConnections();
int max = hikariDataSource.getMaximumPoolSize();
builder.up()
.withDetail("active", active)
.withDetail("idle", idle)
.withDetail("total", total)
.withDetail("max", max)
.withDetail("usage", (double) active / max);
// Report DOWN when the connection pool is more than 80% utilized
if ((double) active / max > 0.8) {
builder.status(Status.DOWN)
.withDetail("reason", "Connection pool utilization high");
}
} else {
builder.unknown()
.withDetail("reason", "Not a HikariDataSource");
}
}
}
• Implemented a circuit breaker pattern for database access:
@Configuration
public class ResilienceConfiguration {
@Bean
public CircuitBreakerRegistry circuitBreakerRegistry() {
CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.slowCallRateThreshold(50)
.slowCallDurationThreshold(Duration.ofSeconds(2))
.permittedNumberOfCallsInHalfOpenState(10)
.minimumNumberOfCalls(20)
.slidingWindowType(CircuitBreakerConfig.SlidingWindowType.COUNT_BASED)
.slidingWindowSize(100)
.waitDurationInOpenState(Duration.ofSeconds(60))
.automaticTransitionFromOpenToHalfOpenEnabled(true)
.build();
return CircuitBreakerRegistry.of(circuitBreakerConfig);
}
@Bean
public CircuitBreaker databaseCircuitBreaker(CircuitBreakerRegistry circuitBreakerRegistry) {
return circuitBreakerRegistry.circuitBreaker("database");
}
@Bean
public Retry databaseRetry() {
return RetryRegistry.of(RetryConfig.custom()
.maxAttempts(3)
.waitDuration(Duration.ofMillis(100))
.retryExceptions(SQLException.class, DataAccessException.class)
.ignoreExceptions(NonTransientDataAccessException.class)
.build()).retry("database");
}
}
@Service
public class ResilientUserService {
private final JdbcTemplate jdbcTemplate;
private final CircuitBreaker circuitBreaker;
private final Retry retry;
@Autowired
public ResilientUserService(
JdbcTemplate jdbcTemplate,
@Qualifier("databaseCircuitBreaker") CircuitBreaker circuitBreaker,
@Qualifier("databaseRetry") Retry retry) {
this.jdbcTemplate = jdbcTemplate;
this.circuitBreaker = circuitBreaker;
this.retry = retry;
}
public User findUserById(Long id) {
return Retry.decorateSupplier(retry,
CircuitBreaker.decorateSupplier(circuitBreaker,
() -> {
try {
return jdbcTemplate.queryForObject(
"SELECT * FROM users WHERE id = ?",
new Object[]{id},
(rs, rowNum) -> {
User user = new User();
user.setId(rs.getLong("id"));
user.setUsername(rs.getString("username"));
user.setEmail(rs.getString("email"));
return user;
}
);
} catch (EmptyResultDataAccessException e) {
return null;
}
}
)
).get();
}
}
• Updated PostgreSQL configuration for better connection handling:
# Before: Default PostgreSQL configuration
max_connections = 100
shared_buffers = 128MB
effective_cache_size = 4GB
work_mem = 4MB
maintenance_work_mem = 64MB
idle_in_transaction_session_timeout = 0
# After: Optimized PostgreSQL configuration
max_connections = 500
shared_buffers = 2GB
effective_cache_size = 6GB
work_mem = 8MB
maintenance_work_mem = 128MB
idle_in_transaction_session_timeout = 60000  # 1 minute timeout for idle-in-transaction sessions (ms)
statement_timeout = 120000                   # 2 minute statement timeout (ms)
tcp_keepalives_idle = 60
tcp_keepalives_interval = 10
tcp_keepalives_count = 10
• Long-term: Implemented a comprehensive database connection management strategy:
- Created a centralized connection pool service with dynamic scaling
- Implemented connection pool monitoring and alerting
- Developed automated testing for connection leaks
- Established connection pool sizing guidelines for all services (see the sizing sketch after this list)
- Implemented database request queuing and prioritization
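To make the sizing guideline concrete, here is a minimal sketch of the budget arithmetic behind it; every number is an example only:

// Sketch: derive a per-instance pool size from the database's global limit.
// Real sizing should also weigh per-service concurrency needs and headroom
// for batch jobs, migrations, and monitoring.
public class PoolSizing {

    public static int poolSizePerInstance(int dbMaxConnections,
                                          int reservedForAdmin,
                                          int totalServiceInstances) {
        int budget = dbMaxConnections - reservedForAdmin;
        return Math.max(1, budget / totalServiceInstances);
    }

    public static void main(String[] args) {
        // e.g. max_connections = 500, 20 reserved for admin/monitoring,
        // 30 service instances sharing the database
        int size = poolSizePerInstance(500, 20, 30);
        System.out.println("maximum-pool-size per instance: " + size); // prints 16
    }
}

Summing every instance's maximum-pool-size against the database's max_connections is exactly the check that was missing in this incident.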
Lessons Learned:
Connection pool management is critical in microservices architectures with shared databases.
How to Avoid:
Implement proper connection pool sizing based on service needs and database limits.
Use connection leak detection and timeout settings in all services.
Monitor connection pool metrics and set up alerts for potential issues.
Implement circuit breakers to handle database unavailability gracefully.
Consider using a connection pool proxy for centralized management in microservices.