Reconnect to XMPP automatically with backoff.

change-name
Robert Jacob 5 years ago
parent 8f4ce84bb0
commit 175bc9663c
  1. 5
      cmd/spaceapi-server/main.go
  2. 212
      xmpp/xmpp.go

@ -75,9 +75,12 @@ func main() {
}
if c.XMPPJid != "" {
if err := xmpp.AddXMPPListener(storage, c.XMPPJid, c.XMPPPassword, c.XMPPTarget, c.XMPPHandle); err != nil {
xmppListener, err := xmpp.Listener(storage, c.XMPPJid, c.XMPPPassword, c.XMPPTarget, c.XMPPHandle)
if err != nil {
log.Fatalf("Error creating XMPP listener: %s", err)
}
storage.AddListener(xmppListener)
}
mux := web.CreateMux(storage, c.WebPrefix)

@ -3,10 +3,10 @@ package xmpp
import (
"crypto/tls"
"fmt"
"io"
"log"
"net"
"strings"
"time"
"git.hacknology.de/projekte/spaceapi"
"github.com/mattn/go-xmpp"
@ -32,10 +32,11 @@ Gern geschehen.`
stateClosed = "Space ist zu."
)
func AddXMPPListener(storage *spaceapi.Storage, jid, password, target, handle string) error {
// Listener returns a storage listener that broadcasts changes in the state to an XMPP MUC.
func Listener(storage *spaceapi.Storage, jid, password, target, handle string) (spaceapi.Listener, error) {
xmppHost, err := lookupHost(jid)
if err != nil {
return fmt.Errorf("can not find SRV record: %s", err)
return nil, fmt.Errorf("can not find SRV record: %s", err)
}
clientOpts := xmpp.Options{
@ -50,99 +51,160 @@ func AddXMPPListener(storage *spaceapi.Storage, jid, password, target, handle st
},
}
log.Printf("Connecting to %s as %s", xmppHost, jid)
sendChan := make(chan string, 1)
go clientLoop(storage, sendChan, clientOpts, target, handle)
return func(old, new spaceapi.SpaceStatus) {
if new.State.Open == nil {
return
}
if old.State.Open == new.State.Open {
return
}
msg := "Space ist jetzt OFFEN!"
if !*new.State.Open {
msg = "Space ist jetzt ZU!"
}
sendChan <- msg
}, nil
}
const (
defaultClientPort = 5222
backOffFactor = 1.2
maxReconnectWait = 5 * time.Minute
)
func clientLoop(storage *spaceapi.Storage, sendChan <-chan string, clientOpts xmpp.Options, target, handle string) {
wait := 1 * time.Second
for {
client, err := clientConnect(clientOpts, target, handle)
if err != nil {
log.Printf("Error connecting to XMPP server: %s", err)
wait = time.Duration(float64(wait) * backOffFactor)
if wait > maxReconnectWait {
wait = maxReconnectWait
}
log.Printf("Retrying in %s...", wait)
time.Sleep(wait)
continue
}
receiveChan, errorChan := receiveLoop(storage, client, target)
receiveLoop:
for {
select {
case err := <-errorChan:
log.Printf("Error receiving messages: %s", err)
break receiveLoop
case msg := <-sendChan:
if err := sendGroupMessage(client, target, msg); err != nil {
log.Printf("Error sending message: %s", err)
}
case chat := <-receiveChan:
if err := handleMessage(storage, client, target, chat); err != nil {
log.Printf("Error handling message: %s", err)
}
}
}
log.Printf("Closing XMPP connection.")
client.Close()
}
}
func handleMessage(storage *spaceapi.Storage, client *xmpp.Client, target string, chat xmpp.Chat) error {
switch chat.Text {
case commandHelp:
if err := sendMessage(client, chat.Remote, helpMessage); err != nil {
return fmt.Errorf("error sending reply: %s", err)
}
case commandState:
state := storage.Status().State
msg := stateUnknown
if state.Open != nil {
switch *state.Open {
case true:
msg = stateOpen
case false:
msg = stateClosed
}
}
sendFunc := sendGroupMessage
replyTarget := target
if chat.Type != "groupchat" {
sendFunc = sendMessage
replyTarget = chat.Remote
}
if err := sendFunc(client, replyTarget, msg); err != nil {
return fmt.Errorf("error sending reply: %s", err)
}
case commandClose:
if chat.Type != "groupchat" {
if err := sendMessage(client, chat.Remote, errorNonPublic); err != nil {
return fmt.Errorf("error sending reply: %s", err)
}
return nil
}
storage.Modify(func(status *spaceapi.SpaceStatus) {
open := false
status.State.Open = &open
})
}
return nil
}
func clientConnect(clientOpts xmpp.Options, target, handle string) (*xmpp.Client, error) {
log.Printf("Connecting to %s as %s", clientOpts.Host, clientOpts.User)
client, err := clientOpts.NewClient()
if err != nil {
return fmt.Errorf("can not connect to server: %s", err)
return nil, fmt.Errorf("can not connect to server: %s", err)
}
if _, err := client.JoinMUCNoHistory(target, handle); err != nil {
return fmt.Errorf("can not join room: %s", err)
return nil, fmt.Errorf("can not join room: %s", err)
}
return client, nil
}
func receiveLoop(storage *spaceapi.Storage, client *xmpp.Client, target string) (<-chan xmpp.Chat, <-chan error) {
receiveChan := make(chan xmpp.Chat)
errorChan := make(chan error)
go func() {
defer close(errorChan)
defer close(receiveChan)
for {
chat, err := client.Recv()
switch {
case err == io.EOF:
log.Println("Lost XMPP connection.")
if err != nil {
errorChan <- err
return
case err != nil:
log.Printf("Error receiving XMPP message: %s", err)
continue
}
switch v := chat.(type) {
case xmpp.Chat:
switch v.Text {
case commandHelp:
if err := sendMessage(client, v.Remote, helpMessage); err != nil {
log.Printf("Error sending message: %s", err)
}
case commandState:
state := storage.Status().State
msg := stateUnknown
if state.Open != nil {
switch *state.Open {
case true:
msg = stateOpen
case false:
msg = stateClosed
}
}
sendFunc := sendGroupMessage
replyTarget := target
if v.Type != "groupchat" {
sendFunc = sendMessage
replyTarget = v.Remote
}
if err := sendFunc(client, replyTarget, msg); err != nil {
log.Printf("Error sending message: %s", err)
}
case commandClose:
if v.Type != "groupchat" {
if err := sendMessage(client, v.Remote, errorNonPublic); err != nil {
log.Printf("Error sending message: %s", err)
}
continue
}
open := false
storage.Modify(func(status *spaceapi.SpaceStatus) {
status.State.Open = &open
})
}
receiveChan <- v
}
}
}()
storage.AddListener(func(old, new spaceapi.SpaceStatus) {
if new.State.Open == nil {
return
}
if old.State.Open == new.State.Open {
return
}
msg := "Space ist jetzt OFFEN!"
if !*new.State.Open {
msg = "Space ist jetzt ZU!"
}
if err := sendGroupMessage(client, target, msg); err != nil {
log.Printf("Error sending status update: %s", err)
}
})
return nil
return receiveChan, errorChan
}
const defaultClientPort = 5222
func lookupHost(jid string) (string, error) {
if !strings.Contains(jid, "@") {
return "", fmt.Errorf("not a valid JID: %s", jid)

Loading…
Cancel
Save