
Process Dask Dataframe By Chunks Of Rows

I have a dask dataframe created using chunks of a certain blocksize: df = dd.read_csv(filepath, blocksize = blocksize * 1024 * 1024) I can process it in chunks like this: partial_
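For context, the setup described in the question can be sketched roughly as follows; the file path, the blocksize value, and the per-chunk work are placeholders for illustration and are not taken from the original question:

import dask.dataframe as dd

filepath = 'data.csv'   # placeholder path
blocksize = 64          # placeholder blocksize in MB

# read the csv into partitions of roughly `blocksize` MB each
df = dd.read_csv(filepath, blocksize=blocksize * 1024 * 1024)

# one way to process the frame chunk by chunk: pull one partition at a time
for i in range(df.npartitions):
    chunk = df.get_partition(i).compute()  # pandas DataFrame for this chunk
    # ... process `chunk` here ...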

Solution 1:

You can repartition the dataframe along divisions, which define how index values are allocated across partitions (assuming a unique index).

import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(range(15), columns=['x'])
ddf = dd.from_pandas(df, npartitions=3)

# there will be 5 rows per partition
print(ddf.map_partitions(len).compute())

# you can see that ddf is split along these index values
print(ddf.divisions)

# change the divisions to have the desired spacing
new_divisions = (0, 3, 6, 9, 12, 14)
new_ddf = ddf.repartition(divisions=new_divisions)

# now there will be 3 rows per partition
print(new_ddf.map_partitions(len).compute())
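Once the partitions have the desired number of rows, each chunk can be processed independently. Continuing from the snippet above, a small usage sketch (process_chunk is a made-up placeholder for whatever per-chunk work is actually needed):

def process_chunk(pdf):
    # pdf is the pandas DataFrame backing a single partition (chunk of rows)
    return pdf.assign(y=pdf['x'] * 2)

# run the placeholder function on every 3-row chunk
result = new_ddf.map_partitions(process_chunk).compute()
print(result)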

If the index is not known, it's possible to create a new index (assuming the rows do not require sorting) and repartition along the computed divisions:

import dask.dataframe as dd
import pandas as pd

# save some data into unindexed csv
num_rows = 15
df = pd.DataFrame(range(num_rows), columns=['x'])
df.to_csv('dask_test.csv', index=False)


# read from csv
ddf = dd.read_csv('dask_test.csv', blocksize=10)

# assume that rows are already ordered (so no sorting is needed),
# then the index can be modified using the lengths of the partitions
cumlens = ddf.map_partitions(len).compute().cumsum()

# since processing will be done on a partition-by-partition basis, save them
# individually
new_partitions = [ddf.partitions[0]]
for npart, partition in enumerate(ddf.partitions[1:].partitions):
    partition.index = partition.index + cumlens[npart]
    new_partitions.append(partition)

# this is our new ddf
ddf = dd.concat(new_partitions)

# set divisions based on cumulative lengths
ddf.divisions = tuple([0] + cumlens.tolist())

# change the divisions to have the desired spacing
new_partition_size = 12
max_rows = cumlens.tolist()[-1]
new_divisions = list(range(0, max_rows, new_partition_size))
if new_divisions[-1] < max_rows:
    new_divisions.append(max_rows)
new_ddf = ddf.repartition(divisions=new_divisions)

# now there will be the desired number of rows per partition
print(new_ddf.map_partitions(len).compute())
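If the chunks need to be consumed one at a time (as in the original question) rather than through map_partitions, the repartitioned frame can also be walked sequentially. A minimal sketch, assuming each chunk fits comfortably in memory and continuing from the code above:

# pull each ~12-row chunk into memory and process it in turn
for i in range(new_ddf.npartitions):
    chunk = new_ddf.get_partition(i).compute()
    print(len(chunk), chunk['x'].sum())  # placeholder per-chunk work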
