From 7bed5c757065478f215c90b28dea5af21b20928e Mon Sep 17 00:00:00 2001 From: Robert Sesek Date: Mon, 29 Dec 2025 14:58:54 -0500 Subject: [PATCH] Implement the Monitor to move messages between mailboxes --- cmd/mailbox-shuffler/config.go | 13 ++- cmd/mailbox-shuffler/mailbox-shuffler.go | 21 ++--- cmd/mailbox-shuffler/monitor.go | 112 +++++++++++++++++++++-- cmd/mailbox-shuffler/source.go | 11 ++- 4 files changed, 135 insertions(+), 22 deletions(-) diff --git a/cmd/mailbox-shuffler/config.go b/cmd/mailbox-shuffler/config.go index 3446aeb..a89b7fa 100644 --- a/cmd/mailbox-shuffler/config.go +++ b/cmd/mailbox-shuffler/config.go @@ -30,12 +30,16 @@ type ServerConfig struct { Password string } +func (c *ServerConfig) LogDescription() string { + return fmt.Sprintf("%s/%s", c.Type, c.Email) +} + // MonitorConfig controls how to move messages between a source and // destination email server. type MonitorConfig struct { - Source ServerConfig - Destination ServerConfig - PollInterval time.Duration + Source ServerConfig + Destination ServerConfig + PollIntervalSeconds time.Duration } // OAuthServerConfig stores the configuration for an OAuth 2.0 @@ -59,6 +63,9 @@ func (c *Config) Validate() error { if mon.Source.Email == "" || mon.Destination.Email == "" { return fmt.Errorf("Monitor source/destination email missing") } + if mon.PollIntervalSeconds == 0 { + return fmt.Errorf("Unset poll interval") + } if err := validateSource(mon.Source); err != nil { return fmt.Errorf("Invalid Source: %w", err) } diff --git a/cmd/mailbox-shuffler/mailbox-shuffler.go b/cmd/mailbox-shuffler/mailbox-shuffler.go index 5c8c319..649a8aa 100644 --- a/cmd/mailbox-shuffler/mailbox-shuffler.go +++ b/cmd/mailbox-shuffler/mailbox-shuffler.go @@ -55,9 +55,8 @@ func main() { log.Info("starting mailbox-shuffler") - rawMsg, err := os.ReadFile("dev/test.msg") - if err != nil { - log.Fatal("Failed to read test mesage", zap.Error(err)) + if err := config.Validate(); err != nil { + log.Fatal("Invalid config", zap.Error(err)) } clientSecret, err := os.ReadFile(config.OAuthServer.CredentialsPath) @@ -72,14 +71,14 @@ func main() { oauthServer := RunOAuthServer(ctx, config.OAuthServer, oauthConfig, log) - dest := NewDestination(config.Monitor[0].Destination, oauthServer, log) - destConn, err := dest.Connect(ctx) - if err != nil { - log.Fatal("Failed to connect to destination", zap.Error(err)) + for i, mc := range config.Monitor { + m := NewMontior(mc, oauthServer, log) + if err := m.Start(ctx); err != nil { + log.Fatal("Failed to start montior", zap.Int("index", i), zap.Error(err)) + } } - err = destConn.AddMessage(rawMsg) - if err != nil { - log.Fatal("Failed to insert message", zap.Error(err)) - } + log.Info("Succesfully started all monitors") + + <-ctx.Done() } diff --git a/cmd/mailbox-shuffler/monitor.go b/cmd/mailbox-shuffler/monitor.go index 7e18f38..2110ebe 100644 --- a/cmd/mailbox-shuffler/monitor.go +++ b/cmd/mailbox-shuffler/monitor.go @@ -8,21 +8,121 @@ package main import ( "context" + "fmt" + "io" + "time" "go.uber.org/zap" ) type Monitor struct { - c MonitorConfig - log *zap.Logger + c MonitorConfig + auth OAuthServer + log *zap.Logger } -func NewMontior(config MonitorConfig, log *zap.Logger) *Monitor { +func NewMontior(config MonitorConfig, auth OAuthServer, log *zap.Logger) *Monitor { + log = log.With(zap.String("source", config.Source.LogDescription()), + zap.String("dest", config.Destination.LogDescription())) return &Monitor{ - c: config, - log: log, + c: config, + auth: auth, + log: log, } } -func (m *Monitor) Start(ctx context.Context) { +func (m *Monitor) Start(ctx context.Context) error { + src := NewSource(m.c.Source, m.auth, m.log) + dst := NewDestination(m.c.Destination, m.auth, m.log) + + if err := m.runOnce(ctx, src, dst); err != nil { + m.log.Error("Failed to start monitor", zap.Error(err)) + return err + } + + go m.run(ctx, src, dst) + + return nil +} + +func (m *Monitor) run(ctx context.Context, src Source, dst Destination) { + for { + select { + case <-ctx.Done(): + m.log.Info("Monitor stopping") + return + case <-time.After(m.c.PollIntervalSeconds * time.Second): + m.runOnce(ctx, src, dst) + } + } +} + +func (m *Monitor) runOnce(ctx context.Context, src Source, dst Destination) error { + m.log.Info("Polling for messages") + + if err := src.Connect(); err != nil { + return fmt.Errorf("Failed to connect to source: %w", err) + } + dstConn, err := dst.Connect(ctx) + if err != nil { + return fmt.Errorf("Failed to connect to dest: %w", err) + } + + msgs, err := src.GetMessages() + if err != nil { + return fmt.Errorf("Failed to list messages: %w", err) + } + for _, msg := range msgs { + log := m.log.With(zap.String("id", msg.ID())) + log.Info("Transferring message to destination") + err := m.transferMessageTo(msg, dstConn) + if err == nil { + log.Info("Successfully transferred message") + } else { + log.Error("Failed to transfer message", zap.Error(err)) + } + } + + if err := src.Close(); err != nil { + return fmt.Errorf("Failed to close source: %w", err) + } + if err := dstConn.Close(); err != nil { + return fmt.Errorf("Failed to close dest: %w", err) + } + + return nil +} + +func (m *Monitor) transferMessageTo(msg Message, dst DestinationConnection) error { + r, err := msg.Content() + if err != nil { + return fmt.Errorf("Failed to get message content: %w", err) + } + defer r.Close() + + body, err := io.ReadAll(r) + if err != nil { + return fmt.Errorf("Failed to read message content: %w", err) + } + + content := getReceivedInfo(m.c, time.Now()) + content = append(content, body...) + + if err = dst.AddMessage(content); err == nil { + if err = msg.Delete(); err != nil { + return fmt.Errorf("Failed to mark source message as deleted: %w", err) + } + return nil + } else { + return fmt.Errorf("Failed to add message to destination: %w", err) + } +} + +func getReceivedInfo(cfg MonitorConfig, t time.Time) []byte { + line := fmt.Sprintf( + "Received: from <%s> (via %s)\r\n by mailpopbox-shuffler at %s\r\n for <%s> (via %s)\r\n", + cfg.Source.Email, cfg.Source.Type, + t.Format(time.RFC1123Z), + cfg.Destination.Email, cfg.Destination.Type) + return []byte(line) } diff --git a/cmd/mailbox-shuffler/source.go b/cmd/mailbox-shuffler/source.go index bdc8644..851bd5a 100644 --- a/cmd/mailbox-shuffler/source.go +++ b/cmd/mailbox-shuffler/source.go @@ -18,8 +18,11 @@ import ( ) type Source interface { + // Connect establishes a connection to the Source. + Connect() error // GetMessages returns the list of available messages on the server. The - // returned Message objects are only valid until `Close` is called. + // returned Message objects are only valid until `Close` is called. This + // implicitly calls `Connect`. GetMessages() ([]Message, error) // Reset attempts to rollback the transaction on the server. Reset() error @@ -35,7 +38,7 @@ type Message interface { // NewSource creates an interface for accessing a message source. The returned // object is *not* goroutine safe. -func NewSource(config ServerConfig, auth *OAuthServer, log *zap.Logger) Source { +func NewSource(config ServerConfig, auth OAuthServer, log *zap.Logger) Source { switch config.Type { case ServerTypePOP3: return &pop3Source{ @@ -66,6 +69,10 @@ func (m *pop3Message) Delete() error { return m.s.mbox.Delete( var errNotConnected = fmt.Errorf("Source is not connected") +func (s *pop3Source) Connect() error { + return s.connect() +} + func (s *pop3Source) GetMessages() ([]Message, error) { if err := s.connect(); err != nil { return nil, err -- 2.43.5