cowin-monitor/app.py
Ameya Shenoy 9afcfa664f
fix: alert on bad response
Signed-off-by: Ameya Shenoy <shenoy.ameya@gmail.com>
2021-05-04 20:33:34 +05:30

78 lines
2.9 KiB
Python

# standard imports
import os
import time
import logging
from datetime import datetime
# third-party imports
import pytz
import requests
import telegram
# Asia/Kolkata timezone object; main() uses it to compute the current date
# that is sent to the CoWIN API.
IST = pytz.timezone('Asia/Kolkata')
# Configure root logging once at import time: INFO and above, with a
# timestamp, the logger name and the level in front of every message.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# customizable vars (all read from the environment once, at import time)
# AGE: age to look for slots for. Environment values are strings, so it is
# converted to int like the other numeric settings; otherwise comparing it
# with a numeric age limit raises TypeError whenever AGE is set.
AGE = int(os.environ.get('AGE', 18))
# DIST_CODE: district id placed in the API URL. Required: int(None) raises
# TypeError at import when the variable is unset.
DIST_CODE = int(os.environ.get('DIST_CODE'))
# TELEGRAM_BOT_TOKEN: token handed to telegram.Bot in main().
TELEGRAM_BOT_TOKEN = os.environ.get('TELEGRAM_BOT_TOKEN')
# TELEGRAM_CHAT_ID: chat the alerts are sent to. Required, same as DIST_CODE.
TELEGRAM_CHAT_ID = int(os.environ.get('TELEGRAM_CHAT_ID'))
# SLEEP_TIME_SEC: pause between two polls of the API, in seconds.
SLEEP_TIME_SEC = int(os.environ.get('SLEEP_TIME_SEC', 60))
# Upper bound, in seconds, for a single CoWIN HTTP request so that a stalled
# connection cannot block the polling loop forever.
_REQUEST_TIMEOUT_SEC = 30
_MESSAGE_HEADER = "Atleast 1 new slot seems to have just opened up for vaccination.\n\n"
_MESSAGE_FOOTER = "Visit [this link](https://selfregistration.cowin.gov.in/appointment) to book a slot for yourself."
# Characters that start an entity in Telegram's legacy Markdown mode; they
# must be backslash-escaped when they appear in plain text.
_MARKDOWN_ESCAPES = str.maketrans({'_': '\\_', '*': '\\*', '`': '\\`', '[': '\\['})


def _escape_markdown(value):
    """Return ``str(value)`` with legacy-Markdown control characters escaped."""
    return str(value).translate(_MARKDOWN_ESCAPES)


def _collect_available_sessions(resp_json):
    """Return one dict per session in *resp_json* with available_capacity > 0.

    Missing or null ``centers``, ``sessions``, ``slots`` and
    ``available_capacity`` entries are treated as empty / zero instead of
    raising.
    """
    available = []
    for center in resp_json.get('centers') or []:
        for session in center.get('sessions') or []:
            capacity = session.get('available_capacity') or 0
            if capacity > 0:
                available.append({
                    'location': center.get('name'),
                    'available_capacity': capacity,
                    'available_slots': ', '.join(session.get('slots') or []),
                    'vaccine': session.get('vaccine'),
                    'min_age_limit': session.get('min_age_limit'),
                    'fee_type': center.get('fee_type'),
                })
    return available


def _first_eligible(entries, age):
    """Return the first entry whose min_age_limit is <= *age*, else None.

    Only the first match is used so that the alert stays short. Entries
    without a min_age_limit are skipped.
    """
    for entry in entries:
        limit = entry.get('min_age_limit')
        # "<=": someone exactly at the minimum age is eligible.
        if limit is not None and limit <= age:
            return entry
    return None


def _format_entry(entry):
    """Render one available-session dict as the body of the alert message."""
    # Built line by line (not as a triple-quoted literal) so the text does not
    # depend on source indentation; values are escaped because the message is
    # sent with parse_mode='markdown'.
    return (
        f"Location: {_escape_markdown(entry.get('location'))}\n"
        f"Available Capacity: {_escape_markdown(entry.get('available_capacity'))}\n"
        f"Available Slots: {_escape_markdown(entry.get('available_slots'))}\n"
        f"Vaccine: {_escape_markdown(entry.get('vaccine'))}\n"
        f"Age Limit: {_escape_markdown(entry.get('min_age_limit'))}\n"
        f"Fee: {_escape_markdown(entry.get('fee_type'))}\n\n"
    )


def _notify(bot, text):
    """Send *text* to TELEGRAM_CHAT_ID; log instead of raising on Telegram errors."""
    try:
        bot.sendMessage(chat_id=TELEGRAM_CHAT_ID, text=text, parse_mode='markdown')
    except telegram.error.TelegramError:
        # A failed notification must not stop the monitor.
        logger.exception('Could not send Telegram message')


def _poll_once(bot, age):
    """Query the CoWIN calendar once and alert on an eligible open slot."""
    logger.info('Hitting CoWIN API')
    date_str = datetime.now(IST).strftime("%d-%m-%Y")
    url = "https://cdn-api.co-vin.in/api/v2/appointment/sessions/calendarByDistrict?district_id={}&date={}".format(DIST_CODE, date_str)
    try:
        response = requests.get(url, timeout=_REQUEST_TIMEOUT_SEC)
    except requests.RequestException:
        # Network trouble: log it and let the next cycle retry.
        logger.exception('CoWIN API request failed')
        return

    resp_json = None
    if response.ok:
        try:
            resp_json = response.json()
        except ValueError:
            # Body was not valid JSON; handled as a bad response below.
            resp_json = None
    if not isinstance(resp_json, dict):
        logger.warning('Bad response from CoWIN API: %s', response)
        _notify(bot, "Cowin API is giving a bad response")
        return

    entry = _first_eligible(_collect_available_sessions(resp_json), age)
    if entry is None:
        # Nothing bookable for this age: do not send a message with an empty body.
        return
    _notify(bot, _MESSAGE_HEADER + _format_entry(entry) + _MESSAGE_FOOTER)


def main():
    """Poll the CoWIN API forever and report open slots to a Telegram chat.

    Every SLEEP_TIME_SEC seconds the calendarByDistrict endpoint is queried
    for DIST_CODE and today's date (IST). If at least one session has free
    capacity and a minimum age limit not above AGE, the first such session is
    sent to TELEGRAM_CHAT_ID. A non-OK or unparsable API response triggers a
    "bad response" alert instead. Network and Telegram errors are logged and
    the loop continues. This function never returns.
    """
    bot = telegram.Bot(token=TELEGRAM_BOT_TOKEN)
    # AGE arrives as a string when it was taken from the environment without
    # conversion; normalise once so the age comparison cannot raise TypeError.
    age = int(AGE)
    logger.info('Initiating loop')
    while True:
        _poll_once(bot, age)
        time.sleep(SLEEP_TIME_SEC)
# Start the monitor only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()