#! /usr/bin/env python
"""
Using the IMAP protocol, reach into my "Held Mail" (spam) folder
at Spamcop and move the for-sure spam into the "Spam for sure" folder.

Version of 2008.12.11

Revision history:
  2006.10.16 - Be more suspicious of messages coming through CRI:
               "blocked" disposition and spam level.
  2008.11.11 - React to bare CR in header.
               Improve printing to log file.
               Detect absence of capital letters on subject line.
  2008.11.12 - Restructure.
               Check for X-Spam-Level before parsing the header.

The code is written so that it runs under both Python 2.6+ and Python 3:
print is always called with a single pre-formatted string, exceptions are
raised with call syntax, and data coming back from imaplib is converted
to text before it is examined.
"""

try:
    import exceptions  # Python 2 only; the module does not exist in Python 3.
except ImportError:
    exceptions = None
import imaplib
import re
import string
import types


# Your Spamcop address.  test_from_date() compares "From:" lines against
# this name, so it is defined at module level: that way it also exists
# when this file is imported rather than run as a script.
my_name = "xxxxxxxx@spamcop.net"    # Put your Spamcop address here.


class _MessageError(Exception):
    """Common base for this module's exceptions.

    The constructor keeps the historical signature (a single optional
    parameter called "args") but hands the message to Exception instead of
    assigning self.args directly.  Assigning a string to self.args splits
    it into a tuple of characters, and assigning None raises TypeError.
    """

    def __init__(self, args=None):
        if args is None:
            Exception.__init__(self)
        else:
            Exception.__init__(self, args)


class HeaderError(_MessageError):
    """A problem was encountered while processing a header."""


class ProtocolError(_MessageError):
    """Something went wrong as we executed the mailbox protocol."""


def _to_text(data):
    """Return data as a text string.

    Python 3's imaplib returns bytes.  Such values are decoded as Latin-1,
    which maps every byte to a character and therefore cannot fail on
    malformed spam headers.  Values that are already str (always the case
    under Python 2) are returned unchanged.
    """
    if isinstance(data, bytes) and not isinstance(data, str):
        return data.decode("latin-1")
    return data


def _quote_mailbox(name):
    """Return mailbox name enclosed in double quotes for the IMAP command.

    Names such as "INBOX.Held Mail" contain a space.  Python 2's imaplib
    quotes such arguments itself (and leaves already-quoted ones alone);
    Python 3's imaplib sends them as given, so we quote them ourselves.
    """
    if len(name) >= 2 and name[0] == '"' and name[-1] == '"':
        return name
    return '"' + name.replace("\\", "\\\\").replace('"', '\\"') + '"'


def _expect_ok(response, complaint):
    """Raise ProtocolError(complaint) unless the IMAP response type is "OK"."""
    if response[0] != "OK":
        raise ProtocolError(complaint)


def header_fields(s, logfile):
    """Parse the header in string s into a dictionary keyed by line beginnings.

    Concatenate continuation lines.  A line that begins with a whitespace
    character is a continuation of the preceding line.  The first line
    should not be a continuation line.  Every line that is not a
    continuation line should begin with something like one of these:

        Return-Path:
        Delivered-To:
        Received:
        X-Spam-Checker-Version:
        X-Spam-Level:
        X-Spam-Status:
        Date:
        From:
        Message-Id:
        X-SpamCop-Checked:
        X-SpamCop-Disposition:

    The whole label, converted to lower case and including the ":", is
    used as the key for the list that will include this line.

    We return a dictionary indexed by labels like "from:".  Each entry
    consists of a list of the (reassembled if continued) lines that
    followed occurrences of that label.  For example, the "received:"
    entry will typically be a list of several lines.

    Parsing stops at the first empty line.  Raises HeaderError if the
    header starts with a continuation line or contains a line without a
    "label:" prefix; the offending line is written to logfile first.
    """
    result = {}
    k = None                    # Label of the line being accumulated.
    accumulated_line = ""
    for line in s.split("\n"):
        line = line.rstrip()
        if not line:
            break               # Blank line: end of the header.
        if line[0] in string.whitespace:
            # It's a continuation line.
            if k is None:
                raise HeaderError("Header begins with continuation line")
            accumulated_line = accumulated_line + " " + line.strip()
        else:
            # It's not a continuation line.
            if k:
                # If there was a predecessor line, save it.
                result.setdefault(k, []).append(accumulated_line)
            m = re.match(r'([^:]+:)(.*)', line)     # Better find "xxx:".
            if m is None:
                logfile.write("This doesn't look like a header line:\n%s\n"
                              % line.strip())
                raise HeaderError("Unheaderlike line in header.")
            k = m.group(1).lower()
            accumulated_line = m.group(2).strip()
    if k:
        # (True unless s was essentially empty.)
        result.setdefault(k, []).append(accumulated_line)
    return result


def sent_to_cryptography_dot_com(received_list):
    """Return True if the message whose "received:" lines are passed in
    received_list was originally emailed to my cryptography.com email
    address from a non-cryptography.com origin."""
    # It seems that the email in question always has 3
    # "received:" lines that mention "by www.cryptography.com".
    hits = [line for line in received_list
            if "by www.cryptography.com" in line]
    return len(hits) >= 3


def has_matching_regexp(subject, re_list):
    """If subject matches a regexp in re_list (ignoring case), return the
    first matching regexp; otherwise, return ""."""
    for k in re_list:
        if re.search(k, subject, re.IGNORECASE) is not None:
            return k
    return ""


def test_subject(fields):
    """Examine the header fields in the dictionary provided.
    Return a text description of the reason for rejecting
    the message, or None.

    This test looks for red flags in the Subject line.
    """
    if "subject:" not in fields:
        return None
    s = fields["subject:"][0]
    # Ignore any number of leading "Re: " prefixes.
    while s.startswith("Re: "):
        s = s[4:]
    # Look for killer words in the subject line:
    kr = has_matching_regexp(s, killer_regexps)
    if kr != "":
        return "subj killer regexp = %s" % kr
    # If the subject line is all lower-case, be more critical:
    if s == s.lower():
        kr = has_matching_regexp(s, nocaps_killer_regexps)
        if kr != "":
            return "nocaps subj killer regexp = %s" % kr
    # Reject subjects that use weird alphabets:
    if s.startswith("=?"):
        return "subj begins =?"
    return None


def test_to(fields):
    """Examine the header fields in the dictionary provided.
    Return a text description of the reason for rejecting
    the message, or None.

    This test looks for red flags in the "To:" line.
    """
    for to_field in fields.get("to:", []):      # (Probably just one.)
        kt = has_matching_regexp(to_field, to_killer_regexps)
        if kt != "":
            return "to killer regexp = %s" % kt
    return None


def test_cri(fields):
    """Examine the header fields in the dictionary provided.
    Return a text description of the reason for rejecting
    the message, or None.

    If this message came through cryptography.com (but didn't
    originate there), then certain additional patterns in the
    subject line should kill it.  Reason: anybody writing to me
    at cryptography.com should be writing about cryptography,
    not about stocks or pharmaceuticals.

    (The following observation is no longer [2008] true:)
    Also, since nearly all my spam comes through cryptography.com,
    I'm a little less tolerant of x-spam-level and "blocked".
    """
    if "received:" not in fields \
       or not sent_to_cryptography_dot_com(fields["received:"]):
        return None
    if "subject:" in fields:
        kr = has_matching_regexp(fields["subject:"][0], cri_killer_regexps)
        if kr != "":
            return "subj (cri) killer regexp = %s" % kr
    if "x-spamcop-disposition:" in fields \
       and fields["x-spamcop-disposition:"][0] \
           .startswith("Blocked cbl.abuseat.org") \
       and "x-spam-level:" in fields \
       and fields["x-spam-level:"][0].startswith("******"):
        return "(cri) disposition and spam level"
    return None


def test_spamlevel(fields):
    """Examine the header fields in the dictionary provided.
    Return a text description of the reason for rejecting
    the message, or None.

    If there's an "x-spam-level:" line with enough stars (ten),
    then it's spam.
    """
    for level in fields.get("x-spam-level:", []):
        if level.startswith("**********"):
            return "spam-level"
    return None


def test_from_date(fields):
    """If From: is me (the module-level my_name) and there's no date,
    reject.  Return the reason, or None."""
    if "date:" not in fields and "from:" in fields:
        f = fields["from:"]
        if len(f) == 1 and f[0].strip() == "<" + my_name + ">":
            return "from me, no date"
    return None


# The predicates above are named test_* for historical reasons, but they
# are not unit tests.  Mark them so that test collectors which pick up
# functions by name (pytest, nose) leave them alone.
for _predicate in (test_subject, test_to, test_cri,
                   test_spamlevel, test_from_date):
    _predicate.__test__ = False
del _predicate


def extract_subject(s):
    """Return the subject line from text string s, or "[Not found]".

    Only a "Subject: " that follows a line break is recognized, so a
    subject on the very first line of s is not found.
    """
    m = re.search(r"\n\r?subject: ([^\n\r]*)", s, re.IGNORECASE)
    if m is None:
        return "[Not found]"
    return m.group(1)


def looks_spamlike(s, logfile):
    """Does the header in string s look like spam?

    The header and the verdict are appended to logfile; a "Spam: ..."
    line is printed for every message classified as spam.  Returns True
    for spam, False otherwise.  Raises HeaderError if the header cannot
    be parsed and the malformation is not itself a known spam signature.
    """
    s = _to_text(s)
    # For samples of what's in the string s, see the log file.
    logfile.write("Input to looks_spamlike:\n%s\n" % s.strip())
    reason = None
    fields = None

    # The low-hanging fruit: is it already firmly flagged as spam?
    # (This saves us many parsing errors in malformed headers.)
    if "\nx-spam-level: *********" in s.lower():
        subject_line = "[shortcut] " + extract_subject(s)
        reason = "X-Spam-Level shortcut"
    else:
        # Parse the header:
        subject_line = "[none]"
        try:
            fields = header_fields(s, logfile)
        except HeaderError:
            # Certain header malformations indicate spam:
            subject_line = "[header-parsing error]"
            if ">\rFrom:" in s:
                reason = "Bare CR in header."
            else:
                raise

    if reason is None:
        # If there's a subject line, keep it:
        if "subject:" in fields:
            subject_line = fields["subject:"][0]
        # Apply the tests in order; the first one with an objection wins.
        for test in (test_subject, test_to, test_cri,
                     test_spamlevel, test_from_date):
            reason = test(fields)
            if reason is not None:
                break

    if reason is not None:
        print("Spam: %s" % subject_line[0:60])
        logfile.write("Above was classified as spam (%s).\n\n" % reason)
    else:
        logfile.write("Above was classified as not-spam.\n\n")
    return reason is not None


# Notes on regular expressions:
#   \bsex\b means "sex" neither preceded nor followed by letters, thus
#           excluding "essex" and "sextillion".  Warning: \b does not
#           "match whitespace"; that's the job of \s.  Example of
#           misuse: "a\bc" does *not* match "a c", because \b doesn't
#           match the space: it matches the null string between the a
#           and the space.
#   \bpenny\b.*\bstocks\b matches any string in which the word "penny"
#           (without additional letters immediately preceding or following
#           it) appears before the word "stocks".
#   home\s+loan means "home" and "loan" with one or more whitespace
#           characters between.
#   ^get    means "get" at the beginning of the string.
#   c+      means one or more c's: c, cc, ccc, et cetera.
#   c+i+a+l+i+s+ means "cialis" with any of its letters stuttered
#           any number of times; e.g., ciialliss.
#   girls?  means "girl" or "girls".
#   [a-z]   means any letter.
#   [a-z][0-9][a-z] means any letter followed by a digit and a letter;
#           e.g., cia1is.
#   [ -]    means space or hyphen.
#   .       matches any character.
#   .*      means any number of characters, including none.
#   ^you'?`?re means any string beginning with youre, you're, you`re,
#           or you'`re.

# The following regular expressions flag spam whenever they
# occur in subject lines:
killer_regexps = (
    r"[a-z][0-9@|][a-z]",
    r"\baffordable\b.*\bmedications?\b",
    r"\bblue pill\b",
    r"\bcellulite\b",
    r"\bc+i+a+l+i+s+\b",
    r"\bclalls\b",
    r"\bcoeds?\b",
    r"\bcum\b",
    r"\bdating\b",
    r"\bdebt consolidation\b",
    r"\bdesire\b",
    r"\bejaculation\b",
    r"\berections?\b",
    r"\bgirlfriends?\b",
    r"\bgirls?\b",
    r"\bgreat prices?\b",
    r"\bguys?\b",
    r"\bhidden\b.*\bcameras?\b",
    r"\bhot\b.*\bstocks?\b",
    r"\bihre\b",
    r"\bimpotence\b",
    r"\bimpotency\b",
    r"\bimpotent\b",
    r"\bladies\b",
    r"\blevitra\b",
    r"\blow rates\b",
    r"\blow prices\b",
    r"\bloo?se\s+weight\b",
    r"\bloo?se?ing\s+weight\b",
    r"\bmeds\b",
    r"\bmicro[ -]?caps?\b",
    r"\bmortage\b",
    r"\bmortgage\b",
    r"\bname\b.*\bbrands?\b",
    r"\bonline\b.*\bdrugs?\b",
    r"\borgasms?\b",
    r"\bpenis\b",
    r"\bpenny\b.*\bstocks?\b",
    r"\bpharm\b",
    r"\bpharmacy\b",
    r"\bpills\b",
    r"\bporn\b",
    r"\bpornstars?\b",
    r"\bprescriptions?\b",
    r"\bpropecia\b",
    r"\bpropetia\b",
    r"\brolex\b",
    r"\brefinances?\b",
    r"\breplica watche?s?\b",
    r"\brx\b",
    r"\bsemen\b",
    r"\bsex\b",
    r"\bsexual\b",
    r"\bsexually\b",
    r"\bshed\b.*\binches\b",
    r"\bsmall[ -]?caps?\b",
    r"\bspermatazoa\b",
    r"\bsperm\b",
    r"\bstox\b",
    r"\btaladafil\b",
    r"\bteen\b",
    # "for u", "when u", "where u", "if u": the words are separated by
    # whitespace, so \s+ is required between them (\b cannot match
    # between two letters; see the notes above).
    r"\bfor\s+u\b",
    r"\bwhen\s+u\b",
    r"\bwhere\s+u\b",
    r"\bif\s+u\b",
    r"\bv+i+a+g+r+a+\b",
    r"\bwebcams?\b",
    r"\bwhy pay\b",
    r"\bwomen\b",
    r"\bxanax\b",
    r"\bvalium\b",
    r"\byour health\b",
    r"\byour partners?\b",
    r"^become\b",
    r"^best\b",
    r"^boost\b",
    r"^cheapest\b",
    r"^don'?`?t miss\b",
    r"^enhance\b",
    r"^find cheap\b",
    r"^give her\b",
    r"^get the\b",
    r"^get your\b",
    r"^impress your\b",
    r"^increase your\b",
    r"^last longer\b",
    r"^let us\b",
    r"\bsave money\b",
    r"^top notch\b",
    r"^you look really stupid\b",
    r"^ppearson is\b",
    r"^you should\b",
    r"^you won'?`?t believe\b",
    r"^you won'?`?t find\b",
    r"^you'?`?ll be amazed\b",
    r"^you'?`?re a fool\b",
    r"^for:? ?ppearson\b",
    r"\bfor ppearson$",
)

# The following regular expressions flag spam whenever they
# occur in subject lines that have no capital letters:
nocaps_killer_regexps = (
    r"^win\b",
    r"^get\b",
    r"^give\b",
)

# The following regular expressions flag spam whenever they
# occur in subject lines of messages sent to my
# cryptography.com address:
cri_killer_regexps = (
    r"\bas\s+seen\s+on\b",
    r"\bstocks?\b",
    r"\bpharm[ae]ce?u?ticals?\b",
    r"\bdrugs?\b",
    r"\bequities\b",
    r"\bequitys?\b",
    r"\binvestment\b",
    r"\binvestors?\b",
    r"\binches\b",
    r"\bloan\s+application\b",
    r"\bneeds?\s+this\b",
    r"\bweight\s+loss\b",
    r"\byour\s+credit\b",
    r"\bnew\s+man\b",
    r"^eliminate\b",
    r"^everyone\b",
    r"^free\b",
    r"^she\b",
    r"^use\b",
)

# The following regular expressions appear on the "To:" line
# in some spam that is otherwise unflagged.  (The dots are escaped
# so that they match only a literal ".".)
to_killer_regexps = (
    r"\banubis@spamcop\.net\b",
    r"\bgdobson@spamcop\.net\b",
    r"\bpam@spamcop\.net\b",
    r"\bpeebles@spamcop\.net\b",
    r"\bpmorin@spamcop\.net\b",
    r"\bpygar@spamcop\.net\b",
    r"\bquagga@spamcop\.net\b",
)


def _report_bad_item(rr, problem):
    """Tell the operator that one fetched item could not be processed."""
    print("Something went wrong processing this rr: %s" % (problem,))
    print("rr = %r" % (rr,))


def _collect_condemned(logfile, imap):
    """Fetch every header in the selected mailbox and return the message
    numbers (as strings) of the messages that look like spam."""
    print("Requesting message headers . . .")
    condemned = []
    message_list = "1:*"
    print("Fetching for message_set = '%s'." % message_list)
    r = imap.fetch(message_set=message_list,
                   message_parts="( BODY[HEADER] )")
    _expect_ok(r, "Non-OK response when fetching headers.")
    # Perhaps the imaplib I'm using (2.54, November 2002) is flawed, but
    # the structure returned as r[1] is, for example, this:
    #   r[1][0] is a tuple:
    #     r[1][0][0] is a string: '1 (BODY[HEADER] {1923}'
    #     r[1][0][1] is a string: 'Return-Path:...spamhaus.org\r\n\r\n'
    #   r[1][1] is a string: ")".
    #   r[1][2] is a tuple:
    #     r[1][2][0] is a string: '2 (BODY[HEADER] {1989}'
    #     r[1][2][1] is a string: "Return-Path:...ssassin=5\r\n\r\n"
    #   r[1][3] is a string: ")".
    #   r[1][4] is a tuple:
    #     r[1][4][0] is a string: '3 (BODY[HEADER] {1913}'
    #     r[1][4][1] is a string: 'Return-Path:...amhaus.org\r\n\r\n'
    #   r[1][5] is a string: ")".
    #   r[1][6] is a string: "3 (FLAGS (\Seen \Recent))".
    # (Under Python 3 the strings are bytes objects.)
    if not isinstance(r[1], list):
        raise ProtocolError("Unexpected response structure from fetch.")
    for rr in r[1]:
        if not isinstance(rr, tuple):
            continue            # Skip the ")" and FLAGS-only items.
        # If we get here, then rr is a tuple like this:
        #   rr[0] is a string: '1 (BODY[HEADER] {1923}'
        #   rr[1] is a string: 'Return-Path:...spamhaus.org\r\n\r\n'
        # Sometimes we get this, though:
        #   rr[0] = 34 (FLAGS (\Seen) BODY[HEADER] {1300}
        try:
            is_spam = looks_spamlike(_to_text(rr[1]), logfile)
        except Exception as err:
            # Best effort: one unprocessable message (typically a header
            # we cannot parse) must not stop the scan of the others.
            _report_bad_item(rr, err)
            continue
        if not is_spam:
            continue
        m = re.match(r"([0-9]+)\s+\(.*BODY\[HEADER\]", _to_text(rr[0]))
        if m is None:
            _report_bad_item(rr, "no message number found")
            continue
        condemned.append(m.group(1))
    return condemned


def cleanse_mailbox(logfile, imap, purgatory_name, hell_name):
    """
    Given the open IMAP channel, scan all messages in the mailbox
    named purgatory_name, and move the spam into the mailbox
    named hell_name.

    logfile is a writable file-like object that receives every examined
    header together with its verdict.  Spam is copied to hell_name and
    flagged \\DELETED in purgatory_name; this function does not expunge.
    Raises ProtocolError when the server answers a command with anything
    other than "OK".
    """
    purgatory = _quote_mailbox(purgatory_name)
    hell = _quote_mailbox(hell_name)

    print("Subscribing to %s . . ." % purgatory_name)
    _expect_ok(imap.subscribe(mailbox=purgatory),
               "Non-OK response when subscribing to purgatory.")
    print("Subscribing to %s . . ." % hell_name)
    _expect_ok(imap.subscribe(mailbox=hell),
               "Non-OK response when subscribing to hell.")
    print("Selecting %s . . ." % purgatory_name)
    r = imap.select(mailbox=purgatory)
    _expect_ok(r, "Non-OK response when selecting purgatory.")

    n_messages = int(r[1][0])
    if n_messages < 1:
        print("There are no messages in %s. I'm done." % purgatory_name)
    else:
        condemned = _collect_condemned(logfile, imap)
        print("%d for-sure spam messages were identified." % len(condemned))
        if not condemned:
            print("No for-sure spam messages were found. I'm done.")
        else:
            victim_list = ",".join(condemned)
            # Move the offending messages from purgatory into hell:
            _expect_ok(imap.copy(message_set=victim_list, new_mailbox=hell),
                       "Non-OK response when copying to hell.")
            # Delete the offending messages from purgatory:
            _expect_ok(imap.store(victim_list, "+FLAGS.SILENT", r"\DELETED"),
                       "Non-OK response when deleting.")

    # Clean up, close down (also when the mailbox turned out to be empty,
    # so that the subscriptions made above do not linger):
    _expect_ok(imap.unsubscribe(mailbox=purgatory),
               "Non-OK response when unsubscribing to purgatory.")
    _expect_ok(imap.unsubscribe(mailbox=hell),
               "Non-OK response when unsubscribing to hell.")


def main():
    """Log in to Spamcop, cleanse the held-mail folder, and log out."""
    mail_host = "mail.spamcop.net"
    my_password = "????????"                # Put your password here.
    purgatory_name = "INBOX.Held Mail"      # We look at messages from here,
    hell_name = "INBOX.Spam for sure"       # and move spam to here.

    # The "with" block closes the log file even if something goes wrong.
    with open("/home/peter/spamlog.txt", "a") as logfile:
        print("Establishing the connection . . .")
        imap = imaplib.IMAP4_SSL(host=mail_host)
        print("Logging in . . .")
        _expect_ok(imap.login(user=my_name, password=my_password),
                   "Non-OK response to login.")
        print("Mailbox names:")
        for x in imap.list()[1]:
            print(_to_text(x))
        # Expecting imap.list() to look like this:
        # ('OK', ['(\\HasNoChildren) "." "INBOX.sent-mail"',
        #         '(\\HasNoChildren) "." "INBOX.Spam for sure"',
        #         '(\\HasNoChildren) "." "INBOX.Archives"',
        #         '(\\HasNoChildren) "." "INBOX.Scams"',
        #         '(\\HasNoChildren) "." "INBOX.Trash"',
        #         '(\\HasNoChildren) "." "INBOX.Held Mail"',
        #         '(\\HasChildren) "." "INBOX"'])

        # Open the specified mailboxes, then move spam from
        # purgatory into hell:
        cleanse_mailbox(logfile, imap, purgatory_name, hell_name)

        _expect_ok(imap.close(), "Non-OK response to close.")
        r = imap.logout()
        if r[0] != "BYE":
            raise ProtocolError("Non-BYE response to logout.")


############################################################

if __name__ == "__main__":
    main()