v0.0.1
This commit is contained in:
108
batcher/batcher.go
Normal file
108
batcher/batcher.go
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
package batcher
|
||||||
|
|
||||||
|
import (
|
||||||
|
"btchrr/models"
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Batcher - encapsulates the batch size for splitting a slice into batches
|
||||||
|
type Batcher struct {
|
||||||
|
batchSize int
|
||||||
|
placeholders map[string]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBatcher - Creates a new batcher instance with the specified batch size.
|
||||||
|
// Returns an error if the batch size is invalid.
|
||||||
|
func NewBatcher(batchSize int) (*Batcher, error) {
|
||||||
|
if batchSize <= 0 {
|
||||||
|
return nil, models.ErrInvalidBatchSize
|
||||||
|
}
|
||||||
|
b := &Batcher{
|
||||||
|
batchSize: batchSize,
|
||||||
|
placeholders: make(map[string]struct{}),
|
||||||
|
}
|
||||||
|
for _, ph := range []string{"?", "$", ":", ":"} {
|
||||||
|
b.placeholders[ph] = struct{}{}
|
||||||
|
}
|
||||||
|
return b, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildBatch - Builds a batch query from a single query
|
||||||
|
func (b *Batcher) BuildBatches(singleQuery string, items []any) ([]models.BatchedQuery, error) {
|
||||||
|
batchedItems, err := b.batchItems(items)
|
||||||
|
if err != nil {
|
||||||
|
return []models.BatchedQuery{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
batches, err := b.BuildBatchQuery(singleQuery, b.batchSize, batchedItems)
|
||||||
|
if err != nil {
|
||||||
|
return []models.BatchedQuery{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return batches, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Batch - Splits the input slice into batches of the specified size.
|
||||||
|
// Returns an error if the input slice is empty.
|
||||||
|
func (b *Batcher) batchItems(items []any) ([][]any, error) {
|
||||||
|
if len(items) == 0 {
|
||||||
|
return nil, models.ErrNoItems
|
||||||
|
}
|
||||||
|
|
||||||
|
totalItems := len(items)
|
||||||
|
numOfBatches := (totalItems + b.batchSize - 1) / b.batchSize // round up
|
||||||
|
|
||||||
|
batches := make([][]any, 0, numOfBatches)
|
||||||
|
|
||||||
|
for i := 0; i < totalItems; i += b.batchSize {
|
||||||
|
end := i + b.batchSize
|
||||||
|
if end > totalItems {
|
||||||
|
end = totalItems
|
||||||
|
}
|
||||||
|
// Copy elements to avoid possible side effects if the original slice is modified
|
||||||
|
batch := make([]any, end-i)
|
||||||
|
copy(batch, items[i:end])
|
||||||
|
batches = append(batches, batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
return batches, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// BuildBatchQuery - builds a batch query from a single query
|
||||||
|
func (b *Batcher) BuildBatchQuery(singleQuery string, batchSize int, batchedItems [][]any) (batchedQueries []models.BatchedQuery, err error) {
|
||||||
|
placeholder, err := b.detectPlaceholders(singleQuery)
|
||||||
|
if err != nil {
|
||||||
|
return []models.BatchedQuery{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch placeholder {
|
||||||
|
case "?":
|
||||||
|
return b.buildSqliteQuery(singleQuery, batchSize, batchedItems)
|
||||||
|
case "$":
|
||||||
|
return b.buildPostgresQuery(singleQuery, batchSize, batchedItems)
|
||||||
|
default:
|
||||||
|
return b.buildQueryWithNamedPlaceholders(singleQuery, batchSize, placeholder, batchedItems)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// detectPlaceholder определяет тип плейсхолдера в SQL-запросе, ищет плейсхолдеры окружённые пробелами (" ? ", " $ ", " : ")
|
||||||
|
func (b *Batcher) detectPlaceholders(query string) (string, error) {
|
||||||
|
for _, s := range query {
|
||||||
|
if _, ok := b.placeholders[string(s)]; ok {
|
||||||
|
return string(s), nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return "", models.ErrCannotDetectPlaceholder
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batcher) buildSqliteQuery(singleQuery string, batchSize int, batchedItems [][]any) (batchedQueries []models.BatchedQuery, err error) {
|
||||||
|
return []models.BatchedQuery{}, errors.New("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batcher) buildPostgresQuery(singleQuery string, batchSize int, batchedItems [][]any) (batchedQueries []models.BatchedQuery, err error) {
|
||||||
|
return []models.BatchedQuery{}, errors.New("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Batcher) buildQueryWithNamedPlaceholders(singleQuery string, batchSize int, placeholder string, batchedItems [][]any) (batchedQueries []models.BatchedQuery, err error) {
|
||||||
|
return []models.BatchedQuery{}, errors.New("not implemented")
|
||||||
|
}
|
||||||
91
btchrr.go
Normal file
91
btchrr.go
Normal file
@@ -0,0 +1,91 @@
|
|||||||
|
package Btchrr
|
||||||
|
|
||||||
|
import (
|
||||||
|
"btchrr/batcher"
|
||||||
|
"btchrr/dbadapter"
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"btchrr/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Btchrr - main struct for the package
|
||||||
|
type Btchrr struct {
|
||||||
|
batcher *batcher.Batcher
|
||||||
|
executor Executor
|
||||||
|
}
|
||||||
|
|
||||||
|
// Executor - interface for the sql execution
|
||||||
|
type Executor interface {
|
||||||
|
Exec(ctx context.Context, query models.BatchedQuery) (sql.Result, error)
|
||||||
|
CheckQuery(query string) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// AggregatedResult - aggregated result from all batches
|
||||||
|
type AggregatedResult struct {
|
||||||
|
rowsAffected int64
|
||||||
|
lastInsertId int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AggregatedResult) LastInsertId() (int64, error) {
|
||||||
|
return r.lastInsertId, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *AggregatedResult) RowsAffected() (int64, error) {
|
||||||
|
return r.rowsAffected, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBtchrr - creates a new Btchrr instance
|
||||||
|
func NewBtchrr(batchSize int, db *sql.DB) (*Btchrr, error) {
|
||||||
|
batcher, err := batcher.NewBatcher(batchSize)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
dbAdapter := dbadapter.NewSQLAdapter(db)
|
||||||
|
|
||||||
|
return &Btchrr{
|
||||||
|
batcher: batcher,
|
||||||
|
executor: dbAdapter,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Exec - accepts a query for single item, items and executes it in batches
|
||||||
|
func (b *Btchrr) Exec(ctx context.Context, query string, items []any) (sql.Result, error) {
|
||||||
|
err := b.executor.CheckQuery(query)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
batches, err := b.batcher.BuildBatches(query, items)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var totalRowsAffected int64
|
||||||
|
var lastInsertId int64
|
||||||
|
|
||||||
|
// Execute SQL query for each batch (transforming single-item query to batch query)
|
||||||
|
for _, batch := range batches {
|
||||||
|
|
||||||
|
result, err := b.executor.Exec(ctx, batch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Суммируем результаты от всех батчей
|
||||||
|
rowsAffected, _ := result.RowsAffected()
|
||||||
|
totalRowsAffected += rowsAffected
|
||||||
|
|
||||||
|
// Берем последний InsertId
|
||||||
|
insertId, _ := result.LastInsertId()
|
||||||
|
if insertId > 0 {
|
||||||
|
lastInsertId = insertId
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &AggregatedResult{
|
||||||
|
rowsAffected: totalRowsAffected,
|
||||||
|
lastInsertId: lastInsertId,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
32
dbadapter/sql_adapter.go
Normal file
32
dbadapter/sql_adapter.go
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
package dbadapter
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
|
||||||
|
"btchrr/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DBAdapter struct {
|
||||||
|
db *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSQLAdapter(db *sql.DB) *DBAdapter {
|
||||||
|
return &DBAdapter{db: db}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *DBAdapter) Exec(ctx context.Context, batchedQuery models.BatchedQuery) (sql.Result, error) {
|
||||||
|
res, err := a.db.ExecContext(ctx, string(batchedQuery))
|
||||||
|
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// CheckQuery - checks if the query is valid
|
||||||
|
func (a *DBAdapter) CheckQuery(query string) error {
|
||||||
|
stmt, err := a.db.Prepare(query)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
stmt.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
12
models/models.go
Normal file
12
models/models.go
Normal file
@@ -0,0 +1,12 @@
|
|||||||
|
package models
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrNoItems = errors.New("no items recieved, batch is empty")
|
||||||
|
ErrInvalidBatchSize = errors.New("batch size must be greater than zero")
|
||||||
|
ErrCannotDetectPlaceholder = errors.New("cannot detect placeholder in query")
|
||||||
|
)
|
||||||
|
|
||||||
|
// BatchedQuery - single batch query
|
||||||
|
type BatchedQuery string
|
||||||
110
readme.md
Normal file
110
readme.md
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
# Btchrr
|
||||||
|
|
||||||
|
A Go package for automatic batching of database operations with query transformation and result aggregation.
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
- ✅ Automatic splitting of items into batches of specified size
|
||||||
|
- ✅ SQL query transformation from single-item to batch queries
|
||||||
|
- ✅ Batch query execution with result aggregation
|
||||||
|
- ✅ Support for any SQL database through `Executor` interface
|
||||||
|
- ✅ Database-specific placeholder support (PostgreSQL, MySQL, SQLite, etc.)
|
||||||
|
- ✅ Error handling and input validation
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
### PostgreSQL Example
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"log"
|
||||||
|
"your-project/btchrr" // Path to your package in the project
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Database connection
|
||||||
|
db, err := sql.Open("postgres", "connection_string")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
// Create Btchrr instance with batch size 100 and PostgreSQL placeholder
|
||||||
|
btchrr, err := btchrr.NewBtchrr(100, db, "$1")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare data for insertion
|
||||||
|
items := []any{"John", "Jane", "Bob", "Alice", "Charlie"}
|
||||||
|
|
||||||
|
// Execute query with automatic batching
|
||||||
|
ctx := context.Background()
|
||||||
|
result, err := btchrr.Exec(ctx, "INSERT INTO users (name) VALUES ($1)", items)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get aggregated results
|
||||||
|
rowsAffected, _ := result.RowsAffected()
|
||||||
|
lastId, _ := result.LastInsertId()
|
||||||
|
|
||||||
|
log.Printf("Rows affected: %d, Last ID: %d", rowsAffected, lastId)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### MySQL/SQLite Example
|
||||||
|
|
||||||
|
```go
|
||||||
|
// For MySQL or SQLite, use "?" placeholder
|
||||||
|
btchrr, err := btchrr.NewBtchrr(100, db, "?")
|
||||||
|
|
||||||
|
// Query will be transformed from:
|
||||||
|
// "INSERT INTO users (name) VALUES (?)"
|
||||||
|
// To:
|
||||||
|
// "INSERT INTO users (name) VALUES (?, ?, ?)"
|
||||||
|
```
|
||||||
|
|
||||||
|
## API
|
||||||
|
|
||||||
|
### NewBtchrr(batchSize int, db *sql.DB, placeholder string) (*Btchrr, error)
|
||||||
|
|
||||||
|
Creates a new Btchrr instance with specified batch size, database connection, and placeholder format.
|
||||||
|
|
||||||
|
**Parameters:**
|
||||||
|
|
||||||
|
- `batchSize` - batch size (must be > 0)
|
||||||
|
- `db` - database connection
|
||||||
|
- `placeholder` - database-specific placeholder ("?" for MySQL/SQLite, "$1" for PostgreSQL)
|
||||||
|
|
||||||
|
**Returns:**
|
||||||
|
|
||||||
|
- `*Btchrr` - Btchrr instance
|
||||||
|
- `error` - creation error
|
||||||
|
|
||||||
|
### Exec(ctx context.Context, query string, items []any) (sql.Result, error)
|
||||||
|
|
||||||
|
Executes SQL query for each batch and returns aggregated result.
|
||||||
|
|
||||||
|
**Parameters:**
|
||||||
|
|
||||||
|
- `ctx` - execution context
|
||||||
|
- `query` - SQL query for single item
|
||||||
|
- `items` - slice of items to process
|
||||||
|
|
||||||
|
**Returns:**
|
||||||
|
|
||||||
|
- `sql.Result` - aggregated result from all batches
|
||||||
|
- `error` - execution error
|
||||||
|
|
||||||
|
## Future Plans
|
||||||
|
|
||||||
|
- [ ] Support for GORM, sqlx, ent, go-pg, pgx
|
||||||
|
- [ ] Dynamic batch size based on item count
|
||||||
|
- [ ] Transaction support
|
||||||
|
- [ ] Performance metrics
|
||||||
|
- [ ] Transaction support
|
||||||
Reference in New Issue
Block a user