Pyspark, Compare Two Rows In Dataframe
Solution 1:
Yes, you're using the map function the wrong way. map operates on a single element at a time. You can try to use window functions like this:
from pyspark.sql.functions import col, lag
from pyspark.sql.window import Window
df = (
    sc.parallelize([
        (134, 30, "2016-07-02 12:01:40"), (134, 32, "2016-07-02 12:21:23"),
        (125, 30, "2016-07-02 13:22:56"), (125, 32, "2016-07-02 13:27:07"),
    ]).toDF(["itemid", "eventid", "timestamp"])
    .withColumn("timestamp", col("timestamp").cast("timestamp"))
)

w = Window.partitionBy("itemid").orderBy("timestamp")
diff = col("timestamp").cast("long") - lag("timestamp", 1).over(w).cast("long")

df.withColumn("diff", diff)
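As a quick follow-up, the computed column can be inspected and filtered to keep only rows whose gap to the previous event exceeds some threshold; a minimal sketch, where the 1200-second cutoff is just an illustrative value:

result = df.withColumn("diff", diff)
result.orderBy("itemid", "timestamp").show(truncate=False)

# Keep only rows whose gap to the previous event exceeds an illustrative
# threshold of 20 minutes (1200 seconds); the first row of each item has a
# null diff and is dropped by the comparison.
result.filter(col("diff") > 1200).show(truncate=False)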
Solution 2:
The comment by @ShuaiYuan on the original answer is correct. Over the last year I've developed a much better understanding of how Spark works and have actually rewritten the program I was working on for this post.
NEW ANSWER (2017/03/27) To compare the two rows of the dataframe I ended up using an RDD. I group the data by key (in this case the item id) and ignore eventid, as it's irrelevant in this equation. I then map a lambda function onto the rows, returning a tuple of the key and a list of tuples containing the start and end of event gaps. The gaps come from the "findGaps" function, which iterates over the list of values (sorted timestamps) linked to each key. Once this is complete I filter out keys with no time gaps and then flatMapValues to return the data to a more SQL-like format. This is done with the following code:
# Find time gaps in a list of datetimes where firings are longer than the given duration.
# toDate and formatResult are helper functions, and kind a value, defined elsewhere in the program.
def findGaps(dates, duration):
    result = []
    length = len(dates)

    # convert to dates for comparison
    first = toDate(dates[0])
    last = toDate(dates[length - 1])

    for index, item in enumerate(dates):
        if index < length - 1 and (dates[index + 1] - item).total_seconds() > duration:
            # build outage tuple and append to list
            # format (start, stop, duration)
            result.append(formatResult(item, dates[index + 1], kind))

    return result
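The toDate and formatResult helpers (and the kind value) aren't shown in the post; purely as an illustration, they might look something like the following, assuming the gap tuples carry (start, stop, duration in seconds) as used further down:

# Hypothetical sketches of the helpers referenced above (not the author's code).
def toDate(stamp):
    # Truncate a datetime to its calendar date.
    return stamp.date()

def formatResult(start, stop, kind):
    # Build an outage tuple in the format (start, stop, duration in seconds).
    # The role of 'kind' isn't shown in the post; it's kept only for signature compatibility.
    return (start, stop, (stop - start).total_seconds())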
outage_list = outage_join_df.rdd\
    .groupByKey()\
    .map(lambda row: (
        row[0],
        findGaps(
            sorted(list(row[1])),
            limit
        )
    ))\
    .filter(lambda row: len(row[1]) > 0)\
    .flatMapValues(lambda row: row)\
    .map(lambda row: (
        row[0]['itemid'],   # itemid
        row[1][0].date(),   # date
        row[1][0],          # start
        row[1][1],          # stop
        row[1][2]           # duration
    ))\
    .collect()
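For what it's worth, the same gap detection can also be expressed without dropping to the RDD API by combining the lag window function from Solution 1 with a filter. A rough sketch, assuming outage_join_df has flat itemid and stamp (timestamp) columns and that limit is the gap threshold in seconds (the actual schema isn't shown in the post):

from pyspark.sql.functions import col, lag, to_date
from pyspark.sql.window import Window

w = Window.partitionBy("itemid").orderBy("stamp")

gaps = (
    outage_join_df
    .withColumn("prev_stamp", lag("stamp", 1).over(w))
    .withColumn("duration", col("stamp").cast("long") - col("prev_stamp").cast("long"))
    .filter(col("duration") > limit)  # keep only gaps longer than the threshold
    .select(
        "itemid",
        to_date(col("prev_stamp")).alias("date"),  # date the gap started
        col("prev_stamp").alias("start"),
        col("stamp").alias("stop"),
        "duration",
    )
)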
ORIGINAL ANSWER (WRONG) I managed to solve it using mapPartitions:
from pyspark.sql.functions import desc

def findOutage(items):
    outages = []
    lastStamp = None
    for item in items:
        if lastStamp and (lastStamp - item.stamp).total_seconds() > 14400:
            outages.append({"item": item.itemid,
                            "start": item.stamp.isoformat(),
                            "stop": lastStamp.isoformat()})
        lastStamp = item.stamp
    return iter(outages)

items = df.limit(10).orderBy('itemid', desc('stamp'))
outages = items.mapPartitions(findOutage).collect()
Thanks everyone for the help!