mirror of
https://github.com/go-gitea/gitea.git
synced 2025-07-17 00:01:00 -04:00
Compare commits
4 Commits
b6eea680ce
...
3dc2724d36
Author | SHA1 | Date | |
---|---|---|---|
|
3dc2724d36 | ||
|
93fe0202cb | ||
|
13f304d89e | ||
|
805c5926ff |
11
cmd/admin.go
11
cmd/admin.go
@ -161,6 +161,11 @@ var (
|
|||||||
Value: "false",
|
Value: "false",
|
||||||
Usage: "Use custom URLs for GitLab/GitHub OAuth endpoints",
|
Usage: "Use custom URLs for GitLab/GitHub OAuth endpoints",
|
||||||
},
|
},
|
||||||
|
cli.StringFlag{
|
||||||
|
Name: "custom-tenant-id",
|
||||||
|
Value: "",
|
||||||
|
Usage: "Use custom Tenant ID for OAuth endpoints",
|
||||||
|
},
|
||||||
cli.StringFlag{
|
cli.StringFlag{
|
||||||
Name: "custom-auth-url",
|
Name: "custom-auth-url",
|
||||||
Value: "",
|
Value: "",
|
||||||
@ -422,6 +427,7 @@ func parseOAuth2Config(c *cli.Context) *oauth2.Source {
|
|||||||
AuthURL: c.String("custom-auth-url"),
|
AuthURL: c.String("custom-auth-url"),
|
||||||
ProfileURL: c.String("custom-profile-url"),
|
ProfileURL: c.String("custom-profile-url"),
|
||||||
EmailURL: c.String("custom-email-url"),
|
EmailURL: c.String("custom-email-url"),
|
||||||
|
Tenant: c.String("custom-tenant-id"),
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
customURLMapping = nil
|
customURLMapping = nil
|
||||||
@ -531,6 +537,7 @@ func runUpdateOauth(c *cli.Context) error {
|
|||||||
customURLMapping.AuthURL = oAuth2Config.CustomURLMapping.AuthURL
|
customURLMapping.AuthURL = oAuth2Config.CustomURLMapping.AuthURL
|
||||||
customURLMapping.ProfileURL = oAuth2Config.CustomURLMapping.ProfileURL
|
customURLMapping.ProfileURL = oAuth2Config.CustomURLMapping.ProfileURL
|
||||||
customURLMapping.EmailURL = oAuth2Config.CustomURLMapping.EmailURL
|
customURLMapping.EmailURL = oAuth2Config.CustomURLMapping.EmailURL
|
||||||
|
customURLMapping.Tenant = oAuth2Config.CustomURLMapping.Tenant
|
||||||
}
|
}
|
||||||
if c.IsSet("use-custom-urls") && c.IsSet("custom-token-url") {
|
if c.IsSet("use-custom-urls") && c.IsSet("custom-token-url") {
|
||||||
customURLMapping.TokenURL = c.String("custom-token-url")
|
customURLMapping.TokenURL = c.String("custom-token-url")
|
||||||
@ -548,6 +555,10 @@ func runUpdateOauth(c *cli.Context) error {
|
|||||||
customURLMapping.EmailURL = c.String("custom-email-url")
|
customURLMapping.EmailURL = c.String("custom-email-url")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.IsSet("use-custom-urls") && c.IsSet("custom-tenant-id") {
|
||||||
|
customURLMapping.Tenant = c.String("custom-tenant-id")
|
||||||
|
}
|
||||||
|
|
||||||
oAuth2Config.CustomURLMapping = customURLMapping
|
oAuth2Config.CustomURLMapping = customURLMapping
|
||||||
source.Cfg = oAuth2Config
|
source.Cfg = oAuth2Config
|
||||||
|
|
||||||
|
@ -131,6 +131,7 @@ Admin operations:
|
|||||||
- `--secret`: Client Secret.
|
- `--secret`: Client Secret.
|
||||||
- `--auto-discover-url`: OpenID Connect Auto Discovery URL (only required when using OpenID Connect as provider).
|
- `--auto-discover-url`: OpenID Connect Auto Discovery URL (only required when using OpenID Connect as provider).
|
||||||
- `--use-custom-urls`: Use custom URLs for GitLab/GitHub OAuth endpoints.
|
- `--use-custom-urls`: Use custom URLs for GitLab/GitHub OAuth endpoints.
|
||||||
|
- `--custom-tenant-id`: Use custom Tenant ID for OAuth endpoints.
|
||||||
- `--custom-auth-url`: Use a custom Authorization URL (option for GitLab/GitHub).
|
- `--custom-auth-url`: Use a custom Authorization URL (option for GitLab/GitHub).
|
||||||
- `--custom-token-url`: Use a custom Token URL (option for GitLab/GitHub).
|
- `--custom-token-url`: Use a custom Token URL (option for GitLab/GitHub).
|
||||||
- `--custom-profile-url`: Use a custom Profile URL (option for GitLab/GitHub).
|
- `--custom-profile-url`: Use a custom Profile URL (option for GitLab/GitHub).
|
||||||
@ -154,6 +155,7 @@ Admin operations:
|
|||||||
- `--secret`: Client Secret.
|
- `--secret`: Client Secret.
|
||||||
- `--auto-discover-url`: OpenID Connect Auto Discovery URL (only required when using OpenID Connect as provider).
|
- `--auto-discover-url`: OpenID Connect Auto Discovery URL (only required when using OpenID Connect as provider).
|
||||||
- `--use-custom-urls`: Use custom URLs for GitLab/GitHub OAuth endpoints.
|
- `--use-custom-urls`: Use custom URLs for GitLab/GitHub OAuth endpoints.
|
||||||
|
- `--custom-tenant-id`: Use custom Tenant ID for OAuth endpoints.
|
||||||
- `--custom-auth-url`: Use a custom Authorization URL (option for GitLab/GitHub).
|
- `--custom-auth-url`: Use a custom Authorization URL (option for GitLab/GitHub).
|
||||||
- `--custom-token-url`: Use a custom Token URL (option for GitLab/GitHub).
|
- `--custom-token-url`: Use a custom Token URL (option for GitLab/GitHub).
|
||||||
- `--custom-profile-url`: Use a custom Profile URL (option for GitLab/GitHub).
|
- `--custom-profile-url`: Use a custom Profile URL (option for GitLab/GitHub).
|
||||||
|
@ -52,13 +52,16 @@ func listPullRequestStatement(baseRepoID int64, opts *PullRequestsOptions) (*xor
|
|||||||
|
|
||||||
// GetUnmergedPullRequestsByHeadInfo returns all pull requests that are open and has not been merged
|
// GetUnmergedPullRequestsByHeadInfo returns all pull requests that are open and has not been merged
|
||||||
// by given head information (repo and branch).
|
// by given head information (repo and branch).
|
||||||
func GetUnmergedPullRequestsByHeadInfo(repoID int64, branch string) ([]*PullRequest, error) {
|
// arg `includeClosed` controls whether the SQL returns closed PRs
|
||||||
|
func GetUnmergedPullRequestsByHeadInfo(repoID int64, branch string, includeClosed bool) ([]*PullRequest, error) {
|
||||||
prs := make([]*PullRequest, 0, 2)
|
prs := make([]*PullRequest, 0, 2)
|
||||||
return prs, db.GetEngine(db.DefaultContext).
|
sess := db.GetEngine(db.DefaultContext).
|
||||||
Where("head_repo_id = ? AND head_branch = ? AND has_merged = ? AND issue.is_closed = ? AND flow = ?",
|
|
||||||
repoID, branch, false, false, PullRequestFlowGithub).
|
|
||||||
Join("INNER", "issue", "issue.id = pull_request.issue_id").
|
Join("INNER", "issue", "issue.id = pull_request.issue_id").
|
||||||
Find(&prs)
|
Where("head_repo_id = ? AND head_branch = ? AND has_merged = ? AND flow = ?", repoID, branch, false, PullRequestFlowGithub)
|
||||||
|
if !includeClosed {
|
||||||
|
sess.Where("issue.is_closed = ?", false)
|
||||||
|
}
|
||||||
|
return prs, sess.Find(&prs)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CanMaintainerWriteToBranch check whether user is a maintainer and could write to the branch
|
// CanMaintainerWriteToBranch check whether user is a maintainer and could write to the branch
|
||||||
@ -71,7 +74,7 @@ func CanMaintainerWriteToBranch(p access_model.Permission, branch string, user *
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
prs, err := GetUnmergedPullRequestsByHeadInfo(p.Units[0].RepoID, branch)
|
prs, err := GetUnmergedPullRequestsByHeadInfo(p.Units[0].RepoID, branch, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -119,7 +119,7 @@ func TestHasUnmergedPullRequestsByHeadInfo(t *testing.T) {
|
|||||||
|
|
||||||
func TestGetUnmergedPullRequestsByHeadInfo(t *testing.T) {
|
func TestGetUnmergedPullRequestsByHeadInfo(t *testing.T) {
|
||||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||||
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(1, "branch2")
|
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(1, "branch2", false)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.Len(t, prs, 1)
|
assert.Len(t, prs, 1)
|
||||||
for _, pr := range prs {
|
for _, pr := range prs {
|
||||||
|
@ -125,7 +125,10 @@ func (q *ChannelQueue) Shutdown() {
|
|||||||
log.Trace("ChannelQueue: %s Flushing", q.name)
|
log.Trace("ChannelQueue: %s Flushing", q.name)
|
||||||
// We can't use Cleanup here because that will close the channel
|
// We can't use Cleanup here because that will close the channel
|
||||||
if err := q.FlushWithContext(q.terminateCtx); err != nil {
|
if err := q.FlushWithContext(q.terminateCtx); err != nil {
|
||||||
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
|
count := atomic.LoadInt64(&q.numInQueue)
|
||||||
|
if count > 0 {
|
||||||
|
log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Debug("ChannelQueue: %s Flushed", q.name)
|
log.Debug("ChannelQueue: %s Flushed", q.name)
|
||||||
|
@ -95,7 +95,8 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
|
|||||||
},
|
},
|
||||||
Workers: 0,
|
Workers: 0,
|
||||||
},
|
},
|
||||||
DataDir: config.DataDir,
|
DataDir: config.DataDir,
|
||||||
|
QueueName: config.Name + "-level",
|
||||||
}
|
}
|
||||||
|
|
||||||
levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
|
levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
|
||||||
@ -173,16 +174,18 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
|
|||||||
atShutdown(q.Shutdown)
|
atShutdown(q.Shutdown)
|
||||||
atTerminate(q.Terminate)
|
atTerminate(q.Terminate)
|
||||||
|
|
||||||
if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 {
|
if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.terminateCtx) != 0 {
|
||||||
// Just run the level queue - we shut it down once it's flushed
|
// Just run the level queue - we shut it down once it's flushed
|
||||||
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
|
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
|
||||||
go func() {
|
go func() {
|
||||||
for !q.IsEmpty() {
|
for !lq.IsEmpty() {
|
||||||
_ = q.internal.Flush(0)
|
_ = lq.Flush(0)
|
||||||
select {
|
select {
|
||||||
case <-time.After(100 * time.Millisecond):
|
case <-time.After(100 * time.Millisecond):
|
||||||
case <-q.internal.(*LevelQueue).shutdownCtx.Done():
|
case <-lq.shutdownCtx.Done():
|
||||||
log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
|
if lq.byteFIFO.Len(lq.terminateCtx) > 0 {
|
||||||
|
log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name())
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -317,10 +320,22 @@ func (q *PersistableChannelQueue) Shutdown() {
|
|||||||
// Redirect all remaining data in the chan to the internal channel
|
// Redirect all remaining data in the chan to the internal channel
|
||||||
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
|
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
|
||||||
close(q.channelQueue.dataChan)
|
close(q.channelQueue.dataChan)
|
||||||
|
countOK, countLost := 0, 0
|
||||||
for data := range q.channelQueue.dataChan {
|
for data := range q.channelQueue.dataChan {
|
||||||
_ = q.internal.Push(data)
|
err := q.internal.Push(data)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("PersistableChannelQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
|
||||||
|
countLost++
|
||||||
|
} else {
|
||||||
|
countOK++
|
||||||
|
}
|
||||||
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
|
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
|
||||||
}
|
}
|
||||||
|
if countLost > 0 {
|
||||||
|
log.Warn("PersistableChannelQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
|
||||||
|
} else if countOK > 0 {
|
||||||
|
log.Warn("PersistableChannelQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
|
||||||
|
}
|
||||||
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
|
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
|
||||||
|
|
||||||
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
|
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
|
||||||
|
@ -40,7 +40,7 @@ func TestPersistableChannelQueue(t *testing.T) {
|
|||||||
Workers: 1,
|
Workers: 1,
|
||||||
BoostWorkers: 0,
|
BoostWorkers: 0,
|
||||||
MaxWorkers: 10,
|
MaxWorkers: 10,
|
||||||
Name: "first",
|
Name: "test-queue",
|
||||||
}, &testData{})
|
}, &testData{})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
@ -136,7 +136,7 @@ func TestPersistableChannelQueue(t *testing.T) {
|
|||||||
Workers: 1,
|
Workers: 1,
|
||||||
BoostWorkers: 0,
|
BoostWorkers: 0,
|
||||||
MaxWorkers: 10,
|
MaxWorkers: 10,
|
||||||
Name: "second",
|
Name: "test-queue",
|
||||||
}, &testData{})
|
}, &testData{})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
@ -228,7 +228,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
|
|||||||
Workers: 1,
|
Workers: 1,
|
||||||
BoostWorkers: 0,
|
BoostWorkers: 0,
|
||||||
MaxWorkers: 10,
|
MaxWorkers: 10,
|
||||||
Name: "first",
|
Name: "test-queue",
|
||||||
}, &testData{})
|
}, &testData{})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
@ -434,7 +434,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
|
|||||||
Workers: 1,
|
Workers: 1,
|
||||||
BoostWorkers: 0,
|
BoostWorkers: 0,
|
||||||
MaxWorkers: 10,
|
MaxWorkers: 10,
|
||||||
Name: "second",
|
Name: "test-queue",
|
||||||
}, &testData{})
|
}, &testData{})
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
pausable, ok = queue.(Pausable)
|
pausable, ok = queue.(Pausable)
|
||||||
|
@ -178,7 +178,9 @@ func (q *ChannelUniqueQueue) Shutdown() {
|
|||||||
go func() {
|
go func() {
|
||||||
log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
|
log.Trace("ChannelUniqueQueue: %s Flushing", q.name)
|
||||||
if err := q.FlushWithContext(q.terminateCtx); err != nil {
|
if err := q.FlushWithContext(q.terminateCtx); err != nil {
|
||||||
log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
|
if !q.IsEmpty() {
|
||||||
|
log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Debug("ChannelUniqueQueue: %s Flushed", q.name)
|
log.Debug("ChannelUniqueQueue: %s Flushed", q.name)
|
||||||
|
@ -9,10 +9,13 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestChannelUniqueQueue(t *testing.T) {
|
func TestChannelUniqueQueue(t *testing.T) {
|
||||||
|
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
|
||||||
handleChan := make(chan *testData)
|
handleChan := make(chan *testData)
|
||||||
handle := func(data ...Data) []Data {
|
handle := func(data ...Data) []Data {
|
||||||
for _, datum := range data {
|
for _, datum := range data {
|
||||||
@ -53,6 +56,8 @@ func TestChannelUniqueQueue(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestChannelUniqueQueue_Batch(t *testing.T) {
|
func TestChannelUniqueQueue_Batch(t *testing.T) {
|
||||||
|
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
|
||||||
|
|
||||||
handleChan := make(chan *testData)
|
handleChan := make(chan *testData)
|
||||||
handle := func(data ...Data) []Data {
|
handle := func(data ...Data) []Data {
|
||||||
for _, datum := range data {
|
for _, datum := range data {
|
||||||
@ -99,6 +104,8 @@ func TestChannelUniqueQueue_Batch(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestChannelUniqueQueue_Pause(t *testing.T) {
|
func TestChannelUniqueQueue_Pause(t *testing.T) {
|
||||||
|
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
|
||||||
|
|
||||||
lock := sync.Mutex{}
|
lock := sync.Mutex{}
|
||||||
var queue Queue
|
var queue Queue
|
||||||
var err error
|
var err error
|
||||||
|
@ -95,7 +95,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
|
|||||||
},
|
},
|
||||||
Workers: 0,
|
Workers: 0,
|
||||||
},
|
},
|
||||||
DataDir: config.DataDir,
|
DataDir: config.DataDir,
|
||||||
|
QueueName: config.Name + "-level",
|
||||||
}
|
}
|
||||||
|
|
||||||
queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
|
queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
|
||||||
@ -210,17 +211,29 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
|
|||||||
atTerminate(q.Terminate)
|
atTerminate(q.Terminate)
|
||||||
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
|
_ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0)
|
||||||
|
|
||||||
if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 {
|
if luq, ok := q.internal.(*LevelUniqueQueue); ok && !luq.IsEmpty() {
|
||||||
// Just run the level queue - we shut it down once it's flushed
|
// Just run the level queue - we shut it down once it's flushed
|
||||||
go q.internal.Run(func(_ func()) {}, func(_ func()) {})
|
go luq.Run(func(_ func()) {}, func(_ func()) {})
|
||||||
go func() {
|
go func() {
|
||||||
_ = q.internal.Flush(0)
|
_ = luq.Flush(0)
|
||||||
log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelUniqueQueue).Name())
|
for !luq.IsEmpty() {
|
||||||
q.internal.(*LevelUniqueQueue).Shutdown()
|
_ = luq.Flush(0)
|
||||||
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
|
select {
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
case <-luq.shutdownCtx.Done():
|
||||||
|
if luq.byteFIFO.Len(luq.terminateCtx) > 0 {
|
||||||
|
log.Warn("LevelUniqueQueue: %s shut down before completely flushed", luq.Name())
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log.Debug("LevelUniqueQueue: %s flushed so shutting down", luq.Name())
|
||||||
|
luq.Shutdown()
|
||||||
|
GetManager().Remove(luq.qid)
|
||||||
}()
|
}()
|
||||||
} else {
|
} else {
|
||||||
log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
|
log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name)
|
||||||
|
_ = q.internal.Flush(0)
|
||||||
q.internal.(*LevelUniqueQueue).Shutdown()
|
q.internal.(*LevelUniqueQueue).Shutdown()
|
||||||
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
|
GetManager().Remove(q.internal.(*LevelUniqueQueue).qid)
|
||||||
}
|
}
|
||||||
@ -286,8 +299,20 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
|
|||||||
// Redirect all remaining data in the chan to the internal channel
|
// Redirect all remaining data in the chan to the internal channel
|
||||||
close(q.channelQueue.dataChan)
|
close(q.channelQueue.dataChan)
|
||||||
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
|
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
|
||||||
|
countOK, countLost := 0, 0
|
||||||
for data := range q.channelQueue.dataChan {
|
for data := range q.channelQueue.dataChan {
|
||||||
_ = q.internal.Push(data)
|
err := q.internal.(*LevelUniqueQueue).Push(data)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("PersistableChannelUniqueQueue: %s Unable redirect %v due to: %v", q.delayedStarter.name, data, err)
|
||||||
|
countLost++
|
||||||
|
} else {
|
||||||
|
countOK++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if countLost > 0 {
|
||||||
|
log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart, %d lost", q.delayedStarter.name, countOK, countLost)
|
||||||
|
} else if countOK > 0 {
|
||||||
|
log.Warn("PersistableChannelUniqueQueue: %s %d will be restored on restart", q.delayedStarter.name, countOK)
|
||||||
}
|
}
|
||||||
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
|
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
|
||||||
|
|
||||||
|
259
modules/queue/unique_queue_disk_channel_test.go
Normal file
259
modules/queue/unique_queue_disk_channel_test.go
Normal file
@ -0,0 +1,259 @@
|
|||||||
|
// Copyright 2023 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
package queue
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPersistableChannelUniqueQueue(t *testing.T) {
|
||||||
|
tmpDir := t.TempDir()
|
||||||
|
fmt.Printf("TempDir %s\n", tmpDir)
|
||||||
|
_ = log.NewLogger(1000, "console", "console", `{"level":"warn","stacktracelevel":"NONE","stderr":true}`)
|
||||||
|
|
||||||
|
// Common function to create the Queue
|
||||||
|
newQueue := func(name string, handle func(data ...Data) []Data) Queue {
|
||||||
|
q, err := NewPersistableChannelUniqueQueue(handle,
|
||||||
|
PersistableChannelUniqueQueueConfiguration{
|
||||||
|
Name: name,
|
||||||
|
DataDir: tmpDir,
|
||||||
|
QueueLength: 200,
|
||||||
|
MaxWorkers: 1,
|
||||||
|
BlockTimeout: 1 * time.Second,
|
||||||
|
BoostTimeout: 5 * time.Minute,
|
||||||
|
BoostWorkers: 1,
|
||||||
|
Workers: 0,
|
||||||
|
}, "task-0")
|
||||||
|
assert.NoError(t, err)
|
||||||
|
return q
|
||||||
|
}
|
||||||
|
|
||||||
|
// runs the provided queue and provides some timer function
|
||||||
|
type channels struct {
|
||||||
|
readyForShutdown chan struct{} // closed when shutdown functions have been assigned
|
||||||
|
readyForTerminate chan struct{} // closed when terminate functions have been assigned
|
||||||
|
signalShutdown chan struct{} // Should close to signal shutdown
|
||||||
|
doneShutdown chan struct{} // closed when shutdown function is done
|
||||||
|
queueTerminate []func() // list of atTerminate functions to call atTerminate - need to be accessed with lock
|
||||||
|
}
|
||||||
|
runQueue := func(q Queue, lock *sync.Mutex) *channels {
|
||||||
|
chans := &channels{
|
||||||
|
readyForShutdown: make(chan struct{}),
|
||||||
|
readyForTerminate: make(chan struct{}),
|
||||||
|
signalShutdown: make(chan struct{}),
|
||||||
|
doneShutdown: make(chan struct{}),
|
||||||
|
}
|
||||||
|
go q.Run(func(atShutdown func()) {
|
||||||
|
go func() {
|
||||||
|
lock.Lock()
|
||||||
|
select {
|
||||||
|
case <-chans.readyForShutdown:
|
||||||
|
default:
|
||||||
|
close(chans.readyForShutdown)
|
||||||
|
}
|
||||||
|
lock.Unlock()
|
||||||
|
<-chans.signalShutdown
|
||||||
|
atShutdown()
|
||||||
|
close(chans.doneShutdown)
|
||||||
|
}()
|
||||||
|
}, func(atTerminate func()) {
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
|
select {
|
||||||
|
case <-chans.readyForTerminate:
|
||||||
|
default:
|
||||||
|
close(chans.readyForTerminate)
|
||||||
|
}
|
||||||
|
chans.queueTerminate = append(chans.queueTerminate, atTerminate)
|
||||||
|
})
|
||||||
|
|
||||||
|
return chans
|
||||||
|
}
|
||||||
|
|
||||||
|
// call to shutdown and terminate the queue associated with the channels
|
||||||
|
doTerminate := func(chans *channels, lock *sync.Mutex) {
|
||||||
|
<-chans.readyForTerminate
|
||||||
|
|
||||||
|
lock.Lock()
|
||||||
|
callbacks := []func(){}
|
||||||
|
callbacks = append(callbacks, chans.queueTerminate...)
|
||||||
|
lock.Unlock()
|
||||||
|
|
||||||
|
for _, callback := range callbacks {
|
||||||
|
callback()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mapLock := sync.Mutex{}
|
||||||
|
executedInitial := map[string][]string{}
|
||||||
|
hasInitial := map[string][]string{}
|
||||||
|
|
||||||
|
fillQueue := func(name string, done chan struct{}) {
|
||||||
|
t.Run("Initial Filling: "+name, func(t *testing.T) {
|
||||||
|
lock := sync.Mutex{}
|
||||||
|
|
||||||
|
startAt100Queued := make(chan struct{})
|
||||||
|
stopAt20Shutdown := make(chan struct{}) // stop and shutdown at the 20th item
|
||||||
|
|
||||||
|
handle := func(data ...Data) []Data {
|
||||||
|
<-startAt100Queued
|
||||||
|
for _, datum := range data {
|
||||||
|
s := datum.(string)
|
||||||
|
mapLock.Lock()
|
||||||
|
executedInitial[name] = append(executedInitial[name], s)
|
||||||
|
mapLock.Unlock()
|
||||||
|
if s == "task-20" {
|
||||||
|
close(stopAt20Shutdown)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
q := newQueue(name, handle)
|
||||||
|
|
||||||
|
// add 100 tasks to the queue
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
_ = q.Push("task-" + strconv.Itoa(i))
|
||||||
|
}
|
||||||
|
close(startAt100Queued)
|
||||||
|
|
||||||
|
chans := runQueue(q, &lock)
|
||||||
|
|
||||||
|
<-chans.readyForShutdown
|
||||||
|
<-stopAt20Shutdown
|
||||||
|
close(chans.signalShutdown)
|
||||||
|
<-chans.doneShutdown
|
||||||
|
_ = q.Push("final")
|
||||||
|
|
||||||
|
// check which tasks are still in the queue
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
|
||||||
|
mapLock.Lock()
|
||||||
|
hasInitial[name] = append(hasInitial[name], "task-"+strconv.Itoa(i))
|
||||||
|
mapLock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if has, _ := q.(UniqueQueue).Has("final"); has {
|
||||||
|
mapLock.Lock()
|
||||||
|
hasInitial[name] = append(hasInitial[name], "final")
|
||||||
|
mapLock.Unlock()
|
||||||
|
} else {
|
||||||
|
assert.Fail(t, "UnqueQueue %s should have \"final\"", name)
|
||||||
|
}
|
||||||
|
doTerminate(chans, &lock)
|
||||||
|
mapLock.Lock()
|
||||||
|
assert.Equal(t, 101, len(executedInitial[name])+len(hasInitial[name]))
|
||||||
|
mapLock.Unlock()
|
||||||
|
})
|
||||||
|
close(done)
|
||||||
|
}
|
||||||
|
|
||||||
|
doneA := make(chan struct{})
|
||||||
|
doneB := make(chan struct{})
|
||||||
|
|
||||||
|
go fillQueue("QueueA", doneA)
|
||||||
|
go fillQueue("QueueB", doneB)
|
||||||
|
|
||||||
|
<-doneA
|
||||||
|
<-doneB
|
||||||
|
|
||||||
|
executedEmpty := map[string][]string{}
|
||||||
|
hasEmpty := map[string][]string{}
|
||||||
|
emptyQueue := func(name string, done chan struct{}) {
|
||||||
|
t.Run("Empty Queue: "+name, func(t *testing.T) {
|
||||||
|
lock := sync.Mutex{}
|
||||||
|
stop := make(chan struct{})
|
||||||
|
|
||||||
|
// collect the tasks that have been executed
|
||||||
|
handle := func(data ...Data) []Data {
|
||||||
|
lock.Lock()
|
||||||
|
for _, datum := range data {
|
||||||
|
mapLock.Lock()
|
||||||
|
executedEmpty[name] = append(executedEmpty[name], datum.(string))
|
||||||
|
mapLock.Unlock()
|
||||||
|
if datum.(string) == "final" {
|
||||||
|
close(stop)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lock.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
q := newQueue(name, handle)
|
||||||
|
chans := runQueue(q, &lock)
|
||||||
|
|
||||||
|
<-chans.readyForShutdown
|
||||||
|
<-stop
|
||||||
|
close(chans.signalShutdown)
|
||||||
|
<-chans.doneShutdown
|
||||||
|
|
||||||
|
// check which tasks are still in the queue
|
||||||
|
for i := 0; i < 100; i++ {
|
||||||
|
if has, _ := q.(UniqueQueue).Has("task-" + strconv.Itoa(i)); has {
|
||||||
|
mapLock.Lock()
|
||||||
|
hasEmpty[name] = append(hasEmpty[name], "task-"+strconv.Itoa(i))
|
||||||
|
mapLock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
doTerminate(chans, &lock)
|
||||||
|
|
||||||
|
mapLock.Lock()
|
||||||
|
assert.Equal(t, 101, len(executedInitial[name])+len(executedEmpty[name]))
|
||||||
|
assert.Equal(t, 0, len(hasEmpty[name]))
|
||||||
|
mapLock.Unlock()
|
||||||
|
})
|
||||||
|
close(done)
|
||||||
|
}
|
||||||
|
|
||||||
|
doneA = make(chan struct{})
|
||||||
|
doneB = make(chan struct{})
|
||||||
|
|
||||||
|
go emptyQueue("QueueA", doneA)
|
||||||
|
go emptyQueue("QueueB", doneB)
|
||||||
|
|
||||||
|
<-doneA
|
||||||
|
<-doneB
|
||||||
|
|
||||||
|
mapLock.Lock()
|
||||||
|
t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
|
||||||
|
len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
|
||||||
|
|
||||||
|
// reset and rerun
|
||||||
|
executedInitial = map[string][]string{}
|
||||||
|
hasInitial = map[string][]string{}
|
||||||
|
executedEmpty = map[string][]string{}
|
||||||
|
hasEmpty = map[string][]string{}
|
||||||
|
mapLock.Unlock()
|
||||||
|
|
||||||
|
doneA = make(chan struct{})
|
||||||
|
doneB = make(chan struct{})
|
||||||
|
|
||||||
|
go fillQueue("QueueA", doneA)
|
||||||
|
go fillQueue("QueueB", doneB)
|
||||||
|
|
||||||
|
<-doneA
|
||||||
|
<-doneB
|
||||||
|
|
||||||
|
doneA = make(chan struct{})
|
||||||
|
doneB = make(chan struct{})
|
||||||
|
|
||||||
|
go emptyQueue("QueueA", doneA)
|
||||||
|
go emptyQueue("QueueB", doneB)
|
||||||
|
|
||||||
|
<-doneA
|
||||||
|
<-doneB
|
||||||
|
|
||||||
|
mapLock.Lock()
|
||||||
|
t.Logf("TestPersistableChannelUniqueQueue executedInitiallyA=%v, executedInitiallyB=%v, executedToEmptyA=%v, executedToEmptyB=%v",
|
||||||
|
len(executedInitial["QueueA"]), len(executedInitial["QueueB"]), len(executedEmpty["QueueA"]), len(executedEmpty["QueueB"]))
|
||||||
|
mapLock.Unlock()
|
||||||
|
}
|
@ -1375,11 +1375,12 @@ func ViewIssue(ctx *context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
role issues_model.RoleDescriptor
|
role issues_model.RoleDescriptor
|
||||||
ok bool
|
ok bool
|
||||||
marked = make(map[int64]issues_model.RoleDescriptor)
|
marked = make(map[int64]issues_model.RoleDescriptor)
|
||||||
comment *issues_model.Comment
|
comment *issues_model.Comment
|
||||||
participants = make([]*user_model.User, 1, 10)
|
participants = make([]*user_model.User, 1, 10)
|
||||||
|
latestCloseCommentID int64
|
||||||
)
|
)
|
||||||
if ctx.Repo.Repository.IsTimetrackerEnabled() {
|
if ctx.Repo.Repository.IsTimetrackerEnabled() {
|
||||||
if ctx.IsSigned {
|
if ctx.IsSigned {
|
||||||
@ -1586,9 +1587,15 @@ func ViewIssue(ctx *context.Context) {
|
|||||||
comment.Type == issues_model.CommentTypeStopTracking {
|
comment.Type == issues_model.CommentTypeStopTracking {
|
||||||
// drop error since times could be pruned from DB..
|
// drop error since times could be pruned from DB..
|
||||||
_ = comment.LoadTime()
|
_ = comment.LoadTime()
|
||||||
|
} else if comment.Type == issues_model.CommentTypeClose {
|
||||||
|
// record ID of latest closed comment.
|
||||||
|
// if PR is closed, the comments whose type is CommentTypePullRequestPush(29) after latestCloseCommentID won't be rendered.
|
||||||
|
latestCloseCommentID = comment.ID
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx.Data["LatestCloseCommentID"] = latestCloseCommentID
|
||||||
|
|
||||||
// Combine multiple label assignments into a single comment
|
// Combine multiple label assignments into a single comment
|
||||||
combineLabelComments(issue)
|
combineLabelComments(issue)
|
||||||
|
|
||||||
|
@ -574,7 +574,7 @@ func PrepareViewPullInfo(ctx *context.Context, issue *issues_model.Issue) *git.C
|
|||||||
ctx.Data["HeadBranchCommitID"] = headBranchSha
|
ctx.Data["HeadBranchCommitID"] = headBranchSha
|
||||||
ctx.Data["PullHeadCommitID"] = sha
|
ctx.Data["PullHeadCommitID"] = sha
|
||||||
|
|
||||||
if pull.HeadRepo == nil || !headBranchExist || headBranchSha != sha {
|
if pull.HeadRepo == nil || !headBranchExist || (!pull.Issue.IsClosed && (headBranchSha != sha)) {
|
||||||
ctx.Data["IsPullRequestBroken"] = true
|
ctx.Data["IsPullRequestBroken"] = true
|
||||||
if pull.IsSameRepo() {
|
if pull.IsSameRepo() {
|
||||||
ctx.Data["HeadTarget"] = pull.HeadBranch
|
ctx.Data["HeadTarget"] = pull.HeadBranch
|
||||||
|
@ -258,7 +258,7 @@ func AddTestPullRequestTask(doer *user_model.User, repoID int64, branch string,
|
|||||||
// If you don't let it run all the way then you will lose data
|
// If you don't let it run all the way then you will lose data
|
||||||
// TODO: graceful: AddTestPullRequestTask needs to become a queue!
|
// TODO: graceful: AddTestPullRequestTask needs to become a queue!
|
||||||
|
|
||||||
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(repoID, branch)
|
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(repoID, branch, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err)
|
log.Error("Find pull requests [head_repo_id: %d, head_branch: %s]: %v", repoID, branch, err)
|
||||||
return
|
return
|
||||||
@ -502,7 +502,7 @@ func (errs errlist) Error() string {
|
|||||||
|
|
||||||
// CloseBranchPulls close all the pull requests who's head branch is the branch
|
// CloseBranchPulls close all the pull requests who's head branch is the branch
|
||||||
func CloseBranchPulls(doer *user_model.User, repoID int64, branch string) error {
|
func CloseBranchPulls(doer *user_model.User, repoID int64, branch string) error {
|
||||||
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(repoID, branch)
|
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(repoID, branch, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -538,7 +538,7 @@ func CloseRepoBranchesPulls(ctx context.Context, doer *user_model.User, repo *re
|
|||||||
|
|
||||||
var errs errlist
|
var errs errlist
|
||||||
for _, branch := range branches {
|
for _, branch := range branches {
|
||||||
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(repo.ID, branch.Name)
|
prs, err := issues_model.GetUnmergedPullRequestsByHeadInfo(repo.ID, branch.Name, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -697,6 +697,10 @@
|
|||||||
</span>
|
</span>
|
||||||
</div>
|
</div>
|
||||||
{{else if and (eq .Type 29) (or (gt .CommitsNum 0) .IsForcePush)}}
|
{{else if and (eq .Type 29) (or (gt .CommitsNum 0) .IsForcePush)}}
|
||||||
|
<!-- If PR is closed, the comments whose type is CommentTypePullRequestPush(29) after latestCloseCommentID won't be rendered. //-->
|
||||||
|
{{if and .Issue.IsClosed (gt .ID $.LatestCloseCommentID)}}
|
||||||
|
{{continue}}
|
||||||
|
{{end}}
|
||||||
<div class="timeline-item event" id="{{.HashTag}}">
|
<div class="timeline-item event" id="{{.HashTag}}">
|
||||||
<span class="badge">{{svg "octicon-repo-push"}}</span>
|
<span class="badge">{{svg "octicon-repo-push"}}</span>
|
||||||
<span class="text grey muted-links">
|
<span class="text grey muted-links">
|
||||||
|
@ -31,6 +31,7 @@ export default function initContextPopups() {
|
|||||||
createTippy(this, {
|
createTippy(this, {
|
||||||
content: el,
|
content: el,
|
||||||
interactive: true,
|
interactive: true,
|
||||||
|
interactiveBorder: 5,
|
||||||
onShow: () => {
|
onShow: () => {
|
||||||
el.firstChild.dispatchEvent(new CustomEvent('us-load-context-popup', {detail: {owner, repo, index}}));
|
el.firstChild.dispatchEvent(new CustomEvent('us-load-context-popup', {detail: {owner, repo, index}}));
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user