bullish/backend/app/utils.py

150 lines
4.7 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Utils for bullish."""
# standard imports
import csv
import datetime
import logging
from io import BytesIO, TextIOWrapper
from zipfile import ZipFile
# third-party imports
import django_rq
import requests
from django_redis import get_redis_connection
from django.db import transaction
# app imports
from app.models import BhavCopyEquity
# Redis connection for the "default" alias, obtained from django-redis and
# shared by every function in this module.
cache = get_redis_connection("default")
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def fetch_bhav_copy_equity_data(date=None, timeout=30):
    """Download and parse the BSE equity Bhav Copy for one day.

    Args:
        date: ``date``/``datetime`` whose Bhav Copy is wanted. When ``None``
            the current local date and time is used.
        timeout: Seconds to wait for the BSE server before giving up.
            Without a timeout ``requests`` waits indefinitely, which would
            stall the worker running this job.

    Returns:
        A list of rows (each a list of strings) read from the first member
        of the downloaded ZIP archive, including the title row.

    Raises:
        ValueError: If the server answers with a non-OK HTTP status.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    # hack: since bseindia website doesn't respond well to python requests
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0'
    headers = {
        'User-Agent': user_agent
    }
    if date is None:
        date = datetime.datetime.now()
    datestr = date.strftime("%d%m%y")
    zipurl = f'https://www.bseindia.com/download/BhavCopy/Equity/EQ{datestr}_CSV.ZIP'
    logger.info('Fetching data from %s', zipurl)
    resp = requests.get(zipurl, headers=headers, timeout=timeout)
    if not resp.ok:
        raise ValueError('Fetching data from BSE unsuccessful')
    with ZipFile(BytesIO(resp.content)) as bhavcopy_zf:
        # The archive is expected to hold a single CSV; take the first member.
        csv_file = bhavcopy_zf.namelist()[0]
        with bhavcopy_zf.open(csv_file, 'r') as infile:
            # newline='' leaves line-ending handling to the csv module, as
            # its documentation requires for file objects.
            text = TextIOWrapper(infile, 'utf-8', newline='')
            return list(csv.reader(text))
def populate_bhav_copy_data(date=None):
    """Fetch the Bhav Copy for *date* and load it into Redis and Postgres.

    Args:
        date: ``date``/``datetime`` to load. When ``None`` the current local
            date and time is used.

    Raises:
        ValueError: If fetching fails. Before raising, a retry of this
            function for the same date is scheduled 10 minutes later on the
            'default' RQ scheduler.
    """
    if date is None:
        # Resolve the default once so the fetch, both stores and any retry
        # all work on the same day. The Redis loader calls date.strftime()
        # and cannot accept None.
        date = datetime.datetime.now()
    try:
        data = fetch_bhav_copy_equity_data(date=date)
    except Exception as err:
        # Potentially add code for alerting if needed
        # Repeat job after 10 mins if job fails
        # NOTE(review): rq-scheduler documents scheduled_time as UTC; this
        # naive local time is only correct if the server clock is UTC.
        scheduler = django_rq.get_scheduler('default')
        scheduler.schedule(
            scheduled_time=datetime.datetime.now() + datetime.timedelta(minutes=10),
            func=populate_bhav_copy_data,
            # Retry the same day that failed, not whatever "now" is later.
            kwargs={'date': date},
        )
        raise ValueError(
            f"Error fetching data from BSE for {date}\nDetails: {err}"
        ) from err
    # Drop the title row; slicing (unlike del data[0]) tolerates an empty file.
    rows = data[1:]
    populate_bhav_copy_data_into_redis(rows, date=date)
    populate_bhav_copy_data_into_postgres(rows, date=date)
def populate_bhav_copy_data_into_redis(data, date):
    """Cache one day's Bhav Copy rows in Redis.

    Writes a list ``stocks:<ddmmyy>`` holding every stock code and one hash
    ``stock:<ddmmyy>:<sc_code>`` per stock holding its parsed fields.

    Args:
        data: Bhav Copy rows (title row already removed), as accepted by
            ``stocks_csv_to_json``.
        date: ``date``/``datetime`` used to build the key suffix.
    """
    logger.info('Populating data into redis')
    day = date.strftime("%d%m%y")
    batch = cache.pipeline()
    index_key = f"stocks:{day}"
    # Remove any list left by an earlier run for this day so that codes are
    # not appended twice. This runs immediately, outside the pipeline.
    cache.delete(index_key)
    for record in stocks_csv_to_json(data):
        code = record.get('sc_code')
        batch.rpush(index_key, code)
        batch.hset(f"stock:{day}:{code}", mapping=record)
    # Send all queued commands in one go.
    batch.execute()
@transaction.atomic
def populate_bhav_copy_data_into_postgres(data, date=None):
    """Store one day's Bhav Copy rows as ``BhavCopyEquity`` records.

    Runs inside a single database transaction. Each row goes through
    ``get_or_create`` with every column used as a lookup.

    Args:
        data: Bhav Copy rows (title row already removed); each row must have
            at least 14 columns.
        date: Value stored in the ``date`` field. When ``None`` today's
            local date is used.
    """
    logger.info('Populating data into postgres for %s', date)
    if date is None:
        date = datetime.datetime.now().date()
    for row in data:
        fields = _bhav_copy_row_to_model_fields(row)
        BhavCopyEquity.objects.get_or_create(date=date, **fields)


def _bhav_copy_row_to_model_fields(row):
    """Convert one raw Bhav Copy CSV row into ``BhavCopyEquity`` field values."""
    return {
        "sc_code": int(row[0]),
        "sc_name": row[1],
        "sc_group": row[2],
        "sc_type": row[3],
        "open_price": float(row[4]),
        "high_price": float(row[5]),
        "low_price": float(row[6]),
        "close_price": float(row[7]),
        "last_price": float(row[8]),
        "prevclose_price": float(row[9]),
        "no_trades": int(row[10]),
        "no_of_shrs": int(row[11]),
        "net_turnov": float(row[12]),
        "tdcloindi": row[13],
    }
def stocks_csv_to_json(data):
    """Turn raw Bhav Copy CSV rows into a list of field-name -> value dicts.

    Columns 0-13 of each row are mapped, in order, to the keys listed below.
    Price and turnover columns are converted to ``float``, the trade and
    share counts to ``int``, and the remaining columns are kept as given.
    Columns beyond the 14th are ignored.

    Args:
        data: Iterable of rows, each an indexable sequence of at least 14
            values.

    Returns:
        A list with one dict per input row.
    """
    # (key, converter) per CSV column; None keeps the raw value unchanged.
    columns = (
        ("sc_code", None),
        ("sc_name", None),
        ("sc_group", None),
        ("sc_type", None),
        ("open_price", float),
        ("high_price", float),
        ("low_price", float),
        ("close_price", float),
        ("last_price", float),
        ("prevclose_price", float),
        ("no_trades", int),
        ("no_of_shrs", int),
        ("net_turnov", float),
        ("tdcloindi", None),
    )
    return [
        {
            key: row[idx] if convert is None else convert(row[idx])
            for idx, (key, convert) in enumerate(columns)
        }
        for row in data
    ]
def verify_date(date, ret_message):
    """Clamp a requested date to the latest day whose Bhav Copy can exist.

    The day's data is treated as available from 18:00 IST (12:30 UTC).

    * A date in the past is returned unchanged (at midnight).
    * A date in the future is replaced by today and *ret_message* is
      replaced by an explanatory message.
    * If the resulting date is today and the current time is still before
      the cutoff, the previous day is returned instead.

    Args:
        date: Requested date as a ``'YYYY-MM-DD'`` string.
        ret_message: Message to hand back when no adjustment note is needed.

    Returns:
        ``(req_date, ret_message)`` where ``req_date`` is a naive
        ``datetime``.

    Raises:
        ValueError: If *date* does not match ``'%Y-%m-%d'``.
    """
    logger.info('Verifying date %s', date)
    req_date = datetime.datetime.strptime(date, '%Y-%m-%d')
    # Take the clock in UTC explicitly (kept naive so it compares with
    # req_date): the 12:30 cutoff below is only right on a UTC clock.
    now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    # 18:00 IST == 12:30 UTC
    today = now.replace(hour=12, minute=30, second=0, microsecond=0)
    logger.info('Req Date: %s; Today: %s', req_date, today)
    if req_date > today:
        ret_message = "Time travel not yet invented! Returning latest available data."
        req_date = today
    # Gate on the current time, not on req_date: req_date is always midnight
    # (or the cutoff itself), so comparing it with the cutoff would send
    # every request for today back to yesterday, even after publication.
    if req_date.date() == today.date() and now < today:
        req_date = today - datetime.timedelta(days=1)
    return req_date, ret_message