This is a fork of the official kopano-spamd. I have added spam unlearning and some more config options.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 

97 lines
3.6 KiB

#!/usr/bin/env python
# coding=utf-8
"""
ICS driven spam learning daemon for Kopano / SpamAssassin
See included readme.md for more information.
"""
import shlex
import subprocess
import time
import kopano
from MAPI.Tags import *
from kopano import Config, log_exc
# Daemon settings with their defaults; handed to the Service constructor
# in main() and read back by Checker through ``service.config``.
CONFIG = {
    # NOTE(review): not read anywhere in this file; presumably consumed by
    # the kopano service framework to pick the account to run as -- confirm.
    'run_as_user': Config.string(default='kopano'),
    'run_as_group': Config.string(default='kopano'),
    # Mail header whose value is inspected for a leading "yes".
    'spam_header': Config.string(default='x-spam-status'),
    # Command that receives a junk-folder mail on stdin.
    'learncmd': Config.string(
        default='/usr/bin/sudo -u amavis /usr/bin/sa-learn --spam'),
    # Command that receives an inbox mail on stdin.
    'unlearncmd': Config.string(
        default='/usr/bin/sudo -u amavis /usr/bin/sa-learn --ham'),
}
class Service(kopano.Service):
    """Service that continuously syncs server changes into a Checker."""

    def main(self):
        """Run the sync loop forever, pausing and retrying after errors.

        Each pass calls ``server.sync`` with a Checker instance and the
        last known state, and keeps the state it returns for the next
        pass. A failed pass is logged and followed by a 5 second pause;
        every pass is followed by a 1 second pause.
        """
        server = self.server
        state = server.state
        catcher = Checker(self)
        with log_exc(self.log):
            while True:
                try:
                    state = server.sync(catcher, state)
                except Exception as e:
                    # Not every exception carries an ``hr`` result code.
                    # Reading it unguarded would raise AttributeError from
                    # inside this handler and break out of the retry loop.
                    if getattr(e, 'hr', None) == MAPI_E_NETWORK_ERROR:
                        self.log.info('Trying to reconnect to Server in %s seconds' % 5)
                    else:
                        self.log.info('Error: [%s]' % e)
                    time.sleep(5)
                time.sleep(1)
class Checker(object):
    """Sync callback that feeds misfiled mail to the configured commands.

    Mail in the junk folder whose spam header does not start with "yes"
    is piped to ``learncmd``; mail in the inbox whose spam header does
    start with "yes" is piped to ``unlearncmd``.
    """

    def __init__(self, service):
        """Copy the logger and the relevant settings from *service*."""
        self.log = service.log
        self.learncmd = service.config['learncmd']
        self.unlearncmd = service.config['unlearncmd']
        self.spamheader = service.config['spam_header']

    def update(self, item, flags):
        """Inspect a changed item and trigger learning when it is misfiled.

        Only ``IPM.Note`` items that carry the configured spam header and
        live in a store with a user are considered. *flags* is accepted
        but not used.
        """
        if item.message_class != 'IPM.Note':
            return
        spamstatus = item.header(self.spamheader)
        if spamstatus is None:
            return
        user = item.store.user
        if not user:  # skip public stores
            return
        if item.folder == user.junk and not spamstatus.lower().startswith('yes'):
            self.learn(item)
        if item.folder == user.inbox and spamstatus.lower().startswith('yes'):
            self.unlearn(item)

    def learn(self, item):
        """Pipe *item* to the configured learn command and log the result."""
        self._train(item, self.learncmd, 'sa-learn')

    def unlearn(self, item):
        """Pipe *item* to the configured unlearn command and log the result."""
        self._train(item, self.unlearncmd, 'sa-unlearn')

    def _train(self, item, command, label):
        """Export *item* as eml, feed it to *command*, log under *label*."""
        with log_exc(self.log):
            try:
                eml = item.eml()
            except Exception as e:
                self.log.info('Failed to extract eml of email: [%s] [%s]' % (e, item.entryid))
                # Nothing to feed to the command; stop here instead of
                # falling through to code that needs the eml data.
                return
            try:
                p = subprocess.Popen(shlex.split(command), stdin=subprocess.PIPE, stdout=subprocess.PIPE)
                output, _ = p.communicate(eml)
                self.log.info('[%s] %s: %s' % (item.store.user.name, label, self._as_text(output)))
            except Exception as e:
                self.log.info('%s failed: [%s] [%s]' % (label, e, item.entryid))

    @staticmethod
    def _as_text(output):
        """Return command output as text without surrounding newlines.

        On Python 3 a pipe yields bytes, and stripping bytes with a str
        argument raises TypeError, so bytes are decoded first.
        """
        if not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
        return output.strip('\n')
def main():
    """Parse the command line and start the 'spamd' service."""
    # 'ckpsF' selects which of the common command-line options are offered.
    opts, _positional = kopano.parser('ckpsF').parse_args()
    Service('spamd', config=CONFIG, options=opts).start()


if __name__ == '__main__':
    main()