APP下载

PyODPS DataFrame 处理笛卡尔积的几种方式

消息来源:baojiabao.com 作者: 发布时间:2026-05-19

报价宝综合消息PyODPS DataFrame 处理笛卡尔积的几种方式

PyODPS 提供了 DataFrame API 来用类似 pandas 的界面进行大规模资料分析以及预处理,本文主要介绍如何使用 PyODPS 执行笛卡尔积的操作。

笛卡尔积最常出现的场景是两两之间需要比较或者运算。以计算地理位置距离为例,假设大表 Coordinates1 储存目标点经纬度座标,共有 M 行资料,小表 Coordinates2 储存出发点经纬度座标,共有 N 行资料,现在需要计算所有离目标点最近的出发点座标。对于一个目标点来说,我们需要计算所有的出发点到目标点的距离,然后找到最小距离,所以整个中间过程需要产生 M * N 条资料,也就是一个笛卡尔积问题。

haversine 公式

首先简单介绍一下背景知识,已知两个地理位置的座标点的经纬度,求解两点之间的距离可以使用 haversine 公式,使用 Python 的表达如下:

def haversine(lat1, lon1, lat2, lon2):

# lat1, lon1 为位置 1 的经纬度座标

# lat2, lon2 为位置 2 的经纬度座标

import numpy as np

dlon = np.radians(lon2 - lon1)

dlat = np.radians(lat2 - lat1)

a = np.sin( dlat /2 ) **2 + np.cos(np.radians(lat1)) * np.cos(np.radians(lat2)) * np.sin( dlon /2 ) **2

c = 2 * np.arcsin(np.sqrt(a))

r = 6371 # 地球平均半径,单位为公里

return c * r

MapJoin

目前最推荐的方法就是使用 mapjoin,PyODPS 中使用 mapjoin 的方式十分简单,只需要两个 dataframe join 时指定 mapjoin=True,执行时会对右表做 mapjoin 操作。

In [3]: df1 = o.get_table(\'coordinates1\').to_df()

In [4]: df2 = o.get_table(\'coordinates2\').to_df()

In [5]: df3 = df1.join(df2, mapjoin=True)

In [6]: df1.schema

Out[6]:

odps.Schema {

latitude float64

longitude float64

id string

}

In [7]: df2.schema

Out[7]:

odps.Schema {

latitude float64

longitude float64

id string

}

In [8]: df3.schema

Out[8]:

odps.Schema {

latitude_x float64

longitude_x float64

id_x string

latitude_y float64

longitude_y float64

id_y string

}

可以看到在执行 join 时预设会将重名列加上 _x 和 _y 字尾,可通过在 suffixes 引数中传入一个二元 tuple 来自定义字尾,当有了 join 之后的表后,通过 PyODPS 中 DataFrame 的自建函式就可以计算出距离,十分简洁明了,并且效率很高。

In [9]: r = 6371

...: dis1 = (df3.latitude_y - df3.latitude_x).radians()

...: dis2 = (df3.longitude_y - df3.longitude_x).radians()

...: a = (dis1 / 2).sin() ** 2 + df3.latitude_x.radians().cos() * df3.latitude_y.radians().cos() * (dis2 / 2).sin() ** 2

...: df3[\'dis\'] = 2 * a.sqrt().arcsin() * r

In [12]: df3.head(10)

Out[12]:

latitude_x longitude_x id_x latitude_y longitude_y id_y dis

0 76.252432 59.628253 0 84.045210 6.517522 0 1246.864981

1 76.252432 59.628253 0 59.061796 0.794939 1 2925.953147

2 76.252432 59.628253 0 42.368304 30.119837 2 4020.604942

3 76.252432 59.628253 0 81.290936 51.682749 3 584.779748

4 76.252432 59.628253 0 34.665222 147.167070 4 6213.944942

5 76.252432 59.628253 0 58.058854 165.471565 5 4205.219179

6 76.252432 59.628253 0 79.150677 58.661890 6 323.070785

7 76.252432 59.628253 0 72.622352 123.195778 7 1839.380760

8 76.252432 59.628253 0 80.063614 138.845193 8 1703.782421

9 76.252432 59.628253 0 36.231584 90.774527 9 4717.284949

In [13]: df1.count()

Out[13]: 2000

In [14]: df2.count()

Out[14]: 100

In [15]: df3.count()

Out[15]: 200000

df3 已经是有 M * N 条资料了,接下来如果需要知道最小距离,直接对 df3 呼叫 groupby 接上 min 聚合函式就可以得到每个目标点的最小距离。

In [16]: df3.groupby(\'id_x\').dis.min().head(10)

Out[16]:

dis_min

0 323.070785

1 64.755493

2 1249.283169

3 309.818288

4 1790.484748

5 385.107739

6 498.816157

7 615.987467

8 437.765432

9 272.589621

DataFrame 自定义函式

如果我们需要知道对应最小距离的点的城市,也就是表中对应的 id ,可以在 mapjoin 之后呼叫 MapReduce,不过我们还有另一种方式是使用 DataFrame 的 apply 方法。要对一行资料使用自定义函式,可以使用 apply 方法,axis 引数必须为 1,表示在行上操作。

表资源

要注意 apply 是在服务端执行的 UDF,所以不能在函式内使用类似于df=o.get_table(\'table_name\').to_df() 的表示式去获得表资料,具体原理可以参考PyODPS DataFrame 的程式码在哪里跑。以本文中的情况为例,要想将表 1 与表 2 中所有的记录计算,那么需要将表 2 作为一个资源表,然后在自定义中引用该表资源。PyODPS 中使用表资源也十分方便,只需要将一个 collection 传入 resources 引数即可。collection 是个可迭代物件,不是一个 DataFrame 物件,不可以直接呼叫 DataFrame 的界面,每个迭代值是一个 namedtuple,可以通过字段名或者偏移来取对应的值。

## use dataframe udf

df1 = o.get_table(\'coordinates1\').to_df()

df2 = o.get_table(\'coordinates2\').to_df()

def func(collections):

import pandas as pd

collection = collections[0]

ids = []

latitudes = []

longitudes = []

for r in collection:

ids.append(r.id)

latitudes.append(r.latitude)

longitudes.append(r.longitude)

df = pd.DataFrame({\'id\': ids, \'latitude\':latitudes, \'longitude\':longitudes})

def h(x):

df[\'dis\'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)

return df.iloc[df[\'dis\'].idxmin()][\'id\']

return h

df1[df1.id, df1.apply(func, resources=[df2], axis=1, reduce=True, types=\'string\').rename(\'min_id\')].execute(

libraries=[\'pandas.zip\', \'python-dateutil.zip\', \'pytz.zip\', \'six.tar.gz\'])

在自定义函式中,将表资源通过循环读成 pandas DataFrame,利用 pandas 的 loc 可以很方便的找到最小值对应的行,从而得到距离最近的出发点 id。另外,如果在自定义函式中需要使用到三方包(例如本例中的 pandas)可以参考这篇在PyODPS DataFrame自定义函式中使用pandas、scipy和scikit-learn-云栖社群-阿里云。

全域性变数

当小表的资料量十分小的时候,我们甚至可以将小表资料作为全域性变数在自定义函式中使用。

df1 = o.get_table(\'coordinates1\').to_df()

df2 = o.get_table(\'coordinates2\').to_df()

df = df2.to_pandas()

def func(x):

df[\'dis\'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)

return df.iloc[df[\'dis\'].idxmin()][\'id\']

df1[df1.id, df1.apply(func, axis=1, reduce=True, types=\'string\').rename(\'min_id\')].execute(

libraries=[\'pandas.zip\', \'python-dateutil.zip\', \'pytz.zip\', \'six.tar.gz\'])

在上传函式的时候,会将函式内使用到的全域性变数(上面程式码中的 df) pickle 到 UDF 中。但是注意这种方式使用场景很局限,因为 ODPS 的上传的档案资源大小是有限制的,所以资料量太大会导致 UDF 生成的资源太大从而无法上传,而且这种方式最好保证三方包的客户端与服务端的版本一致,否则很有可能出现序列化的问题,所以建议只在资料量非常小的时候使用。

总结

使用 PyODPS 解决笛卡尔积的问题主要分为两种方式,一种是 mapjoin,比较直观,效能好,一般能用 mapjoin 解决的我们都推荐使用 mapjoin,并且最好使用内建函式计算,能到达最高的效率,但是它不够灵活。另一种是使用 DataFrame 自定义函式,比较灵活,效能相对差一点(可以使用 pandas 或者 numpy 获得性能上的提升),通过使用表资源,将小表作为表资源传入 DataFrame 自定义函式中,从而完成笛卡尔积的操作。

作者:继盛

2020-01-01 16:51:00

相关文章