diff options
author | Runxi Yu <me@runxiyu.org> | 2025-01-13 12:02:49 +0800 |
---|---|---|
committer | Runxi Yu <me@runxiyu.org> | 2025-01-13 12:02:49 +0800 |
commit | 42c5f39700c6ba95a6b924be807e8cddd69c3bdd (patch) | |
tree | a61dc937793b00684a03a14b2ec4adfca5f632da | |
parent | Add addresses (diff) | |
download | maild-42c5f39700c6ba95a6b924be807e8cddd69c3bdd.tar.gz maild-42c5f39700c6ba95a6b924be807e8cddd69c3bdd.tar.zst maild-42c5f39700c6ba95a6b924be807e8cddd69c3bdd.zip |
Add PostgreSQL mail store support
-rw-r--r-- | config.go | 4 | ||||
-rw-r--r-- | deliver_dir.go | 33 | ||||
-rw-r--r-- | deliver_local.go | 27 | ||||
-rw-r--r-- | errors.go | 11 | ||||
-rw-r--r-- | maild.scfg | 5 | ||||
-rw-r--r-- | main.go | 7 | ||||
-rw-r--r-- | mx_recv.go | 49 |
7 files changed, 74 insertions, 62 deletions
@@ -21,6 +21,10 @@ var config struct { Type string `scfg:"type"` Conn string `scfg:"conn"` } `scfg:"db"` + MX struct { + Net string `scfg:"net"` + Addr string `scfg:"addr"` + } `scfg:"mx"` _tls_config *tls.Config } var ( diff --git a/deliver_dir.go b/deliver_dir.go deleted file mode 100644 index bba9eb4..0000000 --- a/deliver_dir.go +++ /dev/null @@ -1,33 +0,0 @@ -package main - -import ( - "os" - "time" - - "go.lindenii.runxiyu.org/lindenii-common/clog" - "go.lindenii.runxiyu.org/lindenii-common/misc" -) - -func deliver_to_local_directory(envelope_from string, envelope_recipients []string, data []byte, dir_path string) error { - clog.Debug( - "incoming_mail", - "envelope_from", envelope_from, - "envelope_recipients", envelope_recipients, - "data", string(data), - ) - t := time.Now() - dir, err := misc.Open_directory_readonly(dir_path) - if err != nil { - return misc.Wrap_one_error(err_deliver_write, err) - } - fd, err := misc.Open_file_at(dir, envelope_from+" "+t.Format(time.RFC3339Nano)+".eml", os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600) - if err != nil { - // TODO: handle fs.ErrExist - return misc.Wrap_one_error(err_deliver_write, err) - } - _, err = fd.Write(data) - if err != nil { - return misc.Wrap_one_error(err_deliver_write, err) - } - return nil -} diff --git a/deliver_local.go b/deliver_local.go new file mode 100644 index 0000000..9a29459 --- /dev/null +++ b/deliver_local.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "context" + "github.com/jackc/pgx/v5/pgxpool" +) + +func deliver_local(ctx context.Context, db *pgxpool.Pool, envelope_from string, envelope_recipients []string, data []byte, addresses []string) error { + tx, err := db.Begin(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) // BUG: Potential missed error here, but this will always fail if commit succeeded + for _, address := range addresses { + _, err = tx.Exec(ctx, "INSERT INTO mail (mailbox_id, data) SELECT a.mailbox_id, $2::bytea FROM addresses a JOIN mailboxes m ON a.mailbox_id = m.id WHERE a.address = $1;", address, data) + if err != nil { + fmt.Printf("%#v\n", err) + return err + } + } + err = tx.Commit(ctx) + if err != nil { + return err + } + return nil +} @@ -2,9 +2,16 @@ package main import ( "errors" + "fmt" + "strings" ) var ( - err_deliver_write = errors.New("unable to write to filesystem while attempting to deliver message") - err_unsupported_database_type = errors.New("unsupported database type; only \"postgres\" is currently supported") + err_unsupported_database_type = errors.New("Unsupported database type; only \"postgres\" is currently supported") ) + +type err_local_recipients_not_found_t []string + +func (e *err_local_recipients_not_found_t) Error() string { + return fmt.Sprintf("Local recipients not found: %s", strings.Join([]string(*e), ", ")) +} @@ -9,3 +9,8 @@ db { type postgres conn postgresql:///lindenii-maild?host=/var/run/postgresql } + +mx { + net tcp + addr :1025 +} @@ -1,6 +1,7 @@ package main import ( + "context" "errors" "flag" "io" @@ -20,7 +21,7 @@ func main() { panic(err) } - listener, err := net.Listen("tcp", ":25") + listener, err := net.Listen(config.MX.Net, config.MX.Addr) if err != nil { panic(err) } @@ -33,7 +34,9 @@ func main() { } go func() { - err := handle_mx_recv_conn(conn) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := handle_mx_recv_conn(ctx, conn) if err != nil && !errors.Is(err, io.EOF) { clog.Error("connection handler returned error", "err", err) } @@ -3,7 +3,9 @@ package main import ( "bufio" "bytes" + "context" "crypto/tls" + "errors" "net" "slices" "strings" @@ -32,7 +34,7 @@ type mx_recv_session struct { current_mail_from string current_rcpt_to []string server_state server_state_t - // ctx context.Context + ctx context.Context } func (session *mx_recv_session) handle() error { @@ -128,14 +130,20 @@ func (session *mx_recv_session) handle() error { _ = session.buf_conn.Flush() break } - recipient, _, _ := mailkit.Strip_angle_brackets(param[len("TO:"):]) - ok := true // XXX: Check routing table - if !ok { - _, _ = session.buf_conn.WriteString("550 5.1.1 <" + recipient + ">: Recipient address rejected: User unknown in local recipient table\r\n") + recipient_address, _, _ := mailkit.Strip_angle_brackets(param[len("TO:"):]) + var count int + err := session.db.QueryRow(session.ctx, "SELECT COUNT (*) FROM addresses WHERE address = $1", recipient_address).Scan(&count) + if err != nil { + _, _ = session.buf_conn.WriteString("451 Internal error: " + err.Error() + "\r\n") _ = session.buf_conn.Flush() - break switch_cmd + break } - session.current_rcpt_to = append(session.current_rcpt_to, recipient) + if count == 0 { + _, _ = session.buf_conn.WriteString("550 5.1.1 Recipient address rejected: Local recipients not found: " + recipient_address + "\r\n") + _ = session.buf_conn.Flush() + break + } + session.current_rcpt_to = append(session.current_rcpt_to, recipient_address) session.server_state = server_state_rcpt _, _ = session.buf_conn.WriteString("250 2.1.5 Ok\r\n") _ = session.buf_conn.Flush() @@ -176,24 +184,14 @@ func (session *mx_recv_session) handle() error { if err != nil { return err } - { - inboxes_to_deliver_to := make(map[string]struct{}) - for _, recipient := range session.current_rcpt_to { - inbox, ok := "/tmp", true // XXX: Check routing table - if !ok { - _, _ = session.buf_conn.WriteString("550 5.1.1 <" + recipient + ">: Recipient address rejected: User unknown in local recipient table\r\n") - break switch_cmd - } - inboxes_to_deliver_to[inbox] = struct{}{} - } - for inbox := range inboxes_to_deliver_to { - err = deliver_to_local_directory(session.current_mail_from, session.current_rcpt_to, current_data, inbox) - } - } - if err == nil { - _, _ = session.buf_conn.WriteString("250 2.0.0 Ok: Accepted\r\n") - } else { + err = deliver_local(session.ctx, session.db, session.current_mail_from, session.current_rcpt_to, current_data, session.current_rcpt_to) + var err_local_recipients_not_found *err_local_recipients_not_found_t + if errors.As(err, &err_local_recipients_not_found) { + _, _ = session.buf_conn.WriteString("550 5.1.1 Recipient address rejected: " + err_local_recipients_not_found.Error() + "\r\n") + } else if err != nil { _, _ = session.buf_conn.WriteString("500 2.0.0 Error: " + err.Error() + "\r\n") + } else { + _, _ = session.buf_conn.WriteString("250 2.0.0 Ok: Accepted\r\n") } _ = session.buf_conn.Flush() session.server_state = server_state_helo @@ -217,9 +215,10 @@ func (session *mx_recv_session) handle() error { } } -func handle_mx_recv_conn(net_conn net.Conn) error { +func handle_mx_recv_conn(ctx context.Context, net_conn net.Conn) error { session := mx_recv_session{ net_conn: net_conn, + ctx: ctx, } return session.handle() } |