bullish/backend/app/utils.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Utils for bullish."""
# standard imports
import csv
import json
import datetime
import logging
from typing import Tuple
from io import BytesIO, TextIOWrapper
from zipfile import ZipFile
# third-party imports
import django_rq
import requests
from django_redis import get_redis_connection
from django.db import transaction
# app imports
from app.models import BhavCopyEquity

cache = get_redis_connection("default")
logger = logging.getLogger(__name__)


def fetch_bhav_copy_equity_data(date=None):
"""Fetch data from BSE India website."""
    # hack: bseindia.com rejects the default python-requests User-Agent,
    # so present a regular browser User-Agent instead
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0'
    headers = {
        'User-Agent': user_agent
    }
if date is None:
date = datetime.datetime.now()
datestr = date.strftime("%d%m%y")
zipurl = f'https://www.bseindia.com/download/BhavCopy/Equity/EQ{datestr}_CSV.ZIP'
logger.info('Fetching data from %s', zipurl)
    # timeout so a stalled download doesn't hang the worker indefinitely
    resp = requests.get(zipurl, headers=headers, timeout=30)
if resp.ok:
with ZipFile(BytesIO(resp.content)) as bhavcopy_zf:
csv_file = bhavcopy_zf.namelist()[0]
with bhavcopy_zf.open(csv_file, 'r') as infile:
data = list(csv.reader(TextIOWrapper(infile, 'utf-8')))
return data
    raise ValueError(f'Fetching data from BSE unsuccessful (HTTP {resp.status_code})')
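

# Hedged sketch (not part of the original module): a quick way to inspect what
# fetch_bhav_copy_equity_data() returns -- the raw CSV rows, header included.
# The helper name is introduced here for illustration only.
def log_bhav_copy_preview(date=None):
    """Log the CSV header and row count of a fetched bhav copy (debug aid)."""
    rows = fetch_bhav_copy_equity_data(date=date)
    logger.info('Bhav copy header: %s', rows[0])
    logger.info('Bhav copy data rows: %d', len(rows) - 1)
    return rows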


def populate_bhav_copy_data(date=None):
"""Populate DB with Bhav Copy data."""
if date is None:
date = datetime.datetime.now().date()
try:
data = fetch_bhav_copy_equity_data(date=date)
    except Exception as err:
        # Potentially add code for alerting if needed
        # Retry the job after 10 minutes, for the same date, if the fetch fails
        scheduler = django_rq.get_scheduler('default')
        scheduler.schedule(
            scheduled_time=datetime.datetime.now() + datetime.timedelta(minutes=10),
            func=populate_bhav_copy_data,
            kwargs={'date': date},
        )
        raise ValueError(f"Error fetching data from BSE for {date}\nDetails: {err}") from err
else:
del data[0] # delete title row
populate_bhav_copy_data_into_postgres(data, date=date)
populate_bhav_copy_data_into_redis_v1(data, date=date)
populate_bhav_copy_data_into_redis_v2(data, date=date)
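

# Hedged sketch: one way this populate job might be scheduled to run every
# weekday evening with django_rq's scheduler. The cron string, queue name and
# helper name are assumptions for illustration, not taken from the project's
# actual configuration.
def schedule_daily_bhav_copy_job():
    """Register a recurring job that refreshes bhav copy data (sketch)."""
    scheduler = django_rq.get_scheduler('default')
    scheduler.cron(
        '0 19 * * 1-5',  # 19:00 server time, Monday to Friday (assumed)
        func=populate_bhav_copy_data,
        queue_name='default',
    )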


def populate_bhav_copy_data_into_redis_v1(data=None, date=None):
    """Populate Redis with one hash per stock plus a list of stock codes for the date."""
if date is None:
raise ValueError("Date is required")
if data is None:
data = fetch_bhav_copy_equity_data(date=date)
del data[0]
logger.info('Populating data into redis')
datestr = date.strftime("%d%m%y")
pipe = cache.pipeline()
stocks_key = f"stocks:{datestr}"
    # clear any existing list so a re-run doesn't append duplicate entries
    cache.delete(stocks_key)
    data = stocks_csv_to_json(data)
    for stock in data:
        stock_code = stock.get('sc_code')
        # keep an ordered index of codes; the full row goes into a per-stock hash
        pipe.rpush(stocks_key, stock_code)
pipe.hset(
f"stock:{datestr}:{stock_code}",
mapping=stock
)
pipe.execute()
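

# Hedged sketch: reading back the v1 layout written above (a list of codes at
# stocks:{datestr} and one hash per stock). The function name is introduced
# here for illustration and is not referenced elsewhere in the codebase.
def get_stocks_from_redis_v1(date):
    """Return the stored stock hashes for a date from the v1 Redis layout."""
    datestr = date.strftime("%d%m%y")
    codes = cache.lrange(f"stocks:{datestr}", 0, -1)
    stocks = []
    for code in codes:
        if isinstance(code, bytes):  # raw redis clients return bytes
            code = code.decode('utf-8')
        stocks.append(cache.hgetall(f"stock:{datestr}:{code}"))
    return stocks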


def populate_bhav_copy_data_into_redis_v2(data=None, date=None):
    """Populate Redis with the whole day's bhav copy as a single JSON string."""
if date is None:
raise ValueError("Date required.")
if data is None:
data = fetch_bhav_copy_equity_data(date=date)
del data[0]
logger.info('Populating data into redis v2')
datestr = date.strftime("%d%m%y")
data = stocks_csv_to_json(data)
stocks_key = f"stocks:v2:{datestr}"
cache.delete(stocks_key)
cache.set(stocks_key, json.dumps(data))
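

# Hedged sketch: reading back the v2 layout written above (the whole day's
# data as one JSON string). Introduced here for illustration only.
def get_stocks_from_redis_v2(date):
    """Return the list of stock dicts for a date from the v2 Redis layout."""
    datestr = date.strftime("%d%m%y")
    raw = cache.get(f"stocks:v2:{datestr}")
    return json.loads(raw) if raw else []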


@transaction.atomic
def populate_bhav_copy_data_into_postgres(data=None, date=None):
    """Populate the BhavCopyEquity table with the bhav copy rows for a date."""
logger.info('Populating data into postgres for %s', date)
if date is None:
raise ValueError("Date required")
if data is None:
data = fetch_bhav_copy_equity_data(date=date)
del data[0]
data = stocks_csv_to_json(data)
for stock in data:
stock['date'] = date
BhavCopyEquity.objects.get_or_create(**stock)
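

# Hedged sketch: querying the rows written above. Field names follow
# stocks_csv_to_json() below; the BhavCopyEquity model itself lives in
# app.models and is assumed here to expose matching fields.
def get_close_price(sc_code, date):
    """Return the stored close price for a stock code on a date, or None."""
    row = BhavCopyEquity.objects.filter(sc_code=sc_code, date=date).first()
    return row.close_price if row else None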


def stocks_csv_to_json(data):
    """Convert raw bhav copy CSV rows into a list of typed stock dicts."""
stocks = []
for stock in data:
stocks.append({
"sc_code": stock[0],
"sc_name": stock[1],
"sc_group": stock[2],
"sc_type": stock[3],
"open_price": float(stock[4]),
"high_price": float(stock[5]),
"low_price": float(stock[6]),
"close_price": float(stock[7]),
"last_price": float(stock[8]),
"prevclose_price": float(stock[9]),
"no_trades": int(stock[10]),
"no_of_shrs": int(stock[11]),
"net_turnov": float(stock[12]),
"tdcloindi": stock[13],
})
return stocks


def verify_date(date: str, ret_message: str) -> Tuple[datetime.datetime, str]:
    """Verify the requested date.

    Check the current date and time and return the appropriate date, handling
    the following use cases:
    - if today, before publish time: return yesterday's data
    - if today, after publish time: return today's data
    - if a future date: return the current day's data with a time-travel message
    - if a weekend: return Friday's data with a custom message
    """
# Verify Date
logger.info('Verifying date %s', date)
# IST
curr_datetime = datetime.datetime.now() - datetime.timedelta(hours=5, minutes=30)
req_datetime = datetime.datetime.strptime(date, '%Y-%m-%d')
next_publish_time = curr_datetime.replace(hour=6, minute=0, second=0,
microsecond=0)
logger.info('Current IST Datetime: %s; Request Datetime: %s; Next Publish: %s',
curr_datetime, req_datetime, next_publish_time)
# Check for future date
if req_datetime.date() > curr_datetime.date():
ret_message += "Time travel not yet invented! Returning latest available data. "
req_datetime = curr_datetime
# Check for weekend
day_num = req_datetime.weekday()
if day_num in [5, 6]:
ret_message += "Markets are closed on weekends. Returning data for Friday. "
req_datetime -= datetime.timedelta(days=day_num-4) # change req to Friday
# Check for day
if (req_datetime.date() == curr_datetime.date()
and curr_datetime < next_publish_time):
ret_message += "Today's data not yet published, returning yesterday's data. "
req_datetime -= datetime.timedelta(days=1)
return req_datetime, ret_message
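

# Hedged usage sketch: exercising verify_date() by hand, e.g. from a Django
# shell. The sample date string and helper name are arbitrary and introduced
# here for illustration only.
def demo_verify_date(date_str='2021-01-02'):
    """Log how verify_date() resolves a raw date string (manual check only)."""
    resolved, message = verify_date(date_str, '')
    logger.info('Requested %s -> resolved %s (%s)', date_str, resolved.date(), message)
    return resolved, message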