mirror of
				https://github.com/go-gitea/gitea.git
				synced 2025-10-24 00:02:33 -04:00 
			
		
		
		
	enable mirror, usestdlibbars and perfsprint part of: https://github.com/go-gitea/gitea/issues/34083 --------- Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
		
			
				
	
	
		
			137 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			137 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2024 The Gitea Authors. All rights reserved.
 | |
| // SPDX-License-Identifier: MIT
 | |
| 
 | |
| package globallock
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"code.gitea.io/gitea/modules/nosql"
 | |
| 
 | |
| 	"github.com/go-redsync/redsync/v4"
 | |
| 	"github.com/go-redsync/redsync/v4/redis/goredis/v9"
 | |
| )
 | |
| 
 | |
| const redisLockKeyPrefix = "gitea:globallock:"
 | |
| 
 | |
| // redisLockExpiry is the default expiry time for a lock.
 | |
| // Define it as a variable to make it possible to change it in tests.
 | |
| var redisLockExpiry = 30 * time.Second
 | |
| 
 | |
| type redisLocker struct {
 | |
| 	rs *redsync.Redsync
 | |
| 
 | |
| 	mutexM   sync.Map
 | |
| 	closed   atomic.Bool
 | |
| 	extendWg sync.WaitGroup
 | |
| }
 | |
| 
 | |
| var _ Locker = &redisLocker{}
 | |
| 
 | |
| func NewRedisLocker(connection string) Locker {
 | |
| 	l := &redisLocker{
 | |
| 		rs: redsync.New(
 | |
| 			goredis.NewPool(
 | |
| 				nosql.GetManager().GetRedisClient(connection),
 | |
| 			),
 | |
| 		),
 | |
| 	}
 | |
| 
 | |
| 	l.extendWg.Add(1)
 | |
| 	l.startExtend()
 | |
| 
 | |
| 	return l
 | |
| }
 | |
| 
 | |
| func (l *redisLocker) Lock(ctx context.Context, key string) (ReleaseFunc, error) {
 | |
| 	return l.lock(ctx, key, 0)
 | |
| }
 | |
| 
 | |
| func (l *redisLocker) TryLock(ctx context.Context, key string) (bool, ReleaseFunc, error) {
 | |
| 	f, err := l.lock(ctx, key, 1)
 | |
| 
 | |
| 	var (
 | |
| 		errTaken     *redsync.ErrTaken
 | |
| 		errNodeTaken *redsync.ErrNodeTaken
 | |
| 	)
 | |
| 	if errors.As(err, &errTaken) || errors.As(err, &errNodeTaken) {
 | |
| 		return false, f, nil
 | |
| 	}
 | |
| 	return err == nil, f, err
 | |
| }
 | |
| 
 | |
| // Close closes the locker.
 | |
| // It will stop extending the locks and refuse to acquire new locks.
 | |
| // In actual use, it is not necessary to call this function.
 | |
| // But it's useful in tests to release resources.
 | |
| // It could take some time since it waits for the extending goroutine to finish.
 | |
| func (l *redisLocker) Close() error {
 | |
| 	l.closed.Store(true)
 | |
| 	l.extendWg.Wait()
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (l *redisLocker) lock(ctx context.Context, key string, tries int) (ReleaseFunc, error) {
 | |
| 	if l.closed.Load() {
 | |
| 		return func() {}, errors.New("locker is closed")
 | |
| 	}
 | |
| 
 | |
| 	options := []redsync.Option{
 | |
| 		redsync.WithExpiry(redisLockExpiry),
 | |
| 	}
 | |
| 	if tries > 0 {
 | |
| 		options = append(options, redsync.WithTries(tries))
 | |
| 	}
 | |
| 	mutex := l.rs.NewMutex(redisLockKeyPrefix+key, options...)
 | |
| 	if err := mutex.LockContext(ctx); err != nil {
 | |
| 		return func() {}, err
 | |
| 	}
 | |
| 
 | |
| 	l.mutexM.Store(key, mutex)
 | |
| 
 | |
| 	releaseOnce := sync.Once{}
 | |
| 	return func() {
 | |
| 		releaseOnce.Do(func() {
 | |
| 			l.mutexM.Delete(key)
 | |
| 
 | |
| 			// It's safe to ignore the error here,
 | |
| 			// if it failed to unlock, it will be released automatically after the lock expires.
 | |
| 			// Do not call mutex.UnlockContext(ctx) here, or it will fail to release when ctx has timed out.
 | |
| 			_, _ = mutex.Unlock()
 | |
| 		})
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (l *redisLocker) startExtend() {
 | |
| 	if l.closed.Load() {
 | |
| 		l.extendWg.Done()
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	toExtend := make([]*redsync.Mutex, 0)
 | |
| 	l.mutexM.Range(func(_, value any) bool {
 | |
| 		m := value.(*redsync.Mutex)
 | |
| 
 | |
| 		// Extend the lock if it is not expired.
 | |
| 		// Although the mutex will be removed from the map before it is released,
 | |
| 		// it still can be expired because of a failed extension.
 | |
| 		// If it happens, it does not need to be extended anymore.
 | |
| 		if time.Now().After(m.Until()) {
 | |
| 			return true
 | |
| 		}
 | |
| 
 | |
| 		toExtend = append(toExtend, m)
 | |
| 		return true
 | |
| 	})
 | |
| 	for _, v := range toExtend {
 | |
| 		// If it failed to extend, it will be released automatically after the lock expires.
 | |
| 		_, _ = v.Extend()
 | |
| 	}
 | |
| 
 | |
| 	time.AfterFunc(redisLockExpiry/2, l.startExtend)
 | |
| }
 |