4. Queuing subsystem
The default behavior of an MTA is to spool received messages to disk, and then pick them up and deliver them using the queuing subsystem. In contrast to an “in-line” send() to the next hop during reception, this allows the MTA to accept messages even if the next hop is unavailable, or if a message has multiple recipients on different next-hop destinations.
4.1. Activity diagram
This figure illustrates the typical states and movements inside the queue. It will be described in the following sections.
4.2. Queue states
The queuing system can be seen as having three overall message states:
- Hold
Messages that are on permanent hold. This is the simplest, and typically least used, state. It is not a queue, but rather a collection of messages. It can be used, for example, to implement a spam/virus quarantine. Messages can be put in the hold list directly during the reception of a message, using the end-of-DATA script queue() function’s “hold” option.
- Defer
Messages that are not on hold, and are scheduled for delivery in the future. The default behavior for every message that fails delivery (with a non-permanent error) is to be put in the defer queue, scheduled to be retried at some point in the future. It is a queue in the sense that messages are ordered by their scheduled delivery time.
- Active
Messages that are not on hold, and are scheduled for delivery now (or in the past). Messages that are queued for delivery with queue() end up directly in the active queue (unless the “delay” option is used). Messages are also moved automatically from the defer queue into the active queue at their scheduled time of delivery. The active queue is by far the most advanced part of the queuing subsystem, and contains many sub-states. This stems from the fact that concurrency, rate and connectinterval can be limited based on many dynamic parameters, essentially creating a virtually unlimited number of sub-queues. This is described in detail later.
The Protocol Buffer API contains queue size information:
$ halonctl process-stats | grep queue.queue
queue.queue.defer.size: 67
queue.queue.active.size: 6
4.2.1. Changing message states
As described in the previous section, messages are automatically moved between the
active and defer queue in accordance with the normal behavior of an MTA (try-defer-retry-defer, etc).
Messages can also be forcibly moved between queues.
Messages can be updated using the Protocol Buffer API’s QueueUpdate function, the command line interface, the web administration, as well as from the pre- and post-delivery script. This changes both the information on disk and in memory (the queue process’s data structures). When a message is moved, it is re-injected via the “update” actor described in the Activity diagram.
To put queued messages for example.com on hold:
$ halonctl queue update --recipientdomain example.com --state DEFER --state ACTIVE --hold
10 messages affected
Messages that are in a “working” state (for example, being delivered) will be unaffected. To immediately retry a certain deferred message by ID:
$ halonctl queue update --id 44815e44-05d3-11ea-b388-06f091ceb4dc:1 --state DEFER --active
1 messages affected
4.2.2. Messages on hold
Messages, or copies of messages, can be put in a frozen HOLD state. This is usually done from the end-of-DATA script to implement, for example, short-term quarantines. In order to enforce a retention policy for those messages, you can run halonctl from cron:
$ crontab -l
*/5 * * * * halonctl queue update --delete --state HOLD --age '>86400' --metadata qid=1day
*/5 * * * * halonctl queue update --delete --state HOLD --age '>604800' --metadata qid=7days
4.3. Active queue
When a message enters the active queue, it goes through five stages:
- Pre-delivery script
The pre-delivery script is executed (if it exists). The script allows the administrator to implement per-attempt logic, such as dynamic routing.
- DNS
All necessary domain name information is resolved. If that succeeds, the message is put in the active queue. If not, it skips directly to the post-delivery script.
- Active queue
The message remains in the active queue until allowed by the active queue’s pickup policies (that control concurrency, rate, connectinterval, etc). The pickup policies define the virtually unlimited number of sub-queues, and will be described later in detail.
- Delivery
A delivery attempt (SMTP or LMTP) is made.
- Post-delivery script
Regardless of the outcome of the delivery attempt, the post-delivery script is executed (if it exists). It can be used by the administrator to override the default logic or to implement logging.
4.3.1. Active queue pickup
The active queue pickup subsystem determines when a message should be picked up, usually based on concurrency, rate or connectinterval limitations on properties such as local IP or destination, or queue pickup suspensions.
By splitting the system’s available concurrency, virtual sub-queues can be created. This is useful for separating email of different classes, so that one class of email that is stuck or moving slowly does not block others. Those sub-queues can be created based on any properties; pre-defined ones like recipient domain, or custom fields that can be populated from script. For example, consider the case where the system’s total concurrency is set to 20 000, and the system has two local IP addresses that can be used as source IPs when sending email. With a pickup policy limiting the concurrency to 10 000 connections per local IP, we can be certain that even if traffic from one of the IPs jams up, traffic from the other IP will not be disturbed. This concept can be extended to, for example, customers in a multi-tenant system (making sure that abuse from one customer does not jam the queue for others) or recipient domains/MXs (so that one slow destination does not jam the queue for other destinations). Finally, combinations of those can be created.
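As a hedged sketch of such a sub-queue (using the pickup policy format described in the following sections; the concurrency value is an arbitrary example, not a recommendation), a per-tenant counter could look like this:
policies:
  - fields:
      - tenantid
    default:
      concurrency: 5000   # arbitrary example: at most 5 000 concurrent deliveries per tenant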
As explained above, the queue is not a FIFO (first-in first-out) queue, since messages are divided into virtual queues and delivered in the fastest possible way without exceeding the queue policies (concurrency and rates). There is, however, a concept of priority messages (0 - normal, 1 - high); these are considered as candidates for pickup/delivery before any other messages. They also have precedence when handled by the DNS resolver and the pre- and post-delivery threads. Given the concept explained above, there is no guarantee that a priority message will be delivered before a low-priority one if suspending rules apply (exceeded concurrency or rates). Within each virtual queue, messages are handled in FIFO order, which guarantees that no message is stuck in the queue because of unfair treatment.
Below is an illustration of what the queue looks like.
The Protocol Buffer API contains pickup statistics:
$ halonctl process-stats | grep queue.pickup
queue.pickup.skips: 86826
queue.pickup.misses: 9034
4.3.2. Queue pickup policies
Concurrency, rate and connectinterval limits are counted against what we call counters, which can have one or multiple fields that define a unique entry. In order to offer a very high degree of flexibility, the counters are defined by the administrator.
The available fields are shown below.
4.3.2.1. Available fields
tenantid
A single value free-text field that can be used for any purpose. Email service providers (ESPs) typically have many customers (tenants) using the same system. This field enables you to create policy on a per-tenant basis when a message is queued, to handle responses relating to the tenant’s reputation. One way is to set the tenantid to the header From domain when queuing the message in HSL.
jobid
A single value free-text field that can be used for any purpose, for example to identify the current campaign for bulk mailing, or the message stream for transactional mailing. One way is to use the value of an incoming email header such as X-Job. This enables you to create policy to manage traffic flow on a per-job basis.
transportid
A single value referring to a transportgroups[].transports[], similar to “binding” in other systems. A transportid can map to one or more localip(s), or to other settings, such as specific custom destinations.
localip
This is a list, so that an email can be queued with multiple alternatives for source IP. The same source IP may be specified multiple times to distribute the traffic according to its ratio.
grouping
A single value used for “rolling up” queued emails. It is set after DNS resolving, and may either default to empty, remotemx or recipientdomain. Any grouping configured using the queues.grouping setting will apply and fill out this field (the grouping id will be prefixed with & throughout the queue).
remoteip
This single value is known after DNS resolving. In some cases it is preferable to build policies on the grouping field instead, together with the queues.grouping.groupings[].remoteip[] setting.
remotemx
This single value is known after DNS resolving. In some cases it is preferable to build policies on the grouping field instead, together with the queues.grouping.groupings[].remotemx[] setting.
recipientdomain
This single value is a basic property of the message RCPT TO address. In some cases it is preferable to build policies on the grouping field instead, together with the queues.grouping.groupings[].recipientdomain[] setting.
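As a hedged sketch (values are arbitrary examples), the free-text fields can also be combined into a single counter, for instance to rate-limit every (tenant, job) combination independently:
policies:
  - fields:
      - tenantid
      - jobid
    default:
      rate: 600/3600   # arbitrary example: 600 messages per hour per tenant and job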
4.3.2.2. Loading
The default
start configuration’s
environment.policyconf
directive
loads it from /etc/halon/smtpd-policy.yaml.
It is described by, and can be validated with, the
smtpd-policy.schema.json
JSON schema (included in our Visual Studio Code integration).
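A minimal sketch of the corresponding startup configuration entry; the path key is an assumption based on other environment directives (such as environment.controlsocket.path), so consult the startup configuration reference:
environment:
  policyconf:
    path: /etc/halon/smtpd-policy.yaml   # assumed nesting of the environment.policyconf directive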
If the environment.policyconf configuration is reloaded during runtime, active suspensions based on (now) removed or previously exceeded rate, concurrency or connectinterval limits are not automatically removed as a side effect of the reload; instead, a suspension is removed when the next message for that rate, concurrency or connectinterval counter is allowed to be sent. If a new counter is added to the configuration during a reload, it will only count new messages (and not those already being sent).
In addition to the configuration file on disk, policy conditions can be added on the fly using the Protocol Buffer API’s PolicyConditionAddRequest function, the command line interface, the web administration, as well as from the pre- and post-delivery script.
The Protocol Buffer API contains queue policy statistics:
$ halonctl process-stats | grep queue.policy
queue.policy.concurrency.counters: 0
queue.policy.concurrency.suspends: 0
queue.policy.rate.buckets: 15
queue.policy.rate.suspends: 5
queue.policy.dynamic.suspends: 0
queue.policy.dynamic.conditions: 0
queue.policy.connectinterval.buckets: 0
queue.policy.connectinterval.suspends: 0
4.3.2.3. Policy counter thresholds
Thresholds for concurrency limit the number of emails in the delivery state. Thresholds for rate limit the number of emails, X, passing through the delivery state over a given time interval, Y, specified in seconds, as X/Y. Thresholds for connectinterval limit how often new connections may be opened to a specific destination.
The simple example from above (with two local IPs) can be described using the following YAML pickup policy configuration:
policies:
  - fields:
      - localip
    default:
      concurrency: 10000
Each time an email is picked up from the active queue, the “localip” concurrency counter entry for that email’s source IP is incremented. When the delivery attempt is done, the same counter entry is decremented. If 10 000 emails for the same source IP are being delivered at the same time, the default threshold is exceeded, and the suspension list is populated with an entry saying that no email with that source IP should be picked up.
Note that all configured queue pickup policies (counters) are taken into consideration when an email is picked up from the active queue, and the lowest allowed rate, concurrency and connectinterval applies. It is also possible to leave out the default value for a counter, so that only a specific condition is applied.
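As a hedged sketch of a rate threshold using the same structure (the values are arbitrary examples):
policies:
  - fields:
      - localip
    default:
      rate: 100/60   # arbitrary example: at most 100 deliveries per 60 seconds per source IP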
4.3.2.4. Policy conditions
Different thresholds can be set by using conditions with the desired field values. Conditions are evaluated first-to-last, with the first matching threshold winning. Consequently, if a more general condition is placed above a more specific one, the latter might never match (because the former always wins).
The example below limits the concurrency based on a combination of source IP and destination domain, with an override for the domain “halon.io”:
For every field in an if condition (e.g. recipientdomain), multiple values (e.g. domains) may be given as an array and matched as or-conditions. Lists may be used to reference and match multiple values at the same time.
policies:
  - fields:
      - localip
      - recipientdomain
    conditions:
      - if:
          recipientdomain:
            - halon.io
            - halon.se
        then:
          concurrency: 2
    default:
      concurrency: 5
The above policy will be exceeded if two emails are being delivered to the recipient domain “halon.io” from the same source IP.
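As a hedged sketch of the first-match-wins ordering (values are arbitrary examples), the more specific condition is placed first so that the general one does not shadow it:
policies:
  - fields:
      - localip
      - recipientdomain
    conditions:
      - if:                             # more specific: both fields must match
          localip: 192.0.2.1
          recipientdomain: halon.io
        then:
          concurrency: 10
      - if:                             # more general: halon.io from any source IP
          recipientdomain: halon.io
        then:
          concurrency: 2
    default:
      concurrency: 5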
4.3.2.5. Policy properties
When a policy condition (if) is matched, the policies given in the then are applied. There are three counter thresholds that can be applied: the concurrency, the rate and the connectinterval policies, which affect the message delivery. If you don’t want to apply a specific concurrency, rate or connectinterval, nor the default matching, you can configure them as null. There is also a tag property which can be used to identify which condition was matched. In addition, custom properties may be specified in a properties object as keys and values; these custom properties are available e.g. in the post-delivery scripting hook.
policies:
  - fields:
      - localip
      - recipientdomain
    conditions:
      - if:
          recipientdomain: halon.io
        then:
          concurrency: 2
          rate: 1/60
          tag: this_rule
          properties:
            foo: bar
            baz: 5
            boz: true
    default:
      concurrency: 5
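As a hedged sketch of the null usage mentioned above (field values are arbitrary examples), a condition can apply only a rate while disabling the concurrency threshold and the default matching:
policies:
  - fields:
      - localip
    conditions:
      - if:
          localip: 192.0.2.1
        then:
          concurrency: null   # no concurrency threshold for this source IP
          rate: 10/60
    default: null             # no default threshold; only the condition above applies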
4.3.2.6. Policy counter groups
Note
It is recommended to use the grouping field and the queues.grouping setting if you don’t need per-policy groupings.
Counters can be aggregated based on wildcard, subnet or regular expression matching, so that different field values count against the same entry. Groups are given IDs, and conditions are matched against the grouped entry by prefixing the ID with “#”. The example below has two counters, with multiple fields per counter. One limits both rate and concurrency based on destination MX (with a rollup for Google G Suite) in combination with source IP. The other also limits the concurrency per source IP, but by destination IP instead of MX, and only enforces a threshold for emails to recipient domains with a Microsoft Outlook MX.
policies:
  - fields:
      - localip
      - remotemx:
          gsuite:
            - '*.google.com'
            - '*.googlemail.com'
            - '*.smtp.goog'
    conditions:
      - if:
          remotemx: '#gsuite'
        then:
          concurrency: 10
          rate: 50
    default:
      concurrency: 5
      rate: 10
  - fields:
      - localip
      - remotemx:
          o365:
            - '*.protection.outlook.com'
      - remoteip
    conditions:
      - if:
          remotemx: '#o365'
        then:
          concurrency: 10
          rate: 30
4.4. Cluster-aware policy
4.4.1. Example architecture
Generated messages are network load-balanced across all Halon instances, which are configured identically.
The delivery (localip) addresses are located on the proxies; any Halon instance can choose any localip.
The HAProxy layer can use keepalived or similar, to provide resilience in the event of a proxy host outage.
A basic approach is to statically divide the overall concurrency and rates across the Halon instances. This is acceptable for mailbox providers that allow a concurrency greater than your number of instances, but some mailbox providers expect lower concurrency; for example, with 10 instances and a destination that accepts only 2 concurrent connections, a static division cannot give every instance even one connection without exceeding the limit. A different approach to traffic shaping is needed for equitable sharing of connections.
4.4.2. Delivery Orchestrator: clusterd
clusterd is a service that allocates concurrency, connectinterval and rates to instances. It supports TLS authentication/verification of both the server and the client, over TCP.
clusterd registers connected Halon instances, counts them and announces the total number back to each instance. It allocates outbound connections to instances in the cluster, ensuring fair rotation of concurrent connections for each (localip + grouping) combination, even for low-concurrency destinations. It ensures that each instance with messages to deliver will receive a turn. It works in dynamic scaling situations with Kubernetes, as well as static scaling with virtual machines.
If a Halon instance loses its connection to clusterd, it reverts to a division of slots (minimum 1 connection per node) based on the last known number of hosts. It will automatically try to reconnect and re-establish dynamic sharing.
Halon smtpd uses the settings in the Clustering directives to communicate with clusterd.
4.4.3. Writing policy for clusters
A system with clusterd
active has policies clustered by default.
Concurrent connections are granted to instances in the cluster.
The overall cluster follows the concurrency
properties for (localip
, grouping
) combinations in the smtpd-policy.yaml policies.fields
.
Because connections are opened, closed and pooled between specific localip
and grouping
, these fields must be used when writing cluster-aware concurrency policy.
smtpd-app.yaml
groupings:
  - id: orange
    remotemx:
      - "smtp-in.orange.fr"
smtpd-policy.yaml
rate:
  algorithm: tokenbucket
policies:
  - fields:
      - jobid
  - fields:
      - localip
  - fields:
      - tenantid
  - fields:
      - tenantid
      - grouping
  - fields: # cluster-wide policies here
      - localip
      - grouping
    default:
      concurrency: 20
    conditions:
      - if:
          grouping: '&orange'
        then:
          concurrency: 2 # Cluster-wide concurrency for all localip, see https://postmaster.orange.fr/
Rates for the cluster can be set on any field condition, because rates are always divisible. How they are divided depends on your chosen rate.algorithm setting.
- The tokenbucket algorithm (default) gives a smooth traffic flow over the interval by scaling the denominator, i.e. tokens / (interval * number_of_hosts)
- The fixedwindow algorithm gives traffic bursts at the start of each interval, by allocating the tokens between hosts, with each getting at least 1, i.e. max(1, tokens / number_of_hosts) / interval
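For example, a clustered rate of 30/60 on a 3-host cluster works out as follows: with tokenbucket each host is allowed 30 / (60 * 3) messages per second, i.e. one message every 6 seconds, spread evenly over the interval; with fixedwindow each host gets max(1, 30 / 3) = 10 tokens to spend at the start of every 60-second window.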
For fair rotation of connections between your instances, set delivery.pooling.timeout to small values such as 5 to 10 seconds, and delivery.pooling.transactions to 100 or less.
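As a hedged sketch using the delivery settings format described later in this chapter (the values follow the suggestion above), this could look like:
delivery:
  default:
    pooling:
      enabled: true
      timeout: 10        # seconds; keep small for fair rotation between instances
      transactions: 100  # recycle the connection after at most 100 transactions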
If you wish to disable rotation of connections on an entire smtpd instance, set cluster.policy.sharedconcurrency to false. This will give basic “divide by n” concurrency, with at least 1 connection per instance.
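A minimal sketch, assuming the dotted directive maps onto nested keys in the smtpd startup configuration in the same way as the other directives mentioned in this chapter:
cluster:
  policy:
    sharedconcurrency: false   # assumed nesting of the cluster.policy.sharedconcurrency directive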
4.4.3.1. Advanced usage: selective non-clustered properties
Rules can selectively disengage clustering, for example to give more throughput to time-critical message streams.
smtpd-policy.yaml
conditions:
  - if:
      localip: 1.2.3.4
      grouping: '&yahoo'
    then:
      concurrency: 5
      cluster: false # each instance will get 5 connections
default:
  rate: 5/1 # default rate applies cluster-wide
The cluster: false property applies to all properties in the then scope. To create policy that has some properties clustered, and others non-clustered, create two if clauses:
conditions:
  - if:
      localip: 1.2.3.4
      grouping: '&yahoo'
    then:
      concurrency: 5
      cluster: false # affects properties in "then" scope
  - if:
      localip: 1.2.3.4
      grouping: '&yahoo'
    then:
      rate: 10/1 # put (clustered) rate property in a separate if clause
default:
  rate: 5/1 # default rate applies cluster-wide
4.4.3.2. Observing clustered policy in operation
The web user interface Delivery Insights view shows cluster behavior. For each policy setting, it shows:
- the cluster’s concurrency and rate
- the effective concurrency and rate in use on this instance
The halontop command has a panel showing the cluster status and the number of hosts found.
4.5. Queue pickup suspension
As described in the previous section, the pickup policy subsystem implements a suspension list to enforce concurrency, rate and connectinterval limits. This suspension list can be used directly to temporarily pause and resume traffic in the active queue. For a more permanent suspension, like a quarantine for spam, put messages on hold instead.
The default start configuration’s environment.suspendconf directive loads the suspension configuration from /etc/halon/smtpd-suspend.yaml. It is described by, and can be validated with, the smtpd-suspend.schema.json JSON schema (included in our Visual Studio Code integration).
In addition to the configuration file on disk, policy conditions can be added on the fly using the Protocol Buffer API’s PolicyConditionAddRequest function, the command line interface, the web administration, as well as from the pre- and post-delivery script.
Lists may be used to reference and suspend multiple field values at the same time.
The example below suspends all emails on the “customer1” transport to the destination “gmail.com”:
suspends:
  - transportid: customer1
    recipientdomain: gmail.com
4.5.1. Excluding IPs from pool
A queued email can have multiple designated source IPs, which can be used to load balance between a pool
of source IPs in a round-robin fashion. The list of source IPs (and matching HELO hostnames) can be configured per transport,
or overridden by the “sourceip” argument to the pre-delivery script’s Try()
function.
Queuing an email with multiple source IPs has the benefit of being able to dynamically suspend specific source IPs,
while still allowing a queued email to be sent using the other IPs. Consider the following example:
suspends:
  - localip: 192.0.2.1
    recipientdomain: gmail.com
Since localip is a list, emails queued with both “192.0.2.1” and another IP will still be sent to “gmail.com” from that other source IP.
4.5.2. Delivery settings
The active queue delivery configuration can be used to set parameters such as timeouts and connection pooling based on fields that are only available in the active queue state, such as remotemx and remoteip. Because they might be generated by an external system, or managed separately from the rest of the configuration, they are loaded from a separate file.
The default start configuration’s environment.deliveryconf directive loads the delivery settings from /etc/halon/smtpd-delivery.yaml. It is described by, and can be validated with, the smtpd-delivery.schema.json JSON schema (included in our Visual Studio Code integration).
Lists may be used to reference and match multiple values at the same time.
The following properties can be changed in the delivery settings file:
- TLS mode
- Pooling options
- Timeouts
- Retry (intervals and count, but not during)
The example below enables connection pooling, with priority for MXs on “example.com”:
delivery:
  conditions:
    - if:
        remotemx: "*.example.com"
      then:
        pooling:
          enabled: true
          timeout: 300
          evictable: false
          transactions: 1000
        tls:
          mode: "optional"
    - if:
        remoteip: 192.0.2.1
      then:
        timeout:
          connect: 120
          smtp:
            MAIL: 900
            EOD: 900
  default:
    pooling:
      enabled: true
The Protocol Buffer API contains pooling statistics:
$ halonctl process-stats | grep queue.connections.pooling
queue.connections.pooling.size: 2
queue.connections.pooling.hits: 3853
queue.connections.pooling.misses: 5900
queue.connections.pooling.expires: 5694
queue.connections.pooling.evicts: 0
queue.connections.pooling.skips: 0
4.6. Spool startup
Upon startup, the spool directory (spool.path directive) is scanned for queued messages (.hqf files). The queue files are checked for read and write permission by the running process (so that they may be updated and unlinked). The files are then opened and parsed by multiple concurrent threads (spool.threads.loader directive) before being added to the defer queue, hold queue or active queue. If there is an error during loading, the file is skipped and an error is logged. If a file is empty or contains no queue information (and hence stems from an incomplete transaction), you may choose to unlink, rename or ignore the file (spool.corrupt directive).
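A minimal sketch of how these directives might be combined; the nesting and the example values are assumptions based on the dotted paths above, so consult the startup configuration reference:
spool:
  path: /var/spool/halon/queue   # spool.path
  threads:
    loader: 4                    # spool.threads.loader: example number of loader threads
  corrupt: rename                # spool.corrupt: assumed to accept unlink, rename or ignore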
4.7. On-disk format
Queued messages and their metadata are written to disk, so that they can be re-loaded into the MTA in case the smtpd program is restarted. The on-disk metadata can be inspected with the hqfcat Python script:
$ hqfcat /var/spool/halon/queue/66/662ffb4a-02d1-11ea-bcf7-06f091ceb4dc.hqf
[
{
"status": "QUEUE",
"retry": 23,
"retryts": 1573508496,
"transportid": "mx",
"ts": 1573290908,
...
which produces an output similar to the regular command line interface:
$ halonctl queue list --id b186d406-0c36-11ea-8dda-005056914940:1 --json
[
{
"localips": [],
"tags": [],
"metadata": {},
"id": {
"transaction": "b186d406-0c36-11ea-8dda-005056914940",
"queue": 1
},
...
You can view the message, including modifications made by the end-of-DATA script, using the hqfeml script. The -R flag is the recipient queue id; without the -R flag, the original message is displayed.
By comparing the hqfeml output with the original message, you can see the modifications:
$ hqfeml -R 1 /var/spool/halon/queue/b1/b186d406-0c36-11ea-8dda-005056914940.hqf
4.8. Moving messages between instances
Messages can be moved from the current MTA to another using the hqfmove command-line utility.
Before moving messages, either:
- shut down the smtpd process, or
- unload specific messages using halonctl queue unload and move them from the spool.path folder to a separate folder, so that newly arriving messages are not mixed with the messages to transfer.
Run the command on the source MTA, specifying the folder of messages you wish to move with the --directory argument. This will move all the messages in the specified folder and its sub-folders.
One or more destination MTAs can be specified with --server arguments. Each specifies the hostname/IP address and the destination’s environment.controlsocket.port, or else the environment.controlsocket.path.
- If many --server destination hosts are given, messages will be pushed to the destinations in round-robin fashion.
- If a hostname A record resolves to multiple hosts, messages will also be pushed to the destinations in round-robin fashion.
All command-line arguments:
--server address:port/unix-socket [--server address:port/unix-socket]
--directory /path [--non-interactive] [--verbose] [--progress] [--rate x/y]
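A hedged example invocation; the directory, addresses and port are hypothetical placeholders (the port should be the destination’s environment.controlsocket.port):
$ hqfmove --directory /var/spool/halon/transfer --server 192.0.2.10:5121 --server 192.0.2.11:5121 --progress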