Welcome 微信登录

首页 / 操作系统 / Linux / Python模块化开发组织代码程序示例

样例包含三部分代码,周的处理函数部分、业务数据处理部分及多线程跑批调度处理部分。代码按功能分类存放,有助于使代码更清晰,通过from...import的方式,使代码重复使用。另外,多线程的调用部分,有效处理了程序先后依赖及多程序串并行跑批问题,为以后相似问题的处理,提供了借鉴。

1、周处理函数

/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/WeekCalc.py

# -*- coding=utf-8 -*-
import warnings
import datetime

warnings.filterwarnings("ignore")


def getNowYearWeek():
    """Return the ISO year/week of one week ago, formatted "YYYY#W".

    Steps back 7 days so batch jobs always work on the most recent
    *completed* week rather than the current partial one.
    """
    timenow = datetime.datetime.now() - datetime.timedelta(days=7)
    NowYearWeek = timenow.isocalendar()  # (ISO year, ISO week, ISO weekday)
    return str(NowYearWeek[0]) + "#" + str(NowYearWeek[1])


def dateRange(beginDate, endDate):
    """Return every date from beginDate to endDate inclusive.

    Both arguments and all returned items are "YYYY-MM-DD" strings; an
    empty list is returned when beginDate > endDate.
    """
    dates = []
    dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    date = beginDate[:]
    while date <= endDate:  # lexicographic compare is safe for ISO dates
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime("%Y-%m-%d")
    return dates


def weekRang(beginDate, endDate):
    """Return the sorted, de-duplicated ISO weeks ("YYYY#W") covered by the
    inclusive date range beginDate..endDate ("YYYY-MM-DD" strings).

    (Name keeps the historical "weekRang" spelling because callers import it.)
    """
    week = set()
    for date in dateRange(beginDate, endDate):
        # isocalendar()[0:2] -> (ISO year, ISO week); the set de-duplicates.
        week.add(datetime.date(int(date[0:4]), int(date[5:7]), int(date[8:10])).isocalendar()[0:2])
    wk_l = []
    for wl in sorted(list(week)):
        wk_l.append(str(wl[0]) + "#" + str(wl[1]))
    return wk_l


def currWeekList(his_week):
    """List every "YYYY#W" week from his_week through last week.

    History is anchored at 2015-07-01 (earliest data); the slice compares
    assume the "#" separator sits at index 4 (4-digit year).
    """
    last_wk = datetime.datetime.now() - datetime.timedelta(days=7)
    end_day = str(last_wk)[0:10]
    curr_week_list = []
    for week in weekRang("2015-07-01", end_day):
        # Keep weeks >= his_week: same year with week number not smaller,
        # or any later year.
        if (int(week[0:4]) == int(his_week[0:4]) and int(week[5:]) >= int(his_week[5:])) or (int(week[0:4]) > int(his_week[0:4])):
            curr_week_list.append(week)
    return curr_week_list


def hisRunWeekList(his_week):
    """Build ([curr_week, his_week], None) argument tuples for every week
    at or after his_week — the tuple shape threadpool.makeRequests expects.
    """
    batch_week_list = []
    for curr_week in currWeekList(his_week):
        if (int(his_week[0:4]) == int(curr_week[0:4]) and int(his_week[5:]) <= int(curr_week[5:])) or (int(his_week[0:4]) < int(curr_week[0:4])):
            batch_week_list.append(([curr_week, his_week], None))
    return batch_week_list


def RuningWeekList():
    """Build ([curr_week, his_week], None) tuples pairing last week with every
    historical week since 2015#27, for the threadpool batch scheduler.

    (Name keeps the historical "RuningWeekList" spelling for callers.)
    """
    curr_week = getNowYearWeek()
    batch_week_list = []
    for his_week in currWeekList("2015#27"):
        if (int(his_week[0:4]) == int(curr_week[0:4]) and int(his_week[5:]) <= int(curr_week[5:])) or (int(his_week[0:4]) < int(curr_week[0:4])):
            batch_week_list.append(([curr_week, his_week], None))
    return batch_week_list


def getWeekFristday(weekflag):
    """Return the Monday (datetime.date) of the ISO week given as "YYYY#W".

    (Name keeps the historical "Fristday" spelling because callers import it.)
    """
    yearnum = weekflag[0:4]  # year part
    weeknum = weekflag[5:7]  # week part (1- or 2-digit)
    stryearstart = yearnum + "0101"  # January 1st of that year
    yearstart = datetime.datetime.strptime(stryearstart, "%Y%m%d")
    yearstartcalendarmsg = yearstart.isocalendar()  # ISO (year, week, weekday) of Jan 1st
    yearstartweekday = yearstartcalendarmsg[2]
    yearstartyear = yearstartcalendarmsg[0]
    if yearstartyear < int(yearnum):
        # Jan 1st belongs to the previous ISO year, so week 1's Monday falls
        # within the first 7 days of January.
        daydelat = (8 - int(yearstartweekday)) + (int(weeknum) - 1) * 7
    else:
        # Jan 1st already lies inside ISO week 1 of this year.
        daydelat = (8 - int(yearstartweekday)) + (int(weeknum) - 2) * 7
    week1day = (yearstart + datetime.timedelta(days=daydelat)).date()
    return week1day

# Batch Test
# his_week_list = ["2015#46", "2015#45", "2016#2"]
# batch_week_list = []
# for his_week in his_week_list:
#   batch_week_list.extend(hisRunWeekList(his_week))
# print batch_week_list
# print getWeekFristday("2016#11")
# his_week = "2016#11"
# print currWeekList(his_week)
# print currWeekList(his_week)
# print getNowYearWeek()

2、业务处理部分
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/Hive_remain_byWeek_proc.py

# -*- coding=utf-8 -*-
import time
import os
import re
from WeekCalc import *

# `warnings` (and datetime) arrive via the star-import from WeekCalc.
warnings.filterwarnings("ignore")


def newuser_byweek_proc(batch_week):
    """Rebuild the Hive partition bi_newuser_byweek/pt_week=batch_week.

    A "new user" is an (appsource, appkey, identifier, uid) tuple seen in
    bi_all_access_log during batch_week ("YYYY#W") that never appeared before
    the week's first day.  Side effect only — runs hive via os.system.

    NOTE(review): the unescaped double quotes inside the `hive -e "..."`
    shell string look like an artifact of the web page this code was scraped
    from — confirm the quoting against a working original before running.
    """
    week1day = getWeekFristday(batch_week)
    os.system("""/usr/lib/hive-current/bin/hive -e "
    alter table bi_newuser_byweek drop if exists partition(pt_week="%s");
    alter table bi_newuser_byweek add partition(pt_week="%s");
    insert into table bi_newuser_byweek partition (pt_week="%s")
    select a1.appsource,a1.appkey,a1.identifier,a1.uid from (
    select appsource,appkey,identifier,uid
    from bi_all_access_log
    where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,"#",weekofyear(pt_day)) else concat(year(pt_day),"#",weekofyear(pt_day)) end = "%s"
    group by appsource,appkey,identifier,uid) a1
    left join
    (select appsource,appkey,identifier,uid
    from bi_all_access_log
    where pt_day < "%s" ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource and a1.uid=a2.uid
    where a2.identifier is null
    ;"
    """ % (batch_week, batch_week, batch_week, batch_week, week1day))


def user_remain_payamount_byweek(curr_week, his_week):
    """Recompute pay-amount retention for one (his_week cohort, curr_week) pair
    and refresh bi_user_remain_payamount_byweek in MySQL.

    Both weeks use the "YYYY#W" format.  Side effect only.
    """
    # Make the job re-runnable: delete any rows a previous run produced.
    os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uHadoop -pMysqlPass -e "use funnyai_data;
                delete from bi_user_remain_payamount_byweek where data_week="%s" and remain_week="%s";
               " """ % (his_week, curr_week))

    # Hive: join the his_week new-user cohort against curr_week activity and
    # curr_week payments, aggregated per (appkey, appsource).
    newuser_remain_pay_data = os.popen("""source /etc/profile;
        /usr/lib/hive-current/bin/hive -e "
        add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar;
        create temporary function RadixChange as "com.kascend.hadoop.RadixChange";
        with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
        from bi_newuser_byweek
        where pt_week = "%s"
        ),
        curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
        from bi_all_access_log
        where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,"#",weekofyear(pt_day)) else concat(year(pt_day),"#",weekofyear(pt_day)) end = "%s"
        group by appsource,appkey,identifier,RadixChange(uid,16,10)),
        curr_week_pay as (select uid,sum(amount) amount
        from data_chushou_pay_info
        where state=0 and
        case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,"#",weekofyear(pt_day)) else concat(year(pt_day),"#",weekofyear(pt_day)) end = "%s"
        group by uid)
        select b1.appkey,b1.appsource,sum(b2.amount) pay_amount from
        (select a1.appkey,a1.appsource,a1.uid
        from his_new_user a1
        inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource
        group by a1.appkey,a1.appsource,a1.uid) b1
        left join curr_week_pay b2 on b1.uid=b2.uid
        group by b1.appkey,b1.appsource
        ;"
        """ % (his_week, curr_week, curr_week)).readlines()

    # NOTE(review): stripping every space and then splitting on a space can
    # never yield multiple fields — the original almost certainly stripped
    # newlines and split on tabs (hive's column separator); the whitespace
    # was mangled by the scrape.  Confirm before relying on nrpd[1]/nrpd[2].
    nrpd_list = []
    for nrp_list in newuser_remain_pay_data:
        nrp = re.split(" ", nrp_list.replace(" ", ""))
        nrpd_list.append(nrp)
    for nrpd in nrpd_list:
        remain_week = curr_week
        appkey = nrpd[0]
        appsource = nrpd[1]
        pay_amount = nrpd[2]
        etl_time = time.strftime("%Y-%m-%d %X", time.localtime())
        os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data;
        insert into bi_user_remain_payamount_byweek(data_week,appsource,appkey,remain_week,pay_amount,etl_time)
        select "%s","%s","%s","%s","%s","%s";
       " """ % (his_week, appsource, appkey, remain_week, pay_amount, etl_time))


def user_remain_pay_byweek(curr_week, his_week):
    """Recompute retention counts for one (his_week cohort, curr_week) pair
    and refresh bi_user_remain_pay_byweek in MySQL.

    Both weeks use the "YYYY#W" format.  Side effect only.
    """
    # Make the job re-runnable: delete any rows a previous run produced.
    os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data;
                delete from bi_user_remain_pay_byweek where data_week="%s" and remain_week="%s";
               " """ % (his_week, curr_week))

    # Hive: count distinct his_week cohort members active in curr_week.
    newuser_remain_pay_data = os.popen("""source /etc/profile;
        /usr/lib/hive-current/bin/hive -e "
        add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar;
        create temporary function RadixChange as "com.kascend.hadoop.RadixChange";
        with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
        from bi_newuser_byweek
        where pt_week = "%s"
        ),
        curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
        from bi_all_access_log
        where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,"#",weekofyear(pt_day)) else concat(year(pt_day),"#",weekofyear(pt_day)) end = "%s"
        group by appsource,appkey,identifier,RadixChange(uid,16,10))
        select a1.appkey,a1.appsource,count(distinct a2.identifier) remain_cnt,0 pay_amount
        from his_new_user a1
        inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource
        group by a1.appkey,a1.appsource
        ;"
        """ % (his_week, curr_week)).readlines()

    # NOTE(review): same whitespace-mangled parsing as above — see comment there.
    nrpd_list = []
    for nrp_list in newuser_remain_pay_data:
        nrp = re.split(" ", nrp_list.replace(" ", ""))
        nrpd_list.append(nrp)
    for nrpd in nrpd_list:
        remain_week = curr_week
        appkey = nrpd[0]
        appsource = nrpd[1]
        remain_cnt = nrpd[2]
        pay_amount = nrpd[3]
        etl_time = time.strftime("%Y-%m-%d %X", time.localtime())
        os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data;
        insert into bi_user_remain_pay_byweek(data_week,appsource,appkey,remain_week,remain_cnt,pay_amount,etl_time)
        select "%s","%s","%s","%s","%s","%s","%s";
       " """ % (his_week, appsource, appkey, remain_week, remain_cnt, pay_amount, etl_time))

# Batch Test
# curr_week = "2016#6"
# his_week = "2015#46"
# user_remain_payamount_byweek(curr_week, his_week)
# user_remain_pay_byweek(curr_week, his_week)
# batch_week = "2015#46"
# newuser_byweek_proc(batch_week)

另:供打印sql测试的代码
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/xx.py

# -*- coding=utf-8 -*-
import time
import os
import re
from WeekCalc import *warnings.filterwarnings("ignore")
def newuser_byweek_proc(batch_week):
    """Print (rather than execute) the Hive SQL that rebuilds the
    bi_newuser_byweek partition for batch_week ("YYYY#W") — used to eyeball
    the generated SQL.  Relies on getWeekFristday star-imported from WeekCalc.
    """
    week1day = getWeekFristday(batch_week)
    sql_text = """/usr/lib/hive-current/bin/hive -e "
    alter table bi_newuser_byweek drop if exists partition(pt_week="%s");
    alter table bi_newuser_byweek add partition(pt_week="%s");
    insert into table bi_newuser_byweek partition (pt_week="%s")
    select a1.appsource,a1.appkey,a1.identifier,a1.uid from (
    select appsource,appkey,identifier,uid
    from bi_all_access_log
    where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,"#",weekofyear(pt_day)) else concat(year(pt_day),"#",weekofyear(pt_day)) end = "%s"
    group by appsource,appkey,identifier,uid) a1
    left join
    (select appsource,appkey,identifier,uid
    from bi_all_access_log
    where pt_day < "%s" ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource and a1.uid=a2.uid
    where a2.identifier is null
    ;"
    """ % (batch_week, batch_week, batch_week, batch_week, week1day)
    # print() as a call keeps the module parseable on Python 3; with a single
    # argument it behaves identically on Python 2.
    print(sql_text)


def user_remain_payamount_byweek(curr_week, his_week):
    """Print the Hive SQL that computes how much the his_week new-user cohort
    paid during curr_week (both "YYYY#W").  Print-only counterpart of the
    executing version in Hive_remain_byWeek_proc.
    """
    sql_text = """source /etc/profile;
        /usr/lib/hive-current/bin/hive -e "
        add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar;
        create temporary function RadixChange as "com.kascend.hadoop.RadixChange";
        with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
        from bi_newuser_byweek
        where pt_week = "%s"
        ),
        curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
        from bi_all_access_log
        where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,"#",weekofyear(pt_day)) else concat(year(pt_day),"#",weekofyear(pt_day)) end = "%s"
        group by appsource,appkey,identifier,RadixChange(uid,16,10)),
        curr_week_pay as (select uid,sum(amount) amount
        from data_chushou_pay_info
        where state=0 and
        case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,"#",weekofyear(pt_day)) else concat(year(pt_day),"#",weekofyear(pt_day)) end = "%s"
        group by uid)
        select b1.appkey,b1.appsource,sum(b2.amount) pay_amount from
        (select a1.appkey,a1.appsource,a1.uid
        from his_new_user a1
        inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource
        group by a1.appkey,a1.appsource,a1.uid) b1
        left join curr_week_pay b2 on b1.uid=b2.uid
        group by b1.appkey,b1.appsource
        ;"
        """ % (his_week, curr_week, curr_week)
    print(sql_text)


def user_remain_pay_byweek(curr_week, his_week):
    """Print the Hive SQL that counts how many of the his_week new-user cohort
    were active during curr_week (both "YYYY#W").  Print-only counterpart of
    the executing version in Hive_remain_byWeek_proc.
    """
    sql_text = """source /etc/profile;
        /usr/lib/hive-current/bin/hive -e "
        add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar;
        create temporary function RadixChange as "com.kascend.hadoop.RadixChange";
        with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
        from bi_newuser_byweek
        where pt_week = "%s"
        ),
        curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid
        from bi_all_access_log
        where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,"#",weekofyear(pt_day)) else concat(year(pt_day),"#",weekofyear(pt_day)) end = "%s"
        group by appsource,appkey,identifier,RadixChange(uid,16,10))
        select a1.appkey,a1.appsource,count(distinct a2.identifier) remain_cnt,0 pay_amount
        from his_new_user a1
        inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource
        group by a1.appkey,a1.appsource
        ;"
        """ % (his_week, curr_week)
    print(sql_text)

# Batch Test
# curr_week = "2016#6"
# his_week = "2015#46"
# user_remain_payamount_byweek(curr_week, his_week)
# user_remain_pay_byweek(curr_week, his_week)
# batch_week = "2015#46"
# newuser_byweek_proc(batch_week)

3、多线程批调度
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/BatchThread.py

# -*- coding=utf-8 -*-
import threadpool
from Hive_remain_byWeek_proc import *
# from xx import *
warnings.filterwarnings("ignore")today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
tomorrow = today + datetime.timedelta(days=1)now_time = time.strftime("%Y-%m-%d %X", time.localtime())
print "当前时间是:",now_time# 新用户数据先跑出来
last_week = [getNowYearWeek()]
request_newuser_byweek_proc = threadpool.makeRequests(newuser_byweek_proc, last_week)
frist_pool = threadpool.ThreadPool(8)
[frist_pool.putRequest(req) for req in request_newuser_byweek_proc]
frist_pool.wait()# 然后再执行用户留存和充值金额数据
if True:
    batch_week_list = RuningWeekList()
    requests = []
    request_user_remain_payamount_byweek = threadpool.makeRequests(user_remain_payamount_byweek, batch_week_list)
    request_user_remain_pay_byweek = threadpool.makeRequests(user_remain_pay_byweek, batch_week_list)    requests.extend(request_user_remain_payamount_byweek)
    requests.extend(request_user_remain_pay_byweek)    main_pool = threadpool.ThreadPool(8)
    [main_pool.putRequest(req) for req in requests]if __name__ == "__main__":
    while True:
        try:
            time.sleep(960)
            main_pool.poll()
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except threadpool.NoResultsPending:
            break    if main_pool.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main_pool.joinAllDismissedWorkers()now_time = time.strftime("%Y-%m-%d %X", time.localtime())
print "当前时间是:",now_time

本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-01/139752.htm