- add relevant test to ensure doesn't happen again - fix test_today_before_publish_time Signed-off-by: Ameya Shenoy <shenoy.ameya@gmail.com>
186 lines
6.4 KiB
Python
186 lines
6.4 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""Utils for bullish."""
|
|
|
|
# standard imports
|
|
import csv
|
|
import json
|
|
import datetime
|
|
import logging
|
|
from typing import Tuple
|
|
|
|
from io import BytesIO, TextIOWrapper
|
|
from zipfile import ZipFile
|
|
|
|
# third-party imports
|
|
import django_rq
|
|
import requests
|
|
|
|
from django_redis import get_redis_connection
|
|
from django.db import transaction
|
|
|
|
# app imports
|
|
from app.models import BhavCopyEquity
|
|
|
|
|
|
cache = get_redis_connection("default")
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def fetch_bhav_copy_equity_data(date=None):
    """Fetch the Bhav Copy equity CSV for a date from the BSE India website.

    :param date: datetime/date whose Bhav Copy should be downloaded;
        defaults to the current local datetime.
    :returns: list of CSV rows (including the title row).
    :raises ValueError: if the download fails (non-2xx response).
    """
    # hack: since bseindia website doesn't respond well to python requests
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0'
    headers = {
        'User-Agent': user_agent
    }
    if date is None:
        date = datetime.datetime.now()
    datestr = date.strftime("%d%m%y")
    zipurl = f'https://www.bseindia.com/download/BhavCopy/Equity/EQ{datestr}_CSV.ZIP'
    logger.info('Fetching data from %s', zipurl)
    # Bound the request so a stalled BSE server cannot hang the worker forever;
    # requests has NO default timeout.
    resp = requests.get(zipurl, headers=headers, timeout=30)

    if resp.ok:
        with ZipFile(BytesIO(resp.content)) as bhavcopy_zf:
            # The archive is expected to contain exactly one CSV file.
            csv_file = bhavcopy_zf.namelist()[0]
            with bhavcopy_zf.open(csv_file, 'r') as infile:
                data = list(csv.reader(TextIOWrapper(infile, 'utf-8')))
                return data
    raise ValueError('Fetching data from BSE unsuccessful')
|
|
|
|
|
|
def populate_bhav_copy_data(date=None):
    """Populate DB with Bhav Copy data.

    Fetches the Bhav Copy for *date* (default: today) and loads it into
    postgres and both redis layouts. On fetch failure, a retry for the
    SAME date is scheduled 10 minutes later and the error is re-raised.

    :param date: the trading date to load; defaults to today's date.
    :raises ValueError: if fetching the Bhav Copy from BSE fails.
    """
    if date is None:
        date = datetime.datetime.now().date()
    try:
        data = fetch_bhav_copy_equity_data(date=date)
    except Exception as err:
        # Potentially add code for alerting if needed
        # Repeat job after 10 mins if job fails
        scheduler = django_rq.get_scheduler('default')
        scheduler.schedule(
            scheduled_time=datetime.datetime.now() + datetime.timedelta(minutes=10),
            func=populate_bhav_copy_data,
            # Retry the date that actually failed; without this the retry
            # would silently switch to "today" once midnight passes.
            args=[date],
        )
        raise ValueError(f"Error fetching data from BSE for {date}\nDetails: {err}") from err
    else:
        del data[0]  # delete title row
        populate_bhav_copy_data_into_postgres(data, date=date)
        populate_bhav_copy_data_into_redis_v1(data, date=date)
        populate_bhav_copy_data_into_redis_v2(data, date=date)
|
|
|
|
|
|
def populate_bhav_copy_data_into_redis_v1(data=None, date=None):
    """Store Bhav Copy rows in redis as a list of codes plus one hash per stock.

    Layout: ``stocks:<ddmmyy>`` is a list of sc_codes;
    ``stock:<ddmmyy>:<sc_code>`` is a hash of that stock's fields.

    :param data: raw CSV rows (header already removed); fetched if ``None``.
    :param date: trading date the data belongs to — required.
    :raises ValueError: if *date* is not supplied.
    """
    if date is None:
        raise ValueError("Date is required")
    if data is None:
        data = fetch_bhav_copy_equity_data(date=date)
        del data[0]  # drop the CSV title row
    logger.info('Populating data into redis')
    datestr = date.strftime("%d%m%y")
    stocks_key = f"stocks:{datestr}"
    pipe = cache.pipeline()
    # Clear the old index INSIDE the pipeline so the delete and the re-population
    # go to redis in one batch and rpush never appends onto a stale list.
    pipe.delete(stocks_key)
    data = stocks_csv_to_json(data)
    for stock in data:
        stock_code = stock.get('sc_code')
        pipe.rpush(stocks_key, stock_code)
        pipe.hset(
            f"stock:{datestr}:{stock_code}",
            mapping=stock
        )
    pipe.execute()
|
|
|
|
|
|
def populate_bhav_copy_data_into_redis_v2(data=None, date=None):
    """Store the whole Bhav Copy as one JSON blob under ``stocks:v2:<ddmmyy>``.

    :param data: raw CSV rows (header already removed); fetched if ``None``.
    :param date: trading date the data belongs to — required.
    :raises ValueError: if *date* is not supplied.
    """
    if date is None:
        raise ValueError("Date required.")
    if data is None:
        data = fetch_bhav_copy_equity_data(date=date)
        del data[0]  # drop the CSV title row
    logger.info('Populating data into redis v2')
    datestr = date.strftime("%d%m%y")
    data = stocks_csv_to_json(data)
    stocks_key = f"stocks:v2:{datestr}"
    # Redis SET overwrites any existing value unconditionally, so the
    # previous explicit DELETE of the same key was redundant and removed.
    cache.set(stocks_key, json.dumps(data))
|
|
|
|
|
|
@transaction.atomic
def populate_bhav_copy_data_into_postgres(data=None, date=None):
    """Persist Bhav Copy rows into postgres, one ``BhavCopyEquity`` row each.

    Runs inside a single transaction; ``get_or_create`` keeps re-runs for the
    same date idempotent.

    :param data: raw CSV rows (header already removed); fetched if ``None``.
    :param date: trading date the data belongs to — required.
    :raises ValueError: if *date* is not supplied.
    """
    logger.info('Populating data into postgres for %s', date)
    if date is None:
        raise ValueError("Date required")
    if data is None:
        data = fetch_bhav_copy_equity_data(date=date)
        del data[0]  # drop the CSV title row
    for record in stocks_csv_to_json(data):
        record['date'] = date
        BhavCopyEquity.objects.get_or_create(**record)
|
|
|
|
|
|
def stocks_csv_to_json(data):
    """Convert raw Bhav Copy CSV rows into a list of typed stock dicts.

    Each input row is positional; fields 4-9 and 12 become floats,
    fields 10-11 become ints, the rest stay as-is.

    :param data: iterable of CSV rows (header already removed).
    :returns: list of dicts, one per input row.
    """
    # (key, converter) pairs in CSV column order; str() is a no-op on the
    # string values csv.reader yields.
    field_specs = (
        ("sc_code", str),
        ("sc_name", str),
        ("sc_group", str),
        ("sc_type", str),
        ("open_price", float),
        ("high_price", float),
        ("low_price", float),
        ("close_price", float),
        ("last_price", float),
        ("prevclose_price", float),
        ("no_trades", int),
        ("no_of_shrs", int),
        ("net_turnov", float),
        ("tdcloindi", str),
    )
    return [
        # Index explicitly so a short row still raises IndexError, exactly
        # like the original positional lookups did.
        {key: convert(row[idx]) for idx, (key, convert) in enumerate(field_specs)}
        for row in data
    ]
|
|
|
|
|
|
def verify_date(date: str, ret_message: str) -> Tuple[datetime.datetime, str]:
    """Verify current date.

    Check current date and time and return the appropriate date. Also take care
    of the following usecases
    - if today before publish time return yesterdays data
    - if today after publish time return todays data
    - if a future date, return current day's data with time travel message
    - if a weekend return Friday's data with custom message

    :param date: requested date as an ISO ``YYYY-MM-DD`` string.
    :param ret_message: message accumulator that user-facing notices are
        appended to.
    :returns: ``(effective_datetime, ret_message)`` — the date whose data
        should actually be served, plus any appended explanations.
    :raises ValueError: if *date* does not match ``%Y-%m-%d`` (from strptime).
    """
    # Verify Date
    logger.info('Verifying date %s', date)
    # IST
    # NOTE(review): this SUBTRACTS 5h30m from the server-local now(). IST is
    # UTC+5:30, so this yields IST only if the server clock runs at UTC+11:00;
    # on a UTC host IST would be now() + 5h30m — confirm the intended offset
    # direction against the deployment's timezone.
    curr_datetime = datetime.datetime.now() - datetime.timedelta(hours=5, minutes=30)
    req_datetime = datetime.datetime.strptime(date, '%Y-%m-%d')
    # 06:00 on the shifted clock — presumably the daily Bhav Copy publish
    # cutoff; verify against when BSE actually publishes.
    next_publish_time = curr_datetime.replace(hour=6, minute=0, second=0,
                                              microsecond=0)
    logger.info('Current IST Datetime: %s; Request Datetime: %s; Next Publish: %s',
                curr_datetime, req_datetime, next_publish_time)
    # Check for day
    # Today-before-publish: step back one day. This may land on a weekend,
    # which the weekend branch below then corrects — branch order matters.
    if (req_datetime.date() == curr_datetime.date()
            and curr_datetime < next_publish_time):
        logger.info("Today's data not yet present, returning yesterday's data!")
        ret_message += "Today's data not yet published, returning yesterday's data. "
        req_datetime -= datetime.timedelta(days=1)
    # Check for future date
    # Note: resets to curr_datetime (which carries a time-of-day), whereas
    # strptime-produced req_datetime is at midnight; only .date()/.weekday()
    # are used downstream so the time component is harmless here.
    if req_datetime.date() > curr_datetime.date():
        logger.info("Future date, resetting it to today!")
        ret_message += "Time travel not yet invented! Returning latest available data. "
        req_datetime = curr_datetime
    # Check for weekend
    # weekday(): Mon=0 .. Sat=5, Sun=6; day_num-4 is 1 for Sat, 2 for Sun,
    # so both roll back to the preceding Friday (weekday 4).
    day_num = req_datetime.weekday()
    if day_num in [5, 6]:
        logger.info("Weekend, switching it to Friday!")
        ret_message += "Markets are closed on weekends. Returning data for Friday. "
        req_datetime -= datetime.timedelta(days=day_num-4)  # change req to Friday
    return req_datetime, ret_message
|
|
|