#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Utils for bullish."""

# standard imports
import csv
import json
import datetime
import logging
from io import BytesIO, TextIOWrapper
from zipfile import ZipFile

# third-party imports
import django_rq
import requests
from django_redis import get_redis_connection
from django.db import transaction

# app imports
from app.models import BhavCopyEquity

cache = get_redis_connection("default")
logger = logging.getLogger(__name__)

# Seconds to wait for the BSE server before giving up; without a timeout a
# stalled connection would block the worker indefinitely.
BSE_REQUEST_TIMEOUT = 30


def fetch_bhav_copy_equity_data(date=None):
    """Fetch data from BSE India website.

    Args:
        date: ``date``/``datetime`` whose Bhav Copy should be downloaded.
            Defaults to ``datetime.datetime.now()``.

    Returns:
        The rows of the first member of the downloaded ZIP archive, parsed
        with ``csv.reader`` into a list of lists of strings. The title row is
        still included.

    Raises:
        ValueError: if the server answers with an error status.
        requests.RequestException: on connection failure or timeout.
        zipfile.BadZipFile: if the response body is not a ZIP archive.
    """
    # hack: since bseindia website doesn't respond well to python requests
    user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0'
    headers = {
        'User-Agent': user_agent
    }
    if date is None:
        date = datetime.datetime.now()
    datestr = date.strftime("%d%m%y")
    zipurl = f'https://www.bseindia.com/download/BhavCopy/Equity/EQ{datestr}_CSV.ZIP'
    logger.info('Fetching data from %s', zipurl)
    resp = requests.get(zipurl, headers=headers, timeout=BSE_REQUEST_TIMEOUT)
    if not resp.ok:
        raise ValueError('Fetching data from BSE unsuccessful')
    with ZipFile(BytesIO(resp.content)) as bhavcopy_zf:
        csv_file = bhavcopy_zf.namelist()[0]
        with bhavcopy_zf.open(csv_file, 'r') as infile:
            # newline='' lets the csv module do its own newline handling,
            # as recommended by the csv documentation.
            text = TextIOWrapper(infile, encoding='utf-8', newline='')
            return list(csv.reader(text))


def populate_bhav_copy_data(date=None):
    """Populate DB with Bhav Copy data.

    Downloads the Bhav Copy for ``date`` and stores it in Postgres and in both
    Redis layouts. If the download fails, the same job is scheduled again ten
    minutes later for the same ``date`` and a ``ValueError`` is raised.

    Args:
        date: ``date``/``datetime`` to load; ``None`` means "now".

    Raises:
        ValueError: if the data could not be fetched (chained to the
            underlying exception).
    """
    try:
        data = fetch_bhav_copy_equity_data(date=date)
    except Exception as err:
        # Potentially add code for alerting if needed
        logger.exception('Fetching Bhav Copy for %s failed; retrying in 10 mins', date)
        # Repeat job after 10 mins if job fails
        # NOTE(review): rq-scheduler documents scheduled_time as UTC; this
        # naive now() is only right if the process clock runs in UTC - confirm.
        scheduler = django_rq.get_scheduler('default')
        scheduler.schedule(
            scheduled_time=datetime.datetime.now() + datetime.timedelta(minutes=10),
            func=populate_bhav_copy_data,
            # Keep the requested date, otherwise a retry of a back-fill job
            # would silently load "today" instead.
            kwargs={'date': date},
        )
        raise ValueError(f"Error fetching data from BSE for {date}\nDetails: {err}") from err

    rows = data[1:]  # drop title row (tolerates an empty payload)
    populate_bhav_copy_data_into_postgres(rows, date=date)
    populate_bhav_copy_data_into_redis_v1(rows, date=date)
    populate_bhav_copy_data_into_redis_v2(rows, date=date)


def _rows_and_date(data, date):
    """Apply the defaults shared by the ``populate_bhav_copy_data_into_*`` functions.

    When ``data`` is ``None`` it is fetched for ``date`` and its title row is
    dropped; when ``date`` is ``None`` it becomes today's date.

    Returns:
        A ``(rows, date)`` tuple.
    """
    if data is None:
        data = fetch_bhav_copy_equity_data(date=date)[1:]
    if date is None:
        date = datetime.datetime.now().date()
    return data, date


def populate_bhav_copy_data_into_redis_v1(data=None, date=None):
    """Store the Bhav Copy in Redis as one list of codes plus one hash per stock.

    Writes the list ``stocks:<ddmmyy>`` holding every ``sc_code`` and a hash
    ``stock:<ddmmyy>:<sc_code>`` for each stock.

    Args:
        data: CSV rows without the title row; fetched when ``None``.
        date: day the rows belong to; defaults to today.
    """
    data, date = _rows_and_date(data, date)
    logger.info('Populating data into redis')
    datestr = date.strftime("%d%m%y")
    stocks_key = f"stocks:{datestr}"
    # Convert first: a malformed row must fail before anything is deleted.
    stocks = stocks_csv_to_json(data)
    pipe = cache.pipeline()
    # prevent creation of duplicate entries: reset the list in the same
    # pipeline that refills it, so the old list is not removed up front.
    pipe.delete(stocks_key)
    for stock in stocks:
        stock_code = stock['sc_code']
        pipe.rpush(stocks_key, stock_code)
        pipe.hset(
            f"stock:{datestr}:{stock_code}",
            mapping=stock
        )
    pipe.execute()


def populate_bhav_copy_data_into_redis_v2(data=None, date=None):
    """Store the Bhav Copy in Redis as a single JSON document.

    Writes the key ``stocks:v2:<ddmmyy>`` holding the JSON-encoded list of
    stock dictionaries.

    Args:
        data: CSV rows without the title row; fetched when ``None``.
        date: day the rows belong to; defaults to today.
    """
    data, date = _rows_and_date(data, date)
    datestr = date.strftime("%d%m%y")
    stocks_key = f"stocks:v2:{datestr}"
    # SET overwrites any existing value, so no prior DELETE is needed and the
    # key never disappears between the two commands.
    cache.set(stocks_key, json.dumps(stocks_csv_to_json(data)))


@transaction.atomic
def populate_bhav_copy_data_into_postgres(data=None, date=None):
    """Store the Bhav Copy rows as ``BhavCopyEquity`` records.

    Runs inside one database transaction and uses ``get_or_create`` per row.

    Args:
        data: CSV rows without the title row; fetched when ``None``.
        date: day the rows belong to; defaults to today.
    """
    data, date = _rows_and_date(data, date)
    # Logged after defaulting so the message shows the date actually used.
    logger.info('Populating data into postgres for %s', date)
    for stock in data:
        BhavCopyEquity.objects.get_or_create(
            date=date,
            sc_code=int(stock[0]),
            sc_name=stock[1],
            sc_group=stock[2],
            sc_type=stock[3],
            open_price=float(stock[4]),
            high_price=float(stock[5]),
            low_price=float(stock[6]),
            close_price=float(stock[7]),
            last_price=float(stock[8]),
            prevclose_price=float(stock[9]),
            no_trades=int(stock[10]),
            no_of_shrs=int(stock[11]),
            net_turnov=float(stock[12]),
            tdcloindi=stock[13],
        )


def stocks_csv_to_json(data):
    """Convert Bhav Copy CSV rows into a list of JSON-serialisable dicts.

    Args:
        data: iterable of rows; each row is indexed positionally from 0 to 13.

    Returns:
        One dict per row. Price and turnover columns are converted to
        ``float``, trade and share counts to ``int``; the remaining columns
        are kept as given.

    Raises:
        ValueError: if a numeric column cannot be converted.
        IndexError: if a row has fewer than 14 columns.
    """
    return [
        {
            "sc_code": stock[0],
            "sc_name": stock[1],
            "sc_group": stock[2],
            "sc_type": stock[3],
            "open_price": float(stock[4]),
            "high_price": float(stock[5]),
            "low_price": float(stock[6]),
            "close_price": float(stock[7]),
            "last_price": float(stock[8]),
            "prevclose_price": float(stock[9]),
            "no_trades": int(stock[10]),
            "no_of_shrs": int(stock[11]),
            "net_turnov": float(stock[12]),
            "tdcloindi": stock[13],
        }
        for stock in data
    ]


def verify_date(date, ret_message):
    """Clamp a requested date to the latest day that has a cutoff behind it.

    Args:
        date: requested day as a ``'%Y-%m-%d'`` string.
        ret_message: message to hand back unchanged unless the date had to be
            clamped because it lies in the future.

    Returns:
        A ``(req_date, ret_message)`` tuple where ``req_date`` is a naive
        ``datetime``.

    Raises:
        ValueError: if ``date`` does not match ``'%Y-%m-%d'``.
    """
    # Verify Date
    logger.info('Verifying date %s', date)
    req_date = datetime.datetime.strptime(date, '%Y-%m-%d')
    now = datetime.datetime.now()
    # 18:00 IST == 12:30 UTC
    # NOTE(review): this treats the naive now() as UTC - confirm server TZ.
    today = now.replace(hour=12, minute=30, second=0, microsecond=0)
    logger.info('Req Date: %s; Today: %s', req_date, today)
    if req_date > today:
        ret_message = "Time travel not yet invented! Returning latest available data."
        req_date = today
    # Fall back to the previous day only while the clock has not reached
    # today's cutoff. Comparing the parsed request (always midnight) against
    # the cutoff would be true for every request for today.
    if req_date.date() == today.date() and now < today:
        req_date = today - datetime.timedelta(days=1)
    return req_date, ret_message