#!/usr/bin/env python # -*- coding: utf-8 -*- """Utils for bullish.""" # standard imports import csv import datetime import logging from io import BytesIO, TextIOWrapper from zipfile import ZipFile # third-party imports import django_rq import requests from django_redis import get_redis_connection from django.db import transaction # app imports from app.models import BhavCopyEquity cache = get_redis_connection("default") logger = logging.getLogger(__name__) def fetch_bhav_copy_equity_data(date=None): """Fetch data from BSE India website.""" # hack: since bseindia website doesn't respond well to python requests user_agent = 'Mozilla/5.0 (Windows NT 10.0; rv:78.0) Gecko/20100101 Firefox/78.0' headers = { 'User-Agent': user_agent } if date is None: date = datetime.datetime.now() zipurl = f'https://www.bseindia.com/download/BhavCopy/Equity/EQ{date.strftime("%d%m%y")}_CSV.ZIP' logger.info(f'Fetching data from {zipurl}') resp = requests.get(zipurl, headers=headers) if resp.ok: with ZipFile(BytesIO(resp.content)) as bhavcopy_zf: csv_file = bhavcopy_zf.namelist()[0] with bhavcopy_zf.open(csv_file, 'r') as infile: data = list(csv.reader(TextIOWrapper(infile, 'utf-8'))) return data raise ValueError('Fetching data from BSE unsuccessful') def populate_bhav_copy_data(date=None): """Populate DB with Bhav Copy data.""" try: data = fetch_bhav_copy_equity_data(date=date) except: # Potentially add code for alerting if needed # Repeat job after 10 mins if job fails scheduler = django_rq.get_scheduler('default') scheduler.schedule( scheduled_time=datetime.datetime.now()+datetime.timedelta(minutes=10), func=populate_bhav_copy_data, ) raise ValueError(f"Error fetching data from BSE for {date}") else: del data[0] # delete title row populate_bhav_copy_data_into_redis(data) populate_bhav_copy_data_into_postgres(data, date=date) def populate_bhav_copy_data_into_redis(data): logger.info(f'Populating data into redis') pipe = cache.pipeline() cache.delete("stocks") for stock in data: # prevent creation of duplicate entries pipe.rpush("stocks", stock[0]) pipe.hset( f"stock:{stock[0]}", mapping={ "sc_code": stock[0], "sc_name": stock[1], "sc_group": stock[2], "sc_type": stock[3], "open_price": float(stock[4]), "high_price": float(stock[5]), "low_price": float(stock[6]), "close_price": float(stock[7]), "last_price": float(stock[8]), "prevclose_price": float(stock[9]), "no_trades": int(stock[10]), "no_of_shrs": int(stock[11]), "net_turnov": float(stock[12]), "tdcloindi": stock[13], } ) pipe.execute() @transaction.atomic def populate_bhav_copy_data_into_postgres(data, date=None): logger.info(f'Populating data into postgres for {date}') if date is None: date = datetime.datetime.now().date() for stock in data: BhavCopyEquity.objects.get_or_create( date=date, sc_code=int(stock[0]), sc_name=stock[1], sc_group=stock[2], sc_type=stock[3], open_price=float(stock[4]), high_price=float(stock[5]), low_price=float(stock[6]), close_price=float(stock[7]), last_price=float(stock[8]), prevclose_price=float(stock[9]), no_trades=int(stock[10]), no_of_shrs=int(stock[11]), net_turnov=float(stock[12]), tdcloindi=stock[13], )