diff --git a/archive.go b/archive.go new file mode 100644 index 0000000..5f128e6 --- /dev/null +++ b/archive.go @@ -0,0 +1,152 @@ +// +// Copyright (c) 2019 Björn Kalkbrenner +// +// Permission to use, copy, modify, and distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +// +package main + +import ( + "os" + "bufio" + "path" + "time" + "strconv" + "errors" +) + +var archiveStoragePath = "" +var archiveStorageFlat = false + + +type archiveStorage struct { + dataFile *os.File + data *bufio.Writer + + metaFile *os.File + meta *bufio.Writer +} + +func careDirectory(dirName string) error { + src, err := os.Stat(dirName) + + if os.IsNotExist(err) { + errDir := os.MkdirAll(dirName, 0755) + if errDir != nil { + return errDir + } + return nil + } + + if src.Mode().IsRegular() { + return errors.New("already exist as a file!") + } + + return nil +} + +func getArchiveFolder() (string, error) { + if archiveStoragePath == "" { + return "", errors.New("archive storage path not set") + } + + if archiveStorageFlat { + return archiveStoragePath, nil + } else { + return path.Join(archiveStoragePath,time.Now().Format("2006-01"),time.Now().Format("02")), nil + } +} + +func (a *archiveStorage) Open(fname string) error { + var err error + var t = time.Now() + var fFolder string + + if fFolder, err = getArchiveFolder(); err != nil { + return err + } + fName := fname + "." + strconv.FormatInt(t.Unix(), 10) + + if err := careDirectory(fFolder); err != nil { + return err + } + fPath := path.Join(fFolder,fName) + + if a.dataFile, err = os.Create(fPath); err != nil { + return err + } + a.data = bufio.NewWriter(a.dataFile) + + if a.metaFile, err = os.Create(fPath + ".meta"); err != nil { + return err + } + a.meta = bufio.NewWriter(a.metaFile) + + a.Meta("DATAFILE=" + fName) + a.Meta("TIME=" + t.String()) + + return nil +} + +func (a *archiveStorage) Close() error { + if a.data != nil { + if err := a.data.Flush(); err != nil { + return err + } + } + + if a.dataFile != nil { + if err := a.dataFile.Close(); err != nil { + return err + } + } + + if a.meta != nil { + if err := a.meta.Flush(); err != nil { + return err + } + } + + if a.metaFile != nil { + if err := a.metaFile.Close(); err != nil { + return err + } + } + + return nil +} + +func (a *archiveStorage) Data(text string) { + if a.data != nil { + a.data.WriteString(text + "\n") + } +} + +func (a *archiveStorage) Meta(text string) { + if a.meta != nil { + a.meta.WriteString(text + "\n") + } +} + +func (a *archiveStorage) Flush() error { + if a.data != nil { + if err := a.data.Flush(); err != nil { + return err + } + } + if a.meta != nil { + if err := a.meta.Flush(); err != nil { + return err + } + } + return nil +} diff --git a/filter-archive.go b/filter-archive.go new file mode 100644 index 0000000..217df73 --- /dev/null +++ b/filter-archive.go @@ -0,0 +1,332 @@ +// +// Copyright (c) 2019 Björn Kalkbrenner +// +// +// Permission to use, copy, modify, and distribute this software for any +// purpose with or without fee is hereby granted, provided that the above +// copyright notice and this permission notice appear in all copies. +// +// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +// ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +// + +package main + +import ( + "bufio" + "fmt" + "os" + "strings" + "log" + "flag" +) + +var version string +var tmpFile *os.File +var bufWriter *bufio.Writer + +type tx struct { + rcptTo []string + action string + response string + + archive archiveStorage +} + +type session struct { + id string + + rdns string + src string + heloName string + userName string + mtaName string + + tx tx +} + +var sessions = make(map[string]*session) + +var reporters = map[string]func(*session, []string){ + "link-connect": linkConnect, + "link-disconnect": linkDisconnect, + "link-greeting": linkGreeting, + "link-identify": linkIdentify, + "link-auth": linkAuth, + "tx-reset": txReset, + "tx-begin": txBegin, + "tx-mail": txMail, + "tx-rcpt": txRcpt, + "tx-rollback": txRollback, + "tx-envelope": txEnvelope, + "timeout": sessionTimeout, +} + +var filters = map[string]func(*session, []string){ + "data": data, + "data-line": dataLine, +} + +func systemLog(text string) { + fmt.Fprintln(os.Stderr, text) +} + +func sessionTimeout(s *session, params []string) { + log.Print("Session timeout for: " + s.id) +} + +func linkConnect(s *session, params []string) { + if len(params) != 4 { + log.Fatal("invalid input, shouldn't happen") + } + + s.rdns = params[0] + s.src = params[2] +} + +func linkDisconnect(s *session, params []string) { + if len(params) != 0 { + log.Fatal("invalid input, shouldn't happen") + } + + delete(sessions, s.id) +} + +func linkGreeting(s *session, params []string) { + if len(params) != 1 { + log.Fatal("invalid input, shouldn't happen") + } + + s.mtaName = params[0] +} + +func linkIdentify(s *session, params []string) { + if len(params) != 2 { + log.Fatal("invalid input, shouldn't happen") + } + + s.heloName = params[1] +} + +func linkAuth(s *session, params []string) { + if len(params) != 2 { + log.Fatal("invalid input, shouldn't happen") + } + if params[1] != "pass" { + return + } + + s.userName = params[0] +} + +func txReset(s *session, params []string) { + if len(params) != 1 { + log.Print("message-id is missing, this may happen") + } + + if err := s.tx.archive.Close(); err != nil { + systemLog("ERROR: " + err.Error()) + } + s.tx = tx{} +} + + +func txBegin(s *session, params []string) { + if len(params) != 1 { + log.Fatal("invalid input, shouldn't happen") + } + + msgid := params[0] + + fname := s.id + "." + msgid + if err := s.tx.archive.Open(fname); err != nil { + systemLog("ERROR: can't create new file in " + archiveStoragePath + ". " + err.Error()) + } + + meta := s.tx.archive.Meta + meta("SESSIONID=" + s.id) + meta("MSGID=" + msgid) + meta("MTANAME=" + s.mtaName) + meta("HELONAME=" + s.heloName) + meta("USERNAME=" + s.userName) + meta("RDNS=" + s.rdns) + meta("SRC=" + s.src) + +} + +func txMail(s *session, params []string) { + if len(params) != 3 { + log.Fatal("invalid input, shouldn't happen") + } + + if params[2] != "ok" { + return + } + + s.tx.archive.Meta("FROM=" + params[1]) +} + +func txRcpt(s *session, params []string) { + if len(params) != 3 { + log.Fatal("invalid input, shouldn't happen") + } + + if params[2] != "ok" { + return + } + + s.tx.rcptTo = append(s.tx.rcptTo, params[1]) +} + +func txEnvelope(s *session, params []string) { + if len(params) != 2 { + log.Fatal("invalid input, shouldn't happen") + } + + s.tx.archive.Meta("ENVELOPEID=" + params[1]) +} + +func txRollback(s *session, params []string) { + if len(params) != 1 { + log.Fatal("invalid input, shouldn't happen") + } + + s.tx.archive.Meta("STATE=REJECTED") +} + +func data(s *session, params []string) { + if len(params) != 2 { + log.Fatal("invalid input, shouldn't happen") + } + + token := params[0] + + if len(s.tx.rcptTo) > 0 { + s.tx.archive.Meta("TO=" + strings.Join(s.tx.rcptTo[:], ",")) + } + + if version < "0.5" { + fmt.Printf("filter-result|%s|%s|proceed\n", token, s.id) + } else { + fmt.Printf("filter-result|%s|%s|proceed\n", s.id, token) + } + +} + +func dataLine(s *session, params []string) { + if len(params) < 2 { + log.Fatal("invalid input, shouldn't happen") + } + + token := params[0] + line := strings.Join(params[1:], "|") + + // Input is raw SMTP data - unescape leading dots and write to archive + s.tx.archive.Data(strings.TrimPrefix(line, ".")) + + //just relay the line as received + if version < "0.5" { + fmt.Printf("filter-dataline|%s|%s|%s\n", token, s.id, line) + } else { + fmt.Printf("filter-dataline|%s|%s|%s\n", s.id, token, line) + } + +} + +func filterInit() { + for k := range reporters { + fmt.Printf("register|report|smtp-in|%s\n", k) + } + for k := range filters { + fmt.Printf("register|filter|smtp-in|%s\n", k) + } + fmt.Println("register|ready") +} + +func writeLine(s *session, token string, line string) { + prefix := "" + // Output raw SMTP data - escape leading dots. + if strings.HasPrefix(line, ".") { + prefix = "." + } + if version < "0.5" { + fmt.Printf("filter-dataline|%s|%s|%s%s\n", token, s.id, prefix, line) + } else { + fmt.Printf("filter-dataline|%s|%s|%s%s\n", s.id, token, prefix, line) + } +} + +func trigger(actions map[string]func(*session, []string), atoms []string) { + if atoms[4] == "link-connect" { + // special case to simplify subsequent code + s := session{} + s.id = atoms[5] + sessions[s.id] = &s + } + + s := sessions[atoms[5]] + if v, ok := actions[atoms[4]]; ok { + v(s, atoms[6:]) + } else { + os.Exit(1) + } +} + +func skipConfig(scanner *bufio.Scanner) { + for { + if !scanner.Scan() { + os.Exit(0) + } + line := scanner.Text() + if line == "config|ready" { + return + } + } +} + +func main() { + + fArg := flag.Bool("f",false,"Use flat filesystem path storage instead of /YYYY-MM/DD") + flag.Parse() + + if flag.Arg(0) == "" { + log.Fatal("Archive storage path not given as last parameter (e.g. /var/db/mail-archive)") + os.Exit(0) + } + + archiveStorageFlat = *fArg; + archiveStoragePath = flag.Arg(0) + + scanner := bufio.NewScanner(os.Stdin) + + skipConfig(scanner) + + filterInit() + + for { + if !scanner.Scan() { + os.Exit(0) + } + + atoms := strings.Split(scanner.Text(), "|") + if len(atoms) < 6 { + os.Exit(1) + } + + version = atoms[1] + + switch atoms[0] { + case "report": + trigger(reporters, atoms) + case "filter": + trigger(filters, atoms) + default: + os.Exit(1) + } + } +}