Commit fdb8eb73 authored by zauberstuhl's avatar zauberstuhl

Implement retrying dispatcher requests

If the first request fails it will retry
after a minute, hour, one day. If it continues
to fail after one day it will mark the server
as not reachable in the database.

As soon as the localhost receives a valid
message again it will mark the server as
reachable and the cycle starts anew
parent 0c021112
Pipeline #570 passed with stages
in 10 minutes and 56 seconds
......@@ -23,6 +23,7 @@ import (
"git.feneas.org/ganggo/ganggo/app/models"
federation "git.feneas.org/ganggo/federation"
"git.feneas.org/ganggo/federation/helpers"
run "github.com/revel/modules/jobs/app/jobs"
)
func (dispatcher *Dispatcher) Comment(comment models.Comment) {
......@@ -94,11 +95,12 @@ func (dispatcher *Dispatcher) Comment(comment models.Comment) {
entity = post
}
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher Comment", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(Retry{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
......@@ -22,6 +22,7 @@ import (
"git.feneas.org/ganggo/ganggo/app/models"
fhelpers "git.feneas.org/ganggo/federation/helpers"
federation "git.feneas.org/ganggo/federation"
run "github.com/revel/modules/jobs/app/jobs"
)
func (dispatcher *Dispatcher) Contact(contact models.AspectMembership) {
......@@ -57,9 +58,11 @@ func (dispatcher *Dispatcher) Contact(contact models.AspectMembership) {
entity.SetRecipient(person.Author)
entity.SetSharing(true)
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher Contact", err.Error(), err)
return
}
// send and retry if it fails the first time
run.Now(Retry{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
......@@ -23,6 +23,7 @@ import (
"git.feneas.org/ganggo/ganggo/app/models"
federation "git.feneas.org/ganggo/federation"
"git.feneas.org/ganggo/federation/helpers"
run "github.com/revel/modules/jobs/app/jobs"
)
func (dispatcher *Dispatcher) Like(like models.Like) {
......@@ -85,10 +86,12 @@ func (dispatcher *Dispatcher) Like(like models.Like) {
entity = post
}
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher Like", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(Retry{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
......@@ -23,6 +23,7 @@ import (
"git.feneas.org/ganggo/ganggo/app/models"
"git.feneas.org/ganggo/federation"
fhelpers "git.feneas.org/ganggo/federation/helpers"
run "github.com/revel/modules/jobs/app/jobs"
)
func (dispatcher *Dispatcher) StatusMessage(post models.Post) {
......@@ -108,10 +109,12 @@ func (dispatcher *Dispatcher) StatusMessage(post models.Post) {
panic("Something went wrong!")
}
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher Reshare", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(Retry{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
......@@ -88,6 +88,10 @@ func (dispatcher *Dispatcher) findRecipients(parentPost *models.Post, parentUser
revel.AppLog.Error("Dispatcher findRecipients", err.Error(), err)
continue
}
// skip if not reachable
if !person.Pod.Alive {
continue
}
persons = append(persons, person)
}
return persons, nil
......@@ -95,14 +99,21 @@ func (dispatcher *Dispatcher) findRecipients(parentPost *models.Post, parentUser
} else if parentPost != nil {
// it is not local just send it to
// the remote server it should handle the rest
var persons = []models.Person{parentPost.Person}
var persons = []models.Person{}
// skip if not reachable
if parentPost.Person.Pod.Alive {
persons = append(persons, parentPost.Person)
}
if !parentPost.Public {
// in case of AP we will fetch known visibilties as well
var visibilities models.Visibilities
err := visibilities.FindByPost(*parentPost)
if err == nil {
for _, visibility := range visibilities {
persons = append(persons, visibility.Person)
// skip if not reachable
if visibility.Person.Pod.Alive {
persons = append(persons, visibility.Person)
}
}
} else {
revel.AppLog.Error("Dispatcher findRecipients", err.Error(), err)
......@@ -128,7 +139,10 @@ func (dispatcher *Dispatcher) findPublicEndpoints() (persons []models.Person, er
if err != nil {
continue
}
persons = append(persons, person)
// skip if not reachable
if person.Pod.Alive {
persons = append(persons, person)
}
}
return
}
......@@ -52,6 +52,16 @@ func (receiver Receiver) Run() {
return
}
// if the pod was deactivated and we receive a
// new valid message we can set him active again
if person.Pod.ID > 0 && !person.Pod.Alive {
person.Pod.Alive = true
err = person.Pod.Save()
if err != nil {
revel.AppLog.Error("Dispatcher", "error", err, "pod", person.Pod)
Please register or sign in to reply
}
}
switch entity := base.(type) {
case federation.MessageContact:
revel.AppLog.Debug("Starting contact receiver")
......
......@@ -18,14 +18,15 @@ package jobs
//
import (
"fmt"
"time"
"crypto/rsa"
"github.com/revel/revel"
"git.feneas.org/ganggo/ganggo/app/models"
federation "git.feneas.org/ganggo/federation"
run "github.com/revel/modules/jobs/app/jobs"
"git.feneas.org/ganggo/federation/helpers"
"fmt"
"github.com/microcosm-cc/bluemonday"
"time"
)
func (dispatcher *Dispatcher) RelayComment(entity federation.MessageComment) {
......@@ -116,20 +117,24 @@ func (dispatcher *Dispatcher) RelayComment(entity federation.MessageComment) {
revel.AppLog.Error("Dispatcher RelayComment", err.Error(), err)
continue
}
err = translate.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher RelayComment", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(Retry{
Pod: &person.Pod,
Send: func() error {
return translate.Send(endpoint, priv, pub)
},
})
} else {
// required for a valid envelope signature
entity.SetAuthor(parentUser.Person.Author)
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher RelayComment", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(Retry{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
}
......@@ -22,6 +22,7 @@ import (
"github.com/revel/revel"
"git.feneas.org/ganggo/ganggo/app/models"
federation "git.feneas.org/ganggo/federation"
run "github.com/revel/modules/jobs/app/jobs"
"git.feneas.org/ganggo/federation/helpers"
)
......@@ -70,10 +71,12 @@ func (dispatcher *Dispatcher) RelayLike(entity federation.MessageLike) {
// required for a valid envelope signature
entity.SetAuthor(parentUser.Person.Author)
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher RelayLike", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(Retry{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
......@@ -22,6 +22,7 @@ import (
"github.com/revel/revel"
"git.feneas.org/ganggo/ganggo/app/models"
federation "git.feneas.org/ganggo/federation"
run "github.com/revel/modules/jobs/app/jobs"
"git.feneas.org/ganggo/federation/helpers"
)
......@@ -130,10 +131,12 @@ func (dispatcher *Dispatcher) RelayRetraction(entity federation.MessageRetract)
endpoint = person.Pod.Inbox
}
err = entity.Send(endpoint, priv, pub)
if err != nil {
revel.AppLog.Error("Dispatcher RelayRetraction", err.Error(), err)
continue
}
// send and retry if it fails the first time
run.Now(Retry{
Pod: &person.Pod,
Send: func() error {
return entity.Send(endpoint, priv, pub)
},
})
}
}
package jobs
//
// GangGo Application Server
// Copyright (C) 2018 Lukas Matt <lukas@zauberstuhl.de>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//
import (
"github.com/revel/revel"
"git.feneas.org/ganggo/ganggo/app/models"
run "github.com/revel/modules/jobs/app/jobs"
"time"
)
type Retry struct {
Pod *models.Pod
Send func() error
wait time.Duration
}
func (retry Retry) Run() {
err := retry.Send()
if err != nil {
if retry.wait == 0 {
retry.wait = time.Minute
} else if retry.wait == time.Minute {
retry.wait = time.Hour
} else if retry.wait == time.Hour {
retry.wait = 24 * time.Hour
} else {
// this server is probably down. skip it..
revel.AppLog.Error("Jobs Retry", "error", err)
if retry.Pod != nil {
retry.Pod.Alive = false
err = retry.Pod.Save()
if err != nil {
revel.AppLog.Error("Jobs Retry", "error", err)
}
}
return
}
revel.AppLog.Warn("Jobs Retry", "waitfor", retry.wait, "error", err)
// repeat until timeout
run.In(retry.wait, retry)
}
}
......@@ -31,6 +31,7 @@ type Pod struct {
// cause asumming we use utf8mb 4*191 = 764 < 767
Host string `gorm:"size:191" json:"host"`
Inbox string `gorm:"size:191"`
Alive bool `gorm:"default:true"`
Protocol federation.Protocol
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment