Commit efcf405f authored by zauberstuhl's avatar zauberstuhl

Merge branch 'retry_dispatch_feature' into 'master'

Implement retrying dispatcher requests

See merge request !78
parents 0c021112 4cbfe567
Pipeline #955 passed with stages
in 15 minutes and 57 seconds
......@@ -23,6 +23,7 @@ import (
"git.feneas.org/ganggo/ganggo/app/models"
federation "git.feneas.org/ganggo/federation"
"git.feneas.org/ganggo/federation/helpers"
run "github.com/revel/modules/jobs/app/jobs"
)
func (dispatcher *Dispatcher) Comment(comment models.Comment) {
......@@ -94,11 +95,12 @@ func (dispatcher *Dispatcher) Comment(comment models.Comment) {
entity = post
}
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher Comment", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(RetryOnFail{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
......@@ -22,6 +22,7 @@ import (
"git.feneas.org/ganggo/ganggo/app/models"
fhelpers "git.feneas.org/ganggo/federation/helpers"
federation "git.feneas.org/ganggo/federation"
run "github.com/revel/modules/jobs/app/jobs"
)
func (dispatcher *Dispatcher) Contact(contact models.AspectMembership) {
......@@ -57,9 +58,11 @@ func (dispatcher *Dispatcher) Contact(contact models.AspectMembership) {
entity.SetRecipient(person.Author)
entity.SetSharing(true)
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher Contact", err.Error(), err)
return
}
// send and retry if it fails the first time
run.Now(RetryOnFail{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
......@@ -23,6 +23,7 @@ import (
"git.feneas.org/ganggo/ganggo/app/models"
federation "git.feneas.org/ganggo/federation"
"git.feneas.org/ganggo/federation/helpers"
run "github.com/revel/modules/jobs/app/jobs"
)
func (dispatcher *Dispatcher) Like(like models.Like) {
......@@ -85,10 +86,12 @@ func (dispatcher *Dispatcher) Like(like models.Like) {
entity = post
}
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher Like", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(RetryOnFail{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
......@@ -23,6 +23,7 @@ import (
"git.feneas.org/ganggo/ganggo/app/models"
"git.feneas.org/ganggo/federation"
fhelpers "git.feneas.org/ganggo/federation/helpers"
run "github.com/revel/modules/jobs/app/jobs"
)
func (dispatcher *Dispatcher) StatusMessage(post models.Post) {
......@@ -108,10 +109,12 @@ func (dispatcher *Dispatcher) StatusMessage(post models.Post) {
panic("Something went wrong!")
}
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher Reshare", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(RetryOnFail{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
......@@ -88,6 +88,10 @@ func (dispatcher *Dispatcher) findRecipients(parentPost *models.Post, parentUser
revel.AppLog.Error("Dispatcher findRecipients", err.Error(), err)
continue
}
// skip if not reachable
if !person.Pod.Alive {
continue
}
persons = append(persons, person)
}
return persons, nil
......@@ -95,14 +99,21 @@ func (dispatcher *Dispatcher) findRecipients(parentPost *models.Post, parentUser
} else if parentPost != nil {
// it is not local just send it to
// the remote server it should handle the rest
var persons = []models.Person{parentPost.Person}
var persons = []models.Person{}
// skip if not reachable
if parentPost.Person.Pod.Alive {
persons = append(persons, parentPost.Person)
}
if !parentPost.Public {
// in case of AP we will fetch known visibilties as well
var visibilities models.Visibilities
err := visibilities.FindByPost(*parentPost)
if err == nil {
for _, visibility := range visibilities {
persons = append(persons, visibility.Person)
// skip if not reachable
if visibility.Person.Pod.Alive {
persons = append(persons, visibility.Person)
}
}
} else {
revel.AppLog.Error("Dispatcher findRecipients", err.Error(), err)
......@@ -128,7 +139,10 @@ func (dispatcher *Dispatcher) findPublicEndpoints() (persons []models.Person, er
if err != nil {
continue
}
persons = append(persons, person)
// skip if not reachable
if person.Pod.Alive {
persons = append(persons, person)
}
}
return
}
......@@ -52,6 +52,16 @@ func (receiver Receiver) Run() {
return
}
// if the pod was deactivated and we receive a
// new valid message we can set him active again
if person.Pod.ID > 0 && !person.Pod.Alive {
person.Pod.Alive = true
err = person.Pod.Save()
if err != nil {
revel.AppLog.Error("Dispatcher", "error", err, "pod", person.Pod)
}
}
switch entity := base.(type) {
case federation.MessageContact:
revel.AppLog.Debug("Starting contact receiver")
......
......@@ -18,14 +18,15 @@ package jobs
//
import (
"fmt"
"time"
"crypto/rsa"
"github.com/revel/revel"
"git.feneas.org/ganggo/ganggo/app/models"
federation "git.feneas.org/ganggo/federation"
run "github.com/revel/modules/jobs/app/jobs"
"git.feneas.org/ganggo/federation/helpers"
"fmt"
"github.com/microcosm-cc/bluemonday"
"time"
)
func (dispatcher *Dispatcher) RelayComment(entity federation.MessageComment) {
......@@ -116,20 +117,24 @@ func (dispatcher *Dispatcher) RelayComment(entity federation.MessageComment) {
revel.AppLog.Error("Dispatcher RelayComment", err.Error(), err)
continue
}
err = translate.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher RelayComment", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(RetryOnFail{
Pod: &person.Pod,
Send: func() error {
return translate.Send(endpoint, priv, pub)
},
})
} else {
// required for a valid envelope signature
entity.SetAuthor(parentUser.Person.Author)
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher RelayComment", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(RetryOnFail{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
}
......@@ -22,6 +22,7 @@ import (
"github.com/revel/revel"
"git.feneas.org/ganggo/ganggo/app/models"
federation "git.feneas.org/ganggo/federation"
run "github.com/revel/modules/jobs/app/jobs"
"git.feneas.org/ganggo/federation/helpers"
)
......@@ -70,10 +71,12 @@ func (dispatcher *Dispatcher) RelayLike(entity federation.MessageLike) {
// required for a valid envelope signature
entity.SetAuthor(parentUser.Person.Author)
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher RelayLike", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(RetryOnFail{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
......@@ -22,6 +22,7 @@ import (
"github.com/revel/revel"
"git.feneas.org/ganggo/ganggo/app/models"
federation "git.feneas.org/ganggo/federation"
run "github.com/revel/modules/jobs/app/jobs"
"git.feneas.org/ganggo/federation/helpers"
)
......@@ -130,10 +131,12 @@ func (dispatcher *Dispatcher) RelayRetraction(entity federation.MessageRetract)
endpoint = person.Pod.Inbox
}
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher RelayRetraction", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(RetryOnFail{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
package jobs
//
// GangGo Application Server
// Copyright (C) 2018 Lukas Matt <lukas@zauberstuhl.de>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
import (
"github.com/revel/revel"
"git.feneas.org/ganggo/ganggo/app/models"
run "github.com/revel/modules/jobs/app/jobs"
"time"
)
type RetryOnFail struct {
Pod *models.Pod
Send func() error
After []time.Duration
firstRun bool
}
func (retry RetryOnFail) Run() {
// set default values on first run
if !retry.firstRun {
retry.firstRun = true
if len(retry.After) == 0 {
retry.After = append(retry.After, []time.Duration{
time.Minute, time.Hour, 24 * time.Hour,
}...)
}
}
// execute job and check on errors
err := retry.Send()
if err != nil {
if len(retry.After) > 0 {
// repeat until timeout (empty array)
duration := retry.After[0]
retry.After = retry.After[1:]
// repeat until timeout (empty array)
revel.AppLog.Warn("Jobs Retry", "waitfor", duration, "error", err)
run.In(duration, retry)
} else {
// this server is probably down. skip it..
revel.AppLog.Error("Jobs Retry", "error", err)
if retry.Pod != nil {
retry.Pod.Alive = false
err = retry.Pod.Save()
if err != nil {
revel.AppLog.Error("Jobs Retry", "error", err)
}
}
}
}
}
......@@ -31,6 +31,7 @@ type Pod struct {
// cause asumming we use utf8mb 4*191 = 764 < 767
Host string `gorm:"size:191" json:"host"`
Inbox string `gorm:"size:191"`
Alive bool `gorm:"default:true"`
Protocol federation.Protocol
}
......
......@@ -23,6 +23,7 @@ import (
"git.feneas.org/ganggo/ganggo/app/models"
"git.feneas.org/ganggo/gorm"
"time"
"errors"
)
type JobsTest struct {
......@@ -137,3 +138,48 @@ func (t *JobsTest) TestTelegramReceiver() {
t.Assertf(len(tgReceiverTests) == 4,
"Expected four entries, got %d", len(tgReceiverTests))
}
func (t *JobsTest) TestRetryOnFail() {
tests := []struct {
After []time.Duration
Expected int
}{
{
After: []time.Duration{
time.Second, time.Second, time.Second,
},
Expected: 3,
},
{
After: []time.Duration{
time.Second, time.Second,
},
Expected: 2,
},
{
After: []time.Duration{
time.Second, time.Second, time.Second, time.Second,
},
Expected: 4,
},
}
for i, test := range tests {
var triggered int = 0
retry := jobs.RetryOnFail{
Send: func() error {
triggered += 1
return errors.New("testing RetryOnFail")
},
After: test.After,
}
retry.Run()
// wait for async task
duration := time.Duration(test.Expected) * time.Second
time.Sleep(duration)
t.Assertf(triggered == test.Expected,
"#%d: Expected %d tries, got %d", i, test.Expected, triggered)
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment