This is a fork of the official kopano-spamd. I have added spam unlearning and some more config options.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

97 lines
3.6 KiB

#!/usr/bin/env python
# coding=utf-8
"""
ICS driven spam learning daemon for Kopano / SpamAssassin.

See the included documentation for more information.
"""
import shlex
import subprocess
import time
import kopano
from MAPI.Tags import *
from kopano import Config, log_exc
# Daemon configuration, passed to ``Service(..., config=CONFIG, ...)`` in
# ``main()``. Each key maps to a string option with its default value.
CONFIG = {
    # presumably the account/group the daemon drops privileges to — these
    # keys are not read anywhere in this file; verify against kopano.Service.
    'run_as_user': Config.string(default="kopano"),
    'run_as_group': Config.string(default="kopano"),
    # Name of the mail header whose value Checker.update() inspects; a value
    # starting with "yes" (case-insensitive) marks the message as spam.
    'spam_header': Config.string(default="x-spam-status"),
    # Command lines executed by Checker; the message (eml) is piped to the
    # command on stdin.
    'learncmd': Config.string(default="/usr/bin/sudo -u amavis /usr/bin/sa-learn --spam"),
    'unlearncmd': Config.string(default="/usr/bin/sudo -u amavis /usr/bin/sa-learn --ham"),
}
class Service(kopano.Service):
    """Service that streams server changes into a :class:`Checker`.

    NOTE(review): this class was reconstructed from a damaged copy of the
    file. The ``try:`` line, the logging calls, the ``hresult`` operand and
    the sleeps were missing and have been filled in — confirm against the
    upstream source.
    """

    def main(self):
        """Run the synchronisation loop.

        Repeatedly calls ``server.sync`` with the current state, feeding
        every change to the ``Checker`` instance. Exceptions raised by a
        sync round are logged and the loop continues.
        """
        server = self.server
        state = server.state
        catcher = Checker(self)
        with log_exc(self.log):
            while True:
                # NOTE(review): short pause between rounds so the loop does
                # not spin; the original interval is unknown.
                delay = 1
                try:
                    state = server.sync(catcher, state)
                except Exception as e:
                    # Not every exception carries an ``hresult``; look it up
                    # defensively so the handler itself cannot raise.
                    if getattr(e, 'hresult', None) == MAPI_E_NETWORK_ERROR:
                        self.log.info('Trying to reconnect to Server in %s seconds' % 5)
                        # Wait as long as the message above announces.
                        delay = 5
                    else:
                        self.log.info('Error: [%s]' % e)
                time.sleep(delay)
class Checker(object):
    """Change handler that trains the spam filter from moved messages.

    NOTE(review): this class was reconstructed from a damaged copy of the
    file. The folder operands in ``update``, the branch bodies, the logging
    calls and the first field of the success log line were missing and have
    been filled in — confirm against the upstream source.
    """

    def __init__(self, service):
        """Copy the logger and the relevant settings from *service*."""
        self.log = service.log
        self.learncmd = service.config['learncmd']
        self.unlearncmd = service.config['unlearncmd']
        self.spamheader = service.config['spam_header']

    def update(self, item, flags):
        """Handle a changed item and (un)learn it when it was misclassified.

        Only ``IPM.Note`` items that carry the configured spam header are
        considered. *flags* is accepted for interface compatibility and is
        not used.
        """
        if item.message_class != 'IPM.Note':
            return
        spamstatus = item.header(self.spamheader)
        if spamstatus is None:
            return
        # NOTE(review): `item.store.public` is an assumed reconstruction of
        # the lost condition.
        if item.store.public:  # skip public stores
            return
        flagged = spamstatus.lower().startswith('yes')
        # NOTE(review): the junk/inbox operands are assumed reconstructions.
        if item.folder == item.store.junk and not flagged:
            # Filter said "not spam" but the message sits in junk: learn it.
            self.learn(item)
        elif item.folder == item.store.inbox and flagged:
            # Filter said "spam" but the message sits in the inbox: unlearn.
            self.unlearn(item)

    def learn(self, item):
        """Pipe *item* to the configured learn command."""
        self._feed(item, self.learncmd, 'sa-learn')

    def unlearn(self, item):
        """Pipe *item* to the configured unlearn command."""
        self._feed(item, self.unlearncmd, 'sa-unlearn')

    def _feed(self, item, command, label):
        """Export *item* as eml and send it to *command* on stdin.

        Failures to export the message or to run the command are logged and
        swallowed so that one bad message does not stop processing.
        """
        with log_exc(self.log):
            try:
                eml = item.eml()
            except Exception as e:
                self.log.info('Failed to extract eml of email: [%s] [%s]' % (e, item.entryid))
                # Nothing to feed; previously this fell through to an
                # undefined flag variable.
                return
            try:
                p = subprocess.Popen(shlex.split(command), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
                output, _ = p.communicate(eml)
                # On Python 3 the pipe yields bytes; decode before stripping.
                if isinstance(output, bytes) and not isinstance(output, str):
                    output = output.decode('utf-8', 'replace')
                # NOTE(review): the first field of this message was lost in
                # the damaged copy; the entry id is used as a stand-in.
                self.log.info('[%s] %s: %s' % (item.entryid, label, output.strip('\n')))
            except Exception as e:
                self.log.info('%s failed: [%s] [%s]' % (label, e, item.entryid))
def main():
    """Parse the command line, build the ``spamd`` service and start it."""
    parser = kopano.parser('ckpsF')  # select common cmd-line options
    options, args = parser.parse_args()
    service = Service('spamd', config=CONFIG, options=options)
    # The service object was previously built but never started.
    service.start()
# Run the daemon only when executed as a script, not when imported.
if __name__ == '__main__':
    main()