Comme je ne l'ai pas trouvé quelque chose commode en python 3 pour mes besoins (partie du courrier de tordu n'est pas en cours d'exécution en python 3), j'ai fait une petite maquette avec asyncio que vous pouvez améliorer si vous le souhaitez:
J'ai défini un ImapProtocol qui étend asyncio.Protocol. Ensuite, lancez un serveur comme ceci:
factory = loop.create_server(lambda: ImapProtocol(mailbox_map), 'localhost', 1143)
server = loop.run_until_complete(factory)
Le mailbox_map est une carte de carte: email -> carte des boîtes aux lettres -> ensemble de messages. Donc, tous les messages/boîtes aux lettres sont en mémoire.
Chaque fois qu'un client se connecte, une nouvelle instance de ImapProtocol est créée. Ensuite, les exécute de ImapProtocol et des réponses pour chaque client, la capacité de mise en œuvre/connexion/fetch/select/Recherche/magasin:
class ImapHandler(object):
def __init__(self, mailbox_map):
self.mailbox_map = mailbox_map
self.user_login = None
# ...
def connection_made(self, transport):
self.transport = transport
transport.write('* OK IMAP4rev1 MockIMAP Server ready\r\n'.encode())
def data_received(self, data):
command_array = data.decode().rstrip().split()
tag = command_array[0]
self.by_uid = False
self.exec_command(tag, command_array[1:])
def connection_lost(self, error):
if error:
log.error(error)
else:
log.debug('closing')
self.transport.close()
super().connection_lost(error)
def exec_command(self, tag, command_array):
command = command_array[0].lower()
if not hasattr(self, command):
return self.error(tag, 'Command "%s" not implemented' % command)
getattr(self, command)(tag, *command_array[1:])
def capability(self, tag, *args):
# code for it...
def login(self, tag, *args):
# code for it...
Puis dans mes tests, je commence le serveur lors de l'installation avec:
self.loop = asyncio.get_event_loop()
self.server = self.loop.run_until_complete(self.loop.create_server(create_imap_protocol, 'localhost', 12345))
Quand je veux simuler un nouveau message:
imap_receive(Mail(to='[email protected]', mail_from='[email protected]', subject='hello'))
Et l'arrêter à teardown:
self.server.close()
asyncio.wait_for(self.server.wait_closed(), 1)
cf https://github.com/bamthomas/aioimaplib/blob/master/aioimaplib/tests/imapserver.py
EDIT: J'ai eu un arrêt buggy du serveur, je réécris avec asyncio.Protocole et modifier la réponse pour refléter les changements
Lien ci-dessus est mort, un nouveau est susceptible https://docs.python.org/2/library/smtpd.html – sdaau