Mongo Task Queue 0.0.0 documentation
Welcome to Mongo Task Queue’s documentation!
Contents:
API
Connection
Created on Aug 2, 2013
@author: sean
class mtq.connection.MTQConnection(db, collection_base='mq', qsize=50, workersize=5, logsize=100, extra_lognames=()) [source]
Base object that you should use to create all other TQ objects
Parameters:
db – mongo database
collection_base – base name for collection
qsize – the size of the capped collection of the queue
See also: MTQConnection.default, MTQConnection.from_config
classmethod default() [source]
Create an MTQConnection with the default configuration, using mongo on localhost
finished_jobs_collection [source]
The collection that finished jobs are moved to
classmethod from_config(config=None, client=None) [source]
Create an MTQConnection from a config dict (a usage sketch follows the parameter list)
Parameters:
config – configuration dict, with the parameters
* DB_HOST
* DB
* COLLECTION_BASE
* COLLECTION_SIZE
client – a pymongo.MongoClient or None
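A minimal sketch of constructing a connection; the values shown here are the defaults that from_config falls back to, so this config is illustrative rather than required:

from pymongo import MongoClient
from mtq.connection import MTQConnection

config = {
    'DB_HOST': 'mongodb://localhost/?journal=true',
    'DB': 'mq',
    'COLLECTION_BASE': 'mq',
    'COLLECTION_SIZE': 50,
}
conn = MTQConnection.from_config(config)

# Equivalent shortcut, all defaults:
conn = MTQConnection.default()

# Or supply your own client:
conn = MTQConnection.from_config(client=MongoClient(tz_aware=True))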
get_job(job_id) [source]
Retrieve a job
get_worker(worker_name=None, worker_id=None) [source]
Retrieve a worker
job_stream(job_id) [source]
Get a file-like object for the output of a job
logging_collection [source]
The collection to push log lines to
make_query(queues, tags, priority=0, processed=False, failed=False, **query) [source]
Return a MongoDB query dict that matches the next task in the queue
make_tag_query(tags) [source]
Build the tag portion of a job query
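For a sense of the query this builds (taken from the source listing later in this document), a worker with tags ('linux', 'gpu') produces:

# Jobs match when every one of their tags is in the worker's tag list
{'tags': {'$not': {'$elemMatch': {'$nin': ['linux', 'gpu']}}}}
# An empty tag tuple produces {} and matches any job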
new_worker(queues=(), tags=(), priority=0, silence=False, log_worker_output=False, poll_interval=3, args=None) [source]
Create a worker object
Parameters:
queues – names of queues to pop from (these are OR’d)
tags – jobs must have all these tags to be processed by this worker
priority – (not implemented yet)
log_worker_output – if true, log worker output to the db
pop_item(worker_id, queues, tags, priority=0, failed=False) [source]
Pop an item from the queue
queue(name='default', tags=(), priority=0) [source]
Create a queue object
Parameters:
name – the name of the queue
tags – default tags to give to jobs
priority – (not implemented yet)
queue_collection [source]
The collection to push jobs to
queues [source]
List of existing queues
schedule_collection [source]
The collection that scheduled jobs are stored in
worker_collection [source]
Collection to register workers to
worker_stream(worker_name=None, worker_id=None) [source]
Get a file-like object for the output of a worker
workers [source]
List of existing workers
Returns: a list of WorkerProxy objects
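Putting the pieces together, a minimal end-to-end sketch; the task path mypkg.tasks.build_report is a placeholder, not part of MTQ:

from mtq.connection import MTQConnection

conn = MTQConnection.default()

# Enqueue a job; the import path names a hypothetical example task
q = conn.queue('default', tags=('reports',))
q.enqueue('mypkg.tasks.build_report', 42)

# Pop and process jobs until the queue is empty, then exit
worker = conn.new_worker(queues=('default',), tags=('reports',))
worker.work(batch=True)

# Inspect live workers via WorkerProxy objects
for proxy in conn.workers:
    print(proxy.num_processed, proxy.num_backlog)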
Queue
class mtq.queue.Queue(factory, name='default', tags=(), priority=0) [source]
A queue to enqueue and pop tasks
Do not create directly; use MTQConnection.queue
all_tags [source]
All the unique tags of jobs in this queue
count [source]
The number of jobs in this queue (filtering by tags too)
enqueue(func_or_str, *args, **kwargs) [source]
Creates a job to represent the delayed function call and enqueues
it.
Expects the function to call, along with the arguments and keyword
arguments.
The function argument func_or_str may be a function or a string representing the location of a function
enqueue_call(func_or_str, args=(), kwargs=None, tags=(), priority=None, timeout=None, mutex=None) [source]
Creates a job to represent the delayed function call and enqueues
it.
It is much like .enqueue(), except that it takes the function's args
and kwargs as explicit arguments. Any keyword arguments passed to this
function are options for MTQ itself.
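A short sketch of the two call styles; the task path and values are illustrative:

# enqueue: extra positional and keyword arguments go to the task itself
q.enqueue('mypkg.tasks.resize', 'photo.png', width=200)

# enqueue_call: args/kwargs are passed explicitly, so keywords like
# tags, priority, and timeout are free to configure MTQ itself
q.enqueue_call('mypkg.tasks.resize', args=('photo.png',),
               kwargs={'width': 200}, tags=('images',), timeout=60)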
is_empty() [source]
Test whether this queue is empty (filtering by tags too)
num_failed [source]
The number of failed jobs in this queue (filtering by tags too)
pop(worker_id=None) [source]
Pop a job off the queue
Job
Created on Aug 2, 2013
@author: sean
class mtq.job.Job(factory, doc) [source]
A Job is just a convenient datastructure to pass around job (meta) data.
Do not create directly, use MTQConnection.get_job
apply() [source]
Execute this task synchronously
args [source]
The arguments to call func with
finished() [source]
Test whether this job has finished
func [source]
a callable function for workers to execute
func_name [source]
The name of the task to execute
id [source]
the identifier for this job
kwargs [source]
The keyword arguments to call func with
qname [source]
The name of the queue that this job is in
set_finished(failed=False) [source]
Mark this job as finished.
Parameters: failed – if true, this was a failed job
stream() [source]
Get a stream to read log lines from this job
tags [source]
List of tags for this job
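A sketch of inspecting and running a job by id, assuming conn is an MTQConnection and job_id is the job's ObjectId:

job = conn.get_job(job_id)
print(job.qname, job.func_name, job.args, job.kwargs)
if not job.finished():
    result = job.apply()    # run the task synchronously, in this process
    job.set_finished()      # mark it processed and move it to finished_jobs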
Worker
class mtq.worker.Worker(factory, queues=(), tags=(), priority=0, poll_interval=1, exception_handler=None, log_worker_output=False, silence=False, extra_lognames=()) [source]
Do not create directly; use MTQConnection.new_worker
num_backlog [source]
Number of tasks this worker has left to complete
process_job(job) [source]
Process a single job in a multiprocessing.Process
register(*args, **kwds) [source]
Internal
Context manager; registers the birth and death of this worker, e.g.:
with worker.register():
    # Work
start_main_loop(one=False, batch=False, pop_failed=False, fail_fast=False) [source]
Start the main loop and process jobs
work(one=False, batch=False, failed=False, fail_fast=False) [source]
Main work function (see the sketch after the parameters)
Parameters:
one – wait for the first job execute and then exit
batch – work until the queue is empty, then exit
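The flags map onto common run modes; a quick sketch, assuming conn is an MTQConnection:

worker = conn.new_worker(queues=('default',))
worker.work(one=True)      # wait for the first job, execute it, then exit
worker.work(batch=True)    # work until the queue is empty, then exit
worker.work()              # poll the queue indefinitely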
class mtq.worker.WorkerProxy(factory, doc) [source]
This is a representation of an actual worker process
finished() [source]
Test whether this worker is finished
last_check_in [source]
Last check-in time
num_backlog [source]
Number of tasks this worker has left to complete
num_processed [source]
Number of tasks this worker has completed
Utils
Created on Aug 1, 2013
@author: sean
mtq.utils.ensure_capped_collection(db, collection_name, size_mb) [source]
Create the capped collection collection_name of size_mb megabytes if it does not already exist, and return it
mtq.utils.handle_signals() [source]
Handle signals in multiprocessing.Process threads
mtq.utils.import_string(import_name, silent=False) [source]
Imports an object based on a string. This is useful if you want to
use import paths as endpoints or something similar. An import path can
be specified either in dotted notation (xml.sax.saxutils.escape)
or with a colon as object delimiter (xml.sax.saxutils:escape).
If silent is True the return value will be None if the import fails.
For better debugging we recommend using the import_module()
function instead.
Parameters:
import_name – the dotted name for the object to import.
silent – if set to True import errors are ignored and
None is returned instead.
Returns: imported object
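Both notations from the description, as a quick sketch:

escape = import_string('xml.sax.saxutils.escape')       # dotted notation
escape = import_string('xml.sax.saxutils:escape')       # colon notation
missing = import_string('no.such.module', silent=True)  # None on failure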
mtq.utils.setup_logging(worker_id, job_id, silence=False) [source]
Set up logging for a worker
Source: mtq/job.py

'''
Created on Aug 2, 2013
@author: sean
'''
from mtq.utils import import_string, now, nulltime
from mtq.log import MongoStream
from bson.objectid import ObjectId
from time import mktime
class Job(object):
    '''
    A Job is just a convenient datastructure to pass around job (meta) data.

    Do not create directly, use MTQConnection.get_job
    '''
    def __init__(self, factory, doc):
        self.factory = factory
        self.doc = doc

    def __repr__(self):
        # NOTE: the original format string was lost in extraction (it looked
        # like an HTML tag); this reconstruction is an assumption
        return '<Job queue=%r tags=%r func=%s>' % (self.qname, self.tags, self.func_name)

    @property
    def tags(self):
        'List of tags for this job'
        return self.doc['tags']

    @property
    def qname(self):
        'The name of the queue that this job is in'
        return self.doc['qname']

    @property
    def func_name(self):
        'The name of the task to execute'
        return self.doc['execute']['func_str']

    @property
    def id(self):
        'the identifier for this job'
        return self.doc['_id']

    @property
    def func(self):
        'a callable function for workers to execute'
        if self.func_name in self.factory._task_map:
            return self.factory._task_map[self.func_name]
        return import_string(self.func_name)

    @property
    def call_str(self):
        args = [repr(arg) for arg in self.args]
        args.extend('%s=%r' % item for item in self.kwargs.items())
        args = ', '.join(args)
        return '%s(%s)' % (self.func_name, args)

    @property
    def enqueued(self):
        return self.doc['enqueued_at']

    @property
    def started(self):
        return self.doc['started_at']

    @property
    def args(self):
        'The arguments to call func with'
        return self.doc['execute']['args']

    @property
    def kwargs(self):
        'The keyword arguments to call func with'
        return self.doc['execute']['kwargs']

    def apply(self):
        'Execute this task synchronously'
        return self.func(*self.args, **self.kwargs)

    def set_finished(self, failed=False):
        '''
        Mark this job as finished.

        :param failed: if true, this was a failed job
        '''
        n = now()
        update = {'$set': {'processed': True,
                           'failed': failed,
                           'finished': True,
                           'finished_at': n,
                           'finished_at_': mktime(n.timetuple())
                           }
                  }
        self.factory.queue_collection.update({'_id': self.id}, update)
        if not failed:
            # Move successful jobs out of the queue into the capped
            # finished_jobs collection
            data = self.factory.queue_collection.find_one({'_id': self.id})
            if data:
                self.factory.queue_collection.remove({'_id': self.id})
                self.factory.finished_jobs_collection.insert(data)

    def stream(self):
        '''
        Get a stream to read log lines from this job
        '''
        return MongoStream(self.factory.logging_collection,
                           doc={'job_id': self.id},
                           finished=self.finished)

    def finished(self):
        '''
        Test whether this job has finished
        '''
        collection = self.factory.queue_collection
        cursor = collection.find({'_id': self.id, 'processed': True})
        return bool(cursor.count())

    def cancel(self):
        self.set_finished()

    @classmethod
    def new(cls, name, tags, priority, execute, timeout, mutex=None):
        n = now()
        no = mktime(n.timetuple())
        return {
            'qname': name,
            'tags': tags,
            'process_after': n,
            'priority': priority,
            'execute': execute,
            'enqueued_at': n,
            'enqueued_at_': no,
            'started_at': nulltime(),
            'started_at_': 0.0,
            'finished_at': nulltime(),
            'finished_at_': 0.0,
            'processed': False,
            'failed': False,
            'finished': False,
            'timeout': timeout,
            'worker_id': ObjectId('000000000000000000000000'),
            'mutex': mutex,
        }
Source: mtq/utils.py

'''
Created on Aug 1, 2013
@author: sean
'''
import signal
import traceback
import sys
import logging
from datetime import datetime
from bson.errors import InvalidId
from bson.objectid import ObjectId
from contextlib import contextmanager
import io
import pytz
from mtq import errors
class ImportStringError(Exception):
    pass

is_py3 = lambda: sys.version_info.major >= 3

def is_str(obj):
    if is_py3():
        return isinstance(obj, str)
    else:
        return isinstance(obj, basestring)

def is_unicode(obj):
    if is_py3():
        return isinstance(obj, str)
    else:
        return isinstance(obj, unicode)

def handle_signals():
    '''
    Handle signals in multiprocessing.Process threads
    '''
    def handler(signum, frame):
        signal.signal(signal.SIGINT, signal.default_int_handler)

    def raise_timeout(signum, frame):
        raise errors.Timeout()

    def term_handler(signum, frame):
        traceback.print_stack(frame)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        raise SystemExit(-signum)

    signal.signal(signal.SIGINT, handler)
    signal.signal(signal.SIGTERM, term_handler)
    signal.signal(signal.SIGALRM, raise_timeout)

def import_string(import_name, silent=False):
    """Imports an object based on a string. This is useful if you want to
    use import paths as endpoints or something similar. An import path can
    be specified either in dotted notation (``xml.sax.saxutils.escape``)
    or with a colon as object delimiter (``xml.sax.saxutils:escape``).

    If `silent` is True the return value will be `None` if the import fails.

    For better debugging we recommend using the :func:`import_module`
    function instead.

    :param import_name: the dotted name for the object to import.
    :param silent: if set to `True` import errors are ignored and
                   `None` is returned instead.
    :return: imported object
    """
    # force the import name to automatically convert to strings
    if is_unicode(import_name):
        import_name = str(import_name)
    try:
        if ':' in import_name:
            module, obj = import_name.split(':', 1)
        elif '.' in import_name:
            module, obj = import_name.rsplit('.', 1)
        else:
            return __import__(import_name)
        # __import__ is not able to handle unicode strings in the fromlist
        # if the module is a package
        if is_unicode(obj) and not is_py3():
            obj = obj.encode('utf-8')
        try:
            return getattr(__import__(module, None, None, [obj]), obj)
        except (ImportError, AttributeError):
            # support importing modules not yet set up by the parent module
            # (or package for that matter)
            modname = module + '.' + obj
            __import__(modname)
            return sys.modules[modname]
    except ImportError as e:
        if not silent:
            # NOTE: originally Python 2 ``raise X, None, tb`` syntax, which
            # the extraction mangled into a tuple; re-raised plainly here
            raise ImportStringError(import_name, e)

def ensure_capped_collection(db, collection_name, size_mb):
    '''
    Create the capped collection `collection_name` of `size_mb` megabytes
    if it does not already exist, and return it
    '''
    if collection_name not in db.collection_names():
        db.create_collection(collection_name, capped=True,
                             size=(1024. ** 2) * size_mb)  # Mb
    return db[collection_name]

@contextmanager
def stream_logging(silence=False):
    from mtq.log import IOStreamLogger
    stdout = sys.stdout
    sys.stdout = IOStreamLogger(sys.stdout, silence)
    stderr = sys.stderr
    sys.stderr = IOStreamLogger(sys.stderr, silence)
    yield sys.stdout, sys.stderr
    sys.stdout = stdout
    sys.stderr = stderr

class UnicodeFormatter(logging.Formatter):
    def format(self, record):
        msg = logging.Formatter.format(self, record)
        # FIXME: should I be doing errors='replace'?
        if hasattr(msg, 'decode'):
            msg = msg.decode(errors='replace')
        return msg

msg_template = """Job %s exited with exception:
Job Log:
 | %s
"""

@contextmanager
def setup_logging2(worker_id, job_id, lognames=()):
    record = io.StringIO()
    record_hndlr = logging.StreamHandler(record)
    record_hndlr.setFormatter(UnicodeFormatter())
    record_hndlr.setLevel(logging.INFO)
    logger = logging.getLogger('job')
    logger.setLevel(logging.INFO)
    loggers = [logger] + [logging.getLogger(name) for name in lognames]
    [l.addHandler(record_hndlr) for l in loggers]
    logger.info('Starting Job %s' % job_id)
    try:
        yield loggers
    except:
        text = record.getvalue().replace('\n', '\n | ')
        msg = msg_template % (job_id, text,)
        logger.exception(msg)
        raise
    else:
        logger.info("Job %s finished successfully" % (job_id,))
    finally:
        pass

def setup_logging(worker_id, job_id, silence=False):
    '''
    Set up logging for a worker
    '''
    from mtq.log import mstream, MongoHandler
    doc = {'worker_id': worker_id, 'job_id': job_id}
    # NOTE: `collection` is undefined in this listing (a bug in the source);
    # it is presumably the factory's logging collection
    sys.stdout = mstream(collection, doc.copy(), sys.stdout, silence)
    sys.stderr = mstream(collection, doc.copy(), sys.stderr, silence)
    logger = logging.getLogger('job')
    logger.setLevel(logging.INFO)
    hndlr = MongoHandler(collection, doc.copy())
    logger.addHandler(hndlr)

def now():
    now = datetime.utcnow()
    return now.replace(tzinfo=pytz.utc)

def nulltime():
    dt = datetime.utcfromtimestamp(0)
    return dt.replace(tzinfo=pytz.utc)

def config_dict(filename):
    config = {}
    if filename:
        return vars(import_string(filename))
    return config

def object_id(oid):
    try:
        return ObjectId(oid)
    except InvalidId:
        raise TypeError()

def wait_times(conn):
    coll = conn.queue_collection
    wait = {'$avg': {'$subtract': ['$started_at_', '$enqueued_at_']}}
    raw = coll.aggregate([{'$match': {'processed': True}},
                          {'$group': {'_id': '$qname', 'wait': wait}}])
    result = raw['result']
    return {item['_id']: item['wait'] for item in result}

def job_stats(conn, group_by='$execute.func_str', since=None):
    coll = conn.queue_collection
    duration = {'$avg': {'$subtract': ['$finished_at_', '$started_at_']}}
    wait = {'$avg': {'$subtract': ['$started_at_', '$enqueued_at_']}}
    count = {'$sum': 1}
    failed = {'$sum': {'$cmp': ['$failed', False]}}
    queues = {'$addToSet': '$qname'}
    tags = {'$addToSet': '$tags'}  # assuming '$tags' was intended; the listing read '$push'
    earliest = {'$min': '$finished_at'}
    latest = {'$max': '$finished_at'}
    match = {'$match': {'finished': True}}
    if since:
        match['$match']['finished_at'] = {'$gt': since}
    raw = coll.aggregate([match, {'$group': {'_id': group_by,
                                             'duration': duration,
                                             'wait_in_queue': wait,
                                             'count': count,
                                             'queues': queues,
                                             'tags': tags,
                                             'failed': failed,
                                             'latest': latest,
                                             'earliest': earliest,
                                             }}])
    result = raw['result']
    return {item.pop('_id'): item for item in result}

def shutdown_worker(conn, worker_id=None):
    coll = conn.worker_collection
    query = {}
    if worker_id:
        query['_id'] = worker_id
    else:
        query = {'working': True}
    print(coll.update(query, {'$set': {'terminate': True}}, multi=True))

def last_job(conn, worker_id):
    coll = conn.queue_collection
    cursor = coll.find({'worker_id': worker_id}).sort('enqueued_at', -1)
    doc = next(cursor, None)
    if doc:
        from mtq.job import Job
        return Job(conn, doc)
Source: mtq/connection.py

'''
Created on Aug 2, 2013
@author: sean
'''
import mtq
from pymongo.mongo_client import MongoClient
from mtq.defaults import _collection_base, _qsize, _workersize, _logsize, \
_task_map
from mtq.utils import ensure_capped_collection, now
from time import mktime
from pymongo import ASCENDING
class MTQConnection(object):
    '''
    Base object that you should use to create all other TQ objects

    :param db: mongo database
    :param collection_base: base name for collection
    :param qsize: the size of the capped collection of the queue

    .. seealso:: MTQConnection.default, MTQConnection.from_config
    '''
    def __init__(self, db, collection_base=_collection_base, qsize=_qsize,
                 workersize=_workersize, logsize=_logsize, extra_lognames=()):
        self.db = db
        self.collection_base = collection_base
        self.qsize = qsize
        self.workersize = workersize
        self.logsize = logsize
        self._task_map = _task_map.copy()
        self.extra_lognames = extra_lognames

    def _destroy(self):
        'Destroy ALL data'
        self.db.connection.drop_database(self.db)

    @classmethod
    def default(cls):
        '''
        Create an MTQConnection with the default configuration, using mongo on localhost
        '''
        return cls.from_config()

    @classmethod
    def from_config(cls, config=None, client=None):
        '''
        Create an MTQConnection from a config dict

        :param config: configuration dict, with the parameters

            * DB_HOST
            * DB
            * COLLECTION_BASE
            * COLLECTION_SIZE

        :param client: a pymongo.MongoClient or None
        '''
        if config is None:
            config = {}
        if 'connection' in config:
            return config['connection']
        if client is None:
            client = MongoClient(config.get('DB_HOST', 'mongodb://localhost/?journal=true'), tz_aware=True)
        db = getattr(client, config.get('DB', 'mq'))
        base = config.get('COLLECTION_BASE', _collection_base)
        qsize = config.get('COLLECTION_SIZE', _qsize)
        return cls(db, base, qsize, extra_lognames=config.get('extra_lognames', ()))

    def job_stream(self, job_id):
        '''
        Get a file-like object for the output of a job
        '''
        job = self.get_job(job_id)
        return job.stream()

    def worker_stream(self, worker_name=None, worker_id=None):
        '''
        Get a file-like object for the output of a worker
        '''
        worker = self.get_worker(worker_name, worker_id)
        return worker.stream()

    @property
    def queue_collection(self):
        'The collection to push jobs to'
        collection_name = '%s.queue' % (self.collection_base)
        return self.db[collection_name]

    @property
    def finished_jobs_collection(self):
        'The collection that finished jobs are moved to'
        collection_name = '%s.finished_jobs' % (self.collection_base)
        return ensure_capped_collection(self.db, collection_name, self.qsize)

    @property
    def logging_collection(self):
        'The collection to push log lines to'
        db = self.db
        collection_name = '%s.log' % self.collection_base
        return ensure_capped_collection(db, collection_name, self.logsize)

    @property
    def schedule_collection(self):
        'The collection that scheduled jobs are stored in'
        db = self.db
        collection_name = '%s.schedule' % self.collection_base
        return db[collection_name]

    def make_query(self, queues, tags, priority=0, processed=False, failed=False, **query):
        '''
        return a mongodb query dict to get the next task in the queue
        '''
        query.update({
            'priority': {'$gte': priority},
            'process_after': {'$lte': now()},
        })
        if failed:
            query['failed'] = True
        elif processed is not None:
            query['processed'] = processed
        if queues:
            if len(queues) == 1:
                query['qname'] = queues[0]
            else:
                query['qname'] = {'$in': queues}
        query.update(self.make_tag_query(tags))
        return query

    def make_tag_query(self, tags):
        'Query for tags'
        if not tags:
            tag_query = {}
        # elif len(tags) == 1:
        #     return {'tags': tags[0]}
        else:
            # tags on the job must be a subset of tags on the worker
            # i.e. assert job['tags'] in worker['tags']
            tag_query = {'tags': {'$not': {'$elemMatch': {'$nin': tags}}}}
        return tag_query

    def add_mutex(self, query):
        running_query = self.make_query(None, None, processed=True)
        cursor = self.queue_collection.find(running_query, fields={'mutex': 1, '_id': 0})
        if not cursor.count():
            return
        # Populate dictionary of mutex_key:count
        mutex = {}
        # Iterate over running jobs
        for item in cursor:
            item_mutex = item.get('mutex', {})
            if not item_mutex:
                continue
            mutex_key = item_mutex.get('key')
            if not mutex_key:
                continue
            mutex.setdefault(mutex_key, 0)
            mutex[mutex_key] += 1
        # Query should include jobs with no mutex key or whose key is not in any running jobs
        _or = [{'mutex': None}, {'mutex': {'$exists': False}},
               {'mutex.key': {'$nin': list(mutex.keys())}}]
        # Query should include jobs where the mutex.count is > the # already running
        for key, already_running in mutex.items():
            _or.append({'mutex.key': key, 'mutex.count': {'$gt': already_running}})
        query['$or'] = _or

    def pop_item(self, worker_id, queues, tags, priority=0, failed=False):
        'Pop an item from the queue'
        n = now()
        update = {'$set': {'processed': True,
                           'started_at': n,
                           'started_at_': mktime(n.timetuple()),
                           'worker_id': worker_id}
                  }
        # pass failed by keyword so it does not land in the `processed` slot
        query = self.make_query(queues, tags, priority, failed=failed)
        self.add_mutex(query)
        doc = self.queue_collection.find_and_modify(query, update, sort=[('enqueued_at', ASCENDING)])
        if doc is None:
            return None
        else:
            return mtq.Job(self, doc)

    def push_item(self, job_id):
        query = {'_id': job_id}
        update = {'$set': {'processed': False}}
        doc = self.queue_collection.find_and_modify(query, update)

    def _items_cursor(self, queues, tags, priority=0, processed=False, limit=None, reverse=False):
        query = self.make_query(queues, tags, priority, processed=processed)
        cursor = self.queue_collection.find(query)
        if reverse:
            cursor = cursor.sort('enqueued_at', -1)
        if limit:
            cursor = cursor.limit(limit)
        return cursor

    def items(self, queues, tags, priority=0, processed=False, limit=None, reverse=False):
        cursor = self._items_cursor(queues, tags, priority, processed, limit, reverse)
        return [mtq.Job(self, doc) for doc in cursor]

    def queue(self, name='default', tags=(), priority=0):
        '''
        Create a queue object

        :param name: the name of the queue
        :param tags: default tags to give to jobs
        :param priority: (not implemented yet)
        '''
        return mtq.Queue(self, name, tags, priority)

    def new_worker(self, queues=(), tags=(), priority=0, silence=False,
                   log_worker_output=False, poll_interval=3, args=None):
        '''
        Create a worker object

        :param queues: names of queues to pop from (these are OR'd)
        :param tags: jobs *must* have all these tags to be processed by this worker
        :param priority: (not implemented yet)
        :param log_worker_output: if true, log worker output to the db
        '''
        worker = mtq.Worker(self, queues, tags, priority,
                            log_worker_output=log_worker_output,
                            silence=silence, extra_lognames=self.extra_lognames,
                            poll_interval=poll_interval)
        self.args = args
        self.worker = worker
        return worker

    #===========================================================================
    # Workers
    #===========================================================================

    @property
    def worker_collection(self):
        'Collection to register workers to'
        collection_name = '%s.workers' % self.collection_base
        return self.db[collection_name]

    @property
    def queues(self):
        'List of existing queues'
        collection = self.queue_collection
        qnames = collection.find().distinct('qname')
        return [mtq.Queue(self, qname) for qname in qnames]

    @property
    def workers(self):
        '''
        List of existing workers

        :returns: a list of WorkerProxy objects
        '''
        collection = self.worker_collection
        return [mtq.WorkerProxy(self, item) for item in collection.find({'working': True})]

    def get_job(self, job_id):
        '''
        Retrieve a job
        '''
        collection = self.queue_collection
        doc = collection.find_one({'_id': job_id})
        if doc is None:
            return None
        return mtq.Job(self, doc)

    def get_worker(self, worker_name=None, worker_id=None):
        '''
        Retrieve a worker
        '''
        coll = self.worker_collection
        if worker_name:
            query = {'name': worker_name}
        elif worker_id:
            query = {'_id': worker_id}
        else:
            raise TypeError('must give one of worker_name or worker_id')
        doc = coll.find_one(query)
        if doc is None:
            raise TypeError('Could not find worker')
        return mtq.WorkerProxy(self, doc)

    #===========================================================================
    # Scheduler
    #===========================================================================

    def scheduler(self):
        return mtq.Scheduler(self)