
Pyspark, Compare Two Rows In Dataframe

I'm attempting to compare one row in a dataframe with the next to see the difference in timestamp. Currently the data looks like:

itemid | eventid | timestamp
-------|---------|--------------------
134    | 30      | 2016-07-02 12:01:40
134    | 32      | 2016-07-02 12:21:23
125    | 30      | 2016-07-02 13:22:56
125    | 32      | 2016-07-02 13:27:07

Solution 1:

Yes, you're using the map function in the wrong way. map operates on a single element at a time. You can try to use window functions instead, like this:

from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window

df = (
    sc.parallelize([
        (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
        (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
    ]).toDF(["itemid", "eventid", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

w = Window.partitionBy("itemid").orderBy("timestamp")

diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")

df.withColumn("diff", diff)
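
For reference, a hedged sketch of what the snippet above produces (the exact show() layout varies by Spark version; the values simply follow from the sample timestamps):

df.withColumn("diff", diff).show()
# For each itemid the earliest row has diff = null (no previous row);
# the next row gets the gap in seconds, e.g.
#   itemid 125: 13:27:07 - 13:22:56 -> 251
#   itemid 134: 12:21:23 - 12:01:40 -> 1183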

Solution 2:

The comment by @ShuaiYuan on the original answer is correct. Over the last year I've developed a much better understanding of how Spark works and have actually rewritten the program I was working on for this post.

NEW ANSWER (2017/03/27) To compare the two rows of the dataframe I ended up using an RDD. I group the data by key (in this case the itemid) and ignore eventid, as it's irrelevant in this equation. I then map a lambda function onto the rows, returning a tuple of the key and a list of tuples containing the start and end of each event gap. The gaps come from the findGaps function, which iterates over the list of values (sorted timestamps) linked to each key. Once this is complete, I filter out keys with no time gaps and then use flatMapValues to return the data to a more SQL-like format. This is done with the following code:

# Find time gaps in a list of datetimes where firings are longer than the given duration.
def findGaps(dates, duration):
    result = []
    length = len(dates)

    # convert to dates for comparison
    # (toDate, formatResult and kind come from elsewhere in the full program)
    first = toDate(dates[0])
    last = toDate(dates[length - 1])
    for index, item in enumerate(dates):
        if index < length - 1 and (dates[index + 1] - item).total_seconds() > duration:
            # build outage tuple and append to list
            # format: (start, stop, duration)
            result.append(formatResult(item, dates[index + 1], kind))
    return result
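
toDate, formatResult, and kind aren't shown in the post; they belong to the rest of the author's program. Purely as a hypothetical illustration of the shapes the pipeline below expects (each gap is a (start, stop, duration) tuple), they could look roughly like this:

def toDate(stamp):
    # assume stamp is already a datetime; compare on the calendar date
    return stamp.date()

def formatResult(start, stop, kind):
    # hypothetical: build one outage tuple in the (start, stop, duration) shape
    # that the map step below unpacks; 'kind' is unused in this sketch
    return (start, stop, (stop - start).total_seconds())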

outage_list = outage_join_df.rdd\
                            .groupByKey()\
                            .map(lambda row: (
                                     row[0],
                                     findGaps(
                                         sorted(list(row[1])), 
                                         limit
                                     )
                                  )
                            )\
                            .filter(lambda row: len(row[1]) > 0)\
                            .flatMapValues(lambda row: row)\
                            .map(lambda row: (
                                 row[0]['itemid'],     # itemid
                                 row[1][0].date(),     # date
                                 row[1][0],            # start
                                 row[1][1],            # stop
                                 row[1][2]             # duration
                            ))\
                            .collect()
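
If you'd rather end up with a DataFrame than a collected Python list, one option is to feed the tuples back into createDataFrame (a minimal sketch; the column names are illustrative and spark is assumed to be an existing SparkSession):

outage_df = spark.createDataFrame(
    outage_list,
    ["itemid", "date", "start", "stop", "duration"]
)
outage_df.show()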

ORIGINAL ANSWER (WRONG) I managed to solve it using mapPartitions:

def findOutage(items):
    outages = []

    lastStamp = None
    for item in items:
        if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400:
            outages.append({"item": item.itemid, 
                            "start": item.stamp.isoformat(),
                            "stop": lastStamp.isoformat()})
        lastStamp = item.stamp
    return iter(outages)

items = df.limit(10).orderBy('itemid', desc('stamp'))

outages = items.mapPartitions(findOutage).collect()

Thanks everyone for the help!
