Skip to content

chore: database transaction #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/app/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func (app *App) StartService() error {
userRepo := userRepository.NewRepository(app.DB, app.Log)

// define usecase
userUC := userUseCase.NewUseCase(userRepo, app.DB, app.Log)
userUC := userUseCase.NewUseCase(userRepo, app.Log)

// define controllers
userCTRL := userV1.NewHandlers(userUC, app.Log)
Expand Down
3 changes: 1 addition & 2 deletions internal/users/entities/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ type (
}

LockingOpt struct {
ForUpdateNoWait bool
ForUpdate bool
PessimisticLocking bool
}
)

Expand Down
16 changes: 16 additions & 0 deletions internal/users/mock/repository_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 35 additions & 14 deletions internal/users/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,55 @@ package repository

import (
"context"
"database/sql"

"github.com/DoWithLogic/golang-clean-architecture/internal/users/entities"
"github.com/DoWithLogic/golang-clean-architecture/internal/users/repository/repository_query"
"github.com/DoWithLogic/golang-clean-architecture/pkg/datasource"
"github.com/DoWithLogic/golang-clean-architecture/pkg/otel/zerolog"
"github.com/DoWithLogic/golang-clean-architecture/pkg/utils"
"github.com/jmoiron/sqlx"
)

type (
Repository interface {
Atomic(ctx context.Context, opt *sql.TxOptions, repo func(tx Repository) error) error

SaveNewUser(context.Context, entities.Users) (int64, error)
UpdateUserByID(context.Context, entities.UpdateUsers) error
GetUserByID(context.Context, int64, ...entities.LockingOpt) (entities.Users, error)
UpdateUserStatusByID(context.Context, entities.UpdateUserStatus) error
}

repository struct {
conn datasource.SQLTxConn
db *sqlx.DB
conn datasource.ConnTx
log *zerolog.Logger
}
)

func NewRepository(conn datasource.SQLTxConn, log *zerolog.Logger) Repository {
return &repository{conn, log}
func NewRepository(c *sqlx.DB, l *zerolog.Logger) Repository {
return &repository{conn: c, log: l, db: c}
}

// Atomic implements vendor.Repository for transaction query
func (r *repository) Atomic(ctx context.Context, opt *sql.TxOptions, repo func(tx Repository) error) error {
txConn, err := r.db.BeginTx(ctx, opt)
if err != nil {
r.log.Z().Err(err).Msg("[repository]Atomic.BeginTxx")

return err
}

newRepository := &repository{conn: txConn, db: r.db}

repo(newRepository)

if err := new(datasource.DataSource).EndTx(txConn, err); err != nil {
return err
}

return nil
}

func (repo *repository) SaveNewUser(ctx context.Context, user entities.Users) (int64, error) {
Expand All @@ -39,7 +64,7 @@ func (repo *repository) SaveNewUser(ctx context.Context, user entities.Users) (i
}

var userID int64
err := new(datasource.SQL).Exec(repo.conn.ExecContext(ctx, repository_query.InsertUsers, args...)).Scan(nil, &userID)
err := new(datasource.DataSource).ExecSQL(repo.conn.ExecContext(ctx, repository_query.InsertUsers, args...)).Scan(nil, &userID)
if err != nil {
repo.log.Z().Err(err).Msg("[repository]SaveNewUser.ExecContext")

Expand All @@ -59,7 +84,7 @@ func (repo *repository) UpdateUserByID(ctx context.Context, user entities.Update
user.UserID,
}

err := new(datasource.SQL).Exec(repo.conn.ExecContext(ctx, repository_query.UpdateUsers, args...)).Scan(nil, nil)
err := new(datasource.DataSource).ExecSQL(repo.conn.ExecContext(ctx, repository_query.UpdateUsers, args...)).Scan(nil, nil)
if err != nil {
repo.log.Z().Err(err).Msg("[repository]UpdateUserByID.ExecContext")

Expand All @@ -69,7 +94,7 @@ func (repo *repository) UpdateUserByID(ctx context.Context, user entities.Update
return nil
}

func (repo *repository) GetUserByID(ctx context.Context, userID int64, lockOpt ...entities.LockingOpt) (userData entities.Users, err error) {
func (repo *repository) GetUserByID(ctx context.Context, userID int64, options ...entities.LockingOpt) (userData entities.Users, err error) {
args := utils.Array{
userID,
}
Expand All @@ -87,15 +112,11 @@ func (repo *repository) GetUserByID(ctx context.Context, userID int64, lockOpt .

query := repository_query.GetUserByID

if len(lockOpt) >= 1 {
if lockOpt[0].ForUpdate {
query += " FOR UPDATE;"
} else {
query += " FOR UPDATE NO WAIT;"
}
if len(options) >= 1 && options[0].PessimisticLocking {
query += " FOR UPDATE"
}

if err = new(datasource.SQL).Query(repo.conn.QueryContext(ctx, query, args...)).Scan(row); err != nil {
if err = new(datasource.DataSource).QuerySQL(repo.conn.QueryContext(ctx, query, args...)).Scan(row); err != nil {
repo.log.Z().Err(err).Msg("[repository]GetUserByID.QueryContext")
return userData, err
}
Expand All @@ -112,7 +133,7 @@ func (repo *repository) UpdateUserStatusByID(ctx context.Context, req entities.U
}

var updatedID int64
err := new(datasource.SQL).Exec(repo.conn.ExecContext(ctx, repository_query.UpdateUserStatusByID, args...)).Scan(nil, &updatedID)
err := new(datasource.DataSource).ExecSQL(repo.conn.ExecContext(ctx, repository_query.UpdateUserStatusByID, args...)).Scan(nil, &updatedID)
if err != nil {
repo.log.Z().Err(err).Msg("[repository]UpdateUserStatusByID.ExecContext")

Expand Down
30 changes: 9 additions & 21 deletions internal/users/usecase/usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ package usecase

import (
"context"
"database/sql"

"github.com/DoWithLogic/golang-clean-architecture/internal/users/entities"
"github.com/DoWithLogic/golang-clean-architecture/internal/users/repository"
"github.com/DoWithLogic/golang-clean-architecture/pkg/datasource"
"github.com/DoWithLogic/golang-clean-architecture/pkg/otel/zerolog"
"github.com/jmoiron/sqlx"
)

type (
Expand All @@ -19,13 +18,12 @@ type (

usecase struct {
repo repository.Repository
dbTx *sqlx.DB
log *zerolog.Logger
}
)

func NewUseCase(repo repository.Repository, txConn *sqlx.DB, log *zerolog.Logger) Usecase {
return &usecase{repo, txConn, log}
func NewUseCase(repo repository.Repository, log *zerolog.Logger) Usecase {
return &usecase{repo, log}
}

func (uc *usecase) CreateUser(ctx context.Context, payload entities.CreateUser) (int64, error) {
Expand All @@ -40,32 +38,22 @@ func (uc *usecase) CreateUser(ctx context.Context, payload entities.CreateUser)
}

func (uc *usecase) UpdateUser(ctx context.Context, updateData entities.UpdateUsers) error {
return func(dbTx *sqlx.DB) error {
txConn, err := uc.dbTx.BeginTx(ctx, nil)
if err != nil {
return err
}

defer func() {
if err := new(datasource.SQL).EndTx(txConn, err); err != nil {
return
}
}()
return uc.repo.Atomic(ctx, &sql.TxOptions{}, func(tx repository.Repository) error {

repoTx := repository.NewRepository(txConn, uc.log)

if _, err := repoTx.GetUserByID(ctx, updateData.UserID, entities.LockingOpt{ForUpdate: true}); err != nil {
if _, err := tx.GetUserByID(ctx, updateData.UserID, entities.LockingOpt{PessimisticLocking: true}); err != nil {
uc.log.Z().Err(err).Msg("[usecase]UpdateUser.GetUserByID")

return err
}

if err = repoTx.UpdateUserByID(ctx, entities.NewUpdateUsers(updateData)); err != nil {
if err := tx.UpdateUserByID(ctx, entities.NewUpdateUsers(updateData)); err != nil {
uc.log.Z().Err(err).Msg("[usecase]UpdateUser.UpdateUserByID")

return err
}

return nil
}(uc.dbTx)
})
}

func (uc *usecase) UpdateUserStatus(ctx context.Context, req entities.UpdateUserStatus) error {
Expand Down
7 changes: 1 addition & 6 deletions internal/users/usecase/usecase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func Test_usecase_CreateUser(t *testing.T) {
repo := mocks.NewMockRepository(ctrl)
uc := usecase.NewUseCase(
repo,
nil,
zerolog.NewZeroLog(ctx, os.Stdout),
)

Expand Down Expand Up @@ -104,11 +103,7 @@ func Test_usecase_UpdateUserStatus(t *testing.T) {

ctx := context.Background()
repo := mocks.NewMockRepository(ctrl)
uc := usecase.NewUseCase(
repo,
nil,
zerolog.NewZeroLog(ctx, os.Stdout),
)
uc := usecase.NewUseCase(repo, zerolog.NewZeroLog(ctx, os.Stdout))

args := entities.UpdateUserStatus{
UserID: 1,
Expand Down
66 changes: 19 additions & 47 deletions pkg/datasource/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,17 @@ import (
)

type (
BeginTx interface {
Conn interface {
BeginTx(ctx context.Context, opts *sql.TxOptions) (tx *sql.Tx, err error)
}
ExecContext interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (res sql.Result, err error)
}
PingContext interface {
PingContext(ctx context.Context) (err error)
io.Closer
ConnTx
}
PrepareContext interface {

ConnTx interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (res sql.Result, err error)
PrepareContext(ctx context.Context, query string) (stmt *sql.Stmt, err error)
}
QueryContext interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (rows *sql.Rows, err error)
}
QueryRowContext interface {
QueryRowContext(ctx context.Context, query string, args ...interface{}) (row *sql.Row)
}

Expand All @@ -37,11 +32,6 @@ type (
}

Query interface {
// Scan accept do, a func that accept `i int` as index and returns a List
// of pointer.
// List == nil // break the loop
// len(List) < 1 // skip the current loop
// len(List) > 0 // assign the pointer, must be same as the length of columns
Scan(row func(i int) utils.Array) (err error)
}

Expand All @@ -55,30 +45,17 @@ type (
err error
}

SQLConn interface {
BeginTx
io.Closer
PingContext
SQLTxConn
}

SQLTxConn interface {
ExecContext
PrepareContext
QueryContext
QueryRowContext
}

SQL struct{}
DataSource struct{}
)

var (
_ SQLConn = (*sql.Conn)(nil)
_ SQLConn = (*sql.DB)(nil)
_ SQLTxConn = (*sql.Tx)(nil)
log = zerolog.NewZeroLog(context.Background(), os.Stdout)
_ Conn = (*sql.Conn)(nil)
_ Conn = (*sql.DB)(nil)
_ ConnTx = (*sql.Tx)(nil)
log = zerolog.NewZeroLog(context.Background(), os.Stdout)
)

// datasource errors
var (
ErrNoColumnReturned = errors.New("no columns returned")
ErrDataNotFound = errors.New("data not found")
Expand Down Expand Up @@ -193,20 +170,15 @@ func (x query) Scan(row func(i int) utils.Array) error {
return err
}

func (SQL) Exec(sqlResult sql.Result, err error) Exec { return exec{sqlResult, err} }
func (DataSource) ExecSQL(sqlResult sql.Result, err error) exec {
return exec{sqlResult, err}
}

func (SQL) Query(sqlRows *sql.Rows, err error) Query { return query{sqlRows, err} }
func (DataSource) QuerySQL(sqlRows *sql.Rows, err error) Query {
return query{sqlRows, err}
}

// EndTx will end transaction with provided *sql.Tx and error. The tx argument
// should be valid, and then will check the err, if any error occurred, will
// commencing the ROLLBACK else will COMMIT the transaction.
//
// txc := XSQLTxConn(db) // shared between *sql.Tx, *sql.DB and *sql.Conn
// if tx, err := db.BeginTx(ctx, nil); err == nil && tx != nil {
// defer func() { err = xsql.EndTx(tx, err) }()
// txc = tx
// }
func (SQL) EndTx(tx *sql.Tx, err error) error {
func (DataSource) EndTx(tx *sql.Tx, err error) error {
if tx == nil {
log.Z().Err(ErrInvalidTransaction).Msg("[database:EndTx]")

Expand Down