4. Queuing subsystem

The default behaviour of an MTA is to spool messages, which are being received, to disk, then pick them up and deliver them by using the queuing subsystem. In contrast to “in-line” send() to the next hop during reception, this allows the MTA to accept messages even if the next hop is unavailable, or if a message has multiple recipients on different next hop destinations.

4.1. Activity diagram

This figure illustrates the typical states and movements inside the queue. It will be described in the following sections.

_images/queue-act-diag.svg

4.2. Queue states

The queuing system can be seen as having three overall message states:

Hold

Messages that are on permanent hold. This is the most simple, and typically least used, state. It is not a queue; rather a collection of messages. It can be used to, for example, implement a spam/virus quarantine. Messages can be put in hold list directly during the reception of a message, using the end-of-DATA script queue() function’s “hold” option.

Defer

Messages that are not on hold, and are scheduled for delivery in the future. The default behaviour, for every message that fails delivery (with a non-permanent error), is to be put in the defer queue; scheduled to be re-tried sometime in the future. It is a queue in the sense that messages are ordered by their scheduled delivery time.

Active

Messages that are not on hold, and are scheduled for delivery now (or in the past). Messages that are put in queue() for delivery end up directly in the active queue (unless the “delay” option is used). Messages are also moved automatically from the defer queue into the active queue on their scheduled time of delivery. The active queue is the by far most advanced part of the queuing subsystem, and contains many sub-states. This stems from the fact that concurrency and rate can be limited based on many dynamic parameters, essentially creating a virtually unlimited number of sub-queues. This will be described later in detail.

The Protocol Buffer API contains queue size information:

$ halonctl process-stats | grep queue.queue
queue.queue.defer.size: 67
queue.queue.active.size: 6

4.2.1. Moving messages

As described in the previous section, messages are automatically moved between the active and defer queue in accordance with the normal behaviour of an MTA (try-defer-retry-defer, etc). Messages can also be forcibly moved between queues. Messages can be updated using the Protocol Buffer API’s QueueUpdate function, command line interface, web administration, as well as from the pre- and post-delivery script. This changes both the information on disk and in memory (the queue process’s data structures). When a message is moved, it is re-injected via the “update” actor described in the Activity diagram.

To put queued messages for example.com on hold:

$ halonctl queue update --recipientdomain example.com --state DEFER --state ACTIVE --hold
10 messags affected

Messages that are in a “working” (for example, being delivered) will be unaffected. To immediately retry a certain deferred message by ID:

$ halonctl queue update --id 44815e44-05d3-11ea-b388-06f091ceb4dc:1 --state DEFER --active
1 messags affected

4.2.2. Messages on hold

Messages, or copies of messages, can be put in a frozen HOLD state. This usually done from the end-of-data script to implement for example short-term quarantines. In order to enforce a retention policy for those messages, you can run halonctl from cron:

$ crontab -l
*/5 * * * * halonctl queue update --delete --state HOLD --age '>86400' --metadata qid=1day
*/5 * * * * halonctl queue update --delete --state HOLD --age '>604800' --metadata qid=7days

4.3. Active queue

When a message enters the active queue, it goes through five stages:

Pre-delivery script

The pre-delivery script is executed (if it exists). The script allows the administrator to implement per-attempt logic, such as dynamic routing.

DNS

All necessary domain name information is resolved. If that succeeds, the message is put in the active queue. If not, it skips directly to the post-delivery script.

Active queue

The message remains in the active queue until allowed by the active queue’s pickup policies (that control concurrency, rate, etc). The pickup policies define the virtually unlimited number of sub-queues, and will be described later in detail.

Delivery

A delivery attempt (SMTP or LMTP) is made.

Post-delivery script

Regardless of the outcome of the delivery attempt, the post-delivery script is executed (if it exists). It can be used by the administrator to override the default logic or to implement logging.

4.3.1. Active queue pickup

The active queue pickup subsystem determines when a message should be picked up, usually based on concurrency or rate limitations on properties such as local IP or destination, or queue pickup suspensions.

By splitting the system’s available concurrency, virtual sub-queues can be created. This is useful for separating email of different classes, so that one class of email that is stuck or moving slowly does not block others. Those sub-queues can be created based on any properties; pre-defined ones like recipient domain, or custom fields that can be populated from script. For example, consider the case where the system’s total concurrency is set to 20 000, and the system has two local IP addresses that can be used as source IPs when sending email. With a pickup policy limiting the concurrency to 10 000 connections per local IP, we can be certain that even if traffic from one of the IPs jams up, traffic from the other IP will not be disturbed. This concept can be extended to, for example, customers in a multi-tenant system (making sure that abuse from one customer does not jam the queue for others) or recipient domains/MXs (so that one slow destination does not jam the queue for other destinations). Finally, combinations of those can be created.

As explained above the queue is not a FIFO (first-in first-out) type of queue, as messages are divided into virtual queue and delivered in the fastest possible way without exceeding the queue policies (of concurrency and rates). However there is a concept of priority messages (0 - normal, 1 - high), these will be considered candidates for pickup/delivery before any other messages. They also have precedence when handled by the DNS resolver and pre- and post-delivery threads. However given the explained concept there are no guarantees that they will be delivered before a low priority message in case of suspending rules (exceeding concurrency or rates). Also within each jobid (for each recipientdomain, for each…) the queue employs FIFO. That guarantees that no message should be stuck in the queue because of unfair treatment.

Below is an illustration of how the queue looks like.

_images/queue-struct.png

The Protocol Buffer API contains pickup statistics:

$ halonctl process-stats | grep queue.pickup
queue.pickup.skips: 86826
queue.pickup.misses: 9034

4.3.2. Queue pickup policies

Concurrency and rate limits are counted against what we call counters, which can have one or multiple fields that define a unique entry. In order to offer a very high degree of flexibility, the counters are defined by the administrator. The available fields are transportid, localip, remoteip, remotemx, recipientdomain, jobid and grouping. The localip field is a list, so that an email can be queued with multiple alternatives for source IP. The same source IP may be specified multiple times to distribute the traffic according to its ratio. All other fields have exactly one value. The jobid field is a free-text field that can be used for any purpose (eg. assign to a campain or a specific customer). The grouping field is used for “rolling up” queued emails, it’s set after the DNS resolving, it may either default to empty, remotemx or recipientdomain, any grouping configured using the queues.grouping setting will apply and rename this field (the grouping id will be prefixed with & throughout the queue).

In addition to using the global grouping field, policy counter groups based on wild-card, subnets or regular expression matching is available for “rolling up” queued emails per policy with different field values into the same entry. It’s however recommended to use the grouping field if you don’t need per policy groupings. Thresholds for concurrency and rate are added based on conditions, with a fall-back called default. When the threshold is exceeded for an entry, it gets added to the active queue’s pickup suspension list. This prevents emails matching those properties from being picked up and delivered. Once the threshold is no longer exceeded, the entry is removed from the suspension list.

The default start configuration’s environment.policyconf directive loads it from /etc/halon/smtpd-policy.yaml. It is described by, and can be validated with, the smtpd-policy.schema.json JSON schema (included in our Visual Studio Code integration).

If reloading the environment.policyconf configuration during runtime, active suspensions based on (now) removed or previous exceeded rate or concurrency limits are not automatically removed (as a side effect of the reload) instead the suspension is removed when the next message for that rate or concurrency is allow to be sent. If adding a new counter to the configuration during a reload, the counter will only count new messages (and not those being sent).

In addition to the configuration file on disk, policy conditions can be added on the fly over the Protocol Buffer API’s PolicyConditionAddRequest function, command line interface, web administration, as well as from the pre- and post-delivery script.

The Protocol Buffer API contains queue policy statistics:

$ halonctl process-stats | grep queue.policy
queue.policy.concurrency.counters: 0
queue.policy.concurrency.suspends: 0
queue.policy.rate.buckets: 15
queue.policy.rate.suspends: 5
queue.policy.dynamic.suspends: 0
queue.policy.dynamic.conditions: 0

4.3.2.1. Policy counter thresholds

Thresholds for concurrency limit the number of emails in the delivery state. Thresholds for rate limit the number of emails, X, passing through the delivery state over a given time interval, Y, specified in seconds, as X/Y.

The very simplistic example from above (with two local IPs) can be described using the following YAML pickup policy configuration:

policies:
  - fields:
      - localip
    default:
      concurrency: 10000

Each time an email is picked up from the active queue, the “localip” concurrency counter entry with that email’s source IP is incremented. When the delivery attempt is done, the same counter entry is decremented. If 10 000 email for the same source IP is being delivered at the same time, the default threshold will be exceeded, and the suspension list will be populated with an entry saying that any email with that source IP should not be picked up.

Note that all configured queue pickup policies (counters) are taken into consideration when an email is picked up from the active queue, and the lowest allowed rate and concurrency applies. And that it’s possible to leave out the default value for a counter (so that only a specific condition is applied).

4.3.2.2. Policy conditions

Different thresholds can be set by using conditions with the desired field values. Conditions are evaluated first-to-last, with the first matching threshold winning. Consequently, if a threshold in a more general conditions is placed above a more specific one, the latter might never match (because the former always wins).

The example below limits the concurrency based on a combination of source IP and destination domain, with an override for the domain “halon.io”:

Per every field in a if condition (eg. recipientdomain) multiple values (eg. domains) may be given as an array, and matched as or-conditions.

Lists may be used to reference and match multiple values at the same time.

policies:
  - fields:
      - localip
      - recipientdomain
    conditions:
      - if:
          recipientdomain:
            - halon.io
            - halon.se
        then:
          concurrency: 2
    default:
      concurrency: 5

The above policy will be exceeded if two emails are being delivered to the recipient domain “halon.io” from the same source IP.

4.3.2.3. Policy properties

When a policy condition (if) is matched, the policies given in the then are applied. There are two counter thresholds that can be applied; the concurrency and the rate policies, that will affect the message delivery. If you don’t want apply specify specific concurrency or rate, nor the default matching, you can configure them as null. There is also a tag property which can be used to identity which condition were matched. In addition custom properties may by specificed in a properties object as key and value, these custom properties are available eg. in the post-delivery scripting hook.

policies:
  - fields:
      - localip
      - recipientdomain
    conditions:
      - if:
          recipientdomain: halon.io
        then:
          concurrency: 2
          rate: 1/60
          tag: this_rule
          properties:
            foo: bar
            baz: 5
            boz: true
    default:
      concurrency: 5

4.3.2.4. Policy counter groups

Note

Please note that it’s recommended to use the grouping field and the queues.grouping setting if you don’t need per policy groupings.

Counters can be aggregated based on wild-card, subnets or regular expression matching, so that different field values count against the same entry. Groups are given IDs, and conditions are matched against the grouped entry by prefixing with “#”. The example below has two counters, with multiple fields per counter. One limits both rate and concurrency based on destination MX (with rollup for Google G-suite) in combination with source IP. The other also limits the concurrency per source IP, but destination IP instead of MX, and only enforces a threshold for emails to recipient domains with a Microsoft Outlook MX.

policies:
  - fields:
      - localip
      - remotemx:
          gsuite:
            - '*.google.com'
            - '*.googlemail.com'
            - '*.smtp.goog'
    conditions:
      - if:
          remotemx: '#gsuite'
        then:
          concurrency: 10
          rate: 50
    default:
      concurrency: 5
      rate: 10
  - fields:
      - localip
      - remotemx:
          o365:
            - '*.protection.outlook.com'
      - remoteip
    conditions:
      - if:
          remotemx: '#o365'
        then:
          concurrency: 10
          rate: 30

4.3.3. Queue pickup suspension

As described in the previous section, the pickup policy subsystem implements a suspension list to enforce concurrency and rate limits. This suspension list can be used directly to temporarily pause and resume traffic in the active queue. For a more permanent suspension, like a quarantine for spam, put messages on hold instead.

The default start configuration’s environment.suspendconf directive loads it from /etc/halon/smtpd-suspend.yaml. It is described by, and can be validated with, the smtpd-suspend.schema.json JSON schema (included in our Visual Studio Code integration).

In addition to the configuration file on disk, policy conditions can be added on the fly over the Protocol Buffer API’s PolicyConditionAddRequest function, command line interface, web administration, as well as from the pre- and post-delivery script.

Lists may be used to reference and suspend multiple field values at the same time.

The example below suspends all emails on the “customer1” transport to the destination “gmail.com”:

suspends:
  - transportid: customer1
    recipientdomain: gmail.com

4.3.3.1. Excluding IPs from pool

A queued email can have multiple designated source IPs, which can be used to load balance between a pool of source IPs in a round-robin fashion. The list of source IPs (and matching HELO hostnames) can be configured per transport, or overridden by the “sourceip” argument to the pre-delivery script’s Try() function. Queuing an email with multiple source IPs has the benefit of being able to dynamically suspend specific source IPs, while still allowing a queued email to be sent using the other IPs. Consider the following example:

suspends:
  - localip: 192.0.2.1
    recipientdomain: gmail.com

Since localip is a list, emails queued with both “192.0.2.1” and another IP will still be sent to “gmail.com” from that other source IP.

4.3.4. Delivery settings

The acive queue delivery configuration can be used to set parameters such as timeouts and connection pooling based on fields that are only available in the active queue state, such as remotemx and remoteip. Because they might be generated by an external system, or managed separately from the rest of the configuration, they are loaded from a separate file.

The default start configuration’s environment.deliveryconf directive loads it from /etc/halon/smtpd-delivery.yaml. It is described by, and can be validated with, the smtpd-delivery.schema.json JSON schema (included in our Visual Studio Code integration).

Lists may be used to reference and match multiple values at the same time.

The following properties can be changed in the delivery settings file.

  • TLS mode

  • Pooling options

  • Timeouts

  • Retry (intervals and count/during)

The example below enable connection pooling, with priority for MXs on “example.com”:

delivery:
  conditions:
    - if:
        remotemx: "*.example.com"
      then:
        pooling:
          enabled: true
          timeout: 300
          evictable: false
          transactions: 1000
        tls:
          mode: "optional"
    - if:
        remoteip: 192.0.2.1
      then:
        timeout:
          connect: 120
          smtp:
            MAIL: 900
            EOD: 900
  default:
    pooling:
      enabled: true

The Protocol Buffer API contains pooling statistics:

$ halonctl process-stats | grep queue.connections.pooling
queue.connections.pooling.size: 2
queue.connections.pooling.hits: 3853
queue.connections.pooling.misses: 5900
queue.connections.pooling.expires: 5694
queue.connections.pooling.evicts: 0
queue.connections.pooling.skips: 0

4.4. Spool startup

Upon startup the spool directory (spool.path directive) is scanned for queued messages (.hqf files). The queue files are check for read and write permission by the running process (so that they may be updated and unlinked). The files are then opened and parsed by multiple concurrent threads (spool.threads.loader directive) before either adding them to the defer queue, hold queue or active queue.

  • If there is an error during the load the file is skipped and an error is logged.

  • If the file is empty or contains no queue information (hence from an incomplete transaction), you may choose to unlink, rename, ignore the file (spool.corrupt directive)

4.5. On-disk format

Queued messages and their metadata is written to disk, so that they can be re-loaded into the MTA in case the smtpd program is restarted. The on-disk metadata can be inspected with the hqfcat Python script:

$ sudo hqfcat /var/spool/halon/queue/66/662ffb4a-02d1-11ea-bcf7-06f091ceb4dc.hqf
[
  {
    "status": "QUEUE",
    "retry": 23,
    "retryts": 1573508496,
    "transportid": "mx",
    "ts": 1573290908,
    ...

which produces an output similar to the regular command line interface:

$ halonctl queue list --id b186d406-0c36-11ea-8dda-005056914940:1 --json
[
  {
    "localips": [],
    "tags": [],
    "metadata": {},
    "id": {
      "transaction": "b186d406-0c36-11ea-8dda-005056914940",
      "queue": 1
    },
    ...

You can view the message including modifications made by the end-of-DATA script using the hqfeml script. The -R flag is the recipient queue id. Without the -R flag the original message is displayed. By comparing the hqfeml output with the original message, you can see the modifications:

$ hqfeml -R 1 /var/spool/halon/queue/b1/b186d406-0c36-11ea-8dda-005056914940.hqf