1. ホーム
  2. スクリプト・コラム
  3. パイソン

pythonサイクルタスクスケジューリングツール スケジュール詳解

2022-01-02 19:02:08

Pythonスクリプトを定期的に実行したい場合、最もよく知られているのはCrontabスクリプトでしょうが、Crontabには次のような欠点があります。

1. 第2レベルのタスクの実行には不都合である。

2. Crontabは、実行すべきスケジュールタスクが何百個もある場合、特に管理が不便です。

もう一つの選択肢としてCeleryがありますが、Celeryは設定がやや面倒なので、軽量なスケジュール管理ツールが必要なだけなら、Celeryは良い選択とは言えません。

軽量なタスクスケジューラが欲しい、できるだけシンプルで使いやすく、外部依存がなく、理想的にはCrontabの基本的な機能をすべて収容できるものがいい、という場合には、Scheduleモジュールが適しています。 その

タスクのスケジューリングに使用すると、わずか数行のコードで、その違いを感じることができます。

import schedule
import time
def job():
    print("I'm working... ")
schedule.every(10).minutes.do(job)
while True:
    schedule.run_pending()
    time.sleep(1)


上記のコードは、10分ごとにJob関数が実行されることを意味しており、非常にシンプルで便利です。スケジュールモジュールを導入し、schedule.every(time number)で定期的なタスクを発行するだけです。 time type.do(job) で定期的なタスクを発行します。

サイクルタスクの公開後は run_pending 関数を使用して実行を確認するため、そのためには While ループでこの関数をポーリングし続けます。

以下、Scheduleモジュールのインストール方法と、初級・上級者向けの使用方法について説明します。

1. 準備

始める前に、以下のいずれかの方法で依存関係をインストールするコマンドを入力して、Pythonとpipがコンピュータに正常にインストールされていることを確認する必要があります。

Windows環境 Cmdを開く(start-run-CMD)。

MacOS環境 端末を開く(command+spaceでTerminalと入力)。

VSCodeエディタやPycharmを使用している場合は、インターフェースの下部にあるTerminalを直接使用することができます。

pip install schedule


2. 基本的な使い方

最も基本的な使い方は冒頭で述べたとおりですが、ここではさらにタスクをスケジューリングする例を紹介します。

import schedule
import time
def job():
    print("I'm working... ")
# Execute the job every 10 minutes
schedule.every(10).minutes.do(job)
# Execute the job every hour
schedule.every().hour.do(job)
# Execute the job at 10:30 every day
schedule.every().day.at("10:30").do(job)
# Execute the job every month
schedule.every().monday.do(job)
# Execute the job every Wednesday at 13:15
schedule.every().Wednesday.at("13:15").do(job)
# Execute the job at the 17th second of every minute
schedule.hour().minute.at("17").do(job)
while True:
    schedule.run_pending()
    time.sleep(1)


ご覧のように、月単位から秒単位までの設定は、上記の例でカバーされています。しかし、一度だけタスクを実行したい場合は、以下のように合わせます。

import schedule
import time
def job_that_executes_once():
    # The job written here will only be executed once...
    return schedule.CancelJob
schedule.every().day.at('22:30').do(job_that_executes_once)
while True:
    schedule.run_pending()
    time.sleep(1)


パラメータ受け渡し

実行するジョブに渡すパラメータがある場合、これを実行するだけです。

import schedule
def greet(name):
    print('Hello', name)
# do() Pass additional arguments to the job function
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')


現在の求人情報をすべて取得する

現在の割り当てをすべて取得したい場合。

import schedule
def hello():
    print('Hello world')
schedule.every().second.do(hello)
all_jobs = schedule.get_jobs()


すべてのジョブをキャンセルする

何らかのメカニズムが作動した場合、現在のプログラムからすべてのジョブを直ちに消去する必要があります。

import schedule
def greet(name):
    print('Hello {}'.format(name))
schedule.every().second.do(greet)
schedule.clear()


タグ機能

ジョブを設定する際、その後の管理上の利便性を考慮して、ジョブにタグを付けることができ、タグでフィルタリングしてジョブを取得したり、ジョブをキャンセルしたりすることができます。

import schedule
def greet(name):
    print('Hello {}'.format(name))
# .tag tagging
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
# get_jobs(tag): you can get all the tasks for this tag
friends = schedule.get_jobs('friends')
# Cancel all tasks with the daily-tasks tag
schedule.clear('daily-tasks')


仕事の納期を設定する

ある仕事の期限を設定する必要がある場合、この方法を使用することで設定することができます。

import schedule
from datetime import datetime, timedelta, time
def job():
    print('Boo')
# Run job every hour, stop after 18:30
schedule.every(1).hours.until("18:30").do(job)
# Run job every hour, 2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)
# Run the job every hour and stop after 8 hours
schedule.every(1).hours.until(timedelta(hours=8)).do(job)
# Run jobs every hour, stop after 11:32:42
schedule.every(1).hours.until(time(11, 33, 42)).do(job)
# Run jobs every hour, stop after 2020-5-17 11:36:20
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)


締切日を過ぎるとジョブは実行されません。

スケジューリングに関係なく、すべてのジョブをすぐに実行する

メカニズムが起動し、すべてのジョブをすぐに実行する必要がある場合は、以下のように schedule.run_all() :

import schedule
def job_1():
    print('Foo')
def job_2():
    print('Bar')
schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)
schedule.run_all()
# Run all jobs immediately, 10 seconds between each job
schedule.run_all(delay_seconds=10)


3. 高度な使用方法

ジョブをスケジュールするためのデコレーター

set job フォームが冗長すぎると感じる場合は、decorator パターンを使用することもできます。

from schedule import every, repeat, run_pending
import time
# This decorator has the same effect as schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")
while True:
    run_pending()
    time.sleep(1)


並列実行

デフォルトでは、Scheduleはすべてのジョブを逐次実行します。この背景には、誰もが満足するような並列実行モデルを見つけることが難しいという理由があります。

しかし、各ジョブをマルチスレッドで実行することで、この制限を回避することができます。

import threading
import time
import schedule
def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())
def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()
schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)
while True:
    schedule.run_pending()
    time.sleep(1)


ロギング

Scheduleモジュールは、ログの記録にも対応しており、以下のように使用します。

import schedule
import logging
logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# Logging level is DEBUG
schedule_logger.setLevel(level=logging.DEBUG)
def job():
    print("Hello, Logs")
schedule.every().second.do(job)
schedule.run_all()
schedule.clear()


その効果は次の通りです。

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs


例外処理

スケジュールは例外を自動的にキャッチせず、例外に遭遇したときに投げるだけです。これは重大な問題を引き起こします:後続のすべてのジョブが中断されるため、例外をキャッチする必要があるのです。

手動でキャッチすることもできますが、想定していない状況ではプログラムによる自動キャッチが必要な場合もあり、デコレーターがそれを行います。

import functools
def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator
@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0
schedule.every(5).minutes.do(bad_task)


このように bad_task を実行する際に発生したエラーは catch_exceptions これは、タスクを適切に実行するようスケジューリングする際に重要です。

これで今回の記事は終わりです。今日のPythonチュートリアルを楽しんでいただけたなら、ぜひご期待ください。

pythonの定期タスクスケジューリングツール「Schedule」の使い方を詳しく説明します。