4. Queuing subsystem

The default behavior of an MTA is to spool received messages to disk, then pick them up and deliver them using the queuing subsystem. In contrast to an “in-line” send() to the next hop during reception, this allows the MTA to accept messages even if the next hop is unavailable, or if a message has multiple recipients on different next-hop destinations.

4.1. Activity diagram

This figure illustrates the typical states and movements inside the queue; they are described in the following sections.

flowchart LR
  smtp{{SMTP}} --> spool[(Spool)]
  http{{HTTP}} --> spool
  api{{API}} --> spool
  start((Startup)) --> spool
  spool --> spoolin(Spool-in)
  spoolin --> pre(Pre-delivery hook)
  spoolin -->|if now < next retry|defer[(Defer queue)]
  spoolin -->|if message on hold|hold[(Hold queue)]
  pre <--> defer
  pre <--> hold
  pre --> dns(DNS resolving)
  dns -->|DNS error|post(Post-delivery hook)
  dns --> active[(Active queue)]
  active -->|SMTP delivery attempt|post
  post --> defer
  post --> hold
  post --> done{{Message Done}}

4.2. Queue states

The queuing system can be seen as having three overall message states:

Hold

Messages that are on permanent hold. This is the simplest, and typically least used, state. It is not a queue, but rather a collection of messages. It can be used to, for example, implement a spam/virus quarantine. Messages can be put in the hold list directly during the reception of a message, using the end-of-DATA script queue() function’s “hold” option.

Defer

Messages that are not on hold, and are scheduled for delivery in the future. The default behavior, for every message that fails delivery (with a non-permanent error), is to be put in the defer queue; scheduled to be re-tried sometime in the future. It is a queue in the sense that messages are ordered by their scheduled delivery time.

Active

Messages that are not on hold, and are scheduled for delivery now (or in the past). Messages that are put in queue() for delivery end up directly in the active queue (unless the “delay” option is used). Messages are also moved automatically from the defer queue into the active queue at their scheduled time of delivery. The active queue is by far the most advanced part of the queuing subsystem, and contains many sub-states. This stems from the fact that concurrency, rate and connectinterval can be limited based on many dynamic parameters, essentially creating a virtually unlimited number of sub-queues. This is described later in detail.

The Protocol Buffer API contains queue size information:

$ halonctl process-stats | grep queue.queue
queue.queue.defer.size: 67
queue.queue.active.size: 6

4.2.1. Changing message states

As described in the previous section, messages are automatically moved between the active and defer queue in accordance with the normal behavior of an MTA (try-defer-retry-defer, etc). Messages can also be forcibly moved between queues. Messages can be updated using the Protocol Buffer API’s QueueUpdate function, command line interface, web administration, as well as from the pre- and post-delivery script. This changes both the information on disk and in memory (the queue process’s data structures). When a message is moved, it is re-injected via the “update” actor described in the Activity diagram.

To put queued messages for example.com on hold:

$ halonctl queue update --recipientdomain example.com --state DEFER --state ACTIVE --hold
10 messages affected

Messages that are in a “working” state (for example, being delivered) will be unaffected. To immediately retry a certain deferred message by ID:

$ halonctl queue update --id 44815e44-05d3-11ea-b388-06f091ceb4dc:1 --state DEFER --active
1 messages affected

4.2.2. Messages on hold

Messages, or copies of messages, can be put in a frozen HOLD state. This is usually done from the end-of-data script to implement, for example, short-term quarantines. In order to enforce a retention policy for those messages, you can run halonctl from cron:

$ crontab -l
*/5 * * * * halonctl queue update --delete --state HOLD --age '>86400' --metadata qid=1day
*/5 * * * * halonctl queue update --delete --state HOLD --age '>604800' --metadata qid=7days

4.3. Active queue

When a message enters the active queue, it goes through five stages:

Pre-delivery script

The pre-delivery script is executed (if it exists). The script allows the administrator to implement per-attempt logic, such as dynamic routing.

DNS

All necessary domain name information is resolved. If that succeeds, the message is put in the active queue. If not, it skips directly to the post-delivery script.

Active queue

The message remains in the active queue until allowed by the active queue’s pickup policies (that control concurrency, rate, connectinterval, etc). The pickup policies define the virtually unlimited number of sub-queues, and will be described later in detail.

Delivery

A delivery attempt (SMTP or LMTP) is made.

Post-delivery script

Regardless of the outcome of the delivery attempt, the post-delivery script is executed (if it exists). It can be used by the administrator to override the default logic or to implement logging.

4.3.1. Active queue pickup

The active queue pickup subsystem determines when a message should be picked up, usually based on concurrency, rate or connectinterval limitations on properties such as local IP or destination, or queue pickup suspensions.

By splitting the system’s available concurrency, virtual sub-queues can be created. This is useful for separating email of different classes, so that one class of email that is stuck or moving slowly does not block others. Those sub-queues can be created based on any properties: pre-defined ones like recipient domain, or custom fields that can be populated from script. For example, consider the case where the system’s total concurrency is set to 20 000, and the system has two local IP addresses that can be used as source IPs when sending email. With a pickup policy limiting the concurrency to 10 000 connections per local IP, we can be certain that even if traffic from one of the IPs jams up, traffic from the other IP will not be disturbed. This concept can be extended to, for example, customers in a multi-tenant system (making sure that abuse from one customer does not jam the queue for others) or recipient domains/MXs (so that one slow destination does not jam the queue for other destinations). Finally, combinations of those can be created.

As explained above, the queue is not a FIFO (first-in first-out) type of queue: messages are divided into virtual sub-queues and delivered in the fastest possible way without exceeding the queue policies (of concurrency and rates). There is, however, a concept of priority messages (0 - normal, 1 - high); these are considered candidates for pickup/delivery before any other messages, and they also have precedence when handled by the DNS resolver and the pre- and post-delivery threads. Given the concept explained above, there is no guarantee that a high-priority message is delivered before a low-priority one when suspending rules apply (exceeded concurrency or rates). Within each sub-queue, messages are handled in FIFO order, which guarantees that no message is stuck in the queue because of unfair treatment.
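The pickup order described above can be sketched as a minimal model; this is purely illustrative, not Halon’s implementation. Each sub-queue keeps one FIFO per priority class, high-priority messages are considered first, and a suspension blocks both priorities alike:

```python
from collections import deque

# Illustrative model of one virtual sub-queue with priority classes.
class SubQueue:
    def __init__(self):
        self.fifo = {0: deque(), 1: deque()}  # one FIFO per priority (0 normal, 1 high)
        self.suspended = False                # set while a policy limit is exceeded

    def push(self, msg, priority=0):
        self.fifo[priority].append(msg)

    def pickup(self):
        if self.suspended:
            return None  # suspending rules block both priorities alike
        for priority in (1, 0):  # high-priority messages are candidates first
            if self.fifo[priority]:
                return self.fifo[priority].popleft()
        return None

q = SubQueue()
q.push("bulk-1")
q.push("bulk-2")
q.push("alert-1", priority=1)
order = [q.pickup(), q.pickup(), q.pickup()]
print(order)  # ['alert-1', 'bulk-1', 'bulk-2']
```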

Below is an illustration of how the queue is structured.

flowchart LR
  A[Active Queue] --> te0(tenantid#1)
  te0 --> j1(jobid#1)
  A --> te1(cust1)
  te1 --> j2(sendout1)
  j1 --> t1(transportid#1)
  t1 --> l1(localip#1)
  t1 --> l2(localip#2)
  l1 --> b1[*] --> g1(grouping#1)
  l2 --> b1
  g1 --> r1(remoteip#1)
  r1 --> m1(remotemx#1)
  m1 --> d1(recipientdomain#1)
  d1 --> q1[Message Queue FIFO]
  j2 --> t2(transactional)
  t2 --> l3(1.2.3.4)
  t2 --> l4(1.2.3.5)
  l3 --> b2[*] --> g2(&yahoo)
  l4 --> b2
  g2 --> r2(5.6.7.8)
  r2 --> m2(mta1.am0.yahoodns.net)
  m2 --> d2(yahoo.com)
  d2 --> q2[Message Queue FIFO]
  b2 --> g3(&google)
  g3 --> r4(9.8.7.6)
  r4 --> m3(alt2.gmail-smtp-in.l.google.com)
  m3 --> d4(google.com)
  d4 --> q3[Message Queue FIFO]
  m3 --> d5(gmail.com)
  d5 --> q4[Message Queue FIFO]
  A --> te3(cust2)
  te3 --> j3(sendout2)
  j3 --> t3(bulk)
  t3 --> l5(1.2.3.6)
  l5 --> b3[*] --> g4(&yahoo)
  g4 --> r3(5.6.7.8)
  r3 --> m4(mta1.am0.yahoodns.net)
  m4 --> d3(yahoo.com)
  d3 --> q5[Message Queue FIFO]

The Protocol Buffer API contains pickup statistics:

$ halonctl process-stats | grep queue.pickup
queue.pickup.skips: 86826
queue.pickup.misses: 9034

4.3.2. Queue pickup policies

Concurrency, rate and connectinterval limits are counted against what we call counters, which can have one or multiple fields that define a unique entry. In order to offer a very high degree of flexibility, the counters are defined by the administrator.

The available fields are shown below.

4.3.2.1. Available fields

tenantid

A single value free-text field that can be used for any purpose. Email service providers (ESPs) typically have many customers (tenants) using the same system. This field enables you to create policy on a per-tenant basis when a message is queued, to handle responses relating to the tenant’s reputation. One way is to set the tenantid to the header From domain when queuing the message in HSL.

jobid

A single value free-text field that can be used for any purpose, for example to identify the current campaign for bulk mailing, or message stream for transactional mailing. One way is to use the value of an incoming email header such as X-Job. This enables you to create policy to manage traffic flow on a per-job basis.

transportid

A single value referring to a transportgroups[].transports[], similar to “binding” in other systems. A transportid can map to one or more localip(s), or to other settings, such as specific custom destinations.

localip

This is a list, so that an email can be queued with multiple alternatives for source IP. The same source IP may be specified multiple times to distribute the traffic according to its ratio.

grouping

A single value used for “rolling up” queued emails. It is set after DNS resolving, and may default to empty, remotemx or recipientdomain. Any grouping configured using the queues.grouping setting will apply and fill out this field (the grouping id is prefixed with & throughout the queue).

remoteip

This single value is known after DNS resolving. In some cases it’s preferable to build policies on the grouping field instead, together with the queues.grouping.groupings[].remoteip[] setting.

remotemx

This single value is known after DNS resolving. In some cases it’s preferable to build policies on the grouping field instead, together with the queues.grouping.groupings[].remotemx[] setting.

recipientdomain

This single value is a basic property of the message RCPT TO address. In some cases it’s preferable to build policies on the grouping field instead, together with the queues.grouping.groupings[].recipientdomain[] setting.
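As a sketch, a queues.grouping entry could roll up all Yahoo MXs into one grouping that the fields above can then match on. The id, the hostname pattern and the use of a wildcard here are illustrative; the structure mirrors the smtpd-app.yaml example later in this chapter:

```yaml
groupings:
  - id: yahoo             # referenced as "&yahoo" throughout the queue
    remotemx:
      - "*.yahoodns.net"  # wildcard pattern assumed for illustration
```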

4.3.2.2. Loading

The default start configuration’s environment.policyconf directive loads the pickup policy from /etc/halon/smtpd-policy.yaml. It is described by, and can be validated with, the smtpd-policy.schema.json JSON schema (included in our Visual Studio Code integration).

If the environment.policyconf configuration is reloaded during runtime, active suspensions based on (now) removed or previously exceeded rate, concurrency or connectinterval limits are not automatically removed as a side effect of the reload; instead, a suspension is removed when the next message for that rate, concurrency or connectinterval is allowed to be sent. If a new counter is added to the configuration during a reload, the counter will only count new messages (and not those already being sent).

In addition to the configuration file on disk, policy conditions can be added on the fly over the Protocol Buffer API’s PolicyConditionAddRequest function, command line interface, web administration, as well as from the pre- and post-delivery script.

The Protocol Buffer API contains queue policy statistics:

$ halonctl process-stats | grep queue.policy
queue.policy.concurrency.counters: 0
queue.policy.concurrency.suspends: 0
queue.policy.rate.buckets: 15
queue.policy.rate.suspends: 5
queue.policy.dynamic.suspends: 0
queue.policy.dynamic.conditions: 0
queue.policy.connectinterval.buckets: 0
queue.policy.connectinterval.suspends: 0

4.3.2.3. Policy counter thresholds

Thresholds for concurrency limit the number of emails in the delivery state. Thresholds for rate limit the number of emails, X, passing through the delivery state over a given time interval, Y, specified in seconds, as X/Y. Thresholds for connectinterval limit how often new connections may be opened to a specific destination.
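For instance, a policy with rate and connectinterval thresholds per destination MX might look as follows. This is a sketch: the connectinterval value is assumed here to be an interval in seconds, and the numbers are illustrative:

```yaml
policies:
  - fields:
      - remotemx
    default:
      rate: 100/60        # at most 100 messages per 60 seconds per MX
      connectinterval: 10 # a new connection at most every 10 seconds (assumed unit)
```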

The very simplistic example from above (with two local IPs) can be described using the following YAML pickup policy configuration:

policies:
  - fields:
      - localip
    default:
      concurrency: 10000

Each time an email is picked up from the active queue, the “localip” concurrency counter entry with that email’s source IP is incremented. When the delivery attempt is done, the same counter entry is decremented. If 10 000 emails for the same source IP are being delivered at the same time, the default threshold is exceeded, and the suspension list is populated with an entry saying that emails with that source IP should not be picked up.
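The counter mechanics can be sketched as follows; this is an illustrative model of the increment/decrement and suspension behavior described above, not Halon’s implementation:

```python
# Illustrative model of a concurrency counter with a suspension list;
# THRESHOLD mirrors the YAML example above.
THRESHOLD = 10000

counters = {}        # one counter entry per field value (here: per source IP)
suspensions = set()  # field values that must not be picked up right now

def on_pickup(localip):
    counters[localip] = counters.get(localip, 0) + 1
    if counters[localip] >= THRESHOLD:
        suspensions.add(localip)  # limit reached: stop picking up this source IP

def on_delivery_done(localip):
    counters[localip] -= 1
    if counters[localip] < THRESHOLD:
        suspensions.discard(localip)  # below the limit again: resume pickup

for _ in range(THRESHOLD):
    on_pickup("192.0.2.1")
print("192.0.2.1" in suspensions)  # True: threshold reached
on_delivery_done("192.0.2.1")
print("192.0.2.1" in suspensions)  # False: one delivery slot is free again
```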

Note that all configured queue pickup policies (counters) are taken into consideration when an email is picked up from the active queue, and the lowest allowed rate, concurrency and connectinterval applies. It is also possible to leave out the default value for a counter, so that only a specific condition is applied.

4.3.2.4. Policy conditions

Different thresholds can be set by using conditions with the desired field values. Conditions are evaluated first-to-last, with the first matching threshold winning. Consequently, if a threshold in a more general condition is placed above a more specific one, the latter may never match (because the former always wins).

The example below limits the concurrency based on a combination of source IP and destination domain, with an override for the domain “halon.io”:

For every field in an if condition (e.g. recipientdomain), multiple values (e.g. domains) may be given as an array, and matched as or-conditions.

Lists may be used to reference and match multiple values at the same time.

policies:
  - fields:
      - localip
      - recipientdomain
    conditions:
      - if:
          recipientdomain:
            - halon.io
            - halon.se
        then:
          concurrency: 2
    default:
      concurrency: 5

The above policy will be exceeded if two emails are being delivered to the recipient domain “halon.io” from the same source IP.

4.3.2.5. Policy properties

When a policy condition (if) is matched, the policies given in the then are applied. There are three counter thresholds that can be applied: the concurrency, rate and connectinterval policies, which affect the message delivery. If you don’t want to apply a specific concurrency, rate or connectinterval, nor the default matching, you can configure them as null. There is also a tag property which can be used to identify which condition was matched. In addition, custom properties may be specified in a properties object as keys and values; these custom properties are available e.g. in the post-delivery scripting hook.

policies:
  - fields:
      - localip
      - recipientdomain
    conditions:
      - if:
          recipientdomain: halon.io
        then:
          concurrency: 2
          rate: 1/60
          tag: this_rule
          properties:
            foo: bar
            baz: 5
            boz: true
    default:
      concurrency: 5

4.3.2.6. Policy counter groups

Note

Please note that it’s recommended to use the grouping field and the queues.grouping setting if you don’t need per-policy groupings.

Counters can be aggregated based on wildcard, subnet or regular expression matching, so that different field values count against the same entry. Groups are given IDs, and conditions are matched against the grouped entry by prefixing with “#”. The example below has two counters, with multiple fields per counter. One limits both rate and concurrency based on destination MX (with rollup for Google G Suite) in combination with source IP. The other also limits the concurrency per source IP, but per destination IP instead of MX, and only enforces a threshold for emails to recipient domains with a Microsoft Outlook MX.

policies:
  - fields:
      - localip
      - remotemx:
          gsuite:
            - '*.google.com'
            - '*.googlemail.com'
            - '*.smtp.goog'
    conditions:
      - if:
          remotemx: '#gsuite'
        then:
          concurrency: 10
          rate: 50
    default:
      concurrency: 5
      rate: 10
  - fields:
      - localip
      - remotemx:
          o365:
            - '*.protection.outlook.com'
      - remoteip
    conditions:
      - if:
          remotemx: '#o365'
        then:
          concurrency: 10
          rate: 30

4.4. Cluster-aware policy

4.4.1. Example architecture

flowchart LR
  GEN(["Message Generation"]) --> LB
  LB["Load balancer"] --> H1
  LB --> H2
  LB --> H3
  H1[Halon smtpd 1] --> C
  H2[Halon smtpd 2] --> C
  H3[Halon smtpd n] --> C
  HAP1[HAProxy 1] --> |SMTP| MBP
  HAP2[HAProxy 2] --> |SMTP| MBP
  C(( )) --> HAP1
  C --> HAP2
  MBP(["Network egress"])

Generated messages are network load-balanced across all Halon instances, which are configured identically. The delivery (localip) addresses are located on the proxies; any Halon instance can choose any localip. The HAProxy layer can use keepalived or similar, to provide resilience in the event of proxy host outage.

A basic approach is to statically divide the overall concurrency and rates across the Halon instances. This is acceptable for mailbox providers that allow concurrency greater than your number of instances, but some mailbox providers expect lower concurrency. A different approach to traffic shaping is needed for equitable sharing of connections.

4.4.2. Delivery Orchestrator: clusterd

clusterd is a service that allocates concurrency, connectinterval and rates to instances. It supports TLS authentication/verification of both the server and client, over TCP.

clusterd registers connected Halon instances, counts them and announces the total number back to each instance. It allocates outbound connections to instances in the cluster, ensuring fair rotation of concurrent connections for each (localip + grouping) combination, even for low concurrency destinations. It ensures each instance with messages to deliver will receive a turn. It works in dynamic scaling situations with Kubernetes, as well as static scaling with virtual machines.

If a Halon instance loses its connection to clusterd, it reverts to a static division of slots (minimum 1 connection per node) based on the last known number of hosts. It automatically tries to reconnect and re-establish dynamic sharing.

flowchart LR
  H1[Halon smtpd 1]
  H2[Halon smtpd 2]
  H3[Halon smtpd n]
  CLUSTERD[clusterd] <--> H1
  CLUSTERD <--> H2
  CLUSTERD <--> H3

Halon smtpd uses the settings in Clustering directives to communicate with clusterd.

4.4.3. Writing policy for clusters

A system with clusterd active has policies clustered by default.

Concurrent connections are granted to instances in the cluster. The overall cluster follows the concurrency properties for (localip, grouping) combinations in the smtpd-policy.yaml policies.fields. Because connections are opened, closed and pooled between specific localip and grouping, these fields must be used when writing cluster-aware concurrency policy.

smtpd-app.yaml

groupings:
- id: orange
  remotemx:
    - "smtp-in.orange.fr"

smtpd-policy.yaml

rate:
  algorithm: tokenbucket
policies:
  - fields:
    - jobid
  - fields:
    - localip
  - fields:
    - tenantid
  - fields:
    - tenantid
    - grouping
  - fields: # cluster-wide policies here
    - localip
    - grouping
    default:
      concurrency: 20
    conditions:
      - if:
          grouping: "&orange"
        then:
          concurrency: 2 # Cluster-wide concurrency for all localip, see https://postmaster.orange.fr/

Rates for the cluster can be set on any field condition, because rates are always divisible. How they are divided depends on your chosen rate.algorithm setting.

Algorithm tokenbucket (default) gives smooth traffic flow over the interval by scaling the denominator, i.e. tokens / (interval * number_of_hosts)

Algorithm fixedwindow gives traffic bursts at the start of each interval, by allocating the tokens between hosts, with each getting at least 1, i.e. max(1, tokens / number_of_hosts) / interval
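The two division formulas can be compared with a short sketch; the functions and numbers here are illustrative, following the formulas above:

```python
# Sketch of how the two rate algorithms divide a cluster-wide rate
# (tokens per interval) between hosts.
def tokenbucket_share(tokens, interval, hosts):
    # smooth flow: each host keeps all tokens, over a stretched interval
    return tokens, interval * hosts

def fixedwindow_share(tokens, interval, hosts):
    # bursty: tokens are divided between hosts, each getting at least 1
    return max(1, tokens // hosts), interval

# a cluster-wide rate of 30/60 shared by 3 hosts:
print(tokenbucket_share(30, 60, 3))  # (30, 180): 30 messages per 180 s per host
print(fixedwindow_share(30, 60, 3))  # (10, 60): 10 messages per 60 s per host
```

Both divisions preserve the cluster-wide average of 30 messages per 60 seconds; they differ only in how evenly the traffic is spread within the interval.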

For fair rotation of connections between your instances, set delivery.pooling.timeout to small values such as 5 to 10 seconds, and delivery.pooling.transactions to 100 or less.

If you wish to disable rotation of connections on an entire smtpd instance, set cluster.policy.sharedconcurrency to false. This will give basic “divide by n” concurrency, with at least 1 connection per instance.

4.4.3.1. Advanced usage: selective non-clustered properties

Rules can selectively disengage clustering, for example to give more throughput to time-critical message streams.

smtpd-policy.yaml

conditions:
  - if:
      localip: 1.2.3.4
      grouping: "&yahoo"
    then:
      concurrency: 5
      cluster: false # each instance will get 5 connections
default:
  rate: 5/1 # default rate applies cluster-wide

The cluster: false property applies to all properties in the then scope. To create a policy that has some properties clustered and others non-clustered, create two if clauses:

conditions:
  - if:
      localip: 1.2.3.4
      grouping: "&yahoo"
    then:
      concurrency: 5
      cluster: false # affects properties in "then" scope
  - if:
      localip: 1.2.3.4
      grouping: "&yahoo"
    then:
      rate: 10/1 # put (clustered) rate property in a separate if clause
default:
  rate: 5/1 # default rate applies cluster-wide

4.4.3.2. Observing clustered policy in operation

The web user interface Delivery Insights view shows cluster behavior. For each policy setting, it shows:

  • the cluster’s concurrency and rate

  • the effective concurrency and rate in use on this instance.

The halontop command has a panel showing the cluster status and number of hosts found.

4.5. Queue pickup suspension

As described in the previous section, the pickup policy subsystem implements a suspension list to enforce concurrency, rate and connectinterval limits. This suspension list can be used directly to temporarily pause and resume traffic in the active queue. For a more permanent suspension, like a quarantine for spam, put messages on hold instead.

The default start configuration’s environment.suspendconf directive loads the suspension configuration from /etc/halon/smtpd-suspend.yaml. It is described by, and can be validated with, the smtpd-suspend.schema.json JSON schema (included in our Visual Studio Code integration).

In addition to the configuration file on disk, policy conditions can be added on the fly over the Protocol Buffer API’s PolicyConditionAddRequest function, command line interface, web administration, as well as from the pre- and post-delivery script.

Lists may be used to reference and suspend multiple field values at the same time.

The example below suspends all emails on the “customer1” transport to the destination “gmail.com”:

suspends:
  - transportid: customer1
    recipientdomain: gmail.com

4.5.1. Excluding IPs from pool

A queued email can have multiple designated source IPs, which can be used to load balance between a pool of source IPs in a round-robin fashion. The list of source IPs (and matching HELO hostnames) can be configured per transport, or overridden by the “sourceip” argument to the pre-delivery script’s Try() function. Queuing an email with multiple source IPs has the benefit of being able to dynamically suspend specific source IPs, while still allowing a queued email to be sent using the other IPs. Consider the following example:

suspends:
  - localip: 192.0.2.1
    recipientdomain: gmail.com

Since localip is a list, emails queued with both “192.0.2.1” and another IP will still be sent to “gmail.com” from that other source IP.
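This interaction between a suspension and an email queued with several source IPs can be sketched as follows; the rule shape mirrors the YAML above, but the function itself is illustrative, not Halon’s implementation:

```python
# Which of an email's designated source IPs remain usable, given a list of
# suspension rules of the form {"localip": ..., "recipientdomain": ...}?
def usable_sourceips(localips, suspends, recipientdomain):
    blocked = {s["localip"] for s in suspends
               if s.get("recipientdomain") in (None, recipientdomain)}
    return [ip for ip in localips if ip not in blocked]

suspends = [{"localip": "192.0.2.1", "recipientdomain": "gmail.com"}]
print(usable_sourceips(["192.0.2.1", "192.0.2.2"], suspends, "gmail.com"))
# ['192.0.2.2']: the email can still be sent from the other source IP
```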

4.5.2. Delivery settings

The active queue delivery configuration can be used to set parameters such as timeouts and connection pooling based on fields that are only available in the active queue state, such as remotemx and remoteip. Because they might be generated by an external system, or managed separately from the rest of the configuration, they are loaded from a separate file.

The default start configuration’s environment.deliveryconf directive loads the delivery settings from /etc/halon/smtpd-delivery.yaml. It is described by, and can be validated with, the smtpd-delivery.schema.json JSON schema (included in our Visual Studio Code integration).

Lists may be used to reference and match multiple values at the same time.

The following properties can be changed in the delivery settings file.

  • TLS mode

  • Pooling options

  • Timeouts

  • Retry (intervals and count, but not during)

The example below enables connection pooling, with priority for MXs on “example.com”:

delivery:
  conditions:
    - if:
        remotemx: "*.example.com"
      then:
        pooling:
          enabled: true
          timeout: 300
          evictable: false
          transactions: 1000
        tls:
          mode: "optional"
    - if:
        remoteip: 192.0.2.1
      then:
        timeout:
          connect: 120
          smtp:
            MAIL: 900
            EOD: 900
  default:
    pooling:
      enabled: true

The Protocol Buffer API contains pooling statistics:

$ halonctl process-stats | grep queue.connections.pooling
queue.connections.pooling.size: 2
queue.connections.pooling.hits: 3853
queue.connections.pooling.misses: 5900
queue.connections.pooling.expires: 5694
queue.connections.pooling.evicts: 0
queue.connections.pooling.skips: 0

4.6. Spool startup

Upon startup, the spool directory (spool.path directive) is scanned for queued messages (.hqf files). The queue files are checked for read and write permission by the running process (so that they may be updated and unlinked). The files are then opened and parsed by multiple concurrent threads (spool.threads.loader directive) before being added to the defer, hold or active queue.

  • If there is an error during the load, the file is skipped and an error is logged.

  • If the file is empty or contains no queue information (hence, from an incomplete transaction), you may choose to unlink, rename or ignore the file (spool.corrupt directive).
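The startup scan can be sketched roughly as follows; the permission check mirrors the description above, while the function name and simplified error handling are illustrative:

```python
import os

# Walk the spool for .hqf files, separating those the process can both
# read and write (loadable) from those it cannot (skipped, error logged).
def scan_spool(path):
    loadable, skipped = [], []
    for root, _dirs, files in os.walk(path):
        for name in files:
            if not name.endswith(".hqf"):
                continue  # not a queue file
            full = os.path.join(root, name)
            if os.access(full, os.R_OK | os.W_OK):
                loadable.append(full)  # can be updated and unlinked later
            else:
                skipped.append(full)   # would be skipped, with an error logged
    return loadable, skipped
```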

4.7. On-disk format

Queued messages and their metadata are written to disk, so that they can be re-loaded into the MTA in case the smtpd program is restarted. The on-disk metadata can be inspected with the hqfcat Python script:

$ hqfcat /var/spool/halon/queue/66/662ffb4a-02d1-11ea-bcf7-06f091ceb4dc.hqf
[
  {
    "status": "QUEUE",
    "retry": 23,
    "retryts": 1573508496,
    "transportid": "mx",
    "ts": 1573290908,
    ...

which produces an output similar to the regular command line interface:

$ halonctl queue list --id b186d406-0c36-11ea-8dda-005056914940:1 --json
[
  {
    "localips": [],
    "tags": [],
    "metadata": {},
    "id": {
      "transaction": "b186d406-0c36-11ea-8dda-005056914940",
      "queue": 1
    },
    ...

You can view the message, including modifications made by the end-of-DATA script, using the hqfeml script. The -R flag takes the recipient queue id; without it, the original message is displayed. By comparing the hqfeml output with the original message, you can see the modifications:

$ hqfeml -R 1 /var/spool/halon/queue/b1/b186d406-0c36-11ea-8dda-005056914940.hqf

4.8. Moving messages between instances

Messages can be moved from the current MTA to another, using the hqfmove command-line utility. Before moving messages, either:

  • shut down the smtpd process, or

  • unload specific messages using halonctl queue unload and move messages from the spool.path folder to a separate folder, so new messages arriving are not mixed with the messages to transfer.

Run the command on the source MTA, specifying the folder of messages you wish to move with the --directory argument. This will move all the messages in the specified folder and its sub-folders.

One or more destination MTAs can be specified with --server arguments. Each specifies the hostname/IP address and the destination’s environment.controlsocket.port, or else the environment.controlsocket.path.

  • If many --server destination hosts are given, messages will be pushed to destinations in round-robin fashion.

  • If a hostname A record resolves to multiple hosts, messages will also be pushed to destinations in round-robin fashion.

All command-line arguments:

--server address:port/unix-socket [--server address:port/unix-socket]
  --directory /path [--non-interactive] [--verbose] [--progress] [--rate x/y]
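For example, moving a folder of previously unloaded messages to two destination instances in round-robin fashion; the addresses, port and folder path here are illustrative, and the flags are those listed above:

```shell
$ hqfmove --directory /var/spool/halon/transfer \
    --server 192.0.2.10:4000 --server 192.0.2.11:4000 --progress
```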