Skip to content

Commit 9e262fd

Browse files
authored
Merge pull request #670 from digitalgreenorg/loop_aggregated_myisam
Loop aggregated myisam
2 parents 032d4f3 + e96a914 commit 9e262fd

File tree

13 files changed

+519
-563
lines changed

13 files changed

+519
-563
lines changed

dg/media/app_dashboards/js/loop_dashboard.js

Lines changed: 127 additions & 377 deletions
Large diffs are not rendered by default.

dg/templates/app_dashboards/loop_base.html

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
<!--Let browser know website is optimized for mobile-->
1919
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
20+
<meta name="keywords" content="loop, dg, dashboard, digitalgreen">
21+
<meta name="description" content="Loop Dashboard">
2022
</head>
2123

2224
<body>
@@ -42,9 +44,8 @@
4244
{% block script %} {%endblock%}
4345
<script src="{% static 'app_dashboards/js/highstocks.js' %}"></script>
4446
<script src="{% static 'app_dashboards/js/highcharts/highcharts-more.js' %}" ></script>
45-
<script src="{% static 'app_dashboards/js/highcharts/modules/data.js' %}"></script>
47+
<!-- <script src="{% static 'app_dashboards/js/highcharts/modules/data.js' %}"></script> -->
4648
<script src="{% static 'app_dashboards/js/highcharts/modules/solid-gauge.js' %}"></script>
4749
<script src="{% static 'app_dashboards/js/highcharts/modules/drilldown.js' %}"></script>
4850
<script src="{% static 'app_dashboards/js/highcharts/modules/boost.js' %}"></script>
49-
<script src="{% static 'app_dashboards/js/highcharts/modules/exporting.js' %}"></script>
5051
</html>
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import os
2+
import sys
3+
from django.core.management.base import BaseCommand, CommandError
4+
from dg.settings import DATABASES
5+
from loop.models import LoopUser, CombinedTransaction, Village, Crop, Mandi, Farmer, DayTransportation, Gaddidar, \
6+
Transporter, Language, CropLanguage, GaddidarCommission, GaddidarShareOutliers, AggregatorIncentive, \
7+
AggregatorShareOutliers, IncentiveParameter, IncentiveModel
8+
import subprocess
9+
import MySQLdb
10+
import datetime, time
11+
import pandas as pd
12+
from django.db.models import Count, Sum, Avg
13+
import inspect
14+
from loop.utils.loop_etl.get_gaddidar_share import compute_gaddidar_share
15+
from loop.utils.loop_etl.get_aggregator_share import compute_aggregator_share
16+
17+
DIR_PATH = os.path.dirname(os.path.abspath(__file__))
18+
19+
class LoopStatistics():
    """Rebuild the denormalised ``loop_aggregated_myisam`` reporting table.

    The table is dropped and recreated from ``recreate_schema.sql`` and then
    refilled from the transactional Loop models.
    """

    # Parameterised statement: values are escaped by the driver instead of
    # being concatenated into the SQL text.
    _INSERT_SQL = (
        "INSERT INTO loop_aggregated_myisam "
        "(date,aggregator_id,mandi_id,gaddidar_id,quantity,amount,"
        "transportation_cost,farmer_share,gaddidar_share,aggregator_incentive,"
        "aggregator_name,mandi_name,gaddidar_name,cum_distinct_farmer) "
        "VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
    )

    # Rows sent per executemany() call; keeps each statement small.
    _INSERT_BATCH_SIZE = 1000

    def _log_shape(self, label, frame):
        """Print ``label`` followed by the shape of ``frame``."""
        print("%s %s" % (label, frame.shape))

    def _insert_rows(self, connection, result):
        """Insert every row of ``result`` into ``loop_aggregated_myisam``.

        Numeric values are passed through ``str`` (the same text the rows
        were previously concatenated with); names are passed unchanged so the
        driver can escape embedded quotes.
        """
        rows = []
        for index, row in result.iterrows():
            rows.append((
                row['date'].strftime('%Y-%m-%d %H:%M:%S'),
                str(row['user_created__id']),
                str(row['mandi__id']),
                str(row['gaddidar__id']),
                str(row['quantity__sum']),
                str(row['amount__sum']),
                str(row['transportation_cost__sum']),
                str(row['farmer_share__avg']),
                str(row['gaddidar_share_amount']),
                str(row['aggregator_incentive']),
                row['name'],
                row['mandi__mandi_name'],
                row['gaddidar__gaddidar_name'],
                str(row['cummulative_distinct_farmer']),
            ))

        cursor = connection.cursor()
        try:
            for offset in range(0, len(rows), self._INSERT_BATCH_SIZE):
                cursor.executemany(
                    self._INSERT_SQL,
                    rows[offset:offset + self._INSERT_BATCH_SIZE])
            connection.commit()
        finally:
            cursor.close()

    def recompute_myisam(self):
        """Recreate the aggregate table and fill it from the Loop models.

        Raises:
            Exception: if the schema script could not be applied.

        Any error during the load is printed and the process exits with
        status 1.
        """
        db_settings = DATABASES['default']
        database = db_settings['NAME']
        username = db_settings['USER']
        password = db_settings['PASSWORD']
        print('Database :  %s' % database)
        print(datetime.datetime.utcnow())

        # Feed the script through stdin with an argument list so that the
        # credentials are never interpreted by a shell.
        schema_path = os.path.join(DIR_PATH, 'recreate_schema.sql')
        with open(schema_path) as schema_sql:
            create_schema = subprocess.call(
                ['mysql', '-u%s' % username, '-p%s' % password, database],
                stdin=schema_sql)

        if create_schema != 0:
            raise Exception("Could not create schema for loop etl")
        print("Schema created successfully")

        self.mysql_cn = None
        try:
            start_time = time.time()
            self.mysql_cn = MySQLdb.connect(
                host='localhost', user=username, passwd=password, db=database,
                charset='utf8', use_unicode=True)

            df_loopuser = pd.DataFrame(list(LoopUser.objects.values('id', 'user__id', 'name_en')))
            df_loopuser.rename(columns={"user__id": "user_created__id", "name_en": "name"}, inplace=True)
            self._log_shape("Loop User Shape", df_loopuser)

            df_ct = pd.DataFrame(list(
                CombinedTransaction.objects.values(
                    'date', 'user_created__id', 'mandi__id', 'mandi__mandi_name_en',
                    'gaddidar__id', 'gaddidar__gaddidar_name_en'
                ).order_by('date').annotate(Sum('quantity'), Sum('amount'))))
            df_ct.rename(columns={"mandi__mandi_name_en": "mandi__mandi_name",
                                  "gaddidar__gaddidar_name_en": "gaddidar__gaddidar_name"},
                         inplace=True)
            self._log_shape("Combined Transaction Shape", df_ct)

            df_ct = pd.merge(df_ct, df_loopuser, on='user_created__id', how='left')

            df_dt = pd.DataFrame(list(
                DayTransportation.objects.values('date', 'user_created__id', 'mandi__id')
                .order_by('date').annotate(Sum('transportation_cost'), Avg('farmer_share'))))
            self._log_shape("Day Transportation Shape", df_dt)

            ct_dt_keys = ['date', 'user_created__id', 'mandi__id']
            ct_merge_dt = pd.merge(df_ct, df_dt, on=ct_dt_keys, how='left')
            self._log_shape("Combined Transaction merged with Day Transportation ", ct_merge_dt)

            # CALCULATING GADDIDAR SHARE
            gaddidar_share = pd.DataFrame(compute_gaddidar_share())
            self._log_shape("Gaddidar Share", gaddidar_share)

            # CALCULATING AGGREGATOR INCENTIVE
            aggregator_incentive = pd.DataFrame(compute_aggregator_share())
            self._log_shape("Aggregator Incentive", aggregator_incentive)

            merged_ct_dt_gaddidar = pd.merge(
                ct_merge_dt, gaddidar_share,
                on=['user_created__id', 'mandi__id', 'gaddidar__id', 'date'], how='left')
            self._log_shape("After merging Gaddidar Share", merged_ct_dt_gaddidar)

            result = pd.merge(
                merged_ct_dt_gaddidar, aggregator_incentive,
                on=['user_created__id', 'mandi__id', 'date'], how='left')
            self._log_shape("After adding aggregator incentive", result)
            result.fillna(value=0, axis=1, inplace=True)

            # New farmers per date: each farmer is counted on the date of
            # their first transaction.
            df_farmer_count = pd.read_sql(
                "SELECT T.date, count(T.farmer_id) as distinct_farmer_count FROM ( SELECT farmer_id, min(date) as date FROM loop_combinedtransaction GROUP BY farmer_id) as T GROUP BY T.date",
                con=self.mysql_cn)

            # Running total of distinct farmers up to each date.
            df_farmer_count['cummulative_distinct_farmer'] = df_farmer_count['distinct_farmer_count'].cumsum()
            df_farmer_count.drop(['distinct_farmer_count'], axis=1, inplace=True)

            result = pd.merge(result, df_farmer_count, on='date', how='left')
            # Dates without any new farmer carry the previous running total.
            # Assign the result back: an in-place fill on a column selection
            # may operate on a temporary copy.
            result['cummulative_distinct_farmer'] = result['cummulative_distinct_farmer'].ffill()

            # The final frame repeats transportation_cost, farmer share and
            # aggregator_incentive for rows sharing date/aggregator/mandi but
            # differing in gaddidar; cummulative_distinct_farmer is repeated
            # for all rows sharing a date.
            self._log_shape("After adding cummulative distinct farmer ", result)

            self._insert_rows(self.mysql_cn, result)

            print("Myisam insertion complete")
            end_time = time.time()
            print("Total time taken (secs) : %f" % (end_time - start_time))

            # Sanity check: an outer merge larger than the left merge means
            # some DayTransportation rows have no matching transaction.
            ct_outer_merge_dt = pd.merge(df_ct, df_dt, on=ct_dt_keys, how='outer')

            if ct_outer_merge_dt.shape == ct_merge_dt.shape:
                print("successfully Completed")
            else:
                self._log_shape("Issue: Some aggregator has DT but no CT corresponding to date(s).",
                                ct_outer_merge_dt)
            print("==================================")

        except Exception as e:
            print("Error : %s" % (e))
            sys.exit(1)
        finally:
            # Runs on success and on the sys.exit() path alike.
            if self.mysql_cn is not None:
                self.mysql_cn.close()
128+
129+
class Command(BaseCommand):
    """Management command that triggers the Loop aggregate-table rebuild."""

    help = '''This command updates stats displayed on Loop dashboard. '''

    def handle(self, *args, **options):
        """Print a short log header, then run the recomputation."""
        for header_line in ("Log", "LOOP ETL LOG", datetime.date.today()):
            print(header_line)
        LoopStatistics().recompute_myisam()
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
-- Recreate the denormalised reporting table from scratch.
DROP TABLE IF EXISTS `loop_aggregated_myisam`;

CREATE TABLE `loop_aggregated_myisam` (
    `id`                   INT UNSIGNED NOT NULL AUTO_INCREMENT,
    `aggregator_id`        INT UNSIGNED NOT NULL,
    `date`                 DATE NOT NULL,
    `mandi_id`             INT UNSIGNED NOT NULL,
    `gaddidar_id`          INT UNSIGNED NOT NULL,
    `quantity`             DECIMAL(10,3),
    `amount`               DECIMAL(10,3),
    `transportation_cost`  DECIMAL(10,3),
    `farmer_share`         DECIMAL(10,3),
    `gaddidar_share`       DECIMAL(10,3),
    `aggregator_incentive` DECIMAL(10,3),
    `aggregator_name`      VARCHAR(50) NOT NULL,
    `mandi_name`           VARCHAR(50) NOT NULL,
    `gaddidar_name`        VARCHAR(50) NOT NULL,
    `cum_distinct_farmer`  INT UNSIGNED NOT NULL,
    PRIMARY KEY (`id`),
    -- Lookup indexes, declared inline instead of as separate CREATE INDEX statements.
    KEY `loop_aggregated_myisam_date` (`date`),
    KEY `loop_aggregated_myisam_date_aggregator_mandi` (`date`, `aggregator_id`, `mandi_id`),
    KEY `loop_aggregated_myisam_date_aggregator_mandi_gaddidar` (`date`, `aggregator_id`, `mandi_id`, `gaddidar_id`)
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=latin1;

loop/urls.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@
3434
url(r'^get_log/', send_updated_log),
3535
url(r'^dashboard/', dashboard),
3636
url(r'^get_payment_sheet/', download_data_workbook, name="download-data-workbook"),
37-
url(r'^village_wise_data/', village_wise_data),
38-
url(r'^aggregator_wise_data/', aggregator_wise_data),
39-
url(r'^crop_wise_data/', crop_wise_data),
4037
url(r'^filter_data/', filter_data),
4138
url(r'^total_static_data/',total_static_data),
4239
url(r'^recent_graphs_data/',recent_graphs_data),

loop/utils/loop_etl/__init__.py

Whitespace-only changes.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from loop.models import AggregatorIncentive, AggregatorShareOutliers, CombinedTransaction, IncentiveParameter, LoopUser, IncentiveModel
2+
from django.db.models import Count, Sum, Avg
3+
import inspect
4+
5+
def calculate_inc_default(V):
    """Return the fallback incentive for ``V``: a flat 0.25 per unit."""
    default_rate = 0.25
    return default_rate * V
7+
8+
def compute_aggregator_share():
    """Compute the aggregator incentive for every (date, aggregator, mandi).

    For each grouped transaction row:

    * if an ``AggregatorShareOutliers`` entry exists for that date,
      aggregator and mandi, its ``amount`` is used;
    * otherwise the most recent ``AggregatorIncentive`` starting on or before
      the date supplies a ``calculate_inc`` function (stored as source text),
      which is applied once per declared parameter and summed;
    * if the aggregator has no incentive configured, the default of
      ``calculate_inc_default`` on the summed quantity is used.

    Returns:
        list of dicts with keys ``date``, ``user_created__id``, ``mandi__id``
        and ``aggregator_incentive``.

    Raises:
        LoopUser.DoesNotExist: if a transaction's creator has no LoopUser.
    """
    import logging  # local import keeps this block self-contained

    logger = logging.getLogger(__name__)
    # Prefer getfullargspec where it exists; fall back for older interpreters.
    get_argspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec

    ai_queryset = AggregatorIncentive.objects.all()
    aso_queryset = AggregatorShareOutliers.objects.all()
    incentive_param_queryset = IncentiveParameter.objects.all()
    combined_ct_queryset = CombinedTransaction.objects.values(
        'date', 'user_created_id', 'mandi').order_by('-date').annotate(
        Sum('quantity'), Sum('amount'), Count('farmer_id', distinct=True))

    # auth user id -> LoopUser id, so each aggregator is looked up only once.
    aggregator_ids = {}
    aggregator_incentive_result = []

    for CT in combined_ct_queryset:
        amount_sum = 0.0

        creator_id = CT['user_created_id']
        if creator_id not in aggregator_ids:
            aggregator_ids[creator_id] = LoopUser.objects.get(user_id=creator_id).id
        aggregator_id = aggregator_ids[creator_id]

        # One query answers both "is there an outlier?" and "what amount?".
        outlier_amounts = list(aso_queryset.filter(
            date=CT['date'], aggregator=aggregator_id, mandi=CT['mandi']
        ).values_list('amount', flat=True)[:1])

        if outlier_amounts:
            amount_sum += outlier_amounts[0]
        else:
            try:
                latest_incentive = list(ai_queryset.filter(
                    start_date__lte=CT['date'], aggregator=aggregator_id
                ).order_by('-start_date')[:1])
                if latest_incentive:
                    # NOTE(review): this executes source text stored in the
                    # database; it must only ever contain trusted code.
                    # A fresh namespace guarantees the function used belongs
                    # to THIS incentive model, not a previous iteration's.
                    namespace = dict(globals())
                    exec(latest_incentive[0].incentive_model.calculation_method, namespace)
                    calculate_inc = namespace['calculate_inc']
                    for param in get_argspec(calculate_inc)[0]:
                        param_to_apply = incentive_param_queryset.get(notation=param)
                        amount_sum += calculate_inc(CT[param_to_apply.notation_equivalent])
                else:
                    amount_sum += calculate_inc_default(CT['quantity__sum'])
            except Exception:
                # Best effort: keep whatever was accumulated so far, but
                # leave a trace instead of failing silently.
                logger.exception(
                    "Aggregator incentive failed for date=%s aggregator=%s mandi=%s",
                    CT['date'], aggregator_id, CT['mandi'])

        aggregator_incentive_result.append({
            'date': CT['date'],
            'user_created__id': CT['user_created_id'],
            'mandi__id': CT['mandi'],
            'aggregator_incentive': amount_sum,
        })

    return aggregator_incentive_result
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
from loop.models import GaddidarCommission, GaddidarShareOutliers, CombinedTransaction, LoopUser
2+
from django.db.models import Count, Sum, Avg
3+
4+
def compute_gaddidar_share():
    """Compute the gaddidar share for every (date, aggregator, gaddidar, mandi).

    For each grouped transaction row:

    * if a ``GaddidarShareOutliers`` entry exists for that date, aggregator
      and gaddidar, its ``amount`` is used;
    * otherwise the most recent ``GaddidarCommission`` starting on or before
      the date is applied: ``discount_percent`` times the summed quantity
      when the gaddidar's ``discount_criteria`` is 0, else times the summed
      amount;
    * with no commission on record the share stays 0.

    Returns:
        list of dicts with keys ``date``, ``user_created__id``,
        ``gaddidar__id``, ``mandi__id`` and ``gaddidar_share_amount``.

    Raises:
        LoopUser.DoesNotExist: if a transaction's creator has no LoopUser.
    """
    gc_queryset = GaddidarCommission.objects.all()
    gso_queryset = GaddidarShareOutliers.objects.all()
    combined_ct_queryset = CombinedTransaction.objects.values(
        'date', 'user_created_id', 'gaddidar', 'mandi',
        'gaddidar__discount_criteria').order_by('-date').annotate(
        Sum('quantity'), Sum('amount'))

    # auth user id -> LoopUser id, so each aggregator is looked up only once.
    aggregator_ids = {}
    gaddidar_share_result = []

    for CT in combined_ct_queryset:
        amount_sum = 0

        creator_id = CT['user_created_id']
        if creator_id not in aggregator_ids:
            aggregator_ids[creator_id] = LoopUser.objects.get(user_id=creator_id).id
        aggregator_id = aggregator_ids[creator_id]

        # One query answers both "is there an outlier?" and "what amount?".
        outlier_amounts = list(gso_queryset.filter(
            date=CT['date'], aggregator=aggregator_id, gaddidar=CT['gaddidar']
        ).values_list('amount', flat=True)[:1])

        if outlier_amounts:
            amount_sum += outlier_amounts[0]
        else:
            # Fetch the applicable commission once instead of count() + [0].
            latest_commission = list(gc_queryset.filter(
                start_date__lte=CT['date'], gaddidar=CT['gaddidar']
            ).order_by('-start_date')[:1])
            if latest_commission:
                if CT['gaddidar__discount_criteria'] == 0:
                    basis = CT['quantity__sum']
                else:
                    basis = CT['amount__sum']
                amount_sum += basis * latest_commission[0].discount_percent

        gaddidar_share_result.append({
            'date': CT['date'],
            'user_created__id': CT['user_created_id'],
            'gaddidar__id': CT['gaddidar'],
            'mandi__id': CT['mandi'],
            'gaddidar_share_amount': amount_sum,
        })

    return gaddidar_share_result
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
from dg.settings import DATABASES
2+
import MySQLdb
3+
import datetime, time
4+
import pandas as pd
5+
import numpy as np
6+
from loop.models import CombinedTransaction
7+
8+
def get_grouped_data(df_result_aggregate, day, df_farmers):
    """Summarise the aggregate rows in consecutive windows of ``day`` days.

    Windows are built backwards from the current moment down to the earliest
    date in ``df_result_aggregate``. Each window covers
    ``end_date < date <= start_date``; the oldest window has no lower bound
    and reports ``end_date`` as ``0``.

    Args:
        df_result_aggregate: frame with columns ``date``, ``aggregator_id``,
            ``amount``, ``quantity``, ``farmer_share``,
            ``transportation_cost``, ``gaddidar_share`` and
            ``aggregator_incentive``. It is not modified.
        day: window length in days, given as a string (e.g. ``'7'``).
        df_farmers: frame with columns ``date`` and ``farmer_id``.

    Returns:
        dict mapping window position (0 = most recent) to a dict holding
        ``start_date``, ``end_date``, the per-window ``*__sum`` totals,
        ``active_cluster`` (distinct aggregators) and
        ``distinct_farmer_count``. Empty when there are no aggregate rows.
    """
    if df_result_aggregate.empty:
        return {}

    # Work on converted copies so the caller's frames are left untouched.
    aggregate_dates = pd.to_datetime(df_result_aggregate['date'])
    farmer_dates = pd.to_datetime(df_farmers['date'])

    frequency = '-' + day + 'D'
    window_tops = pd.date_range(datetime.datetime.today(), aggregate_dates.min(), freq=frequency)

    # The oldest window is open-ended; comparing against the epoch keeps the
    # comparison datetime-to-datetime instead of datetime-to-int.
    epoch = pd.Timestamp(0)
    summed_columns = (
        ('amount__sum', 'amount'),
        ('quantity__sum', 'quantity'),
        ('farmer_share__sum', 'farmer_share'),
        ('transportation_cost__sum', 'transportation_cost'),
        ('gaddidar_share__sum', 'gaddidar_share'),
        ('aggregator_incentive__sum', 'aggregator_incentive'),
    )

    data_by_grouped_days = {}
    last_position = len(window_tops) - 1
    for position, start_date in enumerate(window_tops):
        is_oldest = position == last_position
        lower_bound = epoch if is_oldest else window_tops[position + 1]

        in_window = (aggregate_dates > lower_bound) & (aggregate_dates <= start_date)
        window_rows = df_result_aggregate[in_window]
        farmers_in_window = df_farmers[(farmer_dates > lower_bound) & (farmer_dates <= start_date)]

        summary = {
            'start_date': start_date,
            'end_date': 0 if is_oldest else lower_bound,
        }
        for output_key, column in summed_columns:
            summary[output_key] = float(window_rows[column].sum())
        summary['active_cluster'] = int(window_rows['aggregator_id'].nunique())
        summary['distinct_farmer_count'] = int(farmers_in_window['farmer_id'].nunique())

        data_by_grouped_days[position] = summary

    return data_by_grouped_days
37+
38+
39+
def get_data_from_myisam(get_total):
    """Read ``loop_aggregated_myisam`` and summarise it for the dashboard.

    Args:
        get_total: ``0`` to get per-window breakdowns plus the cumulative
            volume/farmer series; any other value to get grand totals only.

    Returns:
        A ``(dictionary, cumm_vol_farmer)`` tuple.

        * ``get_total == 0``: ``dictionary`` maps each window length
          (``'7'``, ``'15'``, ``'30'``, ``'60'``) to the list of window
          summaries from ``get_grouped_data``; ``cumm_vol_farmer`` maps a row
          position to ``date``, ``cum_distinct_farmer`` and ``cum_vol``.
        * otherwise: ``dictionary`` maps each numeric column to ``{0: total}``
          and ``cumm_vol_farmer`` is empty.
    """
    db_settings = DATABASES['default']
    mysql_cn = MySQLdb.connect(
        host='localhost', user=db_settings['USER'], passwd=db_settings['PASSWORD'],
        db=db_settings['NAME'], charset='utf8', use_unicode=True)
    try:
        df_result = pd.read_sql("SELECT * FROM loop_aggregated_myisam", con=mysql_cn)
    finally:
        # The frame is fully loaded; do not hold the connection any longer.
        mysql_cn.close()

    # The table holds one row per gaddidar. Transportation cost, farmer share
    # and aggregator incentive are repeated on every gaddidar row of the same
    # (date, aggregator, mandi), so they are averaged rather than summed.
    aggregations = {
        'quantity': 'sum',
        'amount': 'sum',
        'gaddidar_share': 'sum',
        'aggregator_incentive': 'mean',
        'transportation_cost': 'mean',
        'farmer_share': 'mean',
    }

    # cum_distinct_farmer is repeated on every row of a date, hence the mean.
    aggregate_cumm_vol_farmer = {
        'quantity': 'sum',
        'cum_distinct_farmer': 'mean',
    }

    # MyISAM table contains CT, DT, Gaddidar, AggregatorIncentive.
    df_result_aggregate = df_result.groupby(
        ['date', 'aggregator_id', 'mandi_id']).agg(aggregations).reset_index()

    cumm_vol_farmer = {}
    if get_total == 0:
        days = ['7', '15', '30', '60']
        if df_result_aggregate.empty:
            # Nothing loaded yet (e.g. the table is being rebuilt).
            dictionary = {day: [] for day in days}
        else:
            df_farmers = pd.DataFrame(
                list(CombinedTransaction.objects.values('date', 'farmer_id').order_by('date')),
                columns=['date', 'farmer_id'])
            df_farmers['date'] = df_farmers['date'].astype('datetime64[ns]')

            dictionary = {}
            for day in days:
                data_by_grouped_days = get_grouped_data(df_result_aggregate, day, df_farmers)
                dictionary[day] = list(data_by_grouped_days.values())

        # Calculating cumulative volume and farmer count per date
        df_cum_vol_farmer = df_result.groupby('date').agg(aggregate_cumm_vol_farmer).reset_index()
        df_cum_vol_farmer['cum_vol'] = df_cum_vol_farmer['quantity'].cumsum()
        df_cum_vol_farmer.drop('quantity', axis=1, inplace=True)
        cumm_vol_farmer = df_cum_vol_farmer.to_dict(orient='index')
    else:
        df_result_aggregate.drop(['mandi_id', 'aggregator_id'], axis=1, inplace=True)
        df = pd.DataFrame(df_result_aggregate.sum(numeric_only=True))
        dictionary = df.to_dict(orient='index')
    return dictionary, cumm_vol_farmer

0 commit comments

Comments
 (0)