Django Celery task is showing me "attribute error"?

Hi, I built a Django web application that fetches data from the Microsoft Graph API, and inside my Django views.py I am calling my task ‘add_api_data’:

from django.http import HttpResponse, HttpResponseRedirect
from django.urls import reverse
from datetime import datetime, timedelta
from dateutil import tz, parser
from tutorial.auth_helper import get_sign_in_flow, get_token_from_code, store_user, remove_user_and_token, get_token
from tutorial.graph_helper import *
from celery import shared_task
import time
from tutorial import task
import pandas as pd
from tutorial.models import demo,assignment_details

def education(request):
  """Enqueue the Graph API import task and render the assignment rows.

  The task is only queued via ``.delay()``; the template receives the
  ``assignment_details`` queryset values under the ``api_list`` key.
  """
  # The handle returned by delay() is not used any further in this view.
  queued_task = task.add_api_data.delay()
  stored_rows = assignment_details.objects.all().values()
  template_context = {'api_list': stored_rows}
  return render(request, 'tutorial/education.html', template_context)


def home1(request):
  """Enqueue the CSV import task and render the ``demo`` rows.

  The template receives the ``demo`` queryset values as ``title1`` and the
  result of ``initialize_context(request)`` as ``context``.
  """
  page_context = initialize_context(request)
  # The handle returned by delay() is not used any further in this view.
  queued_task = task.add_csv_data.delay()
  demo_rows = demo.objects.all().values()
  return render(
    request,
    'tutorial/demo1.html',
    {'title1': demo_rows, 'context': page_context})

My auth_helper.py:

# Licensed under the MIT License.

# <FirstCodeSnippet>
import yaml
import msal
import os
import time

# Load the oauth_settings.yml file.
# The context manager closes the file handle as soon as parsing is done
# (previously it stayed open for the life of the process). ``stream`` is
# still bound at module level afterwards, referring to the closed file.
with open('oauth_settings.yml', 'r') as stream:
  settings = yaml.load(stream, yaml.SafeLoader)

def load_cache(request):
  """Build an MSAL token cache, restoring any state saved in the session.

  Returns a ``msal.SerializableTokenCache``; it is left empty when the
  session holds no (truthy) ``'token_cache'`` entry.
  """
  token_cache = msal.SerializableTokenCache()
  serialized_state = request.session.get('token_cache')
  if serialized_state:
    token_cache.deserialize(serialized_state)
  return token_cache

def save_cache(request, cache):
  """Write the serialized token cache back to the session if it changed."""
  # Nothing to persist when the cache reports no state change.
  if not cache.has_state_changed:
    return
  request.session['token_cache'] = cache.serialize()

def get_msal_app(cache=None):
  """Create the MSAL confidential client from the loaded OAuth settings.

  ``cache`` is passed through as the client's token cache; the app id,
  authority and secret are read from the module-level ``settings``.
  """
  return msal.ConfidentialClientApplication(
    settings['app_id'],
    authority=settings['authority'],
    client_credential=settings['app_secret'],
    token_cache=cache)

# Method to generate a sign-in flow
def get_sign_in_flow():
  """Start an auth-code flow for the configured scopes and redirect URI."""
  msal_app = get_msal_app()
  sign_in_flow = msal_app.initiate_auth_code_flow(
    settings['scopes'],
    redirect_uri=settings['redirect'])
  return sign_in_flow

# Method to exchange auth code for access token
def get_token_from_code(request):
  """Complete the auth-code flow stored in the session and return the result.

  The token cache is loaded from the session before the exchange and
  saved back afterwards.
  """
  token_cache = load_cache(request)
  msal_app = get_msal_app(token_cache)

  # pop() removes the saved flow from the session; an empty dict is used
  # when no flow was stored.
  saved_flow = request.session.pop('auth_flow', {})
  token_result = msal_app.acquire_token_by_auth_code_flow(saved_flow, request.GET)

  save_cache(request, token_cache)
  return token_result
# </FirstCodeSnippet>

# <SecondCodeSnippet>
def store_user(request, user):
  """Store a summary of the signed-in user in ``request.session['user']``.

  ``user`` is a dict-like user payload. The stored email is ``mail`` when
  it is present and not ``None``, otherwise ``userPrincipalName``. The
  time zone falls back to ``'UTC'`` when ``mailboxSettings`` is missing,
  empty, or has no ``timeZone`` entry, so an incomplete payload no longer
  prevents the user from being stored at all.
  """
  try:
    mail = user.get('mail')
    mailbox_settings = user.get('mailboxSettings') or {}
    request.session['user'] = {
      'is_authenticated': True,
      'name': user['displayName'],
      'email': mail if mail is not None else user['userPrincipalName'],
      'timeZone': mailbox_settings.get('timeZone', 'UTC'),
    }
  except Exception as e:
    # Kept as deliberate best-effort: a malformed payload is reported and
    # the session is simply left without a 'user' entry.
    print(e)

def get_token(request):
  """Return a Graph access token for the session's cached account, or None.

  ``None`` is returned when the session's token cache holds no account,
  or when MSAL cannot silently produce a token for it. Previously the
  second case raised ``TypeError`` by subscripting a ``None`` result.
  Callers should treat ``None`` as "user must sign in again".
  """
  cache = load_cache(request)
  auth_app = get_msal_app(cache)

  accounts = auth_app.get_accounts()
  if not accounts:
    return None

  result = auth_app.acquire_token_silent(
    settings['scopes'],
    account=accounts[0])

  # Persist any cache change the silent acquisition caused (saved whether
  # or not a token came back, as before).
  save_cache(request, cache)

  if not result:
    return None
  return result.get('access_token')

def remove_user_and_token(request):
  """Delete the token cache and the stored user from the session, if present."""
  for session_key in ('token_cache', 'user'):
    if session_key in request.session:
      del request.session[session_key]
# </SecondCodeSnippet>

In task.py I have written two tasks:

import requests
from django.shortcuts import render
import pandas as pd
from sqlalchemy import create_engine
from tutorial.models import demo,assignment_details
from tutorial.graph_helper import *
from tutorial.auth_helper import get_sign_in_flow, get_token_from_code, store_user, remove_user_and_token, get_token,load_cache,save_cache,get_msal_app
from graph_tutorial.celery import app
import yaml
import msal
import os
import time
from . graph_helper import *

@shared_task
def add_csv_data():
    """Load ``data.csv`` and replace the ``demo`` model's table with its rows.

    The CSV is read with pandas and written through a SQLAlchemy engine for
    ``sqlite:///db.sqlite3``. Returns whatever ``DataFrame.to_sql`` returns.
    """
    csv_frame = pd.read_csv('data.csv')
    db_engine = create_engine('sqlite:///db.sqlite3')
    target_table = demo._meta.db_table
    # if_exists='replace' drops and recreates the table on every run.
    write_result = csv_frame.to_sql(
        target_table, if_exists='replace', con=db_engine, index=False)
    print(target_table)
    return write_result


@shared_task(bind=True)
# @app.task(bind=True)
def add_api_data(request):
  import json
#   from tutorial.views import initialize_context
  from . import views
  views.initialize_context
  #graph api base url
  base_url = "https://graph.microsoft.com/v1.0/"
  print(base_url)
  e6f1326a7f02/assignments/assignmentsid{{}}/submissions/submissionid{{}}/outcomes'
  
  headers = {'Authorization':'Bearer'}
  from tutorial.auth_helper import get_token
  token = get_token(request)
  print(token)
#   print(hasattr(token,'request'))
#   print(dir(token))
  headers = {
    'Authorization': 'Bearer {0}'.format(token),
    'Content-Type': 'application/json'
  }

  # endpoint0 = base_url+ "education/classes/"
  # educatiom_data = json.loads(requests.get(endpoint0,headers=headers).text)


  # class_id = []
  # class_name = []
  # for i in range(len(educatiom_data['value'])):
  #     id_ = educatiom_data['value'][i]['id']
  #     name = educatiom_data['value'][i]['mailNickname']
  #     class_id.append(id_)
  #     class_name.append(name)

  class_id = ["9289f463-badc-4722-8b51-e6f1326a7f02","9cfdd9bf-3b7a-43da-bb6a-e19990d4252e","5c30e2ca-d0ec-4277-ad76-5559b04a7bed"]
  class_name = ["MarchFullstack2021","JanFrontendBatch2022","JanFullStackBatch2022"]

  import pandas as pd

  myDataFrame = pd.DataFrame(columns = ['Id','Name','Email','Assingments','duedate','submiteddate','unsubmiteddate','returndate','reassingndate','status','feedback','points','assingment_delay','assingment_check_delay'])
  
  for class_data_id,class_data_name in zip(class_id,class_name):
    #assingment endpoint graph api url
    endpoint1 = base_url+ "education/classes/"+class_data_id+"/assignments/"
    #saving assingment id in list from assingment end point url 
    data = json.loads(requests.get(endpoint1,headers=headers).text)
    assingment_id = []
    for i in range(len(data['value'])):
      assingment_id.append(data['value'][i]['id'])

    #creating muultiple assingment endpoint for grade url and saved into list
    
    outcomsurl = []
    for i in assingment_id:
      # endpoint2 = endpoint1 + i + "/submissions/?$filter=status"+" "+"eq"+" "+"'working'&?$expand=outcomes"
      # endpoint2 = endpoint1 + i + "/submissions/?$filter=status"+" "+"eq"+" "+"'submitted'&?$expand=outcomes"
      # endpoint2 = endpoint1 + i + '/susubmissions/?$filter=status eq "working"&?$expand=outcomes'
      endpoint2 = endpoint1 + i + "/submissions?$expand=outcomes"
      outcomsurl.append(endpoint2)

    
    #all student assingment json response
    
    data_upd_list = []
    for i in range(len(outcomsurl)):
        templist1 = []
        data2 = json.loads(requests.get(outcomsurl[i],headers=headers).text)
        value1 = data2['value']
        for j in value1:
            templist1.append(j)
        if '@odata.nextLink' in data2.keys():
          datalink = data2['@odata.nextLink']
          data_next = json.loads(requests.get(datalink,headers=headers).text)
          value2 = data_next['value']
          for k in value2:
              templist1.append(k)
        data_upd_list.append(templist1)
    
    
    def multi_date(x):
      """Tabulate field ``x`` of every submission, one id/value column pair per assignment.

      Reads ``data_upd_list`` from the enclosing scope (one list of
      submission dicts per assignment). For each assignment the submissions
      are ordered by submitter id; a ``None`` value becomes "Not Available",
      any other value is cut to its first 10 characters.
      """
      import pandas as pd
      rows = []
      for submissions in data_upd_list:
          pairs = []
          for submission in submissions:
              submitter_id = submission['submittedBy']['user']['id']
              raw_value = submission[x]
              shown = "Not Available" if raw_value is None else raw_value[:10]
              pairs.append((submitter_id, shown))
          # Stable sort on the id only, so equal ids keep their response order.
          pairs.sort(key=lambda pair: pair[0])
          rows.append([submitter_id for submitter_id, _ in pairs])
          rows.append([shown for _, shown in pairs])
      # Each list was built as a row; transpose so ids/values become columns.
      return pd.DataFrame(rows).T

    submit_df = multi_date('submittedDateTime')
    unsubmit_df = multi_date('unsubmittedDateTime')
    return_df = multi_date('returnedDateTime')
    reassingn_df = multi_date('reassignedDateTime')
    status_df = multi_date('status')

    def feedback():
      """Tabulate published feedback text, one id/text column pair per assignment.

      Reads ``data_upd_list`` from the enclosing scope. The feedback is
      taken from the first entry of each submission's ``outcomes``; a
      ``None`` ``publishedFeedback`` becomes "No feedback". Submissions
      are ordered by submitter id within each assignment.
      """
      import pandas as pd
      rows = []
      for submissions in data_upd_list:
          pairs = []
          for submission in submissions:
              submitter_id = submission['submittedBy']['user']['id']
              published = submission['outcomes'][0]['publishedFeedback']
              if published is None:
                  text = "No feedback"
              else:
                  text = published['text']['content']
              pairs.append((submitter_id, text))
          # Stable sort on the id only, so equal ids keep their response order.
          pairs.sort(key=lambda pair: pair[0])
          rows.append([submitter_id for submitter_id, _ in pairs])
          rows.append([text for _, text in pairs])
      # Each list was built as a row; transpose so ids/texts become columns.
      return pd.DataFrame(rows).T

    feedback_df = feedback()


    
    assingment_data = []
    assingment_list = json.loads(requests.get(endpoint1,headers=headers).text)
    assingment_data.append(assingment_list)

    # #all user include teacher and student dispaly name ,email,id data
    member_endpoint = base_url+ "education/classes/"+class_data_id+ "/members?$select=id,displayName,userPrincipalName"
    teacher_endpoint = base_url+ "education/classes/"+class_data_id+ "/teachers?$select=id,displayName,userPrincipalName"
    member_data = []
    import pandas as pd
    member_list = json.loads(requests.get(member_endpoint,headers=headers).text)
    member_data.append(member_list)
    df_member = pd.DataFrame(member_data[0]['value'])

    # #Only teacher dispaly name ,email,id data
    teacher_data = []
    teacher_list = json.loads(requests.get(teacher_endpoint,headers=headers).text)
    teacher_data.append(teacher_list)
    df_teacher = pd.DataFrame(teacher_data[0]['value'])

    # #to get only student data we remove techer data into  all memeber this exclude teacher data and return only student data
    student_df = pd.concat([df_teacher,df_member], axis=0, ignore_index=True).drop_duplicates(subset=["id","userPrincipalName"],keep=False, ignore_index=True)
    student_df = student_df.sort_values(by='id').reset_index().drop('index',axis=1)
  
    # #this is final grade list of student sorted for by user id beacuse all assingment grade point mismatch with corrsopond users.
    final_points = []
    for i in range(len(data_upd_list)):
        temp_points = []
        temp_id = []
        final_points1 = []
        final_points2 = []
        for j in range(len(data_upd_list[i])):
            temp_id.append(data_upd_list[i][j]['submittedBy']['user']['id'])
            if (data_upd_list[i][j]['outcomes'][1]['points'] is None):
                temp_points.append('No Points')
            else:
                temp_points.append(str(data_upd_list[i][j]['outcomes'][1]['points']['points']))
        
        for letter, number in sorted(zip(temp_points,temp_id),key = lambda x: x[1]):
            zip_data = list((number,letter))
            final_points1.append(zip_data[0])
            final_points2.append(zip_data[1])
        final_points.append(final_points1)
        final_points.append(final_points2)

    # #assingment name saved into list used with name and id for pandas dataframe
    assignment_name1 = []
    for i in range(len(assingment_data[0]['value'])):
        var = assingment_data[0]['value'][i]['displayName']
        assignment_name1.append(var)

    assignment_duedate = []
    for i in range(len(data['value'])):
        var = data['value'][i]['dueDateTime'][:10]
        assignment_duedate.append(var)

    asdue_df = pd.DataFrame(assignment_duedate)
    asdue_df.columns = ['duedate']

    # #assingment name saved into list used for final dataframe
    assignment_name = []
    for i in range(len(assingment_data[0]['value'])):
        var = assingment_data[0]['value'][i]['displayName']
        assignment_name.append(var)

    #insert id column name with respective to assingment column
    for i in range(0,len(final_points),2):
      assignment_name.insert(i,f'id')
    
    df = pd.DataFrame(final_points).T
    df.columns = assignment_name

    new = []
    for i in range(0,len(df.columns),2):
        df1 = pd.merge(student_df,df.iloc[:, i:i+2],on='id',how='outer').iloc[:,-1].values.tolist()
        new.append(df1)

    # #final dataframe of student with grade of all student with respective to submitted assingment
    new_df = pd.DataFrame(new).T
    new_df.columns = assignment_name1
    final_df = pd.concat([student_df,new_df],axis=1)
    final_df = final_df.iloc[:student_df.shape[0],:final_df.shape[1]]
    final_df

    def column_name(id1,y):
      """Return ``[id1, y]`` repeated once per assignment.

      The repeat count is ``len(assignment_name1)``, read from the
      enclosing scope; the result is used as column labels for the
      id/value column pairs.
      """
      return [id1, y] * len(assignment_name1)


    def new_df(x,y):
      """Align every id/value column pair of ``x`` to the rows of ``y``.

      ``x`` is read two columns at a time (the first of each pair must be
      labelled ``'id'``); each pair is outer-merged onto ``y`` by ``'id'``
      and only the merged value column is kept. The value columns are then
      appended to ``y`` and the result is cut back to ``y``'s row count.
      """
      value_columns = [
          pd.merge(y, x.iloc[:, start:start + 2], on='id', how='outer')
          .iloc[:, -1]
          .values.tolist()
          for start in range(0, len(x.columns), 2)
      ]
      combined = pd.concat([y, pd.DataFrame(value_columns).T], axis=1)
      # Outer merges can yield more rows than ``y``; keep only its first rows.
      return combined.iloc[:y.shape[0], :combined.shape[1]]

    sdf = []
    submiteddate = column_name('id','submiteddate')
    submit_df.columns = submiteddate
    df1 = new_df(submit_df,student_df)
    sdf.append(df1)
    unsubmiteddate = column_name('id','unsubmiteddate')
    unsubmit_df.columns = unsubmiteddate
    df2 = new_df(unsubmit_df,student_df)
    sdf.append(df2)
    returndate = column_name('id','return')
    return_df.columns = returndate
    df3 = new_df(return_df,student_df)
    sdf.append(df3)
    reassingndate = column_name('id','reassingndate')
    reassingn_df.columns = reassingndate
    df4 = new_df(reassingn_df,student_df)
    sdf.append(df4)
    status = column_name('id','status')
    status_df.columns = status
    df5 = new_df(status_df,student_df)
    sdf.append(df5)
    feedback = column_name('id','feedback')
    feedback_df.columns = feedback
    df6 = new_df(feedback_df,student_df)
    sdf.append(df6)
    sdf.append(final_df)

    kd =[]
    kd1 =[]
    for i in range(0,final_df.shape[0]):
        md = []
        md1 = []
        for j in sdf:
            sd = j.iloc[i,3:].values.tolist()
            sdinfo = j.iloc[i,:3].values.tolist()
            md.append(sd)
            md1.append(sdinfo)
        kd.append(md)
        kd1.append(md1)

    

    from datetime import datetime
    assingnment_delay = []
    for i in range(final_df.shape[0]):
        f_delay= []
        for j in range(0,final_df.iloc[:,3:].shape[1]):
            s = asdue_df['duedate'][j]
            fs = pd.DataFrame((kd[i])).T
            fs.columns = ['submiteddate','unsubmiteddate','returndate','reassingndate','status','feedback','points']
            p = fs['submiteddate'][j]
            r = datetime.now().strftime("%Y-%m-%d")
            if type(p) is float or p =='Not Available' or p is None:
                q = datetime.fromisoformat(r)-datetime.fromisoformat(s)
                f_delay.append(q.days)
            else:
                q = datetime.fromisoformat(p)-datetime.fromisoformat(s)
                f_delay.append(q.days)
        assingnment_delay.append(f_delay)

    from datetime import datetime
    assingnment_check_delay = []
    for i in range(final_df.shape[0]):
        c_delay= []
        for j in range(0,final_df.iloc[:,3:].shape[1]):
            fs = pd.DataFrame((kd[i])).T
            fs.columns = ['submiteddate','unsubmiteddate','returndate','reassingndate','status','feedback','points']
            p = fs['submiteddate'][j]
            u = fs['feedback'][j]
            v = fs['points'][j]
            r = datetime.now().strftime("%Y-%m-%d")
            if ((type(u) is None or u =='No feedback') and (type(v) is None or v =='No Points')) and (p !='Not Available' or p is None):
                c_data = datetime.fromisoformat(r)-datetime.fromisoformat(p)
                c_delay.append(c_data.days)
            else:
                c_delay.append('Not Available')
        assingnment_check_delay.append(c_delay)

  
    for i in range(0,final_df.shape[0]):
        fs = pd.DataFrame((kd[i])).T
        fs1 = pd.DataFrame(kd1[i][0]).T
        fs1.columns = ['Id','Name','Email']
        asgment = pd.DataFrame(assignment_name1)
        asgment.columns = ['Assingments']
        fs.columns = ['submiteddate','unsubmiteddate','returndate','reassingndate','status','feedback','points']
        assingment_delay_df = pd.DataFrame(assingnment_delay[i],columns =['assingment_delay'])
        assingment_check_delay = pd.DataFrame(assingnment_check_delay[i],columns =['assingment_check_delay'])
        newfs1 = pd.concat([fs1]*len(assignment_name1),ignore_index=True)
        newfs = pd.concat([newfs1,asgment,asdue_df,fs,assingment_delay_df,assingment_check_delay],axis=1)
        # print(newfs)
        myDataFrame = pd.concat([myDataFrame,newfs],axis=0,ignore_index=True)
  # print(myDataFrame.shape)
  
  import pandas as pd
  import sqlite3 as sq

#   table_name = 'student3batchinfo' # table and file name
#   conn = sq.connect('{}.sqlite3'.format(table_name)) 
#   final_df = myDataFrame.to_sql(table_name, conn, if_exists='replace')
#   conn.close()
  table_name = 'assignment_details'
  engine = create_engine('sqlite:///db.sqlite3')
  apidf = myDataFrame.to_sql(table_name, if_exists='replace', con=engine)
  return

The add_csv_data task function runs successfully on the Celery worker without any error,

but when I call the add_api_data task function it gives me an AttributeError:

  File "C:\Users\Ajay\AppData\Local\Programs\Python\Python310\lib\site-packages\celery\app\trace.py", line 451, in trace_task
    R = retval = fun(*args, **kwargs)
  File "C:\Users\Ajay\AppData\Local\Programs\Python\Python310\lib\site-packages\celery\app\trace.py", line 734, in __protected_call__
    return self.run(*args, **kwargs)
  File "F:\django_template\grapapi\graph_tutorial\tutorial\task.py", line 58, in add_api_data
    token = get_token(request)
  File "F:\django_template\grapapi\graph_tutorial\tutorial\auth_helper.py", line 72, in get_token
    cache = load_cache(request)
  File "F:\django_template\grapapi\graph_tutorial\tutorial\auth_helper.py", line 17, in load_cache
    if request.session.get('token_cache'):
AttributeError: 'add_api_data' object has no attribute 'session'

A celery task is a separate process from your view. Your celery task doesn’t have access to a session associated with a request. (It doesn’t get passed the request object. It doesn’t make sense to even try to access a user’s session from such a task.)

1 Like

Hi @KenWhitesell, thanks for the fast reply. Actually, I don’t clearly understand your points. Can you explain a little more, with an example?

I don’t understand why my add_api_data task can’t access the request’s session.

How can my add_api_data task access the request’s session?

Because it can’t.

When you’re running Celery, you’re running it in a separate worker process - and independent of the user’s session. The Celery task is not “authenticated” nor assigned a “connection”. Even if you were to pass it some type of session identifier, there’s no guarantee that the session is still going to be active when the task is executed.

You don’t. You have your view store data in the database, and allow Celery to access it from there.

1 Like

I am trying to fetch data from the MS Graph API and I want to store it in an SQLite database. I want to automate the whole process with Celery because it’s a time-consuming task. How do you automate the whole process with Celery?

Pretty much as you’ve been doing. Your view accepts a request and calls a Celery task.

The Celery task runs in a worker process and saves the results.

Your communications between the view and the Celery task is some combination of the parameters being passed and data in the database.

1 Like

Hi, ken
I didn’t understand what’s the meaning of the above line

can you please give me a clear explanation?

It would be more helpful if you can provide pseudo-code

See the docs at First Steps with Celery — Celery 5.2.6 documentation and User Guide — Celery 5.2.6 documentation

1 Like

Thanks for providing the useful links, but I have already read these documents many times and I still don’t understand what is actually going wrong with my code or how to correct it.

I have been trying for the last three days without any luck. It would be more helpful if you could provide pseudo-code.

There are numerous examples throughout the docs.

I suggest you take a step back and try creating a trivial celery task first to get you to the point where you’re understanding what celery does and how you use it before you implement all your desired functionality.

If you run into issues with that, then that creates a much smaller (and easier to diagnose) problem that we can help you with.

1 Like
  1. User navigates to your page
  2. User has a session so request.session is there.
  3. User does something that triggers the invocation of the Celery task.
  4. Celery task runs when it runs, you have not much control over that. It runs asynchronously from your view/page.

Transition between 3. and 4. is important.

When the celery task is invoked, it could very well be invoked on a different server, physically running on the other side of the planet. But the important point is that the task will no longer run within the context of the request or the view.
What does that mean? Well, no more request.user, request.is_user_authenticated or other kinds of niceties you may have built for your project.

What you can do is then something like this, from views.py:

task.add_csv_data.delay(user_pk=request.user.pk, param1='foo', param2='bar')

and in tasks.py:

@shared_task
def add_csv_data(user_pk=None, param1=None, param2=None):
    """Illustrative task: rebuild the needed context from small parameters.

    Only a primary key (plus two placeholder parameters) is received; the
    user is re-loaded inside the task instead of relying on a request or
    session. Returns False when no user with ``user_pk`` exists.
    """
    try:
        user = User.objects.get(pk=user_pk)
    except User.DoesNotExist:
        # Nothing to work on for an unknown user.
        return False

    # do stuff here

In general, a good practice with Celery is to send small amounts of data/parameters to the tasks, i.e., not massive dictionaries or binary blobs. Instead, send the task just enough information so it can literally rebuild its own context and do what it is required to do.

Hope that makes sense, good luck :slight_smile:

1 Like

@KenWhitesell and @plopidou, thanks for the help. Now I am able to run the Celery task: I made an access_token function and pass its result as a parameter (`fun`) to my add_api_data task. It works for the Celery task.
In my views.py:

def access_token(request):
    """Return whatever ``get_token`` yields for this request's session."""
    token = get_token(request)
    return token

def education(request):
  # View excerpt as posted: the token is resolved here, inside the request
  # cycle, and only that value is handed to the Celery task.
  # NOTE(review): no response is returned in the lines shown — presumably
  # the rest of the view was omitted from the post; confirm.
  import json
  context = initialize_context(request)
  # NOTE(review): one positional argument is passed, while the task below is
  # declared as add_api_data(request, fun); that only lines up if the task is
  # registered with bind=True (decorator not shown) — confirm.
  api_data = task.add_api_data.delay(access_token(request))

In task.py my code is:

def add_api_data(request,fun):
    import json
  #graph api base url
    base_url = "https://graph.microsoft.com/v1.0/"
    
    headers = {'Authorization':'Bearer'}
    token = fun
    headers = {
        'Authorization': 'Bearer {0}'.format(token),
        'Content-Type': 'application/json'
    }

    # endpoint0 = base_url+ "education/classes/"
    # educatiom_data = json.loads(requests.get(endpoint0,headers=headers).text)


    # class_id = []
    # class_name = []
    # for i in range(len(educatiom_data['value'])):
    #     id_ = educatiom_data['value'][i]['id']
    #     name = educatiom_data['value'][i]['mailNickname']
    #     class_id.append(id_)
    #     class_name.append(name)

    class_id = ["9289f463-badc-4722-8b51-e6f1326a7f02","9cfdd9bf-3b7a-43da-bb6a-e19990d4252e","5c30e2ca-d0ec-4277-ad76-5559b04a7bed"]
    class_name = ["MarchFullstack2021","JanFrontendBatch2022","JanFullStackBatch2022"]

    import pandas as pd

    myDataFrame = pd.DataFrame(columns = ['Id','Name','Email','Assingments','duedate','submiteddate','unsubmiteddate','returndate','reassingndate','status','feedback','points','assingment_delay','assingment_check_delay'])
    for class_data_id,class_data_name in zip(class_id,class_name):
        #assingment endpoint graph api url
        endpoint1 = base_url+ "education/classes/"+class_data_id+"/assignments/"
        #saving assingment id in list from assingment end point url 
        data = json.loads(requests.get(endpoint1,headers=headers).text)
        assingment_id = []
        for i in range(len(data['value'])):
            assingment_id.append(data['value'][i]['id'])

        #creating muultiple assingment endpoint for grade url and saved into list
        
        outcomsurl = []
        for i in assingment_id:
        # endpoint2 = endpoint1 + i + "/submissions/?$filter=status"+" "+"eq"+" "+"'working'&?$expand=outcomes"
        # endpoint2 = endpoint1 + i + "/submissions/?$filter=status"+" "+"eq"+" "+"'submitted'&?$expand=outcomes"
        # endpoint2 = endpoint1 + i + '/susubmissions/?$filter=status eq "working"&?$expand=outcomes'
            endpoint2 = endpoint1 + i + "/submissions?$expand=outcomes"
            outcomsurl.append(endpoint2)

       
        #all student assingment json response
        
        data_upd_list = []
        for i in range(len(outcomsurl)):
            templist1 = []
            data2 = json.loads(requests.get(outcomsurl[i],headers=headers).text)
            value1 = data2['value']
            for j in value1:
                templist1.append(j)
            if '@odata.nextLink' in data2.keys():
                datalink = data2['@odata.nextLink']
                data_next = json.loads(requests.get(datalink,headers=headers).text)
                value2 = data_next['value']
                for k in value2:
                    templist1.append(k)
            data_upd_list.append(templist1)
        
        
        def multi_date(x):
            import pandas as pd
            final_date = []
            for i in range(len(data_upd_list)):
                date = []
                temp_id = []
                final_points1 = []
                final_points2 = []
                for j in range(len(data_upd_list[i])):
                    temp_id.append(data_upd_list[i][j]['submittedBy']['user']['id'])
                    if data_upd_list[i][j][x]==None:
                        date.append("Not Available")
                    else:
                        date.append(data_upd_list[i][j][x][:10])
                for id1,date1 in sorted(zip(temp_id,date),key = lambda x: x[0]):
                    zip_data = list((id1,date1))
                    final_points1.append(str(zip_data[0]))
                    final_points2.append(str(zip_data[1]))
                final_date.append(final_points1)
                final_date.append(final_points2)
            df = pd.DataFrame(final_date).T
            return df

        submit_df = multi_date('submittedDateTime')
        unsubmit_df = multi_date('unsubmittedDateTime')
        return_df = multi_date('returnedDateTime')
        reassingn_df = multi_date('reassignedDateTime')
        status_df = multi_date('status')

        def feedback():
            import pandas as pd
            final_feedback = []
            for i in range(len(data_upd_list)):
                feedback_text = []
                temp_id = []
                final_points1 = []
                final_points2 = []
                for j in range(len(data_upd_list[i])):
                    temp_id.append(data_upd_list[i][j]['submittedBy']['user']['id'])
                    if data_upd_list[i][j]['outcomes'][0]['publishedFeedback']==None:
                        feedback_text.append("No feedback")
                    else:
                        feedback_text.append(data_upd_list[i][j]['outcomes'][0]['publishedFeedback']['text']['content'])
                for id1,text in sorted(zip(temp_id,feedback_text),key = lambda x: x[0]):
                    zip_data = list((id1,text))
                    final_points1.append(zip_data[0])
                    final_points2.append(zip_data[1])
                final_feedback.append(final_points1)
                final_feedback.append(final_points2)
            df = pd.DataFrame(final_feedback).T
            return df

        feedback_df = feedback()


        
        assingment_data = []
        assingment_list = json.loads(requests.get(endpoint1,headers=headers).text)
        assingment_data.append(assingment_list)

        # #all user include teacher and student dispaly name ,email,id data
        member_endpoint = base_url+ "education/classes/"+class_data_id+ "/members?$select=id,displayName,userPrincipalName"
        teacher_endpoint = base_url+ "education/classes/"+class_data_id+ "/teachers?$select=id,displayName,userPrincipalName"
        member_data = []
        import pandas as pd
        member_list = json.loads(requests.get(member_endpoint,headers=headers).text)
        member_data.append(member_list)
        df_member = pd.DataFrame(member_data[0]['value'])

        # #Only teacher dispaly name ,email,id data
        teacher_data = []
        teacher_list = json.loads(requests.get(teacher_endpoint,headers=headers).text)
        teacher_data.append(teacher_list)
        df_teacher = pd.DataFrame(teacher_data[0]['value'])

        # #to get only student data we remove techer data into  all memeber this exclude teacher data and return only student data
        student_df = pd.concat([df_teacher,df_member], axis=0, ignore_index=True).drop_duplicates(subset=["id","userPrincipalName"],keep=False, ignore_index=True)
        student_df = student_df.sort_values(by='id').reset_index().drop('index',axis=1)
    
        # #this is final grade list of student sorted for by user id beacuse all assingment grade point mismatch with corrsopond users.
        final_points = []
        for i in range(len(data_upd_list)):
            temp_points = []
            temp_id = []
            final_points1 = []
            final_points2 = []
            for j in range(len(data_upd_list[i])):
                temp_id.append(data_upd_list[i][j]['submittedBy']['user']['id'])
                if (data_upd_list[i][j]['outcomes'][1]['points'] is None):
                    temp_points.append('No Points')
                else:
                    temp_points.append(str(data_upd_list[i][j]['outcomes'][1]['points']['points']))
            
            for letter, number in sorted(zip(temp_points,temp_id),key = lambda x: x[1]):
                zip_data = list((number,letter))
                final_points1.append(zip_data[0])
                final_points2.append(zip_data[1])
            final_points.append(final_points1)
            final_points.append(final_points2)

        # #assingment name saved into list used with name and id for pandas dataframe
        assignment_name1 = []
        for i in range(len(assingment_data[0]['value'])):
            var = assingment_data[0]['value'][i]['displayName']
            assignment_name1.append(var)

        assignment_duedate = []
        for i in range(len(data['value'])):
            var = data['value'][i]['dueDateTime'][:10]
            assignment_duedate.append(var)

        asdue_df = pd.DataFrame(assignment_duedate)
        asdue_df.columns = ['duedate']

        # #assingment name saved into list used for final dataframe
        assignment_name = []
        for i in range(len(assingment_data[0]['value'])):
            var = assingment_data[0]['value'][i]['displayName']
            assignment_name.append(var)

        #insert id column name with respective to assingment column
        for i in range(0,len(final_points),2):
            assignment_name.insert(i,f'id')
        
        df = pd.DataFrame(final_points).T
        df.columns = assignment_name

        new = []
        for i in range(0,len(df.columns),2):
            df1 = pd.merge(student_df,df.iloc[:, i:i+2],on='id',how='outer').iloc[:,-1].values.tolist()
            new.append(df1)

        # Final dataframe: each student's details together with their grade for every assignment.
        new_df = pd.DataFrame(new).T
        new_df.columns = assignment_name1
        final_df = pd.concat([student_df,new_df],axis=1)
        final_df = final_df.iloc[:student_df.shape[0],:final_df.shape[1]]
        final_df

        def column_name(id1,y):
            """Build the alternating column-label list for a paired dataframe.

            Returns a new list containing ``id1`` followed by ``y``, repeated
            once for every entry of the enclosing scope's ``assignment_name1``.
            """
            label_pair = [id1, y]
            return label_pair * len(assignment_name1)


        def new_df(x,y):
            """Attach the value column of every (id, value) pair in ``x`` to ``y``.

            ``x`` is read two columns at a time. Each pair is outer-merged with
            ``y`` on the ``'id'`` column, and the last column of the merge result
            is kept as a plain list. Those lists become new columns placed to the
            right of ``y``; the result is cut back to ``y``'s row count.
            """
            pair_values = [
                pd.merge(y, x.iloc[:, start:start + 2], on='id', how='outer')
                .iloc[:, -1]
                .values.tolist()
                for start in range(0, len(x.columns), 2)
            ]
            # One list per pair -> transpose so each pair becomes a column.
            value_columns = pd.DataFrame(pair_values).T
            combined = pd.concat([y, value_columns], axis=1)
            row_limit = y.shape[0]
            return combined.iloc[:row_limit, :combined.shape[1]]

        sdf = []
        submiteddate = column_name('id','submiteddate')
        submit_df.columns = submiteddate
        df1 = new_df(submit_df,student_df)
        sdf.append(df1)
        unsubmiteddate = column_name('id','unsubmiteddate')
        unsubmit_df.columns = unsubmiteddate
        df2 = new_df(unsubmit_df,student_df)
        sdf.append(df2)
        returndate = column_name('id','return')
        return_df.columns = returndate
        df3 = new_df(return_df,student_df)
        sdf.append(df3)
        reassingndate = column_name('id','reassingndate')
        reassingn_df.columns = reassingndate
        df4 = new_df(reassingn_df,student_df)
        sdf.append(df4)
        status = column_name('id','status')
        status_df.columns = status
        df5 = new_df(status_df,student_df)
        sdf.append(df5)
        feedback = column_name('id','feedback')
        feedback_df.columns = feedback
        df6 = new_df(feedback_df,student_df)
        sdf.append(df6)
        sdf.append(final_df)

        kd =[]
        kd1 =[]
        for i in range(0,final_df.shape[0]):
            md = []
            md1 = []
            for j in sdf:
                sd = j.iloc[i,3:].values.tolist()
                sdinfo = j.iloc[i,:3].values.tolist()
                md.append(sd)
                md1.append(sdinfo)
            kd.append(md)
            kd1.append(md1)

        

        from datetime import datetime
        assingnment_delay = []
        for i in range(final_df.shape[0]):
            f_delay= []
            for j in range(0,final_df.iloc[:,3:].shape[1]):
                s = asdue_df['duedate'][j]
                fs = pd.DataFrame((kd[i])).T
                fs.columns = ['submiteddate','unsubmiteddate','returndate','reassingndate','status','feedback','points']
                p = fs['submiteddate'][j]
                r = datetime.now().strftime("%Y-%m-%d")
                if type(p) is float or p =='Not Available' or p is None:
                    q = datetime.fromisoformat(r)-datetime.fromisoformat(s)
                    f_delay.append(str(q.days))
                else:
                    q = datetime.fromisoformat(p)-datetime.fromisoformat(s)
                    f_delay.append(str(q.days))
            assingnment_delay.append(f_delay)

        from datetime import datetime
        assingnment_check_delay = []
        for i in range(final_df.shape[0]):
            c_delay= []
            for j in range(0,final_df.iloc[:,3:].shape[1]):
                fs = pd.DataFrame((kd[i])).T
                fs.columns = ['submiteddate','unsubmiteddate','returndate','reassingndate','status','feedback','points']
                p = fs['submiteddate'][j]
                u = fs['feedback'][j]
                v = fs['points'][j]
                r = datetime.now().strftime("%Y-%m-%d")
                if ((type(u) is None or u =='No feedback') and (type(v) is None or v =='No Points')) and (p !='Not Available' or p is None):
                    c_data = datetime.fromisoformat(r)-datetime.fromisoformat(p)
                    # print(type(c_data))
                    c_delay.append(str(c_data.days))
                else:
                    c_delay.append('Not Available')
            assingnment_check_delay.append(c_delay)

        
        for i in range(0,final_df.shape[0]):
            fs = pd.DataFrame((kd[i])).T
            fs1 = pd.DataFrame(kd1[i][0]).T
            fs1.columns = ['Id','Name','Email']
            asgment = pd.DataFrame(assignment_name1)
            asgment.columns = ['Assingments']
            fs.columns = ['submiteddate','unsubmiteddate','returndate','reassingndate','status','feedback','points']
            assingment_delay_df = pd.DataFrame(assingnment_delay[i],columns =['assingment_delay'])
            assingment_check_delay = pd.DataFrame(assingnment_check_delay[i],columns =['assingment_check_delay'])
            newfs1 = pd.concat([fs1]*len(assignment_name1),ignore_index=True)
            newfs = pd.concat([newfs1,asgment,asdue_df,fs,assingment_delay_df,assingment_check_delay],axis=1)
            # print(newfs)
            myDataFrame = pd.concat([myDataFrame,newfs],axis=0,ignore_index=True)
    print(myDataFrame.shape)
    import pandas as pd
    table_name = 'portal_assignment_details'
    engine = create_engine('sqlite:///db.sqlite3')
    # Code to create a new temp table here.
    apidf = myDataFrame.to_sql(table_name, if_exists='replace', con=engine)

    # create cursor
    engine = create_engine('sqlite:///db.sqlite3')
    conn = engine.connect()
    trans = conn.begin()
    # QUery to backup the assignment details to history table, with timestamp
    # conn.execute('''INSERT OR IGNORE into portal_assignment_data 
    #     select   
    #     portal_student.StudentId , portal_batch.batchid,
    #     portal_assignment_details.Id as studenthash,
    #     portal_assignment_details.Assingments as assignments,
    #     portal_assignment_details.name,
    #     portal_assignment_details.Email,
    #     portal_assignment_details.duedate as duedate,
    #     portal_assignment_details.submiteddate as submiteddate,
    #     portal_assignment_details.unsubmiteddate as unsubmiteddate,
    #     portal_assignment_details.returndate as returndate,
    #     portal_assignment_details.reassingndate as reassingndate,
    #     portal_assignment_details.status as status,
    #     portal_assignment_details.feedback as feedback,
    #     portal_assignment_details.points as points,
    #     portal_assignment_details.assingment_delay as assingment_delay,
    #     portal_assignment_details.assingment_check_delay as assingment_check_delay

    #     from portal_assignment_details join portal_student 
    #     on name = portal_student.StudentName
    #     join portal_batch 
    #     on batchname =  portal_batch.BatchName''')
    



    conn.execute('''INSERT into portal_assignment_data(studentapidata,Name,Email,assignments,duedate,submiteddate,unsubmiteddate,returndate,reassingndate,status,feedback,points,assingment_delay,assingment_check_delay,studentid_id,batchid_id)
        
        select
        portal_assignment_details.Id,   
        portal_assignment_details.name,
        portal_assignment_details.Email,
		portal_assignment_details.Assingments as assignments,
        portal_assignment_details.duedate as duedate,
        portal_assignment_details.submiteddate as submiteddate,
        portal_assignment_details.unsubmiteddate as unsubmiteddate,
        portal_assignment_details.returndate as returndate,
        portal_assignment_details.reassingndate as reassingndate,
        portal_assignment_details.status as status,
        portal_assignment_details.feedback as feedback,
        portal_assignment_details.points as points,
		portal_assignment_details.assingment_delay as assingment_delay,
        portal_assignment_details.assingment_check_delay as assingment_check_delay,
        
		portal_student.StudentId,
		portal_batchstudent.batchid
		


        from portal_assignment_details join portal_student 
        on portal_assignment_details.name = portal_student.StudentName
        join portal_batchstudent 
        on portal_batchstudent.StudentId =  portal_student.StudentId
		''')
    trans.commit()
    conn.close()
    return 
type or paste code here

But I am not able to run my task as a periodic task.
celery.py


from celery import Celery
import os

from portal.auth_helper import get_sign_in_flow, get_token_from_code, store_user, remove_user_and_token, get_token
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'batch_portal.settings')

# Celery application instance for the 'batch_portal' project.
app = Celery('batch_portal')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CAR' means all celery-related configuration keys
#   should have a `CAR_` prefix.
# NOTE(review): the stock Celery/Django template passes namespace='CELERY';
# confirm the Django settings really use the `CAR_` prefix.
app.config_from_object('django.conf:settings', namespace='CAR')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    """Print the repr of this bound task's request (diagnostic task)."""
    print(f'Request: {self.request!r}')

def access_token(request):
    """Return whatever ``get_token`` yields for the given ``request``."""
    token = get_token(request)
    return token

# def another_fun():
#     return access_token

# return_fun = another_fun()



# NOTE(review): this statement runs at import time, where no `request` name is
# defined, so loading this module fails with
# "NameError: name 'request' is not defined".
st = access_token(request)



from celery.schedules import crontab
import json
# Periodic (celery beat) schedule: three entries, each fired by a crontab rule.
app.conf.beat_schedule = {
    # Runs every day at 22:00 (10:00 p.m.).
    # NOTE(review): this entry names the task 'task.add_api_data' while the
    # last entry uses 'tasks.add_api_data' — confirm which name is registered.
    'add-every-day at 10:00 p.m': {
        'task': 'task.add_api_data',
        'schedule': crontab(hour=22, minute=0),
        # 'args':['fun']
        # `st` is evaluated once, when this module is imported, not per run.
        'kwargs':{'fun':st},
    },
    # Runs every day at 22:10 (10:10 p.m.).
    'delete-every-day at 10:10 p.m': {
        'task': 'tasks.delete_api_data',
        'schedule': crontab(hour=22, minute=10),
    },

    # Runs every day at 22:15 (10:15 p.m.), with the same kwargs as the first entry.
    'agian add-every-day at 10:15 p.m': {
        'task': 'tasks.add_api_data',
        'schedule': crontab(hour=22, minute=15),
        'kwargs':{'fun':st},
    },
}


How do I pass my task argument `access_token(request)` into `'kwargs': {'fun': access_token(request)}`?
`access_token` gives me the authentication token, but I cannot call `access_token` because of its `request` parameter; it shows this error:

    st = access_token(request)
NameError: name 'request' is not defined

This line exists at the module level. I’m not sure what it’s supposed to be doing there. It’s not part of any task.

1 Like

Thanks for the reply. The function `access_token(request)` returns my access token, and I want to pass that access token into

app.conf.beat_schedule = {
    # Executes every Monday morning at 10:00 p.m.
    'add-every-day at 10:00 p.m': {
        'task': 'task.add_api_data',
        'schedule': crontab(hour=22, minute=0),
        'kwargs':{'fun':st},
    },

the kwargs argument. How do I pass my authorization token into the kwargs argument?

This makes no sense. There is no request associated with a scheduled task.

A request is a representation of an HttpRequest issued by a browser (or JavaScript, or a library such as Python’s requests module).

Since this process is being started by a timer, there is no request.

Generally speaking, a celery task reads and writes data directly through models. There usually isn’t a view involved.

1 Like

Thanks. So how do I run my celery beat task? What am I missing here, or what is wrong with this code?

You run your task on the schedule.

Your task does whatever work needs to be performed.

There’s no view involved, so there’s no need for authentication or a token.

1 Like